diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index d99304e3a0030572c4cb51a04e26ee2ff71a20f3..67668ded0a32b1db22baa785ff606db92038a9a0 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -35,22 +35,31 @@ from hive.indexer.mock_vops_provider import MockVopsProvider from datetime import datetime from signal import signal, SIGINT +from atomic import AtomicLong log = logging.getLogger(__name__) CONTINUE_PROCESSING = True -SIG_INT_DURING_SYNC = False +EXCEPTION_THROWN = AtomicLong(0) +SIG_INT_DURING_SYNC = AtomicLong(0) def set_sig_int(signal, frame): global SIG_INT_DURING_SYNC - SIG_INT_DURING_SYNC = True + SIG_INT_DURING_SYNC += 1 log.info(""" ********************************************************** CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES... ********************************************************** """) +def set_exception_thrown(): + global EXCEPTION_THROWN + EXCEPTION_THROWN += 1 + +def can_continue_thread(): + return EXCEPTION_THROWN.value == 0 and SIG_INT_DURING_SYNC.value == 0 + def prepare_vops(vops_by_block): preparedVops = {} @@ -62,7 +71,7 @@ def prepare_vops(vops_by_block): return preparedVops def put_to_queue( data_queue, value ): - while not SIG_INT_DURING_SYNC: + while can_continue_thread(): try: data_queue.put( value, True, 1) return @@ -75,7 +84,7 @@ def _block_provider(node, queue, lbound, ubound, chunk_size): count = ubound - lbound log.info("[SYNC] start block %d, +%d to sync", lbound, count) timer = Timer(count, entity='block', laps=['rps', 'wps']) - while not SIG_INT_DURING_SYNC and lbound < ubound: + while can_continue_thread() and lbound < ubound: to = min(lbound + chunk_size, ubound) timer.batch_start() blocks = node.get_blocks_range(lbound, to) @@ -86,7 +95,8 @@ def _block_provider(node, queue, lbound, ubound, chunk_size): return num except Exception: log.exception("Exception caught during fetching blocks") - return num + set_exception_thrown() + raise def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): try: @@ -95,7 +105,7 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): log.info("[SYNC] start vops %d, +%d to sync", lbound, count) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) - while not SIG_INT_DURING_SYNC and lbound < ubound: + while can_continue_thread() and lbound < ubound: to = min(lbound + chunk_size, ubound) timer.batch_start() vops = node.enum_virtual_ops(conf, lbound, to) @@ -108,9 +118,11 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): return num except Exception: log.exception("Exception caught during fetching vops...") + set_exception_thrown() + raise def get_from_queue( data_queue ): - while not SIG_INT_DURING_SYNC: + while can_continue_thread(): try: ret = data_queue.get(True, 1) data_queue.task_done() @@ -125,26 +137,39 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun num = 0 time_start = OPSM.start() rate = {} + + def print_summary(): + stop = OPSM.stop(time_start) + log.info("=== TOTAL STATS ===") + wtm = WSM.log_global("Total waiting times") + ftm = FSM.log_global("Total flush times") + otm = OPSM.log_global("All operations present in the processed blocks") + ttm = ftm + otm + wtm + log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s") + log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}") + log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}") + log.info("=== TOTAL STATS ===") + try: count = ubound - lbound timer = Timer(count, entity='block', laps=['rps', 'wps']) while lbound < ubound: wait_time_1 = WSM.start() - if blocksQueue.empty() and not SIG_INT_DURING_SYNC: + if blocksQueue.empty() and can_continue_thread(): log.info("Awaiting any block to process...") blocks = [] - if not blocksQueue.empty() or not SIG_INT_DURING_SYNC: + if not blocksQueue.empty() or can_continue_thread(): blocks = get_from_queue( blocksQueue ) WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1)) wait_time_2 = WSM.start() - if vopsQueue.empty() and not SIG_INT_DURING_SYNC: + if vopsQueue.empty() and can_continue_thread(): log.info("Awaiting any vops to process...") preparedVops = [] - if not vopsQueue.empty() or not SIG_INT_DURING_SYNC: + if not vopsQueue.empty() or can_continue_thread(): preparedVops = get_from_queue(vopsQueue) WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2)) @@ -152,7 +177,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun timer.batch_start() - if SIG_INT_DURING_SYNC: + if not can_continue_thread(): break; block_start = perf() @@ -185,22 +210,16 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun num = num + 1 - if SIG_INT_DURING_SYNC and blocksQueue.empty() and vopsQueue.empty(): + if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty(): break except Exception: log.exception("Exception caught during processing blocks...") - finally: - stop = OPSM.stop(time_start) - log.info("=== TOTAL STATS ===") - wtm = WSM.log_global("Total waiting times") - ftm = FSM.log_global("Total flush times") - otm = OPSM.log_global("All operations present in the processed blocks") - ttm = ftm + otm + wtm - log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s") - log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}") - log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}") - log.info("=== TOTAL STATS ===") - return num + set_exception_thrown() + print_summary() + raise + + print_summary() + return num def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): blocksQueue = queue.Queue(maxsize=10) @@ -208,21 +227,27 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): old_sig_int_handler = signal(SIGINT, set_sig_int) with ThreadPoolExecutor(max_workers = 4) as pool: - pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) - pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) + block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) + vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) - blockConsumerFuture.result() - if SIG_INT_DURING_SYNC: - pool.shutdown(False) + consumer_exception = blockConsumerFuture.exception() + block_exception = block_provider_future.exception() + vops_exception = vops_provider_future.exception() + + if consumer_exception: + raise consumer_exception + + if block_exception: + raise block_exception + + if vops_exception: + raise vops_exception signal(SIGINT, old_sig_int_handler) blocksQueue.queue.clear() vopsQueue.queue.clear() - if SIG_INT_DURING_SYNC: - exit(0) - class Sync: """Manages the sync/index process. @@ -288,7 +313,7 @@ class Sync: DbState.before_initial_sync(last_imported_block, hived_head_block) # resume initial sync self.initial() - if not CONTINUE_PROCESSING: + if not can_continue_thread(): return current_imported_block = Blocks.head_num() DbState.finish_initial_sync(current_imported_block) @@ -311,6 +336,8 @@ class Sync: while True: # sync up to irreversible block self.from_steemd() + if not can_continue_thread(): + return try: # listen for new blocks @@ -325,7 +352,7 @@ class Sync: log.info("[INIT] *** Initial fast sync ***") self.from_steemd(is_initial_sync=True) - if not CONTINUE_PROCESSING: + if not can_continue_thread(): return def from_steemd(self, is_initial_sync=False, chunk_size=1000): diff --git a/setup.py b/setup.py index 8085695a6c714a1a712b6da2b4dd247d114448c1..21245f7c74d2f1bfd32b65d1b49fdb7b7d29518a 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ if __name__ == "__main__": 'diff-match-patch', 'prometheus-client', 'psutil', + 'atomic', ], extras_require={ 'dev': [