From 0acde45178b6ff13a123611eafa3c9ef4a099b6a Mon Sep 17 00:00:00 2001 From: John Gerlock <john.gerlock@gmail.com> Date: Sat, 8 Jul 2017 01:05:23 -0400 Subject: [PATCH] Add statsd timers, fixed error response caching --- Dockerfile | 20 ++++++- Makefile | 16 ++--- jussi/__about__.py | 1 - jussi/cache.py | 32 ++++++++++ jussi/middlewares.py | 47 ++++++++++++++- jussi/serve.py | 136 ++++++++++++++++++++++++++++++------------- jussi/timers.py | 68 ++++++++++++++++++++++ jussi/utils.py | 76 ++++++++++++++++++++---- setup.py | 15 ++--- 9 files changed, 337 insertions(+), 74 deletions(-) create mode 100644 jussi/timers.py diff --git a/Dockerfile b/Dockerfile index f2ccf6d..9b98731 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,8 @@ ENV JUSSI_SBDS_HTTP_URL https://sbds.steemitdev.com ENV JUSSI_REDIS_PORT 6379 ENV ENVIRONMENT PROD + + RUN \ apt-get update && \ apt-get install -y \ @@ -35,7 +37,20 @@ RUN \ libxml2-dev \ libxslt-dev \ runit \ - nginx + nginx \ + wget + +RUN wget -q https://www.scalyr.com/scalyr-repo/stable/latest/scalyr-repo-bootstrap_1.2.1_all.deb +RUN dpkg -r scalyr-repo scalyr-repo-bootstrap # Remove any previous repository definitions, if any. 
+RUN dpkg -i ./scalyr-repo-bootstrap_1.2.1_all.deb + +#RUN rm scalyr-repo-bootstrap_1.2.1_all.deb + +RUN \ + apt-get update && \ + apt-get install -y \ + scalyr-repo \ + scalyr-agent-2 RUN \ @@ -77,7 +92,8 @@ RUN \ /var/tmp/* \ /var/cache/* \ /usr/include \ - /usr/local/include + /usr/local/include + EXPOSE ${NGINX_SERVER_PORT} diff --git a/Makefile b/Makefile index bc17748..fd1939e 100644 --- a/Makefile +++ b/Makefile @@ -1,23 +1,26 @@ SHELL := /bin/bash ROOT_DIR := $(shell pwd) -PROJECT_NAME := jussi +PROJECT_NAME := $(notdir $(ROOT_DIR)) PROJECT_DOCKER_TAG := steemit/$(PROJECT_NAME) PROJECT_DOCKER_RUN_ARGS := -p8080:8080 +PIPENV_VENV_IN_PROJECT := 1 +export PIPENV_VENV_IN_PROJECT + default: build .PHONY: init build run run-local test lint fmt init: pip3 install pipenv - pipenv lock pipenv install --three --dev + pipenv run python3 setup.py develop build: docker build -t $(PROJECT_DOCKER_TAG) . -run: build +run: docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG) run-local: @@ -26,10 +29,9 @@ run-local: test: pipenv run py.test tests - lint: - pipenv run py.test --pylint -m pylint jussi + pipenv run py.test --pylint -m pylint $(PROJECT_NAME) fmt: - pipenv run yapf --recursive --in-place --style pep8 jussi - pipenv run autopep8 --recursive --in-place jussi \ No newline at end of file + pipenv run yapf --recursive --in-place --style pep8 $(PROJECT_NAME) + pipenv run autopep8 --recursive --in-place $(PROJECT_NAME) \ No newline at end of file diff --git a/jussi/__about__.py b/jussi/__about__.py index 644ed49..eb396a2 100644 --- a/jussi/__about__.py +++ b/jussi/__about__.py @@ -12,4 +12,3 @@ __summary__ = "" __uri__ = "https://github.com/steemit/jussi" __author__ = "John Gerlock" __email__ = "jg@steemit.com" - diff --git a/jussi/cache.py b/jussi/cache.py index f4de50f..e733d3e 100644 --- a/jussi/cache.py +++ b/jussi/cache.py @@ -3,6 +3,7 @@ import asyncio import hashlib import logging +import ujson logger = logging.getLogger('sanic') @@ -40,3 +41,34 @@ 
async def cache_set(app, value, jussi_attrs): cache = app.config.cache logger.debug('%s.set(%s, %s, ttl=%s)', cache, jussi_attrs.key, value, ttl) asyncio.ensure_future(cache.set(jussi_attrs.key, value, ttl=ttl)) + + +async def cache_json_response(app, value, jussi_attrs): + """Don't cache error responses + + Args: + app: object + value: str || bytes + jussi_attrs: namedtuple + + Returns: + + """ + try: + if isinstance(value, bytes): + parsed = ujson.loads(value.decode()) + else: + parsed = ujson.loads(value) + + except Exception as e: + logger.error( + 'json parse error %s in response from upstream %s, skipping cache', + e, jussi_attrs.upstream_url) + return + if 'error' in parsed: + logger.error( + 'jsonrpc error %s in response from upstream %s, skipping cache', + parsed['error'], jussi_attrs.upstream_url) + return + else: + asyncio.ensure_future(cache_set(app, value, jussi_attrs)) diff --git a/jussi/middlewares.py b/jussi/middlewares.py index 9b97e6f..249e0d2 100644 --- a/jussi/middlewares.py +++ b/jussi/middlewares.py @@ -1,31 +1,74 @@ # coding=utf-8 +import time import logging from sanic import response +from sanic.exceptions import InvalidUsage from .cache import cache_get +from .utils import async_exclude_methods from .utils import jussi_attrs from .utils import replace_jsonrpc_id from .utils import sort_request +from .timers import init_timers +from .timers import log_timers logger = logging.getLogger('sanic') +async def start_stats(request): + request['timers'] = init_timers(start_time=time.time()) + request['statsd'] = request.app.config.statsd_client.pipeline() + + +@async_exclude_methods(exclude_http_methods=('GET', )) async def add_jussi_attrs(request): - #request.parsed_json = replace_jsonrpc_id(request.json) + # request.json handles json parse errors, this handles empty json + if not request.json: + raise InvalidUsage('Bad jsonrpc request') request.parsed_json = sort_request(request.json) request = await jussi_attrs(request) 
logger.debug('request.jussi: %s', request['jussi']) +@async_exclude_methods(exclude_http_methods=('GET', )) async def caching_middleware(request): if request['jussi_is_batch']: logger.debug('skipping cache for jsonrpc batch request') return jussi_attrs = request['jussi'] - cached_response = await cache_get(request.app, jussi_attrs) + with request['timers']['caching_middleware']: + cached_response = await cache_get(request.app, jussi_attrs) + if cached_response: return response.raw( cached_response, content_type='application/json', headers={'x-jussi-cache-hit': jussi_attrs.key}) + + +async def finalize_timers(request, response): + end = time.time() + if not request.get('timers'): + logger.info('skipped finalizing timers, no timers to finalize') + return + request['timers']['total_jussi_elapsed'] + for name, timer in request['timers'].items(): + timer.end(end) + log_timers(request.get('timers'), logger.debug) + + +async def log_stats(request, response): + if not request.get('timers'): + logger.info('skipped logging timers, no timers to log') + return + log_timers(request.get('timers'), logger.info) + + try: + pipe = request['statsd'] + logger.info(pipe._stats) + for name, timer in request['timers'].items(): + pipe.timing(name, timer.elapsed) + pipe.send() + except Exception as e: + logger.warning('Failed to send stats to statsd: %s', e) diff --git a/jussi/serve.py b/jussi/serve.py index 5df4cc5..aa12f4f 100755 --- a/jussi/serve.py +++ b/jussi/serve.py @@ -6,6 +6,7 @@ import functools import logging import os import ujson +import datetime import aiocache import aiocache.plugins @@ -13,20 +14,23 @@ import aiohttp import pygtrie import funcy import websockets +from statsd import StatsClient from sanic import Sanic from sanic import response from sanic.exceptions import SanicException from jussi.cache import cache_get -from jussi.cache import cache_set +from jussi.cache import cache_json_response from jussi.serializers import CompressionSerializer from 
jussi.logging_config import LOGGING from jussi.middlewares import add_jussi_attrs from jussi.middlewares import caching_middleware +from jussi.middlewares import start_stats +from jussi.middlewares import finalize_timers +from jussi.middlewares import log_stats from jussi.utils import websocket_conn from jussi.utils import return_bytes - # init logging LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')) LOGGING['loggers']['sanic']['level'] = LOG_LEVEL @@ -41,10 +45,12 @@ NO_CACHE_TTL = -1 NO_CACHE_EXPIRE_TTL = 0 # add individual method cache settings here -METHOD_CACHE_SETTINGS = ( - ('get_block', 'steemd_websocket_url', NO_CACHE_EXPIRE_TTL), - ('get_block_header', 'steemd_websocket_url', NO_CACHE_EXPIRE_TTL), - ('get_global_dynamic_properties', 'steemd_websocket_url', 1)) +METHOD_CACHE_SETTINGS = (('get_block', 'steemd_websocket_url', + NO_CACHE_EXPIRE_TTL), + ('get_block_header', 'steemd_websocket_url', + NO_CACHE_EXPIRE_TTL), + ('get_global_dynamic_properties', + 'steemd_websocket_url', 1)) @funcy.log_calls(logger.debug) @@ -56,8 +62,9 @@ async def fetch_ws(app, jussi, jsonrpc_request): response = await ws.recv() return response + @funcy.log_calls(logger.debug) -async def http_post(app, jussi, jsonrpc_request): +async def fetch_http(app, jussi, jsonrpc_request): session = app.config.aiohttp['session'] async with session.post(jussi.upstream_url, json=jsonrpc_request) as resp: bytes_response = await resp.read() @@ -72,26 +79,39 @@ async def dispatch_single(sanic_http_request, jsonrpc_request, jrpc_req_index): if sanic_http_request['jussi_is_batch']: jussi_attrs = jussi_attrs[jrpc_req_index] + # stats/logging + def prefix(name): + return '%s.%s' % (jussi_attrs.log_prefix, name) + + sanic_http_request['statsd'].incr('%s.%s' % (jussi_attrs.namespace, + jussi_attrs.method_name)) + # return cached response if possible - response = await cache_get(app, jussi_attrs) + with sanic_http_request['timers'][prefix('dispatch_single_caching_check')]: + response = 
await cache_get(app, jussi_attrs) if response: return response if jussi_attrs.is_ws: - bytes_response = await fetch_ws(app, jussi_attrs, jsonrpc_request) + with sanic_http_request['timers'][prefix('fetch_ws')]: + bytes_response = await fetch_ws(app, jussi_attrs, jsonrpc_request) else: - bytes_response = await http_post(app, jussi_attrs, jsonrpc_request) + with sanic_http_request['timers'][prefix('fetch_http')]: + bytes_response = await fetch_http(app, jussi_attrs, + jsonrpc_request) asyncio.ensure_future( - cache_set(app, bytes_response, jussi_attrs=jussi_attrs)) + cache_json_response(app, bytes_response, jussi_attrs=jussi_attrs)) return bytes_response async def dispatch_batch(sanic_http_request, jsonrpc_requests): - responses = await asyncio.gather(*[ - dispatch_single(sanic_http_request, jsonrpc_request, jrpc_req_index) - for jrpc_req_index, jsonrpc_request in enumerate(jsonrpc_requests) - ]) + with sanic_http_request['timers']['batch_requests']: + responses = await asyncio.gather(* [ + dispatch_single(sanic_http_request, jsonrpc_request, + jrpc_req_index) + for jrpc_req_index, jsonrpc_request in enumerate(jsonrpc_requests) + ]) json_responses = [] for r in responses: if isinstance(r, bytes): @@ -101,7 +121,7 @@ async def dispatch_batch(sanic_http_request, jsonrpc_requests): return ujson.dumps(json_responses).encode() -@app.route('/', methods=['POST']) +@app.post('/') async def handle(sanic_http_request): app = sanic_http_request.app @@ -109,6 +129,7 @@ async def handle(sanic_http_request): jsonrpc_requests = sanic_http_request.json # make upstream requests + if sanic_http_request['jussi_is_batch']: jsonrpc_response = await dispatch_batch(sanic_http_request, jsonrpc_requests) @@ -124,6 +145,23 @@ async def handle(sanic_http_request): return response.text(jsonrpc_response, content_type='application/json') +# health check routes +@app.get('/') +async def handle(sanic_http_request): + return response.json({ + 'status': 'OK', + 'datetime': 
datetime.datetime.utcnow().isoformat() + }) + + +@app.get('/health') +async def handle(sanic_http_request): + return response.json({ + 'status': 'OK', + 'datetime': datetime.datetime.utcnow().isoformat() + }) + + @app.exception(SanicException) def handle_errors(request, exception): """all errors return HTTP 502 @@ -135,8 +173,14 @@ def handle_errors(request, exception): Returns: """ + status_code = getattr(exception, 'status_code', 502) + message = str(exception) or 'Gateway Error' logger.error('%s-%s', request, exception) - return response.text(body='Gateway Error', status=502) + return response.json( + { + 'error': message, + 'status_code': status_code + }, status=status_code) # register listeners @@ -150,28 +194,35 @@ def handle_errors(request, exception): def setup_statsd(app, loop): logger.info('before_server_start -> setup_statsd') args = app.config.args - if args.statsd_host: - app.config.statsd = { - 'host': args.statsd_host, - 'port': args.statsd_port, - 'prefix': args.stats_prefix - } + + config = { + 'host': args.statsd_host, + 'port': args.statsd_port, + 'prefix': args.statsd_prefix + } + app.config.statsd_client = StatsClient(**config) @app.listener('before_server_start') def setup_middlewares(app, loop): logger.info('before_server_start -> setup_middlewares') + app.request_middleware.append(start_stats) app.request_middleware.append(add_jussi_attrs) app.request_middleware.append(caching_middleware) + app.response_middleware.append(finalize_timers) + app.response_middleware.append(log_stats) + @app.listener('before_server_start') async def setup_cache(app, loop): logger.info('before_server_start -> setup_cache') args = app.config.args # only use redis if we can really talk to it - logger.info('before_server_start -> setup_cache redis_host:%s', args.redis_host) - logger.info('before_server_start -> setup_cache redis_port:%s', args.redis_port) + logger.info('before_server_start -> setup_cache redis_host:%s', + args.redis_host) + 
logger.info('before_server_start -> setup_cache redis_port:%s', + args.redis_port) try: if not args.redis_host: raise ValueError('no redis host specified') @@ -189,7 +240,9 @@ async def setup_cache(app, loop): assert val == b'testval' except Exception as e: logger.exception(e) - logger.error('Unable to use redis (was a setting not defined?), using in-memory cache instead...') + logger.error( + 'Unable to use redis (was a setting not defined?), using in-memory cache instead...' + ) default_cache = aiocache.SimpleMemoryCache( serializer=CompressionSerializer(), plugins=[ @@ -239,10 +292,10 @@ async def setup_websocket_connection(app, loop): """ logger.info('before_server_start -> setup_ws_client') args = app.config.args - app.config.websocket_kwargs = dict(uri=args.steemd_websocket_url, - max_size=int(2e6), max_queue=200) - app.config.websocket_client = await websockets.connect(**app.config.websocket_kwargs) - + app.config.websocket_kwargs = dict( + uri=args.steemd_websocket_url, max_size=int(2e6), max_queue=200) + app.config.websocket_client = await websockets.connect( + **app.config.websocket_kwargs) @app.listener('before_server_start') @@ -287,27 +340,28 @@ def main(): parser.add_argument('--server_port', type=int, default=9000) parser.add_argument('--server_workers', type=int, default=os.cpu_count()) parser.add_argument( - '--steemd_websocket_url', type=str, - default='wss://steemd.steemitdev.com') - parser.add_argument('--sbds_url', type=str, - default='https://sbds.steemit.com') + '--steemd_websocket_url', + type=str, + default='wss://steemd.steemitdev.com') + parser.add_argument( + '--sbds_url', type=str, default='https://sbds.steemit.com') parser.add_argument('--redis_host', type=str, default=None) parser.add_argument('--redis_port', type=int, default=6379) parser.add_argument('--redis_namespace', type=str, default='jussi') - parser.add_argument('--statsd_host', type=str) + parser.add_argument('--statsd_host', type=str, default='localhost') 
parser.add_argument('--statsd_port', type=int, default=8125) parser.add_argument('--statsd_prefix', type=str, default='jussi') args = parser.parse_args() app.config.args = args - # run app logger.info('app.run') app.run( - host=args.server_host, - port=args.server_port, - workers=args.server_workers, - log_config=LOGGING) + host=args.server_host, + port=args.server_port, + workers=args.server_workers, + log_config=LOGGING) + if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/jussi/timers.py b/jussi/timers.py new file mode 100644 index 0000000..da778f5 --- /dev/null +++ b/jussi/timers.py @@ -0,0 +1,68 @@ +# coding=utf-8 +import time +from collections import defaultdict + + +class Timer(object): + __slots__ = ('_start_time', '_end_time') + + def __init__(self, start=None, end=None): + self._start_time = start or time.time() + self._end_time = end + + def start(self, start=None): + if start: + self._start_time = start + + def restart(self): + self._start_time = time.time() + + def end(self, end=None): + self._end_time = self._end_time or end or time.time() + + @property + def elapsed(self): + end = self._end_time or time.time() + return int((end - self._start_time) * 1000) + + @property + def final(self): + self.end() + return self.elapsed + + def __str__(self): + return str(self.elapsed) + + def __repr__(self): + return str(self.elapsed) + + def __enter__(self, start=None): + if start: + self.start(start=start) + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.end() + + +def init_timers(start_time=None): + return defaultdict(lambda: Timer(start=start_time)) + + +def end_timers(timers, end=None): + for name, timer in timers.items(): + if end: + timer.end(end) + return timers + + +def display_timers(timers, end=None): + for name, timer in timers.items(): + if end: + timer.end(end) + print('%s %s' % (name, timer)) + + +def log_timers(timers, log_func, end=None): + for name, timer in timers.items(): + log_func('%s 
%s' % (name, timer)) diff --git a/jussi/utils.py b/jussi/utils.py index c880777..c67831f 100644 --- a/jussi/utils.py +++ b/jussi/utils.py @@ -1,15 +1,18 @@ # -*- coding: utf-8 -*- import functools import time +import logging import websockets + from collections import OrderedDict from collections import namedtuple +from sanic.exceptions import InvalidUsage +from jussi.cache import jsonrpc_cache_key +logger = logging.getLogger('sanic') -from jussi.cache import jsonrpc_cache_key - # decorators def apply_single_or_batch(func): """Decorate func to apply func to single or batch jsonrpc_requests @@ -32,7 +35,6 @@ def apply_single_or_batch(func): return wrapper - def websocket_conn(func): """Decorate func to make sure func has an open websocket client @@ -42,6 +44,7 @@ def websocket_conn(func): Returns: """ + @functools.wraps(func) async def wrapper(app, jussi, jsonrpc_request): ws = app.config.websocket_client @@ -52,9 +55,45 @@ def websocket_conn(func): ws = await websockets.connect(**app.config.websocket_kwargs) app.config.websocket_client = ws return await func(app, jussi, jsonrpc_request) + return wrapper +def async_ignore_exceptions(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + logger.info('Ignored exception: %s', e) + + return wrapper + + +def async_exclude_methods(middleware_func=None, exclude_http_methods=None): + """Exclude specified HTTP methods from middleware + + Args: + middleware_func: + exclude_http_methods: + + Returns: + + """ + if middleware_func is None: + return functools.partial( + async_exclude_methods, exclude_http_methods=exclude_http_methods) + + @functools.wraps(middleware_func) + async def f(request): + if request.method in exclude_http_methods: + return request + else: + return await middleware_func(request) + + return f + + def return_bytes(func): """Decorate func to make sure func has an open websocket client @@ -64,14 +103,15 @@ def 
return_bytes(func): Returns: """ + @functools.wraps(func) async def wrapper(*args, **kwargs): result = await func(*args, **kwargs) if isinstance(result, str): result = result.encode() return result - return wrapper + return wrapper def generate_int_id(): @@ -137,28 +177,38 @@ async def get_upstream(sanic_http_request, jsonrpc_request): return upstream['url'], upstream['ttl'] -JussiAttributes = namedtuple( - 'JussiAttributes', ['key', 'upstream_url', 'ttl', 'cacheable', 'is_ws']) +JussiAttributes = namedtuple('JussiAttributes', [ + 'key', 'upstream_url', 'ttl', 'cacheable', 'is_ws', 'namespace', + 'method_name', 'log_prefix' +]) async def jussi_attrs(sanic_http_request): jsonrpc_requests = sanic_http_request.json + app = sanic_http_request.app if isinstance(jsonrpc_requests, list): results = [] - for r in jsonrpc_requests: + for i, r in enumerate(jsonrpc_requests): + if not r: + raise InvalidUsage('Bad jsonrpc request') key = jsonrpc_cache_key(r) url, ttl = await get_upstream(sanic_http_request, r) cacheable = ttl > app.config.cache_config['no_cache_ttl'] is_ws = url.startswith('ws') + namespace, method_name = parse_namespaced_method(r['method']) + prefix = '.'.join([str(i), namespace, method_name]) results.append( JussiAttributes( key=key, upstream_url=url, ttl=ttl, cacheable=cacheable, - is_ws=is_ws)) + is_ws=is_ws, + namespace=namespace, + method_name=method_name, + log_prefix=prefix)) sanic_http_request['jussi'] = results sanic_http_request['jussi_is_batch'] = True else: @@ -166,12 +216,18 @@ async def jussi_attrs(sanic_http_request): url, ttl = await get_upstream(sanic_http_request, jsonrpc_requests) cacheable = ttl > app.config.cache_config['no_cache_ttl'] is_ws = url.startswith('ws') + namespace, method_name = parse_namespaced_method( + jsonrpc_requests['method']) + prefix = '.'.join(['0', namespace, method_name]) sanic_http_request['jussi'] = JussiAttributes( key=key, upstream_url=url, ttl=ttl, cacheable=cacheable, - is_ws=is_ws) + is_ws=is_ws, + 
namespace=namespace, + method_name=method_name, + log_prefix=prefix) sanic_http_request['jussi_is_batch'] = False - return sanic_http_request \ No newline at end of file + return sanic_http_request diff --git a/setup.py b/setup.py index 8386ba1..ff7c4f2 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,8 @@ try: from pipenv.project import Project from pipenv.utils import convert_deps_to_pip except ImportError: - raise Exception('setup requires pipenv. Run "pip3 install pipenv" then try again') + raise Exception( + 'setup requires pipenv. Run "pip3 install pipenv" then try again') pfile = Project().parsed_pipfile requirements = convert_deps_to_pip(pfile['packages'], r=False) @@ -21,24 +22,16 @@ with open(os.path.join(base_dir, "jussi", "__about__.py")) as f: with open(os.path.join(base_dir, "README.md")) as f: long_description = f.read() - setup( name=about["__title__"], version=about["__version__"], - description=about["__summary__"], long_description=long_description, url=about["__uri__"], - author=about["__author__"], author_email=about["__email__"], - - setup_requires=['pipenv','pytest-runner'], + setup_requires=['pipenv', 'pytest-runner'], tests_require=test_requirements, install_requires=requirements, - packages=["jussi"], - entry_points={ - 'console_scripts': ['jussi=jussi.serve:main'] - } -) \ No newline at end of file + entry_points={'console_scripts': ['jussi=jussi.serve:main']}) -- GitLab