Skip to content
Snippets Groups Projects
Commit 9ffb6da6 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Enabled concurrent execution of Posts.flush

parent c6cc1c01
No related branches found
No related tags found
1 merge request !121 Prerequisites to Reputation api support
This commit is part of merge request !121. Comments created here will be created in the context of that merge request.
......@@ -43,6 +43,7 @@ class Blocks:
_current_block_date = None
_concurrent_flush = [
('Posts', Posts.flush, Posts),
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
......@@ -65,6 +66,7 @@ class Blocks:
Votes.setup_db_access(sharedDbAdapter)
Tags.setup_db_access(sharedDbAdapter)
Follow.setup_db_access(sharedDbAdapter)
Posts.setup_db_access(sharedDbAdapter)
@classmethod
def head_num(cls):
......@@ -126,11 +128,12 @@ class Blocks:
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "Posts", Posts.flush())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
DB.query("COMMIT")
completedThreads = 0;
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
......@@ -155,9 +158,9 @@ class Blocks:
assert completedThreads == len(cls._concurrent_flush)
if (not is_initial_sync) and (first_block > -1):
DB.query("START TRANSACTION")
cls.on_live_blocks_processed( first_block, last_num )
DB.query("COMMIT")
DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
......
......@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive
log = logging.getLogger(__name__)
DB = Db.instance()
class Posts:
class Posts(DbAdapterHolder):
"""Handles critical/core post ops and data."""
# LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
......@@ -213,9 +215,13 @@ class Posts:
yield lst[i:i + n]
for chunk in chunks(cls._comment_payout_ops, 1000):
cls.beginTx()
values_str = ','.join(chunk)
actual_query = sql.format(values_str)
DB.query(actual_query)
cls.db.query(actual_query)
cls.commitTx()
n = len(cls._comment_payout_ops)
cls._comment_payout_ops.clear()
......
0% Loading — or loading failed.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment