Skip to content
Snippets Groups Projects
Commit b1cf4706 authored by Dan Notestein's avatar Dan Notestein Committed by root
Browse files

[DLN] add some comments and some commented-out code that logs...

[DLN] add some comments and some commented-out code that logs broadcast_transaction_synchronous calls for traffic analysis
parent 908247a6
No related branches found
No related tags found
2 merge requests!9finally merge old fixes with master,!7merge changes from hbt4
...@@ -24,16 +24,6 @@ pre-commit = "*" ...@@ -24,16 +24,6 @@ pre-commit = "*"
progress = "*" progress = "*"
pydevd = "*" pydevd = "*"
pylint = "<2.3.0" pylint = "<2.3.0"
pytest = "==3.10.0"
pytest-asyncio = "==0.9.0"
pytest-console-scripts = "==0.1.7"
pytest-cov = "==2.6.0"
pytest-docker = "==0.6.1"
pytest-mock = "==1.10.0"
pytest-profiling = "*"
pytest-pylint = "*"
pytest-sanic = "==0.1.13"
pytest-timeout = "*"
python-rapidjson = "*" python-rapidjson = "*"
requests = "*" requests = "*"
yapf = "*" yapf = "*"
......
...@@ -74,6 +74,7 @@ def setup_listeners(app: WebApp) -> WebApp: ...@@ -74,6 +74,7 @@ def setup_listeners(app: WebApp) -> WebApp:
write_limit=args.websocket_write_limit write_limit=args.websocket_write_limit
) )
for url in upstream_urls: for url in upstream_urls:
print("********url=",url)
if url.startswith('ws'): if url.startswith('ws'):
logger.info('creating websocket pool', logger.info('creating websocket pool',
pool_min_size=args.websocket_pool_minsize, pool_min_size=args.websocket_pool_minsize,
......
...@@ -67,6 +67,7 @@ class JSONRPCRequest: ...@@ -67,6 +67,7 @@ class JSONRPCRequest:
self.timings = timings self.timings = timings
def to_dict(self): def to_dict(self):
"""return a dictionary of self.id, self.jsonrpc, self.method, self.params"""
return {k: getattr(self, k) for k in return {k: getattr(self, k) for k in
('id', 'jsonrpc', 'method', 'params') if getattr(self, k) is not _empty} ('id', 'jsonrpc', 'method', 'params') if getattr(self, k) is not _empty}
...@@ -74,6 +75,7 @@ class JSONRPCRequest: ...@@ -74,6 +75,7 @@ class JSONRPCRequest:
return dumps(self.to_dict(), ensure_ascii=False) return dumps(self.to_dict(), ensure_ascii=False)
def to_upstream_request(self, as_json=True) -> Union[str, dict]: def to_upstream_request(self, as_json=True) -> Union[str, dict]:
"""helper to update call ids when we create multiple upstream calls from a batch request"""
jrpc_dict = self.to_dict() jrpc_dict = self.to_dict()
jrpc_dict.update({'id': self.upstream_id}) jrpc_dict.update({'id': self.upstream_id})
if as_json: if as_json:
...@@ -89,17 +91,21 @@ class JSONRPCRequest: ...@@ -89,17 +91,21 @@ class JSONRPCRequest:
@property @property
def upstream_id(self) -> int: def upstream_id(self) -> int:
"""compute upstream id from original id + batch_index"""
return int(self.jussi_request_id) + self.batch_index return int(self.jussi_request_id) + self.batch_index
@property @property
def translated(self) -> bool: def translated(self) -> bool:
"""returns true if the JSONRPCRequest has been translated to appbase format"""
return self.original_request is not None return self.original_request is not None
def __hash__(self) -> int: def __hash__(self) -> int:
"""generate hash from urn"""
return hash(self.urn) return hash(self.urn)
@staticmethod @staticmethod
def translate_to_appbase(request: SingleRawRequest, urn) -> dict: def translate_to_appbase(request: SingleRawRequest, urn) -> dict:
"""Convert to 'method':'call', 'params]:['condenser_api', method_name, [method params]]"""
params = urn.params params = urn.params
if params is _empty: if params is _empty:
params = [] params = []
...@@ -114,14 +120,19 @@ class JSONRPCRequest: ...@@ -114,14 +120,19 @@ class JSONRPCRequest:
# pylint: disable=no-member # pylint: disable=no-member
def from_http_request(http_request, batch_index: int, request: SingleRawRequest): def from_http_request(http_request, batch_index: int, request: SingleRawRequest):
"""Convert a SingleRawQuest to JSONRPCRequest, determining upstream server and translating to appbase format('method':'call') if upstream says to"""
from ..urn import from_request as urn_from_request from ..urn import from_request as urn_from_request
from ..upstream import Upstream from ..upstream import Upstream
# Determine upstream server (hived, hivemind, etc) from available upstream mappings and urn from request
upstreams = http_request.app.config.upstreams upstreams = http_request.app.config.upstreams
#convert a raw request to a urn using regex to map multiple json styles of request
urn = urn_from_request(request) # type:URN urn = urn_from_request(request) # type:URN
#determine upstream server, cache time, and timeout value for the urn using upstream mapping
upstream = Upstream.from_urn(urn, upstreams=upstreams) # type: Upstream upstream = Upstream.from_urn(urn, upstreams=upstreams) # type: Upstream
original_request = None original_request = None
# If upstream says to translate this urn, do it and keep original request too
if upstreams.translate_to_appbase(urn): if upstreams.translate_to_appbase(urn):
original_request = request original_request = request
request = JSONRPCRequest.translate_to_appbase(request, urn) request = JSONRPCRequest.translate_to_appbase(request, urn)
......
...@@ -43,6 +43,7 @@ jsonschema.Draft4Validator.check_schema(UPSTREAM_SCHEMA) ...@@ -43,6 +43,7 @@ jsonschema.Draft4Validator.check_schema(UPSTREAM_SCHEMA)
class _Upstreams(object): class _Upstreams(object):
"""Maps and converts incoming requests to calls to upstream servers based on name(spaces) in jusii config file"""
__NAMESPACES = None __NAMESPACES = None
__URLS = None __URLS = None
__TTLS = None __TTLS = None
...@@ -50,6 +51,7 @@ class _Upstreams(object): ...@@ -50,6 +51,7 @@ class _Upstreams(object):
__TRANSLATE_TO_APPBASE = None __TRANSLATE_TO_APPBASE = None
def __init__(self, config, validate=True): def __init__(self, config, validate=True):
"""Load config file and create namespaces"""
upstream_config = config['upstreams'] upstream_config = config['upstreams']
# CONFIG_VALIDATOR.validate(upstream_config) # CONFIG_VALIDATOR.validate(upstream_config)
self.config = upstream_config self.config = upstream_config
...@@ -73,6 +75,7 @@ class _Upstreams(object): ...@@ -73,6 +75,7 @@ class _Upstreams(object):
self.validate_urls() self.validate_urls()
def __build_trie(self, key): def __build_trie(self, key):
"""builds a searchable trie by parsing a subset of config file specified by key"""
trie = pygtrie.StringTrie(separator='.') trie = pygtrie.StringTrie(separator='.')
for item in it.chain.from_iterable(c[key] for c in self.config): for item in it.chain.from_iterable(c[key] for c in self.config):
if isinstance(item, list): if isinstance(item, list):
...@@ -88,6 +91,7 @@ class _Upstreams(object): ...@@ -88,6 +91,7 @@ class _Upstreams(object):
@functools.lru_cache(8192) @functools.lru_cache(8192)
def url(self, request_urn) -> str: def url(self, request_urn) -> str:
"""return url for upstream server based on request_urn and URLS trie in config file"""
# certain steemd.get_state paths must be routed differently # certain steemd.get_state paths must be routed differently
if (request_urn.api in ['database_api', 'condenser_api'] if (request_urn.api in ['database_api', 'condenser_api']
and request_urn.method == 'get_state' and request_urn.method == 'get_state'
...@@ -99,6 +103,11 @@ class _Upstreams(object): ...@@ -99,6 +103,11 @@ class _Upstreams(object):
return url return url
_, url = self.__URLS.longest_prefix(str(request_urn)) _, url = self.__URLS.longest_prefix(str(request_urn))
#if request_urn.method == 'broadcast_transaction_synchronous':
# if not url:
# logger.error('***** no url for %s', str(request_urn))
# else:
# logger.warning('>>>>> %s to %s', str(request_urn), url)
if not url: if not url:
raise InvalidUpstreamURL( raise InvalidUpstreamURL(
url=url, reason='No matching url found', urn=str(request_urn)) url=url, reason='No matching url found', urn=str(request_urn))
...@@ -120,6 +129,7 @@ class _Upstreams(object): ...@@ -120,6 +129,7 @@ class _Upstreams(object):
@property @property
def urls(self) -> frozenset: def urls(self) -> frozenset:
"""return a set of all defined upstream urls from config file"""
return frozenset(u for u in self.__URLS.values()) return frozenset(u for u in self.__URLS.values())
@property @property
...@@ -127,6 +137,7 @@ class _Upstreams(object): ...@@ -127,6 +137,7 @@ class _Upstreams(object):
return self.__NAMESPACES return self.__NAMESPACES
def translate_to_appbase(self, request_urn) -> bool: def translate_to_appbase(self, request_urn) -> bool:
"""return true if urn's namespace is a translating namespace"""
return request_urn.namespace in self.__TRANSLATE_TO_APPBASE return request_urn.namespace in self.__TRANSLATE_TO_APPBASE
def validate_urls(self): def validate_urls(self):
...@@ -155,6 +166,7 @@ class Upstream(NamedTuple): ...@@ -155,6 +166,7 @@ class Upstream(NamedTuple):
@classmethod @classmethod
@functools.lru_cache(4096) @functools.lru_cache(4096)
def from_urn(cls, urn, upstreams: _Upstreams=None): def from_urn(cls, urn, upstreams: _Upstreams=None):
"""lookup upstream server url, time-to-live in cache, and timeout for a urn request based on config file"""
return Upstream(upstreams.url(urn), return Upstream(upstreams.url(urn),
upstreams.ttl(urn), upstreams.ttl(urn),
upstreams.timeout(urn)) upstreams.timeout(urn))
...@@ -89,17 +89,17 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: ...@@ -89,17 +89,17 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
try: try:
method = single_jsonrpc_request['method'] method = single_jsonrpc_request['method']
params = single_jsonrpc_request.get('params', _empty) params = single_jsonrpc_request.get('params', _empty)
#use regex to determine the real namespace, api, method, and params of the request
matched = _parse_jrpc_method(method) matched = _parse_jrpc_method(method)
if matched.get('appbase_api'): #e.g. condenser_api.method
if matched.get('appbase_api'):
return { return {
'namespace': 'appbase', 'namespace': 'appbase',
'api': matched['appbase_api'], 'api': matched['appbase_api'],
'method': matched['appbase_method'], 'method': matched['appbase_method'],
'params': params 'params': params
} }
if matched.get('namespace'): if matched.get('namespace'): #e.g. steemd.condesner_api.method or jsonrpc.method
if matched['namespace'] == 'jsonrpc': if matched['namespace'] == 'jsonrpc': #if jsonrpc namespace, then use appbase namespace and jsonrpc as api
return { return {
'namespace': 'appbase', 'namespace': 'appbase',
'api': 'jsonrpc', 'api': 'jsonrpc',
...@@ -112,9 +112,10 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: ...@@ -112,9 +112,10 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
'method': matched['method'], 'method': matched['method'],
'params': params 'params': params
} }
if matched['bare_method']: if matched['bare_method']: #e.g. method
method = matched['bare_method'] method = matched['bare_method']
#if method != call, then it is a steemd.database_api.method call
if method != 'call': if method != 'call':
return { return {
'namespace': 'steemd', 'namespace': 'steemd',
...@@ -123,16 +124,21 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: ...@@ -123,16 +124,21 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
'params': params 'params': params
} }
#method = call, so params contains info on actual api, method, and parameters
#probably should be if 2 parameters, then assume no actual parameters
if len(params) != 3: if len(params) != 3:
namespace = 'appbase' namespace = 'appbase'
api, method = params api, method = params
_params = _empty _params = _empty
else: else: #if three parameters
#remove api and method to get actual parameters
api, method, _params = params api, method, _params = params
#if (api is condenser_api or jsonrpc) or actual parameters are in a dictionary, then appbase namespace
if api == 'condenser_api' or isinstance(_params, dict) or api == 'jsonrpc': if api == 'condenser_api' or isinstance(_params, dict) or api == 'jsonrpc':
namespace = 'appbase' namespace = 'appbase'
else: else:
namespace = 'steemd' namespace = 'steemd'
#if api is an integer, map from api integer to api string
if isinstance(api, int): if isinstance(api, int):
try: try:
api = STEEMD_NUMERIC_API_MAPPING[api] api = STEEMD_NUMERIC_API_MAPPING[api]
...@@ -162,6 +168,7 @@ def _parse_jrpc(single_jsonrpc_request) -> dict: ...@@ -162,6 +168,7 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
def from_request(single_jsonrpc_request: dict) -> URN: def from_request(single_jsonrpc_request: dict) -> URN:
parsed = _parse_jrpc(single_jsonrpc_request) parsed = _parse_jrpc(single_jsonrpc_request)
#sort parameters for better caching if a dictionary
if isinstance(parsed['params'], dict): if isinstance(parsed['params'], dict):
parsed['params'] = dict(sorted(parsed['params'].items())) parsed['params'] = dict(sorted(parsed['params'].items()))
return URN(parsed['namespace'], return URN(parsed['namespace'],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment