Skip to content
Snippets Groups Projects
Commit b0be8b7e authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'dk-issue-3-concurrent-block-query' of...

Merge branch 'dk-issue-3-concurrent-block-query' of gitlab.syncad.com:blocktrades/hivemind into dk-issue-3-concurrent-block-query
parents 785e2301 4e198a11
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
......@@ -5,6 +5,11 @@ import glob
from time import perf_counter as perf
import os
import ujson as json
import time
import concurrent, threading, queue
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
from funcy.seqs import drop
from toolz import partition_all
......@@ -25,6 +30,111 @@ from hive.server.common.mutes import Mutes
log = logging.getLogger(__name__)
continue_processing = 1
def _block_provider(node, queue, lbound, ubound, chunk_size):
    """Producer thread: fetch blocks [lbound, ubound) in `chunk_size` batches.

    Each fetched batch (a list of blocks) is put on `queue` (bounded, so this
    blocks when the consumer lags).  Stops early when the module-level
    `continue_processing` flag is cleared.

    :param node: steemd client exposing get_blocks_range()
    :param queue: queue.Queue the consumer reads from
    :param lbound: first block number to fetch (inclusive)
    :param ubound: last block number (exclusive)
    :param chunk_size: max blocks per request
    :return: number of chunks enqueued (also on early exit after an error)
    """
    num = 0
    try:
        count = ubound - lbound
        log.info("[SYNC] start block %d, +%d to sync", lbound, count)
        timer = Timer(count, entity='block', laps=['rps', 'wps'])
        while continue_processing and lbound < ubound:
            to = min(lbound + chunk_size, ubound)
            timer.batch_start()
            blocks = node.get_blocks_range(lbound, to)
            lbound = to
            timer.batch_lap()
            queue.put(blocks)
            num = num + 1
    except KeyboardInterrupt:
        log.info("Caught SIGINT")
    except Exception:
        # Log and fall through so the pool future completes with a count
        # instead of silently resolving to None.
        log.exception("Exception caught during fetching blocks")
    return num
def _vops_provider(node, queue, lbound, ubound, chunk_size):
    """Producer thread: fetch virtual ops for blocks [lbound, ubound) in batches.

    Mirrors _block_provider, but pulls virtual operations via
    enum_virtual_ops() and enqueues each batch on `queue`.  Stops early when
    the module-level `continue_processing` flag is cleared.

    :param node: steemd client exposing enum_virtual_ops()
    :param queue: queue.Queue the consumer reads from
    :param lbound: first block number (inclusive)
    :param ubound: last block number (exclusive)
    :param chunk_size: max blocks per request
    :return: number of chunks enqueued (also on early exit after an error)
    """
    num = 0
    try:
        count = ubound - lbound
        log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
        timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
        while continue_processing and lbound < ubound:
            to = min(lbound + chunk_size, ubound)
            timer.batch_start()
            vops = node.enum_virtual_ops(lbound, to)
            lbound = to
            timer.batch_lap()
            queue.put(vops)
            num = num + 1
    except KeyboardInterrupt:
        log.info("Caught SIGINT")
    except Exception:
        # Log and fall through so the pool future completes with a count
        # instead of silently resolving to None.
        log.exception("Exception caught during fetching vops...")
    return num
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
    """Consumer thread: pair up block and vops chunks and index them.

    Dequeues one chunk from each queue per iteration and hands both to
    Blocks.process_multi().  Stops early when the module-level
    `continue_processing` flag is cleared.

    :param node: steemd client (passed through to Blocks.process_multi)
    :param blocksQueue: queue fed by _block_provider
    :param vopsQueue: queue fed by _vops_provider
    :param is_initial_sync: True during initial (massive) sync
    :param lbound: first block number (inclusive)
    :param ubound: last block number (exclusive)
    :param chunk_size: blocks per chunk (must match the providers')
    :return: number of chunks processed (also on early exit after an error)
    """
    num = 0
    try:
        count = ubound - lbound
        timer = Timer(count, entity='block', laps=['rps', 'wps'])
        while continue_processing and lbound < ubound:
            blocks = blocksQueue.get()
            vops = vopsQueue.get()
            to = min(lbound + chunk_size, ubound)
            timer.batch_start()
            Blocks.process_multi(blocks, vops, node, is_initial_sync)
            timer.batch_lap()
            timer.batch_finish(len(blocks))
            prefix = ("[SYNC] Got block %d @ %s" % (
                to - 1, blocks[-1]['timestamp']))
            log.info(timer.batch_status(prefix))
            lbound = to
            # Mark BOTH chunks done only after processing succeeded, so
            # Queue.join() cannot return while a chunk is still in flight.
            # (Original called blocksQueue.task_done() right after get(),
            # before process_multi, which made join() unreliable.)
            blocksQueue.task_done()
            vopsQueue.task_done()
            num = num + 1
    except KeyboardInterrupt:
        log.info("Caught SIGINT")
    except Exception:
        # Log and fall through so the pool future completes with a count
        # instead of silently resolving to None.
        log.exception("Exception caught during processing blocks...")
    return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
    """Run the block/vops providers and the consumer concurrently.

    Spawns two producer threads and one consumer thread on a pool, then
    blocks until the consumer has processed the whole [lbound, ubound) range.

    :param self: Sync instance (provides self._steem)
    :param is_initial_sync: True during initial (massive) sync
    :param lbound: first block number (inclusive)
    :param ubound: last block number (exclusive)
    :param chunk_size: blocks per fetched/processed chunk
    """
    # Must declare global: the except-branch below assigns the flag, and
    # without this the assignment would create a useless local binding
    # that the worker threads never see.
    global continue_processing

    # maxsize bounds memory: producers stall once 10 chunks are pending.
    blocksQueue = queue.Queue(maxsize=10)
    vopsQueue = queue.Queue(maxsize=10)
    with ThreadPoolExecutor(max_workers=4) as pool:
        try:
            pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
            pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
            blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue,
                                              vopsQueue, is_initial_sync, lbound, ubound,
                                              chunk_size)
            # Wait for the consumer to finish (or raise).
            blockConsumerFuture.result()
        except KeyboardInterrupt:
            # Signal all workers to stop at their next loop check.
            continue_processing = 0
            # Original passed bare `false` — a NameError in Python.
            pool.shutdown(wait=False)
    blocksQueue.join()
    vopsQueue.join()
class Sync:
"""Manages the sync/index process.
......@@ -149,6 +259,10 @@ class Sync:
if count < 1:
return
if is_initial_sync:
_node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size)
return
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
......
......@@ -65,8 +65,16 @@ class StatsAbstract:
log.info('%7s %9s %9s %9s', '-pct-', '-ttl-', '-avg-', '-cnt-')
for call, ms, reqs in self.table(40):
try:
avg = ms/reqs
millisec = ms/self._ms
except ZeroDivisionError as ex:
avg = 0.0
millisec = 0.0
if reqs == 0:
reqs = 1
log.info("% 6.1f%% % 7dms % 9.2f % 8dx -- %s",
100 * ms/self._ms, ms, ms/reqs, reqs, call)
100 * millisec, ms, avg, reqs, call)
self.clear()
......@@ -131,6 +139,9 @@ class Stats:
"""Container for steemd and db timing data."""
PRINT_THRESH_MINS = 1
COLLECT_DB_STATS = 0
COLLECT_NODE_STATS = 0
_db = DbStats()
_steemd = SteemStats()
_secs = 0.0
......@@ -140,14 +151,16 @@ class Stats:
@classmethod
def log_db(cls, sql, secs):
    """Log a database query. Incoming SQL is normalized.

    :param sql: raw SQL text (normalized before aggregation)
    :param secs: query duration in seconds
    """
    # Collection is opt-in (COLLECT_DB_STATS flag) to avoid stats overhead
    # during sync.  The diff scrape had retained both the old unconditional
    # call and the new guarded one, which would have recorded every query
    # twice; only the guarded version is kept.
    if cls.COLLECT_DB_STATS:
        cls._db.add(_normalize_sql(sql), secs * 1000)
        cls.add_secs(secs)
@classmethod
def log_steem(cls, method, secs, batch_size=1):
    """Log a steemd call.

    :param method: RPC method name
    :param secs: call duration in seconds
    :param batch_size: number of requests in the batch (default 1)
    """
    # Collection is opt-in (COLLECT_NODE_STATS flag) to avoid stats overhead
    # during sync.  The diff scrape had retained both the old unconditional
    # call and the new guarded one, which would have recorded every call
    # twice; only the guarded version is kept.
    if cls.COLLECT_NODE_STATS:
        cls._steemd.add(method, secs * 1000, batch_size)
        cls.add_secs(secs)
@classmethod
def log_idle(cls, secs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment