diff --git a/clive/__private/cli/commands/abc/memo_command.py b/clive/__private/cli/commands/abc/memo_command.py new file mode 100644 index 0000000000000000000000000000000000000000..9e0c18ecbe80ce01b3f5e9ba2a6d4b8c0d0ce622 --- /dev/null +++ b/clive/__private/cli/commands/abc/memo_command.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from abc import ABC +from dataclasses import dataclass +from typing import cast + +from clive.__private.cli.commands.abc.world_based_command import WorldBasedCommand +from clive.__private.cli.exceptions import CLIPrivateKeyInMemoValidationError +from clive.__private.validators.private_key_in_memo_validator import PrivateKeyInMemoValidator + + +@dataclass(kw_only=True) +class MemoCommand(WorldBasedCommand, ABC): + memo: str | None + + @property + def ensure_memo(self) -> str: + memo = self.memo + assert self.is_option_given(memo) + return cast("str", memo) + + async def fetch_data(self) -> None: + await super().fetch_data() + await self.world.commands.update_node_data(accounts=self.profile.accounts.tracked) + + async def validate_inside_context_manager(self) -> None: + self._validate_private_key_in_memo(self.ensure_memo) + await super().validate_inside_context_manager() + + def _validate_private_key_in_memo(self, memo_value: str) -> None: + memo_validator = PrivateKeyInMemoValidator(self.world) + result = memo_validator.validate(memo_value=memo_value) + if not result.is_valid: + raise CLIPrivateKeyInMemoValidationError(PrivateKeyInMemoValidator.PRIVATE_KEY_IN_MEMO_FAILURE_DESCRIPTION) diff --git a/clive/__private/cli/commands/process/process_deposit.py b/clive/__private/cli/commands/process/process_deposit.py index 75f96093d80fdb7766a44cd6aafcc6f2227a46e5..5068ebba8d9d7b6d33dcd2fae13cf4443d395522 100644 --- a/clive/__private/cli/commands/process/process_deposit.py +++ b/clive/__private/cli/commands/process/process_deposit.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING +from clive.__private.cli.commands.abc.memo_command import MemoCommand from clive.__private.cli.commands.abc.operation_command import OperationCommand from clive.__private.models.schemas import TransferToSavingsOperation @@ -12,16 +13,15 @@ if TYPE_CHECKING: @dataclass(kw_only=True) -class ProcessDeposit(OperationCommand): +class ProcessDeposit(OperationCommand, MemoCommand): from_account: str to_account: str amount: Asset.LiquidT - memo: str async def _create_operations(self) -> ComposeTransaction: yield TransferToSavingsOperation( from_=self.from_account, to=self.to_account, amount=self.amount, - memo=self.memo, + memo=self.ensure_memo, ) diff --git a/clive/__private/cli/commands/process/process_transfer_schedule.py b/clive/__private/cli/commands/process/process_transfer_schedule.py index 1e8ea257cc0dba4de6c69577914a9acfc1e7d0d3..2c89b40081a886380e2a1082a12a15e9248eb8a1 100644 --- a/clive/__private/cli/commands/process/process_transfer_schedule.py +++ b/clive/__private/cli/commands/process/process_transfer_schedule.py @@ -5,6 +5,7 @@ from dataclasses import dataclass, field from datetime import timedelta from typing import TYPE_CHECKING, Any +from clive.__private.cli.commands.abc.memo_command import MemoCommand from clive.__private.cli.commands.abc.operation_command import OperationCommand from clive.__private.cli.exceptions import ( ProcessTransferScheduleAlreadyExistsError, @@ -69,6 +70,7 @@ class _ProcessTransferScheduleCommon(OperationCommand, ABC): return scheduled_transfer.to == self.to and scheduled_transfer.pair_id == pair_id async def fetch_data(self) -> None: + await super().fetch_data() self.account_scheduled_transfers_data = await self.fetch_scheduled_transfers_for_current_account() async def fetch_scheduled_transfers_for_current_account(self) -> AccountScheduledTransferData: @@ -95,9 +97,8 @@ class _ProcessTransferScheduleCommon(OperationCommand, ABC): @dataclass(kw_only=True) -class _ProcessTransferScheduleCreateModifyCommon(_ProcessTransferScheduleCommon): +class _ProcessTransferScheduleCreateModifyCommon(_ProcessTransferScheduleCommon, MemoCommand): amount: Asset.LiquidT | None - memo: str | None frequency: timedelta | None repeat: int | None @@ -124,13 +125,12 @@ class _ProcessTransferScheduleCreateModifyCommon(_ProcessTransferScheduleCommon) async def _create_operations(self) -> ComposeTransaction: assert self.repeat is not None, "Value of repeat is None." - assert self.memo is not None, "Value of memo is None." assert self.amount is not None, "Value of amount is None." yield RecurrentTransferOperation( from_=self.from_account, to=self.to, amount=self.amount, - memo=self.memo, + memo=self.ensure_memo, recurrence=timedelta_to_int_hours(self.frequency_ensure), executions=self.repeat, extensions=self._create_recurrent_transfer_pair_id_extension(), @@ -140,7 +140,6 @@ class _ProcessTransferScheduleCreateModifyCommon(_ProcessTransferScheduleCommon) @dataclass(kw_only=True) class ProcessTransferScheduleCreate(_ProcessTransferScheduleCreateModifyCommon): amount: Asset.LiquidT - memo: str frequency: timedelta repeat: int @@ -157,7 +156,7 @@ class ProcessTransferScheduleCreate(_ProcessTransferScheduleCreateModifyCommon): @dataclass(kw_only=True) class ProcessTransferScheduleModify(_ProcessTransferScheduleCreateModifyCommon): async def fetch_data(self) -> None: - self.account_scheduled_transfers_data = await self.fetch_scheduled_transfers_for_current_account() + await super().fetch_data() if self.scheduled_transfer: self.amount = self.amount if self.amount is not None else self.scheduled_transfer.amount self.repeat = self.repeat if self.repeat is not None else self.scheduled_transfer.remaining_executions diff --git a/clive/__private/cli/commands/process/process_withdrawal.py b/clive/__private/cli/commands/process/process_withdrawal.py index 8f60c8978d86925e0cf6b7fc94bde1573022d3cb..874dc10bf8cc7b13420a4da9df7ec299495311b7 100644 --- a/clive/__private/cli/commands/process/process_withdrawal.py +++ b/clive/__private/cli/commands/process/process_withdrawal.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING +from clive.__private.cli.commands.abc.memo_command import MemoCommand from clive.__private.cli.commands.abc.operation_command import OperationCommand from clive.__private.models.schemas import TransferFromSavingsOperation @@ -13,23 +14,29 @@ if TYPE_CHECKING: @dataclass(kw_only=True) -class ProcessWithdrawal(OperationCommand): +class ProcessWithdrawal(OperationCommand, MemoCommand): from_account: str request_id: int | None to_account: str amount: Asset.LiquidT - memo: str - async def _create_operations(self) -> ComposeTransaction: + @property + def request_id_ensure(self) -> int: + assert self.request_id is not None, "request_id should be set at this point" + return self.request_id + + async def fetch_data(self) -> None: + await super().fetch_data() if self.request_id is None: wrapper = await self.world.commands.retrieve_savings_data(account_name=self.profile.accounts.working.name) savings_data: SavingsData = wrapper.result_or_raise self.request_id = savings_data.create_request_id() + async def _create_operations(self) -> ComposeTransaction: yield TransferFromSavingsOperation( from_=self.from_account, - request_id=self.request_id, + request_id=self.request_id_ensure, to=self.to_account, amount=self.amount, - memo=self.memo, + memo=self.ensure_memo, ) diff --git a/clive/__private/cli/commands/process/transfer.py b/clive/__private/cli/commands/process/transfer.py index 5302f5287e1364ae52d3e6a424f24a94c7b13da9..dd93c57f417ce3ec5369a83cc0b6efa234282526 100644 --- a/clive/__private/cli/commands/process/transfer.py +++ b/clive/__private/cli/commands/process/transfer.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING +from clive.__private.cli.commands.abc.memo_command import MemoCommand from clive.__private.cli.commands.abc.operation_command import OperationCommand from clive.__private.models.schemas import TransferOperation @@ -12,16 +13,15 @@ if TYPE_CHECKING: @dataclass(kw_only=True) -class Transfer(OperationCommand): +class Transfer(OperationCommand, MemoCommand): from_account: str to: str amount: Asset.LiquidT - memo: str async def _create_operations(self) -> ComposeTransaction: yield TransferOperation( from_=self.from_account, to=self.to, amount=self.amount, - memo=self.memo, + memo=self.ensure_memo, ) diff --git a/clive/__private/cli/exceptions.py b/clive/__private/cli/exceptions.py index 72942aa03bc4c6a96bc5ca6ff1eaf86d3a923937..080bd7ecf92414ed920f28e73b8107d9fb8223b8 100644 --- a/clive/__private/cli/exceptions.py +++ b/clive/__private/cli/exceptions.py @@ -545,3 +545,16 @@ class CLIChangeRecoveryAccountValidationError(CLIPrettyError): def __init__(self, name: str, reason: str) -> None: message = f"Account `{name}` can't be used. Reason: {reason}" super().__init__(message, errno.EINVAL) + + +class CLIPrivateKeyInMemoValidationError(CLIPrettyError): + """ + Raise when memo validation detects a private key. + + Args: + reason: Description of the validation failure. + """ + + def __init__(self, reason: str) -> None: + message = f"Memo validation failed: {reason}" + super().__init__(message, errno.EINVAL) diff --git a/clive/__private/core/authority/authority.py b/clive/__private/core/authority/authority.py index 9086fd3b4b7cb066e67ca1fa7d507eea888e07df..9a07a556737579f31972a7e009746d22673c83fb 100644 --- a/clive/__private/core/authority/authority.py +++ b/clive/__private/core/authority/authority.py @@ -7,6 +7,7 @@ from clive.__private.core.authority.roles import AuthorityRoleMemo, AuthorityRol from clive.__private.core.str_utils import Matchable from clive.__private.core.wax_operation_wrapper import WaxOperationWrapper from clive.__private.models.schemas import AccountUpdate2Operation +from wax.models.authority import WaxAuthorities if TYPE_CHECKING: from clive.__private.core.authority.entries import ( @@ -56,6 +57,10 @@ class Authority(AuthorityEntriesHolder, Matchable): def memo_role(self) -> AuthorityRoleMemo: return self._memo_role + @property + def memo_key(self) -> str: + return self.memo_role.entry.value + @property def roles(self) -> list[AuthorityRoleRegular | AuthorityRoleMemo]: return [self.owner_role, self.active_role, self.posting_role, self.memo_role] @@ -81,6 +86,19 @@ class Authority(AuthorityEntriesHolder, Matchable): """ return any(role.is_matching_pattern(*patterns) for role in self.roles) + @property + def wax_authorities(self) -> WaxAuthorities: + """ + Convert authority data back to wax-compatible WaxAuthorities format. + + Includes regular (owner, active, and posting) authorities (excludes memo). + """ + return WaxAuthorities( + owner=self.operation.categories.hive.authorities.owner.value, + active=self.operation.categories.hive.authorities.active.value, + posting=self.operation.categories.hive.authorities.posting.value, + ) + def to_schemas(self, api: IHiveChainInterface) -> AccountUpdate2Operation: operation_wrapper = WaxOperationWrapper(self._operation) return operation_wrapper.to_schemas(wax_interface=api, expect_type=AccountUpdate2Operation) diff --git a/clive/__private/ui/widgets/inputs/memo_input.py b/clive/__private/ui/widgets/inputs/memo_input.py index 32058ed115790c8baf6bb150f086818425b5b4fe..c427c8962177618c080cc444e7304aaa6615d999 100644 --- a/clive/__private/ui/widgets/inputs/memo_input.py +++ b/clive/__private/ui/widgets/inputs/memo_input.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING from clive.__private.core.constants.tui.placeholders import MEMO_PLACEHOLDER from clive.__private.ui.widgets.inputs.text_input import TextInput +from clive.__private.validators.private_key_in_memo_validator import PrivateKeyInMemoValidator if TYPE_CHECKING: from collections.abc import Iterable @@ -43,7 +44,7 @@ class MemoInput(TextInput): show_invalid_reasons=show_invalid_reasons, required=required, suggester=suggester, - validators=validators, # TODO: Add memo validator e.g. checking for any private keys pasted + validators=validators or [PrivateKeyInMemoValidator(self.world)], validate_on=validate_on, valid_empty=valid_empty, id=id, diff --git a/clive/__private/validators/private_key_in_memo_validator.py b/clive/__private/validators/private_key_in_memo_validator.py new file mode 100644 index 0000000000000000000000000000000000000000..5bd039c7ac18691531a2fddb1ad1eee2cae6cd01 --- /dev/null +++ b/clive/__private/validators/private_key_in_memo_validator.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +from textual.validation import Validator + +from clive.__private.validators.private_key_validation_tools import contains_private_key + +if TYPE_CHECKING: + from textual.validation import ValidationResult + + from clive.__private.core.world import World + + +class PrivateKeyInMemoValidator(Validator): + PRIVATE_KEY_IN_MEMO_FAILURE_DESCRIPTION: Final[str] = "Private key detected" + + def __init__(self, world: World) -> None: + super().__init__() + self._world = world + + def validate(self, memo_value: str) -> ValidationResult: + if contains_private_key(memo_value, self._world): + return self.failure(self.PRIVATE_KEY_IN_MEMO_FAILURE_DESCRIPTION, memo_value) + return self.success() diff --git a/clive/__private/validators/private_key_validation_tools.py b/clive/__private/validators/private_key_validation_tools.py new file mode 100644 index 0000000000000000000000000000000000000000..e7a0c0b14954425ede632459413172b7e5bd9751 --- /dev/null +++ b/clive/__private/validators/private_key_validation_tools.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from wax.exceptions.chain_errors import PrivateKeyDetectedInMemoError + +if TYPE_CHECKING: + from clive.__private.core.world import World + + +def contains_private_key(content: str, world: World) -> bool: + """ + Check if the given content contains a private key for any of the tracked accounts. + + Args: + content: The text content to scan for private keys. + world: The world object containing wax_interface and profile with tracked accounts. + + Returns: + True if a private key is detected in the content, False otherwise. + """ + for account in world.profile.accounts.tracked: + authority = account.data.authority + try: + world.wax_interface.scan_text_for_matching_private_keys( + content=content, + account=account.name, + account_authorities=authority.wax_authorities, + memo_key=authority.memo_key, + ) + except PrivateKeyDetectedInMemoError: + return True + return False diff --git a/tests/functional/cli/test_private_key_in_memo_validation.py b/tests/functional/cli/test_private_key_in_memo_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..e790e6b961a9d78e5c4a11612e57d709c16eb895 --- /dev/null +++ b/tests/functional/cli/test_private_key_in_memo_validation.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Final + +import pytest +import test_tools as tt + +from clive.__private.cli.exceptions import CLIPrivateKeyInMemoValidationError +from clive.__private.validators.private_key_in_memo_validator import PrivateKeyInMemoValidator +from clive_local_tools.cli.exceptions import CLITestCommandError +from clive_local_tools.data.constants import WORKING_ACCOUNT_KEY_ALIAS +from clive_local_tools.helpers import get_formatted_error_message +from clive_local_tools.testnet_block_log.constants import ( + WATCHED_ACCOUNTS_DATA, + WORKING_ACCOUNT_DATA, + WORKING_ACCOUNT_NAME, +) + +if TYPE_CHECKING: + from clive_local_tools.cli.cli_tester import CLITester + +RECEIVER: Final[str] = WATCHED_ACCOUNTS_DATA[0].account.name +NORMAL_MEMO: Final[str] = "This is a normal memo without any private keys" +EMPTY_MEMO: Final[str] = "" +PRIVATE_KEY_ERROR_MATCH: Final[str] = get_formatted_error_message( + CLIPrivateKeyInMemoValidationError(PrivateKeyInMemoValidator.PRIVATE_KEY_IN_MEMO_FAILURE_DESCRIPTION) +) + + +async def test_transfer_with_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that transfer with private key in memo is rejected.""" + # ARRANGE + private_key = WORKING_ACCOUNT_DATA.account.private_key + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=private_key, + ) + + +async def test_transfer_with_watched_account_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that transfer with watched account's private key in memo is rejected.""" + # ARRANGE + watched_account_private_key = WATCHED_ACCOUNTS_DATA[0].account.private_key + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=watched_account_private_key, + ) + + +async def test_transfer_with_normal_memo_succeeds( + cli_tester: CLITester, +) -> None: + """Check that transfer with normal memo text succeeds.""" + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=NORMAL_MEMO, + ) + + +async def test_transfer_with_empty_memo_succeeds( + cli_tester: CLITester, +) -> None: + """Check that transfer with empty memo succeeds.""" + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=EMPTY_MEMO, + ) + + +async def test_transfer_with_key_like_but_invalid_memo_succeeds( + cli_tester: CLITester, +) -> None: + """Check that transfer with key-like but invalid string succeeds (no false positive).""" + # ARRANGE - generate valid key format but corrupt checksum by changing last character + valid_key = str(WORKING_ACCOUNT_DATA.account.private_key) + last_char = valid_key[-1] + corrupted_char = "A" if last_char != "A" else "B" + invalid_key_with_bad_checksum = valid_key[:-1] + corrupted_char + + # ACT - should succeed because checksum is invalid + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=invalid_key_with_bad_checksum, + ) + + +async def test_transfer_with_public_key_in_memo_succeeds( + cli_tester: CLITester, +) -> None: + """Check that transfer with public key in memo succeeds (public keys are safe to share).""" + # ARRANGE + public_key = WORKING_ACCOUNT_DATA.account.public_key + + # ACT + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=public_key, + ) + + +async def test_transfer_with_partial_private_key_succeeds( + cli_tester: CLITester, +) -> None: + """Check that transfer with partial (truncated) private key succeeds.""" + # ARRANGE - only first half of the private key + private_key = WORKING_ACCOUNT_DATA.account.private_key + partial_key = private_key[: len(private_key) // 2] + + # ACT + cli_tester.process_transfer( + from_=WORKING_ACCOUNT_NAME, + amount=tt.Asset.Hive(1), + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=partial_key, + ) + + +async def test_savings_deposit_with_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that savings deposit with private key in memo is rejected.""" + # ARRANGE + private_key = WORKING_ACCOUNT_DATA.account.private_key + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_savings_deposit( + amount=tt.Asset.Hive(1), + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=private_key, + ) + + +async def test_savings_withdrawal_with_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that savings withdrawal with private key in memo is rejected.""" + # ARRANGE + private_key = WORKING_ACCOUNT_DATA.account.private_key + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_savings_withdrawal( + amount=tt.Asset.Hive(1), + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=private_key, + ) + + +async def test_transfer_schedule_create_with_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that creating scheduled transfer with private key in memo is rejected.""" + # ARRANGE + private_key = WORKING_ACCOUNT_DATA.account.private_key + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_transfer_schedule_create( + to=RECEIVER, + amount=tt.Asset.Hive(1), + frequency="24h", + repeat=2, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=private_key, + ) + + +async def test_transfer_schedule_modify_with_private_key_in_memo_is_rejected( + cli_tester: CLITester, +) -> None: + """Check that modifying scheduled transfer with private key in memo is rejected.""" + # ARRANGE + private_key = WORKING_ACCOUNT_DATA.account.private_key + # First create a valid scheduled transfer + cli_tester.process_transfer_schedule_create( + to=RECEIVER, + amount=tt.Asset.Hive(1), + frequency="24h", + repeat=2, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=NORMAL_MEMO, + ) + + # ACT & ASSERT + with pytest.raises(CLITestCommandError, match=PRIVATE_KEY_ERROR_MATCH): + cli_tester.process_transfer_schedule_modify( + to=RECEIVER, + sign_with=WORKING_ACCOUNT_KEY_ALIAS, + memo=private_key, + )