From 99a6275f23ecc40ed7d072180c96f738c23a3e7e Mon Sep 17 00:00:00 2001
From: roadscape <roadscape@users.noreply.github.com>
Date: Thu, 31 May 2018 10:17:44 -0500
Subject: [PATCH] unify batch and worker strategies

---
 README.md                  |  3 ++-
 hive/conf.py               |  4 ++--
 hive/steem/http_client.py  | 17 +++++------------
 hive/steem/steem_client.py | 15 ++++++++-------
 4 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/README.md b/README.md
index f1464b8f6..36d3efa3e 100644
--- a/README.md
+++ b/README.md
@@ -55,7 +55,8 @@ Hive is deployed as Docker container — see `Dockerfile`.
 | `HTTP_SERVER_PORT` | `--http-server-port` | 8080 |
 | `DATABASE_URL` | `--database-url` | postgresql://user:pass@localhost:5432/hive |
 | `STEEMD_URL` | `--steemd-url` | https://api.steemit.com |
-| `MAX_BATCH` | `--max-batch` | 200 |
+| `MAX_BATCH` | `--max-batch` | 50 |
+| `MAX_WORKERS` | `--max-workers` | 4 |
 | `TRAIL_BLOCKS` | `--trail-blocks` | 2 |

 Precedence: CLI over ENV over hive.conf. Check `hive --help` for details.
diff --git a/hive/conf.py b/hive/conf.py
index b2dfd3c65..21e416833 100644
--- a/hive/conf.py
+++ b/hive/conf.py
@@ -25,8 +25,8 @@ class Conf():
         p.add('--dump-config', type=bool, env_var='DUMP_CONFIG', default=False)

         # specific to indexer
-        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests (untested)', default=1)
-        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max size for batch block/content requests', default=100)
+        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
+        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max chunk size for batch requests', default=50)
         p.add('--trail-blocks', type=int, env_var='TRAIL_BLOCKS', help='number of blocks to trail head by', default=2)
         p.add('--disable-sync', type=bool, env_var='DISABLE_SYNC', help='(debug) skip sync and sweep; jump to block streaming', default=False)
         p.add('--sync-to-s3', type=bool, env_var='SYNC_TO_S3', help='alternative healthcheck for background sync service', default=False)
diff --git a/hive/steem/http_client.py b/hive/steem/http_client.py
index 92ee74f31..34e742857 100644
--- a/hive/steem/http_client.py
+++ b/hive/steem/http_client.py
@@ -242,17 +242,10 @@ class HttpClient(object):

         raise Exception("abort %s after %d tries" % (method, tries))

-    def exec_multi_with_futures(self, name, params, max_workers=None):
-        """Process a batch as parallel signular requests."""
+    def exec_multi(self, name, params, max_workers, batch_size):
+        """Process a batch as parallel requests."""
+        chunks = [[name, args, True] for args in chunkify(params, batch_size)]
         with concurrent.futures.ThreadPoolExecutor(
                 max_workers=max_workers) as executor:
-            futures = (executor.submit(self.exec, name, args)
-                       for args in params)
-            for future in concurrent.futures.as_completed(futures):
-                yield future.result()
-
-    def exec_batch(self, name, params, batch_size):
-        """Chunkify batch requests and return them in order"""
-        for batch_params in chunkify(params, batch_size):
-            for item in self.exec(name, batch_params, is_batch=True):
-                yield item
+            for items in executor.map(lambda tup: self.exec(*tup), chunks):
+                yield list(items)  # (use of `map` preserves request order)
diff --git a/hive/steem/steem_client.py b/hive/steem/steem_client.py
index c549392dd..8d3fc1111 100644
--- a/hive/steem/steem_client.py
+++ b/hive/steem/steem_client.py
@@ -25,7 +25,7 @@ class SteemClient:
     def __init__(self, url, max_batch=500, max_workers=1):
         assert url, 'steem-API endpoint undefined'
         assert max_batch > 0 and max_batch <= 5000
-        assert max_workers > 0 and max_workers <= 500
+        assert max_workers > 0 and max_workers <= 32
         self._max_batch = max_batch
         self._max_workers = max_workers

@@ -214,12 +214,13 @@ class SteemClient:
         """Perform batch call. Based on config uses either batch or futures."""
         time_start = time.perf_counter()

-        if self._max_workers == 1:
-            result = list(self._client.exec_batch(
-                method, params, batch_size=self._max_batch))
-        else:
-            result = list(self._client.exec_multi_with_futures(
-                method, params, max_workers=self._max_workers))
+        result = []
+        for part in self._client.exec_multi(
+                method,
+                params,
+                max_workers=self._max_workers,
+                batch_size=self._max_batch):
+            result.extend(part)

         total_time = (time.perf_counter() - time_start) * 1000
         ClientStats.log(method, total_time, len(params))
--
GitLab
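
Notes on the unified code path. The new `exec_multi` depends on the module's
`chunkify` helper to split `params` into `batch_size`-sized slices before
fanning them out, after which each slice is executed as a single batch call
(`is_batch=True`). The helper's actual implementation is not shown in this
patch; a minimal sketch of the assumed list-chunking behavior:

    from typing import Iterable, Iterator, List, TypeVar

    T = TypeVar('T')

    # Assumed behavior of hive's `chunkify` helper: yield successive
    # lists of at most `chunksize` items, preserving input order.
    def chunkify(iterable: Iterable[T], chunksize: int) -> Iterator[List[T]]:
        chunk = []
        for item in iterable:
            chunk.append(item)
            if len(chunk) == chunksize:
                yield chunk
                chunk = []
        if chunk:
            yield chunk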
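The inline comment on `yield list(items)` is the key correctness point:
`ThreadPoolExecutor.map` returns results in submission order even when workers
finish out of order, whereas the removed `as_completed` loop yielded results in
completion order. A self-contained sketch illustrating the difference; `fetch`
is a hypothetical stand-in for an RPC call, not part of hive:

    import concurrent.futures
    import random
    import time

    def fetch(n):
        # hypothetical stand-in for an RPC call with variable latency
        time.sleep(random.random() / 10)
        return n

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map: results come back in submission order, regardless of
        # which worker finishes first -- the property exec_multi relies on
        assert list(executor.map(fetch, range(8))) == list(range(8))

        # as_completed: results arrive in completion order, so the old
        # exec_multi_with_futures could yield them in any order
        futures = [executor.submit(fetch, n) for n in range(8)]
        done = [f.result() for f in concurrent.futures.as_completed(futures)]
        assert sorted(done) == list(range(8))  # same items, order not fixed

With the new defaults (`MAX_BATCH` 50, `MAX_WORKERS` 4), a request for 1000
items becomes 20 ordered batch calls processed at most 4 at a time.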