diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 9013190a9bb868460c200dc552354b9eed42e6d5..fdb55721ddd4ed2d0152e602d9c254dbb0d02e6f 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -1,6 +1,7 @@ """Blocks processor.""" import logging +import json from hive.db.adapter import Db @@ -105,6 +106,8 @@ class Blocks: key = "{}/{}".format(op_value['author'], op_value['permlink']) val = {'hbd_payout':op_value['hbd_payout'], 'hive_payout':op_value['hive_payout'], 'vesting_payout':op_value['vesting_payout']} elif op_type == 'comment_reward_operation': + if('payout' not in op_value or op_value['payout'] is None): + log.error("Broken op: `{}'".format(str(vop))) key = "{}/{}".format(op_value['author'], op_value['permlink']) val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']} elif op_type == 'effective_comment_vote_operation': diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 8677fb67b145cac573fad50f1dc28deb0308d743..fffc403decac7d369d52317794295a0e522fdd89 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -40,10 +40,11 @@ def print_ops_stats(prefix, ops_stats): log.info("`{}': {}".format(k, v)) log.info("############################################################################") + def prepare_vops(vops_by_block): preparedVops = {} - for blockNum, blockDict in vops_by_block.items(): + for blockNum, blockDict in vops_by_block.items(): vopsList = blockDict['ops'] date = blockDict['timestamp'] preparedVops[blockNum] = Blocks.prepare_vops(vopsList, date) @@ -108,6 +109,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun blocks = [] if not blocksQueue.empty() or CONTINUE_PROCESSING: blocks = blocksQueue.get() + blocksQueue.task_done() if vopsQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any vops to process...") @@ -115,6 +117,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun preparedVops = [] if not vopsQueue.empty() or CONTINUE_PROCESSING: preparedVops = vopsQueue.get() + vopsQueue.task_done() to = min(lbound + chunk_size, ubound) @@ -139,9 +142,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun if block_end - block_start > 1.0: print_ops_stats("Operations present in the processed blocks:", ops_stats) - blocksQueue.task_done() - vopsQueue.task_done() - lbound = to num = num + 1 diff --git a/hive/steem/client.py b/hive/steem/client.py index 9e34db3985880b881c9268d58a1d8582b995ed02..84a3f5ccf91bae03fe1d7dbb704b75657896c05d 100644 --- a/hive/steem/client.py +++ b/hive/steem/client.py @@ -156,24 +156,35 @@ class SteemClient: def enum_virtual_ops(self, begin_block, end_block): """ Get virtual ops for range of blocks """ ret = {} - delta = 100 from_block = begin_block - to_block = (begin_block + delta) if begin_block + delta < end_block else end_block - - while from_block < to_block: - result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":to_block}) - ops = result['ops'] if 'ops' in result else [] - tracked_ops = ['curation_reward_operation', 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation'] - - for op in ops: - block = op['block'] - if block in ret and op['op']['type'] in tracked_ops: - ret[block]['ops'].append(op['op']) - if block not in ret and op['op']['type'] in tracked_ops: - ret[block] = {'timestamp':op['timestamp'], 'ops':[op['op']]} - from_block = to_block - to_block = (from_block + delta) if from_block + delta < end_block else end_block + + #According to definition of hive::plugins::acount_history::enum_vops_filter: + + author_reward_operation = 0x000002 + curation_reward_operation = 0x000004 + comment_reward_operation = 0x000008 + effective_comment_vote_operation = 0x400000 + + tracked_ops_filter = curation_reward_operation | author_reward_operation | comment_reward_operation | effective_comment_vote_operation + tracked_ops = ['curation_reward_operation', 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation'] + + resume_on_operation = 0 + + while from_block < end_block: + call_result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":end_block + , "group_by_block": True, "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter + }) + + ret = {opb["block"] : {"timestamp":opb["timestamp"], "ops":[op["op"] for op in opb["ops"]]} for opb in call_result["ops_by_block"]} + + resume_on_operation = call_result['next_operation_begin'] if 'next_operation_begin' in call_result else 0 + + next_block = call_result['next_block_range_begin'] + + # Move to next block only if operations from current one have been processed completely. + from_block = next_block + return ret def get_comment_pending_payouts(self, comments):