// Commit aa4fe0a8 authored by Mariusz
// Changed line endings `LF` -> `CR+LF`
// parent 4f9d5492
#include <hive/chain/database_exceptions.hpp>
#include <hive/plugins/chain/abstract_block_producer.hpp>
#include <hive/plugins/chain/state_snapshot_provider.hpp>
#include <hive/plugins/chain/chain_plugin.hpp>
#include <hive/plugins/statsd/utility.hpp>
#include <hive/utilities/benchmark_dumper.hpp>
#include <hive/utilities/database_configuration.hpp>
#include <fc/string.hpp>
#include <fc/io/json.hpp>
#include <fc/io/fstream.hpp>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <boost/bind.hpp>
#include <boost/preprocessor/stringize.hpp>
#include <boost/thread/future.hpp>
#include <boost/lockfree/queue.hpp>
#include <thread>
#include <memory>
#include <iostream>
namespace hive { namespace plugins { namespace chain {
#define BENCHMARK_FILE_NAME "replay_benchmark.json"
using namespace hive;
using fc::flat_map;
using hive::chain::block_id_type;
namespace asio = boost::asio;
using hive::plugins::chain::synchronization_type;
using index_memory_details_cntr_t = hive::utilities::benchmark_dumper::index_memory_details_cntr_t;
using get_indexes_memory_details_type = std::function< void( index_memory_details_cntr_t&, bool ) >;
#define NUM_THREADS 1
// A block-production request queued to the writer thread.
// NOTE: witness_owner and block_signing_private_key are held BY REFERENCE —
// the submitting thread must keep them alive until the request has been
// processed (the same lifetime contract write_context documents for itself).
struct generate_block_request
{
generate_block_request( const fc::time_point_sec w, const account_name_type& wo, const fc::ecc::private_key& priv_key, uint32_t s ) :
when( w ),
witness_owner( wo ),
block_signing_private_key( priv_key ),
skip( s ) {}
const fc::time_point_sec when;                          // timestamp for the block to produce
const account_name_type& witness_owner;                 // producing witness (referenced, not copied)
const fc::ecc::private_key& block_signing_private_key;  // signing key (referenced, not copied)
uint32_t skip;                                          // validation-skip flags forwarded to the generator
signed_block block;                                     // out-parameter: filled with the generated block
};
// A queued write request: exactly one of a block to push, a transaction to
// push, or a block-generation request.
using write_request_ptr = fc::static_variant< const signed_block*, const signed_transaction*, generate_block_request* >;
// Completion handle given back to the submitter — either promise flavour.
using promise_ptr = fc::static_variant< boost::promise< void >*, fc::future< void >* >;

// One entry in the write queue. The submitting thread owns this object and
// must keep it (and everything req_ptr points at) alive until the contained
// promise has been fulfilled by the writer thread.
struct write_context
{
  write_request_ptr req_ptr;               // the work to perform
  uint32_t skip = 0;                       // validation-skip flags for push_block
  bool success = true;                     // result of the write visitor
  fc::optional< fc::exception > except;    // populated when the write throws
  promise_ptr prom_ptr;                    // fulfilled once processing completes
};
namespace detail {
// Private implementation (pimpl) of chain_plugin. Owns the chain database,
// the lock-free write queue and the writer thread, and carries every option
// value parsed from the command line / config file before open() runs.
class chain_plugin_impl
{
public:
chain_plugin_impl() : write_queue( 64 ) {} // 64 = initial lock-free queue capacity
~chain_plugin_impl() { stop_write_processing(); }
// Registers the provider later used by process_snapshot(); ownership stays with the caller.
void register_snapshot_provider(state_snapshot_provider& provider)
{
snapshot_provider = &provider;
}
void start_write_processing();
void stop_write_processing();
bool start_replay_processing( synchronization_type& on_sync );
void initial_settings();
void open();
bool replay_blockchain();
void process_snapshot();
bool check_data_consistency();
void work( synchronization_type& on_sync );
void write_default_database_config( bfs::path& p );
// --- configuration (filled from program options before open()) ---
uint64_t shared_memory_size = 0;
uint16_t shared_file_full_threshold = 0;
uint16_t shared_file_scale_rate = 0;
int16_t sps_remove_threshold = -1;
uint32_t chainbase_flags = 0;
bfs::path shared_memory_dir;
bool replay = false;
bool resync = false;
bool readonly = false;
bool check_locks = false;
bool validate_invariants = false;
bool dump_memory_details = false;
bool benchmark_is_enabled = false;
bool statsd_on_replay = false;
uint32_t stop_replay_at = 0;
bool exit_after_replay = false;
bool force_replay = false;
uint32_t benchmark_interval = 0;
uint32_t flush_interval = 0;
bool replay_in_memory = false;
std::vector< std::string > replay_memory_indices{};
flat_map<uint32_t,block_id_type> loaded_checkpoints;
uint32_t allow_future_time = 5;
// --- writer-thread state ---
// NOTE(review): `running` is read by the writer thread and written from
// stop_write_processing() without synchronization — an std::atomic<bool>
// would make the shutdown handshake well-defined. TODO confirm.
bool running = true;
std::shared_ptr< std::thread > write_processor_thread;
boost::lockfree::queue< write_context* > write_queue;
int16_t write_lock_hold_time = 500; // ms to hold the write lock in live mode; < 0 = never yield on time
// --- runtime objects ---
vector< string > loaded_plugins;
fc::mutable_variant_object plugin_state_opts;
bfs::path database_cfg;
database db;
std::string block_generator_registrant;
std::shared_ptr< abstract_block_producer > block_generator;
hive::utilities::benchmark_dumper dumper;
hive::chain::open_args db_open_args;
get_indexes_memory_details_type get_indexes_memory_details;
state_snapshot_provider* snapshot_provider = nullptr;
bool is_p2p_enabled = true; // cleared when replay stops early (stop_replay_at > 0)
};
struct write_request_visitor
{
write_request_visitor() {}
database* db;
uint32_t skip = 0;
fc::optional< fc::exception >* except;
std::shared_ptr< abstract_block_producer > block_generator;
typedef bool result_type;
bool operator()( const signed_block* block )
{
bool result = false;
try
{
STATSD_START_TIMER( "chain", "write_time", "push_block", 1.0f )
result = db->push_block( *block, skip );
STATSD_STOP_TIMER( "chain", "write_time", "push_block" )
}
catch( fc::exception& e )
{
*except = e;
}
catch( ... )
{
*except = fc::unhandled_exception( FC_LOG_MESSAGE( warn, "Unexpected exception while pushing block." ),
std::current_exception() );
}
return result;
}
bool operator()( const signed_transaction* trx )
{
bool result = false;
try
{
STATSD_START_TIMER( "chain", "write_time", "push_transaction", 1.0f )
db->push_transaction( *trx );
STATSD_STOP_TIMER( "chain", "write_time", "push_transaction" )
result = true;
}
catch( fc::exception& e )
{
*except = e;
}
catch( ... )
{
*except = fc::unhandled_exception( FC_LOG_MESSAGE( warn, "Unexpected exception while pushing block." ),
std::current_exception() );
}
return result;
}
bool operator()( generate_block_request* req )
{
bool result = false;
try
{
if( !block_generator )
FC_THROW_EXCEPTION( chain_exception, "Received a generate block request, but no block generator has been registered." );
STATSD_START_TIMER( "chain", "write_time", "generate_block", 1.0f )
req->block = block_generator->generate_block(
req->when,
req->witness_owner,
req->block_signing_private_key,
req->skip
);
STATSD_STOP_TIMER( "chain", "write_time", "generate_block" )
result = true;
}
catch( fc::exception& e )
{
*except = e;
}
catch( ... )
{
*except = fc::unhandled_exception( FC_LOG_MESSAGE( warn, "Unexpected exception while pushing block." ),
std::current_exception() );
}
return result;
}
};
// Fulfils whichever promise flavour the caller attached to a write_context.
// Invoked by the writer thread once the corresponding request has been
// processed, signalling the submitter that results are available.
struct request_promise_visitor
{
  request_promise_visitor() {}

  typedef void result_type;

  template< typename PromiseType >
  void operator()( PromiseType* promise )
  {
    promise->set_value();
  }
};
// Launches the single writer thread that drains the write queue.
// Must be paired with stop_write_processing() (called from the destructor).
void chain_plugin_impl::start_write_processing()
{
write_processor_thread = std::make_shared< std::thread >( [&]()
{
bool is_syncing = true;
write_context* cxt;
// NOTE(review): `start` is fc::time_point_sec (second precision) but is
// compared against fc::time_point values below via implicit conversion —
// presumably the precision loss is acceptable here; confirm.
fc::time_point_sec start = fc::time_point::now();
write_request_visitor req_visitor;
req_visitor.db = &db;
req_visitor.block_generator = block_generator;
request_promise_visitor prom_visitor;
/* This loop monitors the write request queue and performs writes to the database. These
* can be blocks or pending transactions. Because the caller needs to know the success of
* the write and any exceptions that are thrown, a write context is passed in the queue
* to the processing thread which it will use to store the results of the write. It is the
* caller's responsibility to ensure the pointer to the write context remains valid until
* the contained promise is complete.
*
* The loop has two modes, sync mode and live mode. In sync mode we want to process writes
* as quickly as possible with minimal overhead. The outer loop busy waits on the queue
* and the inner loop drains the queue as quickly as possible. We exit sync mode when the
* head block is within 1 minute of system time.
*
* Live mode needs to balance between processing pending writes and allowing readers access
* to the database. It will batch writes together as much as possible to minimize lock
* overhead but will willingly give up the write lock after 500ms. The thread then sleeps for
* 10ms. This allows time for readers to access the database as well as more writes to come
* in. When the node is live the rate at which writes come in is slower and busy waiting is
* not an optimal use of system resources when we could give CPU time to read threads.
*/
while( running )
{
// In live mode re-arm the lock-hold timer before each batch.
if( !is_syncing )
start = fc::time_point::now();
if( write_queue.pop( cxt ) )
{
db.with_write_lock( [&]()
{
STATSD_START_TIMER( "chain", "lock_time", "write_lock", 1.0f )
while( true )
{
// Execute the request and fulfil the submitter's promise.
req_visitor.skip = cxt->skip;
req_visitor.except = &(cxt->except);
cxt->success = cxt->req_ptr.visit( req_visitor );
cxt->prom_ptr.visit( prom_visitor );
// Switch from sync to live mode once head block time is within a minute of now.
if( is_syncing && start - db.head_block_time() < fc::minutes(1) )
{
start = fc::time_point::now();
is_syncing = false;
}
// Live mode: voluntarily release the write lock after write_lock_hold_time ms
// (a negative value disables this yield).
if( !is_syncing && write_lock_hold_time >= 0 && fc::time_point::now() - start > fc::milliseconds( write_lock_hold_time ) )
{
break;
}
// Batch: keep draining until the queue is empty.
if( !write_queue.pop( cxt ) )
{
break;
}
}
});
}
// Live mode: sleep briefly so reader threads can take the lock.
if( !is_syncing )
boost::this_thread::sleep_for( boost::chrono::milliseconds( 10 ) );
}
});
}
// Signals the writer thread to finish its current iteration and joins it.
// Safe to call when the thread was never started (pointer is null) and safe
// to call more than once (the pointer is reset after joining).
void chain_plugin_impl::stop_write_processing()
{
// NOTE(review): `running` is a plain bool also read by the writer thread;
// an std::atomic<bool> would make this handshake well-defined. TODO confirm.
running = false;
if( write_processor_thread )
write_processor_thread->join();
write_processor_thread.reset();
}
// Runs the blockchain replay and decides what happens afterwards.
// Returns true when replay is the node's last operation (the application is
// then asked to shut down); returns false when the node should keep running.
// `on_sync` is accepted for interface symmetry with work() but is not used here.
bool chain_plugin_impl::start_replay_processing( synchronization_type& on_sync )
{
  const bool replay_is_last_operation = replay_blockchain();

  if( !replay_is_last_operation )
  {
    // Replay stopped early at a requested block: stay up as an API node,
    // but without p2p connections.
    if( stop_replay_at > 0 )
      is_p2p_enabled = false;

    return replay_is_last_operation;
  }

  /*
    Triggering artifical signal.
    Whole application should be closed in identical way, as if it was closed by user.
    This case occurs only when `exit-after-replay` switch is used.
  */
  if( !appbase::app().is_interrupt_request() )
    appbase::app().generate_interrupt_request();

  return replay_is_last_operation;
}
// Applies all parsed options to the database object and assembles
// db_open_args (including the benchmark callback) ahead of open()/reindex().
void chain_plugin_impl::initial_settings()
{
// Optionally enable statsd logging for the duration of replay.
if( statsd_on_replay )
{
auto statsd = appbase::app().find_plugin< hive::plugins::statsd::statsd_plugin >();
if( statsd != nullptr )
{
statsd->start_logging();
}
}
ilog( "Starting chain with shared_file_size: ${n} bytes", ("n", shared_memory_size) );
// Resync wipes both the block log and the shared-memory state.
if(resync)
{
wlog("resync requested: deleting block log and shared memory");
db.wipe( app().data_dir() / "blockchain", shared_memory_dir, true );
}
db.set_flush_interval( flush_interval );
db.add_checkpoints( loaded_checkpoints );
db.set_require_locking( check_locks );
// Callback used by the benchmark dumper to collect per-index memory stats.
// Deliberately a no-op unless --dump-memory-details was requested.
const auto& abstract_index_cntr = db.get_abstract_index_cntr();
get_indexes_memory_details = [ this, &abstract_index_cntr ]
(index_memory_details_cntr_t& index_memory_details_cntr, bool onlyStaticInfo)
{
if( dump_memory_details == false )
return;
for (auto idx : abstract_index_cntr)
{
auto info = idx->get_statistics(onlyStaticInfo);
index_memory_details_cntr.emplace_back(std::move(info._value_type_name), info._item_count,
info._item_sizeof, info._item_additional_allocation, info._additional_container_allocation);
}
};
fc::variant database_config;
#ifdef ENABLE_MIRA
// MIRA builds read the backing-store configuration from a JSON file;
// a malformed file is fatal.
try
{
database_config = fc::json::from_file( database_cfg, fc::json::strict_parser );
}
catch ( const std::exception& e )
{
elog( "Error while parsing database configuration: ${e}", ("e", e.what()) );
exit( EXIT_FAILURE );
}
catch ( const fc::exception& e )
{
elog( "Error while parsing database configuration: ${e}", ("e", e.what()) );
exit( EXIT_FAILURE );
}
#endif
// Assemble the argument bundle consumed by db.open() / db.reindex().
db_open_args.data_dir = app().data_dir() / "blockchain";
db_open_args.shared_mem_dir = shared_memory_dir;
db_open_args.initial_supply = HIVE_INIT_SUPPLY;
db_open_args.hbd_initial_supply = HIVE_HBD_INIT_SUPPLY;
db_open_args.shared_file_size = shared_memory_size;
db_open_args.shared_file_full_threshold = shared_file_full_threshold;
db_open_args.shared_file_scale_rate = shared_file_scale_rate;
db_open_args.sps_remove_threshold = sps_remove_threshold;
db_open_args.chainbase_flags = chainbase_flags;
db_open_args.do_validate_invariants = validate_invariants;
db_open_args.stop_replay_at = stop_replay_at;
db_open_args.exit_after_replay = exit_after_replay;
db_open_args.force_replay = force_replay;
db_open_args.benchmark_is_enabled = benchmark_is_enabled;
db_open_args.database_cfg = database_config;
db_open_args.replay_in_memory = replay_in_memory;
db_open_args.replay_memory_indices = replay_memory_indices;
// Benchmark callback invoked by reindex: block number 0 initializes the
// dumper, subsequent calls record and log a measurement.
auto benchmark_lambda = [ this ] ( uint32_t current_block_number,
const chainbase::database::abstract_index_cntr_t& abstract_index_cntr )
{
if( current_block_number == 0 ) // initial call
{
typedef hive::utilities::benchmark_dumper::database_object_sizeof_cntr_t database_object_sizeof_cntr_t;
auto get_database_objects_sizeofs = [ this, &abstract_index_cntr ]
(database_object_sizeof_cntr_t& database_object_sizeof_cntr)
{
if ( dump_memory_details == false)
return;
for (auto idx : abstract_index_cntr)
{
auto info = idx->get_statistics(true);
database_object_sizeof_cntr.emplace_back(std::move(info._value_type_name), info._item_sizeof);
}
};
dumper.initialize( get_database_objects_sizeofs, BENCHMARK_FILE_NAME );
return;
}
const hive::utilities::benchmark_dumper::measurement& measure =
dumper.measure( current_block_number, get_indexes_memory_details );
ilog( "Performance report at block ${n}. Elapsed time: ${rt} ms (real), ${ct} ms (cpu). Memory usage: ${cm} (current), ${pm} (peak) kilobytes.",
("n", current_block_number)
("rt", measure.real_ms)
("ct", measure.cpu_ms)
("cm", measure.current_mem)
("pm", measure.peak_mem) );
};
db_open_args.benchmark = hive::chain::TBenchmark( benchmark_interval, benchmark_lambda );
}
// Verifies that the state head and the block-log head agree before allowing
// synchronization. Returns true when the node may proceed (reindex complete,
// or a snapshot was loaded and replay will be forced); otherwise requests an
// application interrupt and returns false.
bool chain_plugin_impl::check_data_consistency()
{
  uint64_t head_block_num_origin = 0;
  uint64_t head_block_num_state = 0;

  if( db.is_reindex_complete( &head_block_num_origin, &head_block_num_state ) )
    return true;

  // State is ahead of the block log — unrecoverable; shut down.
  if( head_block_num_state > head_block_num_origin )
  {
    appbase::app().generate_interrupt_request();
    return false;
  }

  if( db.get_snapshot_loaded() )
  {
    // A freshly loaded snapshot: replay will be forced, so this is allowed.
    wlog( "Replaying has to be forced, after snapshot's loading. { \"block_log-head\": ${b1}, \"state-head\": ${b2} }", ( "b1", head_block_num_origin )( "b2", head_block_num_state ) );
    return true;
  }

  // Partial replay without a snapshot: refuse to sync and shut down.
  wlog( "Replaying is not finished. Synchronization is not allowed. { \"block_log-head\": ${b1}, \"state-head\": ${b2} }", ( "b1", head_block_num_origin )( "b2", head_block_num_state ) );
  appbase::app().generate_interrupt_request();
  return false;
}
// Opens the database with the previously assembled db_open_args and, when
// requested, dumps index memory details. A failed open is fatal: the user is
// told how to recover (replay or force-open) and the process exits.
void chain_plugin_impl::open()
{
try
{
ilog("Opening shared memory from ${path}", ("path",shared_memory_dir.generic_string()));
db.open( db_open_args );
if( dump_memory_details )
dumper.dump( true, get_indexes_memory_details );
}
catch( const fc::exception& e )
{
wlog( "Error opening database. If the binary or configuration has changed, replay the blockchain explicitly using `--replay-blockchain`." );
wlog( "If you know what you are doing you can skip this check and force open the database using `--force-open`." );
wlog( "WARNING: THIS MAY CORRUPT YOUR DATABASE. FORCE OPEN AT YOUR OWN RISK." );
wlog( " Error: ${e}", ("e", e) );
exit(EXIT_FAILURE);
}
}
bool chain_plugin_impl::replay_blockchain()
{
try
{
ilog("Replaying blockchain on user request.");
uint32_t last_block_number = 0;
last_block_number = db.reindex( db_open_args );
if( benchmark_interval > 0 )
{
const hive::utilities::benchmark_dumper::measurement& total_data = dumper.dump( true, get_indexes_memory_details );
ilog( "Performance report (total). Blocks: ${b}. Elapsed time: ${rt} ms (real), ${ct} ms (cpu). Memory usage: ${cm} (current), ${pm} (peak) kilobytes.",
("b", total_data.block_number)
("rt", total_data.real_ms)
("ct", total_data.cpu_ms)
("cm", total_data.current_mem)
("pm", total_data.peak_mem) );
}
if( stop_replay_at > 0 && stop_replay_at == last_block_number )
{
ilog("Stopped blockchain replaying on user request. Last applied block number: ${n}.", ("n", last_block_number));
}
/*
Returns information if the replay is last operation.
*/
return appbase::app().is_interrupt_request()/*user triggered SIGINT/SIGTERM*/ || exit_after_replay/*shutdown node definitely*/;
} FC_CAPTURE_AND_LOG( () )
return true;
}
// Forwards explicit snapshot requests (dump/load) to the registered provider.
// A no-op when no provider was registered via register_snapshot_provider().
void chain_plugin_impl::process_snapshot()
{
  if( snapshot_provider == nullptr )
    return;

  snapshot_provider->process_explicit_snapshot_requests( db_open_args );
}
// Final startup step: reports readiness through the on_sync callback, then
// starts the writer thread that begins draining the write queue.
void chain_plugin_impl::work( synchronization_type& on_sync )
{
ilog( "Started on blockchain with ${n} blocks", ("n", db.head_block_num()) );
on_sync();
start_write_processing();
}
// Writes the default database configuration as JSON to path `p`
// (used to seed database.cfg when none exists yet).
void chain_plugin_impl::write_default_database_config( bfs::path &p )
{
ilog( "writing database configuration: ${p}", ("p", p.string()) );
fc::json::save_to_file( hive::utilities::default_database_configuration(), p );
}
} // detail
// Public plugin surface — thin forwarders to the pimpl.
chain_plugin::chain_plugin() : my( new detail::chain_plugin_impl() ) {}
chain_plugin::~chain_plugin(){}
// Mutable / const access to the chain database.
database& chain_plugin::db() { return my->db; }
const hive::chain::database& chain_plugin::db() const { return my->db; }
// Directory holding the shared-memory state files.
bfs::path chain_plugin::state_storage_dir() const
{
return my->shared_memory_dir;
}
void chain_plugin::set_program_options(options_description& cli, options_description& cfg)
{
cfg.add_options()
("sps-remove-threshold", bpo::value<uint16_t>()->default_value( 200 ), "Maximum numbers of proposals/votes which can be removed in the same cycle")
("shared-file-dir", bpo::value<bfs::path>()->default_value("blockchain"),
"the location of the chain shared memory files (absolute path or relative to application data dir)")
("shared-file-size", bpo::value<string>()->default_value("54G"), "Size of the shared memory file. Default: 54G. If running a full node, increase this value to 200G.")
("shared-file-full-threshold", bpo::value<uint16_t>()->default_value(0),
"A 2 precision percentage (0-10000) that defines the threshold for when to autoscale the shared memory file. Setting this to 0 disables autoscaling. Recommended value for consensus node is 9500 (95%). Full node is 9900 (99%)" )
("shared-file-scale-rate", bpo::value<uint16_t>()->default_value(0),
"A 2 precision percentage (0-10000) that defines how quickly to scale the shared memory file. When autoscaling occurs the file's size will be increased by this percent. Setting this to 0 disables autoscaling. Recommended value is between 1000-2000 (10-20%)" )
("checkpoint,c", bpo::value<vector<string>>()->composing(), "Pairs of [BLOCK_NUM,BLOCK_ID] that should be enforced as checkpoints.")
("flush-state-interval", bpo::value<uint32_t>(),
"flush shared memory changes to disk every N blocks")
#ifdef ENABLE_MIRA
("memory-replay-indices", bpo::value<vector<string>>()->multitoken()->composing(), "Specify which indices should be in memory during replay")
#endif
;
cli.add_options()
("sps-remove-threshold", bpo::value<uint16_t>()->default_value( 200 ), "Maximum numbers of proposals/votes which can be removed in the same cycle")
("replay-blockchain", bpo::bool_switch()->default_value(false), "clear chain database and replay all blocks" )