Skip to content
Snippets Groups Projects

exception handling during sync

Merged Marcin requested to merge mi_exception_handling into develop
1 file
+ 11
9
Compare changes
  • Side-by-side
  • Inline
+ 11
9
@@ -34,7 +34,7 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime
from signal import signal, SIGINT
from signal import signal, SIGINT, SIGTERM
from atomic import AtomicLong
log = logging.getLogger(__name__)
@@ -42,23 +42,23 @@ log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
EXCEPTION_THROWN = AtomicLong(0)
SIG_INT_DURING_SYNC = AtomicLong(0)
FINISH_SIGNAL_DURING_SYNC = AtomicLong(0)
def set_sig_int(signal, frame):
global SIG_INT_DURING_SYNC
SIG_INT_DURING_SYNC += 1
def finish_signals_handler(signal, frame):
global FINISH_SIGNAL_DURING_SYNC
FINISH_SIGNAL_DURING_SYNC += 1
log.info("""
**********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
CAUGHT {}. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
""".format( "SIGINT" if signal == SIGINT else "SIGTERM" ) )
def set_exception_thrown():
global EXCEPTION_THROWN
EXCEPTION_THROWN += 1
def can_continue_thread():
return EXCEPTION_THROWN.value == 0 and SIG_INT_DURING_SYNC.value == 0
return EXCEPTION_THROWN.value == 0 and FINISH_SIGNAL_DURING_SYNC.value == 0
def prepare_vops(vops_by_block):
@@ -224,7 +224,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
old_sig_int_handler = signal(SIGINT, set_sig_int)
old_sig_int_handler = signal(SIGINT, finish_signals_handler)
old_sig_term_handler = signal(SIGTERM, finish_signals_handler)
with ThreadPoolExecutor(max_workers = 4) as pool:
block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
@@ -245,6 +246,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
raise vops_exception
signal(SIGINT, old_sig_int_handler)
signal(SIGTERM, old_sig_term_handler)
blocksQueue.queue.clear()
vopsQueue.queue.clear()
Loading