Skip to content
Snippets Groups Projects
Commit 045603f1 authored by Marcin's avatar Marcin
Browse files

exception during sync stops the sync process gently

parent 8730d584
No related branches found
No related tags found
2 merge requests!456Release candidate v1 24,!399exception handling during sync
...@@ -35,22 +35,31 @@ from hive.indexer.mock_vops_provider import MockVopsProvider ...@@ -35,22 +35,31 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime from datetime import datetime
from signal import signal, SIGINT from signal import signal, SIGINT
from atomic import AtomicLong
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True CONTINUE_PROCESSING = True
SIG_INT_DURING_SYNC = False EXCEPTION_THROWN = AtomicLong(0)
SIG_INT_DURING_SYNC = AtomicLong(0)
def set_sig_int(signal, frame):
    """SIGINT handler: record the interrupt so worker threads can drain and stop.

    Installed via ``signal(SIGINT, set_sig_int)`` around the threaded sync.
    Instead of killing the process, it bumps an atomic counter that
    ``can_continue_thread()`` polls, letting provider/consumer threads finish
    processing whatever is already queued.

    :param signal: signal number delivered by the runtime (unused).
    :param frame: current stack frame at delivery time (unused).
    """
    global SIG_INT_DURING_SYNC
    # AtomicLong's += is an atomic increment; readers check .value, so a
    # non-zero count means "SIGINT was caught at least once".
    SIG_INT_DURING_SYNC += 1
    log.info("""
                  **********************************************************
                  CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
                  **********************************************************
""")
def set_exception_thrown():
    """Record that a worker thread raised, so all sync threads wind down.

    Called from the ``except`` blocks of the provider/consumer threads just
    before re-raising; ``can_continue_thread()`` reads the counter's value.
    """
    global EXCEPTION_THROWN
    # AtomicLong increment — atomic across the provider/consumer threads.
    EXCEPTION_THROWN += 1
def can_continue_thread():
    """Return True while no worker has raised and no SIGINT was caught.

    Polled by the provider/consumer loops; a non-zero value in either atomic
    counter tells every thread to stop gracefully.
    """
    halted = EXCEPTION_THROWN.value != 0 or SIG_INT_DURING_SYNC.value != 0
    return not halted
def prepare_vops(vops_by_block): def prepare_vops(vops_by_block):
preparedVops = {} preparedVops = {}
...@@ -62,7 +71,7 @@ def prepare_vops(vops_by_block): ...@@ -62,7 +71,7 @@ def prepare_vops(vops_by_block):
return preparedVops return preparedVops
def put_to_queue( data_queue, value ): def put_to_queue( data_queue, value ):
while not SIG_INT_DURING_SYNC: while can_continue_thread():
try: try:
data_queue.put( value, True, 1) data_queue.put( value, True, 1)
return return
...@@ -75,7 +84,7 @@ def _block_provider(node, queue, lbound, ubound, chunk_size): ...@@ -75,7 +84,7 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
count = ubound - lbound count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count) log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while not SIG_INT_DURING_SYNC and lbound < ubound: while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
blocks = node.get_blocks_range(lbound, to) blocks = node.get_blocks_range(lbound, to)
...@@ -86,7 +95,8 @@ def _block_provider(node, queue, lbound, ubound, chunk_size): ...@@ -86,7 +95,8 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
return num return num
except Exception: except Exception:
log.exception("Exception caught during fetching blocks") log.exception("Exception caught during fetching blocks")
return num set_exception_thrown()
raise
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try: try:
...@@ -95,7 +105,7 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): ...@@ -95,7 +105,7 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count) log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while not SIG_INT_DURING_SYNC and lbound < ubound: while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to) vops = node.enum_virtual_ops(conf, lbound, to)
...@@ -108,9 +118,11 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): ...@@ -108,9 +118,11 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
return num return num
except Exception: except Exception:
log.exception("Exception caught during fetching vops...") log.exception("Exception caught during fetching vops...")
set_exception_thrown()
raise
def get_from_queue( data_queue ): def get_from_queue( data_queue ):
while not SIG_INT_DURING_SYNC: while can_continue_thread():
try: try:
ret = data_queue.get(True, 1) ret = data_queue.get(True, 1)
data_queue.task_done() data_queue.task_done()
...@@ -125,26 +137,39 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -125,26 +137,39 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = 0 num = 0
time_start = OPSM.start() time_start = OPSM.start()
rate = {} rate = {}
def print_summary():
    """Log aggregate timing statistics for this block-consumer run.

    Closure over the enclosing ``_block_consumer`` locals ``time_start`` and
    ``rate``; called both on normal completion and after an exception, so the
    stats are emitted exactly once per run.
    """
    # Total wall-clock time since OPSM.start() at the top of the consumer.
    stop = OPSM.stop(time_start)
    log.info("=== TOTAL STATS ===")
    wtm = WSM.log_global("Total waiting times")
    ftm = FSM.log_global("Total flush times")
    otm = OPSM.log_global("All operations present in the processed blocks")
    # Sum of the three measured phases; compared against wall-clock below to
    # surface any unaccounted-for time.
    ttm = ftm + otm + wtm
    log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
    # NOTE(review): assumes rate['max'..'min_to'] were populated by the
    # processing loop — verify against the enclosing function's body.
    log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
    log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
    log.info("=== TOTAL STATS ===")
try: try:
count = ubound - lbound count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound: while lbound < ubound:
wait_time_1 = WSM.start() wait_time_1 = WSM.start()
if blocksQueue.empty() and not SIG_INT_DURING_SYNC: if blocksQueue.empty() and can_continue_thread():
log.info("Awaiting any block to process...") log.info("Awaiting any block to process...")
blocks = [] blocks = []
if not blocksQueue.empty() or not SIG_INT_DURING_SYNC: if not blocksQueue.empty() or can_continue_thread():
blocks = get_from_queue( blocksQueue ) blocks = get_from_queue( blocksQueue )
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1)) WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start() wait_time_2 = WSM.start()
if vopsQueue.empty() and not SIG_INT_DURING_SYNC: if vopsQueue.empty() and can_continue_thread():
log.info("Awaiting any vops to process...") log.info("Awaiting any vops to process...")
preparedVops = [] preparedVops = []
if not vopsQueue.empty() or not SIG_INT_DURING_SYNC: if not vopsQueue.empty() or can_continue_thread():
preparedVops = get_from_queue(vopsQueue) preparedVops = get_from_queue(vopsQueue)
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2)) WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
...@@ -152,7 +177,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -152,7 +177,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
timer.batch_start() timer.batch_start()
if SIG_INT_DURING_SYNC: if not can_continue_thread():
break; break;
block_start = perf() block_start = perf()
...@@ -185,22 +210,16 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -185,22 +210,16 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1 num = num + 1
if SIG_INT_DURING_SYNC and blocksQueue.empty() and vopsQueue.empty(): if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty():
break break
except Exception: except Exception:
log.exception("Exception caught during processing blocks...") log.exception("Exception caught during processing blocks...")
finally: set_exception_thrown()
stop = OPSM.stop(time_start) print_summary()
log.info("=== TOTAL STATS ===") raise
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times") print_summary()
otm = OPSM.log_global("All operations present in the processed blocks") return num
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10) blocksQueue = queue.Queue(maxsize=10)
...@@ -208,21 +227,27 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): ...@@ -208,21 +227,27 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
old_sig_int_handler = signal(SIGINT, set_sig_int) old_sig_int_handler = signal(SIGINT, set_sig_int)
with ThreadPoolExecutor(max_workers = 4) as pool: with ThreadPoolExecutor(max_workers = 4) as pool:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result() consumer_exception = blockConsumerFuture.exception()
if SIG_INT_DURING_SYNC: block_exception = block_provider_future.exception()
pool.shutdown(False) vops_exception = vops_provider_future.exception()
if consumer_exception:
raise consumer_exception
if block_exception:
raise block_exception
if vops_exception:
raise vops_exception
signal(SIGINT, old_sig_int_handler) signal(SIGINT, old_sig_int_handler)
blocksQueue.queue.clear() blocksQueue.queue.clear()
vopsQueue.queue.clear() vopsQueue.queue.clear()
if SIG_INT_DURING_SYNC:
exit(0)
class Sync: class Sync:
"""Manages the sync/index process. """Manages the sync/index process.
...@@ -288,7 +313,7 @@ class Sync: ...@@ -288,7 +313,7 @@ class Sync:
DbState.before_initial_sync(last_imported_block, hived_head_block) DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync # resume initial sync
self.initial() self.initial()
if not CONTINUE_PROCESSING: if not can_continue_thread():
return return
current_imported_block = Blocks.head_num() current_imported_block = Blocks.head_num()
DbState.finish_initial_sync(current_imported_block) DbState.finish_initial_sync(current_imported_block)
...@@ -311,6 +336,8 @@ class Sync: ...@@ -311,6 +336,8 @@ class Sync:
while True: while True:
# sync up to irreversible block # sync up to irreversible block
self.from_steemd() self.from_steemd()
if not can_continue_thread():
return
try: try:
# listen for new blocks # listen for new blocks
...@@ -325,7 +352,7 @@ class Sync: ...@@ -325,7 +352,7 @@ class Sync:
log.info("[INIT] *** Initial fast sync ***") log.info("[INIT] *** Initial fast sync ***")
self.from_steemd(is_initial_sync=True) self.from_steemd(is_initial_sync=True)
if not CONTINUE_PROCESSING: if not can_continue_thread():
return return
def from_steemd(self, is_initial_sync=False, chunk_size=1000): def from_steemd(self, is_initial_sync=False, chunk_size=1000):
......
...@@ -92,6 +92,7 @@ if __name__ == "__main__": ...@@ -92,6 +92,7 @@ if __name__ == "__main__":
'diff-match-patch', 'diff-match-patch',
'prometheus-client', 'prometheus-client',
'psutil', 'psutil',
'atomic',
], ],
extras_require={ extras_require={
'dev': [ 'dev': [
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment