Source code for exlab_wizard.plugins.logger

"""Plugin logger shim. Backend Spec §6.1.4 / §6.3.2.

Plugins call ``ctx.log.info(...)`` / ``ctx.log.warning(...)`` rather than
``print()`` or ``logging.getLogger(...)`` directly because:

1. In the worker subprocess (the production path), ``stdout`` is reserved
   for the IPC envelope -- a stray ``print`` corrupts the protocol. Stderr
   carries structured log frames the host re-emits into the wizard log.
2. In-process invocations (unit tests, ``--no-isolation`` debug) need a
   logger that forwards into the canonical
   :func:`exlab_wizard.logging.get_logger` chain so log records show up
   alongside everything else.

The :class:`PluginLogger` base type is the abstract surface plugins see;
the two implementations -- :class:`HostPluginLogger` (in-process) and
:class:`WorkerPluginLogger` (subprocess) -- share the same four-method
shape (``debug`` / ``info`` / ``warning`` / ``error``).

Each call accepts a positional ``message`` and arbitrary structured
``**fields`` keyword args; the host logger renders them as ``key=value``
extras while the worker logger emits them inside the JSON frame's
``context`` block (which the host then merges back in). Plugin authors
do not need to know which path they're on -- the API is identical.
"""

from __future__ import annotations

import logging
import sys
from abc import ABC, abstractmethod
from typing import IO, Any

import msgspec
from msgspec import json as msgspec_json

from exlab_wizard.logging import get_logger

__all__ = [
    "HostPluginLogger",
    "PluginLogFrame",
    "PluginLogger",
    "WorkerPluginLogger",
]


[docs] class PluginLogFrame(msgspec.Struct, frozen=True): """Wire format for worker-side log records. The worker subprocess emits one ``PluginLogFrame``-shaped JSON object per log line on its stderr stream; the host parses each line and forwards into the canonical logger chain. Backend Spec §6.3.2. """ level: str message: str context: dict[str, Any] = {}
[docs] class PluginLogger(ABC): """Abstract structured-log interface plugins receive on ``ctx.log``. Implementations forward to either the in-process stdlib logger (:class:`HostPluginLogger`) or the worker stderr channel (:class:`WorkerPluginLogger`). Both expose the same four-method shape; plugin authors should not reach behind it. """
[docs] @abstractmethod def debug(self, message: str, **fields: Any) -> None: """Emit a DEBUG-level structured log record."""
[docs] @abstractmethod def info(self, message: str, **fields: Any) -> None: """Emit an INFO-level structured log record."""
[docs] @abstractmethod def warning(self, message: str, **fields: Any) -> None: """Emit a WARNING-level structured log record."""
[docs] @abstractmethod def error(self, message: str, **fields: Any) -> None: """Emit an ERROR-level structured log record."""
[docs] class HostPluginLogger(PluginLogger): """In-process forwarder used when plugins run without subprocess isolation. Used by the host's unit-test shim, by ``--no-isolation`` debug invocations of the plugin CLI (Backend Spec §6.10), and by the host when re-emitting parsed worker frames. Routes through :func:`exlab_wizard.logging.get_logger` so records flow into the canonical handler chain (per-equipment file, central rotating, stderr). """ def __init__(self, name: str = "exlab_wizard.plugins") -> None: self._logger: logging.Logger = get_logger(name)
[docs] def debug(self, message: str, **fields: Any) -> None: self._emit(logging.DEBUG, message, fields)
[docs] def info(self, message: str, **fields: Any) -> None: self._emit(logging.INFO, message, fields)
[docs] def warning(self, message: str, **fields: Any) -> None: self._emit(logging.WARNING, message, fields)
[docs] def error(self, message: str, **fields: Any) -> None: self._emit(logging.ERROR, message, fields)
def _emit(self, level: int, message: str, fields: dict[str, Any]) -> None: # Pass the structured fields through ``extra`` so the canonical # formatter (StructuredTagFormatter) can render them; if none were # provided we still emit a plain record so call sites without # context still log. if fields: self._logger.log(level, message, extra={"context": fields}) else: self._logger.log(level, message)
[docs] class WorkerPluginLogger(PluginLogger): """Worker-side forwarder. Emits JSON frames on stderr. Used inside the plugin worker subprocess (Backend Spec §6.3.1). Each call serializes a :class:`PluginLogFrame` via :mod:`msgspec.json` and writes a single newline-terminated line to the configured stream (defaults to ``sys.stderr``). The host reads these line-by-line and forwards them to the canonical logger. The worker MUST NOT use stdout for log output -- that channel is reserved for the IPC envelope. Stderr is the structured-log sideband. """ _encoder: msgspec_json.Encoder = msgspec_json.Encoder() def __init__(self, stream: IO[str] | None = None) -> None: self._stream: IO[str] = stream if stream is not None else sys.stderr
[docs] def debug(self, message: str, **fields: Any) -> None: self._emit("DEBUG", message, fields)
[docs] def info(self, message: str, **fields: Any) -> None: self._emit("INFO", message, fields)
[docs] def warning(self, message: str, **fields: Any) -> None: self._emit("WARNING", message, fields)
[docs] def error(self, message: str, **fields: Any) -> None: self._emit("ERROR", message, fields)
def _emit(self, level: str, message: str, fields: dict[str, Any]) -> None: frame = PluginLogFrame(level=level, message=message, context=fields) encoded = self._encoder.encode(frame).decode("utf-8") self._stream.write(encoded + "\n") self._stream.flush()