From 9ffb6da6a68a4f33a99c1b611ed732cf28bf62dc Mon Sep 17 00:00:00 2001
From: Bartek Wrona <wrona@syncad.com>
Date: Tue, 8 Sep 2020 01:16:31 +0200
Subject: [PATCH] Enabled concurrent execution of Posts.flush

---
 hive/indexer/blocks.py |  9 ++++++---
 hive/indexer/posts.py  | 10 ++++++++--
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 9a5d4cd10..cc5b7912d 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -43,6 +43,7 @@ class Blocks:
     _current_block_date = None
 
     _concurrent_flush = [
+      ('Posts', Posts.flush, Posts),
       ('PostDataCache', PostDataCache.flush, PostDataCache),
       ('Reputations', Reputations.flush, Reputations),
       ('Votes', Votes.flush, Votes), 
@@ -65,6 +66,7 @@ class Blocks:
         Votes.setup_db_access(sharedDbAdapter)
         Tags.setup_db_access(sharedDbAdapter)
         Follow.setup_db_access(sharedDbAdapter)
+        Posts.setup_db_access(sharedDbAdapter)
 
     @classmethod
     def head_num(cls):
@@ -126,11 +128,12 @@ class Blocks:
 
         log.info("#############################################################################")
         flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
-        flush_time = register_time(flush_time, "Posts", Posts.flush())
 
         folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
         flush_time = register_time(flush_time, "Follow", folllow_items)
 
+        DB.query("COMMIT")
+
         completedThreads = 0;
 
         pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
@@ -155,9 +158,9 @@ class Blocks:
         assert completedThreads == len(cls._concurrent_flush)
 
         if (not is_initial_sync) and (first_block > -1):
+            DB.query("START TRANSACTION")
             cls.on_live_blocks_processed( first_block, last_num )
-
-        DB.query("COMMIT")
+            DB.query("COMMIT")
 
         log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
 
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index ba63521ef..b67e2ce2b 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.db_adapter_holder import DbAdapterHolder
+
 from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive
 
 log = logging.getLogger(__name__)
 DB = Db.instance()
 
-class Posts:
+class Posts(DbAdapterHolder):
     """Handles critical/core post ops and data."""
 
     # LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
@@ -213,9 +215,13 @@ class Posts:
                 yield lst[i:i + n]
 
         for chunk in chunks(cls._comment_payout_ops, 1000):
+            cls.beginTx()
+
             values_str = ','.join(chunk)
             actual_query = sql.format(values_str)
-            DB.query(actual_query)
+            cls.db.query(actual_query)
+
+            cls.commitTx()
 
         n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()
-- 
GitLab