Skip to content
Snippets Groups Projects

exception handling during sync

Merged Marcin requested to merge mi_exception_handling into develop
2 files
+ 63
35
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 62
35
@@ -35,22 +35,31 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
@@ -35,22 +35,31 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime
from datetime import datetime
from signal import signal, SIGINT
from signal import signal, SIGINT
 
from atomic import AtomicLong
log = logging.getLogger(__name__)
log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
CONTINUE_PROCESSING = True
SIG_INT_DURING_SYNC = False
EXCEPTION_THROWN = AtomicLong(0)
 
SIG_INT_DURING_SYNC = AtomicLong(0)
def set_sig_int(signal, frame):
def set_sig_int(signal, frame):
global SIG_INT_DURING_SYNC
global SIG_INT_DURING_SYNC
SIG_INT_DURING_SYNC = True
SIG_INT_DURING_SYNC += 1
log.info("""
log.info("""
**********************************************************
**********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
**********************************************************
""")
""")
 
def set_exception_thrown():
 
global EXCEPTION_THROWN
 
EXCEPTION_THROWN += 1
 
 
def can_continue_thread():
 
return EXCEPTION_THROWN.value == 0 and SIG_INT_DURING_SYNC.value == 0
 
def prepare_vops(vops_by_block):
def prepare_vops(vops_by_block):
preparedVops = {}
preparedVops = {}
@@ -62,7 +71,7 @@ def prepare_vops(vops_by_block):
@@ -62,7 +71,7 @@ def prepare_vops(vops_by_block):
return preparedVops
return preparedVops
def put_to_queue( data_queue, value ):
def put_to_queue( data_queue, value ):
while not SIG_INT_DURING_SYNC:
while can_continue_thread():
try:
try:
data_queue.put( value, True, 1)
data_queue.put( value, True, 1)
return
return
@@ -75,7 +84,7 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
@@ -75,7 +84,7 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
count = ubound - lbound
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while not SIG_INT_DURING_SYNC and lbound < ubound:
while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound)
to = min(lbound + chunk_size, ubound)
timer.batch_start()
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
blocks = node.get_blocks_range(lbound, to)
@@ -86,7 +95,8 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
@@ -86,7 +95,8 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
return num
return num
except Exception:
except Exception:
log.exception("Exception caught during fetching blocks")
log.exception("Exception caught during fetching blocks")
return num
set_exception_thrown()
 
raise
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try:
try:
@@ -95,7 +105,7 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
@@ -95,7 +105,7 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while not SIG_INT_DURING_SYNC and lbound < ubound:
while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound)
to = min(lbound + chunk_size, ubound)
timer.batch_start()
timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to)
vops = node.enum_virtual_ops(conf, lbound, to)
@@ -108,9 +118,11 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
@@ -108,9 +118,11 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
return num
return num
except Exception:
except Exception:
log.exception("Exception caught during fetching vops...")
log.exception("Exception caught during fetching vops...")
 
set_exception_thrown()
 
raise
def get_from_queue( data_queue ):
def get_from_queue( data_queue ):
while not SIG_INT_DURING_SYNC:
while can_continue_thread():
try:
try:
ret = data_queue.get(True, 1)
ret = data_queue.get(True, 1)
data_queue.task_done()
data_queue.task_done()
@@ -125,26 +137,39 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
@@ -125,26 +137,39 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = 0
num = 0
time_start = OPSM.start()
time_start = OPSM.start()
rate = {}
rate = {}
 
 
def print_summary():
 
stop = OPSM.stop(time_start)
 
log.info("=== TOTAL STATS ===")
 
wtm = WSM.log_global("Total waiting times")
 
ftm = FSM.log_global("Total flush times")
 
otm = OPSM.log_global("All operations present in the processed blocks")
 
ttm = ftm + otm + wtm
 
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
 
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
 
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
 
log.info("=== TOTAL STATS ===")
 
try:
try:
count = ubound - lbound
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
while lbound < ubound:
wait_time_1 = WSM.start()
wait_time_1 = WSM.start()
if blocksQueue.empty() and not SIG_INT_DURING_SYNC:
if blocksQueue.empty() and can_continue_thread():
log.info("Awaiting any block to process...")
log.info("Awaiting any block to process...")
blocks = []
blocks = []
if not blocksQueue.empty() or not SIG_INT_DURING_SYNC:
if not blocksQueue.empty() or can_continue_thread():
blocks = get_from_queue( blocksQueue )
blocks = get_from_queue( blocksQueue )
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start()
wait_time_2 = WSM.start()
if vopsQueue.empty() and not SIG_INT_DURING_SYNC:
if vopsQueue.empty() and can_continue_thread():
log.info("Awaiting any vops to process...")
log.info("Awaiting any vops to process...")
preparedVops = []
preparedVops = []
if not vopsQueue.empty() or not SIG_INT_DURING_SYNC:
if not vopsQueue.empty() or can_continue_thread():
preparedVops = get_from_queue(vopsQueue)
preparedVops = get_from_queue(vopsQueue)
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
@@ -152,7 +177,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
@@ -152,7 +177,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
timer.batch_start()
timer.batch_start()
if SIG_INT_DURING_SYNC:
if not can_continue_thread():
break;
break;
block_start = perf()
block_start = perf()
@@ -185,22 +210,16 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
@@ -185,22 +210,16 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1
num = num + 1
if SIG_INT_DURING_SYNC and blocksQueue.empty() and vopsQueue.empty():
if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty():
break
break
except Exception:
except Exception:
log.exception("Exception caught during processing blocks...")
log.exception("Exception caught during processing blocks...")
finally:
set_exception_thrown()
stop = OPSM.stop(time_start)
print_summary()
log.info("=== TOTAL STATS ===")
raise
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times")
print_summary()
otm = OPSM.log_global("All operations present in the processed blocks")
return num
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
blocksQueue = queue.Queue(maxsize=10)
@@ -208,21 +227,27 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
@@ -208,21 +227,27 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
old_sig_int_handler = signal(SIGINT, set_sig_int)
old_sig_int_handler = signal(SIGINT, set_sig_int)
with ThreadPoolExecutor(max_workers = 4) as pool:
with ThreadPoolExecutor(max_workers = 4) as pool:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
consumer_exception = blockConsumerFuture.exception()
if SIG_INT_DURING_SYNC:
block_exception = block_provider_future.exception()
pool.shutdown(False)
vops_exception = vops_provider_future.exception()
 
 
if consumer_exception:
 
raise consumer_exception
 
 
if block_exception:
 
raise block_exception
 
 
if vops_exception:
 
raise vops_exception
signal(SIGINT, old_sig_int_handler)
signal(SIGINT, old_sig_int_handler)
blocksQueue.queue.clear()
blocksQueue.queue.clear()
vopsQueue.queue.clear()
vopsQueue.queue.clear()
if SIG_INT_DURING_SYNC:
exit(0)
class Sync:
class Sync:
"""Manages the sync/index process.
"""Manages the sync/index process.
@@ -288,7 +313,7 @@ class Sync:
@@ -288,7 +313,7 @@ class Sync:
DbState.before_initial_sync(last_imported_block, hived_head_block)
DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync
# resume initial sync
self.initial()
self.initial()
if not CONTINUE_PROCESSING:
if not can_continue_thread():
return
return
current_imported_block = Blocks.head_num()
current_imported_block = Blocks.head_num()
DbState.finish_initial_sync(current_imported_block)
DbState.finish_initial_sync(current_imported_block)
@@ -311,6 +336,8 @@ class Sync:
@@ -311,6 +336,8 @@ class Sync:
while True:
while True:
# sync up to irreversible block
# sync up to irreversible block
self.from_steemd()
self.from_steemd()
 
if not can_continue_thread():
 
return
try:
try:
# listen for new blocks
# listen for new blocks
@@ -325,7 +352,7 @@ class Sync:
@@ -325,7 +352,7 @@ class Sync:
log.info("[INIT] *** Initial fast sync ***")
log.info("[INIT] *** Initial fast sync ***")
self.from_steemd(is_initial_sync=True)
self.from_steemd(is_initial_sync=True)
if not CONTINUE_PROCESSING:
if not can_continue_thread():
return
return
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
Loading