Compare revisions — hive/clive

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (11)
@@ -122,7 +122,7 @@ class BeekeeperExecutable:
         command = self.pre_run_preparation(
             allow_empty_notification_server=allow_empty_notification_server, arguments=arguments
         )
-        logger.info("Executing beekeeper:", command)
+        logger.debug(f"Executing beekeeper: {command}")
        result = subprocess.check_output(command, stderr=subprocess.STDOUT, timeout=timeout)
         return result.decode("utf-8").strip()
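
A note on this change: besides lowering the verbosity, the new call fixes a real bug. Unlike stdlib logging, loguru formats the message with str.format() using any extra positional arguments, so with no "{}" placeholder in the message the old call silently dropped `command` from the output. A minimal sketch (the command value is hypothetical):

from loguru import logger

command = ["beekeeper", "--help"]  # hypothetical value, for illustration only

logger.info("Executing beekeeper:", command)      # old: logs just "Executing beekeeper:"
logger.debug(f"Executing beekeeper: {command}")   # new: the value is baked into the message
logger.debug("Executing beekeeper: {}", command)  # equivalent, formatted lazily by loguru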
@@ -133,7 +133,7 @@ class BeekeeperExecutable:
             allow_empty_notification_server=allow_empty_notification_server, arguments=arguments
         )
         self.__prepare_files_for_streams(self.__config.wallet_dir)
-        logger.info("Executing beekeeper:", command)
+        logger.debug(f"Executing beekeeper: {command}")
         try:
             self.__process = Popen(
                 command,
@@ -50,7 +50,7 @@ class BeekeeperNotificationsServer:
         return self.http_endpoint

     def notify(self, message: JsonT) -> None:
-        logger.info(f"Got notification: {message}")
+        logger.debug(f"Got notification: {message}")
         name = message["name"]
         details = message["value"]
@@ -38,4 +38,4 @@ class Command(ABC):
         asyncio.gather(*[command.execute() for command in commands])

     def _log_execution_info(self) -> None:
-        logger.info(f"Executing command: {self.__class__.__name__}")
+        logger.debug(f"Executing command: {self.__class__.__name__}")
@@ -19,6 +19,9 @@ if TYPE_CHECKING:
     from loguru._logger import Core

+LogFilePaths = tuple[Path, ...]
+GroupLogFilePaths = dict[str, LogFilePaths]
+
 LOG_FORMAT: Final[str] = (
     "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green>"
     " | <level>{level.icon} {level: <8}</level>"
@@ -27,28 +30,6 @@ LOG_FORMAT: Final[str] = (
 )

-def create_log_file(log_name: str, log_group: str | None = None) -> tuple[Path, Path]:
-    log_directory = Path(settings.log_path)
-    if log_group:
-        log_directory = log_directory / log_group
-    log_directory.mkdir(parents=True, exist_ok=True)
-
-    log_file_name = f"{LAUNCH_TIME.strftime('%Y-%m-%d_%H-%M-%S')}_{log_name}.log"
-    log_file_path = log_directory / log_file_name
-    with log_file_path.open("a", encoding="utf-8"):
-        # We just need to create an empty file to which we will log later
-        pass
-
-    latest_log_file_name = "latest.log"
-    latest_log_file_path = log_directory / latest_log_file_name
-    with latest_log_file_path.open("w", encoding="utf-8"):
-        # We just need to create an empty file to which we will log later
-        pass
-
-    return log_file_path, latest_log_file_path
-

 class InterceptHandler(logging.Handler):
     def emit(self, record: logging.LogRecord) -> None:
         # Get corresponding Loguru level if it exists
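
The emit body is truncated by the collapsed context above; judging from the surviving comment, it presumably follows loguru's documented recipe for intercepting stdlib logging. For reference, a sketch of that recipe under that assumption:

import logging

from loguru import logger as loguru_logger


class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Get corresponding Loguru level if it exists
        try:
            level = loguru_logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk back past the stdlib logging frames to attribute the real caller
        frame, depth = logging.currentframe(), 2
        while frame is not None and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        loguru_logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())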
@@ -69,6 +50,13 @@ class InterceptHandler(logging.Handler):

 class Logger:
     """Logger used to log into both Textual (textual console) and Loguru (file located in logs/)."""

+    AVAILABLE_LOG_LEVELS: Final[list[str]] = [
+        "DEBUG",
+        "INFO",
+        "WARNING",
+        "ERROR",
+    ]
+
     def __init__(self) -> None:
         self.__enabled_loguru = True
         self.__enabled_textual = True

@@ -79,15 +67,18 @@ class Logger:
         textual_log_attr = getattr(textual_logger, item, None)

         if not callable(loguru_attr) or not callable(textual_log_attr):
-            raise TypeError(f"Callable `{item}` not found in either Textual or Loguru loggers.")
+            raise TypeError(
+                f"Callable `{item}` not found in either Textual or Loguru loggers.\n"
+                f"Try one of: {self.AVAILABLE_LOG_LEVELS}"
+            )

-        def __hooked(*args: Any, **kwargs: Any) -> None:
+        def _hooked(*args: Any, **kwargs: Any) -> None:
             if self.__enabled_loguru:
                 loguru_attr(*args, **kwargs)
             if self.__enabled_textual:
                 textual_log_attr(*args, **kwargs)

-        return __hooked
+        return _hooked

     def setup(
         self, *, enable_loguru: bool = True, enable_textual: bool = True, enable_stream_handlers: bool = False
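
Two things happen in this hunk: the TypeError now hints at the supported level names, and the closure loses its double leading underscore (which would otherwise be name-mangled inside the class body; a single underscore is the conventional spelling for a local helper). Dispatch behavior is unchanged. A hypothetical usage sketch of the facade:

logger.setup(enable_loguru=True, enable_textual=False)

logger.info("reaches loguru only; textual dispatch is disabled above")  # resolved via __getattr__ -> _hooked
logger.error("any method callable on both backends routes the same way")

logger.not_a_level("boom")  # TypeError, now listing AVAILABLE_LOG_LEVELS as a hint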
@@ -96,33 +87,10 @@ class Logger:
         self.__enabled_textual = enable_textual

         if enable_loguru:
-            self.__configure_loguru(enable_stream_handlers=enable_stream_handlers)
-
-    @staticmethod
-    def __configure_loguru(*, enable_stream_handlers: bool = False) -> None:
-        def make_filter(*, level: int | str, level_3rd_party: int | str) -> Callable[..., bool]:
-            level_no = getattr(logging, level) if isinstance(level, str) else level
-            level_no_3rd_party = (
-                getattr(logging, level_3rd_party) if isinstance(level_3rd_party, str) else level_3rd_party
-            )
-
-            def __filter(record: dict[str, Any]) -> bool:
-                is_3rd_party = ROOT_DIRECTORY not in Path(record["file"].path).parents
-                if level_no_3rd_party is not None and is_3rd_party:
-                    return bool(record["level"].no >= level_no_3rd_party)
-                return bool(record["level"].no >= level_no)
-
-            return __filter
+            self._configure_loguru(enable_stream_handlers=enable_stream_handlers)

-        def remove_stream_handlers() -> None:
-            """Remove all handlers that log to stdout and stderr."""
-            core: Core = loguru_logger._core  # type: ignore[attr-defined]
-            for handler in core.handlers.values():
-                if isinstance(handler._sink, StreamSink):
-                    loguru_logger.remove(handler._id)
-
-        log_file_path, latest_log_file_path = create_log_file(log_name="defined", log_group=settings.LOG_LEVEL.lower())
-        log_file_path_debug, latest_log_file_path_debug = create_log_file(log_name="debug", log_group="debug")
+
+    def _configure_loguru(self, *, enable_stream_handlers: bool = False) -> None:
+        log_paths = self._create_log_files()

         logging.root.handlers = [InterceptHandler()]
         logging.root.setLevel(logging.DEBUG)
@@ -134,28 +102,85 @@ class Logger:
             logging.getLogger(name).setLevel(logging.DEBUG)

         if not enable_stream_handlers:
-            remove_stream_handlers()
-
-        loguru_logger.add(
-            sink=log_file_path,
-            format=LOG_FORMAT,
-            filter=make_filter(level=settings.LOG_LEVEL, level_3rd_party=settings.LOG_LEVEL_3RD_PARTY),
-        )
-
-        loguru_logger.add(
-            sink=latest_log_file_path,
-            format=LOG_FORMAT,
-            filter=make_filter(level=settings.LOG_LEVEL, level_3rd_party=settings.LOG_LEVEL_3RD_PARTY),
-        )
-
-        loguru_logger.add(
-            sink=log_file_path_debug,
-            format=LOG_FORMAT,
-            filter=make_filter(level=logging.DEBUG, level_3rd_party=logging.DEBUG),
-        )
-
-        loguru_logger.add(
-            sink=latest_log_file_path_debug,
-            format=LOG_FORMAT,
-            filter=make_filter(level=logging.DEBUG, level_3rd_party=logging.DEBUG),
-        )
+            self._remove_stream_handlers()
+
+        self._add_file_handlers(log_paths)
+
+    def _create_log_files(self) -> GroupLogFilePaths:
+        log_paths: GroupLogFilePaths = {}
+        log_levels = settings.get("LOG_LEVELS", ["INFO"])
+        for log_level in log_levels:
+            log_level_lower = log_level.lower()
+            log_level_upper = log_level.upper()
+            if log_level_upper not in self.AVAILABLE_LOG_LEVELS:
+                raise RuntimeError(f"Invalid log level: {log_level}, expected one of {self.AVAILABLE_LOG_LEVELS}.")
+            log_paths[log_level_upper] = self._create_log_files_per_group(group_name=log_level_lower)
+        return log_paths
+
+    def _create_log_files_per_group(self, group_name: str) -> LogFilePaths:
+        def create_empty_file(file_name: str) -> Path:
+            empty_file_path = log_group_directory / file_name
+            with empty_file_path.open("w", encoding="utf-8"):
+                """We just need to create an empty file to which we will log later"""
+            return empty_file_path
+
+        log_group_directory = Path(settings.get("LOG_PATH", ".")) / group_name
+        log_group_directory.mkdir(parents=True, exist_ok=True)
+
+        latest_log_file_name = "latest.log"
+        latest_log_path = create_empty_file(latest_log_file_name)
+
+        keep_history = settings.get("LOG_KEEP_HISTORY", True)
+        if not keep_history:
+            return (latest_log_path,)
+
+        dated_log_file_name = f"{LAUNCH_TIME.strftime('%Y-%m-%d_%H-%M-%S')}.log"
+        dated_log_path = create_empty_file(dated_log_file_name)
+        return dated_log_path, latest_log_path
+
+    def _remove_stream_handlers(self) -> None:
+        """Remove all handlers that log to stdout and stderr."""
+        core: Core = loguru_logger._core  # type: ignore[attr-defined]
+        for handler in core.handlers.values():
+            if isinstance(handler._sink, StreamSink):
+                loguru_logger.remove(handler._id)
+
+    def _add_file_handlers(self, log_paths: GroupLogFilePaths) -> None:
+        for log_level, paths in log_paths.items():
+            log_level_3rd_party = self._get_3rd_party_log_level(log_level)
+            for path in paths:
+                loguru_logger.add(
+                    sink=path,
+                    format=LOG_FORMAT,
+                    filter=self._make_filter(level=log_level, level_3rd_party=log_level_3rd_party),
+                )
+
+    def _get_3rd_party_log_level(self, log_level: str) -> str:
+        """
+        Return the log level for 3rd party modules based on the given log_level for clive.
+
+        We want to include everything when clive is in DEBUG mode, but also leave the option to set a different
+        log level for 3rd party modules when clive runs at a log level higher than DEBUG.
+        """
+        log_level_3rd_party = str(settings.get("LOG_LEVEL_3RD_PARTY", "DEBUG")).upper()
+        return "DEBUG" if log_level == "DEBUG" else log_level_3rd_party
+
+    def _make_filter(self, *, level: int | str, level_3rd_party: int | str) -> Callable[..., bool]:
+        level_no = getattr(logging, level) if isinstance(level, str) else level
+        level_no_3rd_party = getattr(logging, level_3rd_party) if isinstance(level_3rd_party, str) else level_3rd_party
+
+        def _filter(record: dict[str, Any]) -> bool:
+            is_3rd_party = ROOT_DIRECTORY not in Path(record["file"].path).parents
+            if level_no_3rd_party is not None and is_3rd_party:
+                return bool(record["level"].no >= level_no_3rd_party)
+            return bool(record["level"].no >= level_no)
+
+        return _filter


 logger = Logger()
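
Taken together, _get_3rd_party_log_level and _make_filter gate records from files under ROOT_DIRECTORY by the sink's own level, and everything else by the 3rd-party level (which collapses to DEBUG for the DEBUG sink). A self-contained sketch of that decision logic, with hypothetical paths:

import logging
from pathlib import Path
from types import SimpleNamespace

ROOT_DIRECTORY = Path("/home/user/clive")  # hypothetical project root


def make_filter(level_no: int, level_no_3rd_party: int):
    # Simplified mirror of Logger._make_filter: pick the threshold by record origin.
    def _filter(record: dict) -> bool:
        is_3rd_party = ROOT_DIRECTORY not in Path(record["file"].path).parents
        threshold = level_no_3rd_party if is_3rd_party else level_no
        return record["level"].no >= threshold

    return _filter


flt = make_filter(level_no=logging.INFO, level_no_3rd_party=logging.WARNING)

own = {"file": SimpleNamespace(path="/home/user/clive/ui/app.py"), "level": SimpleNamespace(no=logging.INFO)}
vendor = {"file": SimpleNamespace(path="/usr/lib/python3.11/site-packages/httpx/_client.py"), "level": SimpleNamespace(no=logging.INFO)}

assert flt(own)         # clive INFO passes an INFO sink
assert not flt(vendor)  # 3rd-party INFO is dropped when the 3rd-party level is WARNING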
@@ -2,8 +2,9 @@
 FORCE_ONBOARDING = false
 LOG_DEBUG_LOOP = false
 LOG_DIRECTORY = "" # if not given, logs will be placed in the data directory
-LOG_LEVEL = "INFO"
-LOG_LEVEL_3RD_PARTY = "WARNING"
+LOG_LEVELS = ["INFO"] # possible values: ["DEBUG", "INFO", "WARNING", "ERROR"]; multiple values may be given, and logs at and above each level will also be saved in that level's directory
+LOG_LEVEL_3RD_PARTY = "WARNING" # applies only to directories for levels higher than DEBUG; in the DEBUG directory, 3rd party logs are always DEBUG
+LOG_KEEP_HISTORY = false # whether to keep a history of log files; if false, only logs of the latest run are kept
 LOG_DEBUG_PERIOD = 1

 [default.beekeeper]
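
For a concrete picture of the new scheme: each entry in LOG_LEVELS becomes its own log directory with its own threshold. A hypothetical layout for LOG_LEVELS = ["DEBUG", "INFO"] with LOG_KEEP_HISTORY = true, assuming the log root resolves to logs/ (directory and file names are illustrative):

logs/
├── debug/
│   ├── latest.log                # overwritten on every run
│   └── 2024-01-02_15-04-05.log   # dated file, created only when LOG_KEEP_HISTORY = true
└── info/
    ├── latest.log
    └── 2024-01-02_15-04-05.log

With LOG_KEEP_HISTORY = false, only the latest.log files are created.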