Skip to content
Snippets Groups Projects
Commit 7b0b89eb authored by Marcin's avatar Marcin
Browse files

sync state in hafd.hive_state

sqlserializer reads and sets the  synchronization state
in sync_state column of hafd.havie_state.

methods added:
hive.get_sync_state()
hive.set_sync_state() - only accessible for hived
parent a594a0d9
No related branches found
No related tags found
1 merge request!595issue haf#236 add sync_state type and column in irreversible_data
Pipeline #117987 passed
......@@ -236,6 +236,8 @@ GRANT EXECUTE ON FUNCTION
, hive.remove_obsolete_operations
, hive.detach_table
, hive.app_check_contexts_synchronized(_contexts hive.contexts_group)
, hive.set_sync_state( _new_state hafd.sync_state )
, hive.get_sync_state()
TO hived_group;
GRANT USAGE ON SCHEMA hive to haf_maintainer;
......
......@@ -565,3 +565,32 @@ END;
$BODY$
;
CREATE OR REPLACE FUNCTION hive.get_sync_state()
RETURNS hafd.sync_state
LANGUAGE plpgsql
STABLE
AS
$BODY$
DECLARE
__result hafd.sync_state;
BEGIN
SELECT state INTO __result
FROM hafd.hive_state;
RETURN __result;
END;
$BODY$
;
CREATE OR REPLACE FUNCTION hive.set_sync_state( _new_state hafd.sync_state )
RETURNS void
LANGUAGE plpgsql
VOLATILE
AS
$BODY$
BEGIN
UPDATE hafd.hive_state SET state = _new_state;
END;
$BODY$
;
......@@ -77,6 +77,9 @@ namespace hive::plugins::sql_serializer {
void force_trigger_flush_with_all_data( cached_data_t& cached_data, int last_block_num );
bool can_move_to_livesync() const;
uint32_t expected_number_of_blocks_to_sync() const;
INDEXATION get_state() const;
void set_state( INDEXATION _new_state );
private:
const sql_serializer_plugin& _main_plugin;
hive::chain::database& _chain_db;
......@@ -90,7 +93,6 @@ namespace hive::plugins::sql_serializer {
const uint32_t _psql_first_block;
boost::signals2::connection _on_irreversible_block_conn;
INDEXATION _state{ INDEXATION::P2P };
std::shared_ptr< data_dumper > _dumper;
std::shared_ptr< flush_trigger > _trigger;
int32_t _irreversible_block_num;
......@@ -98,6 +100,8 @@ namespace hive::plugins::sql_serializer {
write_ahead_log_manager& _write_ahead_log;
fc::time_point _start_state_time;
mutable std::optional< INDEXATION > _cached_state_value;
};
} // namespace hive::plugins::sql_serializer
......@@ -161,7 +161,7 @@ indexation_state::indexation_state(
{
FC_ASSERT( _psql_first_block >= 1, "psql-first-block=${v} < 1", ("v", _psql_first_block) );
cached_data_t empty_data{0};
update_state( INDEXATION::START, empty_data, 0 );
set_state( INDEXATION::START );
_on_irreversible_block_conn = _chain_db.add_irreversible_block_handler(
[this]( uint32_t block_num ){ on_irreversible_block( block_num ); }
......@@ -171,7 +171,7 @@ indexation_state::indexation_state(
void
indexation_state::on_pre_reindex( cached_data_t& cached_data, int last_block_num, uint32_t number_of_blocks_to_add ) {
switch ( _state ) {
switch ( get_state() ) {
case INDEXATION::P2P:
case INDEXATION::LIVE:
case INDEXATION::WAIT:
......@@ -203,7 +203,7 @@ indexation_state::on_post_reindex( cached_data_t& cached_data, uint32_t last_blo
return true;
};
switch ( _state ) {
switch ( get_state() ) {
case INDEXATION::START:
case INDEXATION::P2P:
case INDEXATION::LIVE:
......@@ -226,7 +226,7 @@ indexation_state::on_post_reindex( cached_data_t& cached_data, uint32_t last_blo
void
indexation_state::on_end_of_syncing( cached_data_t& cached_data, int last_block_num ) {
if ( _state == INDEXATION::LIVE ) {
if ( get_state() == INDEXATION::LIVE ) {
return;
}
......@@ -236,7 +236,7 @@ indexation_state::on_end_of_syncing( cached_data_t& cached_data, int last_block_
void
indexation_state::on_first_block( int last_block_num ) {
cached_data_t empty_cache{0};
switch( _state ) {
switch( get_state() ) {
case INDEXATION::START:
case INDEXATION::WAIT:
if ( can_move_to_livesync() ) {
......@@ -257,7 +257,7 @@ indexation_state::on_first_block( int last_block_num ) {
void
indexation_state::on_block( int last_block_num ) {
switch( _state ) {
switch( get_state() ) {
case INDEXATION::START: {
cached_data_t empty_cache{0};
update_state( INDEXATION::WAIT, empty_cache, last_block_num );
......@@ -286,7 +286,7 @@ indexation_state::update_state(
, cached_data_t& cached_data
, uint32_t last_block_num, uint32_t number_of_blocks_to_add
) {
FC_ASSERT( _state != INDEXATION::LIVE, "Move from LIVE state is illegal" );
FC_ASSERT( get_state() != INDEXATION::LIVE, "Move from LIVE state is illegal" );
switch ( state ) {
case INDEXATION::START:
ilog( "Entered START sync state" );
......@@ -318,7 +318,7 @@ indexation_state::update_state(
break;
case INDEXATION::REINDEX:
ilog("PROFILE: Entering REINDEX sync from start state, dropping constraints: ${t} s",("t",(fc::time_point::now() - _start_state_time).to_seconds()));
FC_ASSERT( _state == INDEXATION::START || _state == INDEXATION::REINDEX_WAIT , "Reindex always starts after START or REINDEX_WAIT" );
FC_ASSERT( get_state() == INDEXATION::START || get_state() == INDEXATION::REINDEX_WAIT , "Reindex always starts after START or REINDEX_WAIT" );
force_trigger_flush_with_all_data( cached_data, last_block_num );
_trigger.reset();
......@@ -346,7 +346,7 @@ indexation_state::update_state(
case INDEXATION::LIVE:
{
ilog("PROFILE: Entering LIVE sync, creating indexes/constraints as needed: ${t} s",("t",(fc::time_point::now() - _start_state_time).to_seconds()));
if ( _state != INDEXATION::START )
if ( get_state() != INDEXATION::START )
{
auto irreversible_cached_data = move_irreveresible_blocks(cached_data, _irreversible_block_num );
force_trigger_flush_with_all_data( irreversible_cached_data, _irreversible_block_num );
......@@ -405,7 +405,8 @@ indexation_state::update_state(
default:
FC_ASSERT( false, "Unknown INDEXATION state" );
}
_state = state;
set_state(state);
}
void
......@@ -452,7 +453,7 @@ indexation_state::on_irreversible_block( uint32_t block_num ) {
void
indexation_state::on_switch_fork( cached_data_t& cached_data, uint32_t block_num ) {
if ( _state != INDEXATION::P2P ) {
if ( get_state() != INDEXATION::P2P ) {
return;
}
......@@ -470,7 +471,91 @@ indexation_state::on_switch_fork( cached_data_t& cached_data, uint32_t block_num
bool
indexation_state::collect_blocks() const {
return static_cast< uint32_t >( _state ) & COLLECT_BLOCKS_MASK;
return static_cast< uint32_t >( get_state() ) & COLLECT_BLOCKS_MASK;
}
indexation_state::INDEXATION
indexation_state::get_state() const {
using namespace std::literals;
if ( _cached_state_value.has_value() )
return _cached_state_value.value();
indexation_state::INDEXATION result = INDEXATION::START;
queries_commit_data_processor state_getter(
_db_url
, "Get synchronization state"
, ""
, [&result](const data_processor::data_chunk_ptr&, transaction_controllers::transaction& tx) -> data_processor::data_processing_status {
static const std::unordered_map< std::string, INDEXATION > string_to_enum = {
{"START", INDEXATION::START}
, {"WAIT", INDEXATION::WAIT}
, {"REINDEX_WAIT", INDEXATION::REINDEX_WAIT}
, {"REINDEX", INDEXATION::REINDEX}
, {"P2P", INDEXATION::P2P}
, {"LIVE", INDEXATION::LIVE}
};
pqxx::result data = tx.exec("select hive.get_sync_state() as _result;");
FC_ASSERT( !data.empty(), "No response from database" );
FC_ASSERT( data.size() == 1, "Wrong data size" );
const auto& record = data[0];
auto state_text = record[ "_result" ].as<std::string>();
auto enum_it = string_to_enum.find( state_text );
FC_ASSERT( enum_it != string_to_enum.end(), "Unknown hafd.sync_state value" );
result = enum_it->second;
return data_processor::data_processing_status();
}
, nullptr
, theApp
);
state_getter.trigger(data_processor::data_chunk_ptr(), 0);
state_getter.join();
_cached_state_value = result;
return result;
}
void
indexation_state::set_state( INDEXATION _new_state ) {
using namespace std::literals;
queries_commit_data_processor state_setter(
_db_url
, "Set synchronization state"
, ""
, [_new_state](const data_processor::data_chunk_ptr&, transaction_controllers::transaction& tx) -> data_processor::data_processing_status {
static const std::unordered_map< INDEXATION, std::string > enum_to_string = {
{INDEXATION::START, "'START'::hafd.sync_state"}
, {INDEXATION::WAIT, "'WAIT'::hafd.sync_state"}
, {INDEXATION::REINDEX_WAIT, "'REINDEX_WAIT'::hafd.sync_state"}
, {INDEXATION::REINDEX, "'REINDEX'::hafd.sync_state"}
, {INDEXATION::P2P, "'P2P'::hafd.sync_state"}
, {INDEXATION::LIVE, "'LIVE'::hafd.sync_state"}
};
auto enum_it = enum_to_string.find( _new_state );
FC_ASSERT( enum_it != enum_to_string.end(), "Unknown enum value" );
auto value_to_set = enum_it->second;
std::string query = "select hive.set_sync_state("s + value_to_set + ");"s;
pqxx::result data = tx.exec(query);
return data_processor::data_processing_status();
}
, nullptr
, theApp
);
state_setter.trigger(data_processor::data_chunk_ptr(), 0);
state_setter.join();
_cached_state_value.reset();
}
}}} // namespace hive{ namespace plugins{ namespace sql_serializer
......
......@@ -157,6 +157,8 @@ ADD_TEST( NAME test_update_script
ADD_SQL_FUNCTIONAL_TEST( hived_api/are_fk_dropped_test.sql )
ADD_SQL_FUNCTIONAL_TEST( hived_api/autodetach.sql )
ADD_SQL_FUNCTIONAL_TEST( hived_api/connect_test_not_remove_reversible.sql )
ADD_SQL_FUNCTIONAL_TEST( hived_api/set_sync_state.sql )
ADD_SQL_FUNCTIONAL_TEST( hived_api/get_sync_state.sql )
ADD_SQL_FUNCTIONAL_TEST( app_api/create_context_test.sql )
ADD_SQL_FUNCTIONAL_TEST( app_api/app_context_remove_test.sql )
......
CREATE OR REPLACE PROCEDURE haf_admin_test_given()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
UPDATE hafd.hive_state SET state = 'REINDEX_WAIT';
END;
$BODY$
;
CREATE OR REPLACE PROCEDURE test_hived_test_then()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
ASSERT ( SELECT hive.get_sync_state() ) = 'REINDEX_WAIT', 'Wrong sync state';
END;
$BODY$
;
CREATE OR REPLACE PROCEDURE alice_test_then()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
ASSERT ( SELECT hive.get_sync_state() ) = 'REINDEX_WAIT', 'Wrong sync state';
END;
$BODY$
;
\ No newline at end of file
CREATE OR REPLACE PROCEDURE test_hived_test_given()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
PERFORM hive.set_sync_state( 'REINDEX_WAIT' );
END;
$BODY$
;
CREATE OR REPLACE PROCEDURE alice_test_error()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
-- Alice has no access to set state
PERFORM hive.set_sync_state( 'REINDEX_WAIT' );
END;
$BODY$
;
CREATE OR REPLACE PROCEDURE test_hived_test_then()
LANGUAGE 'plpgsql'
AS
$BODY$
BEGIN
ASSERT ( SELECT state FROM hafd.hive_state ) = 'REINDEX_WAIT', 'Wrong sync state';
END;
$BODY$
;
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment