diff --git a/hive/db/adapter.py b/hive/db/adapter.py
index e175c540bdb4e9e754e2b8e8e9c80aa862f12e08..453a97cb17065ddad5e799d0df093b59c08d8b39 100644
--- a/hive/db/adapter.py
+++ b/hive/db/adapter.py
@@ -50,6 +50,10 @@ class Db:
         self._exec = self._conn.execute
         self._exec(sqlalchemy.text("COMMIT"))
 
+    def clone(self):
+        """Create a new Db instance for the same database URL."""
+        return Db(self._url)
+
     def engine(self):
         """Lazy-loaded SQLAlchemy engine."""
         if not self._engine:
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 4edf2a5b83633a0b49984ec1844eea1b164d88d1..54126af77d746f35770cc44cec78a0a128da0f87 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -29,10 +29,16 @@ DB = Db.instance()
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
-    _head_block_date = None # timestamp of last fully processed block ("previous block")
-    _current_block_date = None # timestamp of block currently being processes ("current block")
 
     def __init__(cls):
+        # the processing methods are classmethods, so shared state is assigned to the class itself
+        Blocks._reputations = Reputations(DB)
+        Blocks._head_block_date = None # timestamp of last fully processed block ("previous block")
+        Blocks._current_block_date = None # timestamp of block currently being processed ("current block")
+
+        log.info("Built reputations object: {}".format(Blocks._reputations))
+        log.info("Built blocks object: {}".format(cls))
+
         head_date = cls.head_date()
         if(head_date == ''):
             cls._head_block_date = None
@@ -64,6 +70,7 @@
         Tags.flush()
         Votes.flush()
         Posts.flush()
+        cls._reputations.flush()
         block_num = int(block['block_id'][:8], base=16)
         cls.on_live_blocks_processed( block_num, block_num )
         time_end = perf_counter()
@@ -74,6 +81,9 @@
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
         time_start = OPSM.start()
+
+        log.info("Blocks object: {}".format(cls))
+
         DB.query("START TRANSACTION")
 
         last_num = 0
@@ -105,7 +115,7 @@
             folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
             flush_time = register_time(flush_time, "Follow", folllow_items)
             flush_time = register_time(flush_time, "Posts", Posts.flush())
-            flush_time = register_time(flush_time, "Reputations", Reputations.flush())
+            flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
 
         if (not is_initial_sync) and (first_block > -1):
             cls.on_live_blocks_processed( first_block, last_num )
@@ -114,8 +124,8 @@
 
         log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
 
-    @staticmethod
-    def prepare_vops(comment_payout_ops, vopsList, date, block_num):
+    @classmethod
+    def prepare_vops(cls, comment_payout_ops, vopsList, date, block_num):
         vote_ops = {}
 
         ineffective_deleted_ops = {}
@@ -148,7 +158,7 @@
             elif op_type == 'effective_comment_vote_operation':
                 key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
 
-                Reputations.process_vote(block_num, op_value)
+                cls._reputations.process_vote(block_num, op_value)
 
                 vote_ops[ key_vote ] = op_value
 
@@ -193,10 +203,10 @@
 
         if is_initial_sync:
             if num in virtual_operations:
-                (vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
+                (vote_ops, ineffective_deleted_ops ) = cls.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
         else:
             vops = hived.get_virtual_operations(num)
-            (vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
+            (vote_ops, ineffective_deleted_ops ) = cls.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
 
         json_ops = []
         for tx_idx, tx in enumerate(block['transactions']):
@@ -325,6 +335,10 @@
                            'date': block['timestamp']})
         return num
 
+    @classmethod
+    def _flush_reputations(cls):
+        return cls._reputations.flush()
+
     @classmethod
     def _flush_blocks(cls):
         query = """
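Note on the `__init__` pattern above: `prepare_vops`, `_flush_reputations`, and the other processing methods remain `@classmethod`s, so state created at construction time has to be assigned to the class itself to be visible to them. A minimal, self-contained sketch of that rule (hypothetical names, not hive code):

```python
class Processor:
    """Toy stand-in for Blocks/Reputations: built once, used via classmethods."""
    _state = None  # declared up front so classmethods never hit AttributeError

    def __init__(self):
        # Assigning through the class name makes the state visible to the
        # classmethods below; `self._state = []` would only create an
        # instance attribute that they cannot see.
        Processor._state = []

    @classmethod
    def record(cls, item):
        cls._state.append(item)  # `cls` is the class, even when called via an instance

p = Processor()
p.record("block")        # a classmethod reached through an instance still binds the class
Processor.record("vop")
assert Processor._state == ["block", "vop"]
```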
diff --git a/hive/indexer/reputations.py b/hive/indexer/reputations.py
index d6efef8cb2e2c345e1e59b6d02024db698e5047a..12eebf633eebcf396dc6cedbc1dca8e47516c63d 100644
--- a/hive/indexer/reputations.py
+++ b/hive/indexer/reputations.py
@@ -7,12 +7,14 @@ from hive.db.db_state import DbState
 from hive.utils.normalize import escape_characters
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
 CACHED_ITEMS_LIMIT = 200
 
 class Reputations:
-    _queries = []
+    def __init__(self, sharedDb):
+        log.info("Cloning a database adapter")
+        Reputations._queries = []  # on the class, for the classmethods below
+        Reputations._db = sharedDb.clone()
 
     @classmethod
     def process_vote(cls, block_num, effective_vote_op):
@@ -29,12 +31,12 @@
             i = i + 1
             items = items + 1
             if items >= CACHED_ITEMS_LIMIT:
-                DB.query_no_return(query)
+                cls._db.query_no_return(query)
                 query = ""
                 items = 0
 
         if items >= CACHED_ITEMS_LIMIT:
-            DB.query_no_return(query)
+            cls._db.query_no_return(query)
             query = ""
             items = 0
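The flush path above batches reputation updates through the cloned adapter, sending at most `CACHED_ITEMS_LIMIT` statements per round-trip. A self-contained sketch of that batching rule, where `execute` is a hypothetical stand-in for `cls._db.query_no_return` and the tail flush is an assumption of this sketch:

```python
CACHED_ITEMS_LIMIT = 200  # same constant as hive/indexer/reputations.py

def flush_batched(statements, execute):
    """Concatenate statements and hand them off once per CACHED_ITEMS_LIMIT."""
    query, items = "", 0
    for stmt in statements:
        query += stmt + ";\n"
        items += 1
        if items >= CACHED_ITEMS_LIMIT:
            execute(query)        # one round-trip per full batch
            query, items = "", 0
    if items > 0:                 # send the final partial batch, if any
        execute(query)

sent = []
flush_batched(["UPDATE reputations SET value = {}".format(i) for i in range(450)],
              sent.append)
assert len(sent) == 3             # 200 + 200 + 50
```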
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index d60791260352025140e3bcfe2cd3e4c09eff6160..3ac2f8d890a11d85f53b982eaf480e6c6831c9c7 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -89,7 +89,7 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
     except Exception:
         log.exception("Exception caught during fetching vops...")
 
-def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+def _block_consumer(node, blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
     from hive.utils.stats import minmax
     is_debug = log.isEnabledFor(10)
     num = 0
@@ -98,6 +98,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
     try:
         count = ubound - lbound
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
+
         while lbound < ubound:
 
             wait_time_1 = WSM.start()
@@ -125,7 +126,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             timer.batch_start()
 
             block_start = perf()
-            Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
+            blocksProcessor.process_multi(blocks, preparedVops, node, is_initial_sync)
             block_end = perf()
 
             timer.batch_lap()
@@ -181,7 +182,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
     try:
         pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
         pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
-        blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
+        blockConsumerFuture = pool.submit(_block_consumer, self._steem, self._blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
 
         blockConsumerFuture.result()
         if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
@@ -208,6 +209,7 @@ class Sync:
         log.info("Using hived url: `%s'", self._conf.get('steemd_url'))
 
         self._steem = conf.steem()
+        self._blocksProcessor = None
 
     def run(self):
         """Initialize state; setup/recovery checks; sync and runloop."""
@@ -215,6 +217,8 @@
         # ensure db schema up to date, check app status
         DbState.initialize()
 
+        self._blocksProcessor = Blocks()
+
         # prefetch id->name and id->rank memory maps
         Accounts.load_ids()
         Accounts.fetch_ranks()
@@ -229,16 +233,16 @@
             Community.recalc_pending_payouts()
 
         if DbState.is_initial_sync():
-            last_imported_block = Blocks.head_num()
+            last_imported_block = self._blocksProcessor.head_num()
             # resume initial sync
             self.initial()
             if not CONTINUE_PROCESSING:
                 return
-            current_imported_block = Blocks.head_num()
+            current_imported_block = self._blocksProcessor.head_num()
             DbState.finish_initial_sync(current_imported_block, last_imported_block)
         else:
             # recover from fork
-            Blocks.verify_head(self._steem)
+            self._blocksProcessor.verify_head(self._steem)
 
         self._update_chain_state()
 
@@ -305,14 +309,15 @@
             remaining = drop(skip_lines, f)
             for lines in partition_all(chunk_size, remaining):
                 raise RuntimeError("Sync from checkpoint disabled")
-                Blocks.process_multi(map(json.loads, lines), True)
+#               Blocks.process_multi(map(json.loads, lines), True)
                 last_block = num
             last_read = num
 
     def from_steemd(self, is_initial_sync=False, chunk_size=1000):
         """Fast sync strategy: read/process blocks in batches."""
         steemd = self._steem
-        lbound = Blocks.head_num() + 1
+
+        lbound = self._blocksProcessor.head_num() + 1
         ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
 
         count = ubound - lbound
@@ -337,7 +342,7 @@
             timer.batch_lap()
 
             # process blocks
-            Blocks.process_multi(blocks, preparedVops, steemd, is_initial_sync)
+            self._blocksProcessor.process_multi(blocks, preparedVops, steemd, is_initial_sync)
             timer.batch_finish(len(blocks))
 
             _prefix = ("[SYNC] Got block %d @ %s" % (
@@ -357,14 +362,15 @@
         # debug: no max gap if disable_sync in effect
         max_gap = None if self._conf.get('test_disable_sync') else 100
 
+        assert self._blocksProcessor
         steemd = self._steem
-        hive_head = Blocks.head_num()
+        hive_head = self._blocksProcessor.head_num()
 
         for block in steemd.stream_blocks(hive_head + 1, trail_blocks, max_gap):
             start_time = perf()
 
             self._db.query("START TRANSACTION")
-            num = Blocks.process(block, {}, steemd)
+            num = self._blocksProcessor.process(block, {}, steemd)
             follows = Follow.flush(trx=False)
             accts = Accounts.flush(steemd, trx=False, spread=8)
             self._db.query("COMMIT")
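Taken together, the sync changes construct one long-lived `Blocks` instance in `Sync.run()` and thread it down to the consumer, rather than having every call site touch the `Blocks` class directly. A rough sketch of that flow, with stand-in classes in place of the real ones:

```python
class StubBlocks:
    """Stand-in for hive.indexer.blocks.Blocks."""
    def process_multi(self, blocks, vops, node, is_initial_sync=False):
        print("processing {} blocks".format(len(blocks)))

def _block_consumer(node, blocksProcessor, blocks):
    # the processor now arrives as an argument, mirroring the new signature
    blocksProcessor.process_multi(blocks, {}, node, is_initial_sync=True)

class StubSync:
    def __init__(self):
        self._blocksProcessor = None           # as in Sync.__init__

    def run(self):
        self._blocksProcessor = StubBlocks()   # after DbState.initialize() in the real code
        _block_consumer("node", self._blocksProcessor, ["b1", "b2", "b3"])

StubSync().run()  # -> processing 3 blocks
```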