Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • hive/wax
1 result
Show changes
Showing
with 804 additions and 14 deletions
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from beekeepy._communication.universal_notification_server import UniversalNotificationServer
from beekeepy._interface.url import HttpUrl
from tests.helpy_test.unit.notification_server_tests.counting_notification_handlers import (
CountingAppbaseNotificationHandler,
)
if TYPE_CHECKING:
from collections.abc import Iterator
@pytest.fixture
def counting_appbase_notification_handler() -> CountingAppbaseNotificationHandler:
return CountingAppbaseNotificationHandler()
@pytest.fixture
def counting_appbase_notification_server(
counting_appbase_notification_handler: CountingAppbaseNotificationHandler,
) -> Iterator[UniversalNotificationServer]:
server = UniversalNotificationServer(counting_appbase_notification_handler)
server.run()
yield server
server.close()
@pytest.fixture
def counting_appbase_notification_server_address(
counting_appbase_notification_server: UniversalNotificationServer,
) -> HttpUrl:
return HttpUrl(f"localhost:{counting_appbase_notification_server.port}", protocol="http")
from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from schemas.notifications import Notification
from beekeepy._communication.appbase_notification_handler import AppbaseNotificationHandler
from beekeepy._communication.httpx_communicator import HttpxCommunicator
from beekeepy._communication.settings import CommunicationSettings
if TYPE_CHECKING:
from schemas.notifications import Error, KnownNotificationT, Status, WebserverListening
from beekeepy._interface.url import HttpUrl
async def send_notification(address: HttpUrl, notification: KnownNotificationT) -> None:
communicator = HttpxCommunicator(settings=CommunicationSettings())
await (await communicator.get_async_client()).put(
address.as_string(),
headers=communicator._json_headers(),
content=Notification(
name=notification.get_notification_name(), time=datetime.now(tz=timezone.utc), value=notification
).json(by_alias=True),
)
class CountingAppbaseNotificationHandler(AppbaseNotificationHandler):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.on_ws_webserver_bind_count: int = 0
self.on_http_webserver_bind_count: int = 0
self.on_error_count: int = 0
self.on_status_changed_count: int = 0
async def on_ws_webserver_bind(self, _: Notification[WebserverListening]) -> None:
self.on_ws_webserver_bind_count += 1
async def on_http_webserver_bind(self, _: Notification[WebserverListening]) -> None:
self.on_http_webserver_bind_count += 1
async def on_error(self, _: Notification[Error]) -> None:
self.on_error_count += 1
async def on_status_changed(self, _: Notification[Status]) -> None:
self.on_status_changed_count += 1
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING
import pytest
from schemas.notifications import Error, Status, WebserverListening
from tests.helpy_test.unit.notification_server_tests.counting_notification_handlers import (
CountingAppbaseNotificationHandler,
send_notification,
)
if TYPE_CHECKING:
from schemas.notifications import KnownNotificationT
from beekeepy._interface.url import HttpUrl
CounterGetterT = Callable[[CountingAppbaseNotificationHandler], int]
smoke_test_cases: list[tuple[CounterGetterT, KnownNotificationT]] = [ # type: ignore[valid-type]
(lambda h: h.on_ws_webserver_bind_count, WebserverListening(type_="WS", address="127.0.0.1", port=9090)),
(lambda h: h.on_http_webserver_bind_count, WebserverListening(type_="HTTP", address="127.0.0.1", port=9090)),
(lambda h: h.on_error_count, Error(message="some error message")),
(lambda h: h.on_status_changed_count, Status(current_status="syncing")),
]
@pytest.mark.parametrize(("counter_getter", "notification"), smoke_test_cases)
async def test_smoke(
counting_appbase_notification_handler: CountingAppbaseNotificationHandler,
counting_appbase_notification_server_address: HttpUrl,
counter_getter: CounterGetterT,
notification: KnownNotificationT,
) -> None:
assert counter_getter(counting_appbase_notification_handler) == 0
await send_notification(counting_appbase_notification_server_address, notification)
assert counter_getter(counting_appbase_notification_handler) == 1
from __future__ import annotations
from inspect import iscoroutinefunction, signature
from typing import TYPE_CHECKING
import pytest
from beekeepy._remote_handle.api import AsyncBeekeeperApi, SyncBeekeeperApi
from wax.helpy._handles.hived.api.account_by_key_api import (
AsyncAccountByKeyApi,
SyncAccountByKeyApi,
)
from wax.helpy._handles.hived.api.account_history_api import (
AsyncAccountHistoryApi,
SyncAccountHistoryApi,
)
from wax.helpy._handles.hived.api.block_api import AsyncBlockApi, SyncBlockApi
from wax.helpy._handles.hived.api.condenser_api import AsyncCondenserApi, SyncCondenserApi
from wax.helpy._handles.hived.api.database_api import AsyncDatabaseApi, SyncDatabaseApi
from wax.helpy._handles.hived.api.debug_node_api import AsyncDebugNodeApi, SyncDebugNodeApi
from wax.helpy._handles.hived.api.jsonrpc import AsyncJsonrpc, SyncJsonrpc
from wax.helpy._handles.hived.api.market_history_api import (
AsyncMarketHistoryApi,
SyncMarketHistoryApi,
)
from wax.helpy._handles.hived.api.network_broadcast_api import (
AsyncNetworkBroadcastApi,
SyncNetworkBroadcastApi,
)
from wax.helpy._handles.hived.api.network_node_api import AsyncNetworkNodeApi, SyncNetworkNodeApi
from wax.helpy._handles.hived.api.rc_api import AsyncRcApi, SyncRcApi
from wax.helpy._handles.hived.api.reputation_api import AsyncReputationApi, SyncReputationApi
from wax.helpy._handles.hived.api.transaction_status_api import (
AsyncTransactionStatusApi,
SyncTransactionStatusApi,
)
from wax.helpy._handles.hived.api.wallet_bridge_api import (
AsyncWalletBridgeApi,
SyncWalletBridgeApi,
)
if TYPE_CHECKING:
from beekeepy._remote_handle.abc.api import AbstractAsyncApi, AbstractSyncApi, RegisteredApisT
@pytest.mark.parametrize(
("async_api", "sync_api"),
[
(AsyncAccountByKeyApi, SyncAccountByKeyApi),
(AsyncAccountHistoryApi, SyncAccountHistoryApi),
(AsyncBeekeeperApi, SyncBeekeeperApi),
(AsyncBlockApi, SyncBlockApi),
(AsyncCondenserApi, SyncCondenserApi),
(AsyncDatabaseApi, SyncDatabaseApi),
(AsyncDebugNodeApi, SyncDebugNodeApi),
(AsyncJsonrpc, SyncJsonrpc),
(AsyncMarketHistoryApi, SyncMarketHistoryApi),
(AsyncNetworkBroadcastApi, SyncNetworkBroadcastApi),
(AsyncNetworkNodeApi, SyncNetworkNodeApi),
(AsyncRcApi, SyncRcApi),
(AsyncReputationApi, SyncReputationApi),
(AsyncTransactionStatusApi, SyncTransactionStatusApi),
(AsyncWalletBridgeApi, SyncWalletBridgeApi),
],
)
def test_is_api_consistent(
registered_apis: RegisteredApisT, async_api: AbstractAsyncApi, sync_api: AbstractSyncApi
) -> None:
sync_api_methods = registered_apis[True][sync_api._api_name()]
async_api_methods = registered_apis[False][async_api._api_name()]
assert len(sync_api_methods) > 0
assert len(async_api_methods) > 0
assert sync_api_methods == async_api_methods
for api_method in sync_api_methods:
sync_method = getattr(sync_api, api_method)
assert not iscoroutinefunction(sync_method)
async_method = getattr(async_api, api_method)
assert iscoroutinefunction(async_method)
assert signature(sync_method) == signature(async_method), f"inconsistency in: {api_method}"
from __future__ import annotations
import pytest
from beekeepy.interfaces import ContextAsync, ContextSync
class MyCustomError(Exception):
pass
class SyncTestContext(ContextSync[None]):
def __init__(self) -> None:
super().__init__()
self.test_variable = "invalid"
def _enter(self) -> None:
self.test_variable = "during-test"
def _finally(self) -> None:
self.test_variable = "correct"
class AsyncTestContext(ContextAsync[None]):
def __init__(self) -> None:
super().__init__()
self.test_variable = "invalid"
async def _aenter(self) -> None:
self.test_variable = "during-test"
async def _afinally(self) -> None:
self.test_variable = "correct"
def test_finally_context_sync() -> None:
# ARRANGE
ctx = SyncTestContext()
assert ctx.test_variable == "invalid"
# ACT & ASSERT
with ctx:
assert ctx.test_variable == "during-test"
assert ctx.test_variable == "correct"
async def test_finally_context_async() -> None:
# ARRANGE
ctx = AsyncTestContext()
assert ctx.test_variable == "invalid"
# ACT & ASSERT
async with ctx:
assert ctx.test_variable == "during-test"
assert ctx.test_variable == "correct"
def test_exception_rethrow_sync() -> None:
with pytest.raises(MyCustomError): # noqa: SIM117
with SyncTestContext():
raise MyCustomError
async def test_exception_rethrow_async() -> None:
with pytest.raises(MyCustomError):
async with AsyncTestContext():
raise MyCustomError
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Final
import pytest
from beekeepy.interfaces import mask, sanitize
if TYPE_CHECKING:
from beekeepy.exceptions import Json
text_to_remove: Final[str] = "brzeczyszczykiewicz"
def extend(data: Json) -> tuple[str, Json, Json, list[Json], list[Json]]:
return (json.dumps(data), data, {"result": data}, [data], [data, data, data])
@pytest.mark.parametrize(
"data",
[
*extend({"password": text_to_remove}),
*extend({"a": [{"wif": text_to_remove}]}),
*extend({"b": {"private_key": text_to_remove}}),
],
)
def test_sanitize(data: Json | list[Json] | str) -> None:
# ARRANGE, ACT
sanitized_data = str(sanitize(data=data))
# ASSERT
assert text_to_remove not in sanitized_data, f"`{text_to_remove}` not removed from: `{sanitized_data}`"
assert mask in sanitized_data, f"`{mask}` not found in: `{sanitized_data}`"
from __future__ import annotations
import pytest
from beekeepy._communication.settings import CommunicationSettings
from beekeepy._interface.settings_holder import SharedSettingsHolder, UniqueSettingsHolder
TestSharedSettingsHolder = SharedSettingsHolder[CommunicationSettings]
TestUniqueSettingsHolder = UniqueSettingsHolder[CommunicationSettings]
AnySettingsHolder = TestSharedSettingsHolder | TestUniqueSettingsHolder
def get_shared_settings() -> TestSharedSettingsHolder:
return SharedSettingsHolder(settings=CommunicationSettings())
def get_unique_settings() -> TestUniqueSettingsHolder:
return UniqueSettingsHolder(settings=CommunicationSettings())
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_copy_in_getter(holder: AnySettingsHolder) -> None:
# ARRANGE
previous_value = holder.settings.max_retries
# ACT
holder.settings.max_retries = 10
# ASSERT
assert holder.settings.max_retries == previous_value
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_modify_settings(holder: AnySettingsHolder) -> None:
# ARRANGE
new_value = holder.settings.max_retries + 1
# ACT
with holder.update_settings() as settings:
settings.max_retries = new_value
# ASSERT
assert holder.settings.max_retries == new_value
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_restore_settings(holder: AnySettingsHolder) -> None:
# ARRANGE
previous_value = holder.settings.max_retries
# ACT
with holder.restore_settings():
holder.settings.max_retries = 10
# ASSERT
assert holder.settings.max_retries == previous_value
def test_shared_settings() -> None:
# ARRANGE
parent = get_shared_settings()
child = SharedSettingsHolder(settings=parent._settings)
new_value = parent.settings.max_retries + 1
# ACT
with parent.update_settings() as settings:
settings.max_retries = new_value
# ASSERT
assert parent.settings.max_retries == child.settings.max_retries
def test_unique_settings() -> None:
# ARRANGE
parent = get_unique_settings()
child = UniqueSettingsHolder(settings=parent._settings)
old_value = parent.settings.max_retries
new_value = parent.settings.max_retries + 1
# ACT
with parent.update_settings() as settings:
settings.max_retries = new_value
# ASSERT
assert parent.settings.max_retries == new_value
assert child.settings.max_retries == old_value
def test_is_shared_has_same_addresses() -> None:
# ARRANGE
parent = get_shared_settings()
# ACT
child = SharedSettingsHolder(settings=parent._settings)
# ASSERT
assert id(parent._settings) == id(child._settings)
def test_is_unique_has_different_addresses() -> None:
# ARRANGE
parent = get_unique_settings()
# ACT
child = UniqueSettingsHolder(settings=parent._settings)
# ASSERT
assert id(parent._settings) != id(child._settings)
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_is_update_event_not_happens_on_error(holder: AnySettingsHolder) -> None:
# ARRANGE
old_value = holder.settings.max_retries
class TestError(Exception):
pass
# ACT
with pytest.raises(TestError): # noqa: PT012, SIM117
with holder.update_settings() as settings:
settings.max_retries = 10
raise TestError
# ASSERT
assert holder.settings.max_retries == old_value
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_is_update_happens_after_exit_from_with_statement(holder: AnySettingsHolder) -> None:
# ARRANGE
old_value = holder.settings.max_retries
new_value = old_value + 1
# ACT & ASSERT
with holder.update_settings() as settings:
settings.max_retries = new_value
assert holder.settings.max_retries == old_value
assert holder.settings.max_retries == new_value
@pytest.mark.parametrize("holder", [get_shared_settings(), get_unique_settings()])
def test_is_update_settings_does_not_affect_settings_accessor(holder: AnySettingsHolder) -> None:
# ARRANGE
old_value = holder.settings.max_retries
new_value = old_value + 1
# ACT & ASSERT
with holder.update_settings() as _:
holder.settings.max_retries = new_value
assert holder.settings.max_retries == old_value
assert holder.settings.max_retries == old_value
from __future__ import annotations
import json
from typing import cast
import pytest
from beekeepy._communication.rules import ApiNotFound
from beekeepy.exceptions import ApiNotFoundError, GroupedErrorsError
from beekeepy.interfaces import HttpUrl, SuppressApiNotFound
def api_not_found_error(api: str) -> ApiNotFoundError:
response = {
"jsonrpc": "2.0",
"error": {
"code": -32003,
"message": ("Assert Exception:api_itr != " "data._registered_apis.end(): Could not find API " + api),
},
"id": 1,
}
result = ApiNotFound(
url=HttpUrl("0.0.0.0:0"),
request={"jsonrpc": "2.0", "id": 1, "method": f"{api}.some_method"},
).check(response=response, response_raw=json.dumps(response))
assert len(result) == 1, "Exception has not been generated"
return cast(ApiNotFoundError, result[0])
@pytest.mark.parametrize(
"error",
[
api_not_found_error(api=api)
for api in [
"rc_api",
"database_api",
"account_history_api",
"future_plugin_that_not_exists_yet_api",
]
],
)
def test_suppress_api_not_found(error: ApiNotFoundError) -> None:
# ARRANGE & ACT
with SuppressApiNotFound(error.api) as suppressed:
raise error from GroupedErrorsError([error])
# ASSERT
assert suppressed.errors[0].api == error.api
@pytest.mark.parametrize(
"error",
[
api_not_found_error("debug_node_api"),
ValueError("some value error"),
],
)
def test_suppress_api_not_found_rethrow(error: Exception) -> None:
# ARRANGE
# ACT & ASSERT
with pytest.raises(type(error)), SuppressApiNotFound("rc_api", "database_api") as suppressed:
raise error from GroupedErrorsError([error])
assert len(suppressed.errors) == 0, "No errors should be suppressed"
from __future__ import annotations
from wax.helpy import Time
def test_comparison_without_tolerance() -> None:
time = Time.parse("1970-01-01T00:00:00")
assert Time.are_close(time, time)
assert not Time.are_close(time, time + Time.seconds(1))
def test_comparison_with_absolute_tolerance() -> None:
time = Time.parse("1970-01-01T00:00:00")
assert Time.are_close(time, time - Time.seconds(6), absolute_tolerance=Time.seconds(5)) is False
assert Time.are_close(time, time - Time.seconds(5), absolute_tolerance=Time.seconds(5)) is True
assert Time.are_close(time, time - Time.seconds(4), absolute_tolerance=Time.seconds(5)) is True
assert Time.are_close(time, time + Time.seconds(4), absolute_tolerance=Time.seconds(5)) is True
assert Time.are_close(time, time + Time.seconds(5), absolute_tolerance=Time.seconds(5)) is True
assert Time.are_close(time, time + Time.seconds(6), absolute_tolerance=Time.seconds(5)) is False
from __future__ import annotations
import random
import pytest
from wax.helpy import Time, TimeFormats
@pytest.mark.parametrize(
"interval", ["milliseconds", "seconds", "minutes", "hours", "days", "weeks", "months", "years"]
)
def test_from_now_with_parameters(interval: str) -> None:
# ARRANGE
value = 0
while value == 0:
value = random.randint(-1000, 1000) # noqa: S311 # that's tests, not implementation of cryptographic lib
interval_container = {interval: value}
delta = getattr(Time, interval)(value)
# ACT
shifted_time = Time.from_now(**interval_container, serialize_format=TimeFormats.DEFAULT_FORMAT_WITH_MILLIS) # type: ignore[arg-type]
# ASSERT
assert isinstance(shifted_time, str)
reference_time_in_from_now = Time.parse(shifted_time, format_=TimeFormats.DEFAULT_FORMAT_WITH_MILLIS) - delta
assert reference_time_in_from_now < Time.now(serialize=False)
def test_from_now_without_argument() -> None:
with pytest.raises(ValueError): # noqa: PT011
Time.from_now()
from __future__ import annotations
import pytest
from wax.helpy import Time
def test_if_object_creation_is_forbidden() -> None:
with pytest.raises(TypeError):
Time()
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from wax.helpy import Time
from wax.helpy.exceptions import ParseError
@pytest.mark.parametrize(
("time", "expected"),
[
("1970-01-01T00:00:00", datetime(1970, 1, 1, tzinfo=timezone.utc)),
("1970-01-01T00:00:00.250", datetime(1970, 1, 1, microsecond=250000, tzinfo=timezone.utc)),
],
ids=("default_format", "default_format_including_millis"),
)
def test_parsing_time_in_default_formats(time: str, expected: datetime) -> None:
assert Time.parse(time) == expected
def test_parsing_invalid_time_in_default_formats() -> None:
time = "01.01.1970"
with pytest.raises(ParseError) as exception:
Time.parse(time)
assert f"Could not be parse the `{time}` string using the" in str(exception)
def test_parsing_time_in_invalid_custom_format() -> None:
time = "01.01.1970"
invalid_format = "invalid_format"
with pytest.raises(ValueError) as exception: # noqa: PT011
assert Time.parse(time, format_=invalid_format)
assert f"'{invalid_format}' is not a valid TimeFormats" in str(exception.value)
from __future__ import annotations
from wax.helpy import Time, TimeFormats
def test_time_serialization_in_default_format() -> None:
time = Time.parse("1970-01-01T00:00:00")
assert Time.serialize(time) == "1970-01-01T00:00:00"
def test_time_serialization_in_custom_format() -> None:
time = Time.parse("1970-01-01T00:00:00")
assert Time.serialize(time, format_=TimeFormats.FAKETIME_FORMAT) == "@1970-01-01 00:00:00"
def test_time_serialization_in_custom_format_with_mills() -> None:
time = Time.parse("1970-01-01T00:00:00")
assert Time.serialize(time, format_=TimeFormats.FAKETIME_FORMAT_WITH_MILLIS) == "@1970-01-01 00:00:00.000000"
from __future__ import annotations
from datetime import timedelta
import pytest
from dateutil.relativedelta import relativedelta
from wax.helpy import Time
@pytest.mark.parametrize(
("created", "expected"),
[
pytest.param((Time.milliseconds(5)), timedelta(milliseconds=5), id="milliseconds"),
pytest.param(Time.seconds(5), timedelta(seconds=5), id="seconds"),
pytest.param(Time.minutes(1), timedelta(minutes=1), id="minutes"),
pytest.param(Time.hours(100), timedelta(hours=100), id="hours"),
pytest.param(Time.days(1000), timedelta(days=1000), id="days"),
pytest.param(Time.weeks(1000), timedelta(weeks=1000), id="weeks"),
pytest.param(Time.months(1000), relativedelta(months=1000), id="months"),
pytest.param(Time.years(1000), relativedelta(years=1000), id="years"),
],
)
def test_timedelta_creation(created: timedelta, expected: timedelta | relativedelta) -> None:
assert created == expected
from __future__ import annotations
import re
from typing import Callable
import pytest
from beekeepy.interfaces import HttpUrl, P2PUrl, WsUrl
from tests.helpy_test.unit.constants import DEFAULT_ADDRESS, DEFAULT_PORT, URL_TYPES
@pytest.mark.parametrize(
("input_url", "expected_protocol", "url_type"),
[
(f"http://{DEFAULT_ADDRESS}:{DEFAULT_PORT}", "http", HttpUrl),
(f"ws://{DEFAULT_ADDRESS}:{DEFAULT_PORT}", "ws", WsUrl),
],
)
def test_url_parsing_without_expected_protocol(input_url: str, expected_protocol: str, url_type: URL_TYPES) -> None:
url = url_type(input_url)
assert url.protocol == expected_protocol
assert url.address == DEFAULT_ADDRESS
assert url.port == DEFAULT_PORT
@pytest.mark.parametrize(
("input_url", "expected_protocol", "url_type"),
[
(f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}", "http", HttpUrl),
(f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}", "ws", WsUrl),
],
)
def test_url_parsing_with_expected_protocol(input_url: str, expected_protocol: str, url_type: URL_TYPES) -> None:
url = url_type(input_url, protocol=expected_protocol) # type: ignore[arg-type]
assert url.protocol == expected_protocol
assert url.address == DEFAULT_ADDRESS
assert url.port == DEFAULT_PORT
@pytest.mark.parametrize(
("input_url", "expected_protocol", "url_type"),
[
(DEFAULT_ADDRESS, "http", HttpUrl),
(DEFAULT_ADDRESS, "ws", WsUrl),
],
)
def test_url_parsing_without_port_given(input_url: str, expected_protocol: str, url_type: URL_TYPES) -> None:
url = url_type(input_url, protocol=expected_protocol) # type: ignore[arg-type]
assert url.protocol == expected_protocol
assert url.address == DEFAULT_ADDRESS
assert url.port is None
@pytest.mark.parametrize("url_type", [HttpUrl, WsUrl])
def test_url_parsing_without_address_given(url_type: URL_TYPES) -> None:
with pytest.raises(ValueError) as exception: # noqa: PT011
url_type(f":{DEFAULT_PORT}")
assert str(exception.value) == "Address was not specified."
@pytest.mark.parametrize("url_cls", [HttpUrl, WsUrl, P2PUrl])
@pytest.mark.parametrize("schema_input", [lambda x: x._allowed_protocols()[0], lambda _: None])
def test_schema_auto_apply_in_url(
schema_input: Callable[[URL_TYPES | type[P2PUrl]], str | None], url_cls: URL_TYPES | type[P2PUrl]
) -> None:
# ARRANGE
address = "some-address"
default_schema = url_cls._allowed_protocols()[0]
if default_schema:
default_schema += "://"
# ACT
url = url_cls(address, protocol=schema_input(url_cls)) # type: ignore[arg-type]
# ASSERT
assert url.as_string() == f"{default_schema}{address}"
@pytest.mark.parametrize("url_cls", [HttpUrl, WsUrl, P2PUrl])
def test_url_schema_validation(url_cls: URL_TYPES | type[P2PUrl]) -> None:
# ARRANGE
invalid_proto = "baaaad"
# ACT & ASSERT
with pytest.raises(
ValueError, match=re.escape(f"Unknown protocol: `{invalid_proto}`, allowed: {url_cls._allowed_protocols()}")
):
url_cls("some-address", protocol=invalid_proto) # type: ignore[arg-type]
from __future__ import annotations
import pytest
from beekeepy.interfaces import HttpUrl, WsUrl
from tests.helpy_test.unit.constants import DEFAULT_ADDRESS, DEFAULT_PORT, URL_TYPES
@pytest.mark.parametrize(
("url", "with_protocol", "expected"),
[
(
HttpUrl(f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}", protocol="http"),
True,
f"http://{DEFAULT_ADDRESS}:{DEFAULT_PORT}",
),
(HttpUrl(f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}", protocol="http"), False, f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}"),
(HttpUrl(f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}"), False, f"{DEFAULT_ADDRESS}:{DEFAULT_PORT}"),
],
)
def test_serialization(url: HttpUrl, with_protocol: bool, expected: str) -> None: # noqa: FBT001
assert url.as_string(with_protocol=with_protocol) == expected
@pytest.mark.parametrize(
("input_url", "expected_protocol", "url_type"),
[
(DEFAULT_ADDRESS, "http", HttpUrl),
(DEFAULT_ADDRESS, "ws", WsUrl),
],
)
def test_url_serializing_without_port_given(input_url: str, expected_protocol: int, url_type: URL_TYPES) -> None:
assert (
url_type(input_url, protocol=expected_protocol).as_string(with_protocol=True) # type: ignore[arg-type]
== f"{expected_protocol}://127.0.0.1"
)
from wax.wax_visitor import AbstractOperationVisitor
from __future__ import annotations
from wax.proto import vote_pb2
from wax.wax_visitor import AbstractOperationVisitor
from wax.proto.operations import vote
class MyOperationVisitor(AbstractOperationVisitor):
def vote(self, op: vote_pb2.vote) -> None:
def vote(self, op: vote) -> None:
pass
......
from google.protobuf.json_format import ParseDict
from wax.proto import (
vote_pb2,
limit_order_cancel_pb2,
operation_pb2,
transaction_pb2
)
from wax.proto.operations import vote, operation
from wax.proto.transaction import transaction
from wax.wax_visitor import OperationVisitor
tx_json = {
......@@ -17,18 +12,18 @@ tx_json = {
}
class MyOperationVisitor(OperationVisitor):
vote_obj: vote_pb2.vote = None
vote_obj: vote = None
def vote(self, op: vote_pb2.vote) -> None:
def vote(self, op: vote) -> None:
print("Processing 'vote' operation")
self.vote_obj = op
def test_operation_visitor():
tx = ParseDict(tx_json, transaction_pb2.transaction())
tx = ParseDict(tx_json, transaction())
visitor = MyOperationVisitor()
for op in tx.operations:
visitor.accept(op)
assert visitor.vote_obj == ParseDict(tx_json["operations"][0]["vote"], vote_pb2.vote())
assert visitor.vote_obj == ParseDict(tx_json["operations"][0]["vote"], vote())