Source code for exlab_wizard.logging.handlers
"""Equipment-scoped file handler. Backend Spec §16.2.4.
The :class:`EquipmentScopedFileHandler` resolves its destination at
``emit`` time using the active context's ``equipment_id``. There is one
file descriptor open per equipment, cached for the lifetime of the
process; concurrent emits from the same hostname use ``O_APPEND``
semantics (POSIX) or ``FILE_APPEND_DATA | FILE_SHARE_WRITE`` (Windows) so
writes don't tear (§4.5 same-equipment concurrency rule).
``fsync`` is called only on ``ERROR``-level events. ``INFO``/``DEBUG``
emits are flushed but not fsync'd, matching §16.2.4: durability matters
for hard failures, throughput matters for routine traffic.
Events without an ``equipment_id`` in context are silently skipped here
-- they reach the central handler via the same queue listener.
"""
from __future__ import annotations
import contextlib
import logging
import os
import socket
import sys
import threading
from pathlib import Path
from typing import IO
from exlab_wizard.constants import CACHE_DIR_NAME, LOG_FILE_TEMPLATE
from exlab_wizard.logging.context import get_run_context
__all__ = [
"EquipmentScopedFileHandler",
]
[docs]
class EquipmentScopedFileHandler(logging.Handler):
"""Per-equipment ``wizard.<hostname>.log`` writer.
Resolves ``<local_root>/<equipment_id>/.exlab-wizard/wizard.<hostname>.log``
at emit time from the active ``equipment_id`` context var. Lazy-opens
the file on first emit and caches the file object per equipment for
the process lifetime. ``O_APPEND`` semantics on POSIX (and the
equivalent share-mode flags on Windows) keep concurrent emits
tear-free without explicit locking.
Construct with the configured ``local_root`` (a :class:`pathlib.Path`).
The handler accepts an optional ``hostname`` kwarg so tests can pin a
deterministic value; production callers leave it as ``None`` and the
handler resolves ``socket.gethostname()`` once on construction.
Events without an ``equipment_id`` in context are skipped: they fall
through to the central handler downstream (which has no scope
requirement).
"""
def __init__(
self,
local_root: Path,
*,
hostname: str | None = None,
) -> None:
super().__init__()
self._local_root = Path(local_root)
self._hostname = hostname or socket.gethostname()
# equipment_id -> file object. The lock guards both lookup-and-open
# (to prevent two concurrent emits for the same equipment from
# opening twice) and close-on-shutdown.
self._files: dict[str, _OpenLogFile] = {}
self._lock = threading.Lock()
# -- logging.Handler overrides ------------------------------------------
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Write ``record`` to the active equipment's log file."""
try:
equipment_id = get_run_context().get("equipment_id")
if not equipment_id:
# No scope -- the central handler will pick this up.
return
entry = self._open_for(equipment_id)
line = self.format(record) + "\n"
entry.write(line)
if record.levelno >= logging.ERROR:
entry.fsync()
except Exception:
# stdlib handler error contract (see ``logging.Handler.emit``).
self.handleError(record)
[docs]
def close(self) -> None:
"""Close every cached file descriptor.
Called on listener teardown. Idempotent: a second call is a no-op
because the cache is cleared on first close.
"""
with self._lock:
entries = list(self._files.values())
self._files.clear()
for entry in entries:
entry.close()
super().close()
# -- internals ----------------------------------------------------------
def _open_for(self, equipment_id: str) -> _OpenLogFile:
"""Return the cached :class:`_OpenLogFile` for ``equipment_id``.
Opens the destination on first request. The path is composed at
emit time so a config reload that changes ``local_root`` is picked
up on the next emit (the old descriptor is left to GC if it's
still in use; in practice the listener is torn down on
``configure_logging`` re-entry, which closes everything).
"""
with self._lock:
existing = self._files.get(equipment_id)
if existing is not None:
return existing
path = self._compose_path(equipment_id)
path.parent.mkdir(parents=True, exist_ok=True)
entry = _OpenLogFile.open(path)
self._files[equipment_id] = entry
return entry
def _compose_path(self, equipment_id: str) -> Path:
"""Return ``<local_root>/<equipment_id>/.exlab-wizard/wizard.<host>.log``."""
return (
self._local_root
/ equipment_id
/ CACHE_DIR_NAME
/ LOG_FILE_TEMPLATE.format(hostname=self._hostname)
)
# ---------------------------------------------------------------------------
# _OpenLogFile -- the per-equipment file wrapper
# ---------------------------------------------------------------------------
class _OpenLogFile:
"""Append-mode log file with platform-appropriate share / append flags.
The wrapper is intentionally minimal -- it owns the ``open()`` call and
the matching ``close()``, plus a ``write`` and ``fsync`` pair the
handler uses on hot paths. We do NOT subclass ``io.TextIOBase`` to
avoid pulling in the buffer-manager state machine; the handler holds
the only reference and serializes calls through the queue listener
thread.
"""
__slots__ = ("_fileno_cached", "_fp", "_lock")
def __init__(self, fp: IO[str]) -> None:
self._fp = fp
self._fileno_cached = fp.fileno()
self._lock = threading.Lock()
@classmethod
def open(cls, path: Path) -> _OpenLogFile:
"""Open ``path`` in append mode with the platform-appropriate flags."""
# POSIX: O_APPEND guarantees that each write() lands at end-of-file
# atomically, even with multiple writers. Windows: opening the file
# in append mode via the C runtime translates to FILE_APPEND_DATA;
# the additional FILE_SHARE_WRITE flag is the default for Python's
# ``open`` on Windows so we don't need to drop into the Win32 API.
fp: IO[str]
if sys.platform == "win32":
# ``Path.open`` with mode "a" on Windows yields FILE_APPEND_DATA +
# FILE_SHARE_READ + FILE_SHARE_WRITE under the hood, matching
# the §16.2.4 requirement for tear-free concurrent emits from
# the same hostname.
fp = path.open("a", encoding="utf-8", buffering=1)
else:
# POSIX: mode "a" sets O_APPEND, which is what §16.2.4 calls
# for. Buffering=1 means line-buffered text mode.
fd = os.open(
path,
os.O_WRONLY | os.O_CREAT | os.O_APPEND,
0o644,
)
fp = os.fdopen(fd, "a", encoding="utf-8", buffering=1)
return cls(fp)
def write(self, line: str) -> None:
"""Append ``line`` (already newline-terminated) to the file."""
with self._lock:
self._fp.write(line)
self._fp.flush()
def fsync(self) -> None:
"""``os.fsync`` the underlying file descriptor.
Called only on ``ERROR``-level emits per §16.2.4.
"""
with self._lock:
os.fsync(self._fileno_cached)
def close(self) -> None:
"""Flush and close the file. Idempotent."""
with self._lock:
with contextlib.suppress(Exception):
self._fp.flush()
self._fp.close()