Skip to content
Snippets Groups Projects
Commit 85a1b770 authored by Marcin's avatar Marcin
Browse files

support SIGTERM during sync

parent 09cdb996
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !399. Comments created here will be created in the context of that merge request.
......@@ -34,7 +34,7 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime
from signal import signal, SIGINT
from signal import signal, SIGINT, SIGTERM
from atomic import AtomicLong
log = logging.getLogger(__name__)
......@@ -42,23 +42,23 @@ log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
EXCEPTION_THROWN = AtomicLong(0)
SIG_INT_DURING_SYNC = AtomicLong(0)
FINISH_SIGNAL_DURING_SYNC = AtomicLong(0)
def set_sig_int(signal, frame):
global SIG_INT_DURING_SYNC
SIG_INT_DURING_SYNC += 1
def finish_signals_handler(signal, frame):
global FINISH_SIGNAL_DURING_SYNC
FINISH_SIGNAL_DURING_SYNC += 1
log.info("""
**********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
CAUGHT {}. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
""".format( "SIGINT" if signal == SIGINT else "SIGTERM" ) )
def set_exception_thrown():
global EXCEPTION_THROWN
EXCEPTION_THROWN += 1
def can_continue_thread():
return EXCEPTION_THROWN.value == 0 and SIG_INT_DURING_SYNC.value == 0
return EXCEPTION_THROWN.value == 0 and FINISH_SIGNAL_DURING_SYNC.value == 0
def prepare_vops(vops_by_block):
......@@ -224,7 +224,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
old_sig_int_handler = signal(SIGINT, set_sig_int)
old_sig_int_handler = signal(SIGINT, finish_signals_handler)
old_sig_term_handler = signal(SIGTERM, finish_signals_handler)
with ThreadPoolExecutor(max_workers = 4) as pool:
block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
......@@ -245,6 +246,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
raise vops_exception
signal(SIGINT, old_sig_int_handler)
signal(SIGTERM, old_sig_term_handler)
blocksQueue.queue.clear()
vopsQueue.queue.clear()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment