Commit 99a6275f authored by roadscape

unify batch and worker strategies

parent 43991f68
```diff
@@ -55,7 +55,8 @@ Hive is deployed as Docker container — see `Dockerfile`.
 | `HTTP_SERVER_PORT` | `--http-server-port` | 8080 |
 | `DATABASE_URL` | `--database-url` | postgresql://user:pass@localhost:5432/hive |
 | `STEEMD_URL` | `--steemd-url` | https://api.steemit.com |
-| `MAX_BATCH` | `--max-batch` | 200 |
+| `MAX_BATCH` | `--max-batch` | 50 |
+| `MAX_WORKERS` | `--max-workers` | 4 |
 | `TRAIL_BLOCKS` | `--trail-blocks` | 2 |
 
 Precedence: CLI over ENV over hive.conf. Check `hive --help` for details.
```
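To illustrate the precedence rule, here is a minimal sketch using configargparse, the library behind the `Conf` class in the next hunk. The option names come from the table above; the rest is assumed:

```python
# Minimal sketch of the documented precedence, assuming configargparse.
import configargparse

p = configargparse.ArgParser(default_config_files=['hive.conf'])
p.add('--max-batch', type=int, env_var='MAX_BATCH', default=50)
p.add('--max-workers', type=int, env_var='MAX_WORKERS', default=4)

# Resolution order: CLI flag > MAX_BATCH/MAX_WORKERS env vars >
# hive.conf entries > the coded defaults above.
opts = p.parse_args()
print(opts.max_batch, opts.max_workers)
```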
```diff
@@ -25,8 +25,8 @@ class Conf():
         p.add('--dump-config', type=bool, env_var='DUMP_CONFIG', default=False)
 
         # specific to indexer
-        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests (untested)', default=1)
-        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max size for batch block/content requests', default=100)
+        p.add('--max-workers', type=int, env_var='MAX_WORKERS', help='max workers for batch requests', default=4)
+        p.add('--max-batch', type=int, env_var='MAX_BATCH', help='max chunk size for batch requests', default=50)
         p.add('--trail-blocks', type=int, env_var='TRAIL_BLOCKS', help='number of blocks to trail head by', default=2)
         p.add('--disable-sync', type=bool, env_var='DISABLE_SYNC', help='(debug) skip sync and sweep; jump to block streaming', default=False)
         p.add('--sync-to-s3', type=bool, env_var='SYNC_TO_S3', help='alternative healthcheck for background sync service', default=False)
```
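How these options reach the client isn't part of this diff. Continuing the configargparse sketch above, a hypothetical wiring into the `SteemClient` constructor shown further down might look like:

```python
# Hypothetical wiring; the constructor signature is taken from the
# SteemClient hunk below, everything else is assumed.
opts = p.parse_args()
client = SteemClient(
    url=opts.steemd_url,           # STEEMD_URL / --steemd-url
    max_batch=opts.max_batch,      # MAX_BATCH / --max-batch (default 50)
    max_workers=opts.max_workers)  # MAX_WORKERS / --max-workers (default 4)
```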
```diff
@@ -242,17 +242,10 @@ class HttpClient(object):
         raise Exception("abort %s after %d tries" % (method, tries))
 
-    def exec_multi_with_futures(self, name, params, max_workers=None):
-        """Process a batch as parallel signular requests."""
-        with concurrent.futures.ThreadPoolExecutor(
-                max_workers=max_workers) as executor:
-            futures = (executor.submit(self.exec, name, args)
-                       for args in params)
-            for future in concurrent.futures.as_completed(futures):
-                yield future.result()
-
-    def exec_batch(self, name, params, batch_size):
-        """Chunkify batch requests and return them in order"""
-        for batch_params in chunkify(params, batch_size):
-            for item in self.exec(name, batch_params, is_batch=True):
-                yield item
+    def exec_multi(self, name, params, max_workers, batch_size):
+        """Process a batch as parallel requests."""
+        chunks = [[name, args, True] for args in chunkify(params, batch_size)]
+        with concurrent.futures.ThreadPoolExecutor(
+                max_workers=max_workers) as executor:
+            for items in executor.map(lambda tup: self.exec(*tup), chunks):
+                yield list(items)  # (use of `map` preserves request order)
 
```
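`chunkify` is an existing helper that the new `exec_multi` reuses; the diff doesn't show it, but the behavior it relies on is fixed-size, order-preserving slicing, roughly:

```python
def chunkify(items, chunk_size):
    """Yield successive chunk_size-sized slices of items, preserving order."""
    # Assumed behavior of the existing helper; the real implementation
    # lives elsewhere in the module.
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]
```

The switch from `concurrent.futures.as_completed` to `executor.map` is the key behavioral change: `as_completed` yields whichever future finishes first, so the old `exec_multi_with_futures` returned results in arbitrary order, while `map` yields results in submission order, letting one method serve both the ordered-batch and the parallel-worker use cases.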
```diff
@@ -25,7 +25,7 @@ class SteemClient:
     def __init__(self, url, max_batch=500, max_workers=1):
         assert url, 'steem-API endpoint undefined'
         assert max_batch > 0 and max_batch <= 5000
-        assert max_workers > 0 and max_workers <= 500
+        assert max_workers > 0 and max_workers <= 32
 
         self._max_batch = max_batch
         self._max_workers = max_workers
```
```diff
@@ -214,12 +214,13 @@ class SteemClient:
         """Perform batch call. Based on config uses either batch or futures."""
         time_start = time.perf_counter()
 
-        if self._max_workers == 1:
-            result = list(self._client.exec_batch(
-                method, params, batch_size=self._max_batch))
-        else:
-            result = list(self._client.exec_multi_with_futures(
-                method, params, max_workers=self._max_workers))
+        result = []
+        for part in self._client.exec_multi(
+                method,
+                params,
+                max_workers=self._max_workers,
+                batch_size=self._max_batch):
+            result.extend(part)
 
         total_time = (time.perf_counter() - time_start) * 1000
         ClientStats.log(method, total_time, len(params))
```
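The removed `if self._max_workers == 1` fork is the point of the commit: `exec_multi` covers both strategies, since a `ThreadPoolExecutor` with a single worker degrades to ordered sequential batches. A self-contained toy (stub `exec`, hypothetical method name and values) showing that the output is identical regardless of worker count:

```python
import concurrent.futures

def exec_stub(name, args, is_batch):
    # Stand-in for HttpClient.exec: pretend to run one batched RPC call.
    return [f"{name}({a})" for a in args]

def exec_multi(name, params, max_workers, batch_size):
    # Same shape as the unified method above, minus the HTTP layer.
    chunks = [[name, params[i:i + batch_size], True]
              for i in range(0, len(params), batch_size)]
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers) as executor:
        for items in executor.map(lambda tup: exec_stub(*tup), chunks):
            yield list(items)

for workers in (1, 4):  # 1 worker == old exec_batch; 4 == old futures path
    result = []
    for part in exec_multi('get_block', list(range(10)), workers, 3):
        result.extend(part)
    print(workers, result)  # identical ordered output either way
```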