Skip to content
Snippets Groups Projects
Commit 8137a6a9 authored by Holger's avatar Holger
Browse files

Load missing blocks when using threads and fix issue #27

* Unit test for issue #27 added
* Exception handling added to blocks using threads
* get_all_accounts fixed for appbase calls
parent 58f4585b
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@ import time
import hashlib
import json
import math
import logging
from datetime import datetime, timedelta
from .utils import formatTimeString, addTzInfo
from .block import Block
......@@ -18,6 +19,7 @@ from .exceptions import BatchedCallsNotSupported, BlockDoesNotExistsException, B
from beemgraphenebase.py23 import py23_bytes
from beem.instance import shared_steem_instance
from .amount import Amount
log = logging.getLogger(__name__)
FUTURES_MODULE = None
if not FUTURES_MODULE:
try:
......@@ -246,40 +248,31 @@ class Blockchain(object):
auto_clean = current_block.get_cache_auto_clean()
current_block.set_cache_auto_clean(False)
latest_block = 0
starting_block = 0
for blocknum in range(start, head_block + 1, thread_num):
futures = []
i = blocknum
block_num_list = []
while i < blocknum + thread_num and i <= head_block:
block_num_list.append(i)
futures.append(pool.submit(Block, i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem))
i += 1
results = [r.result() for r in as_completed(futures)]
block_nums = []
missing_nums = []
starting_block = int(results[0].identifier)
try:
results = [r.result() for r in as_completed(futures)]
except Exception as e:
log.warning(str(e))
result_block_nums = []
for b in results:
block_nums.append(int(b.identifier))
result_block_nums.append(int(b.identifier))
if latest_block < int(b.identifier):
latest_block = int(b.identifier)
if starting_block > int(b.identifier):
starting_block = int(b.identifier)
if starting_block > blocknum:
for i in range(blocknum, starting_block):
missing_nums.append(i)
missing_block_num = list(set(block_num_list).difference(set(result_block_nums)))
if len(missing_block_num) > 0:
for blocknum in missing_block_num:
block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
results.append(block)
from operator import itemgetter
blocks = sorted(results, key=itemgetter('id'))
last_i = blocks[0].identifier - 1
for b in blocks:
if b.identifier - last_i > 1:
for j in range(last_i + 1, b.identifier):
missing_nums.append(j)
last_i = b.identifier
if len(missing_nums) > 0:
for block_num in missing_nums:
block = Block(block_num, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
results.append(block)
blocks = sorted(results, key=itemgetter('id'))
for b in blocks:
yield b
current_block.clear_cache_from_expired_items()
......@@ -580,10 +573,13 @@ class Blockchain(object):
:param str stop: Stop at this account name
:param int steps: Obtain ``steps`` ret with a single call from RPC
"""
lastname = start
cnt = 1
if not self.steem.is_connected():
raise OfflineHasNoRPCException("No RPC available in offline mode!")
if self.steem.rpc.get_use_appbase() and start == "":
lastname = None
else:
lastname = start
self.steem.rpc.set_next_node_on_empty_reply(False)
while True:
if self.steem.rpc.get_use_appbase():
......@@ -591,12 +587,17 @@ class Blockchain(object):
else:
ret = self.steem.rpc.lookup_accounts(lastname, steps)
for account in ret:
yield account
cnt += 1
if account == stop or (limit > 0 and cnt > limit):
raise StopIteration
if lastname == ret[-1]:
if isinstance(account, dict):
account_name = account["name"]
else:
account_name = account
if account_name != lastname:
yield account_name
cnt += 1
if account_name == stop or (limit > 0 and cnt > limit):
raise StopIteration
if lastname == account_name:
raise StopIteration
lastname = ret[-1]
lastname = account_name
if len(ret) < steps:
raise StopIteration
......@@ -104,6 +104,9 @@ class Testcases(unittest.TestCase):
for acc in b.get_all_accounts(steps=100, limit=100):
accounts.append(acc)
self.assertEqual(len(accounts), 100)
limit = 5000
self.assertEqual(len(list(b.get_all_accounts(limit=limit))), limit)
self.assertEqual(len(set(b.get_all_accounts(limit=limit))), limit)
@parameterized.expand([
("non_appbase"),
......
......@@ -39,7 +39,7 @@ class Testcases(unittest.TestCase):
b = Blockchain(steem_instance=cls.bts)
num = b.get_current_block_num()
cls.start = num - 25
cls.start = num - 100
cls.stop = num
cls.max_batch_size = 1 # appbase does not support batch rpc calls at the momement (internal error)
......
......@@ -36,9 +36,8 @@ class Testcases(unittest.TestCase):
cls.bts.set_default_account("test")
b = Blockchain(steem_instance=cls.bts)
# num = b.get_current_block_num()
num = 2000000
cls.start = num - 25
num = b.get_current_block_num()
cls.start = num - 100
cls.stop = num
def test_stream_threading(self):
......@@ -46,7 +45,7 @@ class Testcases(unittest.TestCase):
b = Blockchain(steem_instance=bts)
ops_stream = []
opNames = ["transfer", "vote"]
for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=8):
ops_stream.append(op)
......@@ -58,7 +57,7 @@ class Testcases(unittest.TestCase):
for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=8):
ops_blocks.append(op)
self.assertEqual(op.identifier, last_id + 1)
last_id += 1
last_id += 1
op_stat4 = {"transfer": 0, "vote": 0}
self.assertTrue(len(ops_blocks) > 0)
for block in ops_blocks:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment