Skip to content
Snippets Groups Projects
Commit 94a10207 authored by Dan Notestein's avatar Dan Notestein
Browse files

Merge branch 'master' into 'autoclave'

# Conflicts:
#   Pipfile
#   README.md
#   jussi/upstream.py
#   jussi/urn.py
parents 2e87ef6b b1cf4706
No related branches found
No related tags found
2 merge requests!9finally merge old fixes with master,!7merge changes from hbt4
......@@ -33,7 +33,7 @@ from .utils import merge_cached_responses
logger = structlog.getLogger(__name__)
BATCH_IRREVERSIBLE_TTL_SET = frozenset([TTL.DEFAULT_EXPIRE_IF_IRREVERSIBLE])
BATCH_IRREVERSIBLE_TTL_SET = frozenset([TTL.EXPIRE_IF_REVERSIBLE])
# types
CacheTTLValue = TypeVar('CacheTTL', int, float, type(None))
......@@ -188,13 +188,14 @@ class CacheGroup:
) -> None:
key = jsonrpc_cache_key(request)
ttl = ttl or request.upstream.ttl
if ttl == TTL.DEFAULT_EXPIRE_IF_IRREVERSIBLE:
if ttl == TTL.EXPIRE_IF_REVERSIBLE:
last_irreversible_block_num = last_irreversible_block_num or \
self._memory_cache.gets('last_irreversible_block_num') or \
await self.get('last_irreversible_block_num')
ttl = irreversible_ttl(jsonrpc_response=response,
last_irreversible_block_num=last_irreversible_block_num)
last_irreversible_block_num=last_irreversible_block_num,
request=request)
elif ttl == TTL.NO_CACHE:
return
value = self.prepare_response_for_cache(request, response)
......@@ -218,7 +219,7 @@ class CacheGroup:
else:
new_ttls = []
for i, ttl in enumerate(ttls):
if ttl == TTL.DEFAULT_EXPIRE_IF_IRREVERSIBLE:
if ttl == TTL.EXPIRE_IF_REVERSIBLE:
ttl = irreversible_ttl(responses[i], last_irreversible_block_num)
new_ttls.append(ttl)
triplets = filter(lambda p: p[0] != TTL.NO_CACHE, zip(ttls, requests, responses))
......
......@@ -12,7 +12,7 @@ Method Settings
- For readabilty/writabilty, there are shorthand variables for these 'special' TTL values:
- `NO_EXPIRE` == 0
- `NO_CACHE` == -1
- `DEFAULT_EXPIRE_IF_IRREVERSIBLE` == -2
- `EXPIRE_IF_REVERSIBLE` == -2
"""
......@@ -21,9 +21,10 @@ from enum import Enum
class TTL(Enum):
DEFAULT_TTL = 3
EXTENDED_TTL = 9
NO_EXPIRE = None
NO_CACHE = -1
DEFAULT_EXPIRE_IF_IRREVERSIBLE = -2
EXPIRE_IF_REVERSIBLE = -2
# pylint: disable=no-else-return
def __eq__(self, other: int) -> bool:
......
......@@ -21,7 +21,8 @@ def jsonrpc_cache_key(single_jsonrpc_request: SingleJrpcRequest) -> str:
def irreversible_ttl(jsonrpc_response: dict=None,
last_irreversible_block_num: int=None) -> TTL:
last_irreversible_block_num: int=None,
request: SingleJrpcRequest=None) -> TTL:
if not jsonrpc_response:
return TTL.NO_CACHE
if not isinstance(last_irreversible_block_num, int):
......@@ -30,12 +31,21 @@ def irreversible_ttl(jsonrpc_response: dict=None,
return TTL.NO_CACHE
try:
jrpc_block_num = block_num_from_jsonrpc_response(jsonrpc_response)
return TTL.DEFAULT_TTL
if jrpc_block_num > last_irreversible_block_num:
return TTL.EXTENDED_TTL
else:
return TTL.NO_EXPIRE
except Exception as e:
if request is not None:
request_string = request.json()
else:
request_string = 'None'
logger.warning(
'Unable to cache using last irreversible block',
e=e,
lirb=last_irreversible_block_num)
lirb=last_irreversible_block_num,
request_string=request_string,
jsonrpc_response=jsonrpc_response)
return TTL.NO_CACHE
......
......@@ -74,6 +74,7 @@ def setup_listeners(app: WebApp) -> WebApp:
write_limit=args.websocket_write_limit
)
for url in upstream_urls:
print("********url=",url)
if url.startswith('ws'):
logger.info('creating websocket pool',
pool_min_size=args.websocket_pool_minsize,
......
......@@ -67,6 +67,7 @@ class JSONRPCRequest:
self.timings = timings
def to_dict(self):
"""return a dictionary of self.id, self.jsonrpc, self.method, self.params"""
return {k: getattr(self, k) for k in
('id', 'jsonrpc', 'method', 'params') if getattr(self, k) is not _empty}
......@@ -74,6 +75,7 @@ class JSONRPCRequest:
return dumps(self.to_dict(), ensure_ascii=False)
def to_upstream_request(self, as_json=True) -> Union[str, dict]:
"""helper to update call ids when we create multiple upstream calls from a batch request"""
jrpc_dict = self.to_dict()
jrpc_dict.update({'id': self.upstream_id})
if as_json:
......@@ -89,17 +91,21 @@ class JSONRPCRequest:
@property
def upstream_id(self) -> int:
"""compute upstream id from original id + batch_index"""
return int(self.jussi_request_id) + self.batch_index
@property
def translated(self) -> bool:
"""returns true if the JSONRPCRequest has been translated to appbase format"""
return self.original_request is not None
def __hash__(self) -> int:
"""generate hash from urn"""
return hash(self.urn)
@staticmethod
def translate_to_appbase(request: SingleRawRequest, urn) -> dict:
"""Convert to 'method':'call', 'params]:['condenser_api', method_name, [method params]]"""
params = urn.params
if params is _empty:
params = []
......@@ -114,14 +120,19 @@ class JSONRPCRequest:
# pylint: disable=no-member
def from_http_request(http_request, batch_index: int, request: SingleRawRequest):
"""Convert a SingleRawQuest to JSONRPCRequest, determining upstream server and translating to appbase format('method':'call') if upstream says to"""
from ..urn import from_request as urn_from_request
from ..upstream import Upstream
# Determine upstream server (hived, hivemind, etc) from available upstream mappings and urn from request
upstreams = http_request.app.config.upstreams
#convert a raw request to a urn using regex to map multiple json styles of request
urn = urn_from_request(request) # type:URN
#determine upstream server, cache time, and timeout value for the urn using upstream mapping
upstream = Upstream.from_urn(urn, upstreams=upstreams) # type: Upstream
original_request = None
# If upstream says to translate this urn, do it and keep original request too
if upstreams.translate_to_appbase(urn):
original_request = request
request = JSONRPCRequest.translate_to_appbase(request, urn)
......
......@@ -43,6 +43,7 @@ jsonschema.Draft4Validator.check_schema(UPSTREAM_SCHEMA)
class _Upstreams(object):
"""Maps and converts incoming requests to calls to upstream servers based on name(spaces) in jusii config file"""
__NAMESPACES = None
__URLS = None
__TTLS = None
......@@ -50,6 +51,7 @@ class _Upstreams(object):
__TRANSLATE_TO_APPBASE = None
def __init__(self, config, validate=True):
"""Load config file and create namespaces"""
upstream_config = config['upstreams']
# CONFIG_VALIDATOR.validate(upstream_config)
self.config = upstream_config
......@@ -73,6 +75,7 @@ class _Upstreams(object):
self.validate_urls()
def __build_trie(self, key):
"""builds a searchable trie by parsing a subset of config file specified by key"""
trie = pygtrie.StringTrie(separator='.')
for item in it.chain.from_iterable(c[key] for c in self.config):
if isinstance(item, list):
......@@ -88,6 +91,7 @@ class _Upstreams(object):
@functools.lru_cache(8192)
def url(self, request_urn) -> str:
"""return url for upstream server based on request_urn and URLS trie in config file"""
# certain hived.get_state paths must be routed differently
if (request_urn.api in ['database_api', 'condenser_api']
and request_urn.method == 'get_state'
......@@ -99,6 +103,11 @@ class _Upstreams(object):
return url
_, url = self.__URLS.longest_prefix(str(request_urn))
#if request_urn.method == 'broadcast_transaction_synchronous':
# if not url:
# logger.error('***** no url for %s', str(request_urn))
# else:
# logger.warning('>>>>> %s to %s', str(request_urn), url)
if not url:
raise InvalidUpstreamURL(
url=url, reason='No matching url found', urn=str(request_urn))
......@@ -120,6 +129,7 @@ class _Upstreams(object):
@property
def urls(self) -> frozenset:
"""return a set of all defined upstream urls from config file"""
return frozenset(u for u in self.__URLS.values())
@property
......@@ -127,6 +137,7 @@ class _Upstreams(object):
return self.__NAMESPACES
def translate_to_appbase(self, request_urn) -> bool:
"""return true if urn's namespace is a translating namespace"""
return request_urn.namespace in self.__TRANSLATE_TO_APPBASE
def validate_urls(self):
......@@ -155,6 +166,7 @@ class Upstream(NamedTuple):
@classmethod
@functools.lru_cache(4096)
def from_urn(cls, urn, upstreams: _Upstreams=None):
"""lookup upstream server url, time-to-live in cache, and timeout for a urn request based on config file"""
return Upstream(upstreams.url(urn),
upstreams.ttl(urn),
upstreams.timeout(urn))
......@@ -89,17 +89,17 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
try:
method = single_jsonrpc_request['method']
params = single_jsonrpc_request.get('params', _empty)
#use regex to determine the real namespace, api, method, and params of the request
matched = _parse_jrpc_method(method)
if matched.get('appbase_api'):
if matched.get('appbase_api'): #e.g. condenser_api.method
return {
'namespace': 'appbase',
'api': matched['appbase_api'],
'method': matched['appbase_method'],
'params': params
}
if matched.get('namespace'):
if matched['namespace'] == 'jsonrpc':
if matched.get('namespace'): #e.g. hived.condesner_api.method or jsonrpc.method
if matched['namespace'] == 'jsonrpc': #if jsonrpc namespace, then use appbase namespace and jsonrpc as api
return {
'namespace': 'appbase',
'api': 'jsonrpc',
......@@ -112,9 +112,10 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
'method': matched['method'],
'params': params
}
if matched['bare_method']:
if matched['bare_method']: #e.g. method
method = matched['bare_method']
#if method != call, then it is a hived.database_api.method call
if method != 'call':
return {
'namespace': 'hived',
......@@ -123,16 +124,21 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
'params': params
}
#method = call, so params contains info on actual api, method, and parameters
#probably should be if 2 parameters, then assume no actual parameters
if len(params) != 3:
namespace = 'appbase'
api, method = params
_params = _empty
else:
else: #if three parameters
#remove api and method to get actual parameters
api, method, _params = params
#if (api is condenser_api or jsonrpc) or actual parameters are in a dictionary, then appbase namespace
if api == 'condenser_api' or isinstance(_params, dict) or api == 'jsonrpc':
namespace = 'appbase'
else:
namespace = 'hived'
#if api is an integer, map from api integer to api string
if isinstance(api, int):
try:
api = STEEMD_NUMERIC_API_MAPPING[api]
......@@ -162,6 +168,7 @@ def _parse_jrpc(single_jsonrpc_request) -> dict:
def from_request(single_jsonrpc_request: dict) -> URN:
parsed = _parse_jrpc(single_jsonrpc_request)
#sort parameters for better caching if a dictionary
if isinstance(parsed['params'], dict):
parsed['params'] = dict(sorted(parsed['params'].items()))
return URN(parsed['namespace'],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment