Source code for exlab_wizard.api.routers.problems

"""``/problems`` router. Backend Spec §4.6.1, §11.8.

Endpoints:

* ``GET /problems`` -- query findings with optional scope/severity/class.
* ``POST /problems/{run_path}/override`` -- append an override entry.
* ``POST /problems/{run_path}/override/revoke`` -- append a tombstone.
* ``POST /problems/refresh`` -- re-run ``Validator.audit("all")``.
* ``WS /problems/events`` -- subscribe to the audit pub-sub channel.

The router dispatches to the bound :class:`Validator`, the
:class:`CreationWriter` for override mutations, and the
``audit_channel`` pub-sub object that the lifespan handler attaches.
"""

from __future__ import annotations

import contextlib
import uuid
from pathlib import Path
from typing import Any

from fastapi import (
    APIRouter,
    Depends,
    HTTPException,
    Query,
    Request,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from pydantic import BaseModel, ConfigDict, Field

from exlab_wizard.api._dependencies import require_deps
from exlab_wizard.api.events import encode_event, event_from_dict
from exlab_wizard.api.schemas import (
    OverrideEntry,
    TombstoneEntry,
    override_entry_to_dict,
    tombstone_entry_to_dict,
)
from exlab_wizard.api.setup import setup_state_gate
from exlab_wizard.constants import AuditScopeKind
from exlab_wizard.logging import get_logger
from exlab_wizard.paths import creation_json_path
from exlab_wizard.utils.time import utc_now_iso

__all__ = [
    "FindingResponse",
    "OverrideRequest",
    "OverrideResponse",
    "ProblemsResponse",
    "RefreshResponse",
    "RevokeRequest",
    "build_problems_router",
]

_log = get_logger(__name__)


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------


[docs] class FindingResponse(BaseModel): """One finding row in the §11.8 schema.""" model_config = ConfigDict(extra="forbid") rule: str tier: str run_path: str offending_path: str offending_kind: str matched_token: str | None = None rule_detail: str = "" synced_under_prior_policy: bool = False override_active: bool = False
[docs] class ProblemsResponse(BaseModel): """``GET /problems`` response.""" model_config = ConfigDict(extra="forbid") findings: list[FindingResponse] audit_at: str
[docs] class OverrideRequest(BaseModel): """``POST /problems/{run_path}/override`` body. Backend Spec §11.3.""" model_config = ConfigDict(extra="forbid") problem_class: str reason: str = Field(min_length=10, max_length=500) operator: str = "" expires_at: str | None = None
[docs] class OverrideResponse(BaseModel): """Override append response.""" model_config = ConfigDict(extra="forbid") id: str problem_class: str operator: str recorded_at: str reason: str revoked: bool expires_at: str | None = None
[docs] class RevokeRequest(BaseModel): """``POST /problems/{run_path}/override/revoke`` body. Backend Spec §11.3.""" model_config = ConfigDict(extra="forbid") revokes: str reason: str = Field(min_length=10, max_length=500) operator: str = ""
class TombstoneResponse(BaseModel): model_config = ConfigDict(extra="forbid") id: str revokes: str operator: str recorded_at: str reason: str revoked: bool
[docs] class RefreshResponse(BaseModel): """``POST /problems/refresh`` response.""" model_config = ConfigDict(extra="forbid") audit_at: str finding_count: int
# --------------------------------------------------------------------------- # Router builder # ---------------------------------------------------------------------------
[docs] def build_problems_router() -> APIRouter: """Construct the ``/problems`` router.""" router = APIRouter(prefix="/problems", tags=["problems"]) @router.get( "", response_model=ProblemsResponse, dependencies=[Depends(setup_state_gate)], ) async def list_problems( request: Request, scope: str = Query("all"), scope_value: str | None = Query(None), severity: str | None = Query(None), problem_class: str | None = Query(None, alias="class"), ) -> ProblemsResponse: validator = _require_validator(request) scope_arg: dict[str, Any] = _build_scope(scope, scope_value) findings = validator.query_problems(scope_arg) rows = [ FindingResponse(**finding.to_dict()) for finding in findings if _matches_filters(finding, severity=severity, problem_class=problem_class) ] audit_at = _last_audit_at(request) return ProblemsResponse(findings=rows, audit_at=audit_at) @router.post( "/refresh", response_model=RefreshResponse, dependencies=[Depends(setup_state_gate)], ) async def refresh(request: Request) -> RefreshResponse: validator = _require_validator(request) findings = validator.audit({"kind": AuditScopeKind.ALL}) audit_at = utc_now_iso() deps = require_deps(request) deps.last_audit_at = audit_at # Publish an audit-pass snapshot if a channel is wired. channel = getattr(deps, "audit_channel", None) if channel is not None: with contextlib.suppress(Exception): await channel.publish_snapshot(findings, audit_at) return RefreshResponse(audit_at=audit_at, finding_count=len(findings)) @router.post( "/{run_path:path}/override", response_model=OverrideResponse, status_code=status.HTTP_201_CREATED, dependencies=[Depends(setup_state_gate)], ) async def append_override( request: Request, run_path: str, body: OverrideRequest ) -> OverrideResponse: cache_writer = _require_cache_writer(request) path = _require_creation_json(run_path) entry = OverrideEntry( id=str(uuid.uuid4()), problem_class=body.problem_class, operator=body.operator, recorded_at=utc_now_iso(), reason=body.reason, expires_at=body.expires_at, ) entry_dict = override_entry_to_dict(entry) def _mutate(payload: Any) -> Any: payload.validation_overrides = [*payload.validation_overrides, dict(entry_dict)] return payload await cache_writer.update_creation_atomic(path, _mutate) return OverrideResponse(**entry_dict) @router.post( "/{run_path:path}/override/revoke", response_model=TombstoneResponse, status_code=status.HTTP_201_CREATED, dependencies=[Depends(setup_state_gate)], ) async def revoke_override( request: Request, run_path: str, body: RevokeRequest ) -> TombstoneResponse: cache_writer = _require_cache_writer(request) path = _require_creation_json(run_path) entry = TombstoneEntry( id=str(uuid.uuid4()), revokes=body.revokes, operator=body.operator, recorded_at=utc_now_iso(), reason=body.reason, ) entry_dict = tombstone_entry_to_dict(entry) def _mutate(payload: Any) -> Any: payload.validation_overrides = [*payload.validation_overrides, dict(entry_dict)] return payload await cache_writer.update_creation_atomic(path, _mutate) return TombstoneResponse(**entry_dict) @router.websocket("/events") async def problems_events(websocket: WebSocket) -> None: deps = getattr(websocket.app.state, "dependencies", None) channel = getattr(deps, "audit_channel", None) if deps else None if channel is None: await websocket.close(code=1011, reason="audit channel not initialized") return await websocket.accept() validator = getattr(deps, "validator", None) if validator is not None: findings = validator.audit({"kind": AuditScopeKind.ALL}) audit_at = getattr(deps, "last_audit_at", None) or utc_now_iso() await websocket.send_bytes( encode_event( event_from_dict( { "kind": "snapshot", "findings": [f.to_dict() for f in findings], "audit_at": audit_at, } ) ) ) try: await _stream_audit_channel(websocket, channel) except WebSocketDisconnect: return except Exception as exc: _log.warning("problems_events stream error: %s", exc) with contextlib.suppress(Exception): await websocket.close(code=1011, reason="stream_error") return router
# --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _build_scope(scope: AuditScopeKind | str, scope_value: str | None) -> dict[str, Any]: try: kind = AuditScopeKind(scope) if not isinstance(scope, AuditScopeKind) else scope except ValueError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={ "code": "validation_failed", "message": f"unknown scope {scope!r}", "field": "scope", }, ) from exc if kind is AuditScopeKind.ALL: return {"kind": kind.value} if kind is AuditScopeKind.EQUIPMENT_ID: return {"kind": kind.value, "value": scope_value or ""} if kind is AuditScopeKind.PROJECT_PATH: return {"kind": kind.value, "value": scope_value or ""} # Unreachable: AuditScopeKind has only the three members above. raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={ "code": "validation_failed", "message": f"unknown scope {scope!r}", "field": "scope", }, ) def _matches_filters(finding: Any, *, severity: str | None, problem_class: str | None) -> bool: if severity is not None and finding.tier != severity: return False return not (problem_class is not None and finding.rule != problem_class) def _last_audit_at(request: Request) -> str: deps = getattr(request.app.state, "dependencies", None) if deps is None: return utc_now_iso() return getattr(deps, "last_audit_at", None) or utc_now_iso() def _require_validator(request: Request) -> Any: deps = require_deps(request) validator = getattr(deps, "validator", None) if validator is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail={ "code": "internal_error", "message": "validator is not initialized", }, ) return validator def _require_cache_writer(request: Request) -> Any: deps = require_deps(request) writer = getattr(deps, "cache_creation", None) if writer is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail={ "code": "internal_error", "message": "cache writer is not initialized", }, ) return writer def _require_creation_json(run_path: str) -> Path: """Resolve a ``run_path`` to its ``creation.json`` or raise 404. Used by the override-append and override-revoke routes; both need the same "the run directory must exist on disk and have a ``creation.json`` cache file" gate before they can mutate the file. """ path = creation_json_path(Path(run_path)) if not path.exists(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={ "code": "session_not_found", "message": f"creation.json not found at {path}", }, ) return path async def _stream_audit_channel(websocket: WebSocket, channel: Any) -> None: """Forward each delta frame the audit channel publishes. The channel exposes an async iterator via ``subscribe()`` returning dicts shaped per :func:`event_from_dict`. """ async for frame in channel.subscribe(): try: typed = event_from_dict(frame) except ValueError: continue await websocket.send_bytes(encode_event(typed))