Skip to content
Snippets Groups Projects
Commit d90efed9 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'mi_exception_handling' into 'develop'

exception handling during sync

See merge request !399
parents beda6b94 7940ea1b
No related branches found
No related tags found
2 merge requests!456Release candidate v1 24,!399exception handling during sync
...@@ -34,10 +34,33 @@ from hive.indexer.mock_vops_provider import MockVopsProvider ...@@ -34,10 +34,33 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime from datetime import datetime
from signal import signal, SIGINT, SIGTERM
from atomic import AtomicLong
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True CONTINUE_PROCESSING = True
EXCEPTION_THROWN = AtomicLong(0)
FINISH_SIGNAL_DURING_SYNC = AtomicLong(0)
def finish_signals_handler(signal, frame):
    """Handler installed for SIGINT/SIGTERM during sync.

    Bumps the atomic FINISH_SIGNAL_DURING_SYNC counter so every worker
    thread polling can_continue_thread() winds down after draining its
    queue, then logs which signal was caught.

    NOTE(review): the `signal` parameter shadows the imported signal()
    function inside this body; harmless here since the handler is only
    invoked positionally by the runtime.
    """
    global FINISH_SIGNAL_DURING_SYNC
    FINISH_SIGNAL_DURING_SYNC += 1
    caught = "SIGINT" if signal == SIGINT else "SIGTERM"
    log.info("""
                  **********************************************************
                  CAUGHT {}. PLEASE WAIT... PROCESSING DATA IN QUEUES...
                  **********************************************************
""".format(caught))
def set_exception_thrown():
    """Record that a worker thread raised an exception.

    Incrementing the atomic EXCEPTION_THROWN counter makes
    can_continue_thread() return False, asking all sync threads to stop.
    """
    global EXCEPTION_THROWN
    EXCEPTION_THROWN += 1
def can_continue_thread():
    """Return True while no worker raised and no finish signal arrived."""
    return not (EXCEPTION_THROWN.value or FINISH_SIGNAL_DURING_SYNC.value)
def prepare_vops(vops_by_block): def prepare_vops(vops_by_block):
preparedVops = {} preparedVops = {}
...@@ -47,26 +70,33 @@ def prepare_vops(vops_by_block): ...@@ -47,26 +70,33 @@ def prepare_vops(vops_by_block):
return preparedVops return preparedVops
def put_to_queue( data_queue, value ):
    """Put `value` on `data_queue`, waiting at most 1s per attempt.

    The short timeout lets the producer re-check can_continue_thread()
    between attempts; if shutdown is requested the value is deliberately
    dropped and the function returns without queuing it.
    """
    while can_continue_thread():
        try:
            data_queue.put( value, True, 1)
        except queue.Full:
            continue
        return
def _block_provider(node, queue, lbound, ubound, chunk_size):
    """Producer thread: fetch blocks [lbound, ubound) in chunks and queue them.

    Pulls up to `chunk_size` blocks per call from `node` and hands each chunk
    to the consumer via put_to_queue(), stopping early if shutdown is
    requested. Returns the number of chunks fetched. On any error it logs,
    records the failure via set_exception_thrown() (so sibling threads stop),
    and re-raises so the future carries the exception.
    """
    try:
        chunks_fetched = 0
        count = ubound - lbound
        log.info("[SYNC] start block %d, +%d to sync", lbound, count)
        timer = Timer(count, entity='block', laps=['rps', 'wps'])
        while can_continue_thread() and lbound < ubound:
            upper = min(lbound + chunk_size, ubound)
            timer.batch_start()
            blocks = node.get_blocks_range(lbound, upper)
            lbound = upper
            timer.batch_lap()
            put_to_queue( queue, blocks )
            chunks_fetched += 1
        return chunks_fetched
    except Exception:
        log.exception("Exception caught during fetching blocks")
        set_exception_thrown()
        raise
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try: try:
...@@ -75,21 +105,31 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size): ...@@ -75,21 +105,31 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count) log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound: while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to) vops = node.enum_virtual_ops(conf, lbound, to)
preparedVops = prepare_vops(vops) preparedVops = prepare_vops(vops)
lbound = to lbound = to
timer.batch_lap() timer.batch_lap()
queue.put(preparedVops) put_to_queue( queue, preparedVops )
num = num + 1 num = num + 1
return num return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception: except Exception:
log.exception("Exception caught during fetching vops...") log.exception("Exception caught during fetching vops...")
set_exception_thrown()
raise
def get_from_queue( data_queue ):
    """Consumer-side counterpart of put_to_queue().

    Waits (1s at a time, re-checking can_continue_thread() between waits)
    for an item, marks it done on the queue and returns it. Returns [] if
    shutdown is requested before an item becomes available.
    """
    while can_continue_thread():
        try:
            item = data_queue.get(True, 1)
        except queue.Empty:
            continue
        data_queue.task_done()
        return item
    return []
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size): def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
from hive.utils.stats import minmax from hive.utils.stats import minmax
...@@ -97,36 +137,49 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -97,36 +137,49 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = 0 num = 0
time_start = OPSM.start() time_start = OPSM.start()
rate = {} rate = {}
def print_summary():
stop = OPSM.stop(time_start)
log.info("=== TOTAL STATS ===")
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times")
otm = OPSM.log_global("All operations present in the processed blocks")
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
try: try:
count = ubound - lbound count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound: while lbound < ubound:
wait_time_1 = WSM.start() wait_time_1 = WSM.start()
if blocksQueue.empty() and CONTINUE_PROCESSING: if blocksQueue.empty() and can_continue_thread():
log.info("Awaiting any block to process...") log.info("Awaiting any block to process...")
blocks = [] blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING: if not blocksQueue.empty() or can_continue_thread():
blocks = blocksQueue.get() blocks = get_from_queue( blocksQueue )
blocksQueue.task_done()
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1)) WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start() wait_time_2 = WSM.start()
if vopsQueue.empty() and CONTINUE_PROCESSING: if vopsQueue.empty() and can_continue_thread():
log.info("Awaiting any vops to process...") log.info("Awaiting any vops to process...")
preparedVops = [] preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING: if not vopsQueue.empty() or can_continue_thread():
preparedVops = vopsQueue.get() preparedVops = get_from_queue(vopsQueue)
vopsQueue.task_done()
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2)) WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
if not can_continue_thread():
break;
block_start = perf() block_start = perf()
Blocks.process_multi(blocks, preparedVops, is_initial_sync) Blocks.process_multi(blocks, preparedVops, is_initial_sync)
block_end = perf() block_end = perf()
...@@ -157,47 +210,45 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -157,47 +210,45 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1 num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty():
break break
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception: except Exception:
log.exception("Exception caught during processing blocks...") log.exception("Exception caught during processing blocks...")
finally: set_exception_thrown()
stop = OPSM.stop(time_start) print_summary()
log.info("=== TOTAL STATS ===") raise
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times") print_summary()
otm = OPSM.log_global("All operations present in the processed blocks") return num
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
    """Run the block/vops providers and the block consumer on a thread pool.

    Temporarily installs finish_signals_handler for SIGINT/SIGTERM so a
    signal lets the worker threads drain their queues instead of dying
    mid-write, then restores the previous handlers.

    Raises the first exception reported by the consumer, the block
    provider or the vops provider (checked in that order).
    """
    blocksQueue = queue.Queue(maxsize=10)
    vopsQueue = queue.Queue(maxsize=10)
    old_sig_int_handler = signal(SIGINT, finish_signals_handler)
    old_sig_term_handler = signal(SIGTERM, finish_signals_handler)

    try:
        with ThreadPoolExecutor(max_workers = 4) as pool:
            block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
            vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
            blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)

            # Future.exception() blocks until each worker finishes; the
            # workers cooperate via can_continue_thread()/set_exception_thrown()
            # so a failure in one unblocks the others.
            consumer_exception = blockConsumerFuture.exception()
            block_exception = block_provider_future.exception()
            vops_exception = vops_provider_future.exception()

            if consumer_exception:
                raise consumer_exception

            if block_exception:
                raise block_exception

            if vops_exception:
                raise vops_exception
    finally:
        # BUGFIX: restore handlers and clear queues even when a worker
        # exception is re-raised above; previously the error path returned
        # with the temporary signal handlers still installed and stale data
        # left in the queues.
        signal(SIGINT, old_sig_int_handler)
        signal(SIGTERM, old_sig_term_handler)

        blocksQueue.queue.clear()
        vopsQueue.queue.clear()
class Sync: class Sync:
"""Manages the sync/index process. """Manages the sync/index process.
...@@ -264,7 +315,7 @@ class Sync: ...@@ -264,7 +315,7 @@ class Sync:
DbState.before_initial_sync(last_imported_block, hived_head_block) DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync # resume initial sync
self.initial() self.initial()
if not CONTINUE_PROCESSING: if not can_continue_thread():
return return
current_imported_block = Blocks.head_num() current_imported_block = Blocks.head_num()
DbState.finish_initial_sync(current_imported_block) DbState.finish_initial_sync(current_imported_block)
...@@ -287,6 +338,8 @@ class Sync: ...@@ -287,6 +338,8 @@ class Sync:
while True: while True:
# sync up to irreversible block # sync up to irreversible block
self.from_steemd() self.from_steemd()
if not can_continue_thread():
return
try: try:
# listen for new blocks # listen for new blocks
...@@ -301,7 +354,7 @@ class Sync: ...@@ -301,7 +354,7 @@ class Sync:
log.info("[INIT] *** Initial fast sync ***") log.info("[INIT] *** Initial fast sync ***")
self.from_steemd(is_initial_sync=True) self.from_steemd(is_initial_sync=True)
if not CONTINUE_PROCESSING: if not can_continue_thread():
return return
def from_steemd(self, is_initial_sync=False, chunk_size=1000): def from_steemd(self, is_initial_sync=False, chunk_size=1000):
......
...@@ -92,6 +92,7 @@ if __name__ == "__main__": ...@@ -92,6 +92,7 @@ if __name__ == "__main__":
'diff-match-patch', 'diff-match-patch',
'prometheus-client', 'prometheus-client',
'psutil', 'psutil',
'atomic',
], ],
extras_require={ extras_require={
'dev': [ 'dev': [
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment