Unverified commit 0e85fbe1 authored by John G G, committed by GitHub

Merge pull request #208 from steemit/timeouts

Add smarter timeouts
parents 7a8fce5d 04c3b598
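This merge swaps the per-call `asyncio.wait_for()` deadlines for `async_timeout.timeout()` blocks: one around the whole JSON-RPC request in `handle_jsonrpc` (sized by the new `request_timeout` property) and one around cache reads, with an app-level handler turning the resulting `asyncio.TimeoutError` into an error response. A minimal, self-contained sketch of that mechanism, using made-up coroutine names rather than anything from this codebase:

```python
import asyncio

from async_timeout import timeout


async def upstream_call(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a cache or upstream round trip
    return 'ok'


async def before() -> str:
    # old style: each await gets its own wait_for() deadline
    return await asyncio.wait_for(upstream_call(0.2), timeout=1.0)


async def after() -> str:
    # new style: one deadline covers everything awaited inside the block
    async with timeout(1.0):
        first = await upstream_call(0.2)
        second = await upstream_call(0.2)
        return first + second


# python_version = "3.6" in the Pipfile, so no asyncio.run() here
loop = asyncio.get_event_loop()
print(loop.run_until_complete(before()), loop.run_until_complete(after()))
```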
@@ -57,6 +57,7 @@ structlog = "*"
 ujson = "*"
 uvloop = "*"
 websockets = "*"
+async-timeout = "*"
 
 [requires]
 python_version = "3.6"
 # -*- coding: utf-8 -*-
+import asyncio
 import logging
 import uuid
 from typing import Optional
@@ -53,6 +54,16 @@ def setup_error_handlers(app: WebApp) -> WebApp:
             return None
         return ResponseTimeoutError(http_request=request).to_sanic_response()
 
+    @app.exception(asyncio.TimeoutError)
+    def handle_asyncio_timeout_errors(request: HTTPRequest,
+                                      exception: SanicException) -> Optional[
+            HTTPResponse]:
+        """handles noisy request timeout errors"""
+        # pylint: disable=unused-argument
+        if not request:
+            return None
+        return RequestTimeoutError(http_request=request).to_sanic_response()
+
     # pylint: disable=unused-argument
     @app.exception(JsonRpcError)
     def handle_jsonrpc_error(request: HTTPRequest,
......
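The handler above quiets request timeouts by converting them into an ordinary error response instead of an unhandled exception. For reference, this is roughly what an app-level `asyncio.TimeoutError` handler looks like in plain Sanic; the payload below is illustrative only, not the JSON-RPC error that `RequestTimeoutError` builds:

```python
import asyncio

from sanic import Sanic, response

app = Sanic(__name__)


@app.exception(asyncio.TimeoutError)
def handle_asyncio_timeout(request, exception):
    # pylint: disable=unused-argument
    if not request:
        return None
    # illustrative body; the real handler wraps this in a JSON-RPC error
    return response.json({'error': 'request timed out'}, status=408)
```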
@@ -7,6 +7,8 @@ from typing import Coroutine
 import cytoolz
 import structlog
+from async_timeout import timeout
 from sanic import response
 from ujson import loads
 from websockets.exceptions import ConnectionClosed
@@ -28,15 +30,19 @@ async def handle_jsonrpc(http_request: HTTPRequest) -> HTTPResponse:
     # retrieve parsed jsonrpc_requests after request middleware processing
     http_request.timings.append((perf(), 'handle_jsonrpc.enter'))
     # make upstream requests
-    if http_request.is_single_jrpc:
-        jsonrpc_response = await dispatch_single(http_request,
-                                                 http_request.jsonrpc)
-    else:
-        futures = [dispatch_single(http_request, request)
-                   for request in http_request.jsonrpc]
-        jsonrpc_response = await asyncio.gather(*futures)
-    http_request.timings.append((perf(), 'handle_jsonrpc.exit'))
-    return response.json(jsonrpc_response)
+    async with timeout(http_request.request_timeout):
+        if http_request.is_single_jrpc:
+            jsonrpc_response = await dispatch_single(http_request,
+                                                     http_request.jsonrpc)
+        else:
+            futures = [dispatch_single(http_request, request)
+                       for request in http_request.jsonrpc]
+            jsonrpc_response = await asyncio.gather(*futures)
+        http_request.timings.append((perf(), 'handle_jsonrpc.exit'))
+        return response.json(jsonrpc_response)
 
 
 async def healthcheck(http_request: HTTPRequest) -> HTTPResponse:
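In the rewritten `handle_jsonrpc`, a single `async with timeout(http_request.request_timeout)` now bounds the whole request, whether it is one call or a batch dispatched through `asyncio.gather`. A rough sketch of that control flow with stand-in coroutines (none of these names come from the diff):

```python
import asyncio

from async_timeout import timeout


async def dispatch_single(delay: float) -> dict:
    await asyncio.sleep(delay)  # pretend upstream latency
    return {'result': delay}


async def handle_batch(delays, request_timeout: float):
    # one deadline for the whole batch; when it fires, gather() propagates
    # the cancellation into every still-pending dispatch_single()
    async with timeout(request_timeout):
        futures = [dispatch_single(d) for d in delays]
        return await asyncio.gather(*futures)


loop = asyncio.get_event_loop()
print(loop.run_until_complete(handle_batch([0.1, 0.2], 2.5)))
```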
@@ -147,7 +153,7 @@ async def fetch_ws(http_request: HTTPRequest,
         jrpc_request.timings.append((perf(), 'fetch_ws.acquire'))
         await conn.send(upstream_request)
         jrpc_request.timings.append((perf(), 'fetch_ws.send'))
-        upstream_response_json = await asyncio.wait_for(conn.recv(), jrpc_request.upstream.timeout)
+        upstream_response_json = await conn.recv()
         jrpc_request.timings.append((perf(), 'fetch_ws.response'))
         upstream_response = loads(upstream_response_json)
         await pool.release(conn)
@@ -156,23 +162,6 @@
         jrpc_request.timings.append((perf(), 'fetch_ws.exit'))
         return upstream_response
-    except (concurrent.futures.TimeoutError,
-            asyncio.TimeoutError) as e:
-        try:
-            conn.terminate()
-        except NameError:
-            pass
-        except Exception as e:
-            logger.error('error while closing connection', e=e)
-        raise RequestTimeoutError(http_request=http_request,
-                                  jrpc_request=jrpc_request,
-                                  exception=e,
-                                  tasks_count=len(
-                                      asyncio.tasks.Task.all_tasks()),
-                                  upstream_request=upstream_request
-                                  )
     except concurrent.futures.CancelledError as e:
         try:
             conn.terminate()
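With the explicit `wait_for()` and the `TimeoutError` branch removed from `fetch_ws`, a timeout now reaches the `conn.recv()` await as a cancellation from the enclosing `async with timeout(...)` block, and the remaining `CancelledError` branch is what terminates the dangling websocket connection. A simplified sketch of that interaction, using a fake connection class rather than the real pool:

```python
import asyncio

from async_timeout import timeout


class FakeConn:
    async def recv(self) -> str:
        await asyncio.sleep(10)  # upstream never answers in time
        return '{}'

    def terminate(self) -> None:
        print('connection terminated')


async def fetch_ws(conn: FakeConn) -> str:
    try:
        return await conn.recv()
    except asyncio.CancelledError:
        # the outer timeout cancels recv(); drop the socket so a half-read
        # response can never be handed to the next borrower of the pool
        conn.terminate()
        raise


async def main() -> None:
    try:
        async with timeout(0.1):
            await fetch_ws(FakeConn())
    except asyncio.TimeoutError:
        print('timed out')


asyncio.get_event_loop().run_until_complete(main())
```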
@@ -231,6 +220,7 @@ async def fetch_http(http_request: HTTPRequest,
     jrpc_request.timings.append((perf(), 'fetch_http.enter'))
     session = http_request.app.config.aiohttp['session']
     upstream_request = jrpc_request.to_upstream_request(as_json=False)
     try:
         async with session.post(jrpc_request.upstream.url,
                                 json=upstream_request,
......
@@ -2,7 +2,10 @@
 import asyncio
 from time import perf_counter as perf
 import structlog
+from async_timeout import timeout
 from sanic import response
 from ujson import loads
@@ -18,24 +21,25 @@ async def get_response(request: HTTPRequest) -> None:
     # return cached response from cache if all requests were in cache
     if not request.jsonrpc:
         return
     request.timings.append((perf(), 'get_cached_response.enter'))
     cache_group = request.app.config.cache_group
     cache_read_timeout = request.app.config.cache_read_timeout
     try:
         cached_response = None
-        if request.is_single_jrpc:
-            cached_response_future = \
-                cache_group.get_single_jsonrpc_response(request.jsonrpc)
-        elif request.is_batch_jrpc:
-            cached_response_future = \
-                cache_group.get_batch_jsonrpc_responses(request.jsonrpc)
-        else:
-            request.timings.append((perf(), 'get_cached_response.exit'))
-            return
-        request.timings.append((perf(), 'get_cached_response.acquire'))
-        cached_response = await asyncio.wait_for(cached_response_future,
-                                                 timeout=cache_read_timeout)
+        async with timeout(cache_read_timeout):
+            if request.is_single_jrpc:
+                cached_response_future = \
+                    cache_group.get_single_jsonrpc_response(request.jsonrpc)
+            elif request.is_batch_jrpc:
+                cached_response_future = \
+                    cache_group.get_batch_jsonrpc_responses(request.jsonrpc)
+            else:
+                request.timings.append((perf(), 'get_cached_response.exit'))
+                return
+            cached_response = await cached_response_future
         request.timings.append((perf(), 'get_cached_response.response'))
         if cached_response and \
......
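The caching hunk applies the same pattern to cache reads: `cache_read_timeout` now bounds both the lookup selection and the await on the cache future. The surrounding `try`/`except` is outside this hunk, so how the timeout is absorbed is not visible here, but the intended effect is presumably that a slow cache read degrades into a cache miss rather than failing the request. An illustrative sketch under that assumption (the cache coroutine and key are made up):

```python
import asyncio

from async_timeout import timeout


async def slow_cache_get(key: str) -> str:
    await asyncio.sleep(5)  # e.g. an overloaded cache node
    return '{"result": "cached"}'


async def get_response(key: str, cache_read_timeout: float):
    cached_response = None
    try:
        async with timeout(cache_read_timeout):
            cached_response = await slow_cache_get(key)
    except asyncio.TimeoutError:
        pass  # treat a slow cache read as a miss
    return cached_response  # None means: fall through to the upstream


loop = asyncio.get_event_loop()
print(loop.run_until_complete(get_response('block.1', 0.1)))  # -> None
```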
@@ -193,3 +193,10 @@ class HTTPRequest:
     @property
     def request_start_time(self) -> float:
         return self.timings[0][0]
+
+    @property
+    def request_timeout(self) -> Union[int, float]:
+        if self.is_single_jrpc:
+            return self.jsonrpc.upstream.timeout
+        elif self.is_batch_jrpc:
+            return min([max(r.upstream.timeout for r in self.jsonrpc) * 2, 30])
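The new `request_timeout` property is where the "smarter" part of the title lives: a single call keeps its upstream's own timeout, while a batch gets twice the slowest member's upstream timeout, capped at 30 seconds. A worked illustration with assumed per-call timeouts:

```python
# per-call upstream timeouts for a hypothetical batch, in seconds
upstream_timeouts = [3, 5, 12]

# batch deadline: twice the slowest call, but never more than 30 seconds
batch_timeout = min(max(upstream_timeouts) * 2, 30)
print(batch_timeout)  # 24; a slowest call of 20s would be capped at 30
```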