exlab_wizard.orchestrator

Orchestrator-mode runtime. Backend Spec §12, §13.

This package implements the orchestrator-only features that activate when config.orchestrator.enabled is True:

  • StagingWatcher – background polling task that walks staging_root, writes the initial ingest.json for each new run, and drives the five-state lifecycle described in §13.3.

  • cleanup_eligible() / clear_run() – helpers for both the manual operator flow and the scheduled background sweeper.

  • list_staged_runs() – read-side query that backs the Staging UI panel and the GET /staging endpoint.

The orchestrator never touches single-equipment workstations: every public endpoint in the api/routers/staging.py router is gated behind config.orchestrator.enabled and returns 503 with code: "orchestrator_disabled" when that flag is not set.
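The gating rule can be sketched without any web framework. This is a minimal illustration, not the router's actual code: OrchestratorSettings and gate_staging_request are hypothetical names, and only the 503 status and the "orchestrator_disabled" code come from the text above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class OrchestratorSettings:
    """Hypothetical stand-in for the config.orchestrator section."""

    enabled: bool


def gate_staging_request(
    settings: OrchestratorSettings,
) -> Optional[tuple[int, dict[str, Any]]]:
    """Return a (status, body) rejection when orchestrator mode is off.

    Returns None when the request may proceed to the real handler.
    """
    if not settings.enabled:
        return 503, {"code": "orchestrator_disabled"}
    return None
```

A router would presumably run a check like this before every staging handler, so that no individual endpoint has to test the flag itself.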

class exlab_wizard.orchestrator.StagedRunSummary(path, current_state, equipment_id, project_name, run_kind, file_count, byte_total, elapsed_seconds_since_last_activity, last_activity_at)

Bases: object

One row in the orchestrator’s staging panel.

Backend Spec §13.8:

  • path – absolute filesystem path of the run leaf directory.

  • current_state – the latest ingest.json current_state.

  • equipment_id – the equipment segment of the run path.

  • project_name – the LIMS project short id (parent dir).

  • run_kind – "experimental" or "test".

  • file_count / byte_total – size of the staged data.

  • elapsed_seconds_since_last_activity – seconds between now_utc and the most recent history entry’s at field (falls back to the directory mtime when no history exists).

  • last_activity_at – ISO-8601 string of the same timestamp.

Parameters:
  • path (str)

  • current_state (str)

  • equipment_id (str)

  • project_name (str)

  • run_kind (str)

  • file_count (int)

  • byte_total (int)

  • elapsed_seconds_since_last_activity (int)

  • last_activity_at (str)

byte_total: int
current_state: str
elapsed_seconds_since_last_activity: int
equipment_id: str
file_count: int
last_activity_at: str
path: str
project_name: str
run_kind: str
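The two activity fields are derived from the same timestamp. The sketch below shows one way that derivation could look. It assumes the history is a list of dicts ordered oldest to newest, and that each at value is an ISO-8601 string with an explicit UTC offset. The helper name last_activity is hypothetical.

```python
from datetime import datetime, timezone


def last_activity(
    history: list[dict], dir_mtime: float, now_utc: datetime
) -> tuple[int, str]:
    """Return (elapsed_seconds_since_last_activity, last_activity_at)."""
    if history:
        # Assumption: the most recent entry is the last one in the list.
        at = datetime.fromisoformat(history[-1]["at"])
    else:
        # Fallback described above: use the directory mtime (epoch seconds).
        at = datetime.fromtimestamp(dir_mtime, tz=timezone.utc)
    elapsed = int((now_utc - at).total_seconds())
    return elapsed, at.isoformat()
```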
class exlab_wizard.orchestrator.StagingWatcher(*, config, ingest_writer, nas_sync, cache_creation, on_state_change=None, poll_interval_s=10.0)

Bases: object

Polls staging_root and drives the §13.3 lifecycle.

Constructor arguments mirror the spec: a Config, the orchestrator-side IngestWriter, a NASSyncClient-shaped sync client, the CreationWriter used to read pushed creation snapshots, and an optional on_state_change callable invoked after every successful transition (used by the UI to refresh the panel).

Polling cadence defaults to 10 seconds (§13.5 – “polls until all are present”) and can be overridden in tests via poll_interval_s.

async evaluate_run(run_path)

Evaluate one run and advance its state if conditions are met.

Returns the post-evaluation state. Idempotent within a single cycle – calling repeatedly while no condition has changed is a cheap no-op (only reads the on-disk ingest entry).

State decisions, in order:

  1. No ingest.json yet – bootstrap one in staging from the equipment’s pushed creation.json.

  2. staging – if the equipment-defined completeness signal is present, advance to complete (recording file/byte counts).

  3. complete – enqueue with NASSyncClient; on success advance to sync_queued.

  4. sync_queued – check NAS sync status; on verified-or-better advance to sync_verified.

  5. sync_verified – if cleanup policy says we may clear, invoke clear_run() and advance to cleared.

  6. cleared – terminal; nothing more to do.
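The ordered decisions above amount to a small forward-only state machine. The sketch below models states 2 through 6 as a pure function, with the external checks passed in as booleans. The State enum and next_state are hypothetical stand-ins for IngestState and the watcher's internals. The bootstrap step (1) is omitted, and the sketch assumes at most one transition per call, which the text above does not specify.

```python
from enum import Enum


class State(str, Enum):
    """Hypothetical mirror of the five lifecycle states."""

    STAGING = "staging"
    COMPLETE = "complete"
    SYNC_QUEUED = "sync_queued"
    SYNC_VERIFIED = "sync_verified"
    CLEARED = "cleared"


def next_state(
    state: State,
    *,
    signal_present: bool = False,  # completeness signal found on disk
    enqueue_ok: bool = False,      # sync client accepted the run
    sync_verified: bool = False,   # NAS sync reports verified or better
    may_clear: bool = False,       # cleanup policy allows clearing
) -> State:
    """Return the state after one evaluation; unchanged if nothing applies."""
    if state is State.STAGING and signal_present:
        return State.COMPLETE
    if state is State.COMPLETE and enqueue_ok:
        return State.SYNC_QUEUED
    if state is State.SYNC_QUEUED and sync_verified:
        return State.SYNC_VERIFIED
    if state is State.SYNC_VERIFIED and may_clear:
        return State.CLEARED
    # CLEARED is terminal, and any unmet condition leaves the state as-is.
    return state
```

Calling next_state repeatedly with unchanged inputs returns the same state, which matches the idempotency described for evaluate_run().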

Parameters:

run_path (Path)

Return type:

IngestState

async poll_once()

Walk staging_root once and run evaluate_run() on each leaf.

Exposed publicly so tests can drive the watcher synchronously without spinning up the asyncio task. Returns the list of post-evaluation states for every run found (in walk order).

Return type:

list[IngestState]

async start()

Start the background polling task. Idempotent.

Returns immediately; the task runs until stop() is called or the surrounding event loop tears down.

Return type:

None

async stop()

Cancel the background task and wait for it to exit. Idempotent.

Return type:

None
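The idempotent start()/stop() pair can be illustrated with a plain asyncio task. This is a generic sketch of the pattern, not the watcher's implementation: PollingTask is a hypothetical class, and the poll callback stands in for poll_once().

```python
import asyncio
from typing import Awaitable, Callable, Optional


class PollingTask:
    """Hypothetical minimal poller with idempotent start/stop."""

    def __init__(
        self,
        poll_once: Callable[[], Awaitable[object]],
        poll_interval_s: float = 10.0,
    ) -> None:
        self._poll_once = poll_once
        self._interval = poll_interval_s
        self._task: Optional[asyncio.Task] = None

    async def _run(self) -> None:
        # Loop until cancelled by stop() or by event-loop teardown.
        while True:
            await self._poll_once()
            await asyncio.sleep(self._interval)

    async def start(self) -> None:
        # A second start() while the task is alive does nothing.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        task, self._task = self._task, None
        if task is None:
            return  # never started, or already stopped
        task.cancel()
        try:
            await task  # wait for the task to actually exit
        except asyncio.CancelledError:
            pass
```

Tests can skip start() entirely and await the poll callback directly, which is the use case described for poll_once().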

exlab_wizard.orchestrator.cleanup_eligible(*, ingest, config, now_utc=None)

Return True if the run’s local staging copy should be cleared now.

Backend Spec §13.7:

  • manual – always returns False. The operator must invoke clear_run() directly (UI button or API).

  • scheduled – returns True iff current_state == sync_verified AND sync_verified_at + retain_hours <= now_utc.

A run that is not sync_verified is never eligible – attempting to clear earlier states is a contract violation that the watcher must avoid.

Return type:

bool
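The §13.7 rule stated above reduces to a short predicate. In this sketch the ingest record and config are flattened into explicit arguments. The function name eligible and its parameter names are hypothetical, and treating any policy other than "scheduled" as ineligible is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def eligible(
    *,
    policy: str,
    current_state: str,
    sync_verified_at: Optional[datetime],
    retain_hours: float,
    now_utc: Optional[datetime] = None,
) -> bool:
    """Return True when a scheduled sweep may clear the staged copy."""
    if policy != "scheduled":
        # manual: the operator must clear explicitly, so never eligible.
        return False
    if current_state != "sync_verified" or sync_verified_at is None:
        # Earlier states are never eligible.
        return False
    now = now_utc or datetime.now(timezone.utc)
    return sync_verified_at + timedelta(hours=retain_hours) <= now
```

Passing now_utc explicitly keeps the predicate deterministic in tests.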

async exlab_wizard.orchestrator.clear_run(run_path, *, config, ingest_writer, host=None)

Remove the staged run directory and append the cleared entry.

Returns (file_count, bytes_freed) so the caller can log/notify accurately. The ingest entry is written before the deletion so a crash mid-clear leaves a coherent state record.

The function is idempotent: calling it after the directory is gone is a no-op that returns (0, 0) and does not append a duplicate history entry.

Return type:

tuple[int, int]
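The record-then-delete ordering and the idempotent (0, 0) return can be sketched synchronously. This is an illustration under assumptions, not the real clear_run(): clear_staged_dir is a hypothetical name, and the append_cleared callback stands in for the ingest writer, which is assumed to store its record outside the directory being removed.

```python
import shutil
from pathlib import Path
from typing import Callable


def clear_staged_dir(
    run_path: Path, append_cleared: Callable[[int, int], None]
) -> tuple[int, int]:
    """Remove run_path and return (file_count, bytes_freed)."""
    if not run_path.exists():
        # Already cleared: no deletion and no duplicate history entry.
        return 0, 0
    files = [p for p in run_path.rglob("*") if p.is_file()]
    file_count = len(files)
    bytes_freed = sum(p.stat().st_size for p in files)
    # Record the cleared entry first, so a crash during deletion
    # still leaves a coherent state record.
    append_cleared(file_count, bytes_freed)
    shutil.rmtree(run_path)
    return file_count, bytes_freed
```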

exlab_wizard.orchestrator.list_staged_runs(*, config, staging_root=None, now_utc=None)

Enumerate every staged run with its current lifecycle state.

staging_root defaults to config.orchestrator.staging_root. Returns an empty list when the orchestrator is not enabled (cheap no-op so callers never need to gate on the flag themselves) or when the directory does not exist.

Sort order: most recent activity first.

Return type:

list[StagedRunSummary]

Modules

  • cleanup – Staging-side cleanup helpers.

  • staging_query – Read-only enumeration of staged runs.

  • staging_watcher – Background staging watcher.