Source code for exlab_wizard.api.routers.staging

"""``/staging`` router. Backend Spec §13.7, §13.8.

Three endpoints back the orchestrator's staging panel:

* ``GET /staging`` -- enumerate every staged run with its lifecycle
  state, file count, byte total, and elapsed time since last activity.
* ``POST /staging/{run_path}/force-sync`` -- enqueue an immediate
  NAS sync for a specific run (used when the operator wants to skip
  the watcher's polling latency).
* ``POST /staging/{run_path}/clear`` -- delete the local staging copy
  of a sync-verified run (the manual-mode action from §13.7).

All three return ``503`` with ``{"code": "orchestrator_disabled"}``
when ``config.orchestrator.enabled`` is False -- the router is only
mounted when the flag is True (see :func:`api.app.create_app`), but the
guard is kept here so a misconfigured deployment surfaces a clear error.
"""

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Any

from fastapi import APIRouter, HTTPException, Request, status
from pydantic import BaseModel, ConfigDict

from exlab_wizard.api._dependencies import require_deps
from exlab_wizard.cache.ingest_writer import IngestWriter
from exlab_wizard.config.models import Config
from exlab_wizard.constants import IngestState, SyncHandleState
from exlab_wizard.logging import get_logger
from exlab_wizard.orchestrator.cleanup import clear_run
from exlab_wizard.orchestrator.staging_query import (
    StagedRunSummary,
    list_staged_runs,
)
from exlab_wizard.paths import ingest_json_path
from exlab_wizard.utils.time import utc_now

__all__ = [
    "ClearResponse",
    "ForceSyncResponse",
    "StagedRunRow",
    "StagingListResponse",
    "build_staging_router",
]

_log = get_logger(__name__)


# ---------------------------------------------------------------------------
# Pydantic response models
# ---------------------------------------------------------------------------


class StagedRunRow(BaseModel):
    """One staging-panel row. Backend Spec §13.8."""

    # Unknown keys are rejected so the row shape cannot drift silently.
    model_config = ConfigDict(extra="forbid")

    # Identity of the staged run.
    path: str
    current_state: str  # lifecycle state of the run
    equipment_id: str
    project_name: str
    run_kind: str

    # Size of the local staging copy.
    file_count: int
    byte_total: int

    # Activity tracking: elapsed seconds plus the timestamp they are measured from.
    elapsed_seconds_since_last_activity: int
    last_activity_at: str
class StagingListResponse(BaseModel):
    """``GET /staging`` response."""

    # Unknown keys are rejected so the envelope shape stays fixed.
    model_config = ConfigDict(extra="forbid")

    # One entry per staged run.
    runs: list[StagedRunRow]
class ForceSyncResponse(BaseModel):
    """``POST /staging/{run_path}/force-sync`` response."""

    # Unknown keys are rejected so the response shape stays fixed.
    model_config = ConfigDict(extra="forbid")

    # Echo of the run path that was requested.
    run_path: str
    # Handle state after enqueueing: "queued" or "blocked".
    state: str
    # Sync job identifier, when one was assigned.
    job_id: str | None = None
class ClearResponse(BaseModel):
    """``POST /staging/{run_path}/clear`` response."""

    # Unknown keys are rejected so the response shape stays fixed.
    model_config = ConfigDict(extra="forbid")

    # Echo of the run path that was cleared.
    run_path: str
    # Totals reported by the cleanup routine.
    files_freed: int
    bytes_freed: int
# --------------------------------------------------------------------------- # Router builder # ---------------------------------------------------------------------------
def build_staging_router() -> APIRouter:
    """Construct the ``/staging`` router. Backend Spec §13.7, §13.8.

    Every handler resolves the app's dependency container with
    :func:`require_deps`. It then calls :func:`_require_orchestrator_enabled`,
    which answers ``503`` / ``orchestrator_disabled`` when orchestrator mode
    is off.

    Routes:
        * ``GET /staging`` -- one :class:`StagedRunRow` per run returned by
          :func:`list_staged_runs`.
        * ``POST /staging/{run_path}/force-sync`` -- enqueue an immediate NAS
          sync for ``run_path``.
        * ``POST /staging/{run_path}/clear`` -- delete the local staging copy
          of a ``sync_verified`` run via :func:`clear_run`. Any other state,
          or a state that cannot be determined, is refused with ``409``.

    ``run_path`` uses the ``:path`` converter, so it may contain slashes.

    Returns:
        A new :class:`APIRouter` tagged ``"staging"``.
    """
    router = APIRouter(tags=["staging"])

    @router.get("/staging", response_model=StagingListResponse)
    async def get_staging(request: Request) -> StagingListResponse:
        deps = require_deps(request)
        config = _require_orchestrator_enabled(deps)
        # list_staged_runs is synchronous. It presumably touches the
        # filesystem to produce per-run file counts and byte totals, so it
        # runs in a worker thread rather than blocking the event loop for
        # every other in-flight request.
        rows = await asyncio.to_thread(
            list_staged_runs, config=config, now_utc=utc_now()
        )
        return StagingListResponse(runs=[_row_from_summary(s) for s in rows])

    @router.post(
        "/staging/{run_path:path}/force-sync",
        response_model=ForceSyncResponse,
    )
    async def post_force_sync(request: Request, run_path: str) -> ForceSyncResponse:
        deps = require_deps(request)
        _require_orchestrator_enabled(deps)
        nas_sync = _require_nas_sync(deps)
        path = Path(run_path)
        handle = await nas_sync.enqueue(path)
        # ``handle`` is a SyncJobHandle-like object exposing .state / .job_id.
        # A missing .state defaults to QUEUED.
        state_value = _state_text(getattr(handle, "state", SyncHandleState.QUEUED))
        # A falsy job id (e.g. "") is normalised to None.
        job_id_value = getattr(handle, "job_id", None) or None
        _log.info(
            "force-sync requested via API: path=%s state=%s job_id=%s",
            run_path,
            state_value,
            job_id_value,
        )
        return ForceSyncResponse(
            run_path=run_path,
            state=state_value,
            job_id=job_id_value,
        )

    @router.post(
        "/staging/{run_path:path}/clear",
        response_model=ClearResponse,
    )
    async def post_clear(request: Request, run_path: str) -> ClearResponse:
        deps = require_deps(request)
        config = _require_orchestrator_enabled(deps)
        ingest_writer = _require_ingest_writer(deps)
        # NOTE(review): run_path comes straight from the URL and is not
        # confined to the staging root here; presumably clear_run validates
        # it against ``config`` -- confirm.
        path = Path(run_path)
        # The spec only allows clearing sync-verified runs (manual mode). The
        # watcher would never call this on earlier states, but the API is
        # operator-facing, so the rule is enforced here too. The check fails
        # closed: a missing or unreadable ingest record refuses the clear
        # instead of deleting a run whose sync cannot be confirmed.
        await _ensure_sync_verified(ingest_writer, path)
        files_freed, bytes_freed = await clear_run(
            path,
            config=config,
            ingest_writer=ingest_writer,
        )
        return ClearResponse(
            run_path=run_path,
            files_freed=files_freed,
            bytes_freed=bytes_freed,
        )

    return router


def _state_text(state: Any) -> str:
    """Render a sync-handle state as its wire string (e.g. ``"queued"``).

    ``str()`` of a plain ``Enum`` member, or of a ``(str, Enum)`` mixin
    member, yields ``"ClassName.MEMBER"`` rather than the member's value.
    This helper therefore prefers ``.value`` when the object has one. Plain
    strings and ``StrEnum`` members render exactly as ``str()`` would.
    """
    return str(getattr(state, "value", state))


def _not_sync_verified(message: str) -> HTTPException:
    """Build the 409 ``staging_not_sync_verified`` error carrying *message*."""
    return HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail={"code": "staging_not_sync_verified", "message": message},
    )


async def _ensure_sync_verified(ingest_writer: IngestWriter, path: Path) -> None:
    """Return only if the run at *path* is recorded as ``sync_verified``.

    Raises:
        HTTPException: 409 ``staging_not_sync_verified`` in any of these
            cases: the ingest record is missing, it cannot be read, the read
            yields nothing, or it records any state other than
            ``IngestState.SYNC_VERIFIED``.
    """
    ingest_path = ingest_json_path(path)
    if not ingest_path.exists():
        raise _not_sync_verified(
            "Cannot clear run: its ingest record is missing, so it cannot "
            "be confirmed as sync_verified."
        )
    try:
        payload = await ingest_writer.read_ingest(ingest_path)
    except Exception as exc:
        # Boundary handler: any read/parse failure means the state is
        # unknown, which must refuse (not permit) the destructive clear.
        _log.exception("clear refused: could not read ingest record %s", ingest_path)
        raise _not_sync_verified(
            "Cannot clear run: its ingest record could not be read, so it "
            "cannot be confirmed as sync_verified."
        ) from exc
    if payload is None:
        raise _not_sync_verified(
            "Cannot clear run: its ingest record could not be read, so it "
            "cannot be confirmed as sync_verified."
        )
    if payload.current_state != IngestState.SYNC_VERIFIED:
        raise _not_sync_verified(
            f"Cannot clear run in state {payload.current_state!r}; "
            "only sync_verified runs may be cleared."
        )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _row_from_summary(summary: StagedRunSummary) -> StagedRunRow:
    """Copy the staging-panel fields of *summary* into a :class:`StagedRunRow`."""
    # Attribute names are identical on both sides, so a name table keeps the
    # mapping in one place.
    field_names = (
        "path",
        "current_state",
        "equipment_id",
        "project_name",
        "run_kind",
        "file_count",
        "byte_total",
        "elapsed_seconds_since_last_activity",
        "last_activity_at",
    )
    return StagedRunRow(**{name: getattr(summary, name) for name in field_names})


def _require_orchestrator_enabled(deps: Any) -> Config:
    """Hand back the live :class:`Config` once orchestrator mode is confirmed.

    Raises:
        HTTPException: 503 with ``code: "orchestrator_disabled"`` when no
            config is wired on *deps* or ``config.orchestrator.enabled`` is
            falsy.
    """
    config = getattr(deps, "config", None)
    enabled = config is not None and config.orchestrator.enabled
    if enabled:
        return config
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail={
            "code": "orchestrator_disabled",
            "message": (
                "the orchestrator is not enabled on this workstation; "
                "set orchestrator.enabled to true in config.yaml to use "
                "this endpoint"
            ),
        },
    )


def _require_nas_sync(deps: Any) -> Any:
    """Return the NAS sync client wired on *deps*.

    Raises:
        HTTPException: 503 with ``code: "internal_error"`` when it is absent.
    """
    nas_sync = getattr(deps, "nas_sync", None)
    if nas_sync is not None:
        return nas_sync
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail={
            "code": "internal_error",
            "message": "NAS sync client is not wired on this app instance",
        },
    )


def _require_ingest_writer(deps: Any) -> IngestWriter:
    """Return the wired :class:`IngestWriter`, or a fresh one if none is wired.

    A new instance is acceptable here because the writer is described as
    stateless across calls (one FileLock per ingest path).
    """
    writer = getattr(deps, "ingest_writer", None)
    return writer if writer is not None else IngestWriter()