From b1cf4706dc2735a512b774fd0c557bece1201025 Mon Sep 17 00:00:00 2001 From: Dan Notestein <dan@syncad.com> Date: Wed, 4 Aug 2021 14:19:17 +0000 Subject: [PATCH] [DLN] add some comments and some commented-out code that logs broadcast_transaction_synchronous calls for traffic analysis --- Pipfile | 10 ---------- jussi/listeners.py | 1 + jussi/request/jsonrpc.py | 11 +++++++++++ jussi/upstream.py | 12 ++++++++++++ jussi/urn.py | 19 +++++++++++++------ 5 files changed, 37 insertions(+), 16 deletions(-) diff --git a/Pipfile b/Pipfile index ad58017..8855423 100644 --- a/Pipfile +++ b/Pipfile @@ -24,16 +24,6 @@ pre-commit = "*" progress = "*" pydevd = "*" pylint = "<2.3.0" -pytest = "==3.10.0" -pytest-asyncio = "==0.9.0" -pytest-console-scripts = "==0.1.7" -pytest-cov = "==2.6.0" -pytest-docker = "==0.6.1" -pytest-mock = "==1.10.0" -pytest-profiling = "*" -pytest-pylint = "*" -pytest-sanic = "==0.1.13" -pytest-timeout = "*" python-rapidjson = "*" requests = "*" yapf = "*" diff --git a/jussi/listeners.py b/jussi/listeners.py index 856fdaa..cd94204 100644 --- a/jussi/listeners.py +++ b/jussi/listeners.py @@ -74,6 +74,7 @@ def setup_listeners(app: WebApp) -> WebApp: write_limit=args.websocket_write_limit ) for url in upstream_urls: + print("********url=",url) if url.startswith('ws'): logger.info('creating websocket pool', pool_min_size=args.websocket_pool_minsize, diff --git a/jussi/request/jsonrpc.py b/jussi/request/jsonrpc.py index f54fbeb..e105dc4 100644 --- a/jussi/request/jsonrpc.py +++ b/jussi/request/jsonrpc.py @@ -67,6 +67,7 @@ class JSONRPCRequest: self.timings = timings def to_dict(self): + """return a dictionary of self.id, self.jsonrpc, self.method, self.params""" return {k: getattr(self, k) for k in ('id', 'jsonrpc', 'method', 'params') if getattr(self, k) is not _empty} @@ -74,6 +75,7 @@ class JSONRPCRequest: return dumps(self.to_dict(), ensure_ascii=False) def to_upstream_request(self, as_json=True) -> Union[str, dict]: + """helper to update call ids when we 
create multiple upstream calls from a batch request""" jrpc_dict = self.to_dict() jrpc_dict.update({'id': self.upstream_id}) if as_json: @@ -89,17 +91,21 @@ class JSONRPCRequest: @property def upstream_id(self) -> int: + """compute upstream id from original id + batch_index""" return int(self.jussi_request_id) + self.batch_index @property def translated(self) -> bool: + """returns true if the JSONRPCRequest has been translated to appbase format""" return self.original_request is not None def __hash__(self) -> int: + """generate hash from urn""" return hash(self.urn) @staticmethod def translate_to_appbase(request: SingleRawRequest, urn) -> dict: + """Convert to 'method':'call', 'params':['condenser_api', method_name, [method params]]""" params = urn.params if params is _empty: params = [] @@ -114,14 +120,19 @@ class JSONRPCRequest: # pylint: disable=no-member def from_http_request(http_request, batch_index: int, request: SingleRawRequest): + """Convert a SingleRawRequest to JSONRPCRequest, determining upstream server and translating to appbase format('method':'call') if upstream says to""" from ..urn import from_request as urn_from_request from ..upstream import Upstream + # Determine upstream server (hived, hivemind, etc) from available upstream mappings and urn from request upstreams = http_request.app.config.upstreams + #convert a raw request to a urn using regex to map multiple json styles of request urn = urn_from_request(request) # type:URN + #determine upstream server, cache time, and timeout value for the urn using upstream mapping upstream = Upstream.from_urn(urn, upstreams=upstreams) # type: Upstream original_request = None + # If upstream says to translate this urn, do it and keep original request too if upstreams.translate_to_appbase(urn): original_request = request request = JSONRPCRequest.translate_to_appbase(request, urn) diff --git a/jussi/upstream.py b/jussi/upstream.py index e25ff58..dda6222 100644 --- a/jussi/upstream.py +++ b/jussi/upstream.py @@ 
-43,6 +43,7 @@ jsonschema.Draft4Validator.check_schema(UPSTREAM_SCHEMA) class _Upstreams(object): + """Maps and converts incoming requests to calls to upstream servers based on name(spaces) in jussi config file""" __NAMESPACES = None __URLS = None __TTLS = None @@ -50,6 +51,7 @@ class _Upstreams(object): __TRANSLATE_TO_APPBASE = None def __init__(self, config, validate=True): + """Load config file and create namespaces""" upstream_config = config['upstreams'] # CONFIG_VALIDATOR.validate(upstream_config) self.config = upstream_config @@ -73,6 +75,7 @@ class _Upstreams(object): self.validate_urls() def __build_trie(self, key): + """builds a searchable trie by parsing a subset of config file specified by key""" trie = pygtrie.StringTrie(separator='.') for item in it.chain.from_iterable(c[key] for c in self.config): if isinstance(item, list): @@ -88,6 +91,7 @@ class _Upstreams(object): @functools.lru_cache(8192) def url(self, request_urn) -> str: + """return url for upstream server based on request_urn and URLS trie in config file""" # certain steemd.get_state paths must be routed differently if (request_urn.api in ['database_api', 'condenser_api'] and request_urn.method == 'get_state' @@ -99,6 +103,11 @@ class _Upstreams(object): return url _, url = self.__URLS.longest_prefix(str(request_urn)) + #if request_urn.method == 'broadcast_transaction_synchronous': + # if not url: + # logger.error('***** no url for %s', str(request_urn)) + # else: + # logger.warning('>>>>> %s to %s', str(request_urn), url) if not url: raise InvalidUpstreamURL( url=url, reason='No matching url found', urn=str(request_urn)) @@ -120,6 +129,7 @@ class _Upstreams(object): @property def urls(self) -> frozenset: + """return a set of all defined upstream urls from config file""" return frozenset(u for u in self.__URLS.values()) @property @@ -127,6 +137,7 @@ class _Upstreams(object): return self.__NAMESPACES def translate_to_appbase(self, request_urn) -> bool: + """return true if urn's namespace is a 
translating namespace""" return request_urn.namespace in self.__TRANSLATE_TO_APPBASE def validate_urls(self): @@ -155,6 +166,7 @@ class Upstream(NamedTuple): @classmethod @functools.lru_cache(4096) def from_urn(cls, urn, upstreams: _Upstreams=None): + """lookup upstream server url, time-to-live in cache, and timeout for a urn request based on config file""" return Upstream(upstreams.url(urn), upstreams.ttl(urn), upstreams.timeout(urn)) diff --git a/jussi/urn.py b/jussi/urn.py index 3a30341..03925c0 100644 --- a/jussi/urn.py +++ b/jussi/urn.py @@ -89,17 +89,17 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: try: method = single_jsonrpc_request['method'] params = single_jsonrpc_request.get('params', _empty) + #use regex to determine the real namespace, api, method, and params of the request matched = _parse_jrpc_method(method) - - if matched.get('appbase_api'): + if matched.get('appbase_api'): #e.g. condenser_api.method return { 'namespace': 'appbase', 'api': matched['appbase_api'], 'method': matched['appbase_method'], 'params': params } - if matched.get('namespace'): - if matched['namespace'] == 'jsonrpc': + if matched.get('namespace'): #e.g. steemd.condenser_api.method or jsonrpc.method + if matched['namespace'] == 'jsonrpc': #if jsonrpc namespace, then use appbase namespace and jsonrpc as api return { 'namespace': 'appbase', 'api': 'jsonrpc', @@ -112,9 +112,10 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: 'method': matched['method'], 'params': params } - if matched['bare_method']: + if matched['bare_method']: #e.g. 
method method = matched['bare_method'] + #if method != call, then it is a steemd.database_api.method call if method != 'call': return { 'namespace': 'steemd', @@ -123,16 +124,21 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: 'params': params } + #method = call, so params contains info on actual api, method, and parameters + #probably should be if 2 parameters, then assume no actual parameters if len(params) != 3: namespace = 'appbase' api, method = params _params = _empty - else: + else: #if three parameters + #remove api and method to get actual parameters api, method, _params = params + #if (api is condenser_api or jsonrpc) or actual parameters are in a dictionary, then appbase namespace if api == 'condenser_api' or isinstance(_params, dict) or api == 'jsonrpc': namespace = 'appbase' else: namespace = 'steemd' + #if api is an integer, map from api integer to api string if isinstance(api, int): try: api = STEEMD_NUMERIC_API_MAPPING[api] @@ -162,6 +168,7 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: def from_request(single_jsonrpc_request: dict) -> URN: parsed = _parse_jrpc(single_jsonrpc_request) + #sort parameters for better caching if a dictionary if isinstance(parsed['params'], dict): parsed['params'] = dict(sorted(parsed['params'].items())) return URN(parsed['namespace'], -- GitLab