Skip to content
Snippets Groups Projects
graphenerpc.py 16.87 KiB
"""graphennewsrpc."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import next
from builtins import str
from builtins import object
from itertools import cycle
import json
import logging
import ssl
import sys
import threading
import re
import time
import warnings
from .exceptions import (
    UnauthorizedError, RPCConnection, RPCError, RPCErrorDoRetry, NumRetriesReached, CallRetriesReached, WorkingNodeMissing
)
from .rpcutils import (
    is_network_appbase_ready,
    get_api_name, get_query
)
from .node import Nodes
from beemgraphenebase.version import version as beem_version
from beemgraphenebase.chains import known_chains

WEBSOCKET_MODULE = None
if not WEBSOCKET_MODULE:
    try:
        import websocket
        from websocket._exceptions import WebSocketConnectionClosedException
        WEBSOCKET_MODULE = "websocket"
    except ImportError:
        WEBSOCKET_MODULE = None
REQUEST_MODULE = None
if not REQUEST_MODULE:
    try:
        import requests
        from requests.adapters import HTTPAdapter
        from requests.packages.urllib3.util.retry import Retry
        from requests.exceptions import ConnectionError
        REQUEST_MODULE = "requests"
    except ImportError:
        REQUEST_MODULE = None


log = logging.getLogger(__name__)


class SessionInstance(object):
    """Singelton for the Session Instance"""
    instance = None


def set_session_instance(instance):
    """Set session instance"""
    SessionInstance.instance = instance


def shared_session_instance():
    """Get session instance"""
    if REQUEST_MODULE is None:
        raise Exception()
    if not SessionInstance.instance:
        SessionInstance.instance = requests.Session()
    return SessionInstance.instance


def create_ws_instance(use_ssl=True, enable_multithread=True):
    """Get websocket instance"""
    if WEBSOCKET_MODULE is None:
        raise Exception()
    if use_ssl:
        ssl_defaults = ssl.get_default_verify_paths()
        sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
        return websocket.WebSocket(sslopt=sslopt_ca_certs, enable_multithread=enable_multithread)
    else:
        return websocket.WebSocket(enable_multithread=enable_multithread)


class GrapheneRPC(object):
    """
    This class allows to call API methods synchronously, without callbacks.

    It logs warnings and errors.

    :param str urls: Either a single Websocket/Http URL, or a list of URLs
    :param str user: Username for Authentication
    :param str password: Password for Authentication
    :param int num_retries: Try x times to num_retries to a node on disconnect, -1 for indefinitely
    :param int num_retries_call: Repeat num_retries_call times a rpc call on node error (default is 5)
    :param int timeout: Timeout setting for https nodes (default is 60)
    :param bool autoconnect: When set to false, connection is performed on the first rpc call (default is True)
    :param bool use_condenser: Use the old condenser_api rpc protocol on nodes with version
        0.19.4 or higher. The settings has no effect on nodes with version of 0.19.3 or lower.

    Available APIs:

          * database
          * network_node
          * network_broadcast

    Usage:

        .. code-block:: python

            from beemapi.graphenerpc import GrapheneRPC
            ws = GrapheneRPC("wss://steemd.pevo.science","","")
            print(ws.get_account_count())

            ws = GrapheneRPC("https://api.steemit.com","","")
            print(ws.get_account_count())

    .. note:: This class allows to call methods available via
              websocket. If you want to use the notification
              subsystem, please use ``GrapheneWebsocket`` instead.

    """

    def __init__(self, urls, user=None, password=None, **kwargs):
        """Init."""
        self.rpc_methods = {'offline': -1, 'ws': 0, 'jsonrpc': 1, 'wsappbase': 2, 'appbase': 3}
        self.current_rpc = self.rpc_methods["ws"]
        self._request_id = 0
        self.timeout = kwargs.get('timeout', 60)
        num_retries = kwargs.get("num_retries", -1)
        num_retries_call = kwargs.get("num_retries_call", 5)
        self.use_condenser = kwargs.get("use_condenser", False)
        self.nodes = Nodes(urls, num_retries, num_retries_call)
        if self.nodes.working_nodes_count == 0:
            self.current_rpc = self.rpc_methods["offline"]

        self.user = user
        self.password = password
        self.ws = None
        self.url = None
        self.session = None
        self.rpc_queue = []
        if kwargs.get("autoconnect", True):
            self.rpcconnect()

    @property
    def num_retries(self):
        return self.nodes.num_retries

    @property
    def num_retries_call(self):
        return self.nodes.num_retries_call

    @property
    def error_cnt_call(self):
        return self.nodes.error_cnt_call

    @property
    def error_cnt(self):
        return self.nodes.error_cnt

    def get_request_id(self):
        """Get request id."""
        self._request_id += 1
        return self._request_id

    def next(self):
        """Switches to the next node url"""
        if self.ws:
            try:
                self.ws.close()
            except Exception as e:
                log.warning(str(e))
        self.rpcconnect()

    def is_appbase_ready(self):
        """Check if node is appbase ready"""
        return self.current_rpc in [self.rpc_methods['wsappbase'], self.rpc_methods['appbase']]

    def get_use_appbase(self):
        """Returns True if appbase ready and appbase calls are set"""
        return not self.use_condenser and self.is_appbase_ready()

    def rpcconnect(self, next_url=True):
        """Connect to next url in a loop."""
        if self.nodes.working_nodes_count == 0:
            return
        while True:
            if next_url:
                self.url = next(self.nodes)
                self.nodes.reset_error_cnt_call()
                log.debug("Trying to connect to node %s" % self.url)
                if self.url[:3] == "wss":
                    self.ws = create_ws_instance(use_ssl=True)
                    self.current_rpc = self.rpc_methods["ws"]
                elif self.url[:2] == "ws":
                    self.ws = create_ws_instance(use_ssl=False)
                    self.current_rpc = self.rpc_methods["ws"]
                else:
                    self.ws = None
                    self.session = shared_session_instance()
                    self.current_rpc = self.rpc_methods["jsonrpc"]
                    self.headers = {'User-Agent': 'beem v%s' % (beem_version),
                                    'content-type': 'application/json'}
            try:
                if self.ws:
                    self.ws.connect(self.url)
                    self.rpclogin(self.user, self.password)
                try:
                    props = None
                    if not self.use_condenser:
                        props = self.get_config(api="database")
                    else:
                        props = self.get_config()
                except Exception as e:
                    if re.search("Bad Cast:Invalid cast from type", str(e)):
                        # retry with appbase
                        if self.current_rpc == self.rpc_methods['ws']:
                            self.current_rpc = self.rpc_methods['wsappbase']
                        else:
                            self.current_rpc = self.rpc_methods['appbase']
                        props = self.get_config(api="database")
                if props is None:
                    raise RPCError("Could not recieve answer for get_config")
                if is_network_appbase_ready(props):
                    if self.ws:
                        self.current_rpc = self.rpc_methods["wsappbase"]
                    else:
                        self.current_rpc = self.rpc_methods["appbase"]
                break
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self.nodes.increase_error_cnt()
                do_sleep = not next_url or (next_url and self.nodes.working_nodes_count == 1)
                self.nodes.sleep_and_check_retries(str(e), sleep=do_sleep)
                next_url = True

    def rpclogin(self, user, password):
        """Login into Websocket"""
        if self.ws and self.current_rpc == self.rpc_methods['ws'] and user and password:
            self.login(user, password, api="login_api")

    def rpcclose(self):
        """Close Websocket"""
        if self.ws:
            self.ws.close()

    def request_send(self, payload):
        response = self.session.post(self.url,
                                     data=payload,
                                     headers=self.headers,
                                     timeout=self.timeout,
                                     auth=(self.user, self.password))
        if response.status_code == 401:
            raise UnauthorizedError
        return response.text

    def ws_send(self, payload):
        self.ws.send(payload)
        reply = self.ws.recv()
        return reply

    def get_network(self, props=None):
        """ Identify the connected network. This call returns a
            dictionary with keys chain_id, core_symbol and prefix
        """
        if props is None:
            props = self.get_config(api="database")
        if "STEEMIT_CHAIN_ID" in props:
            chain_id = props["STEEMIT_CHAIN_ID"]
            network_version = props['STEEMIT_BLOCKCHAIN_VERSION']
        elif "STEEM_CHAIN_ID" in props:
            chain_id = props["STEEM_CHAIN_ID"]
            network_version = props['STEEM_BLOCKCHAIN_VERSION']
        else:
            raise("Connecting to unknown network!")
        highest_version_chain = None
        for k, v in list(known_chains.items()):
            if v["chain_id"] == chain_id and v["min_version"] <= network_version:
                if highest_version_chain is None:
                    highest_version_chain = v
                elif v["min_version"] == '0.0.0' and self.use_condenser:
                    highest_version_chain = v
                elif v["min_version"] > highest_version_chain["min_version"] and not self.use_condenser:
                    highest_version_chain = v
        if highest_version_chain is None:
            raise("Connecting to unknown network!")
        else:
            return highest_version_chain

    def _check_for_server_error(self, reply):
        """Checks for server error message in reply"""
        if re.search("Internal Server Error", reply) or re.search("500", reply):
            raise RPCErrorDoRetry("Internal Server Error")
        elif re.search("Not Implemented", reply) or re.search("501", reply):
            raise RPCError("Not Implemented")
        elif re.search("Bad Gateway", reply) or re.search("502", reply):
            raise RPCErrorDoRetry("Bad Gateway")
        elif re.search("Service Temporarily Unavailable", reply) or re.search("Service Unavailable", reply) or re.search("503", reply):
            raise RPCErrorDoRetry("Service Temporarily Unavailable")
        elif re.search("Gateway Time-out", reply) or re.search("Gateway Timeout", reply) or re.search("504", reply):
            raise RPCErrorDoRetry("Gateway Time-out")
        elif re.search("HTTP Version not supported", reply) or re.search("505", reply):
            raise RPCError("HTTP Version not supported")
        elif re.search("Variant Also Negotiates", reply) or re.search("506", reply):
            raise RPCError("Variant Also Negotiates")
        elif re.search("Insufficient Storage", reply) or re.search("507", reply):
            raise RPCError("Insufficient Storage")
        elif re.search("Loop Detected", reply) or re.search("508", reply):
            raise RPCError("Loop Detected")
        elif re.search("Bandwidth Limit Exceeded", reply) or re.search("509", reply):
            raise RPCError("Bandwidth Limit Exceeded")
        elif re.search("Not Extended", reply) or re.search("510", reply):
            raise RPCError("Not Extended")
        elif re.search("Network Authentication Required", reply) or re.search("511", reply):
            raise RPCError("Network Authentication Required")
        else:
            raise RPCError("Client returned invalid format. Expected JSON!")

    def rpcexec(self, payload):
        """
        Execute a call by sending the payload.

        :param json payload: Payload data
        :raises ValueError: if the server does not respond in proper JSON format
        :raises RPCError: if the server returns an error
        """
        log.debug(json.dumps(payload))
        if self.nodes.working_nodes_count == 0:
            raise WorkingNodeMissing
        if self.url is None:
            self.rpcconnect()
        reply = {}
        while True:
            self.nodes.increase_error_cnt_call()
            try:
                if self.current_rpc == self.rpc_methods['ws'] or \
                   self.current_rpc == self.rpc_methods['wsappbase']:
                    reply = self.ws_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
                else:
                    reply = self.request_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
                if not bool(reply):
                    try:
                        self.nodes.sleep_and_check_retries("Empty Reply", call_retry=True)
                    except CallRetriesReached:
                        self.nodes.increase_error_cnt()
                        self.nodes.sleep_and_check_retries("Empty Reply", sleep=False, call_retry=False)
                        self.rpcconnect()
                else:
                    break
            except KeyboardInterrupt:
                raise
            except WebSocketConnectionClosedException:
                # self.error_cnt[self.url] += 1
                self.rpcconnect(next_url=False)
            except ConnectionError as e:
                self.nodes.increase_error_cnt()
                self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
                self.rpcconnect()
            except Exception as e:
                self.nodes.increase_error_cnt()
                self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
                self.rpcconnect()

        ret = {}
        try:
            ret = json.loads(reply, strict=False)
        except ValueError:
            self._check_for_server_error(reply)

        log.debug(json.dumps(reply))

        if isinstance(ret, dict) and 'error' in ret:
            if 'detail' in ret['error']:
                raise RPCError(ret['error']['detail'])
            else:
                raise RPCError(ret['error']['message'])
        else:
            if isinstance(ret, list):
                ret_list = []
                for r in ret:
                    if isinstance(r, dict) and 'error' in r:
                        if 'detail' in r['error']:
                            raise RPCError(r['error']['detail'])
                        else:
                            raise RPCError(r['error']['message'])
                    elif isinstance(r, dict) and "result" in r:
                        ret_list.append(r["result"])
                    else:
                        ret_list.append(r)
                self.nodes.reset_error_cnt_call()
                return ret_list
            elif isinstance(ret, dict) and "result" in ret:
                self.nodes.reset_error_cnt_call()
                return ret["result"]
            elif isinstance(ret, int):
                raise RPCError("Client returned invalid format. Expected JSON! Output: %s" % (str(ret)))
            else:
                self.nodes.reset_error_cnt_call()
                return ret
        return ret

    # End of Deprecated methods
    ####################################################################
    def __getattr__(self, name):
        """Map all methods to RPC calls and pass through the arguments."""
        def method(*args, **kwargs):

            api_name = get_api_name(self.is_appbase_ready(), *args, **kwargs)
            if self.is_appbase_ready() and self.use_condenser:
                api_name = "condenser_api"

            # let's be able to define the num_retries per query
            stored_num_retries_call = self.nodes.num_retries_call
            self.nodes.num_retries_call = kwargs.get("num_retries_call", stored_num_retries_call)
            add_to_queue = kwargs.get("add_to_queue", False)
            query = get_query(self.is_appbase_ready() and not self.use_condenser, self.get_request_id(), api_name, name, args)
            if add_to_queue:
                self.rpc_queue.append(query)
                self.nodes.num_retries_call = stored_num_retries_call
                return None
            elif len(self.rpc_queue) > 0:
                self.rpc_queue.append(query)
                query = self.rpc_queue
                self.rpc_queue = []
            r = self.rpcexec(query)
            self.nodes.num_retries_call = stored_num_retries_call
            return r
        return method