From c508cd3a91fe158566e54b633e0c461b50351d15 Mon Sep 17 00:00:00 2001
From: Bartek Wrona <wrona@syncad.com>
Date: Tue, 23 Jun 2020 10:18:01 +0200
Subject: [PATCH] - Fixed bug in vote operation processing causing wrong MT
 access - Fixed handling of KeyboardInterrupt

---
 hive/indexer/blocks.py | 24 ++++++++++++++++++------
 hive/indexer/sync.py   |  9 +++++----
 hive/indexer/votes.py  | 10 ++++++++++
 3 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index cc37dfc1b..4e589e07c 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -91,6 +91,7 @@ class Blocks:
 
     @staticmethod
     def prepare_vops(vopsList, date):
+        vote_ops = []
         comment_payout_ops = {}
         for vop in vopsList:
             key = None
@@ -108,7 +109,7 @@ class Blocks:
                 key = "{}/{}".format(op_value['author'], op_value['permlink'])
                 val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']}
             elif op_type == 'effective_comment_vote_operation':
-                Votes.vote_op(vop, date)
+                vote_ops.append(vop)
 
             if key is not None and val is not None:
                 if key in comment_payout_ops:
@@ -116,7 +117,7 @@ class Blocks:
                 else:
                     comment_payout_ops[key] = [{op_type:val}]
 
-        return comment_payout_ops
+        return (vote_ops, comment_payout_ops)
 
     @classmethod
     def _track_tx(cls, opCount = 1):
@@ -217,12 +218,25 @@ class Blocks:
 
         # virtual ops
         comment_payout_ops = {}
+        vote_ops = []
+
+        empty_vops = (vote_ops, comment_payout_ops)
 
         if is_initial_sync:
-            comment_payout_ops = virtual_operations[num] if num in virtual_operations else {}
+            (vote_ops, comment_payout_ops) = virtual_operations[num] if num in virtual_operations else empty_vops
         else:
             vops = hived.get_virtual_operations(num)
-            comment_payout_ops = prepare_vops(vops, date)
+            (vote_ops, comment_payout_ops) = prepare_vops(vops, date)
+
+        for v in vote_ops:
+            Votes.vote_op(v, date)
+            op_type = v['type']
+            if op_type in cls.ops_stats:
+                cls.ops_stats[op_type] += 1
+            else:
+                cls.ops_stats[op_type] = 1
+
+        cls._track_tx(len(vote_ops))
 
         if comment_payout_ops:
             comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date)
@@ -231,8 +245,6 @@ class Blocks:
 
         return num
 
-
-
     @classmethod
     def verify_head(cls, steem):
         """Perform a fork recovery check on startup."""
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index d68cf8d14..591061a44 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -126,9 +126,6 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
         to = min(lbound + chunk_size, ubound)
 #        log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
 
-        blocksQueue.task_done()
-        vopsQueue.task_done()
-
         timer.batch_start()
         
         block_start = perf()
@@ -149,11 +146,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
         if block_end - block_start > 1.0:
             print_ops_stats("Operations present in the processed blocks:", ops_stats)
 
+        blocksQueue.task_done()
+        vopsQueue.task_done()
+
         lbound = to
 
         num = num + 1
 
-    print_ops_stats("All operations present in the processed blocks:", ops_stats)
+    print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
     return num
   except KeyboardInterrupt as ki:
     log.info("Caught SIGINT")
@@ -173,6 +173,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
         blockConsumerFuture.result()
   #      pool.shutdown()
       except KeyboardInterrupt as ex:
+        global continue_processing
         continue_processing = 0
         pool.shutdown(False)
 
diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py
index b19e5fd66..2248e8e75 100644
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -52,6 +52,8 @@ class Votes:
         ret = DB.query_row(sql, author=author, permlink=permlink)
         return 0 if ret is None else int(ret.count)
 
+    inside_flush = False
+
     @classmethod
     def vote_op(cls, vop, date):
         """ Process vote_operation """
@@ -59,6 +61,10 @@ class Votes:
         author = vop['value']['author']
         permlink = vop['value']['permlink']
 
+        if(cls.inside_flush):
+            log.info("Adding new vote-info into _votes_data dict")
+            raise "Fatal error"
+
         key = voter + "/" + author + "/" + permlink
 
         cls._votes_data[key] = dict(voter=voter,
@@ -71,6 +77,8 @@ class Votes:
 
     @classmethod
     def flush(cls):
+        log.info("Inside Votes.flush")
+        cls.inside_flush = True
         """ Flush vote data from cache to database """
         if cls._votes_data:
             sql = """
@@ -122,3 +130,5 @@ class Votes:
                 values.clear()
                 
         cls._votes_data.clear()
+        cls.inside_flush = False
+        log.info("Exiting Votes.flush")
-- 
GitLab