Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • hive/clive
1 result
Show changes
Commits on Source (10)
Showing
with 94 additions and 60 deletions
......@@ -85,7 +85,7 @@ class PerformActionsOnTransactionCommand(WorldBasedCommand, ABC):
return None
try:
return self.world.profile_data.working_account.keys.get(self.sign)
return self.world.profile_data.keys.get(self.sign)
except KeyNotFoundError:
raise CLIPrettyError(
f"Key `{self.sign}` was not found in the working account keys.", errno.ENOENT
......
......@@ -52,7 +52,7 @@ class AddKey(WorldBasedCommand):
if not profile_data.is_working_account_set():
raise CLIWorkingAccountIsNotSetError(profile_data)
key_manager = profile_data.working_account.keys
key_manager = profile_data.keys
alias_result = PublicKeyAliasValidator(key_manager, validate_like_adding_new=True).validate(
self.get_actual_alias()
)
......@@ -68,7 +68,7 @@ class AddKey(WorldBasedCommand):
profile_data = self.world.profile_data
typer.echo("Importing key...")
profile_data.working_account.keys.add_to_import(self.private_key_aliased)
profile_data.keys.add_to_import(self.private_key_aliased)
try:
await self.world.commands.activate(password=self.password)
......@@ -99,7 +99,7 @@ class RemoveKey(WorldBasedCommand):
typer.echo(f"Removing a key aliased with `{self.alias}`...")
public_key = profile_data.working_account.keys.get(self.alias)
public_key = profile_data.keys.get(self.alias)
self.__remove_key_association_from_the_profile(public_key)
......@@ -114,7 +114,7 @@ class RemoveKey(WorldBasedCommand):
typer.echo(message)
def __remove_key_association_from_the_profile(self, key: PublicKeyAliased) -> None:
self.world.profile_data.working_account.keys.remove(key)
self.world.profile_data.keys.remove(key)
async def __remove_key_from_the_beekeeper(self, key: PublicKeyAliased) -> None:
activate_wrapper = await self.world.commands.activate(password=self.password)
......
......@@ -14,5 +14,5 @@ class ShowKeys(ProfileBasedCommand):
if not self.profile_data.is_working_account_set():
raise CLIWorkingAccountIsNotSetError(self.profile_data)
public_keys = list(self.profile_data.working_account.keys)
public_keys = list(self.profile_data.keys)
typer.echo(f"{profile_name}, your keys are:\n{public_keys}")
......@@ -33,4 +33,4 @@ class SyncDataWithBeekeeper(CommandInActive, Command):
beekeeper=self.beekeeper,
).execute_with_result()
await self.profile_data.working_account.keys.import_pending_to_beekeeper(import_key)
await self.profile_data.keys.import_pending_to_beekeeper(import_key)
from __future__ import annotations
from collections.abc import Awaitable, Callable, Iterator, Sequence
from collections.abc import Awaitable, Callable, Iterator
from typing import Iterable
from clive.__private.core.keys.keys import PrivateKey, PrivateKeyAliased, PublicKey, PublicKeyAliased
from clive.__private.core.keys.keys import KeyAliased, PrivateKey, PrivateKeyAliased, PublicKey, PublicKeyAliased
from clive.__private.logger import logger
from clive.exceptions import CliveError
ImportCallbackT = Callable[[PrivateKeyAliased], Awaitable[PublicKeyAliased]]
class KeyAliasAlreadyInUseError(CliveError):
class KeyManagerError(CliveError):
pass
class KeyNotFoundError(CliveError):
pass
class KeyAliasAlreadyInUseError(KeyManagerError):
def __init__(self, alias: str) -> None:
self.alias = alias
self.message = f"Alias is already in use: {alias}"
super().__init__(self.message)
class KeyNotFoundError(KeyManagerError):
def __init__(self, alias: str | None = None) -> None:
self.alias = alias
self.message = f"Key with alias '{alias}' not found." if alias is not None else "Key not found."
super().__init__(self.message)
class KeyManager:
"""A container-like object, that manages a number of keys, which you iterate over to see all the public keys."""
def __init__(self) -> None:
self.__keys: list[PublicKeyAliased] = []
self.__keys_to_import: list[PrivateKeyAliased] = []
self.__keys: set[PublicKeyAliased] = set()
self.__keys_to_import: set[PrivateKeyAliased] = set()
def __iter__(self) -> Iterator[PublicKeyAliased]:
return iter(sorted(self.__keys, key=lambda key: key.alias))
return iter(self._sorted_keys())
def __reversed__(self) -> Iterator[PublicKeyAliased]:
return iter(sorted(self.__keys, key=lambda key: key.alias, reverse=True))
return iter(self._sorted_keys(reverse=True))
def __len__(self) -> int:
return len(self.__keys)
......@@ -40,46 +51,54 @@ class KeyManager:
"""Check if a key is in the key manager. Possible types are determined by the __eq__ of keys."""
return key in self.__keys
@property
def every(self) -> list[PublicKeyAliased]:
"""A *copy* of the keys."""
return list(iter(self))
@property
def first(self) -> PublicKeyAliased:
try:
return next(iter(self))
except StopIteration:
raise KeyNotFoundError("No keys found.") from None
except StopIteration as error:
raise KeyNotFoundError from error
def is_alias_available(self, alias: str) -> bool:
keys_to_check = self.__keys + self.__keys_to_import
known_aliases = [key.alias for key in keys_to_check]
return alias not in known_aliases
return self._is_public_alias_available(alias) and self._is_key_to_import_alias_available(alias)
def get(self, alias: str) -> PublicKeyAliased:
for key in self.__keys:
if key.alias == alias:
return key
raise KeyNotFoundError(f"Key with alias `{alias}` not found.")
raise KeyNotFoundError(alias)
def add(self, *keys: PublicKeyAliased) -> None:
for key in keys:
self.__assert_no_alias_conflict(key.alias)
self.__keys.append(key)
self._assert_no_alias_conflict(key.alias)
self.__keys.add(key)
def remove(self, *keys: PublicKeyAliased) -> None:
"""Remove a key alias from the Clive's key manager. Still remains in the beekeeper."""
for key in keys:
self.__keys.remove(key)
def rename(self, old_alias: str, new_alias: str) -> None:
"""Rename a key alias."""
self._assert_no_alias_conflict(new_alias)
for key in self.__keys:
if key.alias == old_alias:
self.__keys.remove(key)
self.__keys.add(key.with_alias(new_alias))
return
raise KeyNotFoundError(old_alias)
def add_to_import(self, *keys: PrivateKeyAliased) -> None:
for key in keys:
self.__assert_no_alias_conflict(key.alias)
self.__keys_to_import.append(key)
self._assert_no_alias_conflict(key.alias)
self.__keys_to_import.add(key)
def set_to_import(self, keys: Sequence[PrivateKeyAliased]) -> None:
self.__keys_to_import = list(keys)
def set_to_import(self, keys: Iterable[PrivateKeyAliased]) -> None:
keys_to_import = list(keys)
for key in keys_to_import:
# since "keys to import" will be new (replaced), we should check for conflicts with the public keys only
self._assert_no_public_alias_conflict(key.alias)
self.__keys_to_import = set(keys_to_import)
async def import_pending_to_beekeeper(self, import_callback: ImportCallbackT) -> None:
imported_keys = [await import_callback(key) for key in self.__keys_to_import]
......@@ -87,16 +106,23 @@ class KeyManager:
self.add(*imported_keys)
logger.debug("Imported all pending keys to beekeeper.")
def __assert_no_alias_conflict(self, alias: str) -> None:
if not self.is_alias_available(alias):
raise KeyAliasAlreadyInUseError(f"Alias '{alias}' is already in use.")
def _sorted_keys(self, *, reverse: bool = False) -> list[PublicKeyAliased]:
return sorted(self.__keys, key=lambda key: key.alias, reverse=reverse)
def rename(self, old_alias: str, new_alias: str) -> None:
"""Rename a key alias."""
self.__assert_no_alias_conflict(new_alias)
def _is_public_alias_available(self, alias: str) -> bool:
return self._is_alias_available(alias, self.__keys)
for i, key in enumerate(self.__keys):
if key.alias == old_alias:
self.__keys[i] = key.with_alias(new_alias)
return
raise KeyNotFoundError(f"Key with alias '{old_alias}' not found.")
def _is_key_to_import_alias_available(self, alias: str) -> bool:
return self._is_alias_available(alias, self.__keys_to_import)
def _is_alias_available(self, alias: str, keys: Iterable[KeyAliased]) -> bool:
known_aliases = [key.alias for key in keys]
return alias not in known_aliases
def _assert_no_alias_conflict(self, alias: str) -> None:
if not self.is_alias_available(alias):
raise KeyAliasAlreadyInUseError(alias)
def _assert_no_public_alias_conflict(self, alias: str) -> None:
if not self._is_public_alias_available(alias):
raise KeyAliasAlreadyInUseError(alias)
......@@ -60,6 +60,9 @@ class KeyAliased(Key, ABC):
return self.alias == other.alias and super().__eq__(other)
return super().__eq__(other)
def __hash__(self) -> int:
return hash(self.alias)
@abstractmethod
def without_alias(self) -> Key:
"""Return a new instance of the key without the alias."""
......@@ -87,6 +90,9 @@ class PublicKeyAliased(KeyAliased, PublicKey):
def __eq__(self, other: object) -> bool:
return super().__eq__(other)
def __hash__(self) -> int:
return super().__hash__()
def without_alias(self) -> PublicKey:
return PublicKey(value=self.value)
......@@ -179,6 +185,9 @@ class PrivateKeyAliased(KeyAliased, PrivateKey):
def __eq__(self, other: object) -> bool:
return super().__eq__(other)
def __hash__(self) -> int:
return super().__hash__()
@overload # type: ignore[override]
def calculate_public_key(self, *, with_alias: Literal[False]) -> PublicKey: ...
......
......@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any, Final
from clive.__private import config
from clive.__private.config import settings
from clive.__private.core.clive_import import get_clive
from clive.__private.core.keys import KeyManager
from clive.__private.core.validate_schema_field import is_schema_field_valid
from clive.__private.logger import logger
from clive.__private.storage.accounts import Account, WorkingAccount
......@@ -118,6 +119,7 @@ class ProfileData(Context):
self.__chain_id = self.__default_chain_id()
self.cart = Cart()
self.keys = KeyManager()
if address := self.__get_secret_node_address():
self.__backup_node_addresses = [address]
......
......@@ -7,7 +7,6 @@ from typing import TYPE_CHECKING
from pydantic import ValidationError
from clive.__private.core.alarms.alarms_storage import AlarmsStorage
from clive.__private.core.keys import KeyManager
from clive.__private.core.validate_schema_field import validate_schema_field
from clive.exceptions import CliveError
from clive.models.aliased import AccountName
......@@ -116,7 +115,5 @@ class Account:
@dataclass
class WorkingAccount(Account):
keys: KeyManager = field(init=False, default_factory=KeyManager, compare=False)
def __hash__(self) -> int:
return super().__hash__()
......@@ -47,7 +47,7 @@ class EditKeyAlias(KeyAliasForm):
old_alias = self.public_key.alias
new_alias = self._key_alias_input.value_or_error
self.app.world.profile_data.working_account.keys.rename(old_alias, new_alias)
self.app.world.profile_data.keys.rename(old_alias, new_alias)
self.app.trigger_profile_data_watchers()
self.app.post_message_to_screen("ManageKeyAliases", self.Changed())
......
......@@ -65,7 +65,7 @@ class KeyAlias(CliveCheckerboardTableRow, CliveWidget):
if not result:
return
self.app.world.profile_data.working_account.keys.remove(self.__public_key)
self.app.world.profile_data.keys.remove(self.__public_key)
self.notify(f"Key alias `{self.__public_key.alias}` was removed.")
self.app.post_message_to_screen(ManageKeyAliases, self.Changed())
......@@ -91,7 +91,7 @@ class ManageKeyAliasesTable(CliveCheckerboardTable):
def create_static_rows(self) -> list[KeyAlias]:
key_aliases = []
for idx, key in enumerate(self.app.world.profile_data.working_account.keys):
for idx, key in enumerate(self.app.world.profile_data.keys):
key_aliases.append(KeyAlias(idx, key))
return key_aliases
......
......@@ -161,7 +161,7 @@ class NewKeyAlias(NewKeyAliasBase):
@CliveScreen.try_again_after_activation
@on(NewKeyAliasBase.Saved)
async def new_key_alias_base_saved(self, event: NewKeyAliasBase.Saved) -> None:
self.context.working_account.keys.set_to_import([event.private_key])
self.context.keys.set_to_import([event.private_key])
await self.app.world.commands.sync_data_with_beekeeper()
self.app.trigger_profile_data_watchers()
......@@ -201,7 +201,7 @@ class NewKeyAliasForm(NewKeyAliasBase, FormScreen[ProfileData]):
@on(NewKeyAliasBase.Saved)
def new_key_alias_base_saved(self, event: NewKeyAliasBase.Saved) -> None:
self.context.working_account.keys.set_to_import([event.private_key])
self.context.keys.set_to_import([event.private_key])
logger.debug("New private key is waiting to be imported...")
async def apply_and_validate(self) -> None:
......
......@@ -38,7 +38,7 @@ class KeyAliasForm(BaseScreen, Contextual[ProfileData], ABC):
self._key_alias_input = PublicKeyAliasInput(
value=self._default_key_alias_name(),
setting_key_alias=True,
key_manager=self.context.working_account.keys,
key_manager=self.context.keys,
required=self.IS_KEY_ALIAS_REQUIRED,
)
self._public_key_input = LabelizedInput(
......
......@@ -132,7 +132,7 @@ class OperationActionBindings(CliveWidget, AbstractClassMessagePump):
async def __fast_broadcast(self) -> None:
def get_key() -> PublicKeyAliased | None:
try:
return self.app.world.profile_data.working_account.keys.first
return self.app.world.profile_data.keys.first
except KeyNotFoundError:
self.notify("No keys found for the working account.", severity="error")
return None
......
......@@ -70,12 +70,12 @@ class SelectKey(SafeSelect[PublicKey], CliveWidget):
def __init__(self) -> None:
try:
first_value: PublicKey | NoSelection = self.app.world.profile_data.working_account.keys.first
first_value: PublicKey | NoSelection = self.app.world.profile_data.keys.first
except KeyNotFoundError:
first_value = Select.BLANK
super().__init__(
[(key.alias, key) for key in self.app.world.profile_data.working_account.keys],
[(key.alias, key) for key in self.app.world.profile_data.keys],
value=first_value,
empty_string="no private key found",
)
......
......@@ -43,7 +43,7 @@ class PublicKeyAliasInput(TextInput):
setting_key_alias: Whether setting public key alias or just getting key alias for other purpose.
key_manager: Key manager to use for validation. If not provided, the key manager from the world will be used.
"""
key_manager = key_manager if key_manager is not None else self.app.world.profile_data.working_account.keys
key_manager = key_manager if key_manager is not None else self.app.world.profile_data.keys
super().__init__(
title=title,
......
......@@ -78,7 +78,7 @@ async def prepare_profile(node: tt.RawNode) -> None:
).execute_with_result()
tt.logger.info(f"password for {WORKING_ACCOUNT_DATA.account.name} is: `{password}`")
world.profile_data.working_account.keys.add_to_import(
world.profile_data.keys.add_to_import(
PrivateKeyAliased(
value=WORKING_ACCOUNT_DATA.account.private_key, alias=f"{WORKING_ACCOUNT_DATA.account.name}_key"
)
......
......@@ -38,7 +38,7 @@ async def prepare_beekeeper_wallet(world: World) -> None:
password = (await world.commands.create_wallet(password=WORKING_ACCOUNT_PASSWORD)).result_or_raise
tt.logger.info(f"password for {WORKING_ACCOUNT_DATA.account.name} is: `{password}`")
world.profile_data.working_account.keys.add_to_import(
world.profile_data.keys.add_to_import(
PrivateKeyAliased(value=WORKING_ACCOUNT_DATA.account.private_key, alias=f"{WORKING_ACCOUNT_KEY_ALIAS}")
)
await world.commands.sync_data_with_beekeeper()
......
......@@ -57,7 +57,7 @@ async def prepare_beekeeper_wallet(prepare_profile: ProfileData, world: World) -
password = (await world.commands.create_wallet(password=WORKING_ACCOUNT_PASSWORD)).result_or_raise
tt.logger.info(f"password for {WORKING_ACCOUNT_DATA.account.name} is: `{password}`")
world.profile_data.working_account.keys.add_to_import(
world.profile_data.keys.add_to_import(
PrivateKeyAliased(value=WORKING_ACCOUNT_DATA.account.private_key, alias=WORKING_ACCOUNT_KEY_ALIAS)
)
await world.commands.sync_data_with_beekeeper()
......