From fe039357cab0b26d8fe3eb2a1d9568724cd66cf2 Mon Sep 17 00:00:00 2001
From: Holger <holger@nahrstaedt.de>
Date: Thu, 21 Jun 2018 23:09:31 +0200
Subject: [PATCH] Try to improve blocks and streams with threading

* stream_threading_performance example added to evaluate threading
* _switch_to_next_node added to Steemnoderpc
* 5 tags restriction removed from post()
---
 beem/block.py                            | 14 +++++-
 beem/blockchain.py                       | 31 +++++++-----
 beem/steem.py                            |  3 --
 beemapi/steemnoderpc.py                  | 30 +++++++-----
 examples/stream_threading_performance.py | 62 ++++++++++++++++++++++++
 5 files changed, 109 insertions(+), 31 deletions(-)
 create mode 100644 examples/stream_threading_performance.py

diff --git a/beem/block.py b/beem/block.py
index a17bc9f3..a2c4719a 100644
--- a/beem/block.py
+++ b/beem/block.py
@@ -150,7 +150,7 @@ class Block(BlockchainObject):
             else:
                 block = self.steem.rpc.get_block(self.identifier)
         if not block:
-            raise BlockDoesNotExistsException(str(self.identifier))
+            raise BlockDoesNotExistsException("output: %s of identifier %s" % (str(block), str(self.identifier)))
         block = self._parse_json_data(block)
         super(Block, self).__init__(block, lazy=self.lazy, full=self.full, steem_instance=self.steem)
 
@@ -174,6 +174,8 @@ class Block(BlockchainObject):
         if self.only_ops or self.only_virtual_ops:
             return list()
         trxs = []
+        if "transactions" not in self:
+            return []
         trx_id = 0
         for trx in self["transactions"]:
             trx_new = {"transaction_id": self['transaction_ids'][trx_id]}
@@ -190,8 +192,12 @@ class Block(BlockchainObject):
         if self.only_ops or self.only_virtual_ops:
             return self["operations"]
         ops = []
-        trxs = self["transactions"]
+        trxs = []
+        if "transactions" in self:
+            trxs = self["transactions"]
         for tx in trxs:
+            if "operations" not in tx:
+                continue
             for op in tx["operations"]:
                 # Replace opid by op name
                 # op[0] = getOperationNameForId(op[0])
@@ -207,6 +213,8 @@ class Block(BlockchainObject):
         if self.only_ops or self.only_virtual_ops:
             return list()
         trxs = []
+        if "transactions" not in self:
+            return []
         trx_id = 0
         for trx in self["transactions"]:
             trx_new = {"transaction_id": self['transaction_ids'][trx_id]}
@@ -230,6 +238,8 @@ class Block(BlockchainObject):
         ops = []
         for tx in self["transactions"]:
             for op in tx["operations"]:
+                if "operations" not in tx:
+                    continue
                 # Replace opid by op name
                 # op[0] = getOperationNameForId(op[0])
                 if isinstance(op, list):
diff --git a/beem/blockchain.py b/beem/blockchain.py
index c349fdca..a54a3a09 100644
--- a/beem/blockchain.py
+++ b/beem/blockchain.py
@@ -34,6 +34,7 @@ if not FUTURES_MODULE:
     try:
         from concurrent.futures import ThreadPoolExecutor, wait, as_completed
         FUTURES_MODULE = "futures"
+        # FUTURES_MODULE = None
     except ImportError:
         FUTURES_MODULE = None
 
@@ -41,7 +42,7 @@ if not FUTURES_MODULE:
 # default exception handler. if you want to take some action on failed tasks
 # maybe add the task back into the queue, then make your own handler and pass it in
 def default_handler(name, exception, *args, **kwargs):
-    print('%s raised %s with args %s and kwargs %s' % (name, str(exception), repr(args), repr(kwargs)))
+    log.warn('%s raised %s with args %s and kwargs %s' % (name, str(exception), repr(args), repr(kwargs)))
     pass
 
 
@@ -281,7 +282,7 @@ class Blockchain(object):
         if props is None:
             raise ValueError("Could not receive dynamic_global_properties!")
         if self.mode not in props:
-            raise ValueError(self.mode + " is not in " + props)
+            raise ValueError(self.mode + " is not in " + str(props))
         return int(props.get(self.mode))
 
     def get_current_block(self, only_ops=False, only_virtual_ops=False):
@@ -415,9 +416,9 @@ class Blockchain(object):
                     block_num_list = []
                     current_block.set_cache_auto_clean(False)
                     freeze = self.steem.rpc.nodes.freeze_current_node
-                    # num_retries = self.steem.rpc.nodes.num_retries
-                    self.steem.rpc.nodes.freeze_current_node = True
-                    # self.steem.rpc.nodes.num_retries = 1
+                    num_retries = self.steem.rpc.nodes.num_retries
+                    # self.steem.rpc.nodes.freeze_current_node = True
+                    self.steem.rpc.nodes.num_retries = thread_num
                     error_cnt = self.steem.rpc.nodes.node.error_cnt
                     while i < thread_num and blocknum + i <= head_block:
                         block_num_list.append(blocknum + i)
@@ -440,29 +441,33 @@ class Blockchain(object):
                         pool.abort()
                     current_block.clear_cache_from_expired_items()
                     current_block.set_cache_auto_clean(auto_clean)
-                    # self.steem.rpc.nodes.num_retries = num_retries
+                    self.steem.rpc.nodes.num_retries = num_retries
                     self.steem.rpc.nodes.freeze_current_node = freeze
                     new_error_cnt = self.steem.rpc.nodes.node.error_cnt
                     self.steem.rpc.nodes.node.error_cnt = error_cnt
                     if new_error_cnt > error_cnt:
                         self.steem.rpc.nodes.node.error_cnt += 1
-                        self.steem.rpc.next()
+                    #    self.steem.rpc.next()
 
                     checked_results = []
                     for b in results:
                         b["id"] = b.block_num
                         b.identifier = b.block_num
-                        if isinstance(b, dict) and "transactions" in b and "transaction_ids" in b:
-                            if len(b["transactions"]) == len(b["transaction_ids"]) and int(b.block_num) not in result_block_nums:
+                        if len(b.operations) > 0:
+                            if int(b.block_num) not in result_block_nums:
                                 checked_results.append(b)
                                 result_block_nums.append(int(b.block_num))
 
                     missing_block_num = list(set(block_num_list).difference(set(result_block_nums)))
-                    if len(missing_block_num) > 0:
+                    while len(missing_block_num) > 0:
                         for blocknum in missing_block_num:
-                            block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
-                            checked_results.append(block)
-                            result_block_nums.append(int(block.block_num))
+                            try:
+                                block = Block(blocknum, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
+                                checked_results.append(block)
+                                result_block_nums.append(int(block.block_num))
+                            except Exception as e:
+                                log.error(str(e))
+                        missing_block_num = list(set(block_num_list).difference(set(result_block_nums)))
                     from operator import itemgetter
                     blocks = sorted(checked_results, key=itemgetter('id'))
                     for b in blocks:
diff --git a/beem/steem.py b/beem/steem.py
index 7620fa9a..8f571ba4 100644
--- a/beem/steem.py
+++ b/beem/steem.py
@@ -1303,9 +1303,6 @@ class Steem(object):
         category = None
         tags = tags or json_metadata.get('tags', [])
         if tags:
-            if len(tags) > 5:
-                raise ValueError('Can only specify up to 5 tags per post.')
-
             # first tag should be a category
             category = tags[0]
             json_metadata.update({"tags": tags})
diff --git a/beemapi/steemnoderpc.py b/beemapi/steemnoderpc.py
index d021750b..52115df2 100644
--- a/beemapi/steemnoderpc.py
+++ b/beemapi/steemnoderpc.py
@@ -118,7 +118,7 @@ class SteemNodeRPC(GrapheneRPC):
             raise exceptions.NoMethodWithName(msg)
         elif re.search("Could not find API", msg):
             if self._check_api_name(msg):
-                raise exceptions.ApiNotSupported(msg)
+                self._switch_to_next_node(msg, "ApiNotSupported")
             else:
                 raise exceptions.NoApiWithName(msg)
         elif re.search("irrelevant signature included", msg):
@@ -132,22 +132,14 @@ class SteemNodeRPC(GrapheneRPC):
             self.nodes.sleep_and_check_retries(str(msg), call_retry=True)
             doRetry = True
         elif re.search("!check_max_block_age", str(e)):
-            if self.nodes.working_nodes_count == 1:
-                raise exceptions.UnhandledRPCError(msg)
-            self.nodes.increase_error_cnt()
-            self.nodes.sleep_and_check_retries(str(msg), sleep=False)
-            self.next()
+            self._switch_to_next_node(str(e))
             doRetry = True
         elif re.search("out_of_rangeEEEE: unknown key", msg) or re.search("unknown key:unknown key", msg):
             raise exceptions.UnkownKey(msg)
         elif re.search("Assert Exception:v.is_object(): Input data have to treated as object", msg):
             raise exceptions.UnhandledRPCError("Use Operation(op, appbase=True) to prevent error: " + msg)
         elif re.search("Client returned invalid format. Expected JSON!", msg):
-            if self.nodes.working_nodes_count == 1:
-                raise exceptions.UnhandledRPCError(msg)
-            self.nodes.increase_error_cnt()
-            self.nodes.sleep_and_check_retries(str(msg), sleep=False)
-            self.next()
+            self._switch_to_next_node(msg)
             doRetry = True
         elif msg:
             raise exceptions.UnhandledRPCError(msg)
@@ -155,8 +147,18 @@ class SteemNodeRPC(GrapheneRPC):
             raise e
         return doRetry
 
+    def _switch_to_next_node(self, msg, error_type="UnhandledRPCError"):
+        if self.nodes.working_nodes_count == 1:
+            if error_type == "UnhandledRPCError":
+                raise exceptions.UnhandledRPCError(msg)
+            elif error_type == "ApiNotSupported":
+                raise exceptions.ApiNotSupported(msg)
+        self.nodes.increase_error_cnt()
+        self.nodes.sleep_and_check_retries(str(msg), sleep=False)
+        self.next()
+
     def _check_api_name(self, msg):
-        error_start = "Could not find API "
+        error_start = "Could not find API"
         known_apis = ['account_history_api', 'tags_api',
                       'database_api', 'market_history_api',
                       'block_api', 'account_by_key_api', 'chain_api',
@@ -164,8 +166,10 @@ class SteemNodeRPC(GrapheneRPC):
                       'witness_api', 'test_api',
                       'network_broadcast_api']
         for api in known_apis:
-            if re.search(error_start + api, msg):
+            if re.search(error_start + " " + api, msg):
                 return True
+        if msg[-18:] == error_start:
+            return True
         return False
 
     def get_account(self, name, **kwargs):
diff --git a/examples/stream_threading_performance.py b/examples/stream_threading_performance.py
new file mode 100644
index 00000000..6ede2a88
--- /dev/null
+++ b/examples/stream_threading_performance.py
@@ -0,0 +1,62 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+import sys
+from datetime import datetime, timedelta
+import time
+import io
+import logging
+
+from beem.blockchain import Blockchain
+from beem.block import Block
+from beem.steem import Steem
+from beem.utils import parse_time, formatTimedelta
+from beem.nodelist import NodeList
+log = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO)
+
+
+def stream_votes(stm, threading, thread_num):
+    b = Blockchain(steem_instance=stm)
+    opcount = 0
+    start_time = time.time()
+    for op in b.stream(start=23483000, stop=23483200, threading=threading, thread_num=thread_num,
+                       opNames=['vote']):
+        sys.stdout.write("\r%s" % op['block_num'])
+        opcount += 1
+    now = time.time()
+    total_duration = now - start_time
+    print(" votes: %d, time %.2f" % (opcount, total_duration))
+    return opcount, total_duration
+
+
+if __name__ == "__main__":
+    node_setup = 1
+    threading = True
+    thread_num = 8
+    timeout = 10
+    nodes = NodeList()
+    nodes.update_nodes()
+    node_list = nodes.get_nodes()
+
+    vote_result = []
+    duration = []
+    stm = Steem(node=node_list, timeout=timeout)
+    print("Without threading")
+    stream_votes(stm, False, 8)
+    if threading:
+        print("\n Threading with %d threads is activated now." % thread_num)
+
+    for n in range(len(node_list)):
+        print("\n Round %d / %d" % (n, len(node_list)))
+        stm = Steem(node=node_list, timeout=timeout)
+        print(stm)
+        opcount, total_duration = stream_votes(stm, threading, thread_num)
+        vote_result.append(opcount)
+        duration.append(total_duration)
+        node_list = node_list[1:] + [node_list[0]]
+    print("Finished!")
+
+    for n in range(len(node_list)):
+        print(" votes: %d, time %.2f" % (vote_result[n], duration[n]))
-- 
GitLab