Skip to content
Snippets Groups Projects

exception handling during sync

Merged Marcin requested to merge mi_exception_handling into develop
1 file
+ 63
39
Compare changes
  • Side-by-side
  • Inline
+ 63
39
@@ -34,10 +34,24 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime
from signal import signal, SIGINT
log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
SIG_INT_DURING_SYNC = False
def set_sig_int(signal, frame):
global SIG_INT_DURING_SYNC
SIG_INT_DURING_SYNC = True
log.info("""
**********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
def prepare_vops(vops_by_block):
preparedVops = {}
@@ -47,26 +61,32 @@ def prepare_vops(vops_by_block):
return preparedVops
def put_to_queue( data_queue, value ):
while not SIG_INT_DURING_SYNC:
try:
data_queue.put( value, True, 1)
return
except queue.Full:
continue
def _block_provider(node, queue, lbound, ubound, chunk_size):
try:
num = 0
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound:
while not SIG_INT_DURING_SYNC and lbound < ubound:
to = min(lbound + chunk_size, ubound)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
queue.put(blocks)
put_to_queue( queue, blocks )
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching blocks")
return num
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try:
@@ -75,22 +95,30 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound:
while not SIG_INT_DURING_SYNC and lbound < ubound:
to = min(lbound + chunk_size, ubound)
timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to)
preparedVops = prepare_vops(vops)
lbound = to
timer.batch_lap()
queue.put(preparedVops)
put_to_queue( queue, preparedVops )
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching vops...")
def get_from_queue( data_queue ):
while not SIG_INT_DURING_SYNC:
try:
ret = data_queue.get(True, 1)
data_queue.task_done()
except queue.Empty:
continue
return ret
return []
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
from hive.utils.stats import minmax
is_debug = log.isEnabledFor(10)
@@ -102,31 +130,31 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time_1 = WSM.start()
if blocksQueue.empty() and CONTINUE_PROCESSING:
if blocksQueue.empty() and not SIG_INT_DURING_SYNC:
log.info("Awaiting any block to process...")
blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get()
blocksQueue.task_done()
if not blocksQueue.empty() or not SIG_INT_DURING_SYNC:
blocks = get_from_queue( blocksQueue )
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start()
if vopsQueue.empty() and CONTINUE_PROCESSING:
if vopsQueue.empty() and not SIG_INT_DURING_SYNC:
log.info("Awaiting any vops to process...")
preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get()
vopsQueue.task_done()
if not vopsQueue.empty() or not SIG_INT_DURING_SYNC:
preparedVops = get_from_queue(vopsQueue)
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
to = min(lbound + chunk_size, ubound)
timer.batch_start()
if SIG_INT_DURING_SYNC:
break;
block_start = perf()
Blocks.process_multi(blocks, preparedVops, is_initial_sync)
block_end = perf()
@@ -157,10 +185,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
if SIG_INT_DURING_SYNC and blocksQueue.empty() and vopsQueue.empty():
break
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during processing blocks...")
finally:
@@ -179,25 +205,23 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
global CONTINUE_PROCESSING
old_sig_int_handler = signal(SIGINT, set_sig_int)
with ThreadPoolExecutor(max_workers = 4) as pool:
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
pool.shutdown(False)
except KeyboardInterrupt:
log.info(""" **********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
CONTINUE_PROCESSING = False
blocksQueue.join()
vopsQueue.join()
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
if SIG_INT_DURING_SYNC:
pool.shutdown(False)
signal(SIGINT, old_sig_int_handler)
blocksQueue.queue.clear()
vopsQueue.queue.clear()
if SIG_INT_DURING_SYNC:
exit(0)
class Sync:
"""Manages the sync/index process.
Loading