diff --git a/hive/cli.py b/hive/cli.py
index 0269fda4edb4ad318e8ab66c59e3ad8175bbd166..e6d7f3a8975955047bbe3ddcac84c1bb559bfc52 100644
--- a/hive/cli.py
+++ b/hive/cli.py
@@ -6,6 +6,7 @@ import os
 import logging
 
 from hive.conf import Conf
 from hive.db.adapter import Db
+from hive.utils.stats import PrometheusClient
 
 logging.basicConfig()
@@ -15,6 +16,7 @@ def run():
     conf = Conf.init_argparse()
     Db.set_shared_instance(conf.db())
     mode = conf.mode()
+    PrometheusClient( conf.get('prometheus_port') )
 
     pid_file_name = conf.pid_file()
     if pid_file_name is not None:
diff --git a/hive/conf.py b/hive/conf.py
index b2dc0fc827dc11472bd81df99a22cf8c3bbaadd1..4099a6a47b70bdce3eef746facd4544267883196 100644
--- a/hive/conf.py
+++ b/hive/conf.py
@@ -35,8 +35,10 @@ class Conf():
         add('--steemd-url', env_var='STEEMD_URL', required=False, help='steemd/jussi endpoint', default='{"default" : "https://api.hive.blog"}')
         add('--muted-accounts-url', env_var='MUTED_ACCOUNTS_URL', required=False, help='url to flat list of muted accounts', default='https://raw.githubusercontent.com/hivevectordefense/irredeemables/master/full.txt')
         add('--blacklist-api-url', env_var='BLACKLIST_API_URL', required=False, help='url to acccess blacklist api', default='https://blacklist.usehive.com')
+
         # server
         add('--http-server-port', type=int, env_var='HTTP_SERVER_PORT', default=8080)
+        add('--prometheus-port', type=int, env_var='PROMETHEUS_PORT', required=False, help='if specified, runs a prometheus daemon on the given port, which provides statistics and performance data')
 
         # sync
         add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
@@ -112,6 +114,7 @@ class Conf():
     def get(self, param):
         """Reads a single property, e.g. `database_url`."""
         assert self._args, "run init_argparse()"
+        print(self._args)
         return self._args[param]
 
     def mode(self):
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 589d3c9ee16f689b7fee2d76d689df582b46ec22..788ad73c7ce764edecf578ddd9e9da01614e3e41 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -15,6 +15,9 @@ from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
 
 from time import perf_counter
 
+from hive.utils.stats import OPStatusManager as OPSM
+from hive.utils.stats import FlushStatusManager as FSM
+
 log = logging.getLogger(__name__)
 
 DB = Db.instance()
@@ -22,7 +25,6 @@ DB = Db.instance()
 
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
-    ops_stats = {}
     _head_block_date = None # timestamp of last fully processed block ("previous block")
     _current_block_date = None # timestamp of block currently being processes ("current block")
@@ -35,17 +37,6 @@ class Blocks:
         cls._head_block_date = head_date
         cls._current_block_date = head_date
 
-    @staticmethod
-    def merge_ops_stats(od1, od2):
-        if od2 is not None:
-            for k, v in od2.items():
-                if k in od1:
-                    od1[k] += v
-                else:
-                    od1[k] = v
-
-        return od1
-
     @classmethod
     def head_num(cls):
         """Get hive's head block number."""
@@ -76,7 +67,7 @@ class Blocks:
     @classmethod
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
-        time_start = perf_counter()
+        time_start = OPSM.start()
 
         DB.query("START TRANSACTION")
 
         last_num = 0
@@ -90,73 +81,78 @@
         # Follows flushing needs to be atomic because recounts are
         # expensive. So is tracking follows at all; hence we track
         # deltas in memory and update follow/er counts in bulk.
-        PostDataCache.flush()
-        Tags.flush()
-        Votes.flush()
-        cls._flush_blocks()
-        Follow.flush(trx=False)
-        Posts.flush()
+
+        flush_time = FSM.start()
+        def register_time(f_time, name, pushed):
+            assert pushed is not None
+            FSM.flush_stat(name, FSM.stop(f_time), pushed)
+            return FSM.start()
+
+        log.info("#############################################################################")
+        flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
+        flush_time = register_time(flush_time, "Tags", Tags.flush())
+        flush_time = register_time(flush_time, "Votes", Votes.flush())
+        flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
+        follow_items = len(Follow.follow_items_to_flush)
+        Follow.flush(trx=False)
+        flush_time = register_time(flush_time, "Follow", follow_items)
+        flush_time = register_time(flush_time, "Posts", Posts.flush())
 
         DB.query("COMMIT")
-        time_end = perf_counter()
-        log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start)
-        return cls.ops_stats
+        log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
 
     @staticmethod
     def prepare_vops(comment_payout_ops, vopsList, date):
         vote_ops = {}
-        ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0, 'ineffective_delete_comment_operation' : 0 }
+        inefficient_deleted_ops = {}
+        registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation', 'ineffective_delete_comment_operation']
 
         for vop in vopsList:
+            start = OPSM.start()
             key = None
             val = None
 
             op_type = vop['type']
             op_value = vop['value']
 
             key = "{}/{}".format(op_value['author'], op_value['permlink'])
-
-            if op_type == 'author_reward_operation':
-                ops_stats[ 'author_reward_operation' ] += 1
+            if op_type == 'author_reward_operation':
                 if key not in comment_payout_ops:
-                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'comment_reward_operation':
-                ops_stats[ 'comment_reward_operation' ] += 1
-
                 if key not in comment_payout_ops:
-                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key]['effective_comment_vote_operation'] = None
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'effective_comment_vote_operation':
-                ops_stats[ 'effective_comment_vote_operation' ] += 1
                 key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
                 vote_ops[ key_vote ] = op_value
 
                 if key not in comment_payout_ops:
-                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'comment_payout_update_operation':
-                ops_stats[ 'comment_payout_update_operation' ] += 1
-
                 if key not in comment_payout_ops:
-                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
+                    comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
 
                 comment_payout_ops[key][op_type] = op_value
 
             elif op_type == 'ineffective_delete_comment_operation':
-                ops_stats[ 'ineffective_delete_comment_operation' ] += 1
                 inefficient_deleted_ops[key] = {}
 
-        return (vote_ops, ops_stats, inefficient_deleted_ops)
+            if op_type in registered_ops_stats:
+                OPSM.op_stats(op_type, OPSM.stop(start))
+
+        return (vote_ops, inefficient_deleted_ops)
 
     @classmethod
@@ -180,14 +176,15 @@ class Blocks:
 
         if is_initial_sync:
             if num in virtual_operations:
-                (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
+                (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
         else:
             vops = hived.get_virtual_operations(num)
-            (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
+            (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
 
         json_ops = []
         for tx_idx, tx in enumerate(block['transactions']):
             for operation in tx['operations']:
+                start = OPSM.start()
                 op_type = operation['type']
                 op = operation['value']
@@ -238,23 +235,16 @@ class Blocks:
                     json_ops.append(op)
 
                 if op_type != 'custom_json_operation':
-                    if op_type in cls.ops_stats:
-                        cls.ops_stats[op_type] += 1
-                    else:
-                        cls.ops_stats[op_type] = 1
+                    OPSM.op_stats(op_type, OPSM.stop(start))
 
         # follow/reblog/community ops
         if json_ops:
-            custom_ops_stats = CustomOp.process_ops(json_ops, num, cls._head_block_date)
-            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats)
+            CustomOp.process_ops(json_ops, num, cls._head_block_date)
 
         if vote_ops is not None:
             for k, v in vote_ops.items():
                 Votes.effective_comment_vote_op(k, v)
 
-        if Posts.comment_payout_ops:
-            cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats)
-
         cls._head_block_date = cls._current_block_date
 
         return num
@@ -328,7 +318,9 @@ class Blocks:
                 block['prev'], block['txs'], block['ops'], block['date']))
 
         DB.query(query + ",".join(values))
+        n = len(cls.blocks_to_flush)
         cls.blocks_to_flush = []
+        return n
 
     @classmethod
     def _pop(cls, blocks):
diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py
index d41305679a6e6fc8cb48dbb144497fea97cd68e4..27289e65ec099d0d5377b6aa5a12f9905e8de9dd 100644
--- a/hive/indexer/custom_op.py
+++ b/hive/indexer/custom_op.py
@@ -15,6 +15,8 @@ from hive.indexer.community import process_json_community_op, START_BLOCK
 from hive.utils.normalize import load_json_key
 from hive.utils.json import valid_op_json, valid_date, valid_command, valid_keys
 
+from hive.utils.stats import OPStatusManager as OPSM
+
 DB = Db.instance()
 
 log = logging.getLogger(__name__)
@@ -39,22 +41,10 @@ class CustomOp:
 
     @classmethod
     def process_ops(cls, ops, block_num, block_date):
-        ops_stats = {}
-
         """Given a list of operation in block, filter and process them."""
         for op in ops:
-            if op['id'] not in ['follow', 'community', 'notify']:
-                opName = str(op['id']) + '-ignored'
-                if(opName in ops_stats):
-                    ops_stats[opName] += 1
-                else:
-                    ops_stats[opName] = 1
-                continue
-
-            if(op['id'] in ops_stats):
-                ops_stats[op['id']] += 1
-            else:
-                ops_stats[op['id']] = 1
+            start = OPSM.start()
+            opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '' )
 
             account = _get_auth(op)
             if not account:
@@ -70,7 +60,8 @@ class CustomOp:
             process_json_community_op(account, op_json, block_date)
         elif op['id'] == 'notify':
             cls._process_notify(account, op_json, block_date)
-        return ops_stats
+
+            OPSM.op_stats(opName, OPSM.stop(start))
 
     @classmethod
     def _process_notify(cls, account, op_json, block_date):
diff --git a/hive/indexer/post_data_cache.py b/hive/indexer/post_data_cache.py
index 8c578b60e5daf93bd706876fa56e2c4bee382a37..5cdcc3a4b00f40de89f7087486b19a06958caa13 100644
--- a/hive/indexer/post_data_cache.py
+++ b/hive/indexer/post_data_cache.py
@@ -67,4 +67,6 @@ class PostDataCache(object):
             log.info("Executing query:\n{}".format(sql))
             DB.query(sql)
 
-        cls._data.clear()
+        n = len(cls._data.keys())
+        cls._data.clear()
+        return n
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index aaa65a18bd3f6d331d988ea5ca741ffd15eb3839..4f2b5350f08146194db8117c43c899c2e78d0396 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -211,7 +211,9 @@ class Posts:
             actual_query = sql.format(values_str)
             DB.query(actual_query)
 
+        n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()
+        return n
 
     @classmethod
     def comment_payout_op(cls):
@@ -315,7 +317,10 @@ class Posts:
                 "NULL" if ( cashout_time is None ) else ( "'{}'::timestamp".format( cashout_time ) ),
                 "NULL" if ( is_paidout is None ) else is_paidout ))
 
+
+        n = len(cls.comment_payout_ops)
         cls.comment_payout_ops.clear()
+        return n
 
     @classmethod
     def update_child_count(cls, child_id, op='+'):
@@ -449,5 +454,4 @@ class Posts:
 
     @classmethod
     def flush(cls):
-        cls.comment_payout_op()
-        cls.flush_into_db()
+        return cls.comment_payout_op() + cls.flush_into_db()
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index 9c6488a822264761695dccf80e085b2b5ba4aafc..bde43da27334c03ae7037d6bc63b736531a39b3d 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -26,26 +26,23 @@ from hive.indexer.follow import Follow
 from hive.indexer.community import Community
 from hive.server.common.mutes import Mutes
 
+from hive.utils.stats import OPStatusManager as OPSM
+from hive.utils.stats import FlushStatusManager as FSM
+from hive.utils.stats import WaitingStatusManager as WSM
+from hive.utils.stats import PrometheusClient as PC
+from hive.utils.stats import BroadcastObject
+
 log = logging.getLogger(__name__)
 
 CONTINUE_PROCESSING = True
 
-def print_ops_stats(prefix, ops_stats):
-    log.info("############################################################################")
-    log.info(prefix)
-    sorted_stats = sorted(ops_stats.items(), key=lambda kv: kv[1], reverse=True)
-    for (k, v) in sorted_stats:
-        log.info("`{}': {}".format(k, v))
-
-    log.info("############################################################################")
-
 def prepare_vops(vops_by_block):
     preparedVops = {}
     for blockNum, blockDict in vops_by_block.items():
         vopsList = blockDict['ops']
         preparedVops[blockNum] = vopsList
-    
+
     return preparedVops
 
 def _block_provider(node, queue, lbound, ubound, chunk_size):
@@ -93,13 +90,16 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
         log.exception("Exception caught during fetching vops...")
 
 def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+    is_debug = log.isEnabledFor(10)
+    num = 0
+    time_start = OPSM.start()
     try:
         count = ubound - lbound
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
-        total_ops_stats = {}
-        time_start = perf()
 
         while lbound < ubound:
+
+            wait_time = WSM.start()
             if blocksQueue.empty() and CONTINUE_PROCESSING:
                 log.info("Awaiting any block to process...")
 
@@ -107,7 +107,9 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             if not blocksQueue.empty() or CONTINUE_PROCESSING:
                 blocks = blocksQueue.get()
                 blocksQueue.task_done()
+            WSM.wait_stat('block_consumer_block', WSM.stop(wait_time))
 
+            wait_time = WSM.start()
             if vopsQueue.empty() and CONTINUE_PROCESSING:
                 log.info("Awaiting any vops to process...")
 
@@ -115,14 +117,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             if not vopsQueue.empty() or CONTINUE_PROCESSING:
                 preparedVops = vopsQueue.get()
                 vopsQueue.task_done()
+            WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time))
 
             to = min(lbound + chunk_size, ubound)
 
             timer.batch_start()
             block_start = perf()
-            ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync))
-            Blocks.ops_stats.clear()
+            Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
             block_end = perf()
             timer.batch_lap()
@@ -134,25 +136,38 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
             log.info(timer.batch_status(prefix))
             log.info("[SYNC] Time elapsed: %fs", time_current - time_start)
 
-            total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
-
-            if block_end - block_start > 1.0:
-                print_ops_stats("Operations present in the processed blocks:", ops_stats)
+            if block_end - block_start > 1.0 or is_debug:
+                otm = OPSM.log_current("Operations present in the processed blocks")
+                ftm = FSM.log_current("Flushing times")
+                wtm = WSM.log_current("Waiting times")
+                log.info(f"Calculated time: {otm+ftm+wtm :.4f} s.")
+
+            OPSM.next_blocks()
+            FSM.next_blocks()
+            WSM.next_blocks()
 
             lbound = to
+            PC.broadcast(BroadcastObject('sync_current_block', lbound, 'blocks'))
 
             num = num + 1
 
             if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
                 break
-
-        print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
-        return num
 
     except KeyboardInterrupt:
         log.info("Caught SIGINT")
-
     except Exception:
         log.exception("Exception caught during processing blocks...")
+    finally:
+        stop = OPSM.stop(time_start)
+        log.info("=== TOTAL STATS ===")
+        wtm = WSM.log_global("Total waiting times")
+        ftm = FSM.log_global("Total flush times")
+        otm = OPSM.log_global("All operations present in the processed blocks")
+        ttm = ftm + otm + wtm
+        log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
+        log.info("=== TOTAL STATS ===")
+        return num
 
 def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
     blocksQueue = queue.Queue(maxsize=10)
diff --git a/hive/indexer/tags.py b/hive/indexer/tags.py
index 58809052633102265419bf3246cdc5cc6a458cc4..44d942395456ee39d48f8236b095f8f379ce0c05 100644
--- a/hive/indexer/tags.py
+++ b/hive/indexer/tags.py
@@ -68,4 +68,7 @@ class Tags(object):
             tag_query = str(sql)
             DB.query(tag_query.format(','.join(values)))
             values.clear()
-        cls._tags.clear()
+
+        n = len(cls._tags)
+        cls._tags.clear()
+        return n
diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py
index 2e62d0187bcc3fe0b3c7b3c649a2de42d876b84a..e5387d3854f5b9a479203e2aa76dc3d7b2d44422 100644
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -123,6 +123,7 @@ class Votes:
     def flush(cls):
         """ Flush vote data from cache to database """
         cls.inside_flush = True
+        n = 0
         if cls._votes_data:
             sql = """
                 INSERT INTO hive_votes
@@ -186,5 +187,8 @@ class Votes:
             DB.query(actual_query)
             values_override.clear()
 
+            n = len(cls._votes_data)
             cls._votes_data.clear()
         cls.inside_flush = False
+        return n
+
diff --git a/hive/utils/stats.py b/hive/utils/stats.py
index 499c34c5a9aabd14f5ed8674fb1b1bf873db9b92..a7c3ae7fb6d9cc5bc6daf917c20f3269da590195 100644
--- a/hive/utils/stats.py
+++ b/hive/utils/stats.py
@@ -3,11 +3,293 @@
 import atexit
 import logging
 
+from queue import Queue
 from time import perf_counter as perf
 
 from hive.utils.system import colorize, peak_usage_mb
 
+from psutil import pid_exists
+from os import getpid
+
 log = logging.getLogger(__name__)
+
+class BroadcastObject:
+    def __init__(self, category, value, unit):
+        self.category = category
+        self.value = value
+        self.unit = unit
+
+    def name(self):
+        return f"hivemind_{self.category}"
+
+    def debug(self):
+        log.debug(f"{self.name()}_{self.unit}: {self.value :.2f}")
+
+class PrometheusClient:
+
+    deamon = None
+    logs_to_broadcast = Queue()
+
+    @staticmethod
+    def work( port, pid ):
+        try:
+            import prometheus_client as prom
+            prom.start_http_server(port)
+
+            gauges = {}
+
+            while pid_exists(pid):
+                value : BroadcastObject = PrometheusClient.logs_to_broadcast.get()
+                value.debug()
+                value_name = value.name()
+
+                if value_name not in gauges.keys():
+                    gauge = prom.Gauge(value_name, '', unit=value.unit)
+                    gauge.set(value.value)
+                    gauges[value_name] = gauge
+                else:
+                    gauges[value_name].set(value.value)
+
+        except Exception as e:
+            log.error(f"Prometheus logging failed. Exception\n {e}")
+
+    def __init__(self, port):
+        if port is None:
+            return
+        else:
+            port = int(port)
+        if PrometheusClient.deamon is None:
+            try:
+                import prometheus_client
+            except ImportError:
+                log.warn("Failed to import prometheus client. Online stats disabled")
+                return
+            from threading import Thread
+            deamon = Thread(target=PrometheusClient.work, args=[ port, getpid() ], daemon=True)
+            deamon.start()
+
+    @staticmethod
+    def broadcast(obj):
+        if type(obj) == type(list()):
+            for v in obj:
+                PrometheusClient.broadcast(v)
+        elif type(obj) == type(BroadcastObject('', '', '')):
+            PrometheusClient.logs_to_broadcast.put(obj)
+        else:
+            raise Exception(f"Not expected type. Should be list or BroadcastObject, but: {type(obj)} given")
+
+class Stat:
+    def __init__(self, time):
+        self.time = time
+
+    def update(self, other):
+        assert type(self) == type(other)
+        attributes = self.__dict__
+        oatte = other.__dict__
+        for key, val in attributes.items():
+            setattr(self, key, oatte[key] + val)
+        return self
+
+    def __repr__(self):
+        return str(self.__dict__)
+
+    def __lt__(self, other):
+        return self.time < other.time
+
+    def broadcast(self, name):
+        return BroadcastObject(name, self.time, 's')
+
+class StatusManager:
+
+    # Fully abstract class
+    def __init__(self):
+        assert False
+
+    @staticmethod
+    def start():
+        return perf()
+
+    @staticmethod
+    def stop( start : float ):
+        return perf() - start
+
+    @staticmethod
+    def merge_dicts(od1, od2, broadcast : bool = False):
+        if od2 is not None:
+            for k, v in od2.items():
+                if k in od1:
+                    od1[k].update(v)
+                    if broadcast:
+                        PrometheusClient.broadcast(v.broadcast(k))
+                else:
+                    od1[k] = v
+
+        return od1
+
+    @staticmethod
+    def log_dict(col : dict) -> float:
+        sorted_stats = sorted(col.items(), key=lambda kv: kv[1], reverse=True)
+        measured_time = 0.0
+        for (k, v) in sorted_stats:
+            log.info("`{}`: {}".format(k, v))
+            measured_time += v.time
+        return measured_time
+
+    @staticmethod
+    def print_row():
+        log.info("#" * 20)
+
+class OPStat(Stat):
+    def __init__(self, time, count):
+        super().__init__(time)
+        self.count = count
+
+    def __str__(self):
+        return f"Processed {self.count :.0f} times in {self.time :.5f} seconds"
+
+    def broadcast(self, name : str):
+        n = name.lower()
+        if not n.endswith('operation'):
+            n = f"{n}_operation"
+        return list([ super().broadcast(n), BroadcastObject(n, self.count, 'b') ])
+
+class OPStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    cpbs = {}
+
+    @staticmethod
+    def op_stats( name, time, processed = 1 ):
+        if name in OPStatusManager.cpbs.keys():
+            OPStatusManager.cpbs[name].time += time
+            OPStatusManager.cpbs[name].count += processed
+        else:
+            OPStatusManager.cpbs[name] = OPStat(time, processed)
+
+    @staticmethod
+    def next_blocks():
+        OPStatusManager.global_stats = StatusManager.merge_dicts(
+            OPStatusManager.global_stats,
+            OPStatusManager.cpbs,
+            True
+        )
+        OPStatusManager.cpbs.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(OPStatusManager.global_stats)
+        log.info(f"Total time for processing operations: {tm :.4f}s.")
+        return tm
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(OPStatusManager.cpbs)
+        log.info(f"Current time for processing operations: {tm :.4f}s.")
+        return tm
+
+class FlushStat(Stat):
+    def __init__(self, time, pushed):
+        super().__init__(time)
+        self.pushed = pushed
+
+    def __str__(self):
+        return f"Pushed {self.pushed :.0f} records in {self.time :.4f} seconds"
+
+    def broadcast(self, name : str):
+        n = f"flushing_{name.lower()}"
+        return list([ super().broadcast(n), BroadcastObject(n, self.pushed, 'b') ])
+
+class FlushStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    current_flushes = {}
+
+    @staticmethod
+    def flush_stat(name, time, pushed):
+        if name in FlushStatusManager.current_flushes.keys():
+            FlushStatusManager.current_flushes[name].time += time
+            FlushStatusManager.current_flushes[name].pushed += pushed
+        else:
+            FlushStatusManager.current_flushes[name] = FlushStat(time, pushed)
+
+    @staticmethod
+    def next_blocks():
+        FlushStatusManager.global_stats = StatusManager.merge_dicts(
+            FlushStatusManager.global_stats,
+            FlushStatusManager.current_flushes,
+            True
+        )
+        FlushStatusManager.current_flushes.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(FlushStatusManager.global_stats)
+        log.info(f"Total flushing time: {tm :.4f}s.")
+        return tm
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(FlushStatusManager.current_flushes)
+        log.info(f"Current flushing time: {tm :.4f}s.")
+        return tm
+
+class WaitStat(Stat):
+    def __init__(self, time):
+        super().__init__(time)
+
+    def __str__(self):
+        return f"Waited {self.time :.4f} seconds"
+
+class WaitingStatusManager(StatusManager):
+    # Summary for whole sync
+    global_stats = {}
+
+    # Currently processed blocks stats, merged to global stats, after `next_block`
+    current_waits = {}
+
+    @staticmethod
+    def wait_stat(name, time):
+        if name in WaitingStatusManager.current_waits.keys():
+            WaitingStatusManager.current_waits[name].time += time
+        else:
+            WaitingStatusManager.current_waits[name] = WaitStat(time)
+
+    @staticmethod
+    def next_blocks():
+        WaitingStatusManager.global_stats = StatusManager.merge_dicts(
+            WaitingStatusManager.global_stats,
+            WaitingStatusManager.current_waits,
+            True
+        )
+        WaitingStatusManager.current_waits.clear()
+
+    @staticmethod
+    def log_global(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(WaitingStatusManager.global_stats)
+        log.info(f"Total waiting time: {tm :.4f}s.")
+        return tm
+
+    @staticmethod
+    def log_current(label : str):
+        StatusManager.print_row()
+        log.info(label)
+        tm = StatusManager.log_dict(WaitingStatusManager.current_waits)
+        log.info(f"Current waiting time: {tm :.4f}s.")
+        return tm
+
 def _normalize_sql(sql, maxlen=180):
     """Collapse whitespace and middle-truncate if needed."""
     out = ' '.join(sql.split())
diff --git a/scripts/ci_sync.sh b/scripts/ci_sync.sh
index 9907cda99446e28486807ff16a4124531a988c0c..53e64068362392cd0d0c9bcf4dc6dc9bd02f5319 100755
--- a/scripts/ci_sync.sh
+++ b/scripts/ci_sync.sh
@@ -54,5 +54,5 @@ psql -U $POSTGRES_USER -h localhost -d postgres -c "CREATE DATABASE $DB_NAME;"
 
 echo Attempting to starting hive sync using hived node: $HIVEMIND_SOURCE_HIVED_URL . Max sync block is: $HIVEMIND_MAX_BLOCK
 echo Attempting to access database $DB_URL
-./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --database-url $DB_URL 2>&1 | tee -i hivemind-sync.log
+./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --prometheus-port 11011 --database-url $DB_URL 2>&1 | tee -i hivemind-sync.log
 rm hive_sync.pid
diff --git a/setup.py b/setup.py
index ee620402ef511d844a44d780d3735700493a957e..9cc251e10682a19bb45942abfbcfdaae27e47f79 100644
--- a/setup.py
+++ b/setup.py
@@ -61,7 +61,9 @@ setup(
             'aiocache',
             'configargparse',
             'pdoc',
-            'diff-match-patch'
+            'diff-match-patch',
+            'prometheus-client',
+            'psutil'
         ],
         extras_require={'test': tests_require},
         entry_points={
diff --git a/tests/tests_api b/tests/tests_api
index d23060b52e4e773308f7bafa666bef231c0e49ed..39875d086ab82a7377a0799784e858666ae5e62e 160000
--- a/tests/tests_api
+++ b/tests/tests_api
@@ -1 +1 @@
-Subproject commit d23060b52e4e773308f7bafa666bef231c0e49ed
+Subproject commit 39875d086ab82a7377a0799784e858666ae5e62e
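
Usage note (not part of the patch): a minimal, hypothetical sketch of how the stats plumbing introduced in hive/utils/stats.py is intended to be exercised. The imported names and call signatures come from this diff; the operation name and row count below are illustrative placeholders, and port 11011 simply mirrors the value used in scripts/ci_sync.sh.

from hive.utils.stats import BroadcastObject
from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.stats import OPStatusManager as OPSM
from hive.utils.stats import PrometheusClient

# Start the optional Prometheus exporter thread; a None port (or a missing
# prometheus_client package) turns this into a no-op.
PrometheusClient(11011)

# Time a single operation and record it under its operation name.
start = OPSM.start()
# ... process one 'vote_operation' here ...
OPSM.op_stats('vote_operation', OPSM.stop(start))

# Time a flush and record how many rows it pushed (placeholder count of 10).
flush_start = FSM.start()
# ... e.g. pushed = Votes.flush() ...
FSM.flush_stat('Votes', FSM.stop(flush_start), 10)

# Fold the per-batch stats into the global totals and broadcast them as gauges.
OPSM.next_blocks()
FSM.next_blocks()
PrometheusClient.broadcast(BroadcastObject('sync_current_block', 12345, 'blocks'))

# Log the accumulated totals, as _block_consumer() does in its finally block.
OPSM.log_global("All operations present in the processed blocks")
FSM.log_global("Total flush times")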