From 4e198a112541d1f95153e164db6bac5fd10ff61c Mon Sep 17 00:00:00 2001
From: Bartek Wrona <wrona@syncad.com>
Date: Mon, 22 Jun 2020 11:01:21 +0200
Subject: [PATCH] WIP: Implemented concurrent fetching of blocks and vops to
 avoid blocking operation processing during API calls.

---
 hive/indexer/sync.py | 114 +++++++++++++++++++++++++++++++++++++++++++
 hive/utils/stats.py  |  23 +++++++--
 2 files changed, 132 insertions(+), 5 deletions(-)

diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index a064b72bc..7f40425cf 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -5,6 +5,11 @@ import glob
 from time import perf_counter as perf
 import os
 import ujson as json
+import time
+
+import concurrent, threading, queue
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import Future
 
 from funcy.seqs import drop
 from toolz import partition_all
@@ -26,6 +31,111 @@ from hive.server.common.mutes import Mutes
 
 log = logging.getLogger(__name__)
 
+continue_processing = 1
+
+def _block_provider(node, queue, lbound, ubound, chunk_size):
+  try:
+    count = ubound - lbound
+    log.info("[SYNC] start block %d, +%d to sync", lbound, count)
+    timer = Timer(count, entity='block', laps=['rps', 'wps'])
+    num = 0
+    while continue_processing and lbound < ubound:
+        to = min(lbound + chunk_size, ubound)
+#        log.info("Querying a node for blocks from range: [%d, %d]", lbound, to)
+        timer.batch_start()
+        blocks = node.get_blocks_range(lbound, to)
+        lbound = to
+        timer.batch_lap()
+#        log.info("Enqueuing retrieved blocks from range: [%d, %d]. Blocks queue size: %d", lbound, to, queue.qsize())
+        queue.put(blocks)
+        num = num + 1
+    return num
+  except KeyboardInterrupt as ki:
+    log.info("Caught SIGINT")
+
+  except Exception as ex:
+    log.exception("Exception caught during fetching blocks")
+
+def _vops_provider(node, queue, lbound, ubound, chunk_size):
+  try:
+    count = ubound - lbound
+    log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
+    timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
+    num = 0
+
+    while continue_processing and lbound < ubound:
+        to = min(lbound + chunk_size, ubound)
+#        log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
+        timer.batch_start()
+        vops = node.enum_virtual_ops(lbound, to)
+        lbound = to
+        timer.batch_lap()
+#        log.info("Enqueuing retrieved vops for block range: [%d, %d]. Vops queue size: %d", lbound, to, queue.qsize())
+        queue.put(vops)
+        num = num + 1
+
+    return num
+  except KeyboardInterrupt as ki:
+    log.info("Caught SIGINT")
+
+  except Exception as ex:
+    log.exception("Exception caught during fetching vops...")
+
+def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+  try:
+    count = ubound - lbound
+    num = 0
+    timer = Timer(count, entity='block', laps=['rps', 'wps'])
+
+    while continue_processing and lbound < ubound:
+#        log.info("Awaiting any block to process...")
+        blocks = blocksQueue.get()
+        blocksQueue.task_done()
+
+#        log.info("Awaiting any vops to process...")
+        vops = vopsQueue.get()
+
+        to = min(lbound + chunk_size, ubound)
+#        log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
+
+        timer.batch_start()
+        Blocks.process_multi(blocks, vops, node, is_initial_sync)
+        timer.batch_lap()
+        timer.batch_finish(len(blocks))
+        prefix = ("[SYNC] Got block %d @ %s" % (
+            to - 1, blocks[-1]['timestamp']))
+        log.info(timer.batch_status(prefix))
+
+        lbound = to
+
+        vopsQueue.task_done()
+        num = num + 1
+
+    return num
+  except KeyboardInterrupt as ki:
+    log.info("Caught SIGINT")
+  except Exception as ex:
+    log.exception("Exception caught during processing blocks...")
+
+def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
+    blocksQueue = queue.Queue(maxsize=10)
+    vopsQueue = queue.Queue(maxsize=10)
+
+    with ThreadPoolExecutor(max_workers = 4) as pool:
+      try:
+        pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
+        pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
+        blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
+
+        blockConsumerFuture.result()
+  #      pool.shutdown()
+      except KeyboardInterrupt:
+        global continue_processing; continue_processing = 0
+        pool.shutdown(False)
+
+    blocksQueue.join()
+    vopsQueue.join()
+
 class Sync:
     """Manages the sync/index process.
 
@@ -150,6 +260,10 @@ class Sync:
         if count < 1:
             return
 
+        if is_initial_sync:
+          _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size)
+          return
+
         log.info("[SYNC] start block %d, +%d to sync", lbound, count)
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
         while lbound < ubound:
diff --git a/hive/utils/stats.py b/hive/utils/stats.py
index f330408f7..d179eb666 100644
--- a/hive/utils/stats.py
+++ b/hive/utils/stats.py
@@ -65,8 +65,16 @@ class StatsAbstract:
 
         log.info('%7s %9s %9s %9s', '-pct-', '-ttl-', '-avg-', '-cnt-')
         for call, ms, reqs in self.table(40):
+            try:
+              avg = ms/reqs
+              millisec = ms/self._ms
+            except ZeroDivisionError:
+              avg = 0.0
+              millisec = 0.0
+            if reqs == 0:
+                reqs = 1
             log.info("% 6.1f%% % 7dms % 9.2f % 8dx -- %s",
-                     100 * ms/self._ms, ms, ms/reqs, reqs, call)
+                     100 * millisec, ms, avg, reqs, call)
         self.clear()
 
 
@@ -131,6 +139,9 @@ class Stats:
     """Container for steemd and db timing data."""
     PRINT_THRESH_MINS = 1
 
+    COLLECT_DB_STATS = 0
+    COLLECT_NODE_STATS = 0
+
     _db = DbStats()
     _steemd = SteemStats()
     _secs = 0.0
@@ -140,14 +151,16 @@ class Stats:
     @classmethod
     def log_db(cls, sql, secs):
         """Log a database query. Incoming SQL is normalized."""
-        cls._db.add(_normalize_sql(sql), secs * 1000)
-        cls.add_secs(secs)
+        if cls.COLLECT_DB_STATS:
+            cls._db.add(_normalize_sql(sql), secs * 1000)
+            cls.add_secs(secs)
 
     @classmethod
     def log_steem(cls, method, secs, batch_size=1):
         """Log a steemd call."""
-        cls._steemd.add(method, secs * 1000, batch_size)
-        cls.add_secs(secs)
+        if cls.COLLECT_NODE_STATS:
+            cls._steemd.add(method, secs * 1000, batch_size)
+            cls.add_secs(secs)
 
     @classmethod
     def log_idle(cls, secs):
-- 
GitLab