Skip to content
Snippets Groups Projects

Beekeepy: Add session holder class and session token handling

Merged Krzysztof Mochocki requested to merge kmochocki/beekeepy into develop
Compare and Show latest version
28 files
+ 475
121
Compare changes
  • Side-by-side
  • Inline
Files
28
+ 62
0
from __future__ import annotations
from typing import TYPE_CHECKING
import aiohttp
import httpx
from helpy._communication.abc.communicator import (
AbstractCommunicator,
)
from helpy.exceptions import CommunicationError
if TYPE_CHECKING:
from helpy._communication.settings import CommunicationSettings
from helpy._interfaces.url import HttpUrl
class AioHttpCommunicator(AbstractCommunicator):
"""Provides support for aiohttp library."""
def __init__(self, settings: CommunicationSettings) -> None:
super().__init__(settings=settings)
self.__async_client: httpx.AsyncClient | None = httpx.AsyncClient(
timeout=self.settings.timeout.total_seconds(), http2=True
)
async def close(self) -> None:
if self.__async_client is not None:
await self.__async_client.aclose()
self.__async_client = None
def get_async_client(self) -> httpx.AsyncClient:
assert self.__async_client is not None, "Session is closed."
return self.__async_client
async def async_send(self, url: HttpUrl, data: str) -> str:
last_exception: BaseException | None = None
amount_of_retries = 0
while not self._is_amount_of_retries_exceeded(amount=amount_of_retries):
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.settings.timeout.total_seconds())
) as session:
try:
response = await session.post(
url.as_string(),
data=data,
headers=self._json_headers(),
)
raw_body = await response.content.read(n=(response.content_length or -1))
return raw_body.decode("utf-8")
except aiohttp.ClientConnectorError as error:
raise CommunicationError(url=url.as_string(), request=data) from error
except aiohttp.ClientError as error:
last_exception = error
await self._async_sleep_for_retry()
if last_exception is None:
raise ValueError("Retry loop finished, but last_exception was not set")
raise last_exception
def send(self, url: HttpUrl, data: str) -> str:
raise NotImplementedError
Loading