Skip to content
Snippets Groups Projects
Commit 2b8c4f59 authored by Dariusz Kędzierski's avatar Dariusz Kędzierski
Browse files

Clearing stats object, minor code cleanup

parent c508cd3a
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
......@@ -22,15 +22,15 @@ class Blocks:
blocks_to_flush = []
ops_stats = {}
operations_in_tx = 0;
opened_tx = False;
operations_in_tx = 0
opened_tx = False
OPERATIONS_IN_TX_TRESHOLD = 500000
@staticmethod
def merge_ops_stats(od1, od2):
for (k, v) in od2.items():
if(k in od1):
if k in od1:
od1[k] += v
else:
od1[k] = v
......@@ -123,7 +123,7 @@ class Blocks:
def _track_tx(cls, opCount = 1):
if(cls.opened_tx == False):
DB.query("START TRANSACTION")
cls.operations_in_tx = 0;
cls.operations_in_tx = 0
cls.opened_tx = True
cls.operations_in_tx += opCount
......
......@@ -30,7 +30,7 @@ from hive.server.common.mutes import Mutes
log = logging.getLogger(__name__)
continue_processing = 1
CONTINUE_PROCESSING = 1
def print_ops_stats(prefix, ops_stats):
log.info("############################################################################")
......@@ -51,131 +51,132 @@ def prepare_vops(vops_by_block):
return preparedVops
def _block_provider(node, queue, lbound, ubound, chunk_size):
try:
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
num = 0
while continue_processing and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for blocks from range: [%d, %d]", lbound, to)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize())
queue.put(blocks)
num = num + 1
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during fetching blocks")
try:
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
num = 0
while CONTINUE_PROCESSING and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for blocks from range: [%d, %d]", lbound, to)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize())
queue.put(blocks)
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching blocks")
def _vops_provider(node, queue, lbound, ubound, chunk_size):
try:
count = ubound - lbound
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
num = 0
while continue_processing and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
timer.batch_start()
vops = node.enum_virtual_ops(lbound, to)
preparedVops = prepare_vops(vops)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize())
queue.put(preparedVops)
num = num + 1
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during fetching vops...")
try:
count = ubound - lbound
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
num = 0
while CONTINUE_PROCESSING and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
timer.batch_start()
vops = node.enum_virtual_ops(lbound, to)
preparedVops = prepare_vops(vops)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize())
queue.put(preparedVops)
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching vops...")
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
try:
count = ubound - lbound
num = 0
timer = Timer(count, entity='block', laps=['rps', 'wps'])
try:
count = ubound - lbound
num = 0
timer = Timer(count, entity='block', laps=['rps', 'wps'])
total_ops_stats = {}
total_ops_stats = {}
time_start = perf()
time_start = perf()
while continue_processing and lbound < ubound:
if(blocksQueue.empty()):
log.info("Awaiting any block to process...")
while CONTINUE_PROCESSING and lbound < ubound:
if(blocksQueue.empty()):
log.info("Awaiting any block to process...")
blocks = blocksQueue.get()
blocks = blocksQueue.get()
if(vopsQueue.empty()):
log.info("Awaiting any vops to process...")
if(vopsQueue.empty()):
log.info("Awaiting any vops to process...")
preparedVops = vopsQueue.get()
preparedVops = vopsQueue.get()
to = min(lbound + chunk_size, ubound)
# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
to = min(lbound + chunk_size, ubound)
# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
timer.batch_start()
block_start = perf()
ops_stats = Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
block_end = perf()
timer.batch_start()
block_start = perf()
ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync))
Blocks.ops_stats.clear()
block_end = perf()
timer.batch_lap()
timer.batch_finish(len(blocks))
time_current = perf()
timer.batch_lap()
timer.batch_finish(len(blocks))
time_current = perf()
prefix = ("[SYNC] Got block %d @ %s" % (
to - 1, blocks[-1]['timestamp']))
log.info(timer.batch_status(prefix))
log.info("Time elapsed: %fs", time_current - time_start)
prefix = ("[SYNC] Got block %d @ %s" % (
to - 1, blocks[-1]['timestamp']))
log.info(timer.batch_status(prefix))
log.info("Time elapsed: %fs", time_current - time_start)
total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
if block_end - block_start > 1.0:
print_ops_stats("Operations present in the processed blocks:", ops_stats)
if block_end - block_start > 1.0:
print_ops_stats("Operations present in the processed blocks:", ops_stats)
blocksQueue.task_done()
vopsQueue.task_done()
blocksQueue.task_done()
vopsQueue.task_done()
lbound = to
lbound = to
num = num + 1
num = num + 1
print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
return num
except KeyboardInterrupt as ki:
log.info("Caught SIGINT")
except Exception as ex:
log.exception("Exception caught during processing blocks...")
print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during processing blocks...")
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
with ThreadPoolExecutor(max_workers = 4) as pool:
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
# pool.shutdown()
except KeyboardInterrupt as ex:
global continue_processing
continue_processing = 0
pool.shutdown(False)
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
# pool.shutdown()
except KeyboardInterrupt:
global CONTINUE_PROCESSING
CONTINUE_PROCESSING = 0
pool.shutdown(False)
blocksQueue.join()
vopsQueue.join()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment