Commit 30f15173 authored by Bartek Wrona's avatar Bartek Wrona

WIP: Implemented dumping of plugin's external data. Prerequisites for loading them.

parent cf01e411
......@@ -1364,6 +1364,16 @@ void database::notify_post_apply_transaction( const transaction_notification& no
HIVE_TRY_NOTIFY( _post_apply_transaction_signal, note )
}
void database::notify_prepare_snapshot_data_supplement(const prepare_snapshot_supplement_notification& note)
{
HIVE_TRY_NOTIFY(_prepare_snapshot_supplement_signal, note)
}
void database::notify_load_snapshot_data_supplement(const load_snapshot_supplement_notification& note)
{
HIVE_TRY_NOTIFY(_load_snapshot_supplement_signal, note)
}
account_name_type database::get_scheduled_witness( uint32_t slot_num )const
{
const dynamic_global_property_object& dpo = get_dynamic_global_properties();
......@@ -4556,6 +4566,16 @@ boost::signals2::connection database::add_prepare_snapshot_handler(const prepare
return connect_impl(_prepare_snapshot_signal, func, plugin, group, "->prepare_snapshot");
}
boost::signals2::connection database::add_snapshot_supplement_handler(const prepare_snapshot_data_supplement_handler_t& func, const abstract_plugin& plugin, int32_t group)
{
return connect_impl(_prepare_snapshot_supplement_signal, func, plugin, group, "->prepare_snapshot_data_supplement");
}
boost::signals2::connection database::add_snapshot_supplement_handler(const load_snapshot_data_supplement_handler_t& func, const abstract_plugin& plugin, int32_t group)
{
return connect_impl(_load_snapshot_supplement_signal, func, plugin, group, "->load_snapshot_data_supplement");
}
const witness_object& database::validate_block_header( uint32_t skip, const signed_block& next_block )const
{ try {
FC_ASSERT( head_block_id() == next_block.previous, "", ("head_block_id",head_block_id())("next.prev",next_block.previous) );
......
......@@ -25,7 +25,17 @@
#include <functional>
#include <map>
namespace hive { namespace chain {
namespace hive {
namespace plugins {namespace chain
{
class snapshot_dump_helper;
class snapshot_load_helper;
} /// namespace chain
} /// namespace plugins
namespace chain {
using hive::protocol::signed_transaction;
using hive::protocol::operation;
......@@ -35,6 +45,10 @@ namespace hive { namespace chain {
using hive::protocol::price;
using abstract_plugin = appbase::abstract_plugin;
struct prepare_snapshot_supplement_notification;
struct load_snapshot_supplement_notification;
struct hardfork_versions
{
fc::time_point_sec times[ HIVE_NUM_HARDFORKS + 1 ];
......@@ -348,7 +362,11 @@ namespace hive { namespace chain {
using reindex_handler_t = std::function< void(const reindex_notification&) >;
using generate_optional_actions_handler_t = std::function< void(const generate_optional_actions_notification&) >;
using prepare_snapshot_handler_t = std::function < void(const database&, const database::abstract_index_cntr_t&)>;
using prepare_snapshot_data_supplement_handler_t = std::function < void(const prepare_snapshot_supplement_notification&) >;
using load_snapshot_data_supplement_handler_t = std::function < void(const load_snapshot_supplement_notification&) >;
void notify_prepare_snapshot_data_supplement(const prepare_snapshot_supplement_notification& n);
void notify_load_snapshot_data_supplement(const load_snapshot_supplement_notification& n);
private:
template <typename TSignal,
......@@ -378,6 +396,24 @@ namespace hive { namespace chain {
boost::signals2::connection add_generate_optional_actions_handler ( const generate_optional_actions_handler_t& func, const abstract_plugin& plugin, int32_t group = -1 );
boost::signals2::connection add_prepare_snapshot_handler (const prepare_snapshot_handler_t& func, const abstract_plugin& plugin, int32_t group = -1);
/// <summary>
/// All plugins storing data in different way than chainbase::generic_index (wrapping
/// a multi_index) should register to this handler to add its own data to the prepared snapshot.
/// </summary>
/// <param name="func">function to be called on notification</param>
/// <param name="plugin">the plugin be registering its handler</param>
/// <param name="group"></param>
/// <returns></returns>
boost::signals2::connection add_snapshot_supplement_handler (const prepare_snapshot_data_supplement_handler_t& func, const abstract_plugin& plugin, int32_t group = -1);
/// <summary>
/// All plugins storing data in different way than chainbase::generic_index (wrapping
/// a multi_index) should register to this handler to load its own data from the loaded snapshot.
/// </summary>
/// <param name="func"></param>
/// <param name="plugin"></param>
/// <param name="group"></param>
/// <returns></returns>
boost::signals2::connection add_snapshot_supplement_handler (const load_snapshot_data_supplement_handler_t& func, const abstract_plugin& plugin, int32_t group = -1);
//////////////////// db_witness_schedule.cpp ////////////////////
......@@ -814,6 +850,15 @@ namespace hive { namespace chain {
fc::signal<void(const database&, const database::abstract_index_cntr_t&)> _prepare_snapshot_signal;
/// <summary>
/// Emitted by snapshot plugin implementation to allow all registered plugins to include theirs custom-stored data in the snapshot
/// </summary>
fc::signal<void(const prepare_snapshot_supplement_notification&)> _prepare_snapshot_supplement_signal;
/// <summary>
/// Emitted by snapshot plugin implementation to allow all registered plugins to load theirs custom-stored data from the snapshot
/// </summary>
fc::signal<void(const load_snapshot_supplement_notification&)> _load_snapshot_supplement_signal;
/**
* Emitted After a block has been applied and committed. The callback
* should not yield and should execute quickly.
......@@ -841,4 +886,20 @@ namespace hive { namespace chain {
const open_args& args;
};
struct prepare_snapshot_supplement_notification
{
prepare_snapshot_supplement_notification(const fc::path& _external_data_storage_base_path, hive::plugins::chain::snapshot_dump_helper& _dump_helper) :
external_data_storage_base_path(_external_data_storage_base_path), dump_helper(_dump_helper) {}
fc::path external_data_storage_base_path;
hive::plugins::chain::snapshot_dump_helper& dump_helper;
};
struct load_snapshot_supplement_notification
{
explicit load_snapshot_supplement_notification(hive::plugins::chain::snapshot_load_helper& _load_helper) :
load_helper(_load_helper) {}
hive::plugins::chain::snapshot_load_helper& load_helper;
};
} }
......@@ -12,6 +12,7 @@
#include <hive/chain/util/uint256.hpp>
#include <hive/plugins/chain/chain_plugin.hpp>
#include <hive/plugins/chain/state_snapshot_provider.hpp>
#include <hive/utilities/benchmark_dumper.hpp>
#include <hive/utilities/plugin_utilities.hpp>
......@@ -21,6 +22,7 @@
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/utilities/backupable_db.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include <boost/type.hpp>
......@@ -496,6 +498,16 @@ public:
on_post_reindex( note );
}, _self, 0);
_mainDb.add_snapshot_supplement_handler([&](const hive::chain::prepare_snapshot_supplement_notification& note) -> void
{
supplement_snapshot(note);
}, _self, 0);
_mainDb.add_snapshot_supplement_handler([&](const hive::chain::load_snapshot_supplement_notification& note) -> void
{
load_additional_data_from_snapshot(note);
}, _self, 0);
HIVE_ADD_PLUGIN_INDEX(_mainDb, volatile_operation_index);
}
......@@ -596,6 +608,9 @@ public:
}
private:
void supplement_snapshot(const hive::chain::prepare_snapshot_supplement_notification& note);
void load_additional_data_from_snapshot(const hive::chain::load_snapshot_supplement_notification& note);
uint32_t get_lib(const uint32_t* fallbackIrreversibleBlock = nullptr) const;
void update_lib( uint32_t );
......@@ -1172,6 +1187,57 @@ bool account_history_rocksdb_plugin::impl::find_transaction_info(const protocol:
return false;
}
void account_history_rocksdb_plugin::impl::supplement_snapshot(const hive::chain::prepare_snapshot_supplement_notification& note)
{
fc::path actual_path(note.external_data_storage_base_path);
actual_path /= "account_history_rocksdb_data";
if(bfs::exists(actual_path) == false)
bfs::create_directories(actual_path);
auto pathString = actual_path.to_native_ansi_path();
::rocksdb::Env* backupEnv = ::rocksdb::Env::Default();
::rocksdb::BackupableDBOptions backupableDbOptions(pathString);
std::unique_ptr<::rocksdb::BackupEngine> backupEngine;
::rocksdb::BackupEngine* _backupEngine = nullptr;
auto status = ::rocksdb::BackupEngine::Open(backupEnv, backupableDbOptions, &_backupEngine);
checkStatus(status);
backupEngine.reset(_backupEngine);
ilog("Attempting to create a AccountHistoryRocksDB backup in the location: `${p}'", ("p", pathString));
std::string meta_data = "Account History RocksDB plugin data. Current head block: ";
meta_data += std::to_string(_mainDb.head_block_num());
status = _backupEngine->CreateNewBackupWithMetadata(this->_storage.get(), meta_data, true);
checkStatus(status);
std::vector<::rocksdb::BackupInfo> backupInfos;
_backupEngine->GetBackupInfo(&backupInfos);
FC_ASSERT(backupInfos.size() == 1);
note.dump_helper.store_external_data_info(_self, actual_path);
}
void account_history_rocksdb_plugin::impl::load_additional_data_from_snapshot(const hive::chain::load_snapshot_supplement_notification& note)
{
fc::path extdata_path;
if(note.load_helper.load_external_data_info(_self, &extdata_path) == false)
{
wlog("No external data present for Account History RocksDB plugin...");
return;
}
ilog("Attempting to load external data for Account History RocksDB plugin from location: `${p}'", ("p", extdata_path.string()));
}
uint32_t account_history_rocksdb_plugin::impl::get_lib(const uint32_t* fallbackIrreversibleBlock /*= nullptr*/) const
{
std::string data;
......
#pragma once
namespace appbase
{
class abstract_plugin;
} /// namespace appbase
namespace fc
{
class path;
} /// namespace fc
namespace hive {
namespace chain {
struct open_args;
......@@ -8,6 +18,7 @@ struct open_args;
namespace plugins {
namespace chain {
using abstract_plugin = appbase::abstract_plugin;
/** Purpose of this interface is provide state snapshot functionality to the chain_plugin even it is implemented in separate one.
*/
......@@ -22,4 +33,34 @@ class state_snapshot_provider
virtual ~state_snapshot_provider() = default;
};
class snapshot_dump_helper
{
public:
/// <summary>
/// Allows to store additional (external) data held in given plugin in the snapshot being produced atm.
/// </summary>
/// <param name="plugin">object representing given plugin, to dump external data for</param>
/// <param name="storage_path">specifies the directory where external data shall be written to</param>
virtual void store_external_data_info(const abstract_plugin& plugin, const fc::path& storage_path) = 0;
protected:
virtual ~snapshot_dump_helper() {}
};
class snapshot_load_helper
{
public:
/// <summary>
/// Allows to ask snapshot provider for additional data (to be loaded) for given plugin.
/// </summary>
/// <param name="plugin">object representing given plugin, asking to load external data for</param>
/// <param name="storage_path">output parameter to be filled with the storage path of external data specific to given plugin</param>
/// <returns>true if given plugin had saved external data to be load for.</returns>
virtual bool load_external_data_info(const abstract_plugin& plugin, fc::path* storage_path) = 0;
protected:
virtual ~snapshot_load_helper() {}
};
}}}
\ No newline at end of file
......@@ -174,6 +174,58 @@ void rocksdb_cleanup_helper::close()
return cf;
}
struct plugin_external_data_info
{
fc::path path;
};
typedef std::map<std::string, plugin_external_data_info> plugin_external_data_index;
class snapshot_dump_supplement_helper final : public hive::plugins::chain::snapshot_dump_helper
{
public:
const plugin_external_data_index& get_external_data_index() const
{
return external_data_index;
}
virtual void store_external_data_info(const hive::chain::abstract_plugin& plugin, const fc::path& storage_path) override
{
plugin_external_data_info info;
info.path = storage_path;
auto ii = external_data_index.emplace(plugin.get_name(), info);
FC_ASSERT(ii.second, "Only one external data path allowed per plugin");
}
private:
plugin_external_data_index external_data_index;
};
class snapshot_load_supplement_helper final : public hive::plugins::chain::snapshot_load_helper
{
public:
explicit snapshot_load_supplement_helper(const plugin_external_data_index& idx) : ext_data_idx(idx) {}
virtual bool load_external_data_info(const hive::chain::abstract_plugin& plugin, fc::path* storage_path) override
{
const std::string& name = plugin.get_name();
auto i = ext_data_idx.find(name);
if(i == ext_data_idx.end())
{
*storage_path = fc::path();
return false;
}
*storage_path = i->second.path;
return true;
}
private:
const plugin_external_data_index& ext_data_idx;
};
} /// namespace anonymous
FC_REFLECT(index_manifest_info, (name)(dumpedItems)(firstId)(lastId)(storage_files))
......@@ -818,8 +870,11 @@ class state_snapshot_plugin::impl final : protected chain::state_snapshot_provid
std::string generate_name() const;
void safe_spawn_snapshot_dump(const chainbase::abstract_index* idx, index_dump_writer* writer);
void safe_spawn_snapshot_load(chainbase::abstract_index* idx, index_dump_reader* reader);
void store_snapshot_manifest(const bfs::path& actualStoragePath, const std::vector<std::unique_ptr<index_dump_writer>>& builtWriters) const;
snapshot_manifest load_snapshot_manifest(const bfs::path& actualStoragePath);
void store_snapshot_manifest(const bfs::path& actualStoragePath, const std::vector<std::unique_ptr<index_dump_writer>>& builtWriters,
const snapshot_dump_supplement_helper& dumpHelper) const;
std::pair<snapshot_manifest, plugin_external_data_index> load_snapshot_manifest(const bfs::path& actualStoragePath);
void load_snapshot_external_data(const plugin_external_data_index& idx);
private:
state_snapshot_plugin& _self;
......@@ -879,7 +934,7 @@ void state_snapshot_plugin::impl::safe_spawn_snapshot_dump(const chainbase::abst
}
void state_snapshot_plugin::impl::store_snapshot_manifest(const bfs::path& actualStoragePath,
const std::vector<std::unique_ptr<index_dump_writer>>& builtWriters) const
const std::vector<std::unique_ptr<index_dump_writer>>& builtWriters, const snapshot_dump_supplement_helper& dumpHelper) const
{
bfs::path manifestDbPath(actualStoragePath);
manifestDbPath /= "snapshot-manifest";
......@@ -893,6 +948,7 @@ void state_snapshot_plugin::impl::store_snapshot_manifest(const bfs::path& actua
rocksdb_cleanup_helper db = rocksdb_cleanup_helper::open(dbOptions, manifestDbPath);
::rocksdb::ColumnFamilyHandle* manifestCF = db.create_column_family("INDEX_MANIFEST");
::rocksdb::ColumnFamilyHandle* externalDataCF = db.create_column_family("EXTERNAL_DATA");
::rocksdb::WriteOptions writeOptions;
......@@ -920,10 +976,34 @@ void state_snapshot_plugin::impl::store_snapshot_manifest(const bfs::path& actua
}
}
const auto& extDataIdx = dumpHelper.get_external_data_index();
for(const auto& d : extDataIdx)
{
const auto& plugin_name = d.first;
const auto& path = d.second.path;
auto relativePath = bfs::relative(path, actualStoragePath);
auto relativePathStr = relativePath.string();
Slice key(plugin_name);
Slice value(relativePathStr);
auto status = db->Put(writeOptions, externalDataCF, key, value);
if(status.ok() == false)
{
elog("Cannot write an index manifest entry to output file: `${p}'. Error details: `${e}'.", ("p", manifestDbPath.string())("e", status.ToString()));
ilog("Failing key value: ${k}", ("k", plugin_name));
throw std::exception();
}
}
db.close();
}
snapshot_manifest state_snapshot_plugin::impl::load_snapshot_manifest(const bfs::path& actualStoragePath)
std::pair<snapshot_manifest, plugin_external_data_index> state_snapshot_plugin::impl::load_snapshot_manifest(const bfs::path& actualStoragePath)
{
bfs::path manifestDbPath(actualStoragePath);
manifestDbPath /= "snapshot-manifest";
......@@ -934,10 +1014,15 @@ snapshot_manifest state_snapshot_plugin::impl::load_snapshot_manifest(const bfs:
::rocksdb::ColumnFamilyDescriptor cfDescriptor;
cfDescriptor.name = "INDEX_MANIFEST";
std::vector <::rocksdb::ColumnFamilyDescriptor> cfDescriptors;
cfDescriptors.emplace_back(::rocksdb::kDefaultColumnFamilyName, ::rocksdb::ColumnFamilyOptions());
cfDescriptors.push_back(cfDescriptor);
cfDescriptor = ::rocksdb::ColumnFamilyDescriptor();
cfDescriptor.name = "EXTERNAL_DATA";
cfDescriptors.push_back(cfDescriptor);
std::vector<::rocksdb::ColumnFamilyHandle*> cfHandles;
std::unique_ptr<::rocksdb::DB> manifestDbPtr;
::rocksdb::DB* manifestDb = nullptr;
......@@ -984,6 +1069,40 @@ snapshot_manifest state_snapshot_plugin::impl::load_snapshot_manifest(const bfs:
}
}
plugin_external_data_index extDataIdx;
{
::rocksdb::ReadOptions rOptions;
std::unique_ptr<::rocksdb::Iterator> indexIterator(manifestDb->NewIterator(rOptions, cfHandles[2]));
std::vector<char> buffer;
for(indexIterator->SeekToFirst(); indexIterator->Valid(); indexIterator->Next())
{
auto keySlice = indexIterator->key();
auto valueSlice = indexIterator->value();
std::string plugin_name = keySlice.data();
std::string relative_path = valueSlice.data();
ilog("Loaded external data info for plugin ${p} having storage of external files inside: `${d}' (relative path)", ("p", plugin_name)("d", relative_path));
bfs::path extDataPath(actualStoragePath);
extDataPath /= relative_path;
if(bfs::exists(extDataPath) == false)
{
elog("Specified path to the external data does not exists: `${d}'.", ("d", extDataPath.string()));
throw std::exception();
}
plugin_external_data_info info;
info.path = extDataPath;
auto ii = extDataIdx.emplace(plugin_name, info);
FC_ASSERT(ii.second, "Multiple entries for plugin: ${p}", ("p", plugin_name));
}
}
for(auto* cfh : cfHandles)
{
status = manifestDb->DestroyColumnFamilyHandle(cfh);
......@@ -996,7 +1115,16 @@ snapshot_manifest state_snapshot_plugin::impl::load_snapshot_manifest(const bfs:
manifestDb->Close();
manifestDbPtr.release();
return std::move(retVal);
return std::make_pair(retVal, extDataIdx);
}
void state_snapshot_plugin::impl::load_snapshot_external_data(const plugin_external_data_index& idx)
{
snapshot_load_supplement_helper load_helper(idx);
hive::chain::load_snapshot_supplement_notification notification(load_helper);
_mainDb.notify_load_snapshot_data_supplement(notification);
}
void state_snapshot_plugin::impl::safe_spawn_snapshot_load(chainbase::abstract_index* idx, index_dump_reader* reader)
......@@ -1065,7 +1193,19 @@ void state_snapshot_plugin::impl::prepare_snapshot(const std::string& snapshotNa
threadpool.join_all();
store_snapshot_manifest(actualStoragePath, builtWriters);
fc::path external_data_storage_base_path(actualStoragePath);
external_data_storage_base_path /= "ext_data";
if(bfs::exists(external_data_storage_base_path) == false)
bfs::create_directories(external_data_storage_base_path);
snapshot_dump_supplement_helper dump_helper;
hive::chain::prepare_snapshot_supplement_notification notification(external_data_storage_base_path, dump_helper);
_mainDb.notify_prepare_snapshot_data_supplement(notification);
store_snapshot_manifest(actualStoragePath, builtWriters, dump_helper);
auto blockNo = _mainDb.head_block_num();
......@@ -1119,7 +1259,7 @@ void state_snapshot_plugin::impl::load_snapshot(const std::string& snapshotName,
for(chainbase::abstract_index* idx : indices)
{
builtReaders.emplace_back(std::make_unique<index_dump_reader>(snapshotManifest, actualStoragePath));
builtReaders.emplace_back(std::make_unique<index_dump_reader>(snapshotManifest.first, actualStoragePath));
index_dump_reader* reader = builtReaders.back().get();
if(_allow_concurrency)
......@@ -1135,6 +1275,15 @@ void state_snapshot_plugin::impl::load_snapshot(const std::string& snapshotName,
threadpool.join_all();
if(snapshotManifest.second.empty())
{
ilog("Skipping external data load due to lack of data saved to the snapshot");
}
else
{
load_snapshot_external_data(snapshotManifest.second);
}
auto blockNo = _mainDb.head_block_num();
ilog("Setting chainbase revision to ${b} block...", ("b", blockNo));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment