Skip to content
Snippets Groups Projects
Commit 7b37eaaa authored by Krzysztof Mochocki's avatar Krzysztof Mochocki
Browse files

Add sanitize function

parent 99bd5725
No related branches found
No related tags found
1 merge request!73Clive integration related fixes
......@@ -19,6 +19,7 @@ from helpy._interfaces.stopwatch import Stopwatch
from helpy._interfaces.time import OffsetTimeControl, SpeedUpRateTimeControl, StartTimeControl, Time, TimeFormats
from helpy._interfaces.transaction_helper import Transaction
from helpy._interfaces.url import HttpUrl, P2PUrl, WsUrl
from helpy._sanitize import sanitize
__version__ = "0.0.0"
......@@ -46,6 +47,7 @@ __all__ = [
"Stopwatch",
"SelfContextAsync",
"SelfContextSync",
"sanitize",
"Time",
"TimeFormats",
"Transaction",
......
......@@ -112,11 +112,29 @@ class AbstractHandle(UniqueSettingsHolder[Settings], ABC, Generic[ApiT]):
return serialized_data
def __configure_logger(self) -> Logger:
return logger.bind(**self._logger_extras())
# credit for lazy=True: https://github.com/Delgan/loguru/issues/402#issuecomment-2028011786
return logger.opt(lazy=True).bind(**self._logger_extras())
def teardown(self) -> None:
self._overseer.teardown()
def _sanitize_data(self, data: Json | list[Json] | str) -> Json | list[Json] | str:
return data
def _log_request(self, request: str) -> None:
self.logger.trace(
"sending to `{}`: `{}`",
self.http_endpoint.as_string,
lambda: self._sanitize_data(request), # to reduce deepcopy
)
def _log_response(self, seconds_delta: float, response: Json | list[Json]) -> None:
self.logger.trace(
f"got response in {seconds_delta :.5f}s from " + "`{}`: `{}`",
self.http_endpoint.as_string,
lambda: self._sanitize_data(response), # to reduce deepcopy
)
class AbstractAsyncHandle(AbstractHandle[ApiT], SelfContextAsync, ABC):
"""Base class for service handlers that uses asynchronous communication."""
......@@ -126,12 +144,10 @@ class AbstractAsyncHandle(AbstractHandle[ApiT], SelfContextAsync, ABC):
) -> JSONRPCResult[ExpectResultT]:
"""Sends data asynchronously to handled service basing on jsonrpc."""
request = build_json_rpc_call(method=endpoint, params=params)
self.logger.trace(f"sending to `{self.http_endpoint.as_string()}`: `{request}`")
self._log_request(request)
with Stopwatch() as record:
response = await self._overseer.async_send(self.http_endpoint, data=request)
self.logger.trace(
f"got response in {record.seconds_delta :.5f}s from `{self.http_endpoint.as_string()}`: `{response}`"
)
self._log_response(record.seconds_delta, response)
return self._response_handle(response=response, expected_type=expected_type)
def _is_synchronous(self) -> bool:
......@@ -154,12 +170,10 @@ class AbstractSyncHandle(AbstractHandle[ApiT], SelfContextSync, ABC):
def _send(self, *, endpoint: str, params: str, expected_type: type[ExpectResultT]) -> JSONRPCResult[ExpectResultT]:
"""Sends data synchronously to handled service basing on jsonrpc."""
request = build_json_rpc_call(method=endpoint, params=params)
self.logger.debug(f"sending to `{self.http_endpoint.as_string()}`: `{request}`")
self._log_request(request)
with Stopwatch() as record:
response = self._overseer.send(self.http_endpoint, data=request)
self.logger.trace(
f"got response in {record.seconds_delta:.5f}s from `{self.http_endpoint.as_string()}`: `{response}`"
)
self._log_response(record.seconds_delta, response)
return self._response_handle(response=response, expected_type=expected_type)
def _is_synchronous(self) -> bool:
......
......@@ -10,6 +10,7 @@ from helpy._handles.beekeeper.api.api_collection import (
BeekeeperSyncApiCollection,
)
from helpy._handles.beekeeper.api.session_holder import AsyncSessionHolder, SyncSessionHolder
from helpy._sanitize import sanitize
if TYPE_CHECKING:
from typing_extensions import Self
......@@ -17,6 +18,7 @@ if TYPE_CHECKING:
from helpy._communication.abc.overseer import AbstractOverseer
from helpy._handles.beekeeper.api import AsyncBeekeeperApi, SyncBeekeeperApi
from helpy._interfaces.url import HttpUrl
from helpy.exceptions import Json
_handle_target_service_name = "beekeeper"
......@@ -93,6 +95,9 @@ class Beekeeper(AbstractSyncHandle[BeekeeperSyncApiCollection], SyncSessionHolde
def _get_salt(self) -> str:
return _random_string()
def _sanitize_data(self, data: Json | list[Json] | str) -> Json | list[Json] | str:
return sanitize(data)
class AsyncBeekeeper(AbstractAsyncHandle[BeekeeperAsyncApiCollection], AsyncSessionHolder):
"""Asynchronous handle for beekeeper service communication."""
......@@ -121,3 +126,6 @@ class AsyncBeekeeper(AbstractAsyncHandle[BeekeeperAsyncApiCollection], AsyncSess
def _get_salt(self) -> str:
return _random_string()
def _sanitize_data(self, data: Json | list[Json] | str) -> Json | list[Json] | str:
return sanitize(data)
from __future__ import annotations
import json
from contextlib import suppress
from copy import deepcopy
from typing import TYPE_CHECKING, Final, TypeVar, overload
if TYPE_CHECKING:
from helpy.exceptions import Json
mask: Final[str] = "***"
sensitive_keywords: Final[list[str]] = ["wif", "password", "private_key"]
T = TypeVar("T")
def _sanitize_recursively(data: T, *, use_mask_on_str: bool = False) -> T:
if isinstance(data, str) and use_mask_on_str:
return mask # type: ignore[return-value]
if isinstance(data, dict):
for key in data:
data[key] = _sanitize_recursively(data[key], use_mask_on_str=(key in sensitive_keywords))
if isinstance(data, list):
return [_sanitize_recursively(item) for item in data] # type: ignore[return-value]
return data
@overload
def _sanitize(data: str) -> str: ...
@overload
def _sanitize(data: list[Json]) -> list[Json]: ...
@overload
def _sanitize(data: Json) -> Json: ...
def _sanitize(data: Json | list[Json] | str) -> Json | list[Json] | str:
if isinstance(data, dict):
return _sanitize_recursively(data)
if isinstance(data, list):
return [(_sanitize(item) if isinstance(item, (dict, list)) else item) for item in data]
if isinstance(data, str):
with suppress(json.JSONDecodeError):
return json.dumps(_sanitize(json.loads(data)))
return data
def sanitize(data: Json | list[Json] | str) -> Json | list[Json] | str:
return _sanitize(deepcopy(data))
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Final
import pytest
from helpy._sanitize import mask, sanitize
if TYPE_CHECKING:
from helpy.exceptions import Json
text_to_remove: Final[str] = "brzeczyszczykiewicz"
def extend(data: Json) -> tuple[str, Json, Json, list[Json], list[Json]]:
return (json.dumps(data), data, {"result": data}, [data], [data, data, data])
@pytest.mark.parametrize(
"data",
[
*extend({"password": text_to_remove}),
*extend({"a": [{"wif": text_to_remove}]}),
*extend({"b": {"private_key": text_to_remove}}),
],
)
def test_sanitize(data: Json | list[Json] | str) -> None:
# ARRANGE, ACT
sanitized_data = str(sanitize(data=data))
# ASSERT
assert text_to_remove not in sanitized_data, f"`{text_to_remove}` not removed from: `{sanitized_data}`"
assert mask in sanitized_data, f"`{mask}` not found in: `{sanitized_data}`"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment