From c6cc1c012dea57ae2c1435bc3bc10153b9cb0a4c Mon Sep 17 00:00:00 2001 From: Bartek Wrona <wrona@syncad.com> Date: Tue, 8 Sep 2020 00:21:07 +0200 Subject: [PATCH] Enabled timing collection for MT flushes. Temporary disabled reputation data processing. --- hive/indexer/blocks.py | 22 ++++++++++++++-------- hive/indexer/reputations.py | 2 ++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 3a1949096..9a5d4cd10 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -29,6 +29,13 @@ log = logging.getLogger(__name__) DB = Db.instance() +def time_collector(f): + startTime = FSM.start() + result = f() + elapsedTime = FSM.stop(startTime) + + return (result, elapsedTime) + class Blocks: """Processes blocks, dispatches work, manages `hive_blocks` table.""" blocks_to_flush = [] @@ -121,26 +128,25 @@ class Blocks: flush_time = register_time(flush_time, "Blocks", cls._flush_blocks()) flush_time = register_time(flush_time, "Posts", Posts.flush()) -# flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush()) -# flush_time = register_time(flush_time, "Tags", Tags.flush()) -# flush_time = register_time(flush_time, "Votes", Votes.flush()) folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False) flush_time = register_time(flush_time, "Follow", folllow_items) -# flush_time = register_time(flush_time, "Reputations", cls._flush_reputations()) completedThreads = 0; pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush)) - flush_futures = {pool.submit(f): (description, c) for (description, f, c) in cls._concurrent_flush} + flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush} for future in concurrent.futures.as_completed(flush_futures): (description, c) = flush_futures[future] completedThreads = completedThreads + 1 try: - n = future.result() + (n, elapsedTime) = future.result() + assert n is not None assert not c.tx_active() - if n > 0: - log.info('%r flush generated %d records' % (description, n)) + FSM.flush_stat(description, elapsedTime, n) + +# if n > 0: +# log.info('%r flush generated %d records' % (description, n)) except Exception as exc: log.error('%r generated an exception: %s' % (description, exc)) raise exc diff --git a/hive/indexer/reputations.py b/hive/indexer/reputations.py index 127c9fce1..003662fec 100644 --- a/hive/indexer/reputations.py +++ b/hive/indexer/reputations.py @@ -17,6 +17,8 @@ class Reputations(DbAdapterHolder): @classmethod def flush(self): + self._queries.clear() + if not self._queries: return 0 -- GitLab