Skip to content
Snippets Groups Projects

Added better logging

Merged Krzysztof Mochocki requested to merge km_better_logging into develop
Files
13
+ 38
48
@@ -15,6 +15,9 @@ from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM
from hive.utils.stats import FlushStatusManager as FSM
log = logging.getLogger(__name__)
DB = Db.instance()
@@ -22,7 +25,6 @@ DB = Db.instance()
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
ops_stats = {}
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processes ("current block")
@@ -35,17 +37,6 @@ class Blocks:
cls._head_block_date = head_date
cls._current_block_date = head_date
@staticmethod
def merge_ops_stats(od1, od2):
if od2 is not None:
for k, v in od2.items():
if k in od1:
od1[k] += v
else:
od1[k] = v
return od1
@classmethod
def head_num(cls):
"""Get hive's head block number."""
@@ -76,7 +67,7 @@ class Blocks:
@classmethod
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = perf_counter()
time_start = OPSM.start()
DB.query("START TRANSACTION")
last_num = 0
@@ -90,69 +81,73 @@ class Blocks:
# Follows flushing needs to be atomic because recounts are
# expensive. So is tracking follows at all; hence we track
# deltas in memory and update follow/er counts in bulk.
PostDataCache.flush()
Tags.flush()
Votes.flush()
cls._flush_blocks()
Follow.flush(trx=False)
Posts.flush()
flush_time = FSM.start()
def register_time(f_time, name, pushed):
assert pushed is not None
FSM.flush_stat(name, FSM.stop(f_time), pushed)
return FSM.start()
log.info("#############################################################################")
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
DB.query("COMMIT")
time_end = perf_counter()
log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start)
return cls.ops_stats
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@staticmethod
def prepare_vops(comment_payout_ops, vopsList, date):
vote_ops = {}
ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0 }
registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation']
for vop in vopsList:
start = OPSM.start()
key = None
val = None
op_type = vop['type']
op_value = vop['value']
key = "{}/{}".format(op_value['author'], op_value['permlink'])
if op_type == 'author_reward_operation':
ops_stats[ 'author_reward_operation' ] += 1
if op_type == 'author_reward_operation':
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
elif op_type == 'comment_reward_operation':
ops_stats[ 'comment_reward_operation' ] += 1
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key]['effective_comment_vote_operation'] = None
comment_payout_ops[key][op_type] = op_value
elif op_type == 'effective_comment_vote_operation':
ops_stats[ 'effective_comment_vote_operation' ] += 1
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
vote_ops[ key_vote ] = op_value
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
elif op_type == 'comment_payout_update_operation':
ops_stats[ 'comment_payout_update_operation' ] += 1
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None, 'date' : date }
comment_payout_ops[key][op_type] = op_value
return (vote_ops, ops_stats)
if op_type in registered_ops_stats:
OPSM.op_stats(op_type, OPSM.stop(start))
return vote_ops
@classmethod
@@ -173,6 +168,7 @@ class Blocks:
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
for operation in tx['operations']:
start = OPSM.start()
op_type = operation['type']
op = operation['value']
@@ -221,33 +217,25 @@ class Blocks:
json_ops.append(op)
if op_type != 'custom_json_operation':
if op_type in cls.ops_stats:
cls.ops_stats[op_type] += 1
else:
cls.ops_stats[op_type] = 1
OPSM.op_stats(op_type, OPSM.stop(start))
# follow/reblog/community ops
if json_ops:
custom_ops_stats = CustomOp.process_ops(json_ops, num, cls._head_block_date)
cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats)
CustomOp.process_ops(json_ops, num, cls._head_block_date)
vote_ops = None
comment_payout_stats = None
if is_initial_sync:
if num in virtual_operations:
(vote_ops, comment_payout_stats) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
vote_ops = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
else:
vops = hived.get_virtual_operations(num)
(vote_ops, comment_payout_stats) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
vote_ops = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
if vote_ops is not None:
for k, v in vote_ops.items():
Votes.effective_comment_vote_op(k, v)
if Posts.comment_payout_ops:
cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats)
cls._head_block_date = cls._current_block_date
return num
@@ -321,7 +309,9 @@ class Blocks:
block['prev'], block['txs'],
block['ops'], block['date']))
DB.query(query + ",".join(values))
n = len(cls.blocks_to_flush)
cls.blocks_to_flush = []
return n
@classmethod
def _pop(cls, blocks):
Loading