Skip to content
Snippets Groups Projects
Commit bb576767 authored by John Gerlock's avatar John Gerlock
Browse files

Merge branch 'master' of github.com:steemit/jussi

parents 0acde451 0700f41a
No related branches found
No related tags found
No related merge requests found
[{"id": 0, "jsonrpc": "2.0", "method": "get_block", "params": ["0"]}, {"id": 1, "jsonrpc": "2.0", "method": "get_block", "params": ["1"]}, {"id": 2, "jsonrpc": "2.0", "method": "get_block", "params": ["2"]}, {"id": 3, "jsonrpc": "2.0", "method": "get_block", "params": ["3"]}, {"id": 4, "jsonrpc": "2.0", "method": "get_block", "params": ["4"]}, {"id": 5, "jsonrpc": "2.0", "method": "get_block", "params": ["5"]}, {"id": 6, "jsonrpc": "2.0", "method": "get_block", "params": ["6"]}, {"id": 7, "jsonrpc": "2.0", "method": "get_block", "params": ["7"]}, {"id": 8, "jsonrpc": "2.0", "method": "get_block", "params": ["8"]}, {"id": 9, "jsonrpc": "2.0", "method": "get_block", "params": ["9"]}, {"id": 10, "jsonrpc": "2.0", "method": "get_block", "params": ["10"]}, {"id": 11, "jsonrpc": "2.0", "method": "get_block", "params": ["11"]}, {"id": 12, "jsonrpc": "2.0", "method": "get_block", "params": ["12"]}, {"id": 13, "jsonrpc": "2.0", "method": "get_block", "params": ["13"]}, {"id": 14, "jsonrpc": "2.0", "method": "get_block", "params": ["14"]}, {"id": 15, "jsonrpc": "2.0", "method": "get_block", "params": ["15"]}, {"id": 16, "jsonrpc": "2.0", "method": "get_block", "params": ["16"]}, {"id": 17, "jsonrpc": "2.0", "method": "get_block", "params": ["17"]}, {"id": 18, "jsonrpc": "2.0", "method": "get_block", "params": ["18"]}, {"id": 19, "jsonrpc": "2.0", "method": "get_block", "params": ["19"]}, {"id": 20, "jsonrpc": "2.0", "method": "get_block", "params": ["20"]}, {"id": 21, "jsonrpc": "2.0", "method": "get_block", "params": ["21"]}, {"id": 22, "jsonrpc": "2.0", "method": "get_block", "params": ["22"]}, {"id": 23, "jsonrpc": "2.0", "method": "get_block", "params": ["23"]}, {"id": 24, "jsonrpc": "2.0", "method": "get_block", "params": ["24"]}, {"id": 25, "jsonrpc": "2.0", "method": "get_block", "params": ["25"]}, {"id": 26, "jsonrpc": "2.0", "method": "get_block", "params": ["26"]}, {"id": 27, "jsonrpc": "2.0", "method": "get_block", "params": ["27"]}, {"id": 28, "jsonrpc": "2.0", "method": "get_block", "params": ["28"]}, {"id": 29, "jsonrpc": "2.0", "method": "get_block", "params": ["29"]}, {"id": 30, "jsonrpc": "2.0", "method": "get_block", "params": ["30"]}, {"id": 31, "jsonrpc": "2.0", "method": "get_block", "params": ["31"]}, {"id": 32, "jsonrpc": "2.0", "method": "get_block", "params": ["32"]}, {"id": 33, "jsonrpc": "2.0", "method": "get_block", "params": ["33"]}, {"id": 34, "jsonrpc": "2.0", "method": "get_block", "params": ["34"]}, {"id": 35, "jsonrpc": "2.0", "method": "get_block", "params": ["35"]}, {"id": 36, "jsonrpc": "2.0", "method": "get_block", "params": ["36"]}, {"id": 37, "jsonrpc": "2.0", "method": "get_block", "params": ["37"]}, {"id": 38, "jsonrpc": "2.0", "method": "get_block", "params": ["38"]}, {"id": 39, "jsonrpc": "2.0", "method": "get_block", "params": ["39"]}, {"id": 40, "jsonrpc": "2.0", "method": "get_block", "params": ["40"]}, {"id": 41, "jsonrpc": "2.0", "method": "get_block", "params": ["41"]}, {"id": 42, "jsonrpc": "2.0", "method": "get_block", "params": ["42"]}, {"id": 43, "jsonrpc": "2.0", "method": "get_block", "params": ["43"]}, {"id": 44, "jsonrpc": "2.0", "method": "get_block", "params": ["44"]}, {"id": 45, "jsonrpc": "2.0", "method": "get_block", "params": ["45"]}, {"id": 46, "jsonrpc": "2.0", "method": "get_block", "params": ["46"]}, {"id": 47, "jsonrpc": "2.0", "method": "get_block", "params": ["47"]}, {"id": 48, "jsonrpc": "2.0", "method": "get_block", "params": ["48"]}, {"id": 49, "jsonrpc": "2.0", "method": "get_block", "params": ["49"]}, {"id": 50, "jsonrpc": "2.0", "method": "get_block", "params": ["50"]}, {"id": 51, "jsonrpc": "2.0", "method": "get_block", "params": ["51"]}, {"id": 52, "jsonrpc": "2.0", "method": "get_block", "params": ["52"]}, {"id": 53, "jsonrpc": "2.0", "method": "get_block", "params": ["53"]}, {"id": 54, "jsonrpc": "2.0", "method": "get_block", "params": ["54"]}, {"id": 55, "jsonrpc": "2.0", "method": "get_block", "params": ["55"]}, {"id": 56, "jsonrpc": "2.0", "method": "get_block", "params": ["56"]}, {"id": 57, "jsonrpc": "2.0", "method": "get_block", "params": ["57"]}, {"id": 58, "jsonrpc": "2.0", "method": "get_block", "params": ["58"]}, {"id": 59, "jsonrpc": "2.0", "method": "get_block", "params": ["59"]}, {"id": 60, "jsonrpc": "2.0", "method": "get_block", "params": ["60"]}, {"id": 61, "jsonrpc": "2.0", "method": "get_block", "params": ["61"]}, {"id": 62, "jsonrpc": "2.0", "method": "get_block", "params": ["62"]}, {"id": 63, "jsonrpc": "2.0", "method": "get_block", "params": ["63"]}, {"id": 64, "jsonrpc": "2.0", "method": "get_block", "params": ["64"]}, {"id": 65, "jsonrpc": "2.0", "method": "get_block", "params": ["65"]}, {"id": 66, "jsonrpc": "2.0", "method": "get_block", "params": ["66"]}, {"id": 67, "jsonrpc": "2.0", "method": "get_block", "params": ["67"]}, {"id": 68, "jsonrpc": "2.0", "method": "get_block", "params": ["68"]}, {"id": 69, "jsonrpc": "2.0", "method": "get_block", "params": ["69"]}, {"id": 70, "jsonrpc": "2.0", "method": "get_block", "params": ["70"]}, {"id": 71, "jsonrpc": "2.0", "method": "get_block", "params": ["71"]}, {"id": 72, "jsonrpc": "2.0", "method": "get_block", "params": ["72"]}, {"id": 73, "jsonrpc": "2.0", "method": "get_block", "params": ["73"]}, {"id": 74, "jsonrpc": "2.0", "method": "get_block", "params": ["74"]}, {"id": 75, "jsonrpc": "2.0", "method": "get_block", "params": ["75"]}, {"id": 76, "jsonrpc": "2.0", "method": "get_block", "params": ["76"]}, {"id": 77, "jsonrpc": "2.0", "method": "get_block", "params": ["77"]}, {"id": 78, "jsonrpc": "2.0", "method": "get_block", "params": ["78"]}, {"id": 79, "jsonrpc": "2.0", "method": "get_block", "params": ["79"]}, {"id": 80, "jsonrpc": "2.0", "method": "get_block", "params": ["80"]}, {"id": 81, "jsonrpc": "2.0", "method": "get_block", "params": ["81"]}, {"id": 82, "jsonrpc": "2.0", "method": "get_block", "params": ["82"]}, {"id": 83, "jsonrpc": "2.0", "method": "get_block", "params": ["83"]}, {"id": 84, "jsonrpc": "2.0", "method": "get_block", "params": ["84"]}, {"id": 85, "jsonrpc": "2.0", "method": "get_block", "params": ["85"]}, {"id": 86, "jsonrpc": "2.0", "method": "get_block", "params": ["86"]}, {"id": 87, "jsonrpc": "2.0", "method": "get_block", "params": ["87"]}, {"id": 88, "jsonrpc": "2.0", "method": "get_block", "params": ["88"]}, {"id": 89, "jsonrpc": "2.0", "method": "get_block", "params": ["89"]}, {"id": 90, "jsonrpc": "2.0", "method": "get_block", "params": ["90"]}, {"id": 91, "jsonrpc": "2.0", "method": "get_block", "params": ["91"]}, {"id": 92, "jsonrpc": "2.0", "method": "get_block", "params": ["92"]}, {"id": 93, "jsonrpc": "2.0", "method": "get_block", "params": ["93"]}, {"id": 94, "jsonrpc": "2.0", "method": "get_block", "params": ["94"]}, {"id": 95, "jsonrpc": "2.0", "method": "get_block", "params": ["95"]}, {"id": 96, "jsonrpc": "2.0", "method": "get_block", "params": ["96"]}, {"id": 97, "jsonrpc": "2.0", "method": "get_block", "params": ["97"]}, {"id": 98, "jsonrpc": "2.0", "method": "get_block", "params": ["98"]}, {"id": 99, "jsonrpc": "2.0", "method": "get_block", "params": ["99"]}]
\ No newline at end of file
[{"id": 0, "jsonrpc": "2.0", "method": "get_block", "params": ["0"]}, {"id": 1, "jsonrpc": "2.0", "method": "get_block", "params": ["1"]}, {"id": 2, "jsonrpc": "2.0", "method": "get_block", "params": ["2"]}, {"id": 3, "jsonrpc": "2.0", "method": "get_block", "params": ["3"]}, {"id": 4, "jsonrpc": "2.0", "method": "get_block", "params": ["4"]}, {"id": 5, "jsonrpc": "2.0", "method": "get_block", "params": ["5"]}, {"id": 6, "jsonrpc": "2.0", "method": "get_block", "params": ["6"]}, {"id": 7, "jsonrpc": "2.0", "method": "get_block", "params": ["7"]}, {"id": 8, "jsonrpc": "2.0", "method": "get_block", "params": ["8"]}, {"id": 9, "jsonrpc": "2.0", "method": "get_block", "params": ["9"]}]
\ No newline at end of file
This diff is collapsed.
...@@ -25,6 +25,18 @@ class RPCError(Exception): ...@@ -25,6 +25,18 @@ class RPCError(Exception):
class RPCConnectionError(Exception): class RPCConnectionError(Exception):
pass pass
def chunkify(iterable, chunksize=3000):
i = 0
chunk = []
for item in iterable:
chunk.append(item)
i += 1
if i == chunksize:
yield chunk
i = 0
chunk = []
if len(chunk) > 0:
yield chunk
class SimpleSteemAPIClient(object): class SimpleSteemAPIClient(object):
"""Simple Steem JSON-HTTP-RPC API """Simple Steem JSON-HTTP-RPC API
...@@ -156,30 +168,49 @@ class SimpleSteemAPIClient(object): ...@@ -156,30 +168,49 @@ class SimpleSteemAPIClient(object):
def exec_multi(self, name, params): def exec_multi(self, name, params):
body_gen = ({ body_gen = ({
"method": name, "method": name,
"params": [i], "params": [str(i)],
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 0 "id": i
} for i in params) } for i in params)
for body in body_gen: for chunk in chunkify(body_gen):
json_body = json.dumps(body, ensure_ascii=False).encode('utf8') batch_json_body = json.dumps(chunk, ensure_ascii=False).encode('utf8')
yield self._return( r = self.request(body=batch_json_body).read()
response=self.request(body=json_body), print(r)
args=body['params'], batch_response = json.loads(self.request(body=batch_json_body).read())
return_with_args=True) for i,resp in enumerate(batch_response):
yield self._return(
def exec_multi_with_futures(self, name, params, max_workers=None): response=resp,
args=batch_json_body[i]['params'],
return_with_args=True)
def exec_batch(self, name, params):
batch_requests = [{
"method": name,
"params": [str(i)],
"jsonrpc": "2.0",
"id": i
} for i in params]
for chunk in chunkify(batch_requests):
batch_json_body = json.dumps(chunk).encode()
r = self.request(body=batch_json_body)
batch_response = json.loads(r.data.decode())
for i,resp in enumerate(batch_response):
yield json.dumps(resp)
def exec_batch_with_futures(self, name, params, max_workers=None):
with concurrent.futures.ThreadPoolExecutor( with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers) as executor: max_workers=max_workers) as executor:
futures = (executor.submit( futures = []
self.exec, name, param, return_with_args=True) for chunk in chunkify(params):
for param in params) futures.append(executor.submit(self.exec_batch, name, chunk))
for future in concurrent.futures.as_completed(futures): for future in concurrent.futures.as_completed(futures):
result, args = future.result() for item in future.result():
if result: yield item
yield result
else:
executor.submit(
self.exec, name, args, return_with_args=True)
get_dynamic_global_properties = partialmethod( get_dynamic_global_properties = partialmethod(
exec, 'get_dynamic_global_properties') exec, 'get_dynamic_global_properties')
......
...@@ -50,19 +50,16 @@ def fetch_blocks(block_nums, ...@@ -50,19 +50,16 @@ def fetch_blocks(block_nums,
steemd_http_url, steemd_http_url,
max_threads=max_threads) max_threads=max_threads)
chunks = chunkify(block_nums, 10000) chunks = chunkify(block_nums, chunksize)
with Pool(processes=max_workers) as pool: with Pool(processes=max_workers) as pool:
results = pool.map(map_func, chunks) results = pool.map(map_func, chunks)
print(results) #print(results)
def do_test(steemd_http_url, max_procs, max_threads, start=None, end=None): def do_test(steemd_http_url, max_procs, max_threads, start=None, end=None):
client = http_client.SimpleSteemAPIClient(url=steemd_http_url) client = http_client.SimpleSteemAPIClient(url=steemd_http_url)
print(client.get_dynamic_global_properties())
try: try:
start = start or 1 start = start or 1
end = end or client.block_height() end = end or client.block_height()
...@@ -90,8 +87,7 @@ def do_test(steemd_http_url, max_procs, max_threads, start=None, end=None): ...@@ -90,8 +87,7 @@ def do_test(steemd_http_url, max_procs, max_threads, start=None, end=None):
def block_fetcher_thread_worker(rpc_url, block_nums, max_threads=None): def block_fetcher_thread_worker(rpc_url, block_nums, max_threads=None):
rpc = http_client.SimpleSteemAPIClient(rpc_url, return_with_args=False) rpc = http_client.SimpleSteemAPIClient(rpc_url, return_with_args=False)
# pylint: disable=unused-variable # pylint: disable=unused-variable
for block in rpc.exec_multi_with_futures( for block in rpc.exec_batch('get_block', block_nums): #, max_workers=max_threads):
'get_block', block_nums, max_workers=max_threads):
yield block yield block
...@@ -99,10 +95,10 @@ def block_adder_process_worker( ...@@ -99,10 +95,10 @@ def block_adder_process_worker(
rpc_url, rpc_url,
block_nums, block_nums,
max_threads=5): max_threads=5):
rpc = http_client.SimpleSteemAPIClient(rpc_url, return_with_args=False)
for block in rpc.exec_batch('get_block', block_nums): #, max_workers=max_threads):
print(block)
for raw_block in block_fetcher_thread_worker(rpc_url, block_nums, max_threads=max_threads):
print(raw_block)
return True
# included only for debugging with pdb, all the above code should be called # included only for debugging with pdb, all the above code should be called
# using the click framework # using the click framework
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment