From 047a1c5053307d1c498f81f7d157a4eea9d24af6 Mon Sep 17 00:00:00 2001
From: Bartek Wrona <wrona@syncad.com>
Date: Tue, 23 Jun 2020 09:43:31 +0200
Subject: [PATCH] - Fixed generation of the votes insertion query (a missing
 batch limit allowed a single query to exceed the SQL statement size limit)
 and simplified the SQL code - comment_payout_op now uses a grouped UPDATE
 query to improve sync performance - follow inserts are now grouped into a
 single INSERT ... ON CONFLICT UPDATE query to improve sync performance -
 Implemented statistics gathering for each processed block to report the
 number of processed operations - virtual ops are now prepared in their own
 dedicated thread to improve sync performance - Added printing of the total
 sync execution time at each block-range report.

---
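
Note for reviewers: the recurring pattern in this patch is to accumulate VALUES
tuples and flush them every `values_limit` rows, so no single statement can
exceed the SQL statement size limit. A minimal stand-alone sketch of that
pattern (names such as `run_query` and `flush_in_batches` are illustrative and
not part of the patch):

    # Illustrative only: flush accumulated VALUES rows every `values_limit` rows.
    def flush_in_batches(rows, sql_template, run_query, values_limit=1000):
        values = []
        for row in rows:
            values.append("({})".format(", ".join(str(v) for v in row)))
            if len(values) >= values_limit:
                run_query(sql_template.format(",".join(values)))
                values.clear()
        if len(values) > 0:  # flush the remainder only if anything is left
            run_query(sql_template.format(",".join(values)))
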
 hive/db/adapter.py        |  11 ++--
 hive/indexer/blocks.py    | 123 +++++++++++++++++++++++++++-----------
 hive/indexer/custom_op.py |  13 ++++
 hive/indexer/follow.py    |  42 ++++++++-----
 hive/indexer/posts.py     | 111 ++++++++++++++++++++++++----------
 hive/indexer/sync.py      |  67 +++++++++++++++++----
 hive/indexer/votes.py     |  68 +++++++++++++--------
 7 files changed, 312 insertions(+), 123 deletions(-)

diff --git a/hive/db/adapter.py b/hive/db/adapter.py
index 9867ab815..e175c540b 100644
--- a/hive/db/adapter.py
+++ b/hive/db/adapter.py
@@ -157,11 +157,12 @@ class Db:
         return (sql, values)
 
     def _sql_text(self, sql):
-        if sql in self._prep_sql:
-            query = self._prep_sql[sql]
-        else:
-            query = sqlalchemy.text(sql).execution_options(autocommit=False)
-            self._prep_sql[sql] = query
+#        if sql in self._prep_sql:
+#            query = self._prep_sql[sql]
+#        else:
+#            query = sqlalchemy.text(sql).execution_options(autocommit=False)
+#            self._prep_sql[sql] = query
+        query = sqlalchemy.text(sql).execution_options(autocommit=False)
         return query
 
     def _query(self, sql, **kwargs):
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 4f5f34b9b..cc37dfc1b 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -20,6 +20,22 @@ DB = Db.instance()
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
+    ops_stats = {}
+
+    operations_in_tx = 0
+    opened_tx = False
+
+    OPERATIONS_IN_TX_THRESHOLD = 500000
+
+    @staticmethod
+    def merge_ops_stats(od1, od2):
+        for (k, v) in od2.items():
+            if k in od1:
+                od1[k] += v
+            else:
+                od1[k] = v
+
+        return od1
 
     @classmethod
     def head_num(cls):
@@ -49,7 +65,7 @@ class Blocks:
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
         time_start = perf_counter()
-        DB.query("START TRANSACTION")
+#        DB.query("START TRANSACTION")
 
         last_num = 0
         try:
@@ -67,10 +83,55 @@ class Blocks:
         cls._flush_blocks()
         Follow.flush(trx=False)
 
-        DB.query("COMMIT")
+#        DB.query("COMMIT")
         time_end = perf_counter()
         log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start)
 
+        return cls.ops_stats
+
+    @staticmethod
+    def prepare_vops(vopsList, date):
+        comment_payout_ops = {}
+        for vop in vopsList:
+            key = None
+            val = None
+
+            op_type = vop['type']
+            op_value = vop['value']
+            if op_type == 'curation_reward_operation':
+                key = "{}/{}".format(op_value['comment_author'], op_value['comment_permlink'])
+                val = {'reward' : op_value['reward']}
+            elif op_type == 'author_reward_operation':
+                key = "{}/{}".format(op_value['author'], op_value['permlink'])
+                val = {'hbd_payout':op_value['hbd_payout'], 'hive_payout':op_value['hive_payout'], 'vesting_payout':op_value['vesting_payout']}
+            elif op_type == 'comment_reward_operation':
+                key = "{}/{}".format(op_value['author'], op_value['permlink'])
+                val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']}
+            elif op_type == 'effective_comment_vote_operation':
+                Votes.vote_op(vop, date)
+
+            if key is not None and val is not None:
+                if key in comment_payout_ops:
+                    comment_payout_ops[key].append({op_type:val})
+                else:
+                    comment_payout_ops[key] = [{op_type:val}]
+
+        return comment_payout_ops
+
+    @classmethod
+    def _track_tx(cls, opCount = 1):
+        if not cls.opened_tx:
+            DB.query("START TRANSACTION")
+            cls.operations_in_tx = 0
+            cls.opened_tx = True
+
+        cls.operations_in_tx += opCount
+
+        if cls.operations_in_tx >= cls.OPERATIONS_IN_TX_THRESHOLD:
+            DB.query("COMMIT")
+            DB.query("START TRANSACTION")
+            cls.operations_in_tx = 0
+
     @classmethod
     def _process(cls, block, virtual_operations, hived, is_initial_sync=False):
         """Process a single block. Assumes a trx is open."""
@@ -96,6 +157,8 @@ class Blocks:
                     account_names.add(op['new_account_name'])
                 elif op_type == 'create_claimed_account_operation':
                     account_names.add(op['new_account_name'])
+
+        cls._track_tx()
         Accounts.register(account_names, date)     # register any new names
 
         # second scan will process all other ops
@@ -106,6 +169,14 @@ class Blocks:
                 op_type = operation['type']
                 op = operation['value']
 
+                if op_type != 'custom_json_operation':
+                    if op_type in cls.ops_stats:
+                        cls.ops_stats[op_type] += 1
+                    else:
+                        cls.ops_stats[op_type] = 1
+
+                cls._track_tx()
+
                 # account metadata updates
                 if op_type == 'account_update_operation':
                     if not is_initial_sync:
@@ -135,49 +206,33 @@ class Blocks:
 
         # follow/reblog/community ops
         if json_ops:
-            CustomOp.process_ops(json_ops, num, date)
+            custom_ops_stats = CustomOp.process_ops(json_ops, num, date)
+            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats)
+            cls._track_tx(len(json_ops))
 
         if update_comment_pending_payouts:
-            Posts.update_comment_pending_payouts(hived, update_comment_pending_payouts)
+            payout_ops_stat = Posts.update_comment_pending_payouts(hived, update_comment_pending_payouts)
+            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, payout_ops_stat)
+            cls._track_tx(len(update_comment_pending_payouts))
 
         # virtual ops
         comment_payout_ops = {}
-        vops = []
-        if not is_initial_sync:
-            vops = hived.get_virtual_operations(num)
-        else:
-            vops = virtual_operations[num]['ops'] if num in virtual_operations else []
-        for vop in vops:
-            key = None
-            val = None
-
-            op_type = vop['type']
-            op_value = vop['value']
-            if op_type == 'curation_reward_operation':
-                key = "{}/{}".format(op_value['comment_author'], op_value['comment_permlink'])
-                val = {'reward' : op_value['reward']}
 
-            if op_type == 'author_reward_operation':
-                key = "{}/{}".format(op_value['author'], op_value['permlink'])
-                val = {'hbd_payout':op_value['hbd_payout'], 'hive_payout':op_value['hive_payout'], 'vesting_payout':op_value['vesting_payout']}
-
-            if op_type == 'comment_reward_operation':
-                key = "{}/{}".format(op_value['author'], op_value['permlink'])
-                val = {'payout':op_value['payout'], 'author_rewards':op_value['author_rewards']}
-
-            if op_type == 'effective_comment_vote_operation':
-                Votes.vote_op(vop, date)
+        if is_initial_sync:
+            comment_payout_ops = virtual_operations[num] if num in virtual_operations else {}
+        else:
+            vops = hived.get_virtual_operations(num)
+            comment_payout_ops = cls.prepare_vops(vops, date)
 
-            if key is not None and val is not None:
-                if key in comment_payout_ops:
-                    comment_payout_ops[key].append({op_type:val})
-                else:
-                    comment_payout_ops[key] = [{op_type:val}]
         if comment_payout_ops:
-            Posts.comment_payout_op(comment_payout_ops, date)
+            comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date)
+            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats)
+            cls._track_tx(len(comment_payout_ops))
 
         return num
 
+
+
     @classmethod
     def verify_head(cls, steem):
         """Perform a fork recovery check on startup."""
diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py
index 47897b567..5a31ae7c5 100644
--- a/hive/indexer/custom_op.py
+++ b/hive/indexer/custom_op.py
@@ -39,11 +39,23 @@ class CustomOp:
 
     @classmethod
     def process_ops(cls, ops, block_num, block_date):
+        ops_stats = {}
+
         """Given a list of operation in block, filter and process them."""
         for op in ops:
             if op['id'] not in ['follow', 'community', 'notify']:
+                opName = str(op['id']) + '-ignored'
+                if opName in ops_stats:
+                    ops_stats[opName] += 1
+                else:
+                    ops_stats[opName] = 1
                 continue
 
+            if op['id'] in ops_stats:
+                ops_stats[op['id']] += 1
+            else:
+                ops_stats[op['id']] = 1
+
             account = _get_auth(op)
             if not account:
                 continue
@@ -58,6 +70,7 @@ class CustomOp:
                     process_json_community_op(account, op_json, block_date)
             elif op['id'] == 'notify':
                 cls._process_notify(account, op_json, block_date)
+        return ops_stats
 
     @classmethod
     def _process_notify(cls, account, op_json, block_date):
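
The per-type counters added to process_ops could equivalently be kept with
collections.Counter; a small sketch for comparison (illustrative only, not part
of the patch):

    from collections import Counter

    def count_custom_ops(ops, handled_ids=('follow', 'community', 'notify')):
        # Counts handled op ids as-is and everything else under "<id>-ignored".
        stats = Counter()
        for op in ops:
            op_id = str(op['id'])
            stats[op_id if op_id in handled_ids else op_id + '-ignored'] += 1
        return dict(stats)
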
diff --git a/hive/indexer/follow.py b/hive/indexer/follow.py
index 21ddfc646..84fb7cadc 100644
--- a/hive/indexer/follow.py
+++ b/hive/indexer/follow.py
@@ -38,7 +38,7 @@ def _flip_dict(dict_to_flip):
 class Follow:
     """Handles processing of incoming follow ups and flushing to db."""
 
-    follow_items_to_flush = []
+    follow_items_to_flush = dict()
 
     @classmethod
     def follow_op(cls, account, op_json, date):
@@ -52,13 +52,20 @@ class Follow:
         old_state = None
         if DbState.is_initial_sync():
             # insert or update state
-            DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
 
-#            cls.follow_items_to_flush.append({
-#            'flr': op['flr'],
-#            'flg': op['flg'],
-#            'state': op['state'],
-#            'at': op['at']})
+            k = '{}/{}'.format(op['flr'], op['flg'])
+
+            if k in cls.follow_items_to_flush:
+                old_value = cls.follow_items_to_flush.get(k)
+                old_value['state'] = op['state'] 
+                cls.follow_items_to_flush[k] = old_value
+            else:
+                cls.follow_items_to_flush[k] = dict(
+                                                      flr = op['flr'],
+                                                      flg = op['flg'],
+                                                      state = op['state'],
+                                                      at = op['at'])
+
         else:
             old_state = cls._get_follow_db_state(op['flr'], op['flg'])
             # insert or update state
@@ -139,15 +146,17 @@ class Follow:
               VALUES """
 
         sql_postfix = """
-              ON CONFLICT (follower, following) DO UPDATE SET state = (CASE hf.state
-                                                                        WHEN 0 THEN 0 -- 0 blocks possibility to update state 
-                                                                        ELSE 1
-                                                                        END)
+              ON CONFLICT ON CONSTRAINT hive_follows_pk DO UPDATE SET 
+                state = (CASE hf.state
+                        WHEN 0 THEN 0 -- 0 blocks possibility to update state 
+                        ELSE EXCLUDED.state
+                    END)
+              WHERE hf.following = EXCLUDED.following AND hf.follower = EXCLUDED.follower
               """
         values = []
         limit = 1000
-        count = 0;
-        for follow_item in cls.follow_items_to_flush:
+        count = 0
+        for (k, follow_item) in cls.follow_items_to_flush.items():
           if count < limit:
             values.append("({}, {}, '{}', {})".format(follow_item['flr'], follow_item['flg'], follow_item['at'], follow_item['state']))
             count = count + 1
@@ -155,15 +164,16 @@ class Follow:
             query = sql_prefix + ",".join(values)
             query += sql_postfix
             DB.query(query)
-            values = []
-            count = 0
+            values.clear()
+            values.append("({}, {}, '{}', {})".format(follow_item['flr'], follow_item['flg'], follow_item['at'], follow_item['state']))
+            count = 1
 
         if len(values):
           query = sql_prefix + ",".join(values)
           query += sql_postfix
           DB.query(query)
 
-        cls.follow_items_to_flush = []
+        cls.follow_items_to_flush.clear()
 
     @classmethod
     def flush(cls, trx=True):
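
The switch from a list to a dict keyed by "follower/following" means repeated
follow ops for the same pair collapse into one pending row carrying the latest
state. A minimal sketch of that de-duplication (the function name is
illustrative, not part of the patch):

    def queue_follow(pending, flr, flg, state, at):
        # pending: dict keyed by "flr/flg"; later ops only overwrite the state.
        key = '{}/{}'.format(flr, flg)
        if key in pending:
            pending[key]['state'] = state
        else:
            pending[key] = dict(flr=flr, flg=flg, state=state, at=at)
        return pending
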
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index 8cc29b822..403b3ddaa 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -122,6 +122,53 @@ class Posts:
 
     @classmethod
     def comment_payout_op(cls, ops, date):
+        ops_stats = {}
+        sql = """
+              UPDATE hive_posts AS ihp SET
+                  total_payout_value = data_source.total_payout_value,
+                  curator_payout_value = data_source.curator_payout_value,
+                  author_rewards = data_source.author_rewards,
+                  author_rewards_hive = data_source.author_rewards_hive,
+                  author_rewards_hbd = data_source.author_rewards_hbd,
+                  author_rewards_vests = data_source.author_rewards_vests,
+                  last_payout = data_source.last_payout,
+                  cashout_time = data_source.cashout_time,
+                  is_paidout = true
+
+              FROM 
+              (
+              SELECT  ha_a.id as author_id, hpd_p.id as permlink_id, 
+                      t.total_payout_value,
+                      t.curator_payout_value,
+                      t.author_rewards,
+                      t.author_rewards_hive,
+                      t.author_rewards_hbd,
+                      t.author_rewards_vests,
+                      t.last_payout,
+                      t.cashout_time
+              from
+              (
+              VALUES
+                --- put all constant values here
+                {}
+              ) AS T(author, permlink,
+                      total_payout_value,
+                      curator_payout_value,
+                      author_rewards,
+                      author_rewards_hive,
+                      author_rewards_hbd,
+                      author_rewards_vests,
+                      last_payout,
+                      cashout_time)
+              INNER JOIN hive_accounts ha_a ON ha_a.name = t.author
+              INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink
+              ) as data_source(author_id, permlink_id, total_payout_value, curator_payout_value, author_rewards, author_rewards_hive, author_rewards_hbd, author_rewards_vests, last_payout, cashout_time)
+              WHERE ihp.permlink_id = data_source.permlink_id and ihp.author_id = data_source.author_id
+              """
+
+        values = []
+        values_limit = 1000
+
         """ Process comment payment operations """
         for k, v in ops.items():
             author, permlink = k.split("/")
@@ -136,48 +183,45 @@ class Posts:
             comment_author_reward = None
             for operation in v:
                 for op, value in operation.items():
+                    if op in ops_stats:
+                        ops_stats[op] += 1
+                    else:
+                        ops_stats[op] = 1
+
                     if op == 'curation_reward_operation':
                         curator_rewards_sum = curator_rewards_sum + int(value['reward']['amount'])
-
-                    if op == 'author_reward_operation':
+                    elif op == 'author_reward_operation':
                         author_rewards_hive = value['hive_payout']['amount']
                         author_rewards_hbd = value['hbd_payout']['amount']
                         author_rewards_vests = value['vesting_payout']['amount']
-
-                    if op == 'comment_reward_operation':
+                    elif op == 'comment_reward_operation':
                         comment_author_reward = value['payout']
                         author_rewards = value['author_rewards']
             curator_rewards = {'amount' : str(curator_rewards_sum), 'precision': 6, 'nai': '@@000000037'}
 
-            sql = """UPDATE
-                        hive_posts
-                    SET
-                        total_payout_value = :total_payout_value,
-                        curator_payout_value = :curator_payout_value,
-                        author_rewards = :author_rewards,
-                        author_rewards_hive = :author_rewards_hive,
-                        author_rewards_hbd = :author_rewards_hbd,
-                        author_rewards_vests = :author_rewards_vests,
-                        last_payout = :last_payout,
-                        cashout_time = :cashout_time,
-                        is_paidout = true
-                    WHERE id = (
-                        SELECT hp.id 
-                        FROM hive_posts hp 
-                        INNER JOIN hive_accounts ha_a ON ha_a.id = hp.author_id 
-                        INNER JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id 
-                        WHERE ha_a.name = :author AND hpd_p.permlink = :permlink
-                    )
-            """
-            DB.query(sql, total_payout_value=legacy_amount(comment_author_reward),
-                     curator_payout_value=legacy_amount(curator_rewards),
-                     author_rewards=author_rewards,
-                     author_rewards_hive=author_rewards_hive,
-                     author_rewards_hbd=author_rewards_hbd,
-                     author_rewards_vests=author_rewards_vests,
-                     last_payout=date,
-                     cashout_time=date,
-                     author=author, permlink=permlink)
+            values.append("('{}', '{}', '{}', '{}', {}, {}, {}, {}, '{}'::timestamp, '{}'::timestamp)".format(author, permlink,
+               legacy_amount(comment_author_reward), # total_payout_value
+               legacy_amount(curator_rewards), #curator_payout_value
+               author_rewards,
+               author_rewards_hive,
+               author_rewards_hbd,
+               author_rewards_vests,
+               date, #last_payout
+               date #cashout_time
+               ))
+
+            if len(values) >= values_limit:
+                values_str = ','.join(values)
+                actual_query = sql.format(values_str)
+                DB.query(actual_query)
+                values.clear()
+
+        if len(values) > 0:
+            values_str = ','.join(values)
+            actual_query = sql.format(values_str)
+            DB.query(actual_query)
+            values.clear()
+        return ops_stats
 
     @classmethod
     def update_child_count(cls, child_id, op='+'):
@@ -216,6 +260,7 @@ class Posts:
               FROM delete_hive_post((:author)::varchar, (:permlink)::varchar);
               """
         row = DB.query_row(sql, author=op['author'], permlink = op['permlink'])
+
         result = dict(row)
         pid = result['id']
 
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index eacc41f0b..d68cf8d14 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -32,6 +32,24 @@ log = logging.getLogger(__name__)
 
 continue_processing = 1
 
+def print_ops_stats(prefix, ops_stats):
+    log.info("############################################################################")
+    log.info(prefix)
+    sorted_stats = sorted(ops_stats.items(), key=lambda kv: kv[1], reverse=True)
+    for (k, v) in sorted_stats:
+        log.info("`{}': {}".format(k, v))
+
+    log.info("############################################################################")
+def prepare_vops(vops_by_block):
+    preparedVops = {}
+    for blockNum, blockDict in vops_by_block.items():
+
+        vopsList = blockDict['ops']
+        date = blockDict['timestamp']
+        preparedVops[blockNum] = Blocks.prepare_vops(vopsList, date)
+  
+    return preparedVops
+
 def _block_provider(node, queue, lbound, ubound, chunk_size):
   try:
     count = ubound - lbound
@@ -45,7 +63,9 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
         blocks = node.get_blocks_range(lbound, to)
         lbound = to
         timer.batch_lap()
-#        log.info("Enqueuing retrieved blocks from range: [%d, %d]. Blocks queue size: %d", lbound, to, queue.qsize())
+#        if(queue.full()):
+#            log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize())
+
         queue.put(blocks)
         num = num + 1
     return num
@@ -67,10 +87,12 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
 #        log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
         timer.batch_start()
         vops = node.enum_virtual_ops(lbound, to)
+        preparedVops = prepare_vops(vops)
         lbound = to
         timer.batch_lap()
-#        log.info("Enqueuing retrieved vops for block range: [%d, %d]. Vops queue size: %d", lbound, to, queue.qsize())
-        queue.put(vops)
+#        if(queue.full()):
+#            log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize())
+        queue.put(preparedVops)
         num = num + 1
 
     return num
@@ -86,30 +108,52 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
     num = 0
     timer = Timer(count, entity='block', laps=['rps', 'wps'])
 
+    total_ops_stats = {}
+
+    time_start = perf()
+
     while continue_processing and lbound < ubound:
-#        log.info("Awaiting any block to process...")
+        if blocksQueue.empty():
+            log.info("Awaiting any block to process...")
+
         blocks = blocksQueue.get()
-        blocksQueue.task_done()
 
-#        log.info("Awaiting any vops to process...")
-        vops = vopsQueue.get()
+        if vopsQueue.empty():
+            log.info("Awaiting any vops to process...")
+
+        preparedVops = vopsQueue.get()
 
         to = min(lbound + chunk_size, ubound)
 #        log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
 
+        blocksQueue.task_done()
+        vopsQueue.task_done()
+
         timer.batch_start()
-        Blocks.process_multi(blocks, vops, node, is_initial_sync)
+        
+        block_start = perf()
+        ops_stats = Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
+        block_end = perf()
+
         timer.batch_lap()
         timer.batch_finish(len(blocks))
+        time_current = perf()
+
         prefix = ("[SYNC] Got block %d @ %s" % (
             to - 1, blocks[-1]['timestamp']))
         log.info(timer.batch_status(prefix))
+        log.info("Time elapsed: %fs", time_current - time_start)
+
+        total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
+
+        if block_end - block_start > 1.0:
+            print_ops_stats("Operations present in the processed blocks:", ops_stats)
 
         lbound = to
 
-        vopsQueue.task_done()
         num = num + 1
 
+    print_ops_stats("All operations present in the processed blocks:", ops_stats)
     return num
   except KeyboardInterrupt as ki:
     log.info("Caught SIGINT")
@@ -130,7 +174,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
   #      pool.shutdown()
       except KeyboardInterrupt as ex:
         continue_processing = 0
-        pool.shutdown(false)
+        pool.shutdown(False)
 
     blocksQueue.join()
     vopsQueue.join()
@@ -272,11 +316,12 @@ class Sync:
             to = min(lbound + chunk_size, ubound)
             blocks = steemd.get_blocks_range(lbound, to)
             vops = steemd.enum_virtual_ops(lbound, to)
+            preparedVops = prepare_vops(vops)
             lbound = to
             timer.batch_lap()
 
             # process blocks
-            Blocks.process_multi(blocks, vops, steemd, is_initial_sync)
+            Blocks.process_multi(blocks, preparedVops, steemd, is_initial_sync)
             timer.batch_finish(len(blocks))
 
             _prefix = ("[SYNC] Got block %d @ %s" % (
diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py
index 9c55b7645..b19e5fd66 100644
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -74,31 +74,51 @@ class Votes:
         """ Flush vote data from cache to database """
         if cls._votes_data:
             sql = """
-                INSERT INTO hive_votes
+                    INSERT INTO hive_votes
                     (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update) 
-            """
+                    select data_source.post_id, data_source.voter_id, data_source.author_id, data_source.permlink_id, data_source.weight, data_source.rshares, data_source.vote_percent, data_source.last_update
+                    from 
+                    (
+                    SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update
+                    from
+                    (
+                    VALUES
+                    --   voter, author, permlink, weight, rshares, vote_percent, last_update
+                      {}
+                    ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update)
+                    INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter
+                    INNER JOIN hive_accounts ha_a ON ha_a.name = t.author
+                    INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink
+                    INNER JOIN hive_posts hp ON hp.author_id = ha_a.id AND hp.permlink_id = hpd_p.id  
+                    ) as data_source(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update)
+                    ON CONFLICT ON CONSTRAINT hive_votes_ux1 DO
+                      UPDATE
+                        SET
+                          weight = EXCLUDED.weight,
+                          rshares = EXCLUDED.rshares,
+                          vote_percent = EXCLUDED.vote_percent,
+                          last_update = EXCLUDED.last_update,
+                          num_changes = hive_votes.num_changes + 1
+                      WHERE hive_votes.id = EXCLUDED.id
+                      """
+
             values = []
+            values_limit = 1000
+
             for _, vd in cls._votes_data.items():
-                values.append("""
-                    SELECT hp.id, ha_v.id, ha_a.id, hpd_p.id, {}, {}, {}, '{}'::timestamp
-                    FROM hive_accounts ha_v,
-                        hive_posts hp
-                    INNER JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
-                    INNER JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
-                    WHERE ha_a.name = '{}' AND hpd_p.permlink = '{}' AND ha_v.name = '{}'
-                """.format(vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'], vd['author'], vd['permlink'], vd['voter']))
-            sql += ' UNION ALL '.join(values)
+                values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp)".format(
+                    vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update']))
 
-            sql += """
-                ON CONFLICT ON CONSTRAINT hive_votes_ux1 DO
-                    UPDATE
-                        SET
-                            weight = EXCLUDED.weight,
-                            rshares = EXCLUDED.rshares,
-                            vote_percent = EXCLUDED.vote_percent,
-                            last_update = EXCLUDED.last_update,
-                            num_changes = hive_votes.num_changes + 1
-                    WHERE hive_votes.id = EXCLUDED.id
-            """
-            DB.query(sql)
-            cls._votes_data.clear()
+                if len(values) >= values_limit:
+                    values_str = ','.join(values)
+                    actual_query = sql.format(values_str)
+                    DB.query(actual_query)
+                    values.clear()
+
+            if len(values) > 0:
+                values_str = ','.join(values)
+                actual_query = sql.format(values_str)
+                DB.query(actual_query)
+                values.clear()
+                
+        cls._votes_data.clear()
-- 
GitLab