diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 9a5d4cd109fa35924492a55457c03ffd3c30a4d7..cc5b7912d22181ad5263c36bc53cffe577fe7d82 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -43,6 +43,7 @@ class Blocks:
     _current_block_date = None
 
     _concurrent_flush = [
+        ('Posts', Posts.flush, Posts),
         ('PostDataCache', PostDataCache.flush, PostDataCache),
         ('Reputations', Reputations.flush, Reputations),
         ('Votes', Votes.flush, Votes),
@@ -65,6 +66,7 @@ class Blocks:
         Votes.setup_db_access(sharedDbAdapter)
         Tags.setup_db_access(sharedDbAdapter)
         Follow.setup_db_access(sharedDbAdapter)
+        Posts.setup_db_access(sharedDbAdapter)
 
     @classmethod
     def head_num(cls):
@@ -126,11 +128,12 @@ class Blocks:
 
         log.info("#############################################################################")
         flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
-        flush_time = register_time(flush_time, "Posts", Posts.flush())
 
         folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
         flush_time = register_time(flush_time, "Follow", folllow_items)
 
+        DB.query("COMMIT")
+
         completedThreads = 0;
 
         pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
@@ -155,9 +158,9 @@ class Blocks:
         assert completedThreads == len(cls._concurrent_flush)
 
         if (not is_initial_sync) and (first_block > -1):
+            DB.query("START TRANSACTION")
             cls.on_live_blocks_processed( first_block, last_num )
-
-        DB.query("COMMIT")
+            DB.query("COMMIT")
 
         log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
 
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index ba63521efa0f968534f39981e33047dfdf550220..b67e2ce2bac9170d0be84cca89cf5b3bb0340c3b 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.db_adapter_holder import DbAdapterHolder
+
 from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive
 
 log = logging.getLogger(__name__)
 DB = Db.instance()
 
-class Posts:
+class Posts(DbAdapterHolder):
     """Handles critical/core post ops and data."""
 
     # LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
@@ -213,9 +215,13 @@ class Posts:
                 yield lst[i:i + n]
 
         for chunk in chunks(cls._comment_payout_ops, 1000):
+            cls.beginTx()
+
             values_str = ','.join(chunk)
             actual_query = sql.format(values_str)
-            DB.query(actual_query)
+            cls.db.query(actual_query)
+
+            cls.commitTx()
 
         n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()