diff --git a/beem/blockchain.py b/beem/blockchain.py index 166057e4728dbd36fc16e2871779558b23c937bb..9f738d5c6910106e10b3b969cfa5046994569bb9 100644 --- a/beem/blockchain.py +++ b/beem/blockchain.py @@ -18,6 +18,7 @@ import logging from datetime import datetime, timedelta from .utils import formatTimeString, addTzInfo from .block import Block +from beemapi.node import Nodes from .exceptions import BatchedCallsNotSupported, BlockDoesNotExistsException, BlockWaitTimeExceeded, OfflineHasNoRPCException from beemapi.exceptions import NumRetriesReached from beemgraphenebase.py23 import py23_bytes @@ -63,6 +64,7 @@ class Worker(Thread): # if not self.idle.is_set(): # print >> stdout, '%s is idle' % self.name self.idle.set() + # time.sleep(1) continue try: @@ -389,6 +391,11 @@ class Blockchain(object): results = [] block_num_list = [] current_block.set_cache_auto_clean(False) + freeze = self.steem.rpc.nodes.freeze_current_node + num_retries = self.steem.rpc.nodes.num_retries + self.steem.rpc.nodes.freeze_current_node = True + self.steem.rpc.nodes.num_retries = 1 + error_cnt = self.steem.rpc.nodes.node.error_cnt while i < thread_num and blocknum + i <= head_block: block_num_list.append(blocknum + i) pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) @@ -400,8 +407,29 @@ class Blockchain(object): pool.abort() current_block.clear_cache_from_expired_items() current_block.set_cache_auto_clean(auto_clean) + self.steem.rpc.nodes.num_retries = num_retries + self.steem.rpc.nodes.freeze_current_node = freeze + new_error_cnt = self.steem.rpc.nodes.node.error_cnt + self.steem.rpc.nodes.node.error_cnt = error_cnt + if new_error_cnt > error_cnt: + self.steem.rpc.nodes.node.error_cnt += 1 + self.steem.rpc.next() + + checked_results = [] + for b in results: + if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b: + if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.identifier) not in result_block_nums: + checked_results.append(b) + result_block_nums.append(int(b.identifier)) + + missing_block_num = list(set(block_num_list).difference(set(result_block_nums))) + if len(missing_block_num) > 0: + for blocknum in missing_block_num: + block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem) + checked_results.append(block) + result_block_nums.append(int(block.identifier)) from operator import itemgetter - blocks = sorted(results, key=itemgetter('id')) + blocks = sorted(checked_results, key=itemgetter('id')) for b in blocks: if latest_block < int(b.identifier): latest_block = int(b.identifier) diff --git a/beem/discussions.py b/beem/discussions.py index ec4e0a2a2bfb195b35fd6eec372b246b697537ee..82409d88b1fedcf1c554f80af13c3de4de67c5d6 100644 --- a/beem/discussions.py +++ b/beem/discussions.py @@ -69,6 +69,30 @@ class Discussions_by_trending(list): ) +class Discussions_by_author_before_date(list): + """ Get Discussions by author before date + + :param beem.discussions.Query: discussion_query + :param beem.steem.Steem steem_instance: Steem instance + + """ + def __init__(self, author="", start_permlink="", before_date="1970-01-01T00:00:00", limit=100, steem_instance=None): + self.steem = steem_instance or shared_steem_instance() + limit_ok = limit > 0 + self.steem.rpc.set_next_node_on_empty_reply(limit_ok) + if self.steem.rpc.get_use_appbase(): + discussion_query = {"author": author, "start_permlink": start_permlink, "before_date": before_date, "limit": limit} + posts = self.steem.rpc.get_discussions_by_author_before_date(discussion_query, api="tags")['discussions'] + else: + posts = self.steem.rpc.get_discussions_by_author_before_date(author, start_permlink, before_date, limit) + super(Discussions_by_author_before_date, self).__init__( + [ + Comment(x, steem_instance=self.steem) + for x in posts + ] + ) + + class Comment_discussions_by_payout(list): """ Get comment_discussions_by_payout @@ -255,8 +279,6 @@ class Discussions_by_feed(list): """ def __init__(self, discussion_query, steem_instance=None): self.steem = steem_instance or shared_steem_instance() - limit_ok = "limit" in discussion_query and discussion_query["limit"] > 0 - self.steem.rpc.set_next_node_on_empty_reply(limit_ok) if self.steem.rpc.get_use_appbase(): posts = self.steem.rpc.get_discussions_by_feed(discussion_query, api="tags")['discussions'] else: @@ -302,13 +324,12 @@ class Discussions_by_blog(list): class Discussions_by_comments(list): """ Get discussions by comments - :param beem.discussions.Query: discussion_query + :param beem.discussions.Query: discussion_query, start_author and start_permlink must be set. :param beem.steem.Steem steem_instance: Steem instance + """ def __init__(self, discussion_query, steem_instance=None): self.steem = steem_instance or shared_steem_instance() - limit_ok = "limit" in discussion_query and discussion_query["limit"] > 0 - self.steem.rpc.set_next_node_on_empty_reply(limit_ok) if self.steem.rpc.get_use_appbase(): posts = self.steem.rpc.get_discussions_by_comments(discussion_query, api="tags")['discussions'] else: diff --git a/beemapi/node.py b/beemapi/node.py index 0075114e8cd9ab8ed7c054505f44cdb74888f525..4995eaabed88121dee7f0da989b23b3c32c335cc 100644 --- a/beemapi/node.py +++ b/beemapi/node.py @@ -46,12 +46,15 @@ class Nodes(list): self.num_retries = num_retries self.num_retries_call = num_retries_call self.current_node_index = -1 + self.freeze_current_node = False def __iter__(self): return self def __next__(self): next_node_count = 0 + if self.freeze_current_node: + return self.url while next_node_count == 0 and (self.num_retries < 0 or self.node.error_cnt < self.num_retries): self.current_node_index += 1 if self.current_node_index >= self.working_nodes_count: @@ -73,6 +76,13 @@ class Nodes(list): @property def working_nodes_count(self): n = 0 + if self.freeze_current_node: + i = self.current_node_index + if self.current_node_index < 0: + i = 0 + if self.num_retries < 0 or self[i].error_cnt <= self.num_retries: + n += 1 + return n for i in range(len(self)): if self.num_retries < 0 or self[i].error_cnt <= self.num_retries: n += 1