diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index d93e8865150f9342195e891e584edc751fb5b1f0..eacc41f0b211a26c8719ddc2a4f552b671a869d0 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -5,6 +5,8 @@ import glob
 from time import perf_counter as perf
 import os
 import ujson as json
+import queue
+from concurrent.futures import ThreadPoolExecutor
 
 from funcy.seqs import drop
 from toolz import partition_all
@@ -25,6 +27,103 @@ from hive.server.common.mutes import Mutes
 
 log = logging.getLogger(__name__)
 
+continue_processing = 1
+
+def _block_provider(node, out_queue, lbound, ubound, chunk_size):
+    """Fetch blocks in [lbound, ubound) chunks and enqueue them for the consumer."""
+    try:
+        count = ubound - lbound
+        log.info("[SYNC] start block %d, +%d to sync", lbound, count)
+        timer = Timer(count, entity='block', laps=['rps', 'wps'])
+        num = 0
+        while continue_processing and lbound < ubound:
+            to = min(lbound + chunk_size, ubound)
+            timer.batch_start()
+            blocks = node.get_blocks_range(lbound, to)
+            lbound = to
+            timer.batch_lap()
+            out_queue.put(blocks)
+            num = num + 1
+        return num
+    except KeyboardInterrupt:
+        log.info("Caught SIGINT")
+    except Exception:
+        log.exception("Exception caught during fetching blocks")
+
+def _vops_provider(node, out_queue, lbound, ubound, chunk_size):
+    """Fetch virtual ops for [lbound, ubound) chunks and enqueue them for the consumer."""
+    try:
+        count = ubound - lbound
+        log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
+        timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
+        num = 0
+        while continue_processing and lbound < ubound:
+            to = min(lbound + chunk_size, ubound)
+            timer.batch_start()
+            vops = node.enum_virtual_ops(lbound, to)
+            lbound = to
+            timer.batch_lap()
+            out_queue.put(vops)
+            num = num + 1
+        return num
+    except KeyboardInterrupt:
+        log.info("Caught SIGINT")
+    except Exception:
+        log.exception("Exception caught during fetching vops...")
+
+def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+    """Dequeue paired block/vops chunks and write them via Blocks.process_multi."""
+    try:
+        count = ubound - lbound
+        num = 0
+        timer = Timer(count, entity='block', laps=['rps', 'wps'])
+
+        while continue_processing and lbound < ubound:
+            # NOTE(review): blocks forever if a provider dies early -- TODO add timeouts/sentinels
+            blocks = blocksQueue.get()
+            blocksQueue.task_done()
+            vops = vopsQueue.get()
+
+            to = min(lbound + chunk_size, ubound)
+
+            timer.batch_start()
+            Blocks.process_multi(blocks, vops, node, is_initial_sync)
+            timer.batch_lap()
+            timer.batch_finish(len(blocks))
+            prefix = ("[SYNC] Got block %d @ %s" % (
+                to - 1, blocks[-1]['timestamp']))
+            log.info(timer.batch_status(prefix))
+
+            lbound = to
+
+            vopsQueue.task_done()
+            num = num + 1
+
+        return num
+    except KeyboardInterrupt:
+        log.info("Caught SIGINT")
+    except Exception:
+        log.exception("Exception caught during processing blocks...")
+
+def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
+    """Run both providers and the consumer on a thread pool for initial sync."""
+    global continue_processing  # rebinding -- without this the assignment below is a no-op local
+    blocksQueue = queue.Queue(maxsize=10)
+    vopsQueue = queue.Queue(maxsize=10)
+
+    with ThreadPoolExecutor(max_workers=4) as pool:
+        try:
+            pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
+            pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
+            consumer_future = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
+            consumer_future.result()
+        except KeyboardInterrupt:
+            continue_processing = 0
+            pool.shutdown(False)  # fixed: was shutdown(false), a NameError
+
+    blocksQueue.join()
+    vopsQueue.join()
+
 class Sync:
 
     """Manages the sync/index process.
@@ -149,6 +248,10 @@ class Sync:
         if count < 1:
             return
 
+        if is_initial_sync:
+            _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size)
+            return
+
         log.info("[SYNC] start block %d, +%d to sync", lbound, count)
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
         while lbound < ubound:
diff --git a/hive/utils/stats.py b/hive/utils/stats.py
index f330408f725d1a4a43ea0dd6a5d429e288ecbc20..d179eb66689c0f234c6bdb25a5874d9774b1bef7 100644
--- a/hive/utils/stats.py
+++ b/hive/utils/stats.py
@@ -65,8 +65,16 @@ class StatsAbstract:
         log.info('%7s %9s %9s %9s', '-pct-', '-ttl-', '-avg-', '-cnt-')
 
         for call, ms, reqs in self.table(40):
+            try:
+                avg = ms / reqs
+                millisec = ms / self._ms
+            except ZeroDivisionError:
+                avg = 0.0
+                millisec = 0.0
+            if reqs == 0:
+                reqs = 1
             log.info("% 6.1f%% % 7dms % 9.2f % 8dx -- %s",
-                     100 * ms/self._ms, ms, ms/reqs, reqs, call)
+                     100 * millisec, ms, avg, reqs, call)
 
         self.clear()
 
@@ -131,6 +139,9 @@ class Stats:
     """Container for steemd and db timing data."""
 
     PRINT_THRESH_MINS = 1
+    COLLECT_DB_STATS = 0
+    COLLECT_NODE_STATS = 0
+
     _db = DbStats()
     _steemd = SteemStats()
     _secs = 0.0
@@ -140,14 +151,16 @@ class Stats:
     @classmethod
     def log_db(cls, sql, secs):
         """Log a database query. Incoming SQL is normalized."""
-        cls._db.add(_normalize_sql(sql), secs * 1000)
-        cls.add_secs(secs)
+        if cls.COLLECT_DB_STATS:
+            cls._db.add(_normalize_sql(sql), secs * 1000)
+            cls.add_secs(secs)
 
     @classmethod
     def log_steem(cls, method, secs, batch_size=1):
         """Log a steemd call."""
-        cls._steemd.add(method, secs * 1000, batch_size)
-        cls.add_secs(secs)
+        if cls.COLLECT_NODE_STATS:
+            cls._steemd.add(method, secs * 1000, batch_size)
+            cls.add_secs(secs)
 
     @classmethod
     def log_idle(cls, secs):