Skip to content
Snippets Groups Projects
Commit 5b38d8c9 authored by Krzysztof Mochocki's avatar Krzysztof Mochocki
Browse files

Move abstract classess to abc dir in executable

parent ba4c8f72
No related branches found
No related tags found
1 merge request!80Draft: Remove notifications
from __future__ import annotations
from beekeepy._executable.abc.arguments import Arguments
from beekeepy._executable.abc.config import Config
from beekeepy._executable.abc.executable import ArgumentT, ConfigT, Executable
from beekeepy._executable.abc.streams import StreamRepresentation, StreamsHolder
__all__ = [
"Arguments",
"Executable",
"StreamsHolder",
"StreamRepresentation",
"Config",
"ArgumentT",
"ConfigT",
]
......@@ -12,6 +12,71 @@ if TYPE_CHECKING:
from typing_extensions import Self
__all__ = ["Arguments"]
class CliParser:
@classmethod
def parse_cli_input(cls, cli: list[str]) -> dict[str, str | list[str] | bool]:
ordered_cli: dict[str, str | set[str] | None] = {}
previous_key: str | None = None
for item in cli:
key, value = cls._preprocess_option(item)
if key.startswith("__"):
previous_key = key[2:]
ordered_cli[previous_key] = value
continue
if key.startswith("_"):
previous_key = key[1:]
ordered_cli[previous_key] = value
continue
if key in ordered_cli:
if isinstance((dict_value := ordered_cli[key]), set):
dict_value.add(item)
elif ordered_cli[key] is None:
assert isinstance(item, str), "parsing failed, item is not string" # mypy check
ordered_cli[key] = item
else: # if ordered_cli[key] is not None and is not set
assert isinstance(item, str), "parsing failed, item is not string" # mypy check
dict_value = ordered_cli[key]
assert isinstance(dict_value, str), "parsing failed, dict_value is not string" # mypy check
ordered_cli[key] = {dict_value, item}
continue
assert (
previous_key is not None
), "parsing failed, previous_key was not set and following argument is not prefixed"
ordered_cli[previous_key] = item
previous_key = None
return cls._convert_sets_to_lists_and_none_to_boolean(ordered_cli)
@classmethod
def _convert_sets_to_lists_and_none_to_boolean(
cls, ordered_cli: dict[str, str | set[str] | None]
) -> dict[str, str | list[str] | bool]:
result: dict[str, str | list[str] | bool] = {}
for key, value in ordered_cli.items():
if isinstance(value, set):
result[key] = list(value)
elif value is None:
result[key] = True
else:
result[key] = value
return result
@classmethod
def _preprocess_option(cls, item: str) -> tuple[str, str | None]:
key = item
value = None
if "=" in item:
key, value = item.split("=")
key = key.replace("-", "_")
return key, value
class Arguments(PreconfiguredBaseModel, ABC):
help_: bool = Field(alias="help", default=False)
version: bool = False
......@@ -48,9 +113,24 @@ class Arguments(PreconfiguredBaseModel, ABC):
return cli_arguments
def process(self, *, with_prefix: bool = True) -> list[str]:
pattern = "--{0}" if with_prefix else "{0}"
pattern = self._generate_argument_prefix(with_prefix=with_prefix)
return self.__prepare_arguments(pattern)
def _generate_argument_prefix(self, *, with_prefix: bool) -> str:
return "--{0}" if with_prefix else "{0}"
def update_with(self, other: Self | None) -> None:
if other is None:
return
for other_name, other_value in other.dict(exclude_unset=True, exclude_defaults=True, exclude_none=True).items():
assert isinstance(other_name, str), "Member name has to be string"
setattr(self, other_name, other_value)
@classmethod
def parse_cli_input(cls, cli: list[str]) -> Self:
return cls(**CliParser.parse_cli_input(cli))
@classmethod
def just_get_help(cls) -> Self:
return cls(help_=True)
......
......@@ -4,9 +4,10 @@ from pathlib import Path
from types import UnionType
from typing import TYPE_CHECKING, Any, ClassVar, get_args
from loguru import logger
from pydantic import BaseModel
from beekeepy._interface.url import Url
from beekeepy._communication import Url
from beekeepy.exceptions import InvalidOptionError
if TYPE_CHECKING:
......@@ -25,26 +26,30 @@ class Config(BaseModel):
out_file.write("# config automatically generated by helpy\n")
for member_name, member_value in self.__dict__.items():
if member_value is not None:
out_file.write(
f"{self._convert_member_name_to_config_name(member_name)}={self._convert_member_value_to_config_value(member_value)}\n"
)
if isinstance(member_value, list) and len(member_value) == 0:
continue
entry_name = self._convert_member_name_to_config_name(member_name)
entry_value = self._convert_member_value_to_config_value(member_name, member_value)
for value in [entry_value] if not isinstance(entry_value, list) else entry_value:
out_file.write(f"{entry_name} = {value}\n")
@classmethod
def load(cls, source: Path) -> Self:
source = source / Config.DEFAULT_FILE_NAME if source.is_dir() else source
assert source.exists(), "Given file does not exists."
fields = cls.__fields__
values_to_write = {}
values_to_write: dict[str, Any] = {}
with source.open("rt", encoding="utf-8") as in_file:
for line in in_file:
if (line := line.strip("\n")) and not line.startswith("#"):
config_name, config_value = line.split("=")
member_name = cls._convert_config_name_to_member_name(config_name)
member_type = fields[member_name].annotation
if isinstance(member_type, UnionType) and get_args(member_type)[-1] is type(None):
if isinstance(member_type, UnionType) and (type(None) in get_args(member_type)):
member_type = get_args(member_type)[0]
values_to_write[member_name] = cls._convert_config_value_to_member_value(
config_value, expected=member_type
config_value, expected=member_type, current_value=values_to_write.get(member_name)
)
return cls(**values_to_write)
......@@ -57,9 +62,9 @@ class Config(BaseModel):
return config_name.strip().replace("-", "_")
@classmethod
def _convert_member_value_to_config_value(cls, member_value: Any) -> str:
def _convert_member_value_to_config_value(cls, member_name: str, member_value: Any) -> str | list[str]: # noqa: ARG003
if isinstance(member_value, list):
return " ".join(member_value)
return member_value
if isinstance(member_value, bool):
return "yes" if member_value else "no"
......@@ -73,8 +78,8 @@ class Config(BaseModel):
return str(member_value)
@classmethod
def _convert_config_value_to_member_value( # noqa: PLR0911
cls, config_value: str, *, expected: type[Any]
def _convert_config_value_to_member_value( # noqa: PLR0911, C901
cls, config_value: str, *, expected: type[Any], current_value: Any | None
) -> Any | None:
config_value = config_value.strip()
if config_value is None:
......@@ -83,8 +88,21 @@ class Config(BaseModel):
if expected == Path:
return Path(config_value.replace('"', ""))
if expected == list[str]:
return config_value.split()
if issubclass(expected, list) or "list" in str(expected):
list_arg_t = get_args(expected)[0]
if len(get_args(list_arg_t)): # in case of unions
list_arg_t = get_args(list_arg_t)[0]
logger.info(f"{list_arg_t=}")
values = [
cls._convert_config_value_to_member_value(value, expected=list_arg_t, current_value=None)
for value in config_value.split()
]
if current_value is not None:
if isinstance(current_value, list):
current_value.extend(values)
return current_value
return [*values, current_value]
return values
if expected == Url:
return Url(config_value)
......@@ -99,4 +117,10 @@ class Config(BaseModel):
raise InvalidOptionError(f"Expected `yes` or `no`, got: `{config_value}`")
if "str" in str(expected):
return config_value.strip('"')
if isinstance(expected, type) and issubclass(expected, int | str) and hasattr(expected, "validate"):
return expected.validate(config_value)
return expected(config_value) if expected is not None else None
......@@ -3,17 +3,21 @@ from __future__ import annotations
import os
import signal
import subprocess
import warnings
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from beekeepy._executable.arguments.arguments import Arguments
from beekeepy._executable.streams import StreamsHolder
from beekeepy._interface.config import Config
from beekeepy._interface.context import ContextSync
from beekeepy.exceptions import BeekeeperIsNotRunningError, TimeoutReachWhileCloseError
import psutil
from beekeepy._executable.abc.arguments import Arguments
from beekeepy._executable.abc.config import Config
from beekeepy._executable.abc.streams import StreamsHolder
from beekeepy._utilities.context import ContextSync
from beekeepy.exceptions import ExecutableIsNotRunningError, TimeoutReachWhileCloseError
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
from loguru import Logger
......@@ -21,7 +25,7 @@ if TYPE_CHECKING:
class Closeable(ABC):
@abstractmethod
def close(self) -> None: ...
def close(self, timeout_secs: float = 10.0) -> None: ...
class AutoCloser(ContextSync[None]):
......@@ -70,15 +74,41 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
def config(self) -> ConfigT:
return self.__config
def run(
@property
def arguments(self) -> ArgumentT:
return self.__arguments
@property
def executable_path(self) -> Path:
return self.__executable_path
def _run(
self,
*,
blocking: bool,
arguments: ArgumentT | None = None,
environ: dict[str, str] | None = None,
propagate_sigint: bool = True,
save_config: bool = True,
) -> AutoCloser:
command, environment_variables = self.__prepare(arguments=arguments, environ=environ)
return self.__run(
blocking=blocking,
arguments=self.arguments,
environ=environ,
propagate_sigint=propagate_sigint,
save_config=save_config,
)
def __run(
self,
*,
blocking: bool,
arguments: ArgumentT,
environ: dict[str, str] | None = None,
propagate_sigint: bool = True,
save_config: bool = True,
) -> AutoCloser:
command, environment_variables = self.__prepare(arguments=arguments, environ=environ, save_config=save_config)
self._logger.info(f"starting `{self.__executable_path.stem}` as: `{command}`")
if blocking:
with self.__files.stdout as stdout, self.__files.stderr as stderr:
......@@ -114,9 +144,11 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
return result.decode().strip()
def __prepare(
self, arguments: ArgumentT | None, environ: dict[str, str] | None
self,
arguments: ArgumentT,
environ: dict[str, str] | None,
save_config: bool = True, # noqa: FBT001, FBT002
) -> tuple[list[str], dict[str, str]]:
arguments = arguments or self.__arguments
environ = environ or {}
self.__working_directory.mkdir(exist_ok=True)
......@@ -127,7 +159,8 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
environment_variables = dict(os.environ)
environment_variables.update(environ)
self.config.save(self.working_directory)
if save_config:
self.config.save(self.working_directory)
return command, environment_variables
......@@ -136,7 +169,7 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
def detach(self) -> int:
if self.__process is None:
raise BeekeeperIsNotRunningError
raise ExecutableIsNotRunningError
pid = self.pid
self.__process = None
self.__files.close()
......@@ -158,16 +191,6 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
self.__process = None
self.__files.close()
def __warn_if_pid_files_exists(self) -> None:
if self.__pid_files_exists():
warnings.warn(
f"PID file has not been removed, malfunction may occur. Working directory: {self.working_directory}",
stacklevel=2,
)
def __pid_files_exists(self) -> bool:
return len(list(self.working_directory.glob("*.pid"))) > 0
def is_running(self) -> bool:
if not self.__process:
return False
......@@ -177,6 +200,17 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
def log_has_phrase(self, text: str) -> bool:
return text in self.__files
@contextmanager
def restore_arguments(self, new_arguments: ArgumentT | None) -> Iterator[None]:
__backup = self.__arguments
self.__arguments = new_arguments or self.__arguments
try:
yield
except: # noqa: TRY302 # https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
raise
finally:
self.__arguments = __backup
@abstractmethod
def _construct_config(self) -> ConfigT: ...
......@@ -184,9 +218,20 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
def _construct_arguments(self) -> ArgumentT: ...
def generate_default_config(self) -> ConfigT:
path_to_config = self.working_directory / (Config.DEFAULT_FILE_NAME)
self.run(blocking=True, arguments=self.__arguments.just_dump_config())
temp_path_to_file = path_to_config.rename(Config.DEFAULT_FILE_NAME + ".tmp")
if not self.working_directory.exists():
self.working_directory.mkdir(parents=True)
orig_path_to_config: Path | None = None
path_to_config = self.working_directory / Config.DEFAULT_FILE_NAME
if path_to_config.exists():
orig_path_to_config = path_to_config.rename(
path_to_config.with_suffix(".ini.orig")
) # temporary move it to not interfere with config generation
arguments = self._construct_arguments()
arguments.dump_config = True
self.__run(blocking=True, arguments=arguments, save_config=False)
temp_path_to_file = path_to_config.rename(path_to_config.with_suffix(".ini.tmp"))
if orig_path_to_config is not None:
orig_path_to_config.rename(path_to_config)
return self.config.load(temp_path_to_file)
def get_help_text(self) -> str:
......@@ -194,3 +239,13 @@ class Executable(Closeable, Generic[ConfigT, ArgumentT]):
def version(self) -> str:
return self.run_and_get_output(arguments=self.__arguments.just_get_version())
def reserved_ports(self, *, timeout_seconds: int = 10) -> list[int]:
assert self.is_running(), "Cannot obtain reserved ports for not started executable"
start = time.perf_counter()
while start + timeout_seconds >= time.perf_counter():
connections = psutil.net_connections("inet4")
reserved_ports = [connection.laddr[1] for connection in connections if connection.pid == self.pid] # type: ignore[misc]
if reserved_ports:
return reserved_ports
raise TimeoutError
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, TextIO, cast
from beekeepy._utilities.context import ContextSync
if TYPE_CHECKING:
from pathlib import Path
@dataclass
class StreamRepresentation(ContextSync[TextIO]):
filename: str
dirpath: Path | None = None
stream: TextIO | None = None
_backup_count: int = 0
_current_filename: str | None = None
def __get_path(self) -> Path:
assert self.dirpath is not None, "Path is not specified"
if self._current_filename is None:
self._current_filename = self.__next_filename()
return self.dirpath / self._current_filename
def __get_stream(self) -> TextIO:
assert self.stream is not None, "Unable to get stream, as it is not opened"
return self.stream
def open_stream(self, mode: str = "wt") -> TextIO:
assert self.stream is None, "Stream is already opened"
self.__next_filename()
self.__create_user_friendly_link()
self.stream = cast(TextIO, self.__get_path().open(mode))
assert not self.stream.closed, f"Failed to open stream: `{self.stream.errors}`"
return self.stream
def close_stream(self) -> None:
self.__get_stream().close()
self.stream = None
def set_path_for_dir(self, dir_path: Path) -> None:
self.dirpath = dir_path
def _enter(self) -> TextIO:
return self.open_stream()
def _finally(self) -> None:
self.close_stream()
def __create_user_friendly_link(self) -> None:
assert self.dirpath is not None, "dirpath is not set"
user_friendly_link_dst = self.dirpath / f"{self.filename}.log"
user_friendly_link_dst.unlink(missing_ok=True)
user_friendly_link_dst.symlink_to(self.__get_path())
def __next_filename(self) -> str:
self._current_filename = f"{self.filename}_{self._backup_count}.log"
self._backup_count += 1
return self._current_filename
def __contains__(self, text: str) -> bool:
if not self.__get_path().exists():
return False
with self.open_stream("rt") as file:
for line in file:
if text in line:
return True
return False
@dataclass
class StreamsHolder:
stdout: StreamRepresentation = field(default_factory=lambda: StreamRepresentation("stdout"))
stderr: StreamRepresentation = field(default_factory=lambda: StreamRepresentation("stderr"))
def set_paths_for_dir(self, dir_path: Path) -> None:
self.stdout.set_path_for_dir(dir_path)
self.stderr.set_path_for_dir(dir_path)
def close(self) -> None:
self.stdout.close_stream()
self.stderr.close_stream()
def __contains__(self, text: str) -> bool:
return (text in self.stderr) or (text in self.stdout)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment