diff --git a/beem/account.py b/beem/account.py index c447566044867c749addd3ae829419001e26f8ee..b3b2a299f8f113c8fe1ba127775a31e5f815c778 100644 --- a/beem/account.py +++ b/beem/account.py @@ -802,6 +802,7 @@ class Account(BlockchainObject): def get_owner_history(self, account=None): """ get_owner_history + :param str account: Account name to get interest for (default None) :rtype: list """ @@ -875,6 +876,26 @@ class Account(BlockchainObject): else: return self.steem.rpc.verify_account_authority(account, keys) + def get_expiring_vesting_delegations(self, start=None, limit=1000, account=None): + """ Returns the expirations for vesting delegations. + + :rtype: list + + """ + if account is None: + account = self + else: + account = Account(account, steem_instance=self.steem) + if not self.steem.is_connected(): + raise OfflineHasNoRPCException("No RPC available in offline mode!") + self.steem.rpc.set_next_node_on_empty_reply(False) + if start is None: + start = addTzInfo(datetime.utcnow()) - timedelta(days=8) + if self.steem.rpc.get_use_appbase(): + return self.steem.rpc.find_vesting_delegation_expirations({'account': account["name"]}, api="database")['delegations'] + else: + return self.steem.rpc.get_expiring_vesting_delegations(account["name"], formatTimeString(start), limit) + def get_account_votes(self, account=None): """Returns all votes that the account has done""" if account is None: diff --git a/beem/blockchain.py b/beem/blockchain.py index 06db703a7d6396f22350e764a9f32a5a5536f73d..36eddcabd13491a2bd100c4ad38a6f980a50ebaa 100644 --- a/beem/blockchain.py +++ b/beem/blockchain.py @@ -339,7 +339,7 @@ class Blockchain(object): ).time() return int(time.mktime(block_time.timetuple())) - def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, thread_limit=1000, only_ops=False, only_virtual_ops=False): + def blocks(self, start=None, stop=None, max_batch_size=None, threading=False, thread_num=8, only_ops=False, only_virtual_ops=False): """ Yields blocks starting from ``start``. :param int start: Starting block @@ -348,7 +348,6 @@ class Blockchain(object): Cannot be combined with threading :param bool threading: Enables threading. Cannot be combined with batch calls :param int thread_num: Defines the number of threads, when `threading` is set. - :param int thread_limit: Thread queue size (Default 1000) :param bool only_ops: Only yield operations (default: False). Cannot be combined with ``only_virtual_ops=True``. :param bool only_virtual_ops: Only yield virtual operations (default: False) @@ -376,21 +375,18 @@ class Blockchain(object): head_block = current_block_num if threading and not head_block_reached: # pool = ThreadPoolExecutor(max_workers=thread_num + 1) - pool = Pool(thread_num + 1) + pool = Pool(thread_num + 1, batch_mode=True) # disable autoclean auto_clean = current_block.get_cache_auto_clean() current_block.set_cache_auto_clean(False) latest_block = start - block_num_step = thread_limit - if block_num_step == 0: - block_num_step = 1 result_block_nums = [] - for blocknum in range(start, head_block + 1, block_num_step): + for blocknum in range(start, head_block + 1, thread_num): # futures = [] i = 0 - block_num_list = [] results = [] - while i < thread_limit and blocknum + i <= head_block: + block_num_list = [] + while i < thread_num and blocknum + i <= head_block: block_num_list.append(blocknum + i) pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) i += 1 @@ -399,14 +395,15 @@ class Blockchain(object): while not pool.done() or not pool.idle(): for result in pool.results(): results.append(result) + pool.join() for result in pool.results(): results.append(result) pool.abort() checked_results = [] for b in results: - if isinstance(b, dict) and "transactions" in b: - if len(b.operations) > 0 and int(b.identifier) not in result_block_nums: + if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b: + if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.identifier) not in result_block_nums: checked_results.append(b) result_block_nums.append(int(b.identifier)) diff --git a/beemapi/graphenerpc.py b/beemapi/graphenerpc.py index 61633eaeb470904d7ec7327696c69e93260bd299..d1564c39add126c0948a0cb9df88d47bb1cb5211 100644 --- a/beemapi/graphenerpc.py +++ b/beemapi/graphenerpc.py @@ -249,6 +249,7 @@ class GrapheneRPC(object): """Close Websocket""" if self.ws is None: return + # if self.ws.connected: self.ws.close() def request_send(self, payload): diff --git a/tests/beem/test_blockchain_threading.py b/tests/beem/test_blockchain_threading.py index a4c73aebfd18e45bf52e102b32a1cddb2fa16ca1..9648edcc8a4b0a49be9adf6af14eef9b3f97ffea 100644 --- a/tests/beem/test_blockchain_threading.py +++ b/tests/beem/test_blockchain_threading.py @@ -47,8 +47,7 @@ class Testcases(unittest.TestCase): ops_stream_no_threading = [] opNames = ["transfer", "vote"] block_num_list = [] - thread_limit = 16 - for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=16, thread_limit=thread_limit): + for op in b.stream(opNames=opNames, start=self.start, stop=self.stop, threading=True, thread_num=16): ops_stream.append(op) if op["block_num"] not in block_num_list: block_num_list.append(op["block_num"]) @@ -67,7 +66,7 @@ class Testcases(unittest.TestCase): ops_blocks = [] last_id = self.start - 1 - for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=16, thread_limit=thread_limit): + for op in b.blocks(start=self.start, stop=self.stop, threading=True, thread_num=16): ops_blocks.append(op) self.assertEqual(op.identifier, last_id + 1) last_id += 1