Skip to content
Snippets Groups Projects
Commit a7acca69 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'dk-issue-3-filtered-vops' into 'dk-issue-3-concurrent-block-query'

Applied account_history_api::enum_virtual_ops new capabilities

See merge request blocktrades/hivemind!7
parents 4a572716 59e14fbe
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
"""Blocks processor.""" """Blocks processor."""
import logging import logging
import json
from hive.db.adapter import Db from hive.db.adapter import Db
...@@ -105,6 +106,8 @@ class Blocks: ...@@ -105,6 +106,8 @@ class Blocks:
key = "{}/{}".format(op_value['author'], op_value['permlink']) key = "{}/{}".format(op_value['author'], op_value['permlink'])
val = {'hbd_payout':op_value['hbd_payout'], 'hive_payout':op_value['hive_payout'], 'vesting_payout':op_value['vesting_payout']} val = {'hbd_payout':op_value['hbd_payout'], 'hive_payout':op_value['hive_payout'], 'vesting_payout':op_value['vesting_payout']}
elif op_type == 'comment_reward_operation': elif op_type == 'comment_reward_operation':
if('payout' not in op_value or op_value['payout'] is None):
log.error("Broken op: `{}'".format(str(vop)))
key = "{}/{}".format(op_value['author'], op_value['permlink']) key = "{}/{}".format(op_value['author'], op_value['permlink'])
val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']} val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']}
elif op_type == 'effective_comment_vote_operation': elif op_type == 'effective_comment_vote_operation':
......
...@@ -40,10 +40,11 @@ def print_ops_stats(prefix, ops_stats): ...@@ -40,10 +40,11 @@ def print_ops_stats(prefix, ops_stats):
log.info("`{}': {}".format(k, v)) log.info("`{}': {}".format(k, v))
log.info("############################################################################") log.info("############################################################################")
def prepare_vops(vops_by_block): def prepare_vops(vops_by_block):
preparedVops = {} preparedVops = {}
for blockNum, blockDict in vops_by_block.items():
for blockNum, blockDict in vops_by_block.items():
vopsList = blockDict['ops'] vopsList = blockDict['ops']
date = blockDict['timestamp'] date = blockDict['timestamp']
preparedVops[blockNum] = Blocks.prepare_vops(vopsList, date) preparedVops[blockNum] = Blocks.prepare_vops(vopsList, date)
...@@ -108,6 +109,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -108,6 +109,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
blocks = [] blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING: if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get() blocks = blocksQueue.get()
blocksQueue.task_done()
if vopsQueue.empty() and CONTINUE_PROCESSING: if vopsQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any vops to process...") log.info("Awaiting any vops to process...")
...@@ -115,6 +117,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -115,6 +117,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
preparedVops = [] preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING: if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get() preparedVops = vopsQueue.get()
vopsQueue.task_done()
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
...@@ -139,9 +142,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -139,9 +142,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if block_end - block_start > 1.0: if block_end - block_start > 1.0:
print_ops_stats("Operations present in the processed blocks:", ops_stats) print_ops_stats("Operations present in the processed blocks:", ops_stats)
blocksQueue.task_done()
vopsQueue.task_done()
lbound = to lbound = to
num = num + 1 num = num + 1
......
...@@ -156,24 +156,35 @@ class SteemClient: ...@@ -156,24 +156,35 @@ class SteemClient:
def enum_virtual_ops(self, begin_block, end_block): def enum_virtual_ops(self, begin_block, end_block):
""" Get virtual ops for range of blocks """ """ Get virtual ops for range of blocks """
ret = {} ret = {}
delta = 100
from_block = begin_block from_block = begin_block
to_block = (begin_block + delta) if begin_block + delta < end_block else end_block
#According to definition of hive::plugins::acount_history::enum_vops_filter:
while from_block < to_block:
result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":to_block}) author_reward_operation = 0x000002
ops = result['ops'] if 'ops' in result else [] curation_reward_operation = 0x000004
tracked_ops = ['curation_reward_operation', 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation'] comment_reward_operation = 0x000008
effective_comment_vote_operation = 0x400000
for op in ops:
block = op['block'] tracked_ops_filter = curation_reward_operation | author_reward_operation | comment_reward_operation | effective_comment_vote_operation
if block in ret and op['op']['type'] in tracked_ops: tracked_ops = ['curation_reward_operation', 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation']
ret[block]['ops'].append(op['op'])
if block not in ret and op['op']['type'] in tracked_ops: resume_on_operation = 0
ret[block] = {'timestamp':op['timestamp'], 'ops':[op['op']]}
from_block = to_block while from_block < end_block:
to_block = (from_block + delta) if from_block + delta < end_block else end_block call_result = self.__exec('enum_virtual_ops', {"block_range_begin":from_block, "block_range_end":end_block
, "group_by_block": True, "operation_begin": resume_on_operation, "limit": 1000, "filter": tracked_ops_filter
})
ret = {opb["block"] : {"timestamp":opb["timestamp"], "ops":[op["op"] for op in opb["ops"]]} for opb in call_result["ops_by_block"]}
resume_on_operation = call_result['next_operation_begin'] if 'next_operation_begin' in call_result else 0
next_block = call_result['next_block_range_begin']
# Move to next block only if operations from current one have been processed completely.
from_block = next_block
return ret return ret
def get_comment_pending_payouts(self, comments): def get_comment_pending_payouts(self, comments):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment