graphenerpc.py 21.34 KiB
# -*- coding: utf-8 -*-
from itertools import cycle
import threading
import sys
import json
import signal
import logging
import ssl
import re
import time
import warnings
import six
from .exceptions import (
UnauthorizedError, RPCConnection, RPCError, RPCErrorDoRetry, NumRetriesReached, CallRetriesReached, WorkingNodeMissing, TimeoutException
)
from .rpcutils import (
is_network_appbase_ready,
get_api_name, get_query
)
from .node import Nodes
from beemgraphenebase.version import version as beem_version
from beemgraphenebase.chains import known_chains
from _thread import interrupt_main
WEBSOCKET_MODULE = None
if not WEBSOCKET_MODULE:
try:
import websocket
from websocket._exceptions import WebSocketConnectionClosedException, WebSocketTimeoutException
WEBSOCKET_MODULE = "websocket"
except ImportError:
WEBSOCKET_MODULE = None
REQUEST_MODULE = None
if not REQUEST_MODULE:
try:
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.exceptions import ConnectionError
REQUEST_MODULE = "requests"
except ImportError:
REQUEST_MODULE = None
log = logging.getLogger(__name__)
class SessionInstance(object):
"""Singelton for the Session Instance"""
instance = None
def set_session_instance(instance):
"""Set session instance"""
SessionInstance.instance = instance
def shared_session_instance():
"""Get session instance"""
if REQUEST_MODULE is None:
raise Exception()
if not SessionInstance.instance:
SessionInstance.instance = requests.Session()
return SessionInstance.instance
def create_ws_instance(use_ssl=True, enable_multithread=True):
"""Get websocket instance"""
if WEBSOCKET_MODULE is None:
raise Exception()
if use_ssl:
ssl_defaults = ssl.get_default_verify_paths()
sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
return websocket.WebSocket(sslopt=sslopt_ca_certs, enable_multithread=enable_multithread)
else:
return websocket.WebSocket(enable_multithread=enable_multithread)
class GrapheneRPC(object):
"""
This class allows to call API methods synchronously, without callbacks.
It logs warnings and errors.
:param str urls: Either a single Websocket/Http URL, or a list of URLs
:param str user: Username for Authentication
:param str password: Password for Authentication
:param int num_retries: Try x times to num_retries to a node on disconnect, -1 for indefinitely (default is 100)
:param int num_retries_call: Repeat num_retries_call times a rpc call on node error (default is 5)
:param int timeout: Timeout setting for https nodes (default is 60)
:param bool autoconnect: When set to false, connection is performed on the first rpc call (default is True)
:param bool use_condenser: Use the old condenser_api rpc protocol on nodes with version
0.19.4 or higher. The settings has no effect on nodes with version of 0.19.3 or lower.
:param bool use_tor: When set to true, 'socks5h://localhost:9050' is set as proxy
:param dict custom_chains: custom chain which should be added to the known chains
Available APIs:
* database
* network_node
* network_broadcast
Usage:
.. code-block:: python
from beemapi.graphenerpc import GrapheneRPC
ws = GrapheneRPC("wss://steemd.pevo.science","","")
print(ws.get_account_count())
ws = GrapheneRPC("https://api.steemit.com","","")
print(ws.get_account_count())
.. note:: This class allows to call methods available via
websocket. If you want to use the notification
subsystem, please use ``GrapheneWebsocket`` instead.
"""
def __init__(self, urls, user=None, password=None, **kwargs):
"""Init."""
self.rpc_methods = {'offline': -1, 'ws': 0, 'jsonrpc': 1, 'wsappbase': 2, 'appbase': 3}
self.current_rpc = self.rpc_methods["ws"]
self._request_id = 0
self.timeout = kwargs.get('timeout', 60)
num_retries = kwargs.get("num_retries", 100)
num_retries_call = kwargs.get("num_retries_call", 5)
self.use_condenser = kwargs.get("use_condenser", False)
self.use_tor = kwargs.get("use_tor", False)
self.disable_chain_detection = kwargs.get("disable_chain_detection", False)
self.known_chains = known_chains
custom_chain = kwargs.get("custom_chains", {})
if len(custom_chain) > 0:
for c in custom_chain:
if c not in self.known_chains:
self.known_chains[c] = custom_chain[c]
self.nodes = Nodes(urls, num_retries, num_retries_call)
if self.nodes.working_nodes_count == 0:
self.current_rpc = self.rpc_methods["offline"]
self.user = user
self.password = password
self.ws = None
self.url = None
self.session = None
self.rpc_queue = []
if kwargs.get("autoconnect", True):
self.rpcconnect()
@property
def num_retries(self):
return self.nodes.num_retries
@property
def num_retries_call(self):
return self.nodes.num_retries_call
@property
def error_cnt_call(self):
return self.nodes.error_cnt_call
@property
def error_cnt(self):
return self.nodes.error_cnt
def get_request_id(self):
"""Get request id."""
self._request_id += 1
return self._request_id
def next(self):
"""Switches to the next node url"""
if self.ws:
try:
self.rpcclose()
except Exception as e:
log.warning(str(e))
self.rpcconnect()
def is_appbase_ready(self):
"""Check if node is appbase ready"""
return self.current_rpc in [self.rpc_methods['wsappbase'], self.rpc_methods['appbase']]
def get_use_appbase(self):
"""Returns True if appbase ready and appbase calls are set"""
return not self.use_condenser and self.is_appbase_ready()
def rpcconnect(self, next_url=True):
"""Connect to next url in a loop."""
if self.nodes.working_nodes_count == 0:
return
while True:
if next_url:
self.url = next(self.nodes)
self.nodes.reset_error_cnt_call()
log.debug("Trying to connect to node %s" % self.url)
if self.url[:3] == "wss":
self.ws = create_ws_instance(use_ssl=True)
self.ws.settimeout(self.timeout)
self.current_rpc = self.rpc_methods["wsappbase"]
elif self.url[:2] == "ws":
self.ws = create_ws_instance(use_ssl=False)
self.ws.settimeout(self.timeout)
self.current_rpc = self.rpc_methods["wsappbase"]
else:
self.ws = None
self.session = shared_session_instance()
if self.use_tor:
self.session.proxies = {}
self.session.proxies['http'] = 'socks5h://localhost:9050'
self.session.proxies['https'] = 'socks5h://localhost:9050'
self.current_rpc = self.rpc_methods["appbase"]
self.headers = {'User-Agent': 'beem v%s' % (beem_version),
'content-type': 'application/json; charset=utf-8'}
try:
if self.ws:
self.ws.connect(self.url)
self.rpclogin(self.user, self.password)
if self.disable_chain_detection:
# Set to appbase rpc format
if self.current_rpc == self.rpc_methods['ws']:
self.current_rpc = self.rpc_methods['wsappbase']
else:
self.current_rpc = self.rpc_methods['appbase']
break
try:
props = None
if not self.use_condenser:
props = self.get_config(api="database")
else:
props = self.get_config()
except Exception as e:
if re.search("Bad Cast:Invalid cast from type", str(e)):
# retry with not appbase
if self.current_rpc == self.rpc_methods['wsappbase']:
self.current_rpc = self.rpc_methods['ws']
else:
self.current_rpc = self.rpc_methods['appbase']
props = self.get_config(api="database")
if props is None:
raise RPCError("Could not receive answer for get_config")
if is_network_appbase_ready(props):
if self.ws:
self.current_rpc = self.rpc_methods["wsappbase"]
else:
self.current_rpc = self.rpc_methods["appbase"]
break
except KeyboardInterrupt:
raise
except Exception as e:
self.nodes.increase_error_cnt()
do_sleep = not next_url or (next_url and self.nodes.working_nodes_count == 1)
self.nodes.sleep_and_check_retries(str(e), sleep=do_sleep)
next_url = True
def rpclogin(self, user, password):
"""Login into Websocket"""
if self.ws and self.current_rpc == self.rpc_methods['ws'] and user and password:
self.login(user, password, api="login_api")
def rpcclose(self):
"""Close Websocket"""
if self.ws is None:
return
# if self.ws.connected:
self.ws.close()
def request_send(self, payload):
if self.user is not None and self.password is not None:
response = self.session.post(self.url,
data=payload,
headers=self.headers,
timeout=self.timeout,
auth=(self.user, self.password))
else:
response = self.session.post(self.url,
data=payload,
headers=self.headers,
timeout=self.timeout)
if response.status_code == 401:
raise UnauthorizedError
return response
def ws_send(self, payload):
if self.ws is None:
raise RPCConnection("No websocket available!")
self.ws.send(payload)
reply = self.ws.recv()
return reply
def version_string_to_int(self, network_version):
version_list = network_version.split('.')
return int(int(version_list[0]) * 1e8 + int(version_list[1]) * 1e4 + int(version_list[2]))
def get_network(self, props=None):
""" Identify the connected network. This call returns a
dictionary with keys chain_id, core_symbol and prefix
"""
if props is None:
props = self.get_config(api="database")
chain_id = None
network_version = None
blockchain_name = None
chain_config = None
prefix = None
symbols = []
chain_assets = []
for key in props:
if key[-8:] == "CHAIN_ID":
chain_id = props[key]
blockchain_name = key.split("_")[0]
elif key[-13:] == "CHAIN_VERSION":
network_version = props[key]
elif key[-14:] == "ADDRESS_PREFIX":
prefix = props[key]
elif key[-6:] == "SYMBOL":
value = {}
value["asset"] = props[key]["nai"]
value["precision"] = props[key]["decimals"]
if "IS_TEST_NET" in props and props["IS_TEST_NET"] and "nai" in props[key] and props[key]["nai"] == "@@000000013":
value["symbol"] = "TBD"
elif "IS_TEST_NET" in props and props["IS_TEST_NET"] and "nai" in props[key] and props[key]["nai"] == "@@000000021":
value["symbol"] = "TESTS"
else:
value["symbol"] = key[:-7]
value["id"] = -1
symbols.append(value)
symbol_id = 0
if len(symbols) == 2:
symbol_id = 1
for s in sorted(symbols, key=lambda self: self['asset'], reverse=False):
s["id"] = symbol_id
symbol_id += 1
chain_assets.append(s)
if chain_id is not None and network_version is not None and len(chain_assets) > 0 and prefix is not None:
chain_config = {"prefix": prefix, "chain_id": chain_id, "min_version": network_version, "chain_assets": chain_assets}
if chain_id is None:
raise RPCError("Connecting to unknown network!")
highest_version_chain = None
for k, v in list(self.known_chains.items()):
if blockchain_name is not None and blockchain_name not in k and blockchain_name != "STEEMIT" and blockchain_name != "CHAIN":
continue
if v["chain_id"] == chain_id and self.version_string_to_int(v["min_version"]) <= self.version_string_to_int(network_version):
if highest_version_chain is None:
highest_version_chain = v
elif self.version_string_to_int(v["min_version"]) > self.version_string_to_int(highest_version_chain["min_version"]):
highest_version_chain = v
if highest_version_chain is None and chain_config is not None:
return chain_config
elif highest_version_chain is None:
raise RPCError("Connecting to unknown network!")
else:
return highest_version_chain
def _check_for_server_error(self, reply):
"""Checks for server error message in reply"""
if re.search("Internal Server Error", reply) or re.search("500", reply):
raise RPCErrorDoRetry("Internal Server Error")
elif re.search("Not Implemented", reply) or re.search("501", reply):
raise RPCError("Not Implemented")
elif re.search("Bad Gateway", reply) or re.search("502", reply):
raise RPCErrorDoRetry("Bad Gateway")
elif re.search("Too Many Requests", reply) or re.search("429", reply):
raise RPCErrorDoRetry("Too Many Requests")
elif re.search("Service Temporarily Unavailable", reply) or re.search("Service Unavailable", reply) or re.search("503", reply):
raise RPCErrorDoRetry("Service Temporarily Unavailable")
elif re.search("Gateway Time-out", reply) or re.search("Gateway Timeout", reply) or re.search("504", reply):
raise RPCErrorDoRetry("Gateway Time-out")
elif re.search("HTTP Version not supported", reply) or re.search("505", reply):
raise RPCError("HTTP Version not supported")
elif re.search("Variant Also Negotiates", reply) or re.search("506", reply):
raise RPCError("Variant Also Negotiates")
elif re.search("Insufficient Storage", reply) or re.search("507", reply):
raise RPCError("Insufficient Storage")
elif re.search("Loop Detected", reply) or re.search("508", reply):
raise RPCError("Loop Detected")
elif re.search("Bandwidth Limit Exceeded", reply) or re.search("509", reply):
raise RPCError("Bandwidth Limit Exceeded")
elif re.search("Not Extended", reply) or re.search("510", reply):
raise RPCError("Not Extended")
elif re.search("Network Authentication Required", reply) or re.search("511", reply):
raise RPCError("Network Authentication Required")
else:
raise RPCError("Client returned invalid format. Expected JSON!")
def rpcexec(self, payload):
"""
Execute a call by sending the payload.
:param json payload: Payload data
:raises ValueError: if the server does not respond in proper JSON format
:raises RPCError: if the server returns an error
"""
log.debug(json.dumps(payload))
if self.nodes.working_nodes_count == 0:
raise WorkingNodeMissing
if self.url is None:
raise RPCConnection("RPC is not connected!")
reply = {}
response = None
while True:
self.nodes.increase_error_cnt_call()
try:
if self.current_rpc == self.rpc_methods['ws'] or \
self.current_rpc == self.rpc_methods['wsappbase']:
reply = self.ws_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
else:
response = self.request_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
reply = response.text
if not bool(reply):
try:
self.nodes.sleep_and_check_retries("Empty Reply", call_retry=True)
except CallRetriesReached:
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries("Empty Reply", sleep=False, call_retry=False)
self.rpcconnect()
else:
break
except KeyboardInterrupt:
raise
except WebSocketConnectionClosedException as e:
if self.nodes.num_retries_call_reached:
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()
else:
# self.nodes.sleep_and_check_retries(str(e), sleep=True, call_retry=True)
self.rpcconnect(next_url=False)
except ConnectionError as e:
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()
except WebSocketTimeoutException as e:
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()
except Exception as e:
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()
ret = {}
try:
if response is None:
ret = json.loads(reply, strict=False, encoding="utf-8")
else:
ret = response.json()
except ValueError:
self._check_for_server_error(reply)
log.debug(json.dumps(reply))
if isinstance(ret, dict) and 'error' in ret:
if 'detail' in ret['error']:
raise RPCError(ret['error']['detail'])
else:
raise RPCError(ret['error']['message'])
else:
if isinstance(ret, list):
ret_list = []
for r in ret:
if isinstance(r, dict) and 'error' in r:
if 'detail' in r['error']:
raise RPCError(r['error']['detail'])
else:
raise RPCError(r['error']['message'])
elif isinstance(r, dict) and "result" in r:
ret_list.append(r["result"])
else:
ret_list.append(r)
self.nodes.reset_error_cnt_call()
return ret_list
elif isinstance(ret, dict) and "result" in ret:
self.nodes.reset_error_cnt_call()
return ret["result"]
elif isinstance(ret, int):
raise RPCError("Client returned invalid format. Expected JSON! Output: %s" % (str(ret)))
else:
self.nodes.reset_error_cnt_call()
return ret
return ret
# End of Deprecated methods
####################################################################
def __getattr__(self, name):
"""Map all methods to RPC calls and pass through the arguments."""
def method(*args, **kwargs):
api_name = get_api_name(self.is_appbase_ready(), *args, **kwargs)
if self.is_appbase_ready() and self.use_condenser and api_name != "bridge":
api_name = "condenser_api"
if (api_name is None):
api_name = 'database_api'
# let's be able to define the num_retries per query
stored_num_retries_call = self.nodes.num_retries_call
self.nodes.num_retries_call = kwargs.get("num_retries_call", stored_num_retries_call)
add_to_queue = kwargs.get("add_to_queue", False)
query = get_query(self.is_appbase_ready() and not self.use_condenser or api_name == "bridge", self.get_request_id(), api_name, name, args)
if add_to_queue:
self.rpc_queue.append(query)
self.nodes.num_retries_call = stored_num_retries_call
return None
elif len(self.rpc_queue) > 0:
self.rpc_queue.append(query)
query = self.rpc_queue
self.rpc_queue = []
r = self.rpcexec(query)
self.nodes.num_retries_call = stored_num_retries_call
return r
return method