Skip to content
Snippets Groups Projects
Commit 3ff0b890 authored by Holger Nahrstaedt's avatar Holger Nahrstaedt
Browse files

num_retries improved

Steemnoderpc
* self.errror_cnt_call used instead of local variable doRetryCount
Graphenerpc
* num_retries_call added for setting a max number of retries on a rpc call
* self.n_urls added for getting the number of given nodes
* sleep on retry removed, when switching over to the next node
self.error_cnt_call used for counting rpc call retries
rpcutils
* sleep_and_check_retries improved
Unit tests
num_retries added
test_golos reduced
parent 421627a9
No related branches found
No related tags found
No related merge requests found
Showing
with 69 additions and 190 deletions
......@@ -38,11 +38,9 @@ class SteemNodeRPC(GrapheneRPC):
:raises ValueError: if the server does not respond in proper JSON format
:raises RPCError: if the server returns an error
"""
doRetryCount = 0
doRetry = True
while doRetry and doRetryCount < 5:
while doRetry and self.error_cnt_call < self.num_retries_call:
doRetry = False
doRetryCount += 1
try:
# Forward call to GrapheneWebsocketRPC and catch+evaluate errors
return super(SteemNodeRPC, self).rpcexec(payload)
......@@ -59,16 +57,16 @@ class SteemNodeRPC(GrapheneRPC):
elif re.search("Could not find API", msg):
raise exceptions.NoApiWithName(msg)
elif re.search("Unable to acquire database lock", msg):
sleep_and_check_retries(5, doRetryCount, self.url, str(msg))
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, str(msg))
doRetry = True
elif re.search("Internal Error", msg):
sleep_and_check_retries(5, doRetryCount, self.url, str(msg))
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, str(msg))
doRetry = True
elif re.search("Service Temporarily Unavailable", msg):
sleep_and_check_retries(5, doRetryCount, self.url, str(msg))
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, str(msg))
doRetry = True
elif re.search("Bad Gateway", msg):
sleep_and_check_retries(5, doRetryCount, self.url, str(msg))
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, str(msg))
doRetry = True
elif msg:
raise exceptions.UnhandledRPCError(msg)
......
......@@ -53,7 +53,7 @@ class GrapheneRPC(object):
:param str user: Username for Authentication
:param str password: Password for Authentication
:param int num_retries: Try x times to num_retries to a node on disconnect, -1 for indefinitely
:param int num_retries_call: Repeat num_retries_call times a rpccall on node error (default is 5)
Available APIs
* database
......@@ -82,14 +82,20 @@ class GrapheneRPC(object):
self.current_rpc = self.rpc_methods["ws"]
self._request_id = 0
if isinstance(urls, str):
self.urls = cycle(re.split(r",|;", urls))
url_list = re.split(r",|;", urls)
self.n_urls = len(url_list)
self.urls = cycle(url_list)
if self.urls is None:
self.n_urls = 1
self.urls = cycle([urls])
elif isinstance(urls, (list, tuple, set)):
self.n_urls = len(urls)
self.urls = cycle(urls)
elif urls is not None:
self.n_urls = 1
self.urls = cycle([urls])
else:
self.n_urls = 0
self.urls = None
self.current_rpc = self.rpc_methods["offline"]
self.user = user
......@@ -97,7 +103,9 @@ class GrapheneRPC(object):
self.ws = None
self.rpc_queue = []
self.num_retries = kwargs.get("num_retries", -1)
self.error_cnt = 0
self.num_retries_call = kwargs.get("num_retries_call", 5)
self.error_cnt_call = 0
self.rpcconnect()
def get_request_id(self):
......@@ -120,12 +128,13 @@ class GrapheneRPC(object):
def rpcconnect(self, next_url=True):
"""Connect to next url in a loop."""
cnt = 0
self.error_cnt = 0
if self.urls is None:
return
while True:
cnt += 1
self.error_cnt += 1
if next_url:
self.error_cnt_call = 0
self.url = next(self.urls)
log.debug("Trying to connect to node %s" % self.url)
if self.url[:3] == "wss":
......@@ -147,7 +156,6 @@ class GrapheneRPC(object):
self.current_rpc = self.rpc_methods["jsonrpc"]
self.headers = {'User-Agent': 'beem v0.19.14',
'content-type': 'application/json'}
try:
if not self.ws:
break
......@@ -157,16 +165,15 @@ class GrapheneRPC(object):
except KeyboardInterrupt:
raise
except Exception as e:
sleep_and_check_retries(self.num_retries, cnt, self.url, str(e))
do_sleep = not next_url or (next_url and self.n_urls == 1)
sleep_and_check_retries(self.num_retries, self.error_cnt, self.url, str(e), sleep=do_sleep)
next_url = True
try:
props = self.get_config(api="database")
except:
if self.ws:
self.current_rpc = self.rpc_methods["wsappbase"]
else:
self.current_rpc = self.rpc_methods["appbase"]
props = self.get_config(api="database")
if props is None:
raise RPCError("Could not recieve answer for get_config")
if is_network_appbase_ready(props):
if self.ws:
self.current_rpc = self.rpc_methods["wsappbase"]
......@@ -207,10 +214,9 @@ class GrapheneRPC(object):
:raises RPCError: if the server returns an error
"""
log.debug(json.dumps(payload))
cnt = 0
reply = {}
while True:
cnt += 1
self.error_cnt_call += 1
try:
if self.current_rpc == 0 or self.current_rpc == 3:
......@@ -223,7 +229,7 @@ class GrapheneRPC(object):
except WebSocketConnectionClosedException:
self.rpcconnect(next_url=False)
except Exception as e:
sleep_and_check_retries(self.num_retries, cnt, self.url, str(e))
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, str(e))
# retry
self.rpcconnect()
......@@ -256,12 +262,15 @@ class GrapheneRPC(object):
raise RPCError(ret['error']['message'])
else:
ret_list.append(r["result"])
self.error_cnt_call = 0
return ret_list
elif isinstance(ret, dict) and "result" in ret:
self.error_cnt_call = 0
return ret["result"]
elif isinstance(ret, int):
raise ValueError("Client returned invalid format. Expected JSON!")
else:
self.error_cnt_call = 0
return ret
# End of Deprecated methods
......
......@@ -83,22 +83,21 @@ def get_api_name(appbase, *args, **kwargs):
return api_name
def sleep_and_check_retries(num_retries, cnt, url, errorMsg=None):
def sleep_and_check_retries(num_retries, cnt, url, errorMsg=None, sleep=True):
"""Sleep and check if num_retries is reached"""
if errorMsg:
log.warning("Error: {}\n".format(errorMsg))
if (num_retries >= 0 and cnt > num_retries):
raise NumRetriesReached()
log.warning("\nLost connection or internal error on node: %s (%d/%d) " % (url, cnt, num_retries))
if not sleep:
return
if cnt < 1:
sleeptime = 0
elif cnt < 10:
sleeptime = (cnt - 1) * 2
sleeptime = (cnt - 1) * 1.5 + 0.5
else:
sleeptime = 10
if sleeptime:
log.warning("\nLost connection or internal error on node: %s (%d/%d) "
% (url, cnt, num_retries) +
"Retrying in %d seconds\n" % sleeptime
)
log.warning("Retrying in %d seconds\n" % sleeptime)
time.sleep(sleeptime)
......@@ -32,14 +32,16 @@ class Testcases(unittest.TestCase):
nobroadcast=True,
bundle=False,
# Overwrite wallet to use this list of wifs only
wif={"active": wif}
wif={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
bundle=False,
# Overwrite wallet to use this list of wifs only
wif={"active": wif}
wif={"active": wif},
num_retries=10
)
self.bts.set_default_account("test")
set_shared_steem_instance(self.bts)
......
......@@ -23,10 +23,12 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
num_retries=10
)
set_shared_steem_instance(self.bts)
self.asset = Asset("SBD")
......
......@@ -24,10 +24,12 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
num_retries=10
)
set_shared_steem_instance(self.bts)
......
......@@ -21,6 +21,7 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
set_shared_steem_instance(self.bts)
......
......@@ -27,11 +27,13 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
......@@ -30,11 +30,13 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
......@@ -32,7 +32,8 @@ class Testcases(unittest.TestCase):
nobroadcast=True,
bundle=False,
# Overwrite wallet to use this list of wifs only
wif={"active": wif}
wif={"active": wif},
num_retries=10
)
self.bts.set_default_account("test")
set_shared_steem_instance(self.bts)
......
......@@ -27,11 +27,13 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
......@@ -17,11 +17,13 @@ class Testcases(unittest.TestCase):
b1 = Steem(
node=["wss://testnet.steem.vc"],
nobroadcast=True,
num_retries=10
)
b2 = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
self.assertNotEqual(b1.rpc.url, b2.rpc.url)
......
......@@ -33,11 +33,13 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
......@@ -40,83 +40,18 @@ class Testcases(unittest.TestCase):
nobroadcast=True,
bundle=False,
# Overwrite wallet to use this list of wifs only
wif={"active": wif}
wif={"active": wif},
num_retries=10
)
self.bts = Steem(
node=["wss://ws.golos.io"],
keys={"active": wif, "owner": wif, "posting": wif},
nobroadcast=True,
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
def test_transfer(self):
bts = self.bts
# bts.prefix ="STX"
acc = Account("test", steem_instance=bts)
tx = acc.transfer(
"test1", 1.33, "GBG", memo="Foobar")
self.assertEqual(
tx["operations"][0][0],
"transfer"
)
op = tx["operations"][0][1]
self.assertIn("memo", op)
self.assertEqual(op["from"], "test")
self.assertEqual(op["to"], "test1")
amount = Amount(op["amount"], steem_instance=bts)
self.assertEqual(float(amount), 1.33)
def test_create_account(self):
bts = self.bts
name = ''.join(random.choice(string.ascii_lowercase) for _ in range(12))
key1 = PrivateKey()
key2 = PrivateKey()
key3 = PrivateKey()
key4 = PrivateKey()
key5 = PrivateKey()
tx = bts.create_account(
name,
creator="test",
owner_key=format(key1.pubkey, core_unit),
active_key=format(key2.pubkey, core_unit),
posting_key=format(key3.pubkey, core_unit),
memo_key=format(key4.pubkey, core_unit),
additional_owner_keys=[format(key5.pubkey, core_unit)],
additional_active_keys=[format(key5.pubkey, core_unit)],
additional_owner_accounts=["test1"], # 1.2.0
additional_active_accounts=["test1"],
storekeys=False
)
self.assertEqual(
tx["operations"][0][0],
"account_create"
)
op = tx["operations"][0][1]
role = "active"
self.assertIn(
format(key5.pubkey, core_unit),
[x[0] for x in op[role]["key_auths"]])
self.assertIn(
format(key5.pubkey, core_unit),
[x[0] for x in op[role]["key_auths"]])
self.assertIn(
"test1",
[x[0] for x in op[role]["account_auths"]])
role = "owner"
self.assertIn(
format(key5.pubkey, core_unit),
[x[0] for x in op[role]["key_auths"]])
self.assertIn(
format(key5.pubkey, core_unit),
[x[0] for x in op[role]["key_auths"]])
self.assertIn(
"test1",
[x[0] for x in op[role]["account_auths"]])
self.assertEqual(
op["creator"],
"test")
def test_connect(self):
bts = self.bts
self.assertEqual(bts.prefix, "GLS")
......@@ -133,22 +68,6 @@ class Testcases(unittest.TestCase):
'time']:
self.assertTrue(key in info)
def test_finalizeOps(self):
bts = self.bts
tx1 = bts.new_tx()
tx2 = bts.new_tx()
acc = Account("test", steem_instance=bts)
acc.transfer("test1", 1, "GOLOS", append_to=tx1)
acc.transfer("test1", 2, "GOLOS", append_to=tx2)
acc.transfer("test1", 3, "GOLOS", append_to=tx1)
tx1 = tx1.json()
tx2 = tx2.json()
ops1 = tx1["operations"]
ops2 = tx2["operations"]
self.assertEqual(len(ops1), 2)
self.assertEqual(len(ops2), 1)
def test_weight_threshold(self):
bts = self.bts
auth = {'account_auths': [['test', 1]],
......@@ -167,75 +86,3 @@ class Testcases(unittest.TestCase):
with self.assertRaises(ValueError):
bts._test_weights_treshold(auth)
def test_allow(self):
bts = self.bts
self.assertIn(bts.prefix, "GLS")
acc = Account("test", steem_instance=bts)
self.assertIn(acc.steem.prefix, "GLS")
tx = acc.allow(
"GLS55VCzsb47NZwWe5F3qyQKedX9iHBHMVVFSc96PDvV7wuj7W86n",
account="test",
weight=1,
threshold=1,
permission="owner",
)
self.assertEqual(
(tx["operations"][0][0]),
"account_update"
)
op = tx["operations"][0][1]
self.assertIn("owner", op)
self.assertIn(
["GLS55VCzsb47NZwWe5F3qyQKedX9iHBHMVVFSc96PDvV7wuj7W86n", '1'],
op["owner"]["key_auths"])
self.assertEqual(op["owner"]["weight_threshold"], 1)
def test_disallow(self):
bts = self.bts
acc = Account("test", steem_instance=bts)
if sys.version > '3':
_assertRaisesRegex = self.assertRaisesRegex
else:
_assertRaisesRegex = self.assertRaisesRegexp
with _assertRaisesRegex(ValueError, ".*Changes nothing.*"):
acc.disallow(
"GLS55VCzsb47NZwWe5F3qyQKedX9iHBHMVVFSc96PDvV7wuj7W86n",
weight=1,
threshold=1,
permission="owner"
)
with _assertRaisesRegex(ValueError, ".*Changes nothing!.*"):
acc.disallow(
"GLS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV",
weight=1,
threshold=1,
permission="owner"
)
def test_update_memo_key(self):
bts = self.bts
self.assertEqual(bts.prefix, "GLS")
acc = Account("test", steem_instance=bts)
tx = acc.update_memo_key("GLS55VCzsb47NZwWe5F3qyQKedX9iHBHMVVFSc96PDvV7wuj7W86n")
self.assertEqual(
(tx["operations"][0][0]),
"account_update"
)
op = tx["operations"][0][1]
self.assertEqual(
op["memo_key"],
"GLS55VCzsb47NZwWe5F3qyQKedX9iHBHMVVFSc96PDvV7wuj7W86n")
def test_approvewitness(self):
bts = self.bts
w = Account("test", steem_instance=bts)
tx = w.approvewitness("test1")
self.assertEqual(
(tx["operations"][0][0]),
"account_witness_vote"
)
op = tx["operations"][0][1]
self.assertIn(
"test1",
op["witness"])
......@@ -28,11 +28,13 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
keys={"active": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
......@@ -21,7 +21,8 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
wif=[wif]
wif=[wif],
num_retries=10
)
set_shared_steem_instance(self.bts)
......
......@@ -45,6 +45,7 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
def test_connect(self):
......
......@@ -22,6 +22,7 @@ class Testcases(unittest.TestCase):
self.bts = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
set_shared_steem_instance(self.bts)
......
......@@ -20,6 +20,7 @@ class Testcases(unittest.TestCase):
steem = Steem(
node=nodes,
nobroadcast=True,
num_retries=10
)
set_shared_steem_instance(steem)
......
......@@ -37,13 +37,15 @@ class Testcases(unittest.TestCase):
node=nodes,
nobroadcast=True,
data_refresh_time_seconds=900,
keys={"active": wif, "owner": wif, "memo": wif}
keys={"active": wif, "owner": wif, "memo": wif},
num_retries=10
)
self.appbase = Steem(
node=nodes_appbase,
nobroadcast=True,
data_refresh_time_seconds=900,
keys={"active": wif, "owner": wif, "memo": wif}
keys={"active": wif, "owner": wif, "memo": wif},
num_retries=10
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment