Skip to content
Snippets Groups Projects
Commit 8730d584 authored by Marcin's avatar Marcin
Browse files

ctrl+c breaks sync correctly

parent beda6b94
No related branches found
No related tags found
2 merge requests!456Release candidate v1 24,!399exception handling during sync
...@@ -34,10 +34,24 @@ from hive.indexer.mock_vops_provider import MockVopsProvider ...@@ -34,10 +34,24 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime from datetime import datetime
from signal import signal, SIGINT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True CONTINUE_PROCESSING = True
SIG_INT_DURING_SYNC = False
def set_sig_int(signal, frame):
global SIG_INT_DURING_SYNC
SIG_INT_DURING_SYNC = True
log.info("""
**********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
def prepare_vops(vops_by_block): def prepare_vops(vops_by_block):
preparedVops = {} preparedVops = {}
...@@ -47,26 +61,32 @@ def prepare_vops(vops_by_block): ...@@ -47,26 +61,32 @@ def prepare_vops(vops_by_block):
return preparedVops return preparedVops
def put_to_queue( data_queue, value ):
while not SIG_INT_DURING_SYNC:
try:
data_queue.put( value, True, 1)
return
except queue.Full:
continue
def _block_provider(node, queue, lbound, ubound, chunk_size): def _block_provider(node, queue, lbound, ubound, chunk_size):
try: try:
num = 0 num = 0
count = ubound - lbound count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count) log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound: while not SIG_INT_DURING_SYNC and lbound < ubound:
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
blocks = node.get_blocks_range(lbound, to) blocks = node.get_blocks_range(lbound, to)
lbound = to lbound = to
timer.batch_lap() timer.batch_lap()
queue.put(blocks) put_to_queue( queue, blocks )
num = num + 1 num = num + 1
return num return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception: except Exception:
log.exception("Exception caught during fetching blocks") log.exception("Exception caught during fetching blocks")
return num
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try: try:
...@@ -75,22 +95,30 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): ...@@ -75,22 +95,30 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count) log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound: while not SIG_INT_DURING_SYNC and lbound < ubound:
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to) vops = node.enum_virtual_ops(conf, lbound, to)
preparedVops = prepare_vops(vops) preparedVops = prepare_vops(vops)
lbound = to lbound = to
timer.batch_lap() timer.batch_lap()
queue.put(preparedVops) put_to_queue( queue, preparedVops )
num = num + 1 num = num + 1
return num return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception: except Exception:
log.exception("Exception caught during fetching vops...") log.exception("Exception caught during fetching vops...")
def get_from_queue( data_queue ):
while not SIG_INT_DURING_SYNC:
try:
ret = data_queue.get(True, 1)
data_queue.task_done()
except queue.Empty:
continue
return ret
return []
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size): def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
from hive.utils.stats import minmax from hive.utils.stats import minmax
is_debug = log.isEnabledFor(10) is_debug = log.isEnabledFor(10)
...@@ -102,31 +130,31 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -102,31 +130,31 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound: while lbound < ubound:
wait_time_1 = WSM.start() wait_time_1 = WSM.start()
if blocksQueue.empty() and CONTINUE_PROCESSING: if blocksQueue.empty() and not SIG_INT_DURING_SYNC:
log.info("Awaiting any block to process...") log.info("Awaiting any block to process...")
blocks = [] blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING: if not blocksQueue.empty() or not SIG_INT_DURING_SYNC:
blocks = blocksQueue.get() blocks = get_from_queue( blocksQueue )
blocksQueue.task_done()
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1)) WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start() wait_time_2 = WSM.start()
if vopsQueue.empty() and CONTINUE_PROCESSING: if vopsQueue.empty() and not SIG_INT_DURING_SYNC:
log.info("Awaiting any vops to process...") log.info("Awaiting any vops to process...")
preparedVops = [] preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING: if not vopsQueue.empty() or not SIG_INT_DURING_SYNC:
preparedVops = vopsQueue.get() preparedVops = get_from_queue(vopsQueue)
vopsQueue.task_done()
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2)) WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
if SIG_INT_DURING_SYNC:
break;
block_start = perf() block_start = perf()
Blocks.process_multi(blocks, preparedVops, is_initial_sync) Blocks.process_multi(blocks, preparedVops, is_initial_sync)
block_end = perf() block_end = perf()
...@@ -157,10 +185,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -157,10 +185,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1 num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): if SIG_INT_DURING_SYNC and blocksQueue.empty() and vopsQueue.empty():
break break
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception: except Exception:
log.exception("Exception caught during processing blocks...") log.exception("Exception caught during processing blocks...")
finally: finally:
...@@ -179,25 +205,23 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -179,25 +205,23 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10) blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10) vopsQueue = queue.Queue(maxsize=10)
global CONTINUE_PROCESSING old_sig_int_handler = signal(SIGINT, set_sig_int)
with ThreadPoolExecutor(max_workers = 4) as pool: with ThreadPoolExecutor(max_workers = 4) as pool:
try: pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size) pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size) blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
blockConsumerFuture.result() if SIG_INT_DURING_SYNC:
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): pool.shutdown(False)
pool.shutdown(False)
except KeyboardInterrupt: signal(SIGINT, old_sig_int_handler)
log.info(""" ********************************************************** blocksQueue.queue.clear()
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES... vopsQueue.queue.clear()
**********************************************************
""") if SIG_INT_DURING_SYNC:
CONTINUE_PROCESSING = False exit(0)
blocksQueue.join()
vopsQueue.join()
class Sync: class Sync:
"""Manages the sync/index process. """Manages the sync/index process.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment