Skip to content
Snippets Groups Projects

Prerequisuites to Reputation api support

Merged Bartek Wrona requested to merge reputation_api_support into develop
Files
4
+ 69
11
@@ -2,6 +2,7 @@
from hive.indexer.reblog import Reblog
import logging
import concurrent
from hive.db.adapter import Db
@@ -13,6 +14,8 @@ from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.reputations import Reputations
from hive.indexer.reblog import Reblog
from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM
@@ -20,15 +23,38 @@ from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.trends import update_hot_and_tranding_for_block_range
from hive.utils.post_active import update_active_starting_from_posts_on_block
from concurrent.futures import ThreadPoolExecutor
log = logging.getLogger(__name__)
DB = Db.instance()
def time_collector(f):
startTime = FSM.start()
result = f()
elapsedTime = FSM.stop(startTime)
return (result, elapsedTime)
def follows_flush_helper():
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
return folllow_items
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processes ("current block")
_head_block_date = None
_current_block_date = None
_concurrent_flush = [
('Posts', Posts.flush, Posts),
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
('Tags', Tags.flush, Tags),
('Follow', follows_flush_helper, Follow),
('Reblog', Reblog.flush, Reblog)
]
def __init__(cls):
head_date = cls.head_date()
@@ -39,6 +65,16 @@ class Blocks:
cls._head_block_date = head_date
cls._current_block_date = head_date
@classmethod
def setup_db_access(self, sharedDbAdapter):
PostDataCache.setup_db_access(sharedDbAdapter)
Reputations.setup_db_access(sharedDbAdapter)
Votes.setup_db_access(sharedDbAdapter)
Tags.setup_db_access(sharedDbAdapter)
Follow.setup_db_access(sharedDbAdapter)
Posts.setup_db_access(sharedDbAdapter)
Reblog.setup_db_access(sharedDbAdapter)
@classmethod
def head_num(cls):
"""Get hive's head block number."""
@@ -64,6 +100,8 @@ class Blocks:
Posts.flush()
Reblog.flush()
Follow.flush(trx=False)
Reputations.flush()
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter()
@@ -74,6 +112,7 @@ class Blocks:
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
DB.query("START TRANSACTION")
last_num = 0
@@ -99,18 +138,36 @@ class Blocks:
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Reblog", Reblog.flush())
DB.query("COMMIT")
completedThreads = 0;
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
for future in concurrent.futures.as_completed(flush_futures):
(description, c) = flush_futures[future]
completedThreads = completedThreads + 1
try:
(n, elapsedTime) = future.result()
assert n is not None
assert not c.tx_active()
FSM.flush_stat(description, elapsedTime, n)
# if n > 0:
# log.info('%r flush generated %d records' % (description, n))
except Exception as exc:
log.error('%r generated an exception: %s' % (description, exc))
raise exc
pool.shutdown()
assert completedThreads == len(cls._concurrent_flush)
if (not is_initial_sync) and (first_block > -1):
DB.query("START TRANSACTION")
cls.on_live_blocks_processed( first_block, last_num )
DB.query("COMMIT")
DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@@ -145,6 +202,7 @@ class Blocks:
elif op_type == 'effective_comment_vote_operation':
Votes.effective_comment_vote_op( op_value )
Reputations.process_vote(block_num, op_value)
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
Loading