diff --git a/README.md b/README.md
index f1464b8f6aa7246a7d25513e9afeacf7df5097ae..36d3efa3e17757e04a57be9842c17e02c4e2687b 100644
--- a/README.md
+++ b/README.md
@@ -55,7 +55,8 @@ Hive is deployed as Docker container — see `Dockerfile`.
 | `HTTP_SERVER_PORT` | `--http-server-port` | 8080 |
 | `DATABASE_URL` | `--database-url` | postgresql://user:pass@localhost:5432/hive |
 | `STEEMD_URL` | `--steemd-url` | https://api.steemit.com |
-| `MAX_BATCH` | `--max-batch` | 200 |
+| `MAX_BATCH` | `--max-batch` | 50 |
+| `MAX_WORKERS` | `--max-workers` | 4 |
 | `TRAIL_BLOCKS` | `--trail-blocks` | 2 |
 
 Precedence: CLI over ENV over hive.conf. Check `hive --help` for details.
diff --git a/hive/conf.py b/hive/conf.py
index b2dfd3c656a88bfb595ea895f561b72833ed7e42..21e416833dad0461a56bd116347dcce4dcee7788 100644
--- a/hive/conf.py
+++ b/hive/conf.py
@@ -25,8 +25,8 @@ class Conf():
         p.add('--dump-config', type=bool, env_var='DUMP_CONFIG', default=False)
 
         # specific to indexer
-        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests (untested)', default=1)
-        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max size for batch block/content requests', default=100)
+        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
+        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max chunk size for batch requests', default=50)
         p.add('--trail-blocks', type=int, env_var='TRAIL_BLOCKS', help='number of blocks to trail head by', default=2)
         p.add('--disable-sync', type=bool, env_var='DISABLE_SYNC', help='(debug) skip sync and sweep; jump to block streaming', default=False)
         p.add('--sync-to-s3', type=bool, env_var='SYNC_TO_S3', help='alternative healthcheck for background sync service', default=False)
diff --git a/hive/steem/http_client.py b/hive/steem/http_client.py
index 92ee74f31fe9e573c2d2f4dd0055f78039afcb30..34e742857fb6e4b517d4bd9acee50d61dc3461ed 100644
--- a/hive/steem/http_client.py
+++ b/hive/steem/http_client.py
@@ -242,17 +242,10 @@ class HttpClient(object):
 
         raise Exception("abort %s after %d tries" % (method, tries))
 
-    def exec_multi_with_futures(self, name, params, max_workers=None):
-        """Process a batch as parallel signular requests."""
+    def exec_multi(self, name, params, max_workers, batch_size):
+        """Process a batch as parallel requests."""
+        chunks = [[name, args, True] for args in chunkify(params, batch_size)]
         with concurrent.futures.ThreadPoolExecutor(
                 max_workers=max_workers) as executor:
-            futures = (executor.submit(self.exec, name, args)
-                       for args in params)
-            for future in concurrent.futures.as_completed(futures):
-                yield future.result()
-
-    def exec_batch(self, name, params, batch_size):
-        """Chunkify batch requests and return them in order"""
-        for batch_params in chunkify(params, batch_size):
-            for item in self.exec(name, batch_params, is_batch=True):
-                yield item
+            for items in executor.map(lambda tup: self.exec(*tup), chunks):
+                yield list(items)  # (use of `map` preserves request order)
diff --git a/hive/steem/steem_client.py b/hive/steem/steem_client.py
index c549392dd13b9a4155a01c470164aee3fb478481..8d3fc1111ae1d113c2e023888f2d2c82686429cf 100644
--- a/hive/steem/steem_client.py
+++ b/hive/steem/steem_client.py
@@ -25,7 +25,7 @@ class SteemClient:
     def __init__(self, url, max_batch=500, max_workers=1):
         assert url, 'steem-API endpoint undefined'
         assert max_batch > 0 and max_batch <= 5000
-        assert max_workers > 0 and max_workers <= 500
+        assert max_workers > 0 and max_workers <= 32
         self._max_batch = max_batch
         self._max_workers = max_workers
 
@@ -214,12 +214,13 @@ class SteemClient:
-        """Perform batch call. Based on config uses either batch or futures."""
+        """Perform batch call: chunkify params and fetch in parallel."""
         time_start = time.perf_counter()
 
-        if self._max_workers == 1:
-            result = list(self._client.exec_batch(
-                method, params, batch_size=self._max_batch))
-        else:
-            result = list(self._client.exec_multi_with_futures(
-                method, params, max_workers=self._max_workers))
+        result = []
+        for part in self._client.exec_multi(
+                method,
+                params,
+                max_workers=self._max_workers,
+                batch_size=self._max_batch):
+            result.extend(part)
 
         total_time = (time.perf_counter() - time_start) * 1000
         ClientStats.log(method, total_time, len(params))
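
For reviewers who want to verify the ordering claim locally, here is a minimal self-contained sketch of the reworked flow. It is not the hivemind code itself: `chunkify` is reimplemented from its assumed behavior (the real helper lives in `hive/steem/http_client.py`), and `fake_exec` is a hypothetical stand-in for `HttpClient.exec`. The point is only `executor.map`'s order-preserving contract under variable per-chunk latency:

```python
import concurrent.futures
import random
import time

def chunkify(iterable, chunksize):
    """Yield successive chunks of up to `chunksize` items (assumed behavior)."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunksize:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def fake_exec(name, args, is_batch):
    """Hypothetical stand-in for HttpClient.exec: one result per batched arg."""
    assert is_batch
    time.sleep(random.random() / 100)  # simulate variable network latency
    return ['%s(%d)' % (name, a) for a in args]

def exec_multi(name, params, max_workers, batch_size):
    """Process a batch as parallel requests, mirroring the new method's shape."""
    chunks = [[name, args, True] for args in chunkify(params, batch_size)]
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers) as executor:
        # executor.map yields results in submission order, even when a later
        # chunk finishes first -- as_completed() would interleave them.
        for items in executor.map(lambda tup: fake_exec(*tup), chunks):
            yield list(items)

result = []
for part in exec_multi('get_block', list(range(1, 201)),
                       max_workers=4, batch_size=50):
    result.extend(part)
assert result[0] == 'get_block(1)' and result[-1] == 'get_block(200)'
print('%d results, in request order' % len(result))
```

This is why the caller in `steem_client.py` can simply `result.extend(part)` without re-sorting: the old `exec_multi_with_futures` path used `as_completed` and returned results in completion order, while the merged `exec_multi` keeps the ordering guarantee the old `exec_batch` provided.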