Skip to content
Snippets Groups Projects
Commit 31d8e538 authored by Marcin's avatar Marcin
Browse files

break hived process when any thread of sql_serializer failed

When reindex/livesync dumper any threads fail then hived process is interrupted
and dumper throws an exception
parent 160117da
No related branches found
No related tags found
1 merge request!16break hived process when any thread of sql_serialize failed
Pipeline #33651 passed
......@@ -2,6 +2,7 @@
#include <fc/exception/exception.hpp>
#include <fc/log/logger.hpp>
#include <appbase/application.hpp>
#include <boost/exception/diagnostic_information.hpp>
......@@ -92,20 +93,32 @@ data_processor::data_processor( std::string description, const data_processing_f
}
catch(const pqxx::sql_error& ex)
{
//elog("Data processor ${d} detected SQL statement execution failure. Failing statement: `${q}', SQLState: ${s}.", ("d", _description)("q", ex.query())("s", ex.sqlstate()));
elog("Data processor ${d} detected SQL statement execution failure. Failing statement: `${q}'.", ("d", _description)("q", ex.query()));
appbase::app().generate_interrupt_request();
throw;
}
catch(const pqxx::pqxx_exception& ex)
{
elog("Data processor ${d} detected SQL execution failure: ${e}", ("d", _description)("e", ex.base().what()));
appbase::app().generate_interrupt_request();
throw;
}
catch(const fc::exception& ex)
{
elog("Data processor ${d} execution failed: ${e}", ("d", _description)("e", ex.what()));
appbase::app().generate_interrupt_request();
throw;
}
catch(const std::exception& ex)
{
elog("Data processor ${d} execution failed: ${e}", ("d", _description)("e", ex.what()));
appbase::app().generate_interrupt_request();
throw;
}
catch(...) {
elog("Data processor ${d} execution failed: unknown exception", ("d", _description));
appbase::app().generate_interrupt_request();
throw;
}
ilog("Leaving data processor thread: ${d}", ("d", _description));
......@@ -116,13 +129,11 @@ data_processor::data_processor( std::string description, const data_processing_f
data_processor::~data_processor()
{
ilog("~data_processor: ${d}", ("d", _description));
}
void data_processor::trigger(data_chunk_ptr dataPtr, uint32_t last_blocknum)
{
if(!_future.valid())
return;
/// Set immediately data processing flag
_is_processing_data = true;
......@@ -142,7 +153,6 @@ void data_processor::trigger(data_chunk_ptr dataPtr, uint32_t last_blocknum)
_cv.wait(lk, [this] {return _dataPtr.valid() == false; });
}
dlog("Leaving trigger of data data processor: ${d}...", ("d", _description));
}
......@@ -184,13 +194,11 @@ void data_processor::join()
}
_cv.notify_one();
ilog("Waiting for data processor: ${d} worker thread finish...", ("d", _description));
try {
if( _future.valid() )
_future.get();
} catch (...) {
elog( "Caught unhandled exception ${diagnostic}", ("diagnostic", boost::current_exception_diagnostic_information()) );
throw;
}
ilog("Data processor: ${d} finished execution...", ("d", _description));
......
......@@ -164,4 +164,35 @@ namespace hive::plugins::sql_serializer {
return processingStatus;
}
template< typename Writer >
inline std::exception_ptr
join_writers_impl( Writer& writer ) try {
try{
writer.join();
}
FC_CAPTURE_AND_RETHROW()
return nullptr;
} catch( ... ) {
return std::current_exception();
}
template< typename Writer, typename... Writers >
inline std::exception_ptr
join_writers_impl( Writer& writer, Writers& ...writers ) {
std::exception_ptr current_exception = join_writers_impl( writer );;
auto next_exception = join_writers_impl( writers... );
if ( current_exception != nullptr ) {
return current_exception;
}
return next_exception;
}
template< typename... Writers >
inline void
join_writers( Writers& ...writers ) {
auto exception = join_writers_impl( writers... );
if ( exception != nullptr ) {
std::rethrow_exception( exception );
}
}
} // namespace hive::plugins::sql_serializer
\ No newline at end of file
......@@ -8,7 +8,6 @@ namespace hive::plugins::sql_serializer {
virtual ~data_dumper() = default;
virtual void trigger_data_flush( cached_data_t& cached_data, int last_block_num ) = 0;
virtual void join() = 0;
virtual void wait_for_data_processing_finish() = 0;
virtual uint32_t blocks_per_flush() const = 0;
};
......
......@@ -33,11 +33,11 @@ namespace hive::plugins::sql_serializer {
livesync_data_dumper& operator=(livesync_data_dumper&) = delete;
void trigger_data_flush( cached_data_t& cached_data, int last_block_num ) override;
void join() override;
void wait_for_data_processing_finish() override;
uint32_t blocks_per_flush() const override { return 1; }
private:
void join();
void on_irreversible_block( uint32_t block_num );
void on_switch_fork( uint32_t block_num );
......
......@@ -23,17 +23,18 @@ namespace hive::plugins::sql_serializer {
, uint32_t account_operation_threads
);
~reindex_data_dumper() { join(); }
~reindex_data_dumper();
reindex_data_dumper(reindex_data_dumper&) = delete;
reindex_data_dumper(reindex_data_dumper&&) = delete;
reindex_data_dumper& operator=(reindex_data_dumper&&) = delete;
reindex_data_dumper& operator=(reindex_data_dumper&) = delete;
void trigger_data_flush( cached_data_t& cached_data, int last_block_num ) override;
void join() override;
void wait_for_data_processing_finish() override;
uint32_t blocks_per_flush() const override { return 1000; }
private:
void join();
using block_data_container_t_writer = table_data_writer<hive_blocks>;
using transaction_data_container_t_writer = chunks_for_writers_splitter<
table_data_writer<
......
......@@ -78,13 +78,15 @@ namespace hive{ namespace plugins{ namespace sql_serializer {
[this]( uint32_t block_num ){ on_switch_fork( block_num ); }
, plugin
);
ilog( "livesync dumper created" );
}
livesync_data_dumper::~livesync_data_dumper() {
ilog( "livesync dumper is closing..." );
_on_irreversible_block_conn.disconnect();
_on_switch_fork_conn.disconnect();
livesync_data_dumper::join();
ilog( "livesync dumper closed" );
}
void livesync_data_dumper::trigger_data_flush( cached_data_t& cached_data, int last_block_num ) {
......@@ -104,12 +106,14 @@ namespace hive{ namespace plugins{ namespace sql_serializer {
}
void livesync_data_dumper::join() {
_block_writer->join();
_transaction_writer->join();
_transaction_multisig_writer->join();
_operation_writer->join();
_account_writer->join();
_account_operations_writer->join();
join_writers(
*_block_writer
, *_transaction_writer
, *_transaction_multisig_writer
, *_operation_writer
, *_account_writer
, *_account_operations_writer
);
}
void livesync_data_dumper::wait_for_data_processing_finish()
......
#include <hive/plugins/sql_serializer/reindex_data_dumper.h>
#include <exception>
namespace hive{ namespace plugins{ namespace sql_serializer {
reindex_data_dumper::reindex_data_dumper(
......@@ -27,6 +29,12 @@ namespace hive{ namespace plugins{ namespace sql_serializer {
_account_operations_writer = std::make_unique< account_operations_data_container_t_writer >( account_operation_threads, db_url, "Account operations data writer", api_trigger);
}
reindex_data_dumper::~reindex_data_dumper() {
ilog( "Reindex dumper is closing...." );
reindex_data_dumper::join();
ilog( "Reindex dumper closed" );
}
void reindex_data_dumper::trigger_data_flush( cached_data_t& cached_data, int last_block_num ) {
_block_writer->trigger( std::move( cached_data.blocks ), false, last_block_num );
_transaction_writer->trigger( std::move( cached_data.transactions ), last_block_num);
......@@ -37,14 +45,15 @@ namespace hive{ namespace plugins{ namespace sql_serializer {
}
void reindex_data_dumper::join() {
_block_writer->join();
_transaction_writer->join();
_transaction_multisig_writer->join();
_operation_writer->join();
_account_writer->join();
_account_operations_writer->join();
_end_massive_sync_processor->join();
join_writers(
*_block_writer
, *_transaction_writer
, *_transaction_multisig_writer
, *_operation_writer
, *_account_writer
, *_account_operations_writer
, *_end_massive_sync_processor
);
}
void reindex_data_dumper::wait_for_data_processing_finish()
......
......@@ -405,8 +405,6 @@ using chain::reindex_notification;
("s", op_sequence_id + 1)("pbn", psql_block_number));
}
void join_data_processors() { _dumper->join(); }
void wait_for_data_processing_finish();
void process_cached_data();
......@@ -418,7 +416,6 @@ using chain::reindex_notification;
ilog("Flushing rest of data, wait a moment...");
process_cached_data();
join_data_processors();
ilog("Done, cleanup complete");
}
......@@ -623,6 +620,7 @@ void sql_serializer_plugin_impl::on_pre_reindex(const reindex_notification& note
if(_on_pre_apply_block_con.connected())
chain::util::disconnect_signal(_on_pre_apply_block_con);
_dumper.reset();
_dumper = std::make_unique< reindex_data_dumper >(
db_url
, psql_operations_threads_number
......@@ -643,6 +641,7 @@ void sql_serializer_plugin_impl::on_post_reindex(const reindex_notification& not
if(note.last_block_number >= note.max_block_number)
switch_db_items(true/*mode*/);
_dumper.reset();
_dumper = std::make_unique< livesync_data_dumper >( db_url, main_plugin, chain_db );
}
......@@ -777,7 +776,6 @@ bool sql_serializer_plugin_impl::skip_reversible_block(uint32_t block_no)
void sql_serializer_plugin::plugin_shutdown()
{
ilog("Flushing left data...");
my->join_data_processors();
my->disconnect_signals();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment