Skip to content
Snippets Groups Projects
Commit 217b5c5c authored by Radosław Masłowski's avatar Radosław Masłowski
Browse files

Create new Wallet base on beekeeper

parent 80ebc587
No related branches found
No related tags found
1 merge request!206Implement new wallet based on beekeeper
# mypy: ignore-errors
# ruff: noqa
# file for deletion after cli_wallet deprecation
from __future__ import annotations
import typing
from typing import TYPE_CHECKING, Literal
from test_tools.__private.type_annotations.any_node import AnyNode
from typing import TYPE_CHECKING
from test_tools.__private.node import Node
from test_tools.__private.remote_node import RemoteNode
from test_tools.__private.user_handles.get_implementation import get_implementation
from test_tools.__private.user_handles.handle import Handle
from test_tools.__private.user_handles.handles.node_handles.node_handle_base import NodeHandleBase
from test_tools.__private.user_handles.handles.node_handles.remote_node_handle import RemoteNodeHandle as RemoteNode
from test_tools.__private.wallet import SingleTransactionContext, Wallet
from test_tools.__private.user_handles.handles.node_handles.remote_node_handle import RemoteNodeHandle
from test_tools.__private.wallet.wallet import Wallet
if TYPE_CHECKING:
from collections.abc import Iterable
from pathlib import Path
from helpy import Hf26Asset as Asset
from schemas.fields.hex import Hex
from schemas.operations import AnyOperation
from test_tools.__private.account import Account
from test_tools.__private.type_annotations.any_node import AnyNode
from test_tools.__private.wallet.constants import TransactionSerializationTypes, WalletResponse, WalletResponseBase
from test_tools.__private.wallet.single_transaction_context import SingleTransactionContext
class WalletHandle(Handle):
DEFAULT_PASSWORD = Wallet.DEFAULT_PASSWORD
def __init__(
self,
attach_to: AnyNode | None = None,
additional_arguments: Iterable[str] = (),
preconfigure: bool = True,
time_control: str | None = None,
chain_id: Hex | str = "default",
transaction_serialization: TransactionSerializationTypes = "hf26",
):
"""
Creates wallet, runs its process and blocks until wallet will be ready to use.
Prepare environment for wallet based on instance of beekeeper and wax, runs beekeeper instance, beekeeper session and wallet and made preconfigurations for test usage.
:param attach_to: Wallet will send messages to node passed as this parameter. Passing node is optional, but when
node is omitted, wallet functionalities are limited.
:param additional_arguments: Command line arguments which will be applied during running wallet.
:param preconfigure: If set to True, after run wallet will be unlocked with password DEFAULT_PASSWORD and
initminer's keys imported.
:param time_control: See parameter ``time_control`` in :func:`run`.
:param preconfigure: If set to True, after run wallet initminer's keys imported.
:param chain_id: If set to "default", wallet use chain_id from node, else use typed chain_id.
:param transaction_serialization: Set type of transaction serialization- hf26 or legacy. Default: hf26. Legacy serialization may not work correctly with the testnet.
"""
if isinstance(attach_to, NodeHandleBase | RemoteNode):
attach_to = get_implementation(attach_to, AnyNode)
if isinstance(attach_to, NodeHandleBase | RemoteNodeHandle):
attach_to = get_implementation(attach_to, Node | RemoteNode) # type: ignore[arg-type]
super().__init__(
implementation=Wallet(
attach_to=attach_to,
additional_arguments=additional_arguments,
preconfigure=preconfigure,
time_control=time_control,
chain_id=chain_id,
transaction_serialization=transaction_serialization,
)
)
self.api = self.__implementation.api
@property
def __implementation(self) -> Wallet:
def __implementation(self) -> Wallet: # type: ignore[override] # this is forward of base class private member to child private member
return get_implementation(self, Wallet)
def in_single_transaction(self, *, broadcast: bool | None = None) -> SingleTransactionContext:
@property
def connected_node(self) -> None | Node | RemoteNode:
"""Returns node instance connected to the wallet."""
return self.__implementation.connected_node
def in_single_transaction(self, *, broadcast: bool = True, blocking: bool = True) -> SingleTransactionContext:
"""
Returns context manager allowing aggregation of multiple operations to single transaction.
......@@ -70,60 +73,41 @@ class WalletHandle(Handle):
If set to False, only builds transaction without sending, which may be accessed with `get_response` method
of returned context manager object.
Otherwise behavior is undefined.
:param blocking: If set to True, wallet waiting for response from blockchain. If set to False, directly after send request, program continue working.
"""
return self.__implementation.in_single_transaction(broadcast=broadcast)
return self.__implementation.in_single_transaction(broadcast=broadcast, blocking=blocking)
def run(
self,
*,
timeout: float = Wallet.DEFAULT_RUN_TIMEOUT,
preconfigure: bool = True,
time_control: str | None = None,
):
def send(
self, operations: list[AnyOperation], broadcast: bool, blocking: bool = True
) -> WalletResponseBase | WalletResponse | None:
"""
Runs wallet's process and blocks until wallet will be ready to use.
:param timeout: TimeoutError will be raised, if wallet won't start before specified timeout.
:param preconfigure: If set to True, after run wallet will be unlocked with password DEFAULT_PASSWORD and
initminer's keys imported.
:param time_control: Allows to change system date and time a node sees (without changing real OS time).
Can be specified either absolutely, relatively and speed up or slow down clock. Value passed in
`time_control` is written to `FAKETIME` environment variable. For details and examples see libfaketime
official documentation: https://github.com/wolfcw/libfaketime.
"""
self.__implementation.run(timeout=timeout, preconfigure=preconfigure, time_control=time_control)
Enable to sign and send transaction with any operations.
def restart(self, *, preconfigure: bool = True, time_control: str | None = None) -> None:
:param: operations: List of operation, which have to be send in this transaction.
:param broadcast: If set to True, this is default behavior, sends (broadcasts) transaction to blockchain.
If set to False, only builds transaction without sending.
:param blocking: If set to True, wallet waiting for response from blockchain. If set to False, directly after send request, program continue working.
"""
Closes wallet's process, runs it again and blocks until wallet will be ready to use.
return self.__implementation.send(operations=operations, broadcast=broadcast, blocking=blocking)
:param preconfigure: If set to True, after run wallet will be unlocked with password DEFAULT_PASSWORD and
initminer's keys imported.
:param time_control: See parameter ``time_control`` in :func:`run`.
def run(self, preconfigure: bool = True) -> None:
"""
self.__implementation.restart(preconfigure=preconfigure, time_control=time_control)
Runs beekeeper instance, beekeeper session and wallet. Also makes preconfigurations for test usage.
@property
def transaction_serialization(self) -> Literal["legacy", "hf26"]:
:param preconfigure: If set to True, after run wallet initminer's keys imported.
"""
Returns information about how transactions are serialized.
self.__implementation.run(preconfigure=preconfigure)
Can be serialized in legacy way (e.g. assets are serialized as strings "3.000 HIVE", but it's not the only
difference) or in HF26 way (then are serialized as {"amount": "3000", "precision": 3, "nai": "@@000000021"}).
"""
return self.__implementation.transaction_serialization
def restart(self) -> None:
"""Close wallet and run it again."""
self.__implementation.restart()
def close(self) -> None:
"""
Terminates wallet's process and performs needed cleanups.
Blocks until wallet process will be finished. Closing is performed by sending SIGINT signal.
If wallet doesn't close before timeout, sends SIGKILL and emits warning message.
"""
"""Remove beekeeper instance, close beekeeper session and performs needed cleanups."""
self.__implementation.close()
def is_running(self) -> bool:
"""Returns True if wallet's process is running, otherwise False."""
"""Returns True if wallet is running, otherwise False."""
return self.__implementation.is_running()
def create_accounts(
......@@ -160,7 +144,7 @@ class WalletHandle(Handle):
hives: Asset.TestT | float | None = None,
vests: Asset.TestT | float | None = None,
hbds: Asset.TbdT | float | None = None,
) -> dict:
) -> WalletResponseBase | WalletResponse | None:
"""
Creates account in blockchain and optionally fund it with given amount of hives, vests and HBDs.
......
This diff is collapsed.
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Final, Literal
from schemas.fields.basic import AccountName, EmptyString, PrivateKey, PublicKey, WitnessUrl
from schemas.fields.compound import HbdExchangeRate
from schemas.fields.hive_datetime import HiveDateTime
from schemas.operations.representations.hf26_representation import HF26Representation
from schemas.transaction import Transaction
if TYPE_CHECKING:
from schemas.operations import AnyOperation
from test_tools.__private.node import Node
from test_tools.__private.remote_node import RemoteNode
AnyNode = Node | RemoteNode
DEFAULT_PASSWORD: Final[str] = "password"
HIVE_MAX_TIME_UNTIL_EXPIRATION: Final[int] = 60 * 60
HIVE_MAX_TIME_UNTIL_SIGNATURE_EXPIRATION: Final[int] = 86400
ACCOUNT_PER_TRANSACTION: Final[int] = 500
MULTIPLE_IMPORT_KEYS_BATCH_SIZE: Final[int] = 10000
AccountNameApiType = AccountName | str
EmptyStringApiType = EmptyString | str
HiveDateTimeApiType = HiveDateTime | datetime | str
PublicKeyApiType = PublicKey | str
WitnessUrlApiType = WitnessUrl | str
HbdExchangeRateApiType = HbdExchangeRate | dict
AuthorityType = Literal["active", "owner", "posting"]
TransactionSerializationTypes = Literal["hf26", "legacy"]
@dataclass
class AuthorityRequirementsHolder:
active: set[str] = field(default_factory=set)
owner: set[str] = field(default_factory=set)
posting: set[str] = field(default_factory=set)
def all_(self) -> set[str]:
return {*self.active, *self.owner, *self.posting}
@dataclass
class AuthorityHolder:
active: dict[PublicKey, PrivateKey] = field(default_factory=dict)
owner: dict[PublicKey, PrivateKey] = field(default_factory=dict)
posting: dict[PublicKey, PrivateKey] = field(default_factory=dict)
def all_(self) -> dict[PublicKey, PrivateKey]:
result = self.active.copy()
result.update(self.owner)
result.update(self.posting)
return result
class SimpleTransaction(Transaction):
def add_operation(self, operation: AnyOperation) -> None:
self.operations.append(HF26Representation(type=operation.get_name_with_suffix(), value=operation))
class WalletResponseBase(SimpleTransaction):
transaction_id: str
class WalletResponse(WalletResponseBase):
block_num: int
transaction_num: int
rc_cost: None | int
from __future__ import annotations
import concurrent
import concurrent.futures
import os
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from loguru import logger
import helpy
import wax
from helpy import wax as wax_helpy
from helpy._interfaces.asset.asset import Asset
from schemas.fields.compound import Authority
from schemas.fields.hive_int import HiveInt
from schemas.operations.account_create_operation import AccountCreateOperation
from test_tools.__private.account import Account
from test_tools.__private.remote_node import RemoteNode
from test_tools.__private.wallet.constants import (
ACCOUNT_PER_TRANSACTION,
MULTIPLE_IMPORT_KEYS_BATCH_SIZE,
SimpleTransaction,
WalletResponseBase,
)
if TYPE_CHECKING:
from collections.abc import Callable
from beekeepy import Beekeeper, PackedSyncBeekeeper
from beekeepy._interface.abc.synchronous.wallet import UnlockedWallet
from schemas.fields.basic import PublicKey
from schemas.operations import AnyOperation
from test_tools.__private.node import Node
AnyNode = Node | RemoteNode
def get_authority(key: PublicKey | str) -> Authority:
return Authority(weight_threshold=1, account_auths=[], key_auths=[[key, 1]])
def generate_transaction_template(node: RemoteNode) -> SimpleTransaction:
gdpo = node.api.database.get_dynamic_global_properties()
block_id = gdpo.head_block_id
# set header
tapos_data = wax.get_tapos_data(block_id.encode())
ref_block_num = tapos_data.ref_block_num
ref_block_prefix = tapos_data.ref_block_prefix
assert ref_block_num >= 0, f"ref_block_num value `{ref_block_num}` is invalid`"
assert ref_block_prefix > 0, f"ref_block_prefix value `{ref_block_prefix}` is invalid`"
return SimpleTransaction(
ref_block_num=HiveInt(ref_block_num),
ref_block_prefix=HiveInt(ref_block_prefix),
expiration=gdpo.time + timedelta(seconds=1800),
extensions=[],
signatures=[],
operations=[],
)
def sign_transaction(
node: RemoteNode, transaction: SimpleTransaction, beekeeper_wallet: UnlockedWallet
) -> SimpleTransaction:
wax_helpy.calculate_transaction_id(transaction=transaction)
node_config = node.api.database.get_config()
sig_digest = wax_helpy.calculate_sig_digest(transaction, node_config.HIVE_CHAIN_ID)
key_to_sign_with = beekeeper_wallet.import_key(private_key=Account("initminer").private_key)
time_before = datetime.now()
signature = beekeeper_wallet.sign_digest(sig_digest=sig_digest, key=key_to_sign_with)
logger.info(f"Sign digest time: {datetime.now() - time_before}")
transaction.signatures.append(signature)
transaction.signatures = list(set(transaction.signatures))
wax_helpy.validate_transaction(transaction)
return transaction
def prepare_transaction(
operations: list[AnyOperation], node: RemoteNode, beekeeper_wallet: UnlockedWallet
) -> WalletResponseBase:
transaction = generate_transaction_template(node)
account_creation_fee = node.api.wallet_bridge.get_chain_properties().account_creation_fee
for operation in operations:
if isinstance(operation, AccountCreateOperation):
operation.fee = account_creation_fee # type: ignore[assignment]
transaction.add_operation(operation)
transaction = sign_transaction(node, transaction, beekeeper_wallet)
return WalletResponseBase(
transaction_id=wax_helpy.calculate_transaction_id(transaction=transaction),
ref_block_num=transaction.ref_block_num,
ref_block_prefix=transaction.ref_block_prefix,
expiration=transaction.expiration,
extensions=transaction.extensions,
signatures=transaction.signatures,
operations=transaction.operations,
)
def send_transaction( # noqa: C901
accounts_: list[Account],
packed_beekeeper: PackedSyncBeekeeper,
node_address: helpy.HttpUrl,
beekeeper_wallet_name: str,
beekeeper_wallet_password: str,
) -> None:
def retry_until_success(predicate: Callable[[], Any], *, fail_message: str, max_retries: int = 20) -> bool:
retries = 0
while retries <= max_retries:
try:
predicate()
except Exception as e: # noqa: BLE001
logger.error(f"{fail_message} ; {e}")
retries += 1
else:
return True
return False
def ensure_accounts_exists() -> None:
listed_accounts = node.api.wallet_bridge.list_accounts(accounts_[0].name, 1)
logger.info(listed_accounts)
if accounts_[0].name in listed_accounts:
logger.debug(f"Accounts created: {accounts_range_message}")
def broadcast_transaction(
operations: list[AnyOperation], node: RemoteNode, beekeeper_wallet: UnlockedWallet
) -> bool:
def prepare_and_broadcast() -> None:
trx = prepare_transaction(operations, node, beekeeper_wallet)
with node.restore_settings():
node.settings.timeout = timedelta(hours=1)
node.api.wallet_bridge.broadcast_transaction_synchronous(trx)
return retry_until_success(
prepare_and_broadcast,
fail_message=f"Failed to send transaction: {accounts_range_message}",
)
operations: list[AnyOperation] = []
beekeeper = packed_beekeeper.unpack()
node = RemoteNode(http_endpoint=node_address)
accounts_range_message = f"{accounts_[0].name}..{accounts_[-1].name}"
for account in accounts_:
operation = AccountCreateOperation(
creator="initminer",
new_account_name=account.name,
json_metadata="{}",
fee=Asset.Test(1).as_nai(),
owner=get_authority(account.public_key),
active=get_authority(account.public_key),
posting=get_authority(account.public_key),
memo_key=account.public_key,
)
operations.append(operation)
# Send transaction
with beekeeper.create_session() as session:
beekeeper_wallet = session.open_wallet(name=beekeeper_wallet_name)
beekeeper_wallet = beekeeper_wallet.unlock(beekeeper_wallet_password)
while True:
if not broadcast_transaction(operations, node, beekeeper_wallet):
continue
if retry_until_success(
ensure_accounts_exists,
fail_message=f"Node ignored create accounts request of accounts {accounts_range_message}, requesting again...",
max_retries=5,
):
return
def create_accounts(
beekeeper: Beekeeper,
node: AnyNode,
beekeeper_wallet_name: str,
beekeeper_wallet_password: str,
number_of_accounts: int,
import_keys: bool,
name_base: str = "account",
*,
secret: str = "secret",
) -> list[Account]:
def run_in_thread_pool_executor(
predicate: Callable[..., Any],
iterable_args: list[Any],
packed_beekeeper: PackedSyncBeekeeper,
node_address: helpy.HttpUrl,
beekeeper_wallet_name: str,
beekeeper_wallet_password: str,
*,
max_threads: int = (os.cpu_count() or 24), # noqa: B008
) -> None:
with concurrent.futures.ProcessPoolExecutor(max_workers=max_threads) as executor:
futures: list[concurrent.futures.Future[Any]] = []
for args in iterable_args:
futures.append(
executor.submit(
predicate,
args,
packed_beekeeper,
node_address,
beekeeper_wallet_name,
beekeeper_wallet_password,
)
)
start = time.perf_counter()
while (
len((tasks := concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED))[0]) > 0
):
task = tasks[0].pop()
futures.pop(futures.index(task))
logger.debug(f"Joined next item after {(time.perf_counter() - start) :.4f} seconds...")
start = time.perf_counter()
if (exc := task.exception()) is not None:
logger.error(f"got {exc=}")
raise exc
def split(
collection: list[Any], items_per_chunk: int, *, predicate: Callable[[Any], Any] = lambda x: x
) -> list[Any]:
return [
[predicate(item) for item in collection[i : i + items_per_chunk]]
for i in range(0, len(collection), items_per_chunk)
]
packed_beekeeper = beekeeper.pack()
accounts = Account.create_multiple(number_of_accounts, name_base, secret=secret)
run_in_thread_pool_executor(
send_transaction,
split(accounts, ACCOUNT_PER_TRANSACTION),
packed_beekeeper,
node.http_endpoint,
beekeeper_wallet_name,
beekeeper_wallet_password,
)
with beekeeper.create_session() as session:
beekeeper_wallet = session.open_wallet(name=beekeeper_wallet_name)
beekeeper_wallet = beekeeper_wallet.unlock(beekeeper_wallet_password)
if import_keys:
keys: list[str] = []
for account in accounts:
keys.append(account.private_key)
num_keys = len(keys)
for i in range(0, num_keys, MULTIPLE_IMPORT_KEYS_BATCH_SIZE):
batch_keys = keys[i : i + MULTIPLE_IMPORT_KEYS_BATCH_SIZE]
time1 = datetime.now()
beekeeper_wallet.import_keys(private_keys=batch_keys)
time2 = datetime.now()
logger.info(f"Import time: {time2-time1}")
return accounts
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Any
from helpy._interfaces.context import ContextSync
from test_tools.__private.wallet.constants import WalletResponse, WalletResponseBase
if TYPE_CHECKING:
from test_tools.__private.node import Node
from test_tools.__private.remote_node import RemoteNode
AnyNode = Node | RemoteNode
from test_tools.__private.wallet.wallet import Wallet
class SingleTransactionContext(ContextSync["SingleTransactionContext"]):
def __init__(self, wallet_: Wallet, *, broadcast: bool, blocking: bool) -> None:
self.__wallet = wallet_
self.__broadcast = broadcast
self.__blocking = blocking
self.__response: WalletResponseBase | WalletResponse | None = None
self.__was_run_as_context_manager = False
def get_response(self) -> None | WalletResponseBase | WalletResponse:
return self.__response
def as_dict(self) -> None | dict[str, Any]:
if isinstance(self.__response, WalletResponseBase | WalletResponse):
return self.__response.__dict__
return None
def __del__(self) -> None:
if not self.__was_run_as_context_manager:
from test_tools.__private.wallet.wallet import Wallet
raise RuntimeError(
f'You used {Wallet.__name__}.{Wallet.in_single_transaction.__name__}() not in "with" statement'
)
def _enter(self) -> SingleTransactionContext:
self.__wallet.api._start_gathering_operations_for_single_transaction()
self.__was_run_as_context_manager = True
return self
def _finally(self) -> None:
exc_type, exc_value, exc_traceback = sys.exc_info()
if exc_value is not None:
raise exc_value
self.__response = self.__wallet.api._send_gathered_operations_as_single_transaction(
broadcast=self.__broadcast, blocking=self.__blocking
)
from __future__ import annotations
import shutil
import warnings
from datetime import timedelta
from typing import TYPE_CHECKING, Any, get_args
from beekeepy import Beekeeper, Settings
import helpy
import wax
from helpy import Hf26Asset as Asset
from helpy import wax as wax_helpy
from schemas.fields.basic import PublicKey
from schemas.fields.hex import Hex
from schemas.fields.hive_int import HiveInt
from test_tools.__private import exceptions
from test_tools.__private.account import Account
from test_tools.__private.node import Node
from test_tools.__private.remote_node import RemoteNode
from test_tools.__private.scope import ScopedObject, context
from test_tools.__private.user_handles.implementation import Implementation as UserHandleImplementation
from test_tools.__private.wallet.constants import (
DEFAULT_PASSWORD,
AuthorityType,
SimpleTransaction,
TransactionSerializationTypes,
WalletResponse,
WalletResponseBase,
)
from test_tools.__private.wallet.create_accounts import (
create_accounts,
)
from test_tools.__private.wallet.single_transaction_context import SingleTransactionContext
from test_tools.__private.wallet.wallet_api import Api
if TYPE_CHECKING:
from pathlib import Path
from beekeepy._interface.abc.synchronous.session import Session
from beekeepy._interface.abc.synchronous.wallet import UnlockedWallet
from schemas.apis.wallet_bridge_api.fundaments_of_responses import Account as AccountSchema
from schemas.fields.assets.hbd import AssetHbdHF26
from schemas.fields.assets.hive import AssetHiveHF26
from schemas.fields.assets.vests import AssetVestsHF26
from schemas.fields.compound import Authority
from schemas.operations import AnyOperation
from test_tools.__private.user_handles.handles.wallet_handle import WalletHandle
AnyNode = Node | RemoteNode
class Wallet(UserHandleImplementation, ScopedObject):
def __init__(
self,
*,
attach_to: None | AnyNode,
preconfigure: bool = True,
chain_id: str = "default",
transaction_serialization: TransactionSerializationTypes = "hf26",
handle: WalletHandle | None = None,
):
super().__init__(handle=handle)
self.api = Api(self)
self.connected_node: None | AnyNode = attach_to
self.__chain_id: str = chain_id
self._transaction_serialization: str = transaction_serialization
assert self._transaction_serialization in get_args(
TransactionSerializationTypes
), "Invalid transaction_serialization parameter value"
self._use_authority: dict[str, AuthorityType] = {}
self.__beekeeper: Beekeeper | None = None
self.__beekeeper_session: Session | None = None
self._beekeeper_wallet: UnlockedWallet | None = None
self._transaction_expiration_offset = timedelta(seconds=30)
self.__prepare_directory()
self.run(preconfigure=preconfigure)
if (
self.connected_node is not None
and self._transaction_serialization == "legacy"
and self.connected_node.api.database.get_version().node_type == "testnet"
):
warnings.warn("Wallet in legacy mode may not work correctly with the testnet hive instance.", stacklevel=1)
@property
def _force_connected_node(self) -> AnyNode:
assert self.connected_node is not None, "Node not exist"
return self.connected_node
def __get_chain_id(self) -> Hex:
node_config = self._force_connected_node.api.database.get_config()
if self.__chain_id != "default":
assert self.__chain_id.isdigit(), "Invalid chain_id value: it must be a digit string"
chain_id = self.__chain_id[:64]
return Hex(Hex.validate(chain_id.ljust(64, "0")))
return node_config.HIVE_CHAIN_ID
def __prepare_directory(self) -> None:
self.name: str
self.directory: Path
if isinstance(self.connected_node, Node):
self.name = context.names.register_numbered_name(f"{self.connected_node}.Wallet")
self.directory = self.connected_node.directory.parent / self.name
self.connected_node.register_wallet(self)
elif isinstance(self.connected_node, RemoteNode):
self.name = context.names.register_numbered_name(f"{self.connected_node}.Wallet")
self.directory = context.get_current_directory() / self.name
else:
self.name = context.names.register_numbered_name("Wallet")
self.directory = context.get_current_directory() / self.name
if self.directory.exists():
shutil.rmtree(self.directory)
@property
def beekeeper_wallet(self) -> UnlockedWallet:
assert self._beekeeper_wallet is not None, "Beekeeper wallet not exist"
return self._beekeeper_wallet
def send(
self, operations: list[AnyOperation], broadcast: bool, blocking: bool
) -> None | WalletResponseBase | WalletResponse:
return self.api._send(operations, broadcast, blocking)
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return str(self)
def get_stdout_file_path(self) -> Path:
return self.directory / "stdout.txt"
def get_stderr_file_path(self) -> Path:
return self.directory / "stderr.txt"
def is_running(self) -> bool:
return self._beekeeper_wallet is not None
def run(
self,
preconfigure: bool = True,
) -> None:
if self.is_running():
raise RuntimeError("Wallet is already running")
if self.connected_node is not None and not self.connected_node.is_running():
raise exceptions.NodeIsNotRunningError("Before attaching wallet you have to run node")
self.__beekeeper = Beekeeper.factory(settings=Settings(working_directory=self.directory))
self.__beekeeper_session = self.__beekeeper.create_session()
try:
self._beekeeper_wallet = self.__beekeeper_session.create_wallet(name=self.name, password=DEFAULT_PASSWORD)
if preconfigure:
self._beekeeper_wallet.import_key(private_key=Account("initminer").private_key)
except helpy.exceptions.RequestError as exception:
if f"Wallet with name: '{self.name}' already exists" in exception.error:
locked_wallet = self.__beekeeper_session.open_wallet(name=self.name)
self._beekeeper_wallet = locked_wallet.unlock(DEFAULT_PASSWORD)
else:
raise
def at_exit_from_scope(self) -> None:
self.close()
def restart(self) -> None:
self.close()
self.run()
def close(self) -> None:
if self.is_running():
if self.__beekeeper is not None:
self.__beekeeper.teardown()
self.__beekeeper = None
self.__beekeeper_session = None
self._beekeeper_wallet = None
def create_account(
self,
name: str,
*,
creator: str = "initminer",
hives: Asset.TestT | float | None = None,
vests: Asset.TestT | float | None = None,
hbds: Asset.TbdT | float | None = None,
) -> WalletResponseBase | WalletResponse | None:
"""
The `transfer_to_vesting` operation can be only done by sending the Asset.Test type.
That's why the method in place of `vests` accepts the Asset.Test and numeric types instead of Asset.Vest.
"""
if isinstance(hives, float | int):
hives = Asset.Test(hives)
if isinstance(vests, float | int):
vests = Asset.Test(vests)
if isinstance(hbds, float | int):
hbds = Asset.Tbd(hbds)
account = Account(name)
with self.in_single_transaction() as transaction:
self.api.create_account_with_keys(
creator,
account.name,
"{}",
account.public_key,
account.public_key,
account.public_key,
account.public_key,
)
if hives is not None:
self.api.transfer(creator, name, hives, "memo")
if vests is not None:
self.api.transfer_to_vesting(creator, name, vests)
if hbds is not None:
self.api.transfer(creator, name, hbds, "memo")
self.api.import_key(account.private_key)
return transaction.get_response()
def create_accounts(
self, number_of_accounts: int, name_base: str = "account", *, secret: str = "secret", import_keys: bool = True
) -> list[Account]:
assert self.__beekeeper is not None, "Beekeeper not exist"
max_num_of_accounts_in_single_transaction = 500
if number_of_accounts <= max_num_of_accounts_in_single_transaction:
accounts = Account.create_multiple(number_of_accounts, name_base, secret=secret)
with self.in_single_transaction():
for account in accounts:
self.api.create_account("initminer", account.name, "{}")
if import_keys:
self.api.import_keys([account.private_key for account in accounts])
return accounts
return create_accounts(
beekeeper=self.__beekeeper,
node=self._force_connected_node,
beekeeper_wallet_name=self.name,
beekeeper_wallet_password=DEFAULT_PASSWORD,
number_of_accounts=number_of_accounts,
import_keys=import_keys,
name_base=name_base,
secret=secret,
)
def list_accounts(self) -> list[str]:
next_account = ""
all_accounts = []
while True:
result = self.api.list_accounts(next_account, 1000)
if len(result) == 1:
all_accounts.extend(result)
break
all_accounts.extend(result[:-1])
next_account = result[-1]
return all_accounts
def __generate_transaction_template(
self,
node: AnyNode,
) -> SimpleTransaction:
gdpo = node.api.database.get_dynamic_global_properties()
block_id = gdpo.head_block_id
# set header
tapos_data = wax_helpy.calculate_tapos_data(block_id)
ref_block_num = tapos_data.ref_block_num
ref_block_prefix = tapos_data.ref_block_prefix
assert ref_block_num >= 0, f"ref_block_num value `{ref_block_num}` is invalid`"
assert ref_block_prefix > 0, f"ref_block_prefix value `{ref_block_prefix}` is invalid`"
return SimpleTransaction(
ref_block_num=HiveInt(ref_block_num),
ref_block_prefix=HiveInt(ref_block_prefix),
expiration=gdpo.time + self._transaction_expiration_offset,
extensions=[],
signatures=[],
operations=[],
)
def _prepare_and_send_transaction(
self, operations: list[AnyOperation], blocking: bool, broadcast: bool
) -> WalletResponseBase | WalletResponse:
transaction = self.__generate_transaction_template(self._force_connected_node)
for operation in operations:
transaction.add_operation(operation)
transaction = self.complex_transaction_sign(transaction)
return self.broadcast_transaction(transaction, blocking, broadcast)
def broadcast_transaction(
self, transaction: SimpleTransaction, blocking: bool, broadcast: bool
) -> WalletResponseBase | WalletResponse:
if broadcast:
if blocking:
with self._force_connected_node.restore_settings():
self._force_connected_node.settings.timeout = timedelta(hours=1)
broadcast_response = self._force_connected_node.api.wallet_bridge.broadcast_transaction_synchronous(
transaction
)
return WalletResponse(
transaction_id=(
wax_helpy.calculate_transaction_id(transaction=transaction)
if self._transaction_serialization == "hf26"
else wax_helpy.calculate_legacy_transaction_id(transaction=transaction)
),
block_num=broadcast_response.block_num,
transaction_num=broadcast_response.trx_num,
rc_cost=broadcast_response.rc_cost,
ref_block_num=transaction.ref_block_num,
ref_block_prefix=transaction.ref_block_prefix,
expiration=transaction.expiration,
extensions=transaction.extensions,
signatures=transaction.signatures,
operations=transaction.operations,
)
self._force_connected_node.api.wallet_bridge.broadcast_transaction(transaction)
return WalletResponseBase(
transaction_id=(
wax_helpy.calculate_transaction_id(transaction=transaction)
if self._transaction_serialization == "hf26"
else wax_helpy.calculate_legacy_transaction_id(transaction=transaction)
),
ref_block_num=transaction.ref_block_num,
ref_block_prefix=transaction.ref_block_prefix,
expiration=transaction.expiration,
extensions=transaction.extensions,
signatures=transaction.signatures,
operations=transaction.operations,
)
def complex_transaction_sign(self, transaction: SimpleTransaction) -> SimpleTransaction:
sig_digest = self.calculate_sig_digest(transaction)
sign_keys, retrived_authorities = self.import_required_keys(transaction)
signed_transaction = self.sign_transaction(transaction, sig_digest, sign_keys)
if self._use_authority != {}:
return signed_transaction
reduced_sign_keys = self.reduce_signatures(signed_transaction, sign_keys, retrived_authorities)
return self.sign_transaction(transaction, sig_digest, reduced_sign_keys)
def calculate_sig_digest(self, transaction: SimpleTransaction) -> str:
chain_id = self.__get_chain_id()
return (
wax_helpy.calculate_sig_digest(transaction, chain_id)
if self._transaction_serialization == "hf26"
else wax_helpy.calculate_legacy_sig_digest(transaction, chain_id)
)
def sign_transaction(
self, transaction: SimpleTransaction, sig_digest: str, keys_to_sign_with: list[str] | list[Any]
) -> SimpleTransaction:
assert self._beekeeper_wallet is not None
for key in keys_to_sign_with:
signature = self._beekeeper_wallet.sign_digest(sig_digest=sig_digest, key=key)
transaction.signatures.append(signature)
transaction.signatures = list(set(transaction.signatures))
wax_helpy.validate_transaction(transaction)
return transaction
def reduce_signatures(
self,
transaction: SimpleTransaction,
keys_to_sign_with: list[PublicKey],
retrived_authorities: dict[bytes, wax.python_authorities],
) -> list[str] | list[Any]:
def retrieve_witness_key(wittnes_name: bytes) -> bytes:
get_witness = self._force_connected_node.api.wallet_bridge.get_witness(wittnes_name.decode())
assert get_witness is not None
return get_witness.signing_key.encode()
return wax_helpy.minimize_required_signatures(
transaction,
chain_id=self.__get_chain_id(),
available_keys=keys_to_sign_with,
authorities_map=retrived_authorities,
get_witness_key=retrieve_witness_key,
)
def import_required_keys(
self, transaction: SimpleTransaction
) -> tuple[list[PublicKey], dict[bytes, wax.python_authorities]]:
def list_to_dict(list_: list[Any]) -> dict[bytes, int]:
result: dict[bytes, int] = {}
for i in list_:
result[i[0].encode()] = i[1]
return result
def to_python_authority(account_authority: Authority) -> wax.python_authority:
return wax.python_authority(
weight_threshold=account_authority.weight_threshold,
account_auths=list_to_dict(account_authority.account_auths),
key_auths=list_to_dict(account_authority.key_auths),
)
def to_python_authorities(
account_authorities: AccountSchema[AssetHiveHF26, AssetHbdHF26, AssetVestsHF26]
) -> wax.python_authorities:
return wax.python_authorities(
active=to_python_authority(account_authorities.active),
owner=to_python_authority(account_authorities.owner),
posting=to_python_authority(account_authorities.posting),
)
retrived_authorities: dict[bytes, wax.python_authorities] = {}
def retrieve_authorities(account_names: list[bytes]) -> dict[bytes, wax.python_authorities]:
accounts = self._force_connected_node.api.wallet_bridge.get_accounts(
[account_name.decode() for account_name in account_names]
)
retrived_authoritity = {acc.name.encode(): to_python_authorities(acc) for acc in accounts}
retrived_authorities.update(retrived_authoritity)
return retrived_authoritity
keys_for_signing = wax_helpy.collect_signing_keys(transaction, retrieve_authorities)
if self._use_authority != {}:
account_name = next(iter(self._use_authority.keys()))
authority_account = self._force_connected_node.api.database.find_accounts(accounts=[account_name])
return [
getattr(authority_account.accounts[0], self._use_authority[account_name]).key_auths[0][0]
], retrived_authorities
imported_keys_in_beekeeper = set(self.beekeeper_wallet.public_keys)
sign_keys = list(set(keys_for_signing) & set(imported_keys_in_beekeeper))
validated_sign_keys = [PublicKey(PublicKey.validate(key)) for key in sign_keys]
return validated_sign_keys, retrived_authorities
def in_single_transaction(
self, *, broadcast: None | bool = True, blocking: bool = True
) -> SingleTransactionContext:
if broadcast is None:
broadcast = True
return SingleTransactionContext(self, broadcast=broadcast, blocking=blocking)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment