diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 9f1f4eb68b74e36d3948605fbe96723ecd3fce4d..a09974186de2dd2efe57b40522e012de2706f9a1 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -34,10 +34,33 @@ from hive.indexer.mock_vops_provider import MockVopsProvider from datetime import datetime +from signal import signal, SIGINT, SIGTERM +from atomic import AtomicLong + log = logging.getLogger(__name__) CONTINUE_PROCESSING = True +EXCEPTION_THROWN = AtomicLong(0) +FINISH_SIGNAL_DURING_SYNC = AtomicLong(0) + +def finish_signals_handler(signal, frame): + global FINISH_SIGNAL_DURING_SYNC + FINISH_SIGNAL_DURING_SYNC += 1 + log.info(""" + ********************************************************** + CAUGHT {}. PLEASE WAIT... PROCESSING DATA IN QUEUES... + ********************************************************** + """.format( "SIGINT" if signal == SIGINT else "SIGTERM" ) ) + +def set_exception_thrown(): + global EXCEPTION_THROWN + EXCEPTION_THROWN += 1 + +def can_continue_thread(): + return EXCEPTION_THROWN.value == 0 and FINISH_SIGNAL_DURING_SYNC.value == 0 + + def prepare_vops(vops_by_block): preparedVops = {} @@ -47,26 +70,33 @@ def prepare_vops(vops_by_block): return preparedVops +def put_to_queue( data_queue, value ): + while can_continue_thread(): + try: + data_queue.put( value, True, 1) + return + except queue.Full: + continue + def _block_provider(node, queue, lbound, ubound, chunk_size): try: num = 0 count = ubound - lbound log.info("[SYNC] start block %d, +%d to sync", lbound, count) timer = Timer(count, entity='block', laps=['rps', 'wps']) - while CONTINUE_PROCESSING and lbound < ubound: + while can_continue_thread() and lbound < ubound: to = min(lbound + chunk_size, ubound) timer.batch_start() blocks = node.get_blocks_range(lbound, to) lbound = to timer.batch_lap() - queue.put(blocks) + put_to_queue( queue, blocks ) num = num + 1 return num - except KeyboardInterrupt: - log.info("Caught SIGINT") - except Exception: log.exception("Exception caught during fetching blocks") + set_exception_thrown() + raise def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): try: @@ -75,21 +105,31 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): log.info("[SYNC] start vops %d, +%d to sync", lbound, count) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) - while CONTINUE_PROCESSING and lbound < ubound: + while can_continue_thread() and lbound < ubound: to = min(lbound + chunk_size, ubound) timer.batch_start() vops = node.enum_virtual_ops(conf, lbound, to) preparedVops = prepare_vops(vops) + lbound = to timer.batch_lap() - queue.put(preparedVops) + put_to_queue( queue, preparedVops ) num = num + 1 return num - except KeyboardInterrupt: - log.info("Caught SIGINT") - except Exception: log.exception("Exception caught during fetching vops...") + set_exception_thrown() + raise + +def get_from_queue( data_queue ): + while can_continue_thread(): + try: + ret = data_queue.get(True, 1) + data_queue.task_done() + except queue.Empty: + continue + return ret + return [] def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size): from hive.utils.stats import minmax @@ -97,36 +137,49 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun num = 0 time_start = OPSM.start() rate = {} + + def print_summary(): + stop = OPSM.stop(time_start) + log.info("=== TOTAL STATS ===") + wtm = WSM.log_global("Total waiting times") + ftm = FSM.log_global("Total flush times") + otm = OPSM.log_global("All operations present in the processed blocks") + ttm = ftm + otm + wtm + log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s") + log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}") + log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}") + log.info("=== TOTAL STATS ===") + try: count = ubound - lbound timer = Timer(count, entity='block', laps=['rps', 'wps']) while lbound < ubound: - wait_time_1 = WSM.start() - if blocksQueue.empty() and CONTINUE_PROCESSING: + if blocksQueue.empty() and can_continue_thread(): log.info("Awaiting any block to process...") blocks = [] - if not blocksQueue.empty() or CONTINUE_PROCESSING: - blocks = blocksQueue.get() - blocksQueue.task_done() + if not blocksQueue.empty() or can_continue_thread(): + blocks = get_from_queue( blocksQueue ) WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1)) wait_time_2 = WSM.start() - if vopsQueue.empty() and CONTINUE_PROCESSING: + if vopsQueue.empty() and can_continue_thread(): log.info("Awaiting any vops to process...") preparedVops = [] - if not vopsQueue.empty() or CONTINUE_PROCESSING: - preparedVops = vopsQueue.get() - vopsQueue.task_done() + if not vopsQueue.empty() or can_continue_thread(): + preparedVops = get_from_queue(vopsQueue) WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2)) to = min(lbound + chunk_size, ubound) timer.batch_start() + if not can_continue_thread(): + break; + block_start = perf() Blocks.process_multi(blocks, preparedVops, is_initial_sync) block_end = perf() @@ -157,47 +210,45 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun num = num + 1 - if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): + if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty(): break - except KeyboardInterrupt: - log.info("Caught SIGINT") except Exception: log.exception("Exception caught during processing blocks...") - finally: - stop = OPSM.stop(time_start) - log.info("=== TOTAL STATS ===") - wtm = WSM.log_global("Total waiting times") - ftm = FSM.log_global("Total flush times") - otm = OPSM.log_global("All operations present in the processed blocks") - ttm = ftm + otm + wtm - log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s") - log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}") - log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}") - log.info("=== TOTAL STATS ===") - return num + set_exception_thrown() + print_summary() + raise + + print_summary() + return num def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): blocksQueue = queue.Queue(maxsize=10) vopsQueue = queue.Queue(maxsize=10) - global CONTINUE_PROCESSING + old_sig_int_handler = signal(SIGINT, finish_signals_handler) + old_sig_term_handler = signal(SIGTERM, finish_signals_handler) with ThreadPoolExecutor(max_workers = 4) as pool: - try: - pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) - pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) - blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) - - blockConsumerFuture.result() - if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): - pool.shutdown(False) - except KeyboardInterrupt: - log.info(""" ********************************************************** - CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES... - ********************************************************** - """) - CONTINUE_PROCESSING = False - blocksQueue.join() - vopsQueue.join() + block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) + vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) + blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) + + consumer_exception = blockConsumerFuture.exception() + block_exception = block_provider_future.exception() + vops_exception = vops_provider_future.exception() + + if consumer_exception: + raise consumer_exception + + if block_exception: + raise block_exception + + if vops_exception: + raise vops_exception + + signal(SIGINT, old_sig_int_handler) + signal(SIGTERM, old_sig_term_handler) + blocksQueue.queue.clear() + vopsQueue.queue.clear() class Sync: """Manages the sync/index process. @@ -264,7 +315,7 @@ class Sync: DbState.before_initial_sync(last_imported_block, hived_head_block) # resume initial sync self.initial() - if not CONTINUE_PROCESSING: + if not can_continue_thread(): return current_imported_block = Blocks.head_num() DbState.finish_initial_sync(current_imported_block) @@ -287,6 +338,8 @@ class Sync: while True: # sync up to irreversible block self.from_steemd() + if not can_continue_thread(): + return try: # listen for new blocks @@ -301,7 +354,7 @@ class Sync: log.info("[INIT] *** Initial fast sync ***") self.from_steemd(is_initial_sync=True) - if not CONTINUE_PROCESSING: + if not can_continue_thread(): return def from_steemd(self, is_initial_sync=False, chunk_size=1000): diff --git a/setup.py b/setup.py index 8085695a6c714a1a712b6da2b4dd247d114448c1..21245f7c74d2f1bfd32b65d1b49fdb7b7d29518a 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ if __name__ == "__main__": 'diff-match-patch', 'prometheus-client', 'psutil', + 'atomic', ], extras_require={ 'dev': [