Skip to content
Snippets Groups Projects
Commit c508cd3a authored by Bartek Wrona's avatar Bartek Wrona
Browse files

- Fixed bug in vote operation processing causing wrong MT access

- Fixed handling of KeyboardInterrupt
parent 047a1c50
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
...@@ -91,6 +91,7 @@ class Blocks: ...@@ -91,6 +91,7 @@ class Blocks:
@staticmethod @staticmethod
def prepare_vops(vopsList, date): def prepare_vops(vopsList, date):
vote_ops = []
comment_payout_ops = {} comment_payout_ops = {}
for vop in vopsList: for vop in vopsList:
key = None key = None
...@@ -108,7 +109,7 @@ class Blocks: ...@@ -108,7 +109,7 @@ class Blocks:
key = "{}/{}".format(op_value['author'], op_value['permlink']) key = "{}/{}".format(op_value['author'], op_value['permlink'])
val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']} val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']}
elif op_type == 'effective_comment_vote_operation': elif op_type == 'effective_comment_vote_operation':
Votes.vote_op(vop, date) vote_ops.append(vop)
if key is not None and val is not None: if key is not None and val is not None:
if key in comment_payout_ops: if key in comment_payout_ops:
...@@ -116,7 +117,7 @@ class Blocks: ...@@ -116,7 +117,7 @@ class Blocks:
else: else:
comment_payout_ops[key] = [{op_type:val}] comment_payout_ops[key] = [{op_type:val}]
return comment_payout_ops return (vote_ops, comment_payout_ops)
@classmethod @classmethod
def _track_tx(cls, opCount = 1): def _track_tx(cls, opCount = 1):
...@@ -217,12 +218,25 @@ class Blocks: ...@@ -217,12 +218,25 @@ class Blocks:
# virtual ops # virtual ops
comment_payout_ops = {} comment_payout_ops = {}
vote_ops = []
empty_vops = (vote_ops, comment_payout_ops)
if is_initial_sync: if is_initial_sync:
comment_payout_ops = virtual_operations[num] if num in virtual_operations else {} (vote_ops, comment_payout_ops) = virtual_operations[num] if num in virtual_operations else empty_vops
else: else:
vops = hived.get_virtual_operations(num) vops = hived.get_virtual_operations(num)
comment_payout_ops = prepare_vops(vops, date) (vote_ops, comment_payout_ops) = prepare_vops(vops, date)
for v in vote_ops:
Votes.vote_op(v, date)
op_type = v['type']
if op_type in cls.ops_stats:
cls.ops_stats[op_type] += 1
else:
cls.ops_stats[op_type] = 1
cls._track_tx(len(vote_ops))
if comment_payout_ops: if comment_payout_ops:
comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date) comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date)
...@@ -231,8 +245,6 @@ class Blocks: ...@@ -231,8 +245,6 @@ class Blocks:
return num return num
@classmethod @classmethod
def verify_head(cls, steem): def verify_head(cls, steem):
"""Perform a fork recovery check on startup.""" """Perform a fork recovery check on startup."""
......
...@@ -126,9 +126,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -126,9 +126,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to) # log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
blocksQueue.task_done()
vopsQueue.task_done()
timer.batch_start() timer.batch_start()
block_start = perf() block_start = perf()
...@@ -149,11 +146,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -149,11 +146,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if block_end - block_start > 1.0: if block_end - block_start > 1.0:
print_ops_stats("Operations present in the processed blocks:", ops_stats) print_ops_stats("Operations present in the processed blocks:", ops_stats)
blocksQueue.task_done()
vopsQueue.task_done()
lbound = to lbound = to
num = num + 1 num = num + 1
print_ops_stats("All operations present in the processed blocks:", ops_stats) print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
return num return num
except KeyboardInterrupt as ki: except KeyboardInterrupt as ki:
log.info("Caught SIGINT") log.info("Caught SIGINT")
...@@ -173,6 +173,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): ...@@ -173,6 +173,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blockConsumerFuture.result() blockConsumerFuture.result()
# pool.shutdown() # pool.shutdown()
except KeyboardInterrupt as ex: except KeyboardInterrupt as ex:
global continue_processing
continue_processing = 0 continue_processing = 0
pool.shutdown(False) pool.shutdown(False)
......
...@@ -52,6 +52,8 @@ class Votes: ...@@ -52,6 +52,8 @@ class Votes:
ret = DB.query_row(sql, author=author, permlink=permlink) ret = DB.query_row(sql, author=author, permlink=permlink)
return 0 if ret is None else int(ret.count) return 0 if ret is None else int(ret.count)
inside_flush = False
@classmethod @classmethod
def vote_op(cls, vop, date): def vote_op(cls, vop, date):
""" Process vote_operation """ """ Process vote_operation """
...@@ -59,6 +61,10 @@ class Votes: ...@@ -59,6 +61,10 @@ class Votes:
author = vop['value']['author'] author = vop['value']['author']
permlink = vop['value']['permlink'] permlink = vop['value']['permlink']
if(cls.inside_flush):
log.info("Adding new vote-info into _votes_data dict")
raise "Fatal error"
key = voter + "/" + author + "/" + permlink key = voter + "/" + author + "/" + permlink
cls._votes_data[key] = dict(voter=voter, cls._votes_data[key] = dict(voter=voter,
...@@ -71,6 +77,8 @@ class Votes: ...@@ -71,6 +77,8 @@ class Votes:
@classmethod @classmethod
def flush(cls): def flush(cls):
log.info("Inside Votes.flush")
cls.inside_flush = True
""" Flush vote data from cache to database """ """ Flush vote data from cache to database """
if cls._votes_data: if cls._votes_data:
sql = """ sql = """
...@@ -122,3 +130,5 @@ class Votes: ...@@ -122,3 +130,5 @@ class Votes:
values.clear() values.clear()
cls._votes_data.clear() cls._votes_data.clear()
cls.inside_flush = False
log.info("Exiting Votes.flush")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment