Skip to content
Snippets Groups Projects
Commit c6cc1c01 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Enabled timing collection for MT flushes. Temporary disabled reputation data processing.

parent cd57e251
No related branches found
No related tags found
1 merge request!121Prerequisuites to Reputation api support
...@@ -29,6 +29,13 @@ log = logging.getLogger(__name__) ...@@ -29,6 +29,13 @@ log = logging.getLogger(__name__)
DB = Db.instance() DB = Db.instance()
def time_collector(f):
startTime = FSM.start()
result = f()
elapsedTime = FSM.stop(startTime)
return (result, elapsedTime)
class Blocks: class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table.""" """Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = [] blocks_to_flush = []
...@@ -121,26 +128,25 @@ class Blocks: ...@@ -121,26 +128,25 @@ class Blocks:
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks()) flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "Posts", Posts.flush()) flush_time = register_time(flush_time, "Posts", Posts.flush())
# flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
# flush_time = register_time(flush_time, "Tags", Tags.flush())
# flush_time = register_time(flush_time, "Votes", Votes.flush())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False) folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items) flush_time = register_time(flush_time, "Follow", folllow_items)
# flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
completedThreads = 0; completedThreads = 0;
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush)) pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
flush_futures = {pool.submit(f): (description, c) for (description, f, c) in cls._concurrent_flush} flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
for future in concurrent.futures.as_completed(flush_futures): for future in concurrent.futures.as_completed(flush_futures):
(description, c) = flush_futures[future] (description, c) = flush_futures[future]
completedThreads = completedThreads + 1 completedThreads = completedThreads + 1
try: try:
n = future.result() (n, elapsedTime) = future.result()
assert n is not None
assert not c.tx_active() assert not c.tx_active()
if n > 0: FSM.flush_stat(description, elapsedTime, n)
log.info('%r flush generated %d records' % (description, n))
# if n > 0:
# log.info('%r flush generated %d records' % (description, n))
except Exception as exc: except Exception as exc:
log.error('%r generated an exception: %s' % (description, exc)) log.error('%r generated an exception: %s' % (description, exc))
raise exc raise exc
......
...@@ -17,6 +17,8 @@ class Reputations(DbAdapterHolder): ...@@ -17,6 +17,8 @@ class Reputations(DbAdapterHolder):
@classmethod @classmethod
def flush(self): def flush(self):
self._queries.clear()
if not self._queries: if not self._queries:
return 0 return 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment