From 62fe18cdec72ecc39317dc75eb75c4f7d68e82bf Mon Sep 17 00:00:00 2001 From: Holger <holger@nahrstaedt.de> Date: Wed, 20 Jun 2018 13:07:21 +0200 Subject: [PATCH] Block and Blockchain improved and Readme adapted to changes --- README.rst | 3 +++ beem/block.py | 38 ++++++++++++++++++-------------------- beem/blockchain.py | 32 ++++++++++++++++++++++++-------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/README.rst b/README.rst index 0e18af35..6d3a3e5d 100644 --- a/README.rst +++ b/README.rst @@ -160,7 +160,10 @@ Changelog ------- * get_feed_entries, get_blog_authors, get_savings_withdrawals, get_escrow, verify_account_authority, get_expiring_vesting_delegations, get_vesting_delegations, get_tags_used_by_author added to Account * get_account_reputations, get_account_count added to Blockchain +* Replies_by_last_update, Trending_tags, Discussions_by_author_before_date * ImageUploader class added +* Score calculation improved in update_nodes +* apidefinitions added to docs, which includes a complete condenser API call list. 0.19.38 ------- diff --git a/beem/block.py b/beem/block.py index 7c112545..0532b60a 100644 --- a/beem/block.py +++ b/beem/block.py @@ -172,14 +172,14 @@ class Block(BlockchainObject): if self.only_ops or self.only_virtual_ops: return list() trxs = [] - i = 0 + trx_id = 0 for trx in self["transactions"]: - trx_new = trx.copy() - trx_new['transaction_id'] = self['transaction_ids'][i] - trx_new['block_num'] = self.block_num - trx_new['transaction_num'] = i + trx_new = {"transaction_id": self['transaction_ids'][trx_id]} + trx_new.update(trx.copy()) + trx_new.update({"block_num": self.block_num, + "transaction_num": trx_id}) trxs.append(trx_new) - i += 1 + trx_id += 1 return trxs @property @@ -193,7 +193,7 @@ class Block(BlockchainObject): for op in tx["operations"]: # Replace opid by op name # op[0] = getOperationNameForId(op[0]) - ops.append(op) + ops.append(op.copy()) return ops @property @@ -202,20 +202,19 @@ class Block(BlockchainObject): if self.only_ops or self.only_virtual_ops: return list() trxs = [] - i = 0 + trx_id = 0 for trx in self["transactions"]: - trx_new = trx.copy() - trx_new['transaction_id'] = self['transaction_ids'][i] - trx_new['block_num'] = self.block_num - trx_new['transaction_num'] = i + trx_new = {"transaction_id": self['transaction_ids'][trx_id]} + trx_new.update(trx.copy()) + trx_new.update({"block_num": self.block_num, + "transaction_num": trx_id}) if 'expiration' in trx: p_date = trx.get('expiration', datetime(1970, 1, 1, 0, 0)) if isinstance(p_date, (datetime, date)): - trx_new['expiration'] = formatTimeString(p_date) - else: - trx_new['expiration'] = p_date + trx_new.update({'expiration': formatTimeString(p_date)}) + trxs.append(trx_new) - i += 1 + trx_id += 1 return trxs @property @@ -228,13 +227,12 @@ class Block(BlockchainObject): for op in tx["operations"]: # Replace opid by op name # op[0] = getOperationNameForId(op[0]) + op_new = op.copy() if 'timestamp' in op: p_date = op.get('timestamp', datetime(1970, 1, 1, 0, 0)) if isinstance(p_date, (datetime, date)): - op['timestamp'] = formatTimeString(p_date) - else: - op['timestamp'] = p_date - ops.append(op) + op_new.update({'timestamp': formatTimeString(p_date)}) + ops.append(op_new) return ops def ops_statistics(self, add_to_ops_stat=None): diff --git a/beem/blockchain.py b/beem/blockchain.py index 8808d541..e63c7a05 100644 --- a/beem/blockchain.py +++ b/beem/blockchain.py @@ -29,6 +29,13 @@ if sys.version_info < (3, 0): from Queue import Queue else: from queue import Queue +FUTURES_MODULE = None +if not FUTURES_MODULE: + try: + from concurrent.futures import ThreadPoolExecutor, wait, as_completed + FUTURES_MODULE = "futures" + except ImportError: + FUTURES_MODULE = None # default exception handler. if you want to take some action on failed tasks @@ -382,8 +389,9 @@ class Blockchain(object): if not start: start = current_block_num head_block_reached = False - if threading: - # pool = ThreadPoolExecutor(max_workers=thread_num + 1) + if threading and FUTURES_MODULE is not None: + pool = ThreadPoolExecutor(max_workers=thread_num) + elif threading: pool = Pool(thread_num, batch_mode=True) # We are going to loop indefinitely while True: @@ -403,6 +411,8 @@ class Blockchain(object): # futures = [] i = 0 results = [] + if FUTURES_MODULE is not None: + futures = [] block_num_list = [] current_block.set_cache_auto_clean(False) freeze = self.steem.rpc.nodes.freeze_current_node @@ -412,13 +422,19 @@ class Blockchain(object): error_cnt = self.steem.rpc.nodes.node.error_cnt while i < thread_num and blocknum + i <= head_block: block_num_list.append(blocknum + i) - pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) + if FUTURES_MODULE is not None: + futures.append(pool.submit(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)) + else: + pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) i += 1 - pool.run(True) - pool.join() - for result in pool.results(): - results.append(result) - pool.abort() + if FUTURES_MODULE is not None: + results = [r.result() for r in as_completed(futures)] + else: + pool.run(True) + pool.join() + for result in pool.results(): + results.append(result) + pool.abort() current_block.clear_cache_from_expired_items() current_block.set_cache_auto_clean(auto_clean) self.steem.rpc.nodes.num_retries = num_retries -- GitLab