Skip to content
Snippets Groups Projects
Commit 93ccb68e authored by Bartek Wrona's avatar Bartek Wrona
Browse files

enun_virtual_ops call adjusted to new AH-API::enum_virtual_ops call scheme

parent df43edf9
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
"""Blocks processor."""
import logging
import json
from hive.db.adapter import Db
......
......@@ -108,6 +108,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get()
blocksQueue.task_done()
if vopsQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any vops to process...")
......@@ -115,6 +116,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get()
vopsQueue.task_done()
to = min(lbound + chunk_size, ubound)
......@@ -139,9 +141,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if block_end - block_start > 1.0:
print_ops_stats("Operations present in the processed blocks:", ops_stats)
blocksQueue.task_done()
vopsQueue.task_done()
lbound = to
num = num + 1
......
......@@ -156,10 +156,8 @@ class SteemClient:
def enum_virtual_ops(self, begin_block, end_block):
""" Get virtual ops for range of blocks """
ret = {}
delta = 1000
from_block = begin_block
to_block = (begin_block + delta) if begin_block + delta < end_block else end_block
#According to definition of hive::plugins::acount_history::enum_vops_filter:
......@@ -173,29 +171,20 @@ class SteemClient:
resume_on_operation = 0
while from_block < to_block:
result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":to_block
, "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter
})
ops = result['ops'] if 'ops' in result else []
resume_on_operation = result['next_operation_begin'] if 'next_operation_begin' in result else 0
next_block = result['next_block_range_begin'] if 'next_block_range_begin' in result else from_block + delta
for op in ops:
if(op['op']['type'] not in tracked_ops):
logger.error("{} VOPS Filtering failed: `{}'".format(str(tracked_ops_filter), str(op)))
if(op['op']['type'] == 'comment_reward_operation' and 'payout' not in op['op']['value']):
logger.error("Broken op: `{}'".format(str(op)))
block = op['block']
if block in ret:
ret[block]['ops'].append(op['op'])
if block not in ret:
ret[block] = {'timestamp':op['timestamp'], 'ops':[op['op']]}
from_block = to_block
to_block = next_block if next_block < end_block else end_block
while from_block < end_block:
call_result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":end_block
, "group_by_block": True, "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter
})
ret = {opb["block"] : {"timestamp":opb["timestamp"], "ops":[op["op"] for op in opb["ops"]]} for opb in call_result["ops_by_block"]}
resume_on_operation = call_result['next_operation_begin'] if 'next_operation_begin' in call_result else 0
next_block = call_result['next_block_range_begin']
# Move to next block only if operations from current one have been processed completely.
from_block = next_block
return ret
def get_comment_pending_payouts(self, comments):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment