Skip to content
Snippets Groups Projects

Beekeepy: Add session holder class and session token handling

Merged Krzysztof Mochocki requested to merge kmochocki/beekeepy into develop
Compare and Show latest version
14 files
+ 247
57
Compare changes
  • Side-by-side
  • Inline
Files
14
+ 37
41
@@ -5,11 +5,12 @@ from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Protocol
from loguru import logger
from typing_extensions import Self
from helpy._communication.httpx_communicator import HttpxCommunicator
from helpy._communication.aiohttp_communicator import AioHttpCommunicator
from helpy._communication.request_communicator import RequestCommunicator
from helpy._handles.build_json_rpc_call import build_json_rpc_call
from helpy._interfaces.context import ContextAsync, ContextSync
from helpy._handles.settings import Settings
from helpy._interfaces.settings_holder import UniqueSettingsHolder
from helpy._interfaces.stopwatch import Stopwatch
from helpy.exceptions import CommunicationError, HelpyError, RequestError
from schemas.jsonrpc import ExpectResultT, JSONRPCResult, get_response_model
@@ -20,7+21,7 @@
from loguru import Logger
from helpy._communication.abc.communicator import AbstractCommunicator
from helpy._handles.abc.api_collection import AbstractAsyncApiCollection, AbstractSyncApiCollection
from helpy._handles.batch_handle import AsyncBatchHandle, SyncBatchHandle
from helpy._interfaces.url import HttpUrl
@@ -29,7+30,7 @@
"""Raised if response does not have any response."""
class AbstractHandle:
class AbstractHandle(UniqueSettingsHolder[Settings], ABC):
"""Provides basic interface for all network handles."""
def __init__(
self,
*args: Any,
http_url: HttpUrl | None = None,
communicator: AbstractCommunicator | None = None,
settings: Settings,
**kwargs: Any,
) -> None:
"""Constructs handle to network service.
Keyword Arguments:
http_url -- http url where, service is available.
communicator -- communicator class to use for communication (default: {HttpxCommunicator})
Args:
http_url: http url where, service is available.
communicator: communicator class to use for communication
"""
super().__init__(*args, **kwargs)
super().__init__(*args, settings=settings, **kwargs)
self.__logger = self.__configure_logger()
self.__http_endpoint = http_url
self.__communicator = communicator or HttpxCommunicator()
self.__communicator = self.settings.try_get_communicator_instance() or self._get_recommended_communicator()
self.__api = self._construct_api()
@property
def http_endpoint(self) -> HttpUrl:
"""Return endpoint where handle is connected to."""
assert self.__http_endpoint is not None
return self.__http_endpoint
return self.settings.http_endpoint
@http_endpoint.setter
def http_endpoint(self, value: HttpUrl) -> None:
"""Set http endpoint."""
self.logger.debug(f"setting http endpoint to: {value.as_string()}")
self.__http_endpoint = value
with self.update_settings() as settings:
settings.http_endpoint = value
@property
def api(self) -> AbstractAsyncApiCollection | AbstractSyncApiCollection:
@@ -78,12 +76,12 @@ class AbstractHandle:
return self.__logger
@abstractmethod
def _construct_api(self) -> AbstractAsyncApiCollection | AbstractSyncApiCollection:
def _get_recommended_communicator(self) -> AbstractCommunicator:
"""Return api collection."""
@abstractmethod
def _clone(self) -> Self:
"""Return clone of itself."""
def _construct_api(self) -> AbstractAsyncApiCollection | AbstractSyncApiCollection:
"""Return api collection."""
@abstractmethod
def _is_synchronous(self) -> bool:
@@ -109,7 +107,7 @@ class AbstractHandle:
parsed_response = json.loads(response)
if "error" in parsed_response:
raise RequestError(send=params, error=str(parsed_response["error"]))
raise RequestError(send=params, error=parsed_response["error"])
if "result" not in parsed_response:
raise MissingResultError
@@ -121,10 +119,6 @@ class AbstractHandle:
def __configure_logger(self) -> Logger:
return logger.bind(**self._logger_extras())
@abstractmethod
def batch(self, *, delay_error_on_data_access: bool = False) -> SyncBatchHandle[Any] | AsyncBatchHandle[Any]:
"""Returns sync batch handle."""
class _SyncCall(Protocol):
def __call__(
@@ -153,9 +147,9 @@ def _retry_on_unable_to_acquire_database_lock( # noqa: C901
"Unable to acquire forkdb lock",
]
message = exception.error if isinstance(exception, RequestError) else str(exception.args)
for imsg in ignored_messages:
if imsg in message:
logger.debug(f"Ignored for {this.http_endpoint}: '{imsg}'")
for ignored_msg in ignored_messages:
if ignored_msg in message:
logger.debug(f"Ignored for {this.http_endpoint}: '{ignored_msg}'")
return
raise exception
@@ -182,7 +176,7 @@ def _retry_on_unable_to_acquire_database_lock( # noqa: C901
return __workaround_communication_problem_with_node
class AbstractAsyncHandle(ABC, AbstractHandle, ContextAsync[Self]): # type: ignore[misc]
class AbstractAsyncHandle(AbstractHandle, ABC):
"""Base class for service handlers that uses asynchronous communication."""
@_retry_on_unable_to_acquire_database_lock(async_version=True) # type: ignore[arg-type]
@@ -199,17 +193,18 @@ class AbstractAsyncHandle(ABC, AbstractHandle, ContextAsync[Self]): # type: ign
)
return self._response_handle(params=params, response=response, expected_type=expected_type)
async def _enter(self) -> Self:
return self._clone()
async def _finally(self) -> None:
"""Does nothing."""
def _is_synchronous(self) -> bool:
return True
def _get_recommended_communicator(self) -> AbstractCommunicator:
return AioHttpCommunicator(settings=self._settings)
class AbstractSyncHandle(ABC, AbstractHandle, ContextSync[Self]): # type: ignore[misc]
@abstractmethod
async def batch(self, *, delay_error_on_data_access: bool = False) -> AsyncBatchHandle[Any]:
"""Returns async batch handle."""
class AbstractSyncHandle(AbstractHandle, ABC):
"""Base class for service handlers that uses synchronous communication."""
@_retry_on_unable_to_acquire_database_lock(async_version=False) # type: ignore[arg-type]
@@ -224,11 +219,12 @@ class AbstractSyncHandle(ABC, AbstractHandle, ContextSync[Self]): # type: ignor
)
return self._response_handle(params=params, response=response, expected_type=expected_type)
def _enter(self) -> Self:
return self._clone()
def _finally(self) -> None:
"""Does nothing."""
def _is_synchronous(self) -> bool:
return False
def _get_recommended_communicator(self) -> AbstractCommunicator:
return RequestCommunicator(settings=self._settings)
@abstractmethod
def batch(self, *, delay_error_on_data_access: bool = False) -> SyncBatchHandle[Any]:
"""Returns sync batch handle."""
Loading