Skip to content
Snippets Groups Projects
Commit 3e30ab63 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Prerequisites for MT version of Reputation flush.

parent a570ff74
No related branches found
No related tags found
1 merge request!121Prerequisites to Reputation api support
This commit is part of merge request !121. Comments created here will be created in the context of that merge request.
......@@ -50,6 +50,10 @@ class Db:
self._exec = self._conn.execute
self._exec(sqlalchemy.text("COMMIT"))
def clone(self):
    """Return a fresh Db adapter bound to the same database URL.

    Each clone opens its own connection, letting callers (e.g. the
    Reputations processor) issue queries independently of the shared
    instance — a prerequisite for multi-threaded flushing.
    """
    return Db(self._url)
def engine(self):
"""Lazy-loaded SQLAlchemy engine."""
if not self._engine:
......
......@@ -29,10 +29,16 @@ DB = Db.instance()
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processes ("current block")
def __init__(cls):
cls._reputations = Reputations(DB)
cls._head_block_date = None # timestamp of last fully processed block ("previous block")
cls._current_block_date = None # timestamp of block currently being processes ("current block")
log.info("Creating a reputations processor")
log.info("Built reputations object: {}".format(cls._reputations))
log.info("Built blocks object: {}".format(cls))
head_date = cls.head_date()
if(head_date == ''):
cls._head_block_date = None
......@@ -64,6 +70,7 @@ class Blocks:
Tags.flush()
Votes.flush()
Posts.flush()
cls._reputations.flush()
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter()
......@@ -74,6 +81,9 @@ class Blocks:
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
log.info("Blocks object: {}".format(cls))
DB.query("START TRANSACTION")
last_num = 0
......@@ -105,7 +115,7 @@ class Blocks:
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Reputations", Reputations.flush())
flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
if (not is_initial_sync) and (first_block > -1):
cls.on_live_blocks_processed( first_block, last_num )
......@@ -114,8 +124,8 @@ class Blocks:
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@staticmethod
def prepare_vops(comment_payout_ops, vopsList, date, block_num):
@classmethod
def prepare_vops(cls, comment_payout_ops, vopsList, date, block_num):
vote_ops = {}
ineffective_deleted_ops = {}
......@@ -148,7 +158,7 @@ class Blocks:
elif op_type == 'effective_comment_vote_operation':
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
Reputations.process_vote(block_num, op_value)
cls._reputations.process_vote(block_num, op_value)
vote_ops[ key_vote ] = op_value
......@@ -193,10 +203,10 @@ class Blocks:
if is_initial_sync:
if num in virtual_operations:
(vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
(vote_ops, ineffective_deleted_ops ) = cls.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
else:
vops = hived.get_virtual_operations(num)
(vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
(vote_ops, ineffective_deleted_ops ) = cls.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
......@@ -325,6 +335,10 @@ class Blocks:
'date': block['timestamp']})
return num
@classmethod
def _flush_reputations(cls):
cls._reputations.flush()
@classmethod
def _flush_blocks(cls):
query = """
......
......@@ -7,12 +7,14 @@ from hive.db.db_state import DbState
from hive.utils.normalize import escape_characters
log = logging.getLogger(__name__)
DB = Db.instance()
CACHED_ITEMS_LIMIT = 200
class Reputations:
_queries = []
def __init__(cls, sharedDb):
    """Build a Reputations processor with its own DB connection.

    :param sharedDb: a ``Db`` adapter whose ``clone()`` yields an
        independent connection, so reputation flushes do not contend
        with the shared ``DB`` instance.
    """
    # NOTE(review): this codebase names the instance parameter `cls`
    # (not `self`) throughout — kept for consistency, though unusual.
    cls._queries = []
    # Private connection; process_vote flushes through cls._db rather
    # than the module-level DB.
    cls._db = sharedDb.clone()
    log.info("Cloning a database adapter")
@classmethod
def process_vote(cls, block_num, effective_vote_op):
......@@ -29,12 +31,12 @@ class Reputations:
i = i + 1
items = items + 1
if items >= CACHED_ITEMS_LIMIT:
DB.query_no_return(query)
cls._db.query_no_return(query)
query = ""
items = 0
if items >= CACHED_ITEMS_LIMIT:
DB.query_no_return(query)
cls._db.query_no_return(query)
query = ""
items = 0
......
......@@ -89,7 +89,7 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
except Exception:
log.exception("Exception caught during fetching vops...")
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
def _block_consumer(node, blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
from hive.utils.stats import minmax
is_debug = log.isEnabledFor(10)
num = 0
......@@ -98,6 +98,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
try:
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time_1 = WSM.start()
......@@ -125,7 +126,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
timer.batch_start()
block_start = perf()
Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
blocksProcessor.process_multi(blocks, preparedVops, node, is_initial_sync)
block_end = perf()
timer.batch_lap()
......@@ -181,7 +182,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, self._blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
......@@ -208,6 +209,7 @@ class Sync:
log.info("Using hived url: `%s'", self._conf.get('steemd_url'))
self._steem = conf.steem()
self._blocksProcessor = None
def run(self):
"""Initialize state; setup/recovery checks; sync and runloop."""
......@@ -215,6 +217,8 @@ class Sync:
# ensure db schema up to date, check app status
DbState.initialize()
self._blocksProcessor = Blocks()
# prefetch id->name and id->rank memory maps
Accounts.load_ids()
Accounts.fetch_ranks()
......@@ -229,16 +233,16 @@ class Sync:
Community.recalc_pending_payouts()
if DbState.is_initial_sync():
last_imported_block = Blocks.head_num()
last_imported_block = self._blocksProcessor.head_num()
# resume initial sync
self.initial()
if not CONTINUE_PROCESSING:
return
current_imported_block = Blocks.head_num()
current_imported_block = self._blocksProcessor.head_num()
DbState.finish_initial_sync(current_imported_block, last_imported_block)
else:
# recover from fork
Blocks.verify_head(self._steem)
self._blocksProcessor.verify_head(self._steem)
self._update_chain_state()
......@@ -305,14 +309,15 @@ class Sync:
remaining = drop(skip_lines, f)
for lines in partition_all(chunk_size, remaining):
raise RuntimeError("Sync from checkpoint disabled")
Blocks.process_multi(map(json.loads, lines), True)
# Blocks.process_multi(map(json.loads, lines), True)
last_block = num
last_read = num
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
"""Fast sync strategy: read/process blocks in batches."""
steemd = self._steem
lbound = Blocks.head_num() + 1
lbound = self._blocksProcessor.head_num() + 1
ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
count = ubound - lbound
......@@ -337,7 +342,7 @@ class Sync:
timer.batch_lap()
# process blocks
Blocks.process_multi(blocks, preparedVops, steemd, is_initial_sync)
self._blocksProcessor.process_multi(blocks, preparedVops, steemd, is_initial_sync)
timer.batch_finish(len(blocks))
_prefix = ("[SYNC] Got block %d @ %s" % (
......@@ -357,14 +362,15 @@ class Sync:
# debug: no max gap if disable_sync in effect
max_gap = None if self._conf.get('test_disable_sync') else 100
assert self._blocksProcessor
steemd = self._steem
hive_head = Blocks.head_num()
hive_head = self._blocksProcessor.head_num()
for block in steemd.stream_blocks(hive_head + 1, trail_blocks, max_gap):
start_time = perf()
self._db.query("START TRANSACTION")
num = Blocks.process(block, {}, steemd)
num = blocksProcessor.process(block, {}, steemd)
follows = Follow.flush(trx=False)
accts = Accounts.flush(steemd, trx=False, spread=8)
self._db.query("COMMIT")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment