Commit 2c349a6b authored by Krzysztof Mochocki's avatar Krzysztof Mochocki Committed by Krzysztof Mochocki

Added test that checks pagination of `enum_virtual_ops` endpoint

parent 9f01bbae
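The new test exercises the pagination contract of `account_history_api.enum_virtual_ops`: the same block range is fetched in one call and then page by page with limits 1, 2, 5 and 10, and the compressed results are compared. A minimal sketch of one paginated walk over that API, assuming a node with `account_history_api` enabled; the endpoint URL and block numbers are illustrative only:

import requests

def fetch_page(begin: int, end: int, op_begin: int, limit: int) -> dict:
    payload = {
        "jsonrpc": "2.0", "id": 1, "method": "call",
        "params": ["account_history_api", "enum_virtual_ops",
                   {"block_range_begin": begin, "block_range_end": end,
                    "operation_begin": op_begin, "limit": limit}],
    }
    return requests.post("http://127.0.0.1:8090", json=payload).json()["result"]

# walk a small range page by page; pagination ends when next_operation_begin is 0
# and next_block_range_begin is 0 or equal to the requested end of the range
ops, begin, op_begin, end = [], 2990000, 0, 2990100
while True:
    page = fetch_page(begin, end, op_begin, limit=10)
    ops.extend(page["ops"])
    if page["next_operation_begin"] == 0 and page["next_block_range_begin"] in (0, end):
        break
    begin, op_begin = page["next_block_range_begin"], page["next_operation_begin"]
print(f"collected {len(ops)} virtual operations")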
......@@ -159,11 +159,13 @@ hived_options_tests:
PYTHONPATH: $CI_PROJECT_DIR/tests/functional
script:
- cd tests/functional/python_tests/hived
- apt-get update -y && apt-get install -y python3 python3-pip python3-dev
- pip3 install -U psutil
- "python3 hived_options_tests.py --run-hived $CI_PROJECT_DIR/testnet_node_build/install-root/bin/hived"
tags:
- public-runner-docker
hived_snapshots_tests:
hived_replay_tests:
stage: test
needs:
- job: consensus_build
......@@ -172,17 +174,30 @@ hived_snapshots_tests:
variables:
PYTHONPATH: $CI_PROJECT_DIR/tests/functional
script:
- export ROOT_DIRECTORY=$PWD
- mkdir $ROOT_DIRECTORY/replay_logs
- cd tests/functional/python_tests/hived
- apt-get update -y && apt-get install -y python3 python3-pip
- pip3 install -U wget psutil junit_xml gcovr secp256k1prp
- $CI_PROJECT_DIR/consensus_build/install-root/bin/truncate_block_log /blockchain/block_log /tmp/block_log 10000
- "python3 snapshot_1.py --run-hived $CI_PROJECT_DIR/consensus_build/install-root/bin/hived --block-log /tmp/block_log"
- "python3 snapshot_2.py --run-hived $CI_PROJECT_DIR/consensus_build/install-root/bin/hived --block-log /tmp/block_log"
- apt-get update -y && apt-get install -y python3 python3-pip python3-dev
- pip3 install -U wget psutil junit_xml gcovr secp256k1prp requests
- $CI_PROJECT_DIR/consensus_build/install-root/bin/truncate_block_log /blockchain/block_log /tmp/block_log 3000000
# quick replays for 10k blocks, with node restarts
- "python3 snapshot_1.py --run-hived $CI_PROJECT_DIR/consensus_build/install-root/bin/hived --block-log /tmp/block_log --blocks 10000 --artifact-directory $ROOT_DIRECTORY/replay_logs"
- "python3 snapshot_2.py --run-hived $CI_PROJECT_DIR/consensus_build/install-root/bin/hived --block-log /tmp/block_log --blocks 10000 --artifact-directory $ROOT_DIRECTORY/replay_logs"
# group of tests that uses one node with 3 million blocks replayed
- "python3 start_replay_tests.py --run-hived $CI_PROJECT_DIR/consensus_build/install-root/bin/hived --blocks 3000000 --block-log /tmp/block_log --test-directory $PWD/replay_based_tests --artifact-directory $ROOT_DIRECTORY/replay_logs"
artifacts:
paths:
- replay_logs
when: always
expire_in: 6 months
tags:
- public-runner-docker
- hived-for-tests
package_consensus_node:
stage: package
needs:
......
......@@ -390,13 +390,3 @@ def compare_snapshots(path_to_first_snapshot : str, path_to_second_snapshot : st
ret.append(key)
return ret
from hive_utils.hive_node import HiveNode
def wait_for_node(node_to_wait : HiveNode, msg = ""):
from time import sleep
from psutil import pid_exists, Process, STATUS_ZOMBIE
print(msg)
with node_to_wait:
pid = node_to_wait.hived_process.pid
while pid_exists(pid) and Process(pid).status() != STATUS_ZOMBIE:
sleep(0.25)
......@@ -31,7 +31,7 @@ class HiveNode(object):
hived_data_dir = None
hived_args = list()
def __init__(self, binary_path : str, working_dir : str, binary_args : list):
def __init__(self, binary_path : str, working_dir : str, binary_args : list, stdout_stream = subprocess.PIPE, stderr_stream = None):
logger.info("New hive node")
if not os.path.exists(binary_path):
raise ValueError("Path to hived binary is not valid.")
......@@ -48,10 +48,13 @@ class HiveNode(object):
if binary_args:
self.hived_args.extend(binary_args)
self.stdout_stream = stdout_stream
self.stderr_stream = stderr_stream
def __enter__(self):
self.hived_lock.acquire()
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL
from time import sleep
hived_command = [
......@@ -60,7 +63,7 @@ class HiveNode(object):
]
hived_command.extend(self.hived_args)
self.hived_process = Popen(hived_command, stdout=PIPE, stderr=None)
self.hived_process = Popen(hived_command, stdout=self.stdout_stream, stderr=self.stderr_stream)
self.hived_process.poll()
sleep(5)
......@@ -77,23 +80,36 @@ class HiveNode(object):
logger.info("Closing node")
from signal import SIGINT, SIGTERM
from time import sleep
from psutil import pid_exists
if self.hived_process is not None:
self.hived_process.poll()
if self.hived_process.returncode != 0:
if pid_exists(self.hived_process.pid):
self.hived_process.send_signal(SIGINT)
sleep(7)
self.hived_process.poll()
if self.hived_process.returncode != 0:
if pid_exists(self.hived_process.pid):
self.hived_process.send_signal(SIGTERM)
sleep(7)
self.hived_process.poll()
if self.hived_process.returncode != 0:
if pid_exists(self.hived_process.pid):
raise Exception("Error during stopping node. Manual intervention required.")
self.last_returncode = self.hived_process.returncode
self.hived_process = None
self.hived_lock.release()
# waits for the node to close. Recommended for use with the `--exit-after-replay` flag
def wait_till_end(self):
assert self.hived_process is not None
# assert "--exit-after-replay" in self.hived_args
from time import sleep
from psutil import pid_exists, Process, STATUS_ZOMBIE
pid = self.hived_process.pid
while pid_exists(pid) and Process(pid).status() != STATUS_ZOMBIE:
sleep(0.25)
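# Minimal usage sketch (assumes the node was constructed with "--exit-after-replay"
# among its arguments, as recommended above):
#   with node:
#       node.wait_till_end()
#   print("node finished with return code", node.last_returncode)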
class HiveNodeInScreen(object):
def __init__(self, hive_executable, working_dir, config_src_path, run_using_existing_data = False, node_is_steem = False):
self.hive_executable = hive_executable
......
......@@ -48,15 +48,17 @@ class config:
file.write("{} = {}\n".format(key.replace("_", "-"), value))
def load(self, path_to_file : str):
keys = self.__dict__.keys()
for key in keys:
key = key.replace("_", "-")
_keys = list(self.__dict__.keys())
keys = []
for key in _keys:
setattr(self, key, None)
keys.append(key.replace("_", "-"))
def proc_line(line : str):
values = line.split("=")
return values[0].strip(), values[1].strip()
def match_property(self, line : str):
def match_property(line : str):
if line.startswith('#'):
return
key, value = proc_line(line)
......@@ -66,3 +68,46 @@ class config:
with open(path_to_file, 'r') as file:
for line in file:
match_property(line)
def update_plugins(self, required_plugins : list):
plugins = self.plugin.split(" ")
plugins.extend(x for x in required_plugins if x not in plugins)
self.plugin = " ".join(plugins)
def exclude_plugins(self, plugins_to_exclude : list):
current_plugins = self.plugin.split(" ")
for plugin in plugins_to_exclude:
try:
current_plugins.remove(plugin)
except ValueError:
pass
self.plugin = " ".join(current_plugins)
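# Illustrative usage sketch (plugin names and file path are examples only):
#   cfg = config()
#   cfg.load("datadir/config.ini")
#   cfg.update_plugins(["account_history_api", "account_history_rocksdb"])
#   cfg.exclude_plugins(["witness"])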
def validate_address(val : str) -> bool:
try:
val = val.strip()
address, port = val.split(":")
port = int(port)
assert port >= 0
assert port < 0xffff
def validate_ip(ip : str):
addr = ip.split('.')
assert len(addr) == 4
for part in addr:
x = int(part)
assert x >= 0
assert x <= 255
try:
validate_ip(address)
except:
from socket import gethostbyname as get_ip
validate_ip(get_ip(address))
return True
except:
return False
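# Usage sketch (hostname resolution is attempted when the host part is not a dotted IP):
#   validate_address("127.0.0.1:8090")   # True
#   validate_address("localhost:8090")   # True, as long as "localhost" resolves
#   validate_address("no-port-given")    # False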
\ No newline at end of file
# Replay Based Tests
#### Note
##### All Python files in this directory will be started after the 3.5 million block replay finishes
---
## Flag requirements
- ### `--run-hived <address:port>`
address and port of the currently running node
#### Example
python3 sample_test.py --run-hived 127.0.0.1:8090
- ### `--path-to-config <path>`
path to the `config.ini` file of the currently running node
#### Example
python3 sample_test.py --path-to-config /home/user/datadir/config.ini
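#### Combined example
Test scripts may require both flags at once; a sketch reusing the example values above:
python3 sample_test.py --run-hived 127.0.0.1:8090 --path-to-config /home/user/datadir/config.ini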
#!/usr/bin/python3
import sys
import os
import tempfile
import argparse
from threading import Thread
sys.path.append("../../../")
import hive_utils
from hive_utils.resources.configini import config as configuration
from hive_utils.resources.configini import validate_address
# https://developers.hive.io/tutorials-recipes/paginated-api-methods.html#account_history_apiget_account_history
MAX_AT_ONCE = 10000
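# largest page size, used below for the single "all at once" request that the paginated results are compared against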
parser = argparse.ArgumentParser()
parser.add_argument("--run-hived", dest="hived", help = "IP address to replayed node", required=True, type=str)
parser.add_argument("--path-to-config", dest="config_path", help = "Path to node config file", required=True, type=str, default=None)
parser.add_argument("--blocks", dest="blocks", help = "Blocks to replay", required=False, type=int, default=1000000)
args = parser.parse_args()
node = None
assert int(args.blocks) >= 1000000, "replay has to be done for at least 1 million blocks"
# config
config = configuration()
config.load(args.config_path)
# check existence of required plugins
plugins = config.plugin.split(' ')
assert "account_history_rocksdb" in plugins
assert "account_history_api" in plugins
# class that compresses a vop into a stable id plus a sha512 checksum for comparison
class compressed_vop:
def __init__(self, vop):
from hashlib import sha512
from random import randint
from json import dumps
self.id = "{}_{}_{}".format( (~0x8000000000000000) & int(vop["operation_id"]), vop["block"], vop["trx_in_block"])
self.checksum = sha512( dumps(vop).encode() ).hexdigest()
# self.content = vop
def get(self):
return self.__dict__
# returns compressed data from an API call
def compress_vops(data : list) -> list:
ret = []
for vop in data:
ret.append(compressed_vop(vop).get())
return ret
# performs the enum_virtual_ops API call
def get_vops(range_begin : int, range_end : int, start_from_id : int, limit : int) -> dict:
global config
from requests import post
from json import dumps
# from time import sleep
# sleep(0.25)
data = {
"jsonrpc":"2.0",
"method":"call",
"params":[
"account_history_api",
"enum_virtual_ops",
{
"block_range_begin":range_begin,
"block_range_end":range_end,
"operation_begin":start_from_id,
"limit":limit
}
],
"id":1
}
ret = post(f"http://{config.webserver_http_endpoint}", data=dumps(data))
if ret.status_code == 200:
return ret.json()['result']
else:
raise Exception("bad request")
# checks whether there is anything more to fetch
def paginated(data : dict, range_end : int) -> bool:
return not ( data['next_operation_begin'] == 0 and ( data['next_block_range_begin'] == range_end or data['next_block_range_begin'] == 0 ) )
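# pagination is exhausted when the node reports no next operation to resume from
# (next_operation_begin == 0) and the next block range either wraps to 0 or reaches range_end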
# fetch the whole range in a single call
def get_vops_at_once(range_begin : int, range_end : int) -> list:
tmp = get_vops(range_begin, range_end, 0, MAX_AT_ONCE)
assert not paginated(tmp, range_end)
return compress_vops(tmp['ops'])
# generator that fetches page by page with the step given as limit
def get_vops_paginated(range_begin : int, range_end : int, limit : int):
ret = get_vops(range_begin, range_end, 0, limit)
yield compress_vops(ret['ops'])
if not paginated(ret, range_end):
ret = None
while ret is not None:
ret = get_vops(ret['next_block_range_begin'], range_end, ret['next_operation_begin'], limit)
yield compress_vops(ret['ops'])
if not paginated(ret, range_end):
ret = None
yield None
# wrapper around the generator that aggregates the paginated output
def get_vops_with_step(range_begin : int, range_end : int, limit : int) -> list:
next_object = get_vops_paginated(range_begin, range_end, limit)
ret = []
value = next(next_object)
while value is not None:
ret.extend(value)
value = next(next_object)
return ret
# proxy for get_vops_with_step with limit set to 1
def get_vops_one_by_one(range_begin : int, range_end : int) -> list:
return get_vops_with_step(range_begin, range_end, 1)
# fetch the same data in the given range with different page sizes
def check_range(range_begin : int, blocks : int):
from operator import itemgetter
from json import dump
range_end = range_begin + blocks + 1
print(f"gathering blocks in range [ {range_begin} ; {range_end} )")
all_at_once = get_vops_at_once(range_begin, range_end)
paginated_by_1 = get_vops_one_by_one(range_begin, range_end)
paginated_by_2 = get_vops_with_step(range_begin, range_end, 2)
paginated_by_5 = get_vops_with_step(range_begin, range_end, 5)
paginated_by_10 = get_vops_with_step(range_begin, range_end, 10)
# dump(all_at_once, open("all_at_once.json", 'w'))
# dump(paginated_by_1, open("paginated_by_1.json", 'w'))
# dump(paginated_by_2, open("paginated_by_2.json", 'w'))
# dump(paginated_by_5, open("paginated_by_5.json", 'w'))
# dump(paginated_by_10, open("paginated_by_10.json", 'w'))
assert all_at_once == paginated_by_1
print(f"[OK] all == paginated by 1 [ {range_begin} ; {range_end} )")
assert all_at_once == paginated_by_2
print(f"[OK] all == paginated by 2 [ {range_begin} ; {range_end} )")
assert all_at_once == paginated_by_5
print(f"[OK] all == paginated by 5 [ {range_begin} ; {range_end} )")
assert all_at_once == paginated_by_10
print(f"[OK] all == paginated by 10 [ {range_begin} ; {range_end} )")
return True
threads = []
STEP = 100
# start tests in different threads, one per 100-block window in the last 400 blocks of the replayed range
for i in range(args.blocks - (STEP * 4), args.blocks, STEP):
th = Thread(target=check_range, args=(i, STEP))
th.start()
threads.append( th )
for job in threads:
job.join()
print("success")
exit(0)
......@@ -9,7 +9,7 @@ sys.path.append("../../")
import hive_utils
from hive_utils.resources.configini import config as configuration
from hive_utils.common import wait_for_node
from subprocess import PIPE, STDOUT
parser = argparse.ArgumentParser()
......@@ -17,6 +17,7 @@ parser.add_argument("--run-hived", dest="hived", help = "Path to hived executabl
parser.add_argument("--block-log", dest="block_log_path", help = "Path to block log", required=True, type=str, default=None)
parser.add_argument("--blocks", dest="blocks", help = "Blocks to replay", required=False, type=int, default=1000)
parser.add_argument("--leave", dest="leave", action='store_true')
parser.add_argument("--artifact-directory", dest="artifacts", help = "Path to directory where logs will be stored", required=False, type=str)
args = parser.parse_args()
node = None
......@@ -58,15 +59,27 @@ base_hv_args = [ "--stop-replay-at-block", str(args.blocks), "--exit-after-repla
hv_args = base_hv_args.copy()
hv_args.append("--replay-blockchain")
# setting up logging
stdout = PIPE
stderr = None
if args.artifacts:
stderr = STDOUT
stdout = open(os.path.join(args.artifacts, "replayed_node_snapshot_1.log"), 'w', 1)
# setup for replay
node = hive_utils.hive_node.HiveNode(
args.hived,
work_dir,
hv_args.copy()
hv_args.copy(),
stdout,
stderr
)
# replay
wait_for_node(node, "waiting for replay of {} blocks...".format(int(args.blocks)))
print(f"waiting for replay of {args.blocks} blocks...")
with node:
node.wait_till_end()
print("replay completed, creating snapshot")
# setup for first snapshot
......@@ -76,7 +89,9 @@ hv_args.extend(["--dump-snapshot", first_snapshot_name])
node.hived_args = hv_args.copy()
# creating snapshot
wait_for_node(node, "creating snapshot '{}' ...".format(first_snapshot_name))
print("creating snapshot '{}' ...".format(first_snapshot_name))
with node:
node.wait_till_end()
# setup for loading snapshot
hv_args = base_hv_args.copy()
......@@ -85,7 +100,9 @@ node.hived_args = hv_args.copy()
os.remove(os.path.join(blockchain_dir, "shared_memory.bin"))
# loading snapshot
wait_for_node(node, "creating snapshot '{}' ...".format(first_snapshot_name))
print("creating snapshot '{}' ...".format(first_snapshot_name))
with node:
node.wait_till_end()
# setup for redumping snapshot
second_snapshot_name = "second_snapshot"
......@@ -94,7 +111,9 @@ hv_args.extend(["--dump-snapshot", second_snapshot_name])
node.hived_args = hv_args.copy()
# redumping snapshot
wait_for_node(node, "creating snapshot '{}' ...".format(second_snapshot_name))
print("creating snapshot '{}' ...".format(second_snapshot_name))
with node:
node.wait_till_end()
path_to_first_snapshot = os.path.join(snapshot_root, first_snapshot_name)
path_to_second_snapshot = os.path.join(snapshot_root, second_snapshot_name)
......@@ -117,4 +136,7 @@ if not args.leave:
else:
print("datadir not deleted: {}".format(work_dir))
if stderr is not None:
stdout.close()
exit(len(miss_list))
......@@ -4,12 +4,13 @@ import sys
import os
import tempfile
import argparse
from subprocess import PIPE, STDOUT
sys.path.append("../../")
import hive_utils
from hive_utils.resources.configini import config as configuration
from hive_utils.common import wait_n_blocks, wait_for_node
from hive_utils.common import wait_n_blocks
parser = argparse.ArgumentParser()
......@@ -17,6 +18,7 @@ parser.add_argument("--run-hived", dest="hived", help = "Path to hived executabl
parser.add_argument("--block-log", dest="block_log_path", help = "Path to block log", required=True, type=str, default=None)
parser.add_argument("--blocks", dest="blocks", help = "Blocks to replay", required=False, type=int, default=1000)
parser.add_argument("--leave", dest="leave", action='store_true')
parser.add_argument("--artifact-directory", dest="artifacts", help = "Path to directory where logs will be stored", required=False, type=str)
args = parser.parse_args()
node = None
......@@ -63,7 +65,9 @@ def dump_snapshot(Node, snapshot_name):
hv_args.extend(["--dump-snapshot", snapshot_name])
Node.hived_args = hv_args
# creating snapshot
wait_for_node(Node, "creating snapshot '{}' ...".format(snapshot_name))
print("creating snapshot '{}' ...".format(snapshot_name))
with Node:
Node.wait_till_end()
def load_snapshot(Node, snapshot_name):
# setup for loading snapshot
......@@ -72,34 +76,54 @@ def load_snapshot(Node, snapshot_name):
Node.hived_args = hv_args
os.remove(os.path.join(blockchain_dir, "shared_memory.bin"))
# loading snapshot
wait_for_node(Node, "loading snapshot '{}' ...".format(snapshot_name))
print( "loading snapshot '{}' ...".format(snapshot_name))
with Node:
Node.wait_till_end()
def run_for_n_blocks(Node, blocks : int, additional_args : list = []):
args.blocks += blocks
Node.hived_args = get_base_hv_args()
if len(additional_args) > 0:
Node.hived_args.extend(additional_args)
wait_for_node(Node, "waiting for {} blocks...".format(int(args.blocks)))
print("waiting for {} blocks...".format(int(args.blocks)))
with Node:
Node.wait_till_end()
def require_success(node):
assert node.last_returncode == 0
def require_fail(node):
assert node.last_returncode == 0
def require_fail(node, filepath):
with open(filepath, 'r') as fff:
for line in fff:
if "Snapshot generation FAILED." in line:
return
assert False
hv_args = get_base_hv_args()
hv_args.append("--replay-blockchain")
# setting up logging
stdout = PIPE
stderr = None
if args.artifacts:
stderr = STDOUT
stdout = open(os.path.join(args.artifacts, "replayed_node_snapshot_2.log"), 'w', 1)
# setup for replay
node = hive_utils.hive_node.HiveNode(
args.hived,
work_dir,
hv_args
hv_args,
stdout,
stderr
)
# replay
wait_for_node(node, "waiting for replay of {} blocks...".format(int(args.blocks)))
print("waiting for replay of {} blocks...".format(int(args.blocks)))
with node:
node.wait_till_end()
require_success(node)
print("replay completed, creating snapshot")
......@@ -120,8 +144,27 @@ run_for_n_blocks(node, 100, ["--replay-blockchain"])
require_success(node)
# dump to same directory
# redirect the node output to a temporary file so the expected snapshot failure can be checked
tmp_file = "tmp_file.log"
tmp_output = open(tmp_file, 'w')
current_output = node.stdout_stream
node.stdout_stream = tmp_output
# gather output
dump_snapshot(node, "snap_1")
require_fail(node)
# optionally append the output to the artifacts log
tmp_output.close()
node.stdout_stream = current_output
if stderr is not None:
with open(tmp_file, 'r') as tmp_log:
for line in tmp_log:
stdout.write(line)
# check that the snapshot dump failed
require_fail(node, tmp_file)
os.remove(tmp_file)
from shutil import rmtree
rmtree(os.path.join(config.snapshot_root_dir, "snap_1"))