Source code for exlab_wizard.tray.status
"""Status-submenu rendering. Backend Spec §4.3.2.
The tray's status submenu shows live state derived from server-side
components: ``SessionStore.active_sessions``,
``NASSyncClient.queue_depth``, ``Validator.audit_summary``. The string
follows the §4.3.2 formatter:
* ``"Idle"`` -- nothing in progress.
* ``"Sync: <N> jobs"`` -- N >= 1 NAS-sync jobs in queue.
* ``"<N> plugin needs input"`` -- one or more sessions in
``INPUT_REQUIRED`` (the warning emoji is omitted from the canonical
formatter; pystray menu labels are plain text).
* Combined when multiple conditions hold (e.g. ``"Sync: 2 jobs; 1 plugin
needs input"``).
A 5-second :class:`StatusTicker` re-evaluates the formatter and notifies
a callback the tray uses to refresh the menu label. The ticker runs on
a daemon thread; tests can drive a single tick deterministically by
calling :meth:`StatusTicker.tick_once`.
"""
from __future__ import annotations
import threading
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from exlab_wizard.logging import get_logger
__all__ = [
"DEFAULT_REFRESH_SECONDS",
"StatusSnapshot",
"StatusTicker",
"format_status",
"snapshot_status",
]
_log = get_logger(__name__)
DEFAULT_REFRESH_SECONDS: float = 5.0
[docs]
@dataclass(frozen=True, slots=True)
class StatusSnapshot:
"""Immutable view over the §4.3.2 status inputs."""
active_sessions: int = 0
sync_queue_depth: int = 0
input_required_count: int = 0
[docs]
def snapshot_status(
*,
session_store: Any = None,
nas_sync: Any = None,
) -> StatusSnapshot:
"""Build a :class:`StatusSnapshot` from live component references.
Each component is read defensively so the launcher can pass
``None`` (setup-incomplete state) without the formatter crashing.
The function reads ``active_sessions`` and ``input_required``-style
counts from ``session_store`` and ``in_flight_jobs`` /
``queue_depth`` from ``nas_sync``.
"""
active = _safe_int(session_store, "active_sessions")
input_required = _safe_int(session_store, "input_required")
queue_depth = _safe_int(nas_sync, "queue_depth")
return StatusSnapshot(
active_sessions=active,
sync_queue_depth=queue_depth,
input_required_count=input_required,
)
[docs]
class StatusTicker:
"""Polls :func:`snapshot_status` on a fixed cadence.
On every tick the formatted string is computed; if it differs from
the previous label the ``on_update`` callback is invoked with the
new label. The callback is the tray menu's "set status text" hook;
tests pass a recording callable.
"""
def __init__(
self,
*,
session_store: Any = None,
nas_sync: Any = None,
on_update: Callable[[str], None] | None = None,
interval_seconds: float = DEFAULT_REFRESH_SECONDS,
) -> None:
self._session_store = session_store
self._nas_sync = nas_sync
self._on_update = on_update or (lambda _label: None)
self._interval = float(interval_seconds)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
self._last_label: str | None = None
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def start(self) -> None:
"""Spawn the ticker thread. Idempotent."""
if self._thread is not None and self._thread.is_alive():
return
self._stop.clear()
thread = threading.Thread(target=self._run, name="exlab-status-ticker", daemon=True)
thread.start()
self._thread = thread
[docs]
def stop(self) -> None:
"""Signal the ticker to exit. Idempotent."""
self._stop.set()
if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._thread = None
[docs]
def tick_once(self) -> str:
"""Run a single tick synchronously. Returns the new label.
Tests call this directly so they don't have to wait for a real
5-second interval.
"""
snapshot = snapshot_status(session_store=self._session_store, nas_sync=self._nas_sync)
label = format_status(snapshot)
if label != self._last_label:
self._last_label = label
try:
self._on_update(label)
except Exception:
_log.exception("status-update callback raised")
return label
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _run(self) -> None:
while not self._stop.is_set():
self.tick_once()
self._stop.wait(self._interval)
def _safe_int(obj: Any, attr: str) -> int:
"""Best-effort numeric read; missing / None / non-int returns 0."""
if obj is None:
return 0
raw = getattr(obj, attr, 0)
if callable(raw):
try:
raw = raw()
except Exception:
return 0
try:
return int(raw)
except (TypeError, ValueError):
return 0