Skip to content
Snippets Groups Projects
Commit 62fe18cd authored by Holger's avatar Holger
Browse files

Block and Blockchain improved and Readme adapted to changes

parent c955f2ad
No related branches found
No related tags found
No related merge requests found
......@@ -160,7 +160,10 @@ Changelog
-------
* get_feed_entries, get_blog_authors, get_savings_withdrawals, get_escrow, verify_account_authority, get_expiring_vesting_delegations, get_vesting_delegations, get_tags_used_by_author added to Account
* get_account_reputations, get_account_count added to Blockchain
* Replies_by_last_update, Trending_tags, Discussions_by_author_before_date
* ImageUploader class added
* Score calculation improved in update_nodes
* apidefinitions added to docs, which includes a complete condenser API call list.
0.19.38
-------
......
......@@ -172,14 +172,14 @@ class Block(BlockchainObject):
if self.only_ops or self.only_virtual_ops:
return list()
trxs = []
i = 0
trx_id = 0
for trx in self["transactions"]:
trx_new = trx.copy()
trx_new['transaction_id'] = self['transaction_ids'][i]
trx_new['block_num'] = self.block_num
trx_new['transaction_num'] = i
trx_new = {"transaction_id": self['transaction_ids'][trx_id]}
trx_new.update(trx.copy())
trx_new.update({"block_num": self.block_num,
"transaction_num": trx_id})
trxs.append(trx_new)
i += 1
trx_id += 1
return trxs
@property
......@@ -193,7 +193,7 @@ class Block(BlockchainObject):
for op in tx["operations"]:
# Replace opid by op name
# op[0] = getOperationNameForId(op[0])
ops.append(op)
ops.append(op.copy())
return ops
@property
......@@ -202,20 +202,19 @@ class Block(BlockchainObject):
if self.only_ops or self.only_virtual_ops:
return list()
trxs = []
i = 0
trx_id = 0
for trx in self["transactions"]:
trx_new = trx.copy()
trx_new['transaction_id'] = self['transaction_ids'][i]
trx_new['block_num'] = self.block_num
trx_new['transaction_num'] = i
trx_new = {"transaction_id": self['transaction_ids'][trx_id]}
trx_new.update(trx.copy())
trx_new.update({"block_num": self.block_num,
"transaction_num": trx_id})
if 'expiration' in trx:
p_date = trx.get('expiration', datetime(1970, 1, 1, 0, 0))
if isinstance(p_date, (datetime, date)):
trx_new['expiration'] = formatTimeString(p_date)
else:
trx_new['expiration'] = p_date
trx_new.update({'expiration': formatTimeString(p_date)})
trxs.append(trx_new)
i += 1
trx_id += 1
return trxs
@property
......@@ -228,13 +227,12 @@ class Block(BlockchainObject):
for op in tx["operations"]:
# Replace opid by op name
# op[0] = getOperationNameForId(op[0])
op_new = op.copy()
if 'timestamp' in op:
p_date = op.get('timestamp', datetime(1970, 1, 1, 0, 0))
if isinstance(p_date, (datetime, date)):
op['timestamp'] = formatTimeString(p_date)
else:
op['timestamp'] = p_date
ops.append(op)
op_new.update({'timestamp': formatTimeString(p_date)})
ops.append(op_new)
return ops
def ops_statistics(self, add_to_ops_stat=None):
......
......@@ -29,6 +29,13 @@ if sys.version_info < (3, 0):
from Queue import Queue
else:
from queue import Queue
FUTURES_MODULE = None
if not FUTURES_MODULE:
try:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
FUTURES_MODULE = "futures"
except ImportError:
FUTURES_MODULE = None
# default exception handler. if you want to take some action on failed tasks
......@@ -382,8 +389,9 @@ class Blockchain(object):
if not start:
start = current_block_num
head_block_reached = False
if threading:
# pool = ThreadPoolExecutor(max_workers=thread_num + 1)
if threading and FUTURES_MODULE is not None:
pool = ThreadPoolExecutor(max_workers=thread_num)
elif threading:
pool = Pool(thread_num, batch_mode=True)
# We are going to loop indefinitely
while True:
......@@ -403,6 +411,8 @@ class Blockchain(object):
# futures = []
i = 0
results = []
if FUTURES_MODULE is not None:
futures = []
block_num_list = []
current_block.set_cache_auto_clean(False)
freeze = self.steem.rpc.nodes.freeze_current_node
......@@ -412,13 +422,19 @@ class Blockchain(object):
error_cnt = self.steem.rpc.nodes.node.error_cnt
while i < thread_num and blocknum + i <= head_block:
block_num_list.append(blocknum + i)
pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
if FUTURES_MODULE is not None:
futures.append(pool.submit(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem))
else:
pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
i += 1
pool.run(True)
pool.join()
for result in pool.results():
results.append(result)
pool.abort()
if FUTURES_MODULE is not None:
results = [r.result() for r in as_completed(futures)]
else:
pool.run(True)
pool.join()
for result in pool.results():
results.append(result)
pool.abort()
current_block.clear_cache_from_expired_items()
current_block.set_cache_auto_clean(auto_clean)
self.steem.rpc.nodes.num_retries = num_retries
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment