Commit 64c77f4a authored by Krzysztof Mochocki

upgraded logging and added prometheus daemon

parent 75b31450
Related merge requests: !456 Release candidate v1 24, !230 Setup monitoring with pghero, !135 Enable postgres monitoring on CI server, !73 Added better logging
......@@ -6,6 +6,7 @@ import os
import logging
from hive.conf import Conf
from hive.db.adapter import Db
from hive.utils.stats import PrometheusClient
logging.basicConfig()
......@@ -15,6 +16,7 @@ def run():
conf = Conf.init_argparse()
Db.set_shared_instance(conf.db())
mode = conf.mode()
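    # start the Prometheus stats daemon when a prometheus port is configured (no-op otherwise)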
PrometheusClient( conf.get('prometheus_port') )
pid_file_name = conf.pid_file()
if pid_file_name is not None:
......
......@@ -35,8 +35,10 @@ class Conf():
add('--steemd-url', env_var='STEEMD_URL', required=False, help='steemd/jussi endpoint', default='{"default" : "https://api.hive.blog"}')
add('--muted-accounts-url', env_var='MUTED_ACCOUNTS_URL', required=False, help='url to flat list of muted accounts', default='https://raw.githubusercontent.com/hivevectordefense/irredeemables/master/full.txt')
add('--blacklist-api-url', env_var='BLACKLIST_API_URL', required=False, help='url to access blacklist api', default='https://blacklist.usehive.com')
# server
add('--http-server-port', type=int, env_var='HTTP_SERVER_PORT', default=8080)
add('--prometheus-port', type=int, env_var='PROMETHEUS_PORT', required=False, help='if specified, runs a prometheus daemon on the given port, exposing statistics and performance data')
# sync
add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
......@@ -112,6 +114,7 @@ class Conf():
def get(self, param):
"""Reads a single property, e.g. `database_url`."""
assert self._args, "run init_argparse()"
print(self._args)
return self._args[param]
def mode(self):
......
......@@ -15,6 +15,9 @@ from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM
from hive.utils.stats import FlushStatusManager as FSM
log = logging.getLogger(__name__)
DB = Db.instance()
......@@ -22,7 +25,6 @@ DB = Db.instance()
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processes ("current block")
......@@ -35,17 +37,6 @@ class Blocks:
cls._head_block_date = head_date
cls._current_block_date = head_date
@classmethod
def head_num(cls):
"""Get hive's head block number."""
......@@ -76,7 +67,7 @@ class Blocks:
@classmethod
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
DB.query("START TRANSACTION")
last_num = 0
......@@ -90,73 +81,78 @@ class Blocks:
# Follows flushing needs to be atomic because recounts are
# expensive. So is tracking follows at all; hence we track
# deltas in memory and update follow/er counts in bulk.
flush_time = FSM.start()
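        # helper: record each flush's elapsed time and pushed-row count, then restart the timer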
def register_time(f_time, name, pushed):
assert pushed is not None
FSM.flush_stat(name, FSM.stop(f_time), pushed)
return FSM.start()
log.info("#############################################################################")
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
follow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", follow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@staticmethod
def prepare_vops(comment_payout_ops, vopsList, date):
vote_ops = {}
inefficient_deleted_ops = {}
registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation', 'ineffective_delete_comment_operation']
for vop in vopsList:
start = OPSM.start()
key = None
val = None
op_type = vop['type']
op_value = vop['value']
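            # payout-related vops are grouped per post, keyed by "author/permlink"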
key = "{}/{}".format(op_value['author'], op_value['permlink'])
if op_type == 'author_reward_operation':
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
elif op_type == 'comment_reward_operation':
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key]['effective_comment_vote_operation'] = None
comment_payout_ops[key][op_type] = op_value
elif op_type == 'effective_comment_vote_operation':
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
vote_ops[ key_vote ] = op_value
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
elif op_type == 'comment_payout_update_operation':
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
elif op_type == 'ineffective_delete_comment_operation':
inefficient_deleted_ops[key] = {}
if op_type in registered_ops_stats:
OPSM.op_stats(op_type, OPSM.stop(start))
return (vote_ops, inefficient_deleted_ops)
@classmethod
......@@ -180,14 +176,15 @@ class Blocks:
if is_initial_sync:
if num in virtual_operations:
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
else:
vops = hived.get_virtual_operations(num)
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
for operation in tx['operations']:
start = OPSM.start()
op_type = operation['type']
op = operation['value']
......@@ -238,23 +235,16 @@ class Blocks:
json_ops.append(op)
if op_type != 'custom_json_operation':
OPSM.op_stats(op_type, OPSM.stop(start))
# follow/reblog/community ops
if json_ops:
CustomOp.process_ops(json_ops, num, cls._head_block_date)
if vote_ops is not None:
for k, v in vote_ops.items():
Votes.effective_comment_vote_op(k, v)
cls._head_block_date = cls._current_block_date
return num
......@@ -328,7 +318,9 @@ class Blocks:
block['prev'], block['txs'],
block['ops'], block['date']))
DB.query(query + ",".join(values))
n = len(cls.blocks_to_flush)
cls.blocks_to_flush = []
return n
@classmethod
def _pop(cls, blocks):
......
......@@ -15,6 +15,8 @@ from hive.indexer.community import process_json_community_op, START_BLOCK
from hive.utils.normalize import load_json_key
from hive.utils.json import valid_op_json, valid_date, valid_command, valid_keys
from hive.utils.stats import OPStatusManager as OPSM
DB = Db.instance()
log = logging.getLogger(__name__)
......@@ -39,22 +41,10 @@ class CustomOp:
@classmethod
def process_ops(cls, ops, block_num, block_date):
"""Given a list of operation in block, filter and process them."""
for op in ops:
start = OPSM.start()
opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '' )
account = _get_auth(op)
if not account:
......@@ -70,7 +60,8 @@ class CustomOp:
process_json_community_op(account, op_json, block_date)
elif op['id'] == 'notify':
cls._process_notify(account, op_json, block_date)
OPSM.op_stats(opName, OPSM.stop(start))
@classmethod
def _process_notify(cls, account, op_json, block_date):
......
......@@ -67,4 +67,6 @@ class PostDataCache(object):
log.info("Executing query:\n{}".format(sql))
DB.query(sql)
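        # report how many records were pushed so flush statistics can aggregate them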
n = len(cls._data.keys())
cls._data.clear()
return n
......@@ -211,7 +211,9 @@ class Posts:
actual_query = sql.format(values_str)
DB.query(actual_query)
n = len(cls._comment_payout_ops)
cls._comment_payout_ops.clear()
return n
@classmethod
def comment_payout_op(cls):
......@@ -315,7 +317,10 @@ class Posts:
"NULL" if ( cashout_time is None ) else ( "'{}'::timestamp".format( cashout_time ) ),
"NULL" if ( is_paidout is None ) else is_paidout ))
n = len(cls.comment_payout_ops)
cls.comment_payout_ops.clear()
return n
@classmethod
def update_child_count(cls, child_id, op='+'):
......@@ -449,5 +454,4 @@ class Posts:
@classmethod
def flush(cls):
return cls.comment_payout_op() + cls.flush_into_db()
......@@ -26,26 +26,23 @@ from hive.indexer.follow import Follow
from hive.indexer.community import Community
from hive.server.common.mutes import Mutes
from hive.utils.stats import OPStatusManager as OPSM
from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.stats import WaitingStatusManager as WSM
from hive.utils.stats import PrometheusClient as PC
from hive.utils.stats import BroadcastObject
log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
def prepare_vops(vops_by_block):
preparedVops = {}
for blockNum, blockDict in vops_by_block.items():
vopsList = blockDict['ops']
preparedVops[blockNum] = vopsList
return preparedVops
def _block_provider(node, queue, lbound, ubound, chunk_size):
......@@ -93,13 +90,16 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
log.exception("Exception caught during fetching vops...")
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
is_debug = log.isEnabledFor(10) # 10 == logging.DEBUG
num = 0
time_start = OPSM.start()
try:
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time = WSM.start()
if blocksQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any block to process...")
......@@ -107,7 +107,9 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get()
blocksQueue.task_done()
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time))
wait_time = WSM.start()
if vopsQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any vops to process...")
......@@ -115,14 +117,14 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get()
vopsQueue.task_done()
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time))
to = min(lbound + chunk_size, ubound)
timer.batch_start()
block_start = perf()
Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
block_end = perf()
timer.batch_lap()
......@@ -134,25 +136,38 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
log.info(timer.batch_status(prefix))
log.info("[SYNC] Time elapsed: %fs", time_current - time_start)
if block_end - block_start > 1.0 or is_debug:
otm = OPSM.log_current("Operations present in the processed blocks")
ftm = FSM.log_current("Flushing times")
wtm = WSM.log_current("Waiting times")
log.info(f"Calculated time: {otm+ftm+wtm :.4f} s.")
OPSM.next_blocks()
FSM.next_blocks()
WSM.next_blocks()
lbound = to
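        # queue the current sync position for the Prometheus daemon thread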
PC.broadcast(BroadcastObject('sync_current_block', lbound, 'blocks'))
num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
break
print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during processing blocks...")
finally:
stop = OPSM.stop(time_start)
log.info("=== TOTAL STATS ===")
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times")
otm = OPSM.log_global("All operations present in the processed blocks")
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info("=== TOTAL STATS ===")
return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
......
......@@ -68,4 +68,7 @@ class Tags(object):
tag_query = str(sql)
DB.query(tag_query.format(','.join(values)))
values.clear()
n = len(cls._tags)
cls._tags.clear()
return n
......@@ -123,6 +123,7 @@ class Votes:
def flush(cls):
""" Flush vote data from cache to database """
cls.inside_flush = True
n = 0
if cls._votes_data:
sql = """
INSERT INTO hive_votes
......@@ -186,5 +187,8 @@ class Votes:
DB.query(actual_query)
values_override.clear()
n = len(cls._votes_data)
cls._votes_data.clear()
cls.inside_flush = False
return n
......@@ -3,11 +3,293 @@
import atexit
import logging
from queue import Queue
from time import perf_counter as perf
from hive.utils.system import colorize, peak_usage_mb
from psutil import pid_exists
from os import getpid
log = logging.getLogger(__name__)
class BroadcastObject:
def __init__(self, category, value, unit):
self.category = category
self.value = value
self.unit = unit
def name(self):
return f"hivemind_{self.category}"
def debug(self):
log.debug(f"{self.name()}_{self.unit}: {self.value :.2f}")
class PrometheusClient:
daemon = None
logs_to_broadcast = Queue()
@staticmethod
def work( port, pid ):
try:
import prometheus_client as prom
prom.start_http_server(port)
gauges = {}
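            # consume queued stats until the parent process exits, lazily creating one gauge per metric name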
while pid_exists(pid):
value : BroadcastObject = PrometheusClient.logs_to_broadcast.get()
value.debug()
value_name = value.name()
if value_name not in gauges.keys():
gauge = prom.Gauge(value_name, '', unit=value.unit)
gauge.set(value.value)
gauges[value_name] = gauge
else:
gauges[value_name].set(value.value)
except Exception as e:
log.error(f"Prometheus logging failed. Exception\n {e}")
def __init__(self, port):
if port is None:
return
else:
port = int(port)
if PrometheusClient.daemon is None:
try:
import prometheus_client
except ImportError:
log.warning("Failed to import prometheus client. Online stats disabled")
return
from threading import Thread
# keep the thread handle on the class so repeated construction does not spawn duplicate daemons
PrometheusClient.daemon = Thread(target=PrometheusClient.work, args=[ port, getpid() ], daemon=True)
PrometheusClient.daemon.start()
@staticmethod
def broadcast(obj):
if isinstance(obj, list):
    for v in obj:
        PrometheusClient.broadcast(v)
elif isinstance(obj, BroadcastObject):
    PrometheusClient.logs_to_broadcast.put(obj)
else:
    raise Exception(f"Unexpected type; should be list or BroadcastObject, but {type(obj)} given")
class Stat:
def __init__(self, time):
self.time = time
def update(self, other):
assert type(self) == type(other)
attributes = self.__dict__
other_attributes = other.__dict__
for key, val in attributes.items():
    setattr(self, key, other_attributes[key] + val)
return self
def __repr__(self):
return str(self.__dict__)
def __lt__(self, other):
return self.time < other.time
def broadcast(self, name):
return BroadcastObject(name, self.time, 's')
class StatusManager:
# Fully abstract class
def __init__(self):
assert False
@staticmethod
def start():
return perf()
@staticmethod
def stop( start : float ):
return perf() - start
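    # usage pattern: t = StatusManager.start(); ...work...; elapsed = StatusManager.stop(t)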
@staticmethod
def merge_dicts(od1, od2, broadcast : bool = False):
if od2 is not None:
for k, v in od2.items():
if k in od1:
od1[k].update(v)
if broadcast:
PrometheusClient.broadcast(v.broadcast(k))
else:
od1[k] = v
return od1
@staticmethod
def log_dict(col : dict) -> float:
sorted_stats = sorted(col.items(), key=lambda kv: kv[1], reverse=True)
measured_time = 0.0
for (k, v) in sorted_stats:
log.info("`{}`: {}".format(k, v))
measured_time += v.time
return measured_time
@staticmethod
def print_row():
log.info("#" * 20)
class OPStat(Stat):
def __init__(self, time, count):
super().__init__(time)
self.count = count
def __str__(self):
return f"Processed {self.count :.0f} times in {self.time :.5f} seconds"
def broadcast(self, name : str):
n = name.lower()
if not n.endswith('operation'):
n = f"{n}_operation"
return list([ super().broadcast(n), BroadcastObject(n, self.count, 'b') ])
class OPStatusManager(StatusManager):
# Summary for whole sync
global_stats = {}
# Currently processed blocks stats, merged to global stats, after `next_block`
cpbs = {}
@staticmethod
def op_stats( name, time, processed = 1 ):
if name in OPStatusManager.cpbs.keys():
OPStatusManager.cpbs[name].time += time
OPStatusManager.cpbs[name].count += processed
else:
OPStatusManager.cpbs[name] = OPStat(time, processed)
@staticmethod
def next_blocks():
OPStatusManager.global_stats = StatusManager.merge_dicts(
OPStatusManager.global_stats,
OPStatusManager.cpbs,
True
)
OPStatusManager.cpbs.clear()
@staticmethod
def log_global(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(OPStatusManager.global_stats)
log.info(f"Total time for processing operations time: {tm :.4f}s.")
return tm
@staticmethod
def log_current(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(OPStatusManager.cpbs)
log.info(f"Current time for processing operations time: {tm :.4f}s.")
return tm
class FlushStat(Stat):
def __init__(self, time, pushed):
super().__init__(time)
self.pushed = pushed
def __str__(self):
return f"Pushed {self.pushed :.0f} records in {self.time :.4f} seconds"
def broadcast(self, name : str):
n = f"flushing_{name.lower()}"
return list([ super().broadcast(n), BroadcastObject(n, self.pushed, 'b') ])
class FlushStatusManager(StatusManager):
# Summary for whole sync
global_stats = {}
# Currently processed blocks stats, merged to global stats, after `next_block`
current_flushes = {}
@staticmethod
def flush_stat(name, time, pushed):
if name in FlushStatusManager.current_flushes.keys():
FlushStatusManager.current_flushes[name].time += time
FlushStatusManager.current_flushes[name].pushed += pushed
else:
FlushStatusManager.current_flushes[name] = FlushStat(time, pushed)
@staticmethod
def next_blocks():
FlushStatusManager.global_stats = StatusManager.merge_dicts(
FlushStatusManager.global_stats,
FlushStatusManager.current_flushes,
True
)
FlushStatusManager.current_flushes.clear()
@staticmethod
def log_global(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(FlushStatusManager.global_stats)
log.info(f"Total flushing time: {tm :.4f}s.")
return tm
@staticmethod
def log_current(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(FlushStatusManager.current_flushes)
log.info(f"Current flushing time: {tm :.4f}s.")
return tm
class WaitStat(Stat):
def __init__(self, time):
super().__init__(time)
def __str__(self):
return f"Waited {self.time :.4f} seconds"
class WaitingStatusManager(StatusManager):
# Summary for whole sync
global_stats = {}
# Currently processed blocks stats, merged to global stats, after `next_block`
current_waits = {}
@staticmethod
def wait_stat(name, time):
if name in WaitingStatusManager.current_waits.keys():
WaitingStatusManager.current_waits[name].time += time
else:
WaitingStatusManager.current_waits[name] = WaitStat(time)
@staticmethod
def next_blocks():
WaitingStatusManager.global_stats = StatusManager.merge_dicts(
WaitingStatusManager.global_stats,
WaitingStatusManager.current_waits,
True
)
WaitingStatusManager.current_waits.clear()
@staticmethod
def log_global(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(WaitingStatusManager.global_stats)
log.info(f"Total waiting time: {tm :.4f}s.")
return tm
@staticmethod
def log_current(label : str):
StatusManager.print_row()
log.info(label)
tm = StatusManager.log_dict(WaitingStatusManager.current_waits)
log.info(f"Current waiting time: {tm :.4f}s.")
return tm
def _normalize_sql(sql, maxlen=180):
"""Collapse whitespace and middle-truncate if needed."""
out = ' '.join(sql.split())
......
......@@ -54,5 +54,5 @@ psql -U $POSTGRES_USER -h localhost -d postgres -c "CREATE DATABASE $DB_NAME;"
echo Attempting to start hive sync using hived node: $HIVEMIND_SOURCE_HIVED_URL . Max sync block is: $HIVEMIND_MAX_BLOCK
echo Attempting to access database $DB_URL
./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --prometheus-port 11011 --database-url $DB_URL 2>&1 | tee -i hivemind-sync.log
rm hive_sync.pid
......@@ -61,7 +61,9 @@ setup(
'aiocache',
'configargparse',
'pdoc',
'diff-match-patch',
'prometheus-client',
'psutil'
],
extras_require={'test': tests_require},
entry_points={
......
Subproject commit d23060b52e4e773308f7bafa666bef231c0e49ed
Subproject commit 39875d086ab82a7377a0799784e858666ae5e62e