From 347c7ba791276bfc614775b8a69982fa1c256a46 Mon Sep 17 00:00:00 2001 From: Holger <holger@nahrstaedt.de> Date: Wed, 20 Jun 2018 16:42:24 +0200 Subject: [PATCH] Block identifier handling improved and test_blockchain_treading improved --- beem/block.py | 11 ++---- beem/blockchain.py | 31 ++++++++++------ beem/transactionbuilder.py | 2 +- tests/beem/test_blockchain_threading.py | 49 ++++++++++++++----------- 4 files changed, 52 insertions(+), 41 deletions(-) diff --git a/beem/block.py b/beem/block.py index 0532b60a..7d406894 100644 --- a/beem/block.py +++ b/beem/block.py @@ -123,12 +123,10 @@ class Block(BlockchainObject): """ Even though blocks never change, you freshly obtain its contents from an API with this method """ - if self.identifier is None and "id" in self: - self.identifier = self["id"] - if not isinstance(self.identifier, int): - self.identifier = int(self.identifier) + if self.identifier is None: + return if not self.steem.is_connected(): - return None + return self.steem.rpc.set_next_node_on_empty_reply(False) if self.only_ops or self.only_virtual_ops: if self.steem.rpc.get_use_appbase(): @@ -154,13 +152,12 @@ class Block(BlockchainObject): if not block: raise BlockDoesNotExistsException(str(self.identifier)) block = self._parse_json_data(block) - block["id"] = self.identifier super(Block, self).__init__(block, lazy=self.lazy, full=self.full, steem_instance=self.steem) @property def block_num(self): """Returns the block number""" - return self.identifier + return int(self['block_id'][:8], base=16) def time(self): """Return a datatime instance for the timestamp of this block""" diff --git a/beem/blockchain.py b/beem/blockchain.py index e63c7a05..cdb89250 100644 --- a/beem/blockchain.py +++ b/beem/blockchain.py @@ -410,25 +410,28 @@ class Blockchain(object): for blocknum in range(start, head_block + 1, thread_num): # futures = [] i = 0 - results = [] if FUTURES_MODULE is not None: futures = [] block_num_list = [] current_block.set_cache_auto_clean(False) freeze = self.steem.rpc.nodes.freeze_current_node - num_retries = self.steem.rpc.nodes.num_retries + # num_retries = self.steem.rpc.nodes.num_retries self.steem.rpc.nodes.freeze_current_node = True - self.steem.rpc.nodes.num_retries = 1 + # self.steem.rpc.nodes.num_retries = 1 error_cnt = self.steem.rpc.nodes.node.error_cnt while i < thread_num and blocknum + i <= head_block: block_num_list.append(blocknum + i) + results = [] if FUTURES_MODULE is not None: futures.append(pool.submit(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)) else: pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) i += 1 if FUTURES_MODULE is not None: - results = [r.result() for r in as_completed(futures)] + try: + results = [r.result() for r in as_completed(futures)] + except Exception as e: + log.error(str(e)) else: pool.run(True) pool.join() @@ -437,7 +440,7 @@ class Blockchain(object): pool.abort() current_block.clear_cache_from_expired_items() current_block.set_cache_auto_clean(auto_clean) - self.steem.rpc.nodes.num_retries = num_retries + # self.steem.rpc.nodes.num_retries = num_retries self.steem.rpc.nodes.freeze_current_node = freeze new_error_cnt = self.steem.rpc.nodes.node.error_cnt self.steem.rpc.nodes.node.error_cnt = error_cnt @@ -447,22 +450,24 @@ class Blockchain(object): checked_results = [] for b in results: + b["id"] = b.block_num + b.identifier = b.block_num if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b: - if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.identifier) not in result_block_nums: + if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.block_num) not in result_block_nums: checked_results.append(b) - result_block_nums.append(int(b.identifier)) + result_block_nums.append(int(b.block_num)) missing_block_num = list(set(block_num_list).difference(set(result_block_nums))) if len(missing_block_num) > 0: for blocknum in missing_block_num: block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) checked_results.append(block) - result_block_nums.append(int(block.identifier)) + result_block_nums.append(int(block.block_num)) from operator import itemgetter blocks = sorted(checked_results, key=itemgetter('id')) for b in blocks: - if latest_block < int(b.identifier): - latest_block = int(b.identifier) + if latest_block < int(b.block_num): + latest_block = int(b.block_num) yield b if latest_block <= head_block: @@ -679,6 +684,10 @@ class Blockchain(object): trx = block["transactions"] else: trx = [block] + block_num = 0 + trx_id = "" + _id = "" + timestamp = "" for trx_nr in range(len(trx)): for event in trx[trx_nr]["operations"]: if isinstance(event, list): @@ -702,7 +711,7 @@ class Blockchain(object): block_num = event.get("block") _id = self.hash_op(event["op"]) timestamp = event.get("timestamp") - if not opNames or op_type in opNames: + if not bool(opNames) or op_type in opNames and block_num > 0: if raw_ops: yield {"block_num": block_num, "op": [op_type, op], diff --git a/beem/transactionbuilder.py b/beem/transactionbuilder.py index 69ea7504..a7d0c986 100644 --- a/beem/transactionbuilder.py +++ b/beem/transactionbuilder.py @@ -5,6 +5,7 @@ from __future__ import print_function from __future__ import unicode_literals from builtins import str from future.utils import python_2_unicode_compatible +import logging from beemgraphenebase.py23 import bytes_types, integer_types, string_types, text_type from .account import Account from .utils import formatTimeFromNow @@ -21,7 +22,6 @@ from .exceptions import ( OfflineHasNoRPCException ) from beem.instance import shared_steem_instance -import logging log = logging.getLogger(__name__) diff --git a/tests/beem/test_blockchain_threading.py b/tests/beem/test_blockchain_threading.py index 4b3b7cdc..a1c16b14 100644 --- a/tests/beem/test_blockchain_threading.py +++ b/tests/beem/test_blockchain_threading.py @@ -40,10 +40,26 @@ class Testcases(unittest.TestCase): # cls.N_transfer = 121 # cls.N_vote = 2825 + def test_block_threading(self): + bts = self.bts + b = Blockchain(steem_instance=bts) + blocks_no_threading = [] + for block in b.blocks(start=self.start, stop=self.stop, threading=False, thread_num=8): + blocks_no_threading.append(block) + + for n in range(5): + blocks = [] + for block in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=8): + blocks.append(block) + + for i in range(min(len(blocks), len(blocks_no_threading))): + self.assertEqual(blocks[i]["block_id"], blocks_no_threading[i]["block_id"]) + self.assertEqual(len(blocks_no_threading), len(blocks)) + def test_stream_threading(self): bts = self.bts b = Blockchain(steem_instance=bts) - ops_stream = [] + ops_stream_no_threading = [] opNames = ["transfer", "vote"] @@ -52,29 +68,18 @@ class Testcases(unittest.TestCase): ops_stream_no_threading.append(op) if op["block_num"] not in block_num_list2: block_num_list2.append(op["block_num"]) + for n in range(2): + ops_stream = [] + block_num_list = [] + for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=8): + ops_stream.append(op) + if op["block_num"] not in block_num_list: + block_num_list.append(op["block_num"]) - block_num_list = [] - for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=8): - ops_stream.append(op) - if op["block_num"] not in block_num_list: - block_num_list.append(op["block_num"]) - - self.assertEqual(ops_stream[0]["block_num"], ops_stream_no_threading[0]["block_num"]) - self.assertEqual(ops_stream[-1]["block_num"], ops_stream_no_threading[-1]["block_num"]) - self.assertEqual(len(ops_stream_no_threading), len(ops_stream)) - for i in range(len(ops_stream)): - self.assertEqual(ops_stream[i], ops_stream_no_threading[i]) + self.assertEqual(ops_stream[0]["block_num"], ops_stream_no_threading[0]["block_num"]) + self.assertEqual(ops_stream[-1]["block_num"], ops_stream_no_threading[-1]["block_num"]) + self.assertEqual(len(ops_stream_no_threading), len(ops_stream)) self.assertEqual(len(block_num_list), len(block_num_list2)) for i in range(len(block_num_list)): self.assertEqual(block_num_list[i], block_num_list2[i]) - - blocks = [] - last_id = self.start - 1 - for block in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=8): - blocks.append(block) - self.assertEqual(block.identifier, last_id + 1) - last_id += 1 - - for i in range(len(blocks)): - self.assertEqual(blocks[i]["id"], block_num_list2[i]) -- GitLab