Skip to content
Snippets Groups Projects

Beekeepy: Add session holder class and session token handling

Merged Krzysztof Mochocki requested to merge kmochocki/beekeepy into develop
Compare and Show latest version
45 files
+ 1289
464
Compare changes
  • Side-by-side
  • Inline
Files
45
@@ -3,29 +3,19 @@ from __future__ import annotations
import asyncio
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from helpy._communication.settings import CommunicationSettings
from helpy._interfaces.settings_holder import SharedSettingsHolder
from helpy.exceptions import CommunicationError
if TYPE_CHECKING:
from collections.abc import Iterator
from helpy._interfaces.url import HttpUrl
class AbstractCommunicator(ABC):
class AbstractCommunicator(SharedSettingsHolder[CommunicationSettings], ABC):
"""Provides basic interface for communicators, which can implement communications using different way."""
def __init__(self, *args: Any, settings: CommunicationSettings | None = None, **kwargs: Any) -> None:
self.__settings = settings or CommunicationSettings()
super().__init__(*args, **kwargs)
@property
def settings(self) -> CommunicationSettings:
return self.__settings
@abstractmethod
def send(self, url: HttpUrl, data: str) -> str:
"""Sends to given url given data synchronously."""
@@ -59,9 +49,3 @@ class AbstractCommunicator(ABC):
if not (ok_status_code_lower_bound <= status_code <= ok_status_code_upper_bound):
raise CommunicationError(f"{status_code=}", f"{sent=}", f"{received=}")
@contextmanager
def restore_settings(self) -> Iterator[None]:
before = self.settings.copy()
yield
self.__settings = before
Loading