diff --git a/src/hive_fork_manager/authorization.sql b/src/hive_fork_manager/authorization.sql index 30d125e69d75611340aedc64b7bbdade0c935ff5..ab721387cf8d807d6a1c841b5f7ede889bc8aef4 100644 --- a/src/hive_fork_manager/authorization.sql +++ b/src/hive_fork_manager/authorization.sql @@ -236,6 +236,8 @@ GRANT EXECUTE ON FUNCTION , hive.remove_obsolete_operations , hive.detach_table , hive.app_check_contexts_synchronized(_contexts hive.contexts_group) + , hive.set_sync_state( _new_state hafd.sync_state ) + , hive.get_sync_state() TO hived_group; GRANT USAGE ON SCHEMA hive to haf_maintainer; diff --git a/src/hive_fork_manager/hived_api.sql b/src/hive_fork_manager/hived_api.sql index 6186e494c7007a60ae42978c533d055096942ad7..a5b470c9f79745d0e85e816f340f76c23a7d8caa 100644 --- a/src/hive_fork_manager/hived_api.sql +++ b/src/hive_fork_manager/hived_api.sql @@ -565,3 +565,32 @@ END; $BODY$ ; + +CREATE OR REPLACE FUNCTION hive.get_sync_state() + RETURNS hafd.sync_state + LANGUAGE plpgsql + STABLE +AS +$BODY$ +DECLARE + __result hafd.sync_state; +BEGIN + SELECT state INTO __result + FROM hafd.hive_state; + + RETURN __result; +END; +$BODY$ +; + +CREATE OR REPLACE FUNCTION hive.set_sync_state( _new_state hafd.sync_state ) + RETURNS void + LANGUAGE plpgsql + VOLATILE +AS +$BODY$ +BEGIN + UPDATE hafd.hive_state SET state = _new_state; +END; +$BODY$ +; diff --git a/src/sql_serializer/include/hive/plugins/sql_serializer/indexation_state.hpp b/src/sql_serializer/include/hive/plugins/sql_serializer/indexation_state.hpp index b86a138bc244928548c0f9c5cd281f049b0bde15..eb6f00a4e558f08882a610e7fdad31917af4e3f2 100644 --- a/src/sql_serializer/include/hive/plugins/sql_serializer/indexation_state.hpp +++ b/src/sql_serializer/include/hive/plugins/sql_serializer/indexation_state.hpp @@ -77,6 +77,9 @@ namespace hive::plugins::sql_serializer { void force_trigger_flush_with_all_data( cached_data_t& cached_data, int last_block_num ); bool can_move_to_livesync() const; uint32_t expected_number_of_blocks_to_sync() const; + + INDEXATION get_state() const; + void set_state( INDEXATION _new_state ); private: const sql_serializer_plugin& _main_plugin; hive::chain::database& _chain_db; @@ -90,7 +93,6 @@ namespace hive::plugins::sql_serializer { const uint32_t _psql_first_block; boost::signals2::connection _on_irreversible_block_conn; - INDEXATION _state{ INDEXATION::P2P }; std::shared_ptr< data_dumper > _dumper; std::shared_ptr< flush_trigger > _trigger; int32_t _irreversible_block_num; @@ -98,6 +100,8 @@ namespace hive::plugins::sql_serializer { write_ahead_log_manager& _write_ahead_log; fc::time_point _start_state_time; + + mutable std::optional< INDEXATION > _cached_state_value; }; } // namespace hive::plugins::sql_serializer diff --git a/src/sql_serializer/indexation_state.cpp b/src/sql_serializer/indexation_state.cpp index 962341e17066307111bd271a09d5547b268510f1..2305dbd2e64586acc4a42a5deea661c7d66a6e3c 100644 --- a/src/sql_serializer/indexation_state.cpp +++ b/src/sql_serializer/indexation_state.cpp @@ -161,7 +161,7 @@ indexation_state::indexation_state( { FC_ASSERT( _psql_first_block >= 1, "psql-first-block=${v} < 1", ("v", _psql_first_block) ); cached_data_t empty_data{0}; - update_state( INDEXATION::START, empty_data, 0 ); + set_state( INDEXATION::START ); _on_irreversible_block_conn = _chain_db.add_irreversible_block_handler( [this]( uint32_t block_num ){ on_irreversible_block( block_num ); } @@ -171,7 +171,7 @@ indexation_state::indexation_state( void indexation_state::on_pre_reindex( cached_data_t& cached_data, int last_block_num, uint32_t number_of_blocks_to_add ) { - switch ( _state ) { + switch ( get_state() ) { case INDEXATION::P2P: case INDEXATION::LIVE: case INDEXATION::WAIT: @@ -203,7 +203,7 @@ indexation_state::on_post_reindex( cached_data_t& cached_data, uint32_t last_blo return true; }; - switch ( _state ) { + switch ( get_state() ) { case INDEXATION::START: case INDEXATION::P2P: case INDEXATION::LIVE: @@ -226,7 +226,7 @@ indexation_state::on_post_reindex( cached_data_t& cached_data, uint32_t last_blo void indexation_state::on_end_of_syncing( cached_data_t& cached_data, int last_block_num ) { - if ( _state == INDEXATION::LIVE ) { + if ( get_state() == INDEXATION::LIVE ) { return; } @@ -236,7 +236,7 @@ indexation_state::on_end_of_syncing( cached_data_t& cached_data, int last_block_ void indexation_state::on_first_block( int last_block_num ) { cached_data_t empty_cache{0}; - switch( _state ) { + switch( get_state() ) { case INDEXATION::START: case INDEXATION::WAIT: if ( can_move_to_livesync() ) { @@ -257,7 +257,7 @@ indexation_state::on_first_block( int last_block_num ) { void indexation_state::on_block( int last_block_num ) { - switch( _state ) { + switch( get_state() ) { case INDEXATION::START: { cached_data_t empty_cache{0}; update_state( INDEXATION::WAIT, empty_cache, last_block_num ); @@ -286,7 +286,7 @@ indexation_state::update_state( , cached_data_t& cached_data , uint32_t last_block_num, uint32_t number_of_blocks_to_add ) { - FC_ASSERT( _state != INDEXATION::LIVE, "Move from LIVE state is illegal" ); + FC_ASSERT( get_state() != INDEXATION::LIVE, "Move from LIVE state is illegal" ); switch ( state ) { case INDEXATION::START: ilog( "Entered START sync state" ); @@ -318,7 +318,7 @@ indexation_state::update_state( break; case INDEXATION::REINDEX: ilog("PROFILE: Entering REINDEX sync from start state, dropping constraints: ${t} s",("t",(fc::time_point::now() - _start_state_time).to_seconds())); - FC_ASSERT( _state == INDEXATION::START || _state == INDEXATION::REINDEX_WAIT , "Reindex always starts after START or REINDEX_WAIT" ); + FC_ASSERT( get_state() == INDEXATION::START || get_state() == INDEXATION::REINDEX_WAIT , "Reindex always starts after START or REINDEX_WAIT" ); force_trigger_flush_with_all_data( cached_data, last_block_num ); _trigger.reset(); @@ -346,7 +346,7 @@ indexation_state::update_state( case INDEXATION::LIVE: { ilog("PROFILE: Entering LIVE sync, creating indexes/constraints as needed: ${t} s",("t",(fc::time_point::now() - _start_state_time).to_seconds())); - if ( _state != INDEXATION::START ) + if ( get_state() != INDEXATION::START ) { auto irreversible_cached_data = move_irreveresible_blocks(cached_data, _irreversible_block_num ); force_trigger_flush_with_all_data( irreversible_cached_data, _irreversible_block_num ); @@ -405,7 +405,8 @@ indexation_state::update_state( default: FC_ASSERT( false, "Unknown INDEXATION state" ); } - _state = state; + + set_state(state); } void @@ -452,7 +453,7 @@ indexation_state::on_irreversible_block( uint32_t block_num ) { void indexation_state::on_switch_fork( cached_data_t& cached_data, uint32_t block_num ) { - if ( _state != INDEXATION::P2P ) { + if ( get_state() != INDEXATION::P2P ) { return; } @@ -470,7 +471,91 @@ indexation_state::on_switch_fork( cached_data_t& cached_data, uint32_t block_num bool indexation_state::collect_blocks() const { - return static_cast< uint32_t >( _state ) & COLLECT_BLOCKS_MASK; + return static_cast< uint32_t >( get_state() ) & COLLECT_BLOCKS_MASK; +} + +indexation_state::INDEXATION +indexation_state::get_state() const { + using namespace std::literals; + + if ( _cached_state_value.has_value() ) + return _cached_state_value.value(); + + indexation_state::INDEXATION result = INDEXATION::START; + queries_commit_data_processor state_getter( + _db_url + , "Get synchronization state" + , "" + , [&result](const data_processor::data_chunk_ptr&, transaction_controllers::transaction& tx) -> data_processor::data_processing_status { + static const std::unordered_map< std::string, INDEXATION > string_to_enum = { + {"START", INDEXATION::START} + , {"WAIT", INDEXATION::WAIT} + , {"REINDEX_WAIT", INDEXATION::REINDEX_WAIT} + , {"REINDEX", INDEXATION::REINDEX} + , {"P2P", INDEXATION::P2P} + , {"LIVE", INDEXATION::LIVE} + }; + + pqxx::result data = tx.exec("select hive.get_sync_state() as _result;"); + FC_ASSERT( !data.empty(), "No response from database" ); + FC_ASSERT( data.size() == 1, "Wrong data size" ); + const auto& record = data[0]; + auto state_text = record[ "_result" ].as<std::string>(); + + auto enum_it = string_to_enum.find( state_text ); + FC_ASSERT( enum_it != string_to_enum.end(), "Unknown hafd.sync_state value" ); + result = enum_it->second; + + return data_processor::data_processing_status(); + } + , nullptr + , theApp + ); + + state_getter.trigger(data_processor::data_chunk_ptr(), 0); + state_getter.join(); + + _cached_state_value = result; + + return result; +} + +void +indexation_state::set_state( INDEXATION _new_state ) { + using namespace std::literals; + + queries_commit_data_processor state_setter( + _db_url + , "Set synchronization state" + , "" + , [_new_state](const data_processor::data_chunk_ptr&, transaction_controllers::transaction& tx) -> data_processor::data_processing_status { + static const std::unordered_map< INDEXATION, std::string > enum_to_string = { + {INDEXATION::START, "'START'::hafd.sync_state"} + , {INDEXATION::WAIT, "'WAIT'::hafd.sync_state"} + , {INDEXATION::REINDEX_WAIT, "'REINDEX_WAIT'::hafd.sync_state"} + , {INDEXATION::REINDEX, "'REINDEX'::hafd.sync_state"} + , {INDEXATION::P2P, "'P2P'::hafd.sync_state"} + , {INDEXATION::LIVE, "'LIVE'::hafd.sync_state"} + }; + + auto enum_it = enum_to_string.find( _new_state ); + FC_ASSERT( enum_it != enum_to_string.end(), "Unknown enum value" ); + auto value_to_set = enum_it->second; + + std::string query = "select hive.set_sync_state("s + value_to_set + ");"s; + + pqxx::result data = tx.exec(query); + + return data_processor::data_processing_status(); + } + , nullptr + , theApp + ); + + state_setter.trigger(data_processor::data_chunk_ptr(), 0); + state_setter.join(); + + _cached_state_value.reset(); } }}} // namespace hive{ namespace plugins{ namespace sql_serializer diff --git a/tests/integration/functional/hive_fork_manager/CMakeLists.txt b/tests/integration/functional/hive_fork_manager/CMakeLists.txt index 68777966d0888e6658fa0c066e777b628a01f0c8..22e6c270ca54b3d56c0bb2853440e48e78eaa521 100644 --- a/tests/integration/functional/hive_fork_manager/CMakeLists.txt +++ b/tests/integration/functional/hive_fork_manager/CMakeLists.txt @@ -157,6 +157,8 @@ ADD_TEST( NAME test_update_script ADD_SQL_FUNCTIONAL_TEST( hived_api/are_fk_dropped_test.sql ) ADD_SQL_FUNCTIONAL_TEST( hived_api/autodetach.sql ) ADD_SQL_FUNCTIONAL_TEST( hived_api/connect_test_not_remove_reversible.sql ) + ADD_SQL_FUNCTIONAL_TEST( hived_api/set_sync_state.sql ) + ADD_SQL_FUNCTIONAL_TEST( hived_api/get_sync_state.sql ) ADD_SQL_FUNCTIONAL_TEST( app_api/create_context_test.sql ) ADD_SQL_FUNCTIONAL_TEST( app_api/app_context_remove_test.sql ) diff --git a/tests/integration/functional/hive_fork_manager/hived_api/get_sync_state.sql b/tests/integration/functional/hive_fork_manager/hived_api/get_sync_state.sql new file mode 100644 index 0000000000000000000000000000000000000000..f52a0ba6ac68a6d73806323c5b599f0bca9bf5a9 --- /dev/null +++ b/tests/integration/functional/hive_fork_manager/hived_api/get_sync_state.sql @@ -0,0 +1,29 @@ +CREATE OR REPLACE PROCEDURE haf_admin_test_given() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + UPDATE hafd.hive_state SET state = 'REINDEX_WAIT'; +END; +$BODY$ +; + +CREATE OR REPLACE PROCEDURE test_hived_test_then() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + ASSERT ( SELECT hive.get_sync_state() ) = 'REINDEX_WAIT', 'Wrong sync state'; +END; +$BODY$ +; + +CREATE OR REPLACE PROCEDURE alice_test_then() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + ASSERT ( SELECT hive.get_sync_state() ) = 'REINDEX_WAIT', 'Wrong sync state'; +END; +$BODY$ +; \ No newline at end of file diff --git a/tests/integration/functional/hive_fork_manager/hived_api/set_sync_state.sql b/tests/integration/functional/hive_fork_manager/hived_api/set_sync_state.sql new file mode 100644 index 0000000000000000000000000000000000000000..0b8735beec0707fe0a8fe40d35f0aadaf91b19ca --- /dev/null +++ b/tests/integration/functional/hive_fork_manager/hived_api/set_sync_state.sql @@ -0,0 +1,30 @@ +CREATE OR REPLACE PROCEDURE test_hived_test_given() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + PERFORM hive.set_sync_state( 'REINDEX_WAIT' ); +END; +$BODY$ +; + +CREATE OR REPLACE PROCEDURE alice_test_error() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + -- Alice has no access to set state + PERFORM hive.set_sync_state( 'REINDEX_WAIT' ); +END; +$BODY$ +; + +CREATE OR REPLACE PROCEDURE test_hived_test_then() + LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN + ASSERT ( SELECT state FROM hafd.hive_state ) = 'REINDEX_WAIT', 'Wrong sync state'; +END; +$BODY$ +; \ No newline at end of file