Skip to content
Snippets Groups Projects
Commit 98b4c764 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'dk-issue-3-concurrent-block-query' of...

Merge branch 'dk-issue-3-concurrent-block-query' of gitlab.syncad.com:blocktrades/hivemind into dk-issue-3-concurrent-block-query
parents bbce31d5 0409e44e
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
...@@ -5,6 +5,11 @@ import glob ...@@ -5,6 +5,11 @@ import glob
from time import perf_counter as perf from time import perf_counter as perf
import os import os
import ujson as json import ujson as json
import time
import concurrent, threading, queue
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
from funcy.seqs import drop from funcy.seqs import drop
from toolz import partition_all from toolz import partition_all
...@@ -25,6 +30,111 @@ from hive.server.common.mutes import Mutes ...@@ -25,6 +30,111 @@ from hive.server.common.mutes import Mutes
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
continue_processing = 1
def _block_provider(node, queue, lbound, ubound, chunk_size):
try:
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
num = 0
while continue_processing and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for blocks from range: [%d, %d]", lbound, to)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
# log.info("Enqueuing retrieved blocks from range: [%d, %d]. Blocks queue size: %d", lbound, to, queue.qsize())
queue.put(blocks)
num = num + 1
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during fetching blocks")
def _vops_provider(node, queue, lbound, ubound, chunk_size):
try:
count = ubound - lbound
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
num = 0
while continue_processing and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
timer.batch_start()
vops = node.enum_virtual_ops(lbound, to)
lbound = to
timer.batch_lap()
# log.info("Enqueuing retrieved vops for block range: [%d, %d]. Vops queue size: %d", lbound, to, queue.qsize())
queue.put(vops)
num = num + 1
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during fetching vops...")
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
try:
count = ubound - lbound
num = 0
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while continue_processing and lbound < ubound:
# log.info("Awaiting any block to process...")
blocks = blocksQueue.get()
blocksQueue.task_done()
# log.info("Awaiting any vops to process...")
vops = vopsQueue.get()
to = min(lbound + chunk_size, ubound)
# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
timer.batch_start()
Blocks.process_multi(blocks, vops, node, is_initial_sync)
timer.batch_lap()
timer.batch_finish(len(blocks))
prefix = ("[SYNC] Got block %d @ %s" % (
to - 1, blocks[-1]['timestamp']))
log.info(timer.batch_status(prefix))
lbound = to
vopsQueue.task_done()
num = num + 1
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during processing blocks...")
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
with ThreadPoolExecutor(max_workers = 4) as pool:
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
# pool.shutdown()
except KeyboardInterrupt as ex:
continue_processing = 0
pool.shutdown(false)
blocksQueue.join()
vopsQueue.join()
class Sync: class Sync:
"""Manages the sync/index process. """Manages the sync/index process.
...@@ -149,6 +259,10 @@ class Sync: ...@@ -149,6 +259,10 @@ class Sync:
if count < 1: if count < 1:
return return
if is_initial_sync:
_node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size)
return
log.info("[SYNC] start block %d, +%d to sync", lbound, count) log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound: while lbound < ubound:
......
...@@ -65,8 +65,16 @@ class StatsAbstract: ...@@ -65,8 +65,16 @@ class StatsAbstract:
log.info('%7s %9s %9s %9s', '-pct-', '-ttl-', '-avg-', '-cnt-') log.info('%7s %9s %9s %9s', '-pct-', '-ttl-', '-avg-', '-cnt-')
for call, ms, reqs in self.table(40): for call, ms, reqs in self.table(40):
try:
avg = ms/reqs
millisec = ms/self._ms
except ZeroDivisionError as ex:
avg = 0.0
millisec = 0.0
if reqs == 0:
reqs = 1
log.info("% 6.1f%% % 7dms % 9.2f % 8dx -- %s", log.info("% 6.1f%% % 7dms % 9.2f % 8dx -- %s",
100 * ms/self._ms, ms, ms/reqs, reqs, call) 100 * millisec, ms, avg, reqs, call)
self.clear() self.clear()
...@@ -131,6 +139,9 @@ class Stats: ...@@ -131,6 +139,9 @@ class Stats:
"""Container for steemd and db timing data.""" """Container for steemd and db timing data."""
PRINT_THRESH_MINS = 1 PRINT_THRESH_MINS = 1
COLLECT_DB_STATS = 0
COLLECT_NODE_STATS = 0
_db = DbStats() _db = DbStats()
_steemd = SteemStats() _steemd = SteemStats()
_secs = 0.0 _secs = 0.0
...@@ -140,14 +151,16 @@ class Stats: ...@@ -140,14 +151,16 @@ class Stats:
@classmethod @classmethod
def log_db(cls, sql, secs): def log_db(cls, sql, secs):
"""Log a database query. Incoming SQL is normalized.""" """Log a database query. Incoming SQL is normalized."""
cls._db.add(_normalize_sql(sql), secs * 1000) if cls.COLLECT_DB_STATS:
cls.add_secs(secs) cls._db.add(_normalize_sql(sql), secs * 1000)
cls.add_secs(secs)
@classmethod @classmethod
def log_steem(cls, method, secs, batch_size=1): def log_steem(cls, method, secs, batch_size=1):
"""Log a steemd call.""" """Log a steemd call."""
cls._steemd.add(method, secs * 1000, batch_size) if cls.COLLECT_NODE_STATS:
cls.add_secs(secs) cls._steemd.add(method, secs * 1000, batch_size)
cls.add_secs(secs)
@classmethod @classmethod
def log_idle(cls, secs): def log_idle(cls, secs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment