diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 4e589e07cea44b5aa29a15d5925b2c4db3b6b373..2d411726c82372d7bc336b642ad1bc845b8bb50b 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -22,15 +22,15 @@ class Blocks: blocks_to_flush = [] ops_stats = {} - operations_in_tx = 0; - opened_tx = False; + operations_in_tx = 0 + opened_tx = False OPERATIONS_IN_TX_TRESHOLD = 500000 @staticmethod def merge_ops_stats(od1, od2): for (k, v) in od2.items(): - if(k in od1): + if k in od1: od1[k] += v else: od1[k] = v @@ -123,7 +123,7 @@ class Blocks: def _track_tx(cls, opCount = 1): if(cls.opened_tx == False): DB.query("START TRANSACTION") - cls.operations_in_tx = 0; + cls.operations_in_tx = 0 cls.opened_tx = True cls.operations_in_tx += opCount diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 591061a4499fd9a3a4e6ab9b3bd84befa7b50e21..93a28b20b35350514de916af7669a558620c0eab 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -30,7 +30,7 @@ from hive.server.common.mutes import Mutes log = logging.getLogger(__name__) -continue_processing = 1 +CONTINUE_PROCESSING = 1 def print_ops_stats(prefix, ops_stats): log.info("############################################################################") @@ -51,131 +51,132 @@ def prepare_vops(vops_by_block): return preparedVops def _block_provider(node, queue, lbound, ubound, chunk_size): - try: - count = ubound - lbound - log.info("[SYNC] start block %d, +%d to sync", lbound, count) - timer = Timer(count, entity='block', laps=['rps', 'wps']) - num = 0 - while continue_processing and lbound < ubound: - to = min(lbound + chunk_size, ubound) -# log.info("Querying a node for blocks from range: [%d, %d]", lbound, to) - timer.batch_start() - blocks = node.get_blocks_range(lbound, to) - lbound = to - timer.batch_lap() -# if(queue.full()): -# log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize()) - - queue.put(blocks) - num = num + 1 - return num - except KeyboardInterrupt as ki: - log.info("Caught SIGINT") - - except Exception as ex: - log.exception("Exception caught during fetching blocks") + try: + count = ubound - lbound + log.info("[SYNC] start block %d, +%d to sync", lbound, count) + timer = Timer(count, entity='block', laps=['rps', 'wps']) + num = 0 + while CONTINUE_PROCESSING and lbound < ubound: + to = min(lbound + chunk_size, ubound) + # log.info("Querying a node for blocks from range: [%d, %d]", lbound, to) + timer.batch_start() + blocks = node.get_blocks_range(lbound, to) + lbound = to + timer.batch_lap() + # if(queue.full()): + # log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize()) + + queue.put(blocks) + num = num + 1 + return num + except KeyboardInterrupt: + log.info("Caught SIGINT") + + except Exception: + log.exception("Exception caught during fetching blocks") def _vops_provider(node, queue, lbound, ubound, chunk_size): - try: - count = ubound - lbound - log.info("[SYNC] start vops %d, +%d to sync", lbound, count) - timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) - num = 0 - - while continue_processing and lbound < ubound: - to = min(lbound + chunk_size, ubound) -# log.info("Querying a node for vops from block range: [%d, %d]", lbound, to) - timer.batch_start() - vops = node.enum_virtual_ops(lbound, to) - preparedVops = prepare_vops(vops) - lbound = to - timer.batch_lap() -# if(queue.full()): -# log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize()) - queue.put(preparedVops) - num = num + 1 - - return num - except KeyboardInterrupt as ki: - log.info("Caught SIGINT") - - except Exception as ex: - log.exception("Exception caught during fetching vops...") + try: + count = ubound - lbound + log.info("[SYNC] start vops %d, +%d to sync", lbound, count) + timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) + num = 0 + + while CONTINUE_PROCESSING and lbound < ubound: + to = min(lbound + chunk_size, ubound) + # log.info("Querying a node for vops from block range: [%d, %d]", lbound, to) + timer.batch_start() + vops = node.enum_virtual_ops(lbound, to) + preparedVops = prepare_vops(vops) + lbound = to + timer.batch_lap() + # if(queue.full()): + # log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize()) + queue.put(preparedVops) + num = num + 1 + + return num + except KeyboardInterrupt: + log.info("Caught SIGINT") + + except Exception: + log.exception("Exception caught during fetching vops...") def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size): - try: - count = ubound - lbound - num = 0 - timer = Timer(count, entity='block', laps=['rps', 'wps']) + try: + count = ubound - lbound + num = 0 + timer = Timer(count, entity='block', laps=['rps', 'wps']) - total_ops_stats = {} + total_ops_stats = {} - time_start = perf() + time_start = perf() - while continue_processing and lbound < ubound: - if(blocksQueue.empty()): - log.info("Awaiting any block to process...") + while CONTINUE_PROCESSING and lbound < ubound: + if(blocksQueue.empty()): + log.info("Awaiting any block to process...") - blocks = blocksQueue.get() + blocks = blocksQueue.get() - if(vopsQueue.empty()): - log.info("Awaiting any vops to process...") + if(vopsQueue.empty()): + log.info("Awaiting any vops to process...") - preparedVops = vopsQueue.get() + preparedVops = vopsQueue.get() - to = min(lbound + chunk_size, ubound) -# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to) + to = min(lbound + chunk_size, ubound) + # log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to) - timer.batch_start() - - block_start = perf() - ops_stats = Blocks.process_multi(blocks, preparedVops, node, is_initial_sync) - block_end = perf() + timer.batch_start() + + block_start = perf() + ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)) + Blocks.ops_stats.clear() + block_end = perf() - timer.batch_lap() - timer.batch_finish(len(blocks)) - time_current = perf() + timer.batch_lap() + timer.batch_finish(len(blocks)) + time_current = perf() - prefix = ("[SYNC] Got block %d @ %s" % ( - to - 1, blocks[-1]['timestamp'])) - log.info(timer.batch_status(prefix)) - log.info("Time elapsed: %fs", time_current - time_start) + prefix = ("[SYNC] Got block %d @ %s" % ( + to - 1, blocks[-1]['timestamp'])) + log.info(timer.batch_status(prefix)) + log.info("Time elapsed: %fs", time_current - time_start) - total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats) + total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats) - if block_end - block_start > 1.0: - print_ops_stats("Operations present in the processed blocks:", ops_stats) + if block_end - block_start > 1.0: + print_ops_stats("Operations present in the processed blocks:", ops_stats) - blocksQueue.task_done() - vopsQueue.task_done() + blocksQueue.task_done() + vopsQueue.task_done() - lbound = to + lbound = to - num = num + 1 + num = num + 1 - print_ops_stats("All operations present in the processed blocks:", total_ops_stats) - return num - except KeyboardInterrupt as ki: - log.info("Caught SIGINT") - except Exception as ex: - log.exception("Exception caught during processing blocks...") + print_ops_stats("All operations present in the processed blocks:", total_ops_stats) + return num + except KeyboardInterrupt: + log.info("Caught SIGINT") + except Exception: + log.exception("Exception caught during processing blocks...") def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): blocksQueue = queue.Queue(maxsize=10) vopsQueue = queue.Queue(maxsize=10) with ThreadPoolExecutor(max_workers = 4) as pool: - try: - pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) - pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size) - blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) - - blockConsumerFuture.result() - # pool.shutdown() - except KeyboardInterrupt as ex: - global continue_processing - continue_processing = 0 - pool.shutdown(False) + try: + pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) + pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size) + blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) + + blockConsumerFuture.result() + # pool.shutdown() + except KeyboardInterrupt: + global CONTINUE_PROCESSING + CONTINUE_PROCESSING = 0 + pool.shutdown(False) blocksQueue.join() vopsQueue.join()