From 64c77f4acb9ff5a546eeba6a6d7039b971e630aa Mon Sep 17 00:00:00 2001
From: kmochocki <kmochocki@syncad.com>
Date: Mon, 10 Aug 2020 15:59:13 +0200
Subject: [PATCH] updgraded logging and added prometheus daemon

---
 hive/cli.py                     |   2 +
 hive/conf.py                    |   3 +
 hive/indexer/blocks.py          |  86 +++++-----
 hive/indexer/custom_op.py       |  21 +--
 hive/indexer/post_data_cache.py |   4 +-
 hive/indexer/posts.py           |   8 +-
 hive/indexer/sync.py            |  57 ++++---
 hive/indexer/tags.py            |   5 +-
 hive/indexer/votes.py           |   4 +
 hive/utils/stats.py             | 282 ++++++++++++++++++++++++++++++++
 scripts/ci_sync.sh              |   2 +-
 setup.py                        |   4 +-
 tests/tests_api                 |   2 +-
 13 files changed, 390 insertions(+), 90 deletions(-)

diff --git a/hive/cli.py b/hive/cli.py
index 0269fda4e..e6d7f3a89 100644
--- a/hive/cli.py
+++ b/hive/cli.py
@@ -6,6 +6,7 @@ import os
 import logging
 from hive.conf import Conf
 from hive.db.adapter import Db
+from hive.utils.stats import PrometheusClient
 
 logging.basicConfig()
 
@@ -15,6 +16,7 @@ def run():
     conf = Conf.init_argparse()
     Db.set_shared_instance(conf.db())
     mode = conf.mode()
+    PrometheusClient( conf.get('prometheus_port') )
 
     pid_file_name = conf.pid_file()
     if pid_file_name is not None:
diff --git a/hive/conf.py b/hive/conf.py
index b2dc0fc82..4099a6a47 100644
--- a/hive/conf.py
+++ b/hive/conf.py
@@ -35,8 +35,10 @@ class Conf():
         add('--steemd-url', env_var='STEEMD_URL', required=False, help='steemd/jussi endpoint', default='{"default" : "https://api.hive.blog"}')
         add('--muted-accounts-url', env_var='MUTED_ACCOUNTS_URL', required=False, help='url to flat list of muted accounts', default='https://raw.githubusercontent.com/hivevectordefense/irredeemables/master/full.txt')
         add('--blacklist-api-url', env_var='BLACKLIST_API_URL', required=False, help='url to acccess blacklist api', default='https://blacklist.usehive.com')
+
         # server
         add('--http-server-port', type=int, env_var='HTTP_SERVER_PORT', default=8080)
+        add('--prometheus-port', type=int, env_var='PROMETHEUS_PORT', required=False, help='if specified, runs prometheus deamon on specified port, which provide statistic and performance data')
 
         # sync
         add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
@@ -112,6 +114,7 @@ class Conf():
     def get(self, param):
         """Reads a single property, e.g. `database_url`."""
         assert self._args, "run init_argparse()"
+        print(self._args)
         return self._args[param]
 
     def mode(self):
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 589d3c9ee..788ad73c7 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -15,6 +15,9 @@ from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
 from time import perf_counter
 
+from hive.utils.stats import OPStatusManager as OPSM
+from hive.utils.stats import FlushStatusManager as FSM
+
 log = logging.getLogger(__name__)
 
 DB = Db.instance()
@@ -22,7 +25,6 @@ DB = Db.instance()
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
-    ops_stats = {}
     _head_block_date = None # timestamp of last fully processed block ("previous block")
     _current_block_date = None # timestamp of block currently being processes ("current block")
 
@@ -35,17 +37,6 @@ class Blocks:
             cls._head_block_date = head_date
             cls._current_block_date = head_date
 
-    @staticmethod
-    def merge_ops_stats(od1, od2):
-        if od2 is not None:
-            for k, v in od2.items():
-                if k in od1:
-                    od1[k] += v
-                else:
-                    od1[k] = v
-
-        return od1
-
     @classmethod
     def head_num(cls):
         """Get hive's head block number."""
@@ -76,7 +67,7 @@ class Blocks:
     @classmethod
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
-        time_start = perf_counter()
+        time_start = OPSM.start()
         DB.query("START TRANSACTION")
 
         last_num = 0
@@ -90,73 +81,78 @@ class Blocks:
         # Follows flushing needs to be atomic because recounts are
         # expensive. So is tracking follows at all; hence we track
         # deltas in memory and update follow/er counts in bulk.
-        PostDataCache.flush()
-        Tags.flush()
-        Votes.flush()
-        cls._flush_blocks()
-        Follow.flush(trx=False)
-        Posts.flush()
+
+        flush_time = FSM.start()
+        def register_time(f_time, name, pushed):
+            assert pushed is not None
+            FSM.flush_stat(name, FSM.stop(f_time), pushed)
+            return FSM.start()
+
+        log.info("#############################################################################")
+        flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
+        flush_time = register_time(flush_time, "Tags", Tags.flush())
+        flush_time = register_time(flush_time, "Votes", Votes.flush())
+        flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
+        folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
+        flush_time = register_time(flush_time, "Follow", folllow_items)
+        flush_time = register_time(flush_time, "Posts", Posts.flush())
 
         DB.query("COMMIT")
-        time_end = perf_counter()
-        log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start)
 
-        return cls.ops_stats
+        log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
 
     @staticmethod
     def prepare_vops(comment_payout_ops, vopsList, date):
         vote_ops = {}
-        ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0, 'ineffective_delete_comment_operation' : 0 }
+
         inefficient_deleted_ops = {}
+        registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation', 'ineffective_delete_comment_operation']
 
         for vop in vopsList:
+            start = OPSM.start()
             key = None
             val = None
 
             op_type = vop['type']
             op_value = vop['value']
             key = "{}/{}".format(op_value['author'], op_value['permlink'])
-  
-            if op_type == 'author_reward_operation':
-                ops_stats[ 'author_reward_operation' ] += 1
 
+            if op_type == 'author_reward_operation':
                 if key not in comment_payout_ops:
-                  comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'comment_reward_operation':
-                ops_stats[ 'comment_reward_operation' ] += 1
-
                 if key not in comment_payout_ops:
-                  comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key]['effective_comment_vote_operation'] = None
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'effective_comment_vote_operation':
-                ops_stats[ 'effective_comment_vote_operation' ] += 1
                 key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
                 vote_ops[ key_vote ] = op_value
 
                 if key not in comment_payout_ops:
-                  comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'comment_payout_update_operation':
-                ops_stats[ 'comment_payout_update_operation' ] += 1
-
                 if key not in comment_payout_ops:
-                  comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
             elif op_type == 'ineffective_delete_comment_operation':
               ops_stats[ 'ineffective_delete_comment_operation' ] += 1
               inefficient_deleted_ops[key] = {}
 
-        return (vote_ops, ops_stats, inefficient_deleted_ops)
+            if op_type in registered_ops_stats:
+                OPSM.op_stats(op_type, OPSM.stop(start))
+
+        return (vote_ops, inefficient_deleted_ops)
 
 
     @classmethod
@@ -180,14 +176,15 @@ class Blocks:
 
         if is_initial_sync:
             if num in virtual_operations:
-              (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
+              (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
         else:
             vops = hived.get_virtual_operations(num)
-            (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
+            (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
 
         json_ops = []
         for tx_idx, tx in enumerate(block['transactions']):
             for operation in tx['operations']:
+                start = OPSM.start()
                 op_type = operation['type']
                 op = operation['value']
 
@@ -238,23 +235,16 @@ class Blocks:
                     json_ops.append(op)
 
                 if op_type != 'custom_json_operation':
-                    if op_type in cls.ops_stats:
-                        cls.ops_stats[op_type] += 1
-                    else:
-                        cls.ops_stats[op_type] = 1
+                    OPSM.op_stats(op_type, OPSM.stop(start))
 
         # follow/reblog/community ops
         if json_ops:
-            custom_ops_stats = CustomOp.process_ops(json_ops, num, cls._head_block_date)
-            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats)
+            CustomOp.process_ops(json_ops, num, cls._head_block_date)
 
         if vote_ops is not None:
             for k, v in vote_ops.items():
                 Votes.effective_comment_vote_op(k, v)
 
-        if Posts.comment_payout_ops:
-            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats)
-
         cls._head_block_date = cls._current_block_date
 
         return num
@@ -328,7 +318,9 @@ class Blocks:
                                                                   block['prev'], block['txs'],
                                                                   block['ops'], block['date']))
         DB.query(query + ",".join(values))
+        n = len(cls.blocks_to_flush)
         cls.blocks_to_flush = []
+        return n
 
     @classmethod
     def _pop(cls, blocks):
diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py
index d41305679..27289e65e 100644
--- a/hive/indexer/custom_op.py
+++ b/hive/indexer/custom_op.py
@@ -15,6 +15,8 @@ from hive.indexer.community import process_json_community_op, START_BLOCK
 from hive.utils.normalize import load_json_key
 from hive.utils.json import valid_op_json, valid_date, valid_command, valid_keys
 
+from hive.utils.stats import OPStatusManager as OPSM
+
 DB = Db.instance()
 
 log = logging.getLogger(__name__)
@@ -39,22 +41,10 @@ class CustomOp:
 
     @classmethod
     def process_ops(cls, ops, block_num, block_date):
-        ops_stats = {}
-
         """Given a list of operation in block, filter and process them."""
         for op in ops:
-            if op['id'] not in ['follow', 'community', 'notify']:
-                opName = str(op['id']) + '-ignored'
-                if(opName  in ops_stats):
-                    ops_stats[opName] += 1
-                else:
-                    ops_stats[opName] = 1
-                continue
-
-            if(op['id'] in ops_stats):
-                ops_stats[op['id']] += 1
-            else:
-                ops_stats[op['id']] = 1
+            start = OPSM.start()
+            opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '' )
 
             account = _get_auth(op)
             if not account:
@@ -70,7 +60,8 @@ class CustomOp:
                     process_json_community_op(account, op_json, block_date)
             elif op['id'] == 'notify':
                 cls._process_notify(account, op_json, block_date)
-        return ops_stats
+            
+            OPSM.op_stats(opName, OPSM.stop(start))
 
     @classmethod
     def _process_notify(cls, account, op_json, block_date):
diff --git a/hive/indexer/post_data_cache.py b/hive/indexer/post_data_cache.py
index 8c578b60e..5cdcc3a4b 100644
--- a/hive/indexer/post_data_cache.py
+++ b/hive/indexer/post_data_cache.py
@@ -67,4 +67,6 @@ class PostDataCache(object):
                 log.info("Executing query:\n{}".format(sql))
 
             DB.query(sql)
-            cls._data.clear()
+        n = len(cls._data.keys())
+        cls._data.clear()
+        return n
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index aaa65a18b..4f2b5350f 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -211,7 +211,9 @@ class Posts:
             actual_query = sql.format(values_str)
             DB.query(actual_query)
 
+        n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()
+        return n
 
     @classmethod
     def comment_payout_op(cls):
@@ -315,7 +317,10 @@ class Posts:
               "NULL" if ( cashout_time is None ) else ( "'{}'::timestamp".format( cashout_time ) ),
 
               "NULL" if ( is_paidout is None ) else is_paidout ))
+        
+        n = len(cls.comment_payout_ops)
         cls.comment_payout_ops.clear()
+        return n
 
     @classmethod
     def update_child_count(cls, child_id, op='+'):
@@ -449,5 +454,4 @@ class Posts:
 
     @classmethod
     def flush(cls):
-      cls.comment_payout_op()
-      cls.flush_into_db()
+      return cls.comment_payout_op() + cls.flush_into_db()
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index 9c6488a82..bde43da27 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -26,26 +26,23 @@ from hive.indexer.follow import Follow
 from hive.indexer.community import Community
 from hive.server.common.mutes import Mutes
 
+from hive.utils.stats import OPStatusManager as OPSM
+from hive.utils.stats import FlushStatusManager as FSM
+from hive.utils.stats import WaitingStatusManager as WSM
+from hive.utils.stats import PrometheusClient as PC
+from hive.utils.stats import BroadcastObject
+
 log = logging.getLogger(__name__)
 
 CONTINUE_PROCESSING = True
 
-def print_ops_stats(prefix, ops_stats):
-    log.info("############################################################################")
-    log.info(prefix)
-    sorted_stats = sorted(ops_stats.items(), key=lambda kv: kv[1], reverse=True)
-    for (k, v) in sorted_stats:
-        log.info("`{}': {}".format(k, v))
-
-    log.info("############################################################################")
-
 def prepare_vops(vops_by_block):
     preparedVops = {}
 
     for blockNum, blockDict in vops_by_block.items():
         vopsList = blockDict['ops']
         preparedVops[blockNum] = vopsList
-  
+
     return preparedVops
 
 def _block_provider(node, queue, lbound, ubound, chunk_size):
@@ -93,13 +90,16 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
         log.exception("Exception caught during fetching vops...")
 
 def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+    is_debug = log.isEnabledFor(10)
+
     num = 0
+    time_start = OPSM.start()
     try:
         count = ubound - lbound
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
-        total_ops_stats = {}
-        time_start = perf()
         while lbound < ubound:
+
+            wait_time = WSM.start()
             if blocksQueue.empty() and CONTINUE_PROCESSING:
                 log.info("Awaiting any block to process...")
             
@@ -107,7 +107,9 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             if not blocksQueue.empty() or CONTINUE_PROCESSING:
                 blocks = blocksQueue.get()
                 blocksQueue.task_done()
+            WSM.wait_stat('block_consumer_block', WSM.stop(wait_time))
 
+            wait_time = WSM.start()
             if vopsQueue.empty() and CONTINUE_PROCESSING:
                 log.info("Awaiting any vops to process...")
             
@@ -115,14 +117,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             if not vopsQueue.empty() or CONTINUE_PROCESSING:
                 preparedVops = vopsQueue.get()
                 vopsQueue.task_done()
+            WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time))
 
             to = min(lbound + chunk_size, ubound)
 
             timer.batch_start()
             
             block_start = perf()
-            ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync))
-            Blocks.ops_stats.clear()
+            Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
             block_end = perf()
 
             timer.batch_lap()
@@ -134,25 +136,38 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             log.info(timer.batch_status(prefix))
             log.info("[SYNC] Time elapsed: %fs", time_current - time_start)
 
-            total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
 
-            if block_end - block_start > 1.0:
-                print_ops_stats("Operations present in the processed blocks:", ops_stats)
+            if block_end - block_start > 1.0 or is_debug:
+                otm = OPSM.log_current("Operations present in the processed blocks")
+                ftm = FSM.log_current("Flushing times")
+                wtm = WSM.log_current("Waiting times")
+                log.info(f"Calculated time: {otm+ftm+wtm :.4f} s.")
+
+            OPSM.next_blocks()
+            FSM.next_blocks()
+            WSM.next_blocks()
 
             lbound = to
+            PC.broadcast(BroadcastObject('sync_current_block', lbound, 'blocks'))
 
             num = num + 1
 
             if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
                 break
-
-        print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
-        return num
     except KeyboardInterrupt:
         log.info("Caught SIGINT")
-
     except Exception:
         log.exception("Exception caught during processing blocks...")
+    finally:
+        stop = OPSM.stop(time_start)
+        log.info("=== TOTAL STATS ===")
+        wtm = WSM.log_global("Total waiting times")
+        ftm = FSM.log_global("Total flush times")
+        otm = OPSM.log_global("All operations present in the processed blocks")
+        ttm = ftm + otm + wtm
+        log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
+        log.info("=== TOTAL STATS ===")
+        return num
 
 def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
     blocksQueue = queue.Queue(maxsize=10)
diff --git a/hive/indexer/tags.py b/hive/indexer/tags.py
index 588090526..44d942395 100644
--- a/hive/indexer/tags.py
+++ b/hive/indexer/tags.py
@@ -68,4 +68,7 @@ class Tags(object):
                 tag_query = str(sql)
                 DB.query(tag_query.format(','.join(values)))
                 values.clear()
-            cls._tags.clear()
+            
+        n = len(cls._tags)
+        cls._tags.clear()
+        return n
diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py
index 2e62d0187..e5387d385 100644
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -123,6 +123,7 @@ class Votes:
     def flush(cls):
         """ Flush vote data from cache to database """
         cls.inside_flush = True
+        n = 0
         if cls._votes_data:
             sql = """
                 INSERT INTO hive_votes
@@ -186,5 +187,8 @@ class Votes:
                 DB.query(actual_query)
                 values_override.clear()
 
+            n = len(cls._votes_data)
             cls._votes_data.clear()
         cls.inside_flush = False
+        return n
+
diff --git a/hive/utils/stats.py b/hive/utils/stats.py
index 499c34c5a..a7c3ae7fb 100644
--- a/hive/utils/stats.py
+++ b/hive/utils/stats.py
@@ -3,11 +3,293 @@
 import atexit
 import logging
 
+from queue import Queue
 from time import perf_counter as perf
 from hive.utils.system import colorize, peak_usage_mb
+from psutil import pid_exists
+from os import getpid
 
 log = logging.getLogger(__name__)
 
+class BroadcastObject:
+    def __init__(self, category, value, unit):
+        self.category = category
+        self.value = value
+        self.unit = unit
+
+    def name(self):
+        return f"hivemind_{self.category}"
+
+    def debug(self):
+        log.debug(f"{self.name()}_{self.unit}: {self.value :.2f}")
+
+class PrometheusClient:
+
+    deamon = None
+    logs_to_broadcast = Queue()
+
+    @staticmethod
+    def work( port, pid ):
+        try:
+            import prometheus_client as prom
+            prom.start_http_server(port)
+
+            gauges = {}
+
+            while pid_exists(pid):
+                value : BroadcastObject = PrometheusClient.logs_to_broadcast.get()
+                value.debug()
+                value_name = value.name()
+
+                if value_name not in gauges.keys():
+                    gauge = prom.Gauge(value_name, '', unit=value.unit)
+                    gauge.set(value.value)
+                    gauges[value_name] = gauge
+                else:
+                    gauges[value_name].set(value.value)
+
+        except Exception as e:
+            log.error(f"Prometheus logging failed. Exception\n {e}")
+
+    def __init__(self, port):
+        if port is None:
+            return
+        else:
+            port = int(port)
+        if PrometheusClient.deamon is None:
+            try:
+                import prometheus_client
+            except ImportError:
+                log.warn("Failed to import prometheus client. Online stats disabled")
+                return
+            from threading import Thread
+            deamon = Thread(target=PrometheusClient.work, args=[ port, getpid() ], daemon=True)
+            deamon.start()
+
+    @staticmethod
+    def broadcast(obj):
+        if type(obj) == type(list()):
+            for v in obj:
+                PrometheusClient.broadcast(v)
+        elif type(obj) == type(BroadcastObject('', '', '')):
+            PrometheusClient.logs_to_broadcast.put(obj)
+        else:
+            raise Exception(f"Not expexcted type. Should be list or BroadcastObject, but: {type(obj)} given")
+
+class Stat:
+    def __init__(self, time):
+        self.time = time
+
+    def update(self, other):
+        assert type(self) == type(other)
+        attributes = self.__dict__
+        oatte = other.__dict__
+        for key, val in attributes.items():
+            setattr(self, key, oatte[key] + val)
+        return self
+
+    def __repr__(self):
+        return self.__dict__
+
+    def __lt__(self, other):
+        return self.time < other.time
+
+    def broadcast(self, name):
+        return BroadcastObject(name, self.time, 's')
+
+class StatusManager:
+
+    # Fully abstract class
+    def __init__(self):
+        assert False
+
+    @staticmethod
+    def start():
+        return perf()
+
+    @staticmethod
+    def stop( start : float ):
+        return perf() - start
+
+    @staticmethod
+    def merge_dicts(od1, od2, broadcast : bool = False):
+        if od2 is not None:
+            for k, v in od2.items():
+                if k in od1:
+                    od1[k].update(v)
+                    if broadcast:
+                        PrometheusClient.broadcast(v.broadcast(k))
+                else:
+                    od1[k] = v
+
+        return od1
+
+    @staticmethod
+    def log_dict(col : dict) -> float:
+        sorted_stats = sorted(col.items(), key=lambda kv: kv[1], reverse=True)
+        measured_time = 0.0
+        for (k, v) in sorted_stats:
+            log.info("`{}`: {}".format(k, v))
+            measured_time += v.time
+        return measured_time
+
+    @staticmethod
+    def print_row():
+        log.info("#" * 20)
+
+class OPStat(Stat):
+    def __init__(self, time, count):
+        super().__init__(time)
+        self.count = count
+
+    def __str__(self):
+        return f"Processed {self.count :.0f} times in {self.time :.5f} seconds"
+
+    def broadcast(self, name : str):
+        n = name.lower()
+        if not n.endswith('operation'):
+            n = f"{n}_operation"
+        return list([ super().broadcast(n), BroadcastObject(n, self.count, 'b') ])
+
+class OPStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    cpbs = {}
+
+    @staticmethod
+    def op_stats( name, time, processed = 1 ):
+        if name in OPStatusManager.cpbs.keys():
+            OPStatusManager.cpbs[name].time += time
+            OPStatusManager.cpbs[name].count += processed
+        else:
+            OPStatusManager.cpbs[name] = OPStat(time, processed)
+
+    @staticmethod
+    def next_blocks():
+        OPStatusManager.global_stats = StatusManager.merge_dicts(
+            OPStatusManager.global_stats, 
+            OPStatusManager.cpbs,
+            True
+        )
+        OPStatusManager.cpbs.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(OPStatusManager.global_stats)
+        log.info(f"Total time for processing operations time: {tm :.4f}s.")
+        return tm
+
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(OPStatusManager.cpbs)
+        log.info(f"Current time for processing operations time: {tm :.4f}s.")
+        return tm
+
+class FlushStat(Stat):
+    def __init__(self, time, pushed):
+        super().__init__(time)
+        self.pushed = pushed
+
+    def __str__(self):
+        return f"Pushed {self.pushed :.0f} records in {self.time :.4f} seconds"
+
+    def broadcast(self, name : str):
+        n = f"flushing_{name.lower()}"
+        return list([ super().broadcast(n), BroadcastObject(n, self.pushed, 'b') ])
+
+class FlushStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    current_flushes = {}
+
+    @staticmethod
+    def flush_stat(name, time, pushed):
+        if name in FlushStatusManager.current_flushes.keys():
+            FlushStatusManager.current_flushes[name].time += time
+            FlushStatusManager.current_flushes[name].pushed += pushed
+        else:
+            FlushStatusManager.current_flushes[name] = FlushStat(time, pushed)
+
+    @staticmethod
+    def next_blocks():
+        FlushStatusManager.global_stats = StatusManager.merge_dicts(
+            FlushStatusManager.global_stats, 
+            FlushStatusManager.current_flushes,
+            True
+        )
+        FlushStatusManager.current_flushes.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(FlushStatusManager.global_stats)
+        log.info(f"Total flushing time: {tm :.4f}s.")
+        return tm
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(FlushStatusManager.current_flushes)
+        log.info(f"Current flushing time: {tm :.4f}s.")
+        return tm
+
+class WaitStat(Stat):
+    def __init__(self, time):
+        super().__init__(time)
+
+    def __str__(self):
+        return f"Waited {self.time :.4f} seconds"
+
+class WaitingStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    current_waits = {}
+
+    @staticmethod
+    def wait_stat(name, time):
+        if name in WaitingStatusManager.current_waits.keys():
+            WaitingStatusManager.current_waits[name].time += time
+        else:
+            WaitingStatusManager.current_waits[name] = WaitStat(time)
+
+    @staticmethod
+    def next_blocks():
+        WaitingStatusManager.global_stats = StatusManager.merge_dicts(
+            WaitingStatusManager.global_stats, 
+            WaitingStatusManager.current_waits,
+            True
+        )
+        WaitingStatusManager.current_waits.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(WaitingStatusManager.global_stats)
+        log.info(f"Total waiting time: {tm :.4f}s.")
+        return tm
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(WaitingStatusManager.current_waits)
+        log.info(f"Current waiting time: {tm :.4f}s.")
+        return tm
+
 def _normalize_sql(sql, maxlen=180):
     """Collapse whitespace and middle-truncate if needed."""
     out = ' '.join(sql.split())
diff --git a/scripts/ci_sync.sh b/scripts/ci_sync.sh
index 9907cda99..53e640683 100755
--- a/scripts/ci_sync.sh
+++ b/scripts/ci_sync.sh
@@ -54,5 +54,5 @@ psql -U $POSTGRES_USER -h localhost -d postgres -c "CREATE DATABASE $DB_NAME;"
 
 echo Attempting to starting hive sync using hived node: $HIVEMIND_SOURCE_HIVED_URL . Max sync block is: $HIVEMIND_MAX_BLOCK
 echo Attempting to access database $DB_URL
-./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --database-url $DB_URL 2>&1 | tee -i hivemind-sync.log
+./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --prometheus-port 11011 --database-url $DB_URL 2>&1 | tee -i hivemind-sync.log
 rm hive_sync.pid
diff --git a/setup.py b/setup.py
index ee620402e..9cc251e10 100644
--- a/setup.py
+++ b/setup.py
@@ -61,7 +61,9 @@ setup(
         'aiocache',
         'configargparse',
         'pdoc',
-        'diff-match-patch'
+        'diff-match-patch',
+        'prometheus-client',
+        'psutil'
     ],
     extras_require={'test': tests_require},
     entry_points={
diff --git a/tests/tests_api b/tests/tests_api
index d23060b52..39875d086 160000
--- a/tests/tests_api
+++ b/tests/tests_api
@@ -1 +1 @@
-Subproject commit d23060b52e4e773308f7bafa666bef231c0e49ed
+Subproject commit 39875d086ab82a7377a0799784e858666ae5e62e
-- 
GitLab