From 93ccb68eb1e855a1827822515376d0d1a98f1709 Mon Sep 17 00:00:00 2001 From: Bartek Wrona <wrona@syncad.com> Date: Fri, 26 Jun 2020 15:57:49 +0200 Subject: [PATCH] enun_virtual_ops call adjusted to new AH-API::enum_virtual_ops call scheme --- hive/indexer/blocks.py | 1 + hive/indexer/sync.py | 5 ++--- hive/steem/client.py | 39 ++++++++++++++------------------------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index a1184f81d..8869d4737 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -1,6 +1,7 @@ """Blocks processor.""" import logging +import json from hive.db.adapter import Db diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 8677fb67b..c040832bf 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -108,6 +108,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun blocks = [] if not blocksQueue.empty() or CONTINUE_PROCESSING: blocks = blocksQueue.get() + blocksQueue.task_done() if vopsQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any vops to process...") @@ -115,6 +116,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun preparedVops = [] if not vopsQueue.empty() or CONTINUE_PROCESSING: preparedVops = vopsQueue.get() + vopsQueue.task_done() to = min(lbound + chunk_size, ubound) @@ -139,9 +141,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun if block_end - block_start > 1.0: print_ops_stats("Operations present in the processed blocks:", ops_stats) - blocksQueue.task_done() - vopsQueue.task_done() - lbound = to num = num + 1 diff --git a/hive/steem/client.py b/hive/steem/client.py index 08a38d614..84a3f5ccf 100644 --- a/hive/steem/client.py +++ b/hive/steem/client.py @@ -156,10 +156,8 @@ class SteemClient: def enum_virtual_ops(self, begin_block, end_block): """ Get virtual ops for range of blocks """ ret = {} - delta = 1000 from_block = begin_block - to_block = (begin_block + delta) if begin_block + delta < end_block else end_block #According to definition of hive::plugins::acount_history::enum_vops_filter: @@ -173,29 +171,20 @@ class SteemClient: resume_on_operation = 0 - while from_block < to_block: - result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":to_block - , "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter - }) - ops = result['ops'] if 'ops' in result else [] - resume_on_operation = result['next_operation_begin'] if 'next_operation_begin' in result else 0 - - next_block = result['next_block_range_begin'] if 'next_block_range_begin' in result else from_block + delta - - for op in ops: - if(op['op']['type'] not in tracked_ops): - logger.error("{} VOPS Filtering failed: `{}'".format(str(tracked_ops_filter), str(op))) - - if(op['op']['type'] == 'comment_reward_operation' and 'payout' not in op['op']['value']): - logger.error("Broken op: `{}'".format(str(op))) - - block = op['block'] - if block in ret: - ret[block]['ops'].append(op['op']) - if block not in ret: - ret[block] = {'timestamp':op['timestamp'], 'ops':[op['op']]} - from_block = to_block - to_block = next_block if next_block < end_block else end_block + while from_block < end_block: + call_result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":end_block + , "group_by_block": True, "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter + }) + + ret = {opb["block"] : {"timestamp":opb["timestamp"], "ops":[op["op"] for op in opb["ops"]]} for opb in call_result["ops_by_block"]} + + resume_on_operation = call_result['next_operation_begin'] if 'next_operation_begin' in call_result else 0 + + next_block = call_result['next_block_range_begin'] + + # Move to next block only if operations from current one have been processed completely. + from_block = next_block + return ret def get_comment_pending_payouts(self, comments): -- GitLab