exlab_wizard.orchestrator#
Orchestrator-mode runtime. Backend Spec §12, §13.
This package implements the orchestrator-only features that activate when
config.orchestrator.enabled is True:
StagingWatcher– background polling task that walksstaging_root, writes the initialingest.jsonfor each new run, and drives the five-state lifecycle described in §13.3.cleanup_eligible()/clear_run()– helpers for both the manual operator flow and the scheduled background sweeper.list_staged_runs()– read-side query that backs the Staging UI panel and theGET /stagingendpoint.
The orchestrator never touches single-equipment workstations: every
public surface in the api/routers/staging.py router is gated behind
config.orchestrator.enabled and returns 503 with
code: "orchestrator_disabled" otherwise.
- class exlab_wizard.orchestrator.StagedRunSummary(path, current_state, equipment_id, project_name, run_kind, file_count, byte_total, elapsed_seconds_since_last_activity, last_activity_at)[source]#
Bases:
objectOne row in the orchestrator’s staging panel.
Backend Spec §13.8:
path– absolute filesystem path of the run leaf directory.current_state– the latestingest.jsoncurrent_state.equipment_id– the equipment segment of the run path.project_name– the LIMS project short id (parent dir).run_kind–"experimental"or"test".file_count/byte_total– size of the staged data.elapsed_seconds_since_last_activity– seconds betweennow_utcand the most recent history entry’satfield (falls back to the directory mtime when no history exists).last_activity_at– ISO-8601 string of the same timestamp.
- Parameters:
- class exlab_wizard.orchestrator.StagingWatcher(*, config, ingest_writer, nas_sync, cache_creation, on_state_change=None, poll_interval_s=10.0)[source]#
Bases:
objectPolls
staging_rootand drives the §13.3 lifecycle.Constructor arguments mirror the spec: a
Config, the orchestrator-sideIngestWriter, aNASSyncClient-shaped sync client, theCreationWriterused to read pushed creation snapshots, and an optionalon_state_changecallable invoked after every successful transition (used by the UI to refresh the panel).Polling cadence defaults to 10s (§13.5 – “polls until all are present”) and is overridable for tests via
poll_interval_s.- Parameters:
config (
Config)ingest_writer (
IngestWriter)nas_sync (
NASSyncLike)cache_creation (
CreationCacheLike)on_state_change (
Callable[[Path,IngestState],Awaitable[None] |None] |None)poll_interval_s (
float)
- async evaluate_run(run_path)[source]#
Evaluate one run and advance its state if conditions are met.
Returns the post-evaluation state. Idempotent within a single cycle – calling repeatedly while no condition has changed is a cheap no-op (only reads the on-disk ingest entry).
State decisions, in order:
No
ingest.jsonyet – bootstrap one instagingfrom the equipment’s pushedcreation.json.staging– if the equipment-defined completeness signal is present, advance tocomplete(recording file/byte counts).complete– enqueue with NASSyncClient; on success advance tosync_queued.sync_queued– check NAS sync status; on verified-or-better advance tosync_verified.sync_verified– if cleanup policy says we may clear, invokeclear_run()and advance tocleared.cleared– terminal; nothing more to do.
- Parameters:
run_path (
Path)- Return type:
IngestState
- async poll_once()[source]#
Walk staging_root once and run
evaluate_run()on each leaf.Exposed publicly so tests can drive the watcher synchronously without spinning up the asyncio task. Returns the list of post-evaluation states for every run found (in walk order).
- Return type:
list[IngestState]
- exlab_wizard.orchestrator.cleanup_eligible(*, ingest, config, now_utc=None)[source]#
Return True if the run’s local staging copy should be cleared now.
Backend Spec §13.7:
manual– always returns False. The operator must invokeclear_run()directly (UI button or API).scheduled– returns True iffcurrent_state == sync_verifiedANDsync_verified_at + retain_hours <= now_utc.
A run that is not
sync_verifiedis never eligible – attempting to clear earlier states is a contract violation that the watcher must avoid.- Parameters:
ingest (
IngestJson)config (
Config)
- Return type:
- async exlab_wizard.orchestrator.clear_run(run_path, *, config, ingest_writer, host=None)[source]#
Remove the staged run directory and append the
clearedentry.Returns
(file_count, bytes_freed)so the caller can log/notify accurately. The ingest entry is written before the deletion so a crash mid-clear leaves a coherent state record.The function is idempotent: calling it after the directory is gone is a no-op that returns
(0, 0)and does not append a duplicate history entry.
- exlab_wizard.orchestrator.list_staged_runs(*, config, staging_root=None, now_utc=None)[source]#
Enumerate every staged run with its current lifecycle state.
staging_rootdefaults toconfig.orchestrator.staging_root. Returns an empty list when the orchestrator is not enabled (cheap no-op so callers never need to gate on the flag themselves) or when the directory does not exist.Sort order: most recent activity first.
Modules
Staging-side cleanup helpers. |
|
Read-only enumeration of staged runs. |
|
Background staging watcher. |