Commit 227d8e24 authored by John Gerlock's avatar John Gerlock

Rework timeout handling

parent 04c3b598
......@@ -3,7 +3,7 @@ ROOT_DIR := $(shell pwd)
PROJECT_NAME := $(notdir $(ROOT_DIR))
PROJECT_DOCKER_TAG := steemit/$(PROJECT_NAME)
PROJECT_DOCKER_RUN_ARGS := -p8080:8080 -p7777:7777 --env-file .env -v $(shell pwd)/ALT_config.json:/app/ALT_config.json
PROJECT_DOCKER_RUN_ARGS := -p8080:8080 -p7777:7777 --env-file .env -v $(shell pwd)/DEV_config.json:/app/DEV_config.json
PIPENV_VENV_IN_PROJECT := 1
export PIPENV_VENV_IN_PROJECT
......
# -*- coding: utf-8 -*-
import asyncio
import concurrent.futures
import logging
import uuid
from typing import Optional
from typing import Union
import sanic.exceptions
import structlog
from funcy.decorators import decorator
from sanic import response
from sanic.exceptions import RequestTimeout
from sanic.exceptions import ServiceUnavailable
from sanic.exceptions import SanicException
from .typedefs import HTTPRequest
from .typedefs import HTTPResponse
......@@ -34,37 +34,35 @@ class Default(dict):
def setup_error_handlers(app: WebApp) -> WebApp:
# pylint: disable=unused-variable
@app.exception(RequestTimeout)
@app.exception(sanic.exceptions.RequestTimeout)
def handle_request_timeout_errors(request: HTTPRequest,
exception: SanicException) -> Optional[
HTTPResponse]:
"""handles noisy request timeout errors"""
# pylint: disable=unused-argument
exception: sanic.exceptions.RequestTimeout) -> Optional[HTTPResponse]:
if not request:
return None
return RequestTimeoutError(http_request=request).to_sanic_response()
return RequestTimeoutError(http_request=request,
jrpc_request=request.jsonrpc,
exception=exception).to_sanic_response()
@app.exception(ServiceUnavailable)
@app.exception(sanic.exceptions.ServiceUnavailable)
def handle_response_timeout_errors(request: HTTPRequest,
exception: SanicException) -> Optional[
HTTPResponse]:
"""handles noisy request timeout errors"""
# pylint: disable=unused-argument
exception: sanic.exceptions.ServiceUnavailable) -> Optional[HTTPResponse]:
if not request:
return None
return ResponseTimeoutError(http_request=request).to_sanic_response()
@app.exception(asyncio.TimeoutError)
def handle_asyncio_timeout_errors(request: HTTPRequest,
exception: SanicException) -> Optional[
HTTPResponse]:
"""handles noisy request timeout errors"""
# pylint: disable=unused-argument
return ResponseTimeoutError(http_request=request,
jrpc_request=request.jsonrpc,
exception=exception).to_sanic_response()
@app.exception(concurrent.futures.CancelledError,
concurrent.futures.TimeoutError,
asyncio.TimeoutError)
def handle_async_timeout_errors(request: HTTPRequest,
exception: Exception) -> Optional[HTTPResponse]:
if not request:
return None
return RequestTimeoutError(http_request=request).to_sanic_response()
return RequestTimeoutError(http_request=request,
jrpc_request=request.jsonrpc,
exception=exception).to_sanic_response()
# pylint: disable=unused-argument
@app.exception(JsonRpcError)
def handle_jsonrpc_error(request: HTTPRequest,
exception: JsonRpcError) -> HTTPResponse:
......@@ -80,11 +78,9 @@ def setup_error_handlers(app: WebApp) -> WebApp:
if isinstance(exception, InvalidRequest):
return InvalidRequest(http_request=request,
exception=exception,
reason=exception.message
).to_sanic_response()
reason=exception.message).to_sanic_response()
return JsonRpcError(http_request=request,
exception=exception,
log_traceback=True).to_sanic_response()
exception=exception).to_sanic_response()
return app
......@@ -239,7 +235,9 @@ class JussiInteralError(Exception):
self.logger.error(self.format_message(), **self.to_dict(),
exc_info=self.exception)
else:
self.logger.error(self.format_message(), **self.to_dict())
self.logger.error(self.format_message(),
exception=self.exception,
**self.to_dict())
# pylint: enable=too-many-instance-attributes,too-many-arguments
......@@ -310,6 +308,16 @@ class ResponseTimeoutError(JsonRpcError):
code = 1050
message = 'Response Timeout'
def to_dict(self):
data = super().to_dict()
try:
timings = self.timings()
if timings:
data.update(**timings)
except Exception as e:
logger.info('error adding timing data to RequestTimeoutError', e=e)
return data
class UpstreamResponseError(JsonRpcError):
code = 1100
......
......@@ -30,7 +30,6 @@ async def handle_jsonrpc(http_request: HTTPRequest) -> HTTPResponse:
# retreive parsed jsonrpc_requests after request middleware processing
http_request.timings.append((perf(), 'handle_jsonrpc.enter'))
# make upstream requests
async with timeout(http_request.request_timeout):
if http_request.is_single_jrpc:
......@@ -162,39 +161,6 @@ async def fetch_ws(http_request: HTTPRequest,
jrpc_request.timings.append((perf(), 'fetch_ws.exit'))
return upstream_response
except concurrent.futures.CancelledError as e:
try:
conn.terminate()
except NameError:
pass
except Exception as e:
logger.error('error while closing connection', e=e)
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
log_traceback=True
)
except AssertionError as e:
try:
conn.terminate()
except NameError:
pass
except Exception as e:
logger.error('error while closing connection', e=e)
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
upstream_response=upstream_response
)
except ConnectionClosed as e:
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
log_traceback=True)
except Exception as e:
try:
conn.terminate()
......@@ -202,16 +168,8 @@ async def fetch_ws(http_request: HTTPRequest,
pass
except Exception as e:
logger.error('error while closing connection', e=e)
try:
response = upstream_response
except NameError:
response = None
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
upstream_response=response,
log_traceback=True)
raise e
# pylint: enable=no-value-for-parameter, too-many-locals, too-many-branches, too-many-statements
......@@ -221,35 +179,11 @@ async def fetch_http(http_request: HTTPRequest,
session = http_request.app.config.aiohttp['session']
upstream_request = jrpc_request.to_upstream_request(as_json=False)
try:
async with session.post(jrpc_request.upstream.url,
json=upstream_request,
headers=jrpc_request.upstream_headers,
timeout=jrpc_request.upstream.timeout) as resp:
jrpc_request.timings.append((perf(), 'fetch_http.response'))
upstream_response = await resp.json(encoding='utf-8', content_type=None)
except (concurrent.futures.TimeoutError,
asyncio.TimeoutError) as e:
raise RequestTimeoutError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
tasks_count=len(
asyncio.tasks.Task.all_tasks()),
upstream_request=upstream_request
)
except Exception as e:
try:
response = upstream_response
except NameError:
response = None
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
upstream_response=response,
log_traceback=True)
async with session.post(jrpc_request.upstream.url,
json=upstream_request,
headers=jrpc_request.upstream_headers) as resp:
jrpc_request.timings.append((perf(), 'fetch_http.response'))
upstream_response = await resp.json(encoding='utf-8', content_type=None)
upstream_response['id'] = jrpc_request.id
jrpc_request.timings.append((perf(), 'fetch_http.exit'))
return upstream_response
......
# -*- coding: utf-8 -*-
import asyncio
import json
import sys
from urllib.parse import urlparse
import aiohttp
import async_timeout
import ujson
from jussi.ws.pool import Pool
......@@ -46,7 +48,9 @@ def setup_listeners(app: WebApp) -> WebApp:
"""
logger = app.config.logger
logger.info('setup_aiohttp_session', when='before_server_start')
tcp_connector = aiohttp.TCPConnector()
aio = dict(session=aiohttp.ClientSession(
connector=tcp_connector,
skip_auto_headers=['User-Agent'],
loop=loop,
json_serialize=ujson.dumps,
......
......@@ -199,4 +199,6 @@ class HTTPRequest:
if self.is_single_jrpc:
return self.jsonrpc.upstream.timeout
elif self.is_batch_jrpc:
return min([max(r.upstream.timeout for r in self.jsonrpc) * 2, 30])
return min([sum(r.upstream.timeout for r in self.jsonrpc), 60])
else:
return 1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment