Skip to content
Snippets Groups Projects
Commit dc184607 authored by Holger's avatar Holger
Browse files

get_expiring_vesting_delegations added to account

Blockchain
* thread_limit parameter removed
* batch_mode set to True for threaded blocks()
* block sanity check added for threaded blocks()
parent be264291
No related branches found
No related tags found
No related merge requests found
...@@ -802,6 +802,7 @@ class Account(BlockchainObject): ...@@ -802,6 +802,7 @@ class Account(BlockchainObject):
def get_owner_history(self, account=None): def get_owner_history(self, account=None):
""" get_owner_history """ get_owner_history
:param str account: Account name to get interest for (default None)
:rtype: list :rtype: list
""" """
...@@ -875,6 +876,26 @@ class Account(BlockchainObject): ...@@ -875,6 +876,26 @@ class Account(BlockchainObject):
else: else:
return self.steem.rpc.verify_account_authority(account, keys) return self.steem.rpc.verify_account_authority(account, keys)
def get_expiring_vesting_delegations(self, start=None, limit=1000, account=None):
""" Returns the expirations for vesting delegations.
:rtype: list
"""
if account is None:
account = self
else:
account = Account(account, steem_instance=self.steem)
if not self.steem.is_connected():
raise OfflineHasNoRPCException("No RPC available in offline mode!")
self.steem.rpc.set_next_node_on_empty_reply(False)
if start is None:
start = addTzInfo(datetime.utcnow()) - timedelta(days=8)
if self.steem.rpc.get_use_appbase():
return self.steem.rpc.find_vesting_delegation_expirations({'account': account["name"]}, api="database")['delegations']
else:
return self.steem.rpc.get_expiring_vesting_delegations(account["name"], formatTimeString(start), limit)
def get_account_votes(self, account=None): def get_account_votes(self, account=None):
"""Returns all votes that the account has done""" """Returns all votes that the account has done"""
if account is None: if account is None:
......
...@@ -339,7 +339,7 @@ class Blockchain(object): ...@@ -339,7 +339,7 @@ class Blockchain(object):
).time() ).time()
return int(time.mktime(block_time.timetuple())) return int(time.mktime(block_time.timetuple()))
def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, thread_limit=1000, only_ops=False, only_virtual_ops=False): def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, only_ops=False, only_virtual_ops=False):
""" Yields blocks starting from ``start``. """ Yields blocks starting from ``start``.
:param int start: Starting block :param int start: Starting block
...@@ -348,7 +348,6 @@ class Blockchain(object): ...@@ -348,7 +348,6 @@ class Blockchain(object):
Cannot be combined with threading Cannot be combined with threading
:param bool threading: Enables threading. Cannot be combined with batch calls :param bool threading: Enables threading. Cannot be combined with batch calls
:param int thread_num: Defines the number of threads, when `threading` is set. :param int thread_num: Defines the number of threads, when `threading` is set.
:param int thread_limit: Thread queue size (Default 1000)
:param bool only_ops: Only yield operations (default: False). :param bool only_ops: Only yield operations (default: False).
Cannot be combined with ``only_virtual_ops=True``. Cannot be combined with ``only_virtual_ops=True``.
:param bool only_virtual_ops: Only yield virtual operations (default: False) :param bool only_virtual_ops: Only yield virtual operations (default: False)
...@@ -376,21 +375,18 @@ class Blockchain(object): ...@@ -376,21 +375,18 @@ class Blockchain(object):
head_block = current_block_num head_block = current_block_num
if threading and not head_block_reached: if threading and not head_block_reached:
# pool = ThreadPoolExecutor(max_workers=thread_num + 1) # pool = ThreadPoolExecutor(max_workers=thread_num + 1)
pool = Pool(thread_num + 1) pool = Pool(thread_num + 1, batch_mode=True)
# disable autoclean # disable autoclean
auto_clean = current_block.get_cache_auto_clean() auto_clean = current_block.get_cache_auto_clean()
current_block.set_cache_auto_clean(False) current_block.set_cache_auto_clean(False)
latest_block = start latest_block = start
block_num_step = thread_limit
if block_num_step == 0:
block_num_step = 1
result_block_nums = [] result_block_nums = []
for blocknum in range(start, head_block + 1, block_num_step): for blocknum in range(start, head_block + 1, thread_num):
# futures = [] # futures = []
i = 0 i = 0
block_num_list = []
results = [] results = []
while i < thread_limit and blocknum + i <= head_block: block_num_list = []
while i < thread_num and blocknum + i <= head_block:
block_num_list.append(blocknum + i) block_num_list.append(blocknum + i)
pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
i += 1 i += 1
...@@ -399,14 +395,15 @@ class Blockchain(object): ...@@ -399,14 +395,15 @@ class Blockchain(object):
while not pool.done() or not pool.idle(): while not pool.done() or not pool.idle():
for result in pool.results(): for result in pool.results():
results.append(result) results.append(result)
pool.join()
for result in pool.results(): for result in pool.results():
results.append(result) results.append(result)
pool.abort() pool.abort()
checked_results = [] checked_results = []
for b in results: for b in results:
if isinstance(b, dict) and "transactions" in b: if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b:
if len(b.operations) > 0 and int(b.identifier) not in result_block_nums: if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.identifier) not in result_block_nums:
checked_results.append(b) checked_results.append(b)
result_block_nums.append(int(b.identifier)) result_block_nums.append(int(b.identifier))
......
...@@ -249,6 +249,7 @@ class GrapheneRPC(object): ...@@ -249,6 +249,7 @@ class GrapheneRPC(object):
"""Close Websocket""" """Close Websocket"""
if self.ws is None: if self.ws is None:
return return
# if self.ws.connected:
self.ws.close() self.ws.close()
def request_send(self, payload): def request_send(self, payload):
......
...@@ -47,8 +47,7 @@ class Testcases(unittest.TestCase): ...@@ -47,8 +47,7 @@ class Testcases(unittest.TestCase):
ops_stream_no_threading = [] ops_stream_no_threading = []
opNames = ["transfer", "vote"] opNames = ["transfer", "vote"]
block_num_list = [] block_num_list = []
thread_limit = 16 for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=16):
for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=16, thread_limit=thread_limit):
ops_stream.append(op) ops_stream.append(op)
if op["block_num"] not in block_num_list: if op["block_num"] not in block_num_list:
block_num_list.append(op["block_num"]) block_num_list.append(op["block_num"])
...@@ -67,7 +66,7 @@ class Testcases(unittest.TestCase): ...@@ -67,7 +66,7 @@ class Testcases(unittest.TestCase):
ops_blocks = [] ops_blocks = []
last_id = self.start - 1 last_id = self.start - 1
for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=16, thread_limit=thread_limit): for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=16):
ops_blocks.append(op) ops_blocks.append(op)
self.assertEqual(op.identifier, last_id + 1) self.assertEqual(op.identifier, last_id + 1)
last_id += 1 last_id += 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment