exlab_wizard.sync#

Sync package. Backend Spec §7.

Public re-exports for the NAS sync subsystem. Callers should depend on this package’s surface rather than reach into the sub-modules; the NASSyncClient is the only stateful object that external callers typically interact with.

exlab_wizard.sync.HandleState#

alias of SyncHandleState

class exlab_wizard.sync.NASSyncClient(*, config, queue_db, validator, cache_creation, verifier=None, worker_poll_interval_s=0.05, push_callable_factory=None, hashsum_callable_factory=None, remote_stat_callable=None)[source]#

Bases: object

Durable, per-equipment NAS sync queue with Pre-Sync Gate.

Backend Spec §7.1, §7.3.

Lifecycle:

  • init() opens the queue DB, replays any in-flight jobs, and starts a single background worker task.

  • enqueue() runs the Pre-Sync Gate, gates the run if needed, and otherwise inserts a QUEUED row.

  • close() cancels the worker and closes the DB.

The worker loop is a simple “pick the oldest QUEUED job whose next_attempt_at has passed” scheduler with at most one in-flight job at a time. This keeps tests deterministic; production deployments can extend it to per-equipment parallelism without changing the public API.
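
The selection step reduces to one query over the jobs table. A minimal sketch, assuming the column names shown on SyncJobRow below; the real query is internal to the client and may differ:

    # Hypothetical job-selection query; column names follow SyncJobRow.
    PICK_NEXT_SQL = """
        SELECT id FROM jobs
        WHERE state = 'queued'
          AND (next_attempt_at IS NULL OR next_attempt_at <= :now)
        ORDER BY enqueued_at
        LIMIT 1
    """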

Parameters:
async close()[source]#

Stop the worker and close the queue DB. Idempotent.

Return type:

None

async enqueue(run_path)[source]#

Run the Pre-Sync Gate: if there is a hard-tier finding without an active override, mark sync_status='blocked_by_validation'; otherwise insert a QUEUED row.

Returns a SyncJobHandle. The handle’s state is either SyncHandleState.BLOCKED or SyncHandleState.QUEUED.

Parameters:

run_path (Path)

Return type:

SyncJobHandle
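
Taken together with init(), status() and close(), a typical call sequence looks like the sketch below; config, queue_db, validator and cache_creation are assumed to be wired up elsewhere, and the run directory is hypothetical:

    import asyncio
    from pathlib import Path

    from exlab_wizard.sync import NASSyncClient, SyncHandleState

    async def main() -> None:
        # config, queue_db, validator, cache_creation: constructed elsewhere.
        client = NASSyncClient(config=config, queue_db=queue_db,
                               validator=validator, cache_creation=cache_creation)
        await client.init()
        try:
            run = Path("/data/runs/2024-06-01_run42")  # hypothetical run directory
            handle = await client.enqueue(run)
            if handle.state is SyncHandleState.BLOCKED:
                for finding in handle.blocking_findings:
                    print("blocked by:", finding)
            else:
                print("queued as", handle.job_id, "state:", await client.status(run))
        finally:
            await client.close()

    asyncio.run(main())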

async force_verify(run_path)[source]#

Recompute the local manifest and verify the run against it.

Used by the Settings “verify integrity” action. Does not advance the queue state.

Parameters:

run_path (Path)

Return type:

VerifyResult

async init()[source]#

Open the queue and start the worker task. Backend Spec §7.1.2.

Return type:

None

async retry(job_id)[source]#

Re-arm a FAILED job. Backend Spec §7.1.5 (Problems-tab Retry).

Parameters:

job_id (str)

Return type:

None

async status(run_path)[source]#

Return the queue state of the job for run_path.

"none" when no job exists; otherwise the underlying SyncJobState value.

Parameters:

run_path (Path)

Return type:

str

class exlab_wizard.sync.SyncHandleState(*values)[source]#

Bases: StrEnum

In-process state of a NAS sync job handle as observed by callers.

Distinct from SyncStatus (which is persisted in creation.json) and from the queue’s internal SyncJobState (which tracks the row in sync_queue.db). Backend Spec §7.1.

BLOCKED = 'blocked'#
QUEUED = 'queued'#
class exlab_wizard.sync.SyncJobHandle(job_id, state, run_path, blocking_findings=())[source]#

Bases: object

Lightweight handle returned by NASSyncClient.enqueue().

job_id is empty when the gate blocked the enqueue (the on-disk sync_status will reflect the block). blocking_findings is non-empty iff state == BLOCKED.

Parameters:
blocking_findings: tuple[Finding, ...]#
job_id: str#
run_path: str#
state: SyncHandleState#
class exlab_wizard.sync.SyncJobRow(id, run_path, equipment_id, state, attempts=0, last_attempt_at=None, next_attempt_at=None, last_error=None, verify_passes=0, verified_at=None, enqueued_at='', nas_path=None)[source]#

Bases: object

One row in the jobs table. Backend Spec §7.1.1.

Parameters:
attempts: int#
enqueued_at: str#
equipment_id: str#
id: str#
last_attempt_at: str | None#
last_error: str | None#
nas_path: str | None#
next_attempt_at: str | None#
run_path: str#
state: SyncJobState#
verified_at: str | None#
verify_passes: int#
class exlab_wizard.sync.SyncJobState(*values)[source]#

Bases: StrEnum

State machine for a sync job. Backend Spec §7.1.2.

AWAITING_VERIFY = 'awaiting_verify'#
CLEANED = 'cleaned'#
CLEANUP_ELIGIBLE = 'cleanup_eligible'#
FAILED = 'failed'#
QUEUED = 'queued'#
RUNNING = 'running'#
VERIFIED = 'verified'#
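
The docstrings elsewhere on this page (replay in SyncQueue.init(), record_failure(), reset_to_queued(), and the cleanup reaper noted on delete()) imply the happy path below; treat it as an inference from those entries, not a normative diagram:

    QUEUED -> RUNNING -> AWAITING_VERIFY -> VERIFIED -> CLEANUP_ELIGIBLE -> CLEANED

FAILED is reachable from the transfer and verify phases; reset_to_queued() returns a FAILED job to QUEUED.
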
class exlab_wizard.sync.SyncQueue(db_path)[source]#

Bases: object

Async SQLite-backed durable sync queue. Backend Spec §7.1.1.

Use init() once at application startup; the database file is created on demand. After init, all CRUD methods are coroutines.

Parameters:

db_path (Path)

async close()[source]#

Close the underlying connection (idempotent).

Return type:

None

property db_path: Path#

The on-disk path of the queue database file.

async delete(job_id)[source]#

Remove a job row entirely. Used by the cleanup reaper after CLEANED.

Parameters:

job_id (str)

Return type:

None

async get_by_id(job_id)[source]#

Return the row with the given job_id or None.

Parameters:

job_id (str)

Return type:

SyncJobRow | None

async get_by_run_path(run_path)[source]#

Return the row whose run_path matches, or None.

Parameters:

run_path (Path)

Return type:

SyncJobRow | None

async init()[source]#

Open the connection, ensure the schema, and replay in-flight rows.

Replay semantics (§7.1.2): any RUNNING row at startup gets downgraded to QUEUED (the worker died mid-transfer); any AWAITING_VERIFY row is left as-is so the verifier picks it up.

Return type:

None
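
A minimal sketch of the replay step, with db standing in for the open aiosqlite connection; the real implementation may differ:

    # Hypothetical replay performed inside init(); names follow SyncJobRow.
    await db.execute("UPDATE jobs SET state = 'queued' WHERE state = 'running'")
    await db.commit()
    # AWAITING_VERIFY rows are deliberately untouched: the verifier
    # re-discovers them via list_in_state(SyncJobState.AWAITING_VERIFY).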

async insert(*, run_path, equipment_id, nas_path=None, job_id=None)[source]#

Insert a new QUEUED row for run_path.

Raises aiosqlite.IntegrityError (via the UNIQUE constraint on run_path) if a row already exists for the same path.

Parameters:
Return type:

SyncJobRow
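
A typical caller-side pattern treats the UNIQUE violation as “a job already exists” and fetches the existing row; the equipment_id value here is hypothetical:

    import aiosqlite

    try:
        row = await queue.insert(run_path=run, equipment_id="spectrometer-01")
    except aiosqlite.IntegrityError:
        # A job for this run_path already exists; reuse it.
        row = await queue.get_by_run_path(run)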

static is_terminal(state)[source]#

Return True if the state is a terminal state (no further work).

Parameters:

state (SyncJobState)

Return type:

bool

async list_all()[source]#

Return every row in the queue ordered by enqueued_at.

Return type:

list[SyncJobRow]

async list_in_state(state)[source]#

Return every row currently in state ordered by enqueued_at.

Parameters:

state (SyncJobState)

Return type:

list[SyncJobRow]

async record_failure(job_id, error, *, terminal=False, now=None)[source]#

Record a transport failure on job_id.

If terminal is True (auth failure, local file vanished) the job goes straight to FAILED with no backoff. Otherwise:

  • increment attempts

  • if attempts >= MAX_ATTEMPTS: terminal FAILED.

  • else: stay in QUEUED with next_attempt_at set per the backoff schedule (sketched after this entry).

Parameters:
Return type:

SyncJobRow
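
The backoff constants and formula are internal to the queue; the sketch below assumes exponential doubling from an illustrative base delay (neither constant is taken from the spec):

    from datetime import datetime, timedelta, timezone

    BASE_DELAY_S = 30   # illustrative, not the spec's value
    MAX_ATTEMPTS = 5    # illustrative, not the spec's value

    def next_attempt(attempts: int, now: datetime | None = None) -> datetime:
        """Exponential backoff: 30s, 60s, 120s, ... after each failed attempt."""
        now = now or datetime.now(timezone.utc)
        return now + timedelta(seconds=BASE_DELAY_S * 2 ** (attempts - 1))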

async reset_to_queued(job_id)[source]#

Reset a FAILED job back to QUEUED for a manual retry.

Per §7.1.5 the Problems-tab Retry action re-enqueues a failed job. attempts and last_error are cleared so the backoff schedule starts fresh.

Parameters:

job_id (str)

Return type:

SyncJobRow

async transition(job_id, new_state, *, last_error=None, increment_attempts=False, increment_verify_passes=False, verified_at=None, next_attempt_at=None, last_attempt_at=None, nas_path=None)[source]#

Transition a job to new_state and patch the auxiliary columns.

The patch is one UPDATE statement so either every column moves or none do. Raises ValueError if the job is missing.

Parameters:
Return type:

SyncJobRow
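
A sketch of the single-statement patch, assembling only the columns the caller asked to change so the row either moves completely or not at all; table and column names are assumed from SyncJobRow, and the real method handles more columns:

    # Hypothetical core of transition(); db is the open aiosqlite connection.
    sets = ["state = :state"]
    params = {"state": new_state.value, "id": job_id}
    if last_error is not None:
        sets.append("last_error = :last_error")
        params["last_error"] = last_error
    if increment_attempts:
        sets.append("attempts = attempts + 1")
    cursor = await db.execute(
        f"UPDATE jobs SET {', '.join(sets)} WHERE id = :id", params
    )
    if cursor.rowcount == 0:
        raise ValueError(f"no such job: {job_id}")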

exception exlab_wizard.sync.TransportError(message, *, error_kind=None)[source]#

Bases: Exception

Raised when a transport probe cannot complete.

Distinct from TransportResult: this surfaces conditions where the transport’s hashsum-style probe could not produce a manifest at all – either because the upstream binary is missing (error_kind=None for the historical “binary not on PATH” case) or because the probe ran and failed in a classifiable way (AUTH, NETWORK, UNKNOWN). The queue worker uses error_kind to route the failure through the §7.1.5 retry policy:

  • AUTH – terminal FAILED, no retry (configuration problem).

  • NETWORK – non-terminal failure, exponential backoff retry.

  • UNKNOWN – treated as NETWORK for retry purposes.

  • None – legacy “binary missing” callers. Routed through the §7.1.5 HASH_MISMATCH branch: one immediate retry, then terminal FAILED on the second occurrence. The binary-missing reason is surfaced to the operator via last_error.

Parameters:
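
A hedged sketch of how a worker might route these kinds, with push, queue and job standing in for the client’s internals:

    from exlab_wizard.sync import TransportError, TransportErrorKind

    try:
        await push(run_path)  # the transport callable; signature assumed
    except TransportError as exc:
        if exc.error_kind is TransportErrorKind.AUTH:
            await queue.record_failure(job.id, str(exc), terminal=True)
        elif exc.error_kind is None:
            ...  # legacy "binary missing": one immediate retry, then terminal
        else:
            # NETWORK and UNKNOWN take the exponential-backoff path.
            await queue.record_failure(job.id, str(exc))
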
class exlab_wizard.sync.TransportErrorKind(*values)[source]#

Bases: StrEnum

Kind of failure that the queue worker uses to drive the retry policy.

Backend Spec §7.1.5.

  • NETWORK: timeout, ECONNRESET, transient SSH failure – retried with exponential backoff up to MAX_ATTEMPTS.

  • AUTH: authentication failure – terminal FAILED, no retry.

  • HASH_MISMATCH: post-transport hash check failed – single retry of the transport phase, then terminal.

  • LOCAL_FILE_VANISHED: the local file disappeared between transport and verify – terminal FAILED with local_file_vanished reason.

  • UNKNOWN: catch-all for transports returning a non-zero code we don’t recognize – treated as NETWORK for retry purposes.

AUTH = 'auth'#
HASH_MISMATCH = 'hash_mismatch'#
LOCAL_FILE_VANISHED = 'local_file_vanished'#
NETWORK = 'network'#
UNKNOWN = 'unknown'#
class exlab_wizard.sync.TransportResult(ok, error_kind=None, stderr='', stdout='', returncode=0)[source]#

Bases: object

Outcome of a transport push.

ok is True iff the transport reported success. On failure, error_kind selects the retry path; stderr is the raw stderr text for log surfacing; returncode is the subprocess exit code.

Parameters:
error_kind: TransportErrorKind | None#
ok: bool#
returncode: int#
stderr: str#
stdout: str#
class exlab_wizard.sync.Verifier[source]#

Bases: object

SHA-256 verifier. Backend Spec §7.1.4.

async compute_local_manifest(run_path)[source]#

Walk run_path and compute a SHA-256 per file.

Writes the manifest to run_path/.exlab-wizard/checksums.sha256 as a side-effect (the §7.1.4 contract). Files inside the .exlab-wizard/ cache subtree are excluded so the manifest does not record its own hash.

Parameters:

run_path (Path)

Return type:

dict[str, str]
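
A stand-alone sketch of the same walk-and-hash logic, synchronous for brevity and without the checksums.sha256 side-effect the real coroutine performs:

    import hashlib
    from pathlib import Path

    def local_manifest(run_path: Path) -> dict[str, str]:
        manifest: dict[str, str] = {}
        for file in sorted(run_path.rglob("*")):
            if not file.is_file():
                continue
            rel = file.relative_to(run_path)
            if rel.parts[0] == ".exlab-wizard":
                continue  # skip the cache subtree so the manifest never hashes itself
            manifest[rel.as_posix()] = hashlib.sha256(file.read_bytes()).hexdigest()
        return manifest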

async verify_against_local(run_path, manifest)[source]#

Re-hash every entry in manifest against the local subtree.

Returns a VerifyResult with ok=True iff every entry in the manifest exists locally with the recorded hash.

Files on disk that are NOT in the manifest are returned in extra for diagnostic logging but do not by themselves cause ok=False; a partial transport that wrote a fresh file would be caught by a later compute_local_manifest pass.

Parameters:
Return type:

VerifyResult

verify_against_remote(local_manifest, remote_manifest)[source]#

Compare a local manifest against a remote-derived manifest.

Pure dict comparison with no I/O. Use after the transport reports success, with remote_manifest derived from a remote hash probe (e.g. rclone hashsum sha256 or ssh ... sha256sum).

  • mismatched: keys present in both with differing hex digests.

  • missing: keys present locally but absent remotely; this is the integrity-in-transit failure mode.

  • extra: keys present remotely but not locally; informational only and does not flip ok.

  • ok = not mismatched and not missing. An empty remote_manifest therefore yields ok=False with every local key listed in missing.

Parameters:
Return type:

VerifyResult
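
Because the comparison is pure, it reduces to set algebra over the two dicts. A minimal sketch of the documented semantics:

    def compare(local: dict[str, str],
                remote: dict[str, str]) -> tuple[bool, list[str], list[str], list[str]]:
        mismatched = sorted(k for k in local.keys() & remote.keys()
                            if local[k] != remote[k])
        missing = sorted(local.keys() - remote.keys())  # integrity-in-transit failure
        extra = sorted(remote.keys() - local.keys())    # informational only
        ok = not mismatched and not missing
        return ok, mismatched, missing, extra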

class exlab_wizard.sync.VerifyResult(ok, mismatched=(), missing=(), extra=(), manifest=<factory>, error_kind=None)[source]#

Bases: object

Outcome of a verifier pass.

ok is True iff every file in the manifest matched. mismatched lists relative paths whose hash differed; missing lists paths in the manifest that no longer exist on disk; extra lists files on disk that were not in the manifest (informational only).

error_kind is set when the remote-hash probe could not complete (the underlying exlab_wizard.sync.transports.TransportError classified the failure as AUTH / NETWORK / UNKNOWN). The queue worker keys off this field to route via the §7.1.5 retry policy: AUTH -> terminal FAILED, NETWORK / UNKNOWN -> backoff retry. None for every non-remote-probe outcome.

Parameters:
error_kind: TransportErrorKind | None#
extra: tuple[str, ...]#
manifest: dict[str, str]#
mismatched: tuple[str, ...]#
missing: tuple[str, ...]#
ok: bool#
exlab_wizard.sync.cleanup_interlocks_satisfied(*, job, run_path, now_utc, config, overrides_active, remote_stat_ok)[source]#

Evaluate every §7.1.6 interlock; return True iff all pass.

Logs a debug entry naming the failing interlock when one fails so the operator can see why a job stayed in CLEANUP_ELIGIBLE.

The run_path parameter is accepted (but currently unused) so callers can pass the run directory through unchanged; future interlocks (e.g., size-on-disk threshold) may consult it.

Parameters:
Return type:

bool

exlab_wizard.sync.effective_bandwidth_limit_kibps(cfg, *, now_local)[source]#

Return the effective --bwlimit in KiB/s for now_local.

Decision tree per §7.1.7:

  1. If cfg.upload_mbps is None -> unlimited (None).

  2. Else if cfg.schedule is empty -> the cap applies at all times.

  3. Else if now_local falls inside any schedule window -> the cap applies for this transfer.

  4. Else -> unlimited (None).

Parameters:
Return type:

int | None
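
A restatement of the same tree in code; cfg’s attribute shapes and the in_window helper are assumptions, and the Mbps-to-KiB/s conversion shown is illustrative:

    from datetime import datetime

    def bwlimit_kibps(cfg, now_local: datetime) -> int | None:
        if cfg.upload_mbps is None:
            return None                      # 1. no cap configured
        cap = int(cfg.upload_mbps * 1_000_000 / 8 / 1024)  # Mbps -> KiB/s
        if not cfg.schedule:
            return cap                       # 2. cap applies at all times
        if any(in_window(w, now_local) for w in cfg.schedule):
            return cap                       # 3. inside a schedule window
        return None                          # 4. outside every window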

exlab_wizard.sync.is_eligible(*, validator, creation_json_path, creation)[source]#

Evaluate the §7.3 eligibility rule for a run.

Returns (True, []) iff there is no hard-tier finding without an active override. Otherwise returns (False, blocking_findings) where blocking_findings is the list of unmasked hard-tier findings (so the caller can surface them in logs).

The creation_json_path is the path to .exlab-wizard/creation.json; the run directory is its parent’s parent. The validator runs in creation-time mode (no walk; just rules over path segments + file names + the cached creation payload).

Parameters:
Return type:

tuple[bool, list[Finding]]
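
A caller-side sketch; validator and creation are assumed to be built elsewhere, and the run path is hypothetical:

    import logging
    from pathlib import Path

    from exlab_wizard.sync import is_eligible

    log = logging.getLogger(__name__)

    run = Path("/data/runs/2024-06-01_run42")  # hypothetical run directory
    ok, blocking = is_eligible(
        validator=validator,                # assumed to be constructed elsewhere
        creation_json_path=run / ".exlab-wizard" / "creation.json",
        creation=creation,                  # the cached creation payload
    )
    if not ok:
        for finding in blocking:
            log.warning("sync blocked by hard-tier finding: %s", finding)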

Modules

  • bandwidth – Bandwidth schedule evaluator.

  • cleanup – Cleanup safety interlocks.

  • nas_client – NAS sync client.

  • pre_sync_gate – Pre-Sync Gate.

  • queue – Durable SQLite-backed sync-job queue.

  • transports – Sync transports package.

  • verifier – SHA-256 hash verifier for synced runs.