Skip to content
Snippets Groups Projects
Commit 0acde451 authored by John Gerlock's avatar John Gerlock
Browse files

Add statsd timers, fixed error response caching

parent 85f54c05
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,8 @@ ENV JUSSI_SBDS_HTTP_URL https://sbds.steemitdev.com ...@@ -19,6 +19,8 @@ ENV JUSSI_SBDS_HTTP_URL https://sbds.steemitdev.com
ENV JUSSI_REDIS_PORT 6379 ENV JUSSI_REDIS_PORT 6379
ENV ENVIRONMENT PROD ENV ENVIRONMENT PROD
RUN \ RUN \
apt-get update && \ apt-get update && \
apt-get install -y \ apt-get install -y \
...@@ -35,7 +37,20 @@ RUN \ ...@@ -35,7 +37,20 @@ RUN \
libxml2-dev \ libxml2-dev \
libxslt-dev \ libxslt-dev \
runit \ runit \
nginx nginx \
wget
RUN wget -q https://www.scalyr.com/scalyr-repo/stable/latest/scalyr-repo-bootstrap_1.2.1_all.deb
RUN dpkg -r scalyr-repo scalyr-repo-bootstrap # Remove any previous repository definitions, if any.
RUN dpkg -i ./scalyr-repo-bootstrap_1.2.1_all.deb
#RUN rm scalyr-repo-bootstrap_1.2.1_all.deb
RUN \
apt-get update && \
apt-get install -y \
scalyr-repo \
scalyr-agent-2
RUN \ RUN \
...@@ -77,7 +92,8 @@ RUN \ ...@@ -77,7 +92,8 @@ RUN \
/var/tmp/* \ /var/tmp/* \
/var/cache/* \ /var/cache/* \
/usr/include \ /usr/include \
/usr/local/include /usr/local/include \
EXPOSE ${NGINX_SERVER_PORT} EXPOSE ${NGINX_SERVER_PORT}
......
SHELL := /bin/bash SHELL := /bin/bash
ROOT_DIR := $(shell pwd) ROOT_DIR := $(shell pwd)
PROJECT_NAME := jussi PROJECT_NAME := $(notdir $(ROOT_DIR))
PROJECT_DOCKER_TAG := steemit/$(PROJECT_NAME) PROJECT_DOCKER_TAG := steemit/$(PROJECT_NAME)
PROJECT_DOCKER_RUN_ARGS := -p8080:8080 PROJECT_DOCKER_RUN_ARGS := -p8080:8080
PIPENV_VENV_IN_PROJECT := 1
export PIPENV_VENV_IN_PROJECT
default: build default: build
.PHONY: init build run run-local test lint fmt .PHONY: init build run run-local test lint fmt
init: init:
pip3 install pipenv pip3 install pipenv
pipenv lock
pipenv install --three --dev pipenv install --three --dev
pipenv run python3 setup.py develop
build: build:
docker build -t $(PROJECT_DOCKER_TAG) . docker build -t $(PROJECT_DOCKER_TAG) .
run: build run:
docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG) docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG)
run-local: run-local:
...@@ -26,10 +29,9 @@ run-local: ...@@ -26,10 +29,9 @@ run-local:
test: test:
pipenv run py.test tests pipenv run py.test tests
lint: lint:
pipenv run py.test --pylint -m pylint jussi pipenv run py.test --pylint -m pylint $(PROJECT_NAME)
fmt: fmt:
pipenv run yapf --recursive --in-place --style pep8 jussi pipenv run yapf --recursive --in-place --style pep8 $(PROJECT_NAME)
pipenv run autopep8 --recursive --in-place jussi pipenv run autopep8 --recursive --in-placedo $(PROJECT_NAME)
\ No newline at end of file \ No newline at end of file
...@@ -12,4 +12,3 @@ __summary__ = "" ...@@ -12,4 +12,3 @@ __summary__ = ""
__uri__ = "https://github.com/steemit/jussi" __uri__ = "https://github.com/steemit/jussi"
__author__ = "John Gerlock" __author__ = "John Gerlock"
__email__ = "jg@steemit.com" __email__ = "jg@steemit.com"
...@@ -3,6 +3,7 @@ import asyncio ...@@ -3,6 +3,7 @@ import asyncio
import hashlib import hashlib
import logging import logging
import ujson
logger = logging.getLogger('sanic') logger = logging.getLogger('sanic')
...@@ -40,3 +41,34 @@ async def cache_set(app, value, jussi_attrs): ...@@ -40,3 +41,34 @@ async def cache_set(app, value, jussi_attrs):
cache = app.config.cache cache = app.config.cache
logger.debug('%s.set(%s, %s, ttl=%s)', cache, jussi_attrs.key, value, ttl) logger.debug('%s.set(%s, %s, ttl=%s)', cache, jussi_attrs.key, value, ttl)
asyncio.ensure_future(cache.set(jussi_attrs.key, value, ttl=ttl)) asyncio.ensure_future(cache.set(jussi_attrs.key, value, ttl=ttl))
async def cache_json_response(app, value, jussi_attrs):
    """Don't cache error responses

    Args:
        app: object
        value: str || bytes
        jussi_attrs: namedtuple

    Returns:

    """
    # Decode/parse inside the try so bad bytes AND bad json both skip caching.
    try:
        raw = value.decode() if isinstance(value, bytes) else value
        parsed = ujson.loads(raw)
    except Exception as e:
        logger.error(
            'json parse error %s in response from upstream %s, skipping cache',
            e, jussi_attrs.upstream_url)
        return
    if 'error' in parsed:
        logger.error(
            'jsonrpc error %s in response from upstream %s, skipping cache',
            parsed['error'], jussi_attrs.upstream_url)
        return
    # Fire-and-forget: caching must never delay the response path.
    asyncio.ensure_future(cache_set(app, value, jussi_attrs))
# coding=utf-8 # coding=utf-8
import time
import logging import logging
from sanic import response from sanic import response
from sanic.exceptions import InvalidUsage
from .cache import cache_get from .cache import cache_get
from .utils import async_exclude_methods
from .utils import jussi_attrs from .utils import jussi_attrs
from .utils import replace_jsonrpc_id from .utils import replace_jsonrpc_id
from .utils import sort_request from .utils import sort_request
from .timers import init_timers
from .timers import log_timers
logger = logging.getLogger('sanic') logger = logging.getLogger('sanic')
async def start_stats(request):
    """Request middleware: attach a per-request timer dict and statsd pipeline.

    Stores a defaultdict of Timers (all sharing this request's start time)
    under request['timers'] and a buffered statsd pipeline under
    request['statsd'] for the response middlewares to finalize and send.
    """
    request['timers'] = init_timers(start_time=time.time())
    request['statsd'] = request.app.config.statsd_client.pipeline()
@async_exclude_methods(exclude_http_methods=('GET', ))
async def add_jussi_attrs(request): async def add_jussi_attrs(request):
#request.parsed_json = replace_jsonrpc_id(request.json) # request.json handles json parse errors, this handles empty json
if not request.json:
raise InvalidUsage('Bad jsonrpc request')
request.parsed_json = sort_request(request.json) request.parsed_json = sort_request(request.json)
request = await jussi_attrs(request) request = await jussi_attrs(request)
logger.debug('request.jussi: %s', request['jussi']) logger.debug('request.jussi: %s', request['jussi'])
@async_exclude_methods(exclude_http_methods=('GET', ))
async def caching_middleware(request): async def caching_middleware(request):
if request['jussi_is_batch']: if request['jussi_is_batch']:
logger.debug('skipping cache for jsonrpc batch request') logger.debug('skipping cache for jsonrpc batch request')
return return
jussi_attrs = request['jussi'] jussi_attrs = request['jussi']
cached_response = await cache_get(request.app, jussi_attrs) with request['timers']['caching_middleware']:
cached_response = await cache_get(request.app, jussi_attrs)
if cached_response: if cached_response:
return response.raw( return response.raw(
cached_response, cached_response,
content_type='application/json', content_type='application/json',
headers={'x-jussi-cache-hit': jussi_attrs.key}) headers={'x-jussi-cache-hit': jussi_attrs.key})
async def finalize_timers(request, response):
    """Response middleware: stop every per-request timer at one shared end time.

    NOTE(review): assumes `start_stats` ran earlier and stored the timer
    defaultdict under request['timers'] — confirm middleware ordering.
    """
    end = time.time()
    if not request.get('timers'):
        logger.info('skipped finalizing timers, no timers to finalize')
        return
    # Bare lookup is intentional: request['timers'] is a defaultdict, so this
    # access *creates* the 'total_jussi_elapsed' timer (started at the request
    # start time) just before every timer is ended below.
    request['timers']['total_jussi_elapsed']
    for name, timer in request['timers'].items():
        timer.end(end)
    log_timers(request.get('timers'), logger.debug)
async def log_stats(request, response):
    """Response middleware: log all timers and ship them to statsd.

    Best-effort: any statsd failure is logged at WARNING and never
    propagates into the response path.
    """
    if not request.get('timers'):
        logger.info('skipped logging timers, no timers to log')
        return
    log_timers(request.get('timers'), logger.info)
    try:
        pipe = request['statsd']
        # NOTE(review): `_stats` is a private attribute of the statsd
        # pipeline client, logged here for debugging — verify it exists in
        # the statsd client version actually installed.
        logger.info(pipe._stats)
        for name, timer in request['timers'].items():
            pipe.timing(name, timer.elapsed)
        pipe.send()
    except Exception as e:
        logger.warning('Failed to send stats to statsd: %s', e)
...@@ -6,6 +6,7 @@ import functools ...@@ -6,6 +6,7 @@ import functools
import logging import logging
import os import os
import ujson import ujson
import datetime
import aiocache import aiocache
import aiocache.plugins import aiocache.plugins
...@@ -13,20 +14,23 @@ import aiohttp ...@@ -13,20 +14,23 @@ import aiohttp
import pygtrie import pygtrie
import funcy import funcy
import websockets import websockets
from statsd import StatsClient
from sanic import Sanic from sanic import Sanic
from sanic import response from sanic import response
from sanic.exceptions import SanicException from sanic.exceptions import SanicException
from jussi.cache import cache_get from jussi.cache import cache_get
from jussi.cache import cache_set from jussi.cache import cache_json_response
from jussi.serializers import CompressionSerializer from jussi.serializers import CompressionSerializer
from jussi.logging_config import LOGGING from jussi.logging_config import LOGGING
from jussi.middlewares import add_jussi_attrs from jussi.middlewares import add_jussi_attrs
from jussi.middlewares import caching_middleware from jussi.middlewares import caching_middleware
from jussi.middlewares import start_stats
from jussi.middlewares import finalize_timers
from jussi.middlewares import log_stats
from jussi.utils import websocket_conn from jussi.utils import websocket_conn
from jussi.utils import return_bytes from jussi.utils import return_bytes
# init logging # init logging
LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')) LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO'))
LOGGING['loggers']['sanic']['level'] = LOG_LEVEL LOGGING['loggers']['sanic']['level'] = LOG_LEVEL
...@@ -41,10 +45,12 @@ NO_CACHE_TTL = -1 ...@@ -41,10 +45,12 @@ NO_CACHE_TTL = -1
NO_CACHE_EXPIRE_TTL = 0 NO_CACHE_EXPIRE_TTL = 0
# add individual method cache settings here # add individual method cache settings here
METHOD_CACHE_SETTINGS = ( METHOD_CACHE_SETTINGS = (('get_block', 'steemd_websocket_url',
('get_block', 'steemd_websocket_url', NO_CACHE_EXPIRE_TTL), NO_CACHE_EXPIRE_TTL),
('get_block_header', 'steemd_websocket_url', NO_CACHE_EXPIRE_TTL), ('get_block_header', 'steemd_websocket_url',
('get_global_dynamic_properties', 'steemd_websocket_url', 1)) NO_CACHE_EXPIRE_TTL),
('get_global_dynamic_properties',
'steemd_websocket_url', 1))
@funcy.log_calls(logger.debug) @funcy.log_calls(logger.debug)
...@@ -56,8 +62,9 @@ async def fetch_ws(app, jussi, jsonrpc_request): ...@@ -56,8 +62,9 @@ async def fetch_ws(app, jussi, jsonrpc_request):
response = await ws.recv() response = await ws.recv()
return response return response
@funcy.log_calls(logger.debug) @funcy.log_calls(logger.debug)
async def http_post(app, jussi, jsonrpc_request): async def fetch_http(app, jussi, jsonrpc_request):
session = app.config.aiohttp['session'] session = app.config.aiohttp['session']
async with session.post(jussi.upstream_url, json=jsonrpc_request) as resp: async with session.post(jussi.upstream_url, json=jsonrpc_request) as resp:
bytes_response = await resp.read() bytes_response = await resp.read()
...@@ -72,26 +79,39 @@ async def dispatch_single(sanic_http_request, jsonrpc_request, jrpc_req_index): ...@@ -72,26 +79,39 @@ async def dispatch_single(sanic_http_request, jsonrpc_request, jrpc_req_index):
if sanic_http_request['jussi_is_batch']: if sanic_http_request['jussi_is_batch']:
jussi_attrs = jussi_attrs[jrpc_req_index] jussi_attrs = jussi_attrs[jrpc_req_index]
# stats/logging
def prefix(name):
return '%s.%s' % (jussi_attrs.log_prefix, name)
sanic_http_request['statsd'].incr('%s.%s' % (jussi_attrs.namespace,
jussi_attrs.method_name))
# return cached response if possible # return cached response if possible
response = await cache_get(app, jussi_attrs) with sanic_http_request['timers'][prefix('dispatch_single_caching_check')]:
response = await cache_get(app, jussi_attrs)
if response: if response:
return response return response
if jussi_attrs.is_ws: if jussi_attrs.is_ws:
bytes_response = await fetch_ws(app, jussi_attrs, jsonrpc_request) with sanic_http_request['timers'][prefix('fetch_ws')]:
bytes_response = await fetch_ws(app, jussi_attrs, jsonrpc_request)
else: else:
bytes_response = await http_post(app, jussi_attrs, jsonrpc_request) with sanic_http_request['timers'][prefix('fetch_http')]:
bytes_response = await fetch_http(app, jussi_attrs,
jsonrpc_request)
asyncio.ensure_future( asyncio.ensure_future(
cache_set(app, bytes_response, jussi_attrs=jussi_attrs)) cache_json_response(app, bytes_response, jussi_attrs=jussi_attrs))
return bytes_response return bytes_response
async def dispatch_batch(sanic_http_request, jsonrpc_requests): async def dispatch_batch(sanic_http_request, jsonrpc_requests):
responses = await asyncio.gather(*[ with sanic_http_request['timers']['batch_requests']:
dispatch_single(sanic_http_request, jsonrpc_request, jrpc_req_index) responses = await asyncio.gather(* [
for jrpc_req_index, jsonrpc_request in enumerate(jsonrpc_requests) dispatch_single(sanic_http_request, jsonrpc_request,
]) jrpc_req_index)
for jrpc_req_index, jsonrpc_request in enumerate(jsonrpc_requests)
])
json_responses = [] json_responses = []
for r in responses: for r in responses:
if isinstance(r, bytes): if isinstance(r, bytes):
...@@ -101,7 +121,7 @@ async def dispatch_batch(sanic_http_request, jsonrpc_requests): ...@@ -101,7 +121,7 @@ async def dispatch_batch(sanic_http_request, jsonrpc_requests):
return ujson.dumps(json_responses).encode() return ujson.dumps(json_responses).encode()
@app.route('/', methods=['POST']) @app.post('/')
async def handle(sanic_http_request): async def handle(sanic_http_request):
app = sanic_http_request.app app = sanic_http_request.app
...@@ -109,6 +129,7 @@ async def handle(sanic_http_request): ...@@ -109,6 +129,7 @@ async def handle(sanic_http_request):
jsonrpc_requests = sanic_http_request.json jsonrpc_requests = sanic_http_request.json
# make upstream requests # make upstream requests
if sanic_http_request['jussi_is_batch']: if sanic_http_request['jussi_is_batch']:
jsonrpc_response = await dispatch_batch(sanic_http_request, jsonrpc_response = await dispatch_batch(sanic_http_request,
jsonrpc_requests) jsonrpc_requests)
...@@ -124,6 +145,23 @@ async def handle(sanic_http_request): ...@@ -124,6 +145,23 @@ async def handle(sanic_http_request):
return response.text(jsonrpc_response, content_type='application/json') return response.text(jsonrpc_response, content_type='application/json')
# health check routes
@app.get('/')
async def handle(sanic_http_request):
    # Liveness probe for GET /. GET requests skip the jsonrpc middlewares
    # (they are excluded via @async_exclude_methods), so this never touches
    # the cache or upstreams.
    # NOTE(review): this function shares the name `handle` with other route
    # handlers in this module; Sanic registers by route so it works, but a
    # distinct name (e.g. `health_root`) would aid tracebacks.
    return response.json({
        'status': 'OK',
        'datetime': datetime.datetime.utcnow().isoformat()
    })
@app.get('/health')
async def handle(sanic_http_request):
    # Same liveness payload as GET /, exposed under the conventional
    # /health path for load balancers.
    # NOTE(review): `handle` is reused as a function name module-wide;
    # consider a unique name for clearer tracebacks.
    return response.json({
        'status': 'OK',
        'datetime': datetime.datetime.utcnow().isoformat()
    })
@app.exception(SanicException) @app.exception(SanicException)
def handle_errors(request, exception): def handle_errors(request, exception):
"""all errors return HTTP 502 """all errors return HTTP 502
...@@ -135,8 +173,14 @@ def handle_errors(request, exception): ...@@ -135,8 +173,14 @@ def handle_errors(request, exception):
Returns: Returns:
""" """
status_code = getattr(exception, 'status_code', 502)
message = str(exception) or 'Gateway Error'
logger.error('%s-%s', request, exception) logger.error('%s-%s', request, exception)
return response.text(body='Gateway Error', status=502) return response.json(
{
'error': message,
'status_code': status_code
}, status=status_code)
# register listeners # register listeners
...@@ -150,28 +194,35 @@ def handle_errors(request, exception): ...@@ -150,28 +194,35 @@ def handle_errors(request, exception):
def setup_statsd(app, loop): def setup_statsd(app, loop):
logger.info('before_server_start -> setup_statsd') logger.info('before_server_start -> setup_statsd')
args = app.config.args args = app.config.args
if args.statsd_host:
app.config.statsd = { config = {
'host': args.statsd_host, 'host': args.statsd_host,
'port': args.statsd_port, 'port': args.statsd_port,
'prefix': args.stats_prefix 'prefix': args.statsd_prefix
} }
app.config.statsd_client = StatsClient(**config)
@app.listener('before_server_start') @app.listener('before_server_start')
def setup_middlewares(app, loop): def setup_middlewares(app, loop):
logger.info('before_server_start -> setup_middlewares') logger.info('before_server_start -> setup_middlewares')
app.request_middleware.append(start_stats)
app.request_middleware.append(add_jussi_attrs) app.request_middleware.append(add_jussi_attrs)
app.request_middleware.append(caching_middleware) app.request_middleware.append(caching_middleware)
app.response_middleware.append(finalize_timers)
app.response_middleware.append(log_stats)
@app.listener('before_server_start') @app.listener('before_server_start')
async def setup_cache(app, loop): async def setup_cache(app, loop):
logger.info('before_server_start -> setup_cache') logger.info('before_server_start -> setup_cache')
args = app.config.args args = app.config.args
# only use redis if we can really talk to it # only use redis if we can really talk to it
logger.info('before_server_start -> setup_cache redis_host:%s', args.redis_host) logger.info('before_server_start -> setup_cache redis_host:%s',
logger.info('before_server_start -> setup_cache redis_port:%s', args.redis_port) args.redis_host)
logger.info('before_server_start -> setup_cache redis_port:%s',
args.redis_port)
try: try:
if not args.redis_host: if not args.redis_host:
raise ValueError('no redis host specified') raise ValueError('no redis host specified')
...@@ -189,7 +240,9 @@ async def setup_cache(app, loop): ...@@ -189,7 +240,9 @@ async def setup_cache(app, loop):
assert val == b'testval' assert val == b'testval'
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
logger.error('Unable to use redis (was a setting not defined?), using in-memory cache instead...') logger.error(
'Unable to use redis (was a setting not defined?), using in-memory cache instead...'
)
default_cache = aiocache.SimpleMemoryCache( default_cache = aiocache.SimpleMemoryCache(
serializer=CompressionSerializer(), serializer=CompressionSerializer(),
plugins=[ plugins=[
...@@ -239,10 +292,10 @@ async def setup_websocket_connection(app, loop): ...@@ -239,10 +292,10 @@ async def setup_websocket_connection(app, loop):
""" """
logger.info('before_server_start -> setup_ws_client') logger.info('before_server_start -> setup_ws_client')
args = app.config.args args = app.config.args
app.config.websocket_kwargs = dict(uri=args.steemd_websocket_url, app.config.websocket_kwargs = dict(
max_size=int(2e6), max_queue=200) uri=args.steemd_websocket_url, max_size=int(2e6), max_queue=200)
app.config.websocket_client = await websockets.connect(**app.config.websocket_kwargs) app.config.websocket_client = await websockets.connect(
**app.config.websocket_kwargs)
@app.listener('before_server_start') @app.listener('before_server_start')
...@@ -287,27 +340,28 @@ def main(): ...@@ -287,27 +340,28 @@ def main():
parser.add_argument('--server_port', type=int, default=9000) parser.add_argument('--server_port', type=int, default=9000)
parser.add_argument('--server_workers', type=int, default=os.cpu_count()) parser.add_argument('--server_workers', type=int, default=os.cpu_count())
parser.add_argument( parser.add_argument(
'--steemd_websocket_url', type=str, '--steemd_websocket_url',
default='wss://steemd.steemitdev.com') type=str,
parser.add_argument('--sbds_url', type=str, default='wss://steemd.steemitdev.com')
default='https://sbds.steemit.com') parser.add_argument(
'--sbds_url', type=str, default='https://sbds.steemit.com')
parser.add_argument('--redis_host', type=str, default=None) parser.add_argument('--redis_host', type=str, default=None)
parser.add_argument('--redis_port', type=int, default=6379) parser.add_argument('--redis_port', type=int, default=6379)
parser.add_argument('--redis_namespace', type=str, default='jussi') parser.add_argument('--redis_namespace', type=str, default='jussi')
parser.add_argument('--statsd_host', type=str) parser.add_argument('--statsd_host', type=str, default='localhost')
parser.add_argument('--statsd_port', type=int, default=8125) parser.add_argument('--statsd_port', type=int, default=8125)
parser.add_argument('--statsd_prefix', type=str, default='jussi') parser.add_argument('--statsd_prefix', type=str, default='jussi')
args = parser.parse_args() args = parser.parse_args()
app.config.args = args app.config.args = args
# run app # run app
logger.info('app.run') logger.info('app.run')
app.run( app.run(
host=args.server_host, host=args.server_host,
port=args.server_port, port=args.server_port,
workers=args.server_workers, workers=args.server_workers,
log_config=LOGGING) log_config=LOGGING)
if __name__ == '__main__': if __name__ == '__main__':
main() main()
\ No newline at end of file
# coding=utf-8
import time
from collections import defaultdict
class Timer(object):
    """Millisecond wall-clock timer, usable as a context manager.

    Elapsed time is reported as integer milliseconds. While the timer is
    still running, `elapsed` measures against "now"; once ended, the first
    recorded end time wins.
    """
    __slots__ = ('_start_time', '_end_time')

    def __init__(self, start=None, end=None):
        # Falsy/omitted start falls back to the current time.
        self._start_time = start or time.time()
        self._end_time = end

    def start(self, start=None):
        """Overwrite the start timestamp (no-op when `start` is falsy)."""
        if start:
            self._start_time = start

    def restart(self):
        """Reset the start timestamp to now."""
        self._start_time = time.time()

    def end(self, end=None):
        """Record the end timestamp once; later calls keep the first value."""
        self._end_time = self._end_time or end or time.time()

    @property
    def elapsed(self):
        """Elapsed milliseconds; uses "now" while the timer is running."""
        end = self._end_time or time.time()
        return int((end - self._start_time) * 1000)

    @property
    def final(self):
        """End the timer (if still running) and return elapsed milliseconds."""
        self.end()
        return self.elapsed

    def __str__(self):
        return str(self.elapsed)

    def __repr__(self):
        # BUG FIX: __repr__ must return a str. The original returned the int
        # from `self.elapsed`, which made repr(timer) raise TypeError.
        return str(self.elapsed)

    def __enter__(self, start=None):
        # NOTE: the `with` statement always calls __enter__() with no
        # arguments, so `start` only matters when called explicitly.
        if start:
            self.start(start=start)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end()
def init_timers(start_time=None):
    """Return a defaultdict that lazily creates Timers sharing `start_time`.

    Any key access materializes a new Timer started at `start_time` (or at
    the moment of access when `start_time` is None).
    """
    def _new_timer():
        return Timer(start=start_time)

    return defaultdict(_new_timer)
def end_timers(timers, end=None):
    """End every timer at `end` (when given) and return the mapping.

    When `end` is falsy the timers are left untouched, preserving the
    original truthiness check.

    Improvements: iterate `.values()` instead of `.items()` (the key was
    unused) and hoist the invariant `end` test out of the loop.
    """
    if end:
        for timer in timers.values():
            timer.end(end)
    return timers
def display_timers(timers, end=None):
    """Print '<name> <elapsed>' for each timer to stdout.

    When an `end` timestamp is supplied, each timer is ended at it first.
    """
    for label, timer in timers.items():
        if end:
            timer.end(end)
        print(f'{label} {timer}')
def log_timers(timers, log_func, end=None):
    """Log '<name> <elapsed>' for every timer via `log_func`.

    BUG FIX: the `end` parameter was accepted but silently ignored; it now
    ends each timer first (when truthy), consistent with end_timers() and
    display_timers(). Default None keeps prior behavior for all callers.
    """
    for name, timer in timers.items():
        if end:
            timer.end(end)
        log_func('%s %s' % (name, timer))
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import functools import functools
import time import time
import logging
import websockets import websockets
from collections import OrderedDict from collections import OrderedDict
from collections import namedtuple from collections import namedtuple
from sanic.exceptions import InvalidUsage
from jussi.cache import jsonrpc_cache_key
logger = logging.getLogger('sanic')
from jussi.cache import jsonrpc_cache_key
# decorators # decorators
def apply_single_or_batch(func): def apply_single_or_batch(func):
"""Decorate func to apply func to single or batch jsonrpc_requests """Decorate func to apply func to single or batch jsonrpc_requests
...@@ -32,7 +35,6 @@ def apply_single_or_batch(func): ...@@ -32,7 +35,6 @@ def apply_single_or_batch(func):
return wrapper return wrapper
def websocket_conn(func): def websocket_conn(func):
"""Decorate func to make sure func has an open websocket client """Decorate func to make sure func has an open websocket client
...@@ -42,6 +44,7 @@ def websocket_conn(func): ...@@ -42,6 +44,7 @@ def websocket_conn(func):
Returns: Returns:
""" """
@functools.wraps(func) @functools.wraps(func)
async def wrapper(app, jussi, jsonrpc_request): async def wrapper(app, jussi, jsonrpc_request):
ws = app.config.websocket_client ws = app.config.websocket_client
...@@ -52,9 +55,45 @@ def websocket_conn(func): ...@@ -52,9 +55,45 @@ def websocket_conn(func):
ws = await websockets.connect(**app.config.websocket_kwargs) ws = await websockets.connect(**app.config.websocket_kwargs)
app.config.websocket_client = ws app.config.websocket_client = ws
return await func(app, jussi, jsonrpc_request) return await func(app, jussi, jsonrpc_request)
return wrapper return wrapper
def async_ignore_exceptions(func):
    """Decorate an async callable so any exception is logged and swallowed.

    The wrapped coroutine returns None when the underlying call raises.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            logger.info('Ignored exception: %s', e)
            return None
        return result

    return wrapper
def async_exclude_methods(middleware_func=None, exclude_http_methods=None):
    """Exclude specified HTTP methods from middleware

    Args:
        middleware_func:
        exclude_http_methods:

    Returns:

    """
    # Support bare use as a decorator factory: @async_exclude_methods(...)
    if middleware_func is None:
        return functools.partial(
            async_exclude_methods, exclude_http_methods=exclude_http_methods)

    @functools.wraps(middleware_func)
    async def wrapped(request):
        if request.method not in exclude_http_methods:
            return await middleware_func(request)
        # Excluded methods pass straight through untouched.
        return request

    return wrapped
def return_bytes(func): def return_bytes(func):
"""Decorate func to make sure func has an open websocket client """Decorate func to make sure func has an open websocket client
...@@ -64,14 +103,15 @@ def return_bytes(func): ...@@ -64,14 +103,15 @@ def return_bytes(func):
Returns: Returns:
""" """
@functools.wraps(func) @functools.wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
result = await func(*args, **kwargs) result = await func(*args, **kwargs)
if isinstance(result, str): if isinstance(result, str):
result = result.encode() result = result.encode()
return result return result
return wrapper
return wrapper
def generate_int_id(): def generate_int_id():
...@@ -137,28 +177,38 @@ async def get_upstream(sanic_http_request, jsonrpc_request): ...@@ -137,28 +177,38 @@ async def get_upstream(sanic_http_request, jsonrpc_request):
return upstream['url'], upstream['ttl'] return upstream['url'], upstream['ttl']
JussiAttributes = namedtuple( JussiAttributes = namedtuple('JussiAttributes', [
'JussiAttributes', ['key', 'upstream_url', 'ttl', 'cacheable', 'is_ws']) 'key', 'upstream_url', 'ttl', 'cacheable', 'is_ws', 'namespace',
'method_name', 'log_prefix'
])
async def jussi_attrs(sanic_http_request): async def jussi_attrs(sanic_http_request):
jsonrpc_requests = sanic_http_request.json jsonrpc_requests = sanic_http_request.json
app = sanic_http_request.app app = sanic_http_request.app
if isinstance(jsonrpc_requests, list): if isinstance(jsonrpc_requests, list):
results = [] results = []
for r in jsonrpc_requests: for i, r in enumerate(jsonrpc_requests):
if not r:
raise InvalidUsage('Bad jsonrpc request')
key = jsonrpc_cache_key(r) key = jsonrpc_cache_key(r)
url, ttl = await get_upstream(sanic_http_request, r) url, ttl = await get_upstream(sanic_http_request, r)
cacheable = ttl > app.config.cache_config['no_cache_ttl'] cacheable = ttl > app.config.cache_config['no_cache_ttl']
is_ws = url.startswith('ws') is_ws = url.startswith('ws')
namespace, method_name = parse_namespaced_method(r['method'])
prefix = '.'.join([str(i), namespace, method_name])
results.append( results.append(
JussiAttributes( JussiAttributes(
key=key, key=key,
upstream_url=url, upstream_url=url,
ttl=ttl, ttl=ttl,
cacheable=cacheable, cacheable=cacheable,
is_ws=is_ws)) is_ws=is_ws,
namespace=namespace,
method_name=method_name,
log_prefix=prefix))
sanic_http_request['jussi'] = results sanic_http_request['jussi'] = results
sanic_http_request['jussi_is_batch'] = True sanic_http_request['jussi_is_batch'] = True
else: else:
...@@ -166,12 +216,18 @@ async def jussi_attrs(sanic_http_request): ...@@ -166,12 +216,18 @@ async def jussi_attrs(sanic_http_request):
url, ttl = await get_upstream(sanic_http_request, jsonrpc_requests) url, ttl = await get_upstream(sanic_http_request, jsonrpc_requests)
cacheable = ttl > app.config.cache_config['no_cache_ttl'] cacheable = ttl > app.config.cache_config['no_cache_ttl']
is_ws = url.startswith('ws') is_ws = url.startswith('ws')
namespace, method_name = parse_namespaced_method(
jsonrpc_requests['method'])
prefix = '.'.join(['0', namespace, method_name])
sanic_http_request['jussi'] = JussiAttributes( sanic_http_request['jussi'] = JussiAttributes(
key=key, key=key,
upstream_url=url, upstream_url=url,
ttl=ttl, ttl=ttl,
cacheable=cacheable, cacheable=cacheable,
is_ws=is_ws) is_ws=is_ws,
namespace=namespace,
method_name=method_name,
log_prefix=prefix)
sanic_http_request['jussi_is_batch'] = False sanic_http_request['jussi_is_batch'] = False
return sanic_http_request return sanic_http_request
\ No newline at end of file
...@@ -5,7 +5,8 @@ try: ...@@ -5,7 +5,8 @@ try:
from pipenv.project import Project from pipenv.project import Project
from pipenv.utils import convert_deps_to_pip from pipenv.utils import convert_deps_to_pip
except ImportError: except ImportError:
raise Exception('setup requires pipenv. Run "pip3 install pipenv" then try again') raise Exception(
'setup requires pipenv. Run "pip3 install pipenv" then try again')
pfile = Project().parsed_pipfile pfile = Project().parsed_pipfile
requirements = convert_deps_to_pip(pfile['packages'], r=False) requirements = convert_deps_to_pip(pfile['packages'], r=False)
...@@ -21,24 +22,16 @@ with open(os.path.join(base_dir, "jussi", "__about__.py")) as f: ...@@ -21,24 +22,16 @@ with open(os.path.join(base_dir, "jussi", "__about__.py")) as f:
with open(os.path.join(base_dir, "README.md")) as f: with open(os.path.join(base_dir, "README.md")) as f:
long_description = f.read() long_description = f.read()
setup( setup(
name=about["__title__"], name=about["__title__"],
version=about["__version__"], version=about["__version__"],
description=about["__summary__"], description=about["__summary__"],
long_description=long_description, long_description=long_description,
url=about["__uri__"], url=about["__uri__"],
author=about["__author__"], author=about["__author__"],
author_email=about["__email__"], author_email=about["__email__"],
setup_requires=['pipenv', 'pytest-runner'],
setup_requires=['pipenv','pytest-runner'],
tests_require=test_requirements, tests_require=test_requirements,
install_requires=requirements, install_requires=requirements,
packages=["jussi"], packages=["jussi"],
entry_points={ entry_points={'console_scripts': ['jussi=jussi.serve:main']})
'console_scripts': ['jussi=jussi.serve:main']
}
)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment