Commit 1aa1b7a4 authored by Bartek Wrona's avatar Bartek Wrona

Enhanced debug logging capabilities during snapshot loading/dumping.

Any exception thrown during snapshot load leads to whole loading process failure.
parent 00efa447
......@@ -332,13 +332,22 @@ namespace chainbase {
/**
* Construct a new element and puts in the multi_index_container, basing on snapshot stream.
*/
void unpack_from_snapshot(typename value_type::id_type objectId, std::function<void(value_type&)>&& unpack) {
void unpack_from_snapshot(typename value_type::id_type objectId, std::function<void(value_type&)>&& unpack,
std::function<std::string(const fc::variant&)>&& preetify) {
_next_id = objectId;
value_type tmp(_indices.get_allocator(), objectId, std::move(unpack));
auto insert_result = _indices.emplace(std::move(tmp));
if(!insert_result.second) {
BOOST_THROW_EXCEPTION(std::logic_error("could not insert unpacked object, most likely a uniqueness constraint was violated"));
std::string s = preetify(fc::variant(tmp));
const auto& conflictingObject = *insert_result.first;
std::string s2 = preetify(fc::variant(conflictingObject));
std::string msg = "could not insert unpacked object, most likely a uniqueness constraint was violated: `" + s +
std::string("' conflicting object:`") + s2 + "'";
BOOST_THROW_EXCEPTION(std::logic_error(msg));
}
++_next_id;
......
......@@ -99,6 +99,7 @@ public:
{
public:
virtual void flush_converted_data(const worker_common_base::serialized_object_cache& cache) = 0;
virtual std::string prettifyObject(const fc::variant& object, const std::vector<char>& buffer) const = 0;
protected:
worker(chainbase::snapshot_writer& controller, size_t startId, size_t endId) :
......@@ -130,6 +131,7 @@ class snapshot_reader : public snapshot_base_serializer
/** Allows to load data from snapshot into the cache.
*/
virtual void load_converted_data(worker_common_base::serialized_object_cache* cache) = 0;
virtual std::string prettifyObject(const fc::variant& object, const std::vector<char>& buffer) const = 0;
void update_processed_id(size_t id)
{
......@@ -264,9 +266,12 @@ private:
("i", id)("l", _startId)("r", _endId));
serializedCache.emplace_back(id, std::vector<char>());
serialization::pack_to_buffer(serializedCache.back().second, object);
//std::string dump = worker->prettifyObject(fc::variant(object), serializedCache.back().second);
//if(dump.empty() == false)
// ilog("Dumping object: id: ${id}, `${o}'", ("id", id)("o", dump));
if(serializedCache.size() >= max_cache_size)
{
worker->flush_converted_data(serializedCache);
......@@ -398,11 +403,17 @@ class generic_index_snapshot_loader final : public generic_index_serialize_base
/// Just to catch loading context on some caught exception.
worker->update_processed_id(buffer.first);
auto prettyDump = [&buffer, worker](fc::variant object) -> std::string
{
return worker->prettifyObject(object, buffer.second);
};
_generic_index.unpack_from_snapshot(id_type(buffer.first),
[this, &buffer](typename MultiIndexType::value_type& object)
{
serialization::unpack_from_buffer(object, buffer.second);
}
},
std::move(prettyDump)
);
}
......
......@@ -16,6 +16,8 @@
#include <appbase/application.hpp>
#include <fc/io/json.hpp>
#include <fc/io/raw.hpp>
#include <fc/reflect/reflect.hpp>
#include <rocksdb/db.h>
......@@ -384,6 +386,14 @@ class snapshot_processor_data : public BaseClass
//return false;
}
bool perform_debug_logging() const
{
//if(indexDescription == "comment_object")
// return true;
return false;
}
protected:
snapshot_processor_data(const bfs::path& rootPath) :
......@@ -400,9 +410,9 @@ class snapshot_processor_data : public BaseClass
class index_dump_writer final : public snapshot_processor_data<chainbase::snapshot_writer>
{
public:
index_dump_writer(const chainbase::abstract_index& index, const bfs::path& outputRootPath,
index_dump_writer(const chain::database& mainDb, const chainbase::abstract_index& index, const bfs::path& outputRootPath,
bool allow_concurrency) :
snapshot_processor_data<chainbase::snapshot_writer>(outputRootPath), _index(index), _firstId(0), _lastId(0),
snapshot_processor_data<chainbase::snapshot_writer>(outputRootPath), _mainDb(mainDb), _index(index), _firstId(0), _lastId(0),
_allow_concurrency(allow_concurrency) {}
index_dump_writer(const index_dump_writer&) = delete;
......@@ -414,9 +424,15 @@ class index_dump_writer final : public snapshot_processor_data<chainbase::snapsh
snapshot_converter_t converter) override;
virtual void start(const workers& workers) override;
const chain::database& getMainDb() const
{
return _mainDb;
}
void store_index_manifest(index_manifest_info* manifest) const;
private:
const chain::database& _mainDb;
const chainbase::abstract_index& _index;
std::vector <std::unique_ptr<dumping_worker>> _builtWorkers;
size_t _firstId;
......@@ -481,6 +497,20 @@ class dumping_worker final : public chainbase::snapshot_writer::worker
private:
virtual void flush_converted_data(const serialized_object_cache& cache) override;
virtual std::string prettifyObject(const fc::variant& object, const std::vector<char>& buffer) const override
{
std::string s;
if(_controller.perform_debug_logging())
{
s = fc::json::to_pretty_string(object);
s += '\n';
s += "Buffer size: " + std::to_string(buffer.size());
}
return s;
}
void prepareWriter();
private:
......@@ -707,6 +737,18 @@ class loading_worker final : public chainbase::snapshot_reader::worker
virtual ~loading_worker() = default;
virtual void load_converted_data(worker_common_base::serialized_object_cache* cache) override;
virtual std::string prettifyObject(const fc::variant& object, const std::vector<char>& buffer) const override
{
std::string s;
if(_controller.perform_debug_logging())
{
s = fc::json::to_pretty_string(object);
s += '\n';
s += "Data buffer size: " + std::to_string(buffer.size());
}
return s;
}
void perform_load();
......@@ -1135,7 +1177,7 @@ void state_snapshot_plugin::impl::safe_spawn_snapshot_load(chainbase::abstract_i
idx->load_snapshot(*reader);
reader->set_processing_success(true);
}
FC_CAPTURE_AND_LOG((reader->getIndexDescription())(reader->getCurrentlyProcessedId()))
FC_CAPTURE_LOG_AND_RETHROW((reader->getIndexDescription())(reader->getCurrentlyProcessedId()))
}
void state_snapshot_plugin::impl::prepare_snapshot(const std::string& snapshotName)
......@@ -1173,11 +1215,11 @@ void state_snapshot_plugin::impl::prepare_snapshot(const std::string& snapshotNa
for(unsigned int i = 0; _allow_concurrency && i < _num_threads; ++i)
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
std::vector<std::unique_ptr< index_dump_writer>> builtWriters;
std::vector<std::unique_ptr<index_dump_writer>> builtWriters;
for(const chainbase::abstract_index* idx : indices)
{
builtWriters.emplace_back(std::make_unique<index_dump_writer>(*idx, actualStoragePath, _allow_concurrency));
builtWriters.emplace_back(std::make_unique<index_dump_writer>(_mainDb, *idx, actualStoragePath, _allow_concurrency));
index_dump_writer* writer = builtWriters.back().get();
if(_allow_concurrency)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment