Skip to content
Snippets Groups Projects
Commit 6b5526b2 authored by Holger's avatar Holger
Browse files

Add Discussions_by_author_before_date and improve some other discussion classes

beemapi/Nodes
* add option to freeze a node, in order to prevent switching to the next node
parent 344e461e
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@ import logging
from datetime import datetime, timedelta
from .utils import formatTimeString, addTzInfo
from .block import Block
from beemapi.node import Nodes
from .exceptions import BatchedCallsNotSupported, BlockDoesNotExistsException, BlockWaitTimeExceeded, OfflineHasNoRPCException
from beemapi.exceptions import NumRetriesReached
from beemgraphenebase.py23 import py23_bytes
......@@ -63,6 +64,7 @@ class Worker(Thread):
# if not self.idle.is_set():
# print >> stdout, '%s is idle' % self.name
self.idle.set()
# time.sleep(1)
continue
try:
......@@ -389,6 +391,11 @@ class Blockchain(object):
results = []
block_num_list = []
current_block.set_cache_auto_clean(False)
freeze = self.steem.rpc.nodes.freeze_current_node
num_retries = self.steem.rpc.nodes.num_retries
self.steem.rpc.nodes.freeze_current_node = True
self.steem.rpc.nodes.num_retries = 1
error_cnt = self.steem.rpc.nodes.node.error_cnt
while i < thread_num and blocknum + i <= head_block:
block_num_list.append(blocknum + i)
pool.enqueue(Block, blocknum + i, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
......@@ -400,8 +407,29 @@ class Blockchain(object):
pool.abort()
current_block.clear_cache_from_expired_items()
current_block.set_cache_auto_clean(auto_clean)
self.steem.rpc.nodes.num_retries = num_retries
self.steem.rpc.nodes.freeze_current_node = freeze
new_error_cnt = self.steem.rpc.nodes.node.error_cnt
self.steem.rpc.nodes.node.error_cnt = error_cnt
if new_error_cnt > error_cnt:
self.steem.rpc.nodes.node.error_cnt += 1
self.steem.rpc.next()
checked_results = []
for b in results:
if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b:
if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.identifier) not in result_block_nums:
checked_results.append(b)
result_block_nums.append(int(b.identifier))
missing_block_num = list(set(block_num_list).difference(set(result_block_nums)))
if len(missing_block_num) > 0:
for blocknum in missing_block_num:
block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
checked_results.append(block)
result_block_nums.append(int(block.identifier))
from operator import itemgetter
blocks = sorted(results, key=itemgetter('id'))
blocks = sorted(checked_results, key=itemgetter('id'))
for b in blocks:
if latest_block < int(b.identifier):
latest_block = int(b.identifier)
......
......@@ -69,6 +69,30 @@ class Discussions_by_trending(list):
)
class Discussions_by_author_before_date(list):
""" Get Discussions by author before date
:param beem.discussions.Query: discussion_query
:param beem.steem.Steem steem_instance: Steem instance
"""
def __init__(self, author="", start_permlink="", before_date="1970-01-01T00:00:00", limit=100, steem_instance=None):
self.steem = steem_instance or shared_steem_instance()
limit_ok = limit > 0
self.steem.rpc.set_next_node_on_empty_reply(limit_ok)
if self.steem.rpc.get_use_appbase():
discussion_query = {"author": author, "start_permlink": start_permlink, "before_date": before_date, "limit": limit}
posts = self.steem.rpc.get_discussions_by_author_before_date(discussion_query, api="tags")['discussions']
else:
posts = self.steem.rpc.get_discussions_by_author_before_date(author, start_permlink, before_date, limit)
super(Discussions_by_author_before_date, self).__init__(
[
Comment(x, steem_instance=self.steem)
for x in posts
]
)
class Comment_discussions_by_payout(list):
""" Get comment_discussions_by_payout
......@@ -255,8 +279,6 @@ class Discussions_by_feed(list):
"""
def __init__(self, discussion_query, steem_instance=None):
self.steem = steem_instance or shared_steem_instance()
limit_ok = "limit" in discussion_query and discussion_query["limit"] > 0
self.steem.rpc.set_next_node_on_empty_reply(limit_ok)
if self.steem.rpc.get_use_appbase():
posts = self.steem.rpc.get_discussions_by_feed(discussion_query, api="tags")['discussions']
else:
......@@ -302,13 +324,12 @@ class Discussions_by_blog(list):
class Discussions_by_comments(list):
""" Get discussions by comments
:param beem.discussions.Query: discussion_query
:param beem.discussions.Query: discussion_query, start_author and start_permlink must be set.
:param beem.steem.Steem steem_instance: Steem instance
"""
def __init__(self, discussion_query, steem_instance=None):
self.steem = steem_instance or shared_steem_instance()
limit_ok = "limit" in discussion_query and discussion_query["limit"] > 0
self.steem.rpc.set_next_node_on_empty_reply(limit_ok)
if self.steem.rpc.get_use_appbase():
posts = self.steem.rpc.get_discussions_by_comments(discussion_query, api="tags")['discussions']
else:
......
......@@ -46,12 +46,15 @@ class Nodes(list):
self.num_retries = num_retries
self.num_retries_call = num_retries_call
self.current_node_index = -1
self.freeze_current_node = False
def __iter__(self):
return self
def __next__(self):
next_node_count = 0
if self.freeze_current_node:
return self.url
while next_node_count == 0 and (self.num_retries < 0 or self.node.error_cnt < self.num_retries):
self.current_node_index += 1
if self.current_node_index >= self.working_nodes_count:
......@@ -73,6 +76,13 @@ class Nodes(list):
@property
def working_nodes_count(self):
n = 0
if self.freeze_current_node:
i = self.current_node_index
if self.current_node_index < 0:
i = 0
if self.num_retries < 0 or self[i].error_cnt <= self.num_retries:
n += 1
return n
for i in range(len(self)):
if self.num_retries < 0 or self[i].error_cnt <= self.num_retries:
n += 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment