exlab_wizard.sync.queue#

Durable SQLite-backed sync-job queue. Backend Spec §7.1.1, §7.1.2, §7.1.5.

The queue is a single SQLite file ({state_dir}/sync_queue.db) that holds one row per NAS-sync job. Jobs persist across server restarts so in-flight work survives process exits; on startup, RUNNING jobs are re-queued and AWAITING_VERIFY jobs are re-verified.

State machine (§7.1.2):

QUEUED -> RUNNING -> AWAITING_VERIFY -> VERIFIED -> CLEANUP_ELIGIBLE -> CLEANED
   |         |              |               |
   +---------+--------------+---------------+--> FAILED

Backoff (§7.1.5): 30s, 2m, 8m, 30m, 2h. After 5 failed attempts the job becomes terminal FAILED. Auth failures and local_file_vanished go straight to FAILED with no backoff.

The schema below mirrors the API contract in the Phase 10 brief:

CREATE TABLE jobs (
    id TEXT PRIMARY KEY,
    run_path TEXT NOT NULL UNIQUE,
    equipment_id TEXT NOT NULL,
    state TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,
    last_attempt_at TEXT,
    next_attempt_at TEXT,
    last_error TEXT,
    verify_passes INTEGER NOT NULL DEFAULT 0,
    verified_at TEXT,
    enqueued_at TEXT NOT NULL,
    nas_path TEXT
)

Functions

compute_next_attempt_at(*, attempts_after[, now])

Return the ISO timestamp of the next retry attempt.

Classes

SyncJobRow(id, run_path, equipment_id, state)

One row in the jobs table.

SyncJobState(*values)

State machine for a sync job.

SyncQueue(db_path)

Async SQLite-backed durable sync queue.

class exlab_wizard.sync.queue.SyncJobRow(id, run_path, equipment_id, state, attempts=0, last_attempt_at=None, next_attempt_at=None, last_error=None, verify_passes=0, verified_at=None, enqueued_at='', nas_path=None)[source]#

Bases: object

One row in the jobs table. Backend Spec §7.1.1.

attempts: int#
enqueued_at: str#
equipment_id: str#
id: str#
last_attempt_at: str | None#
last_error: str | None#
nas_path: str | None#
next_attempt_at: str | None#
run_path: str#
state: SyncJobState#
verified_at: str | None#
verify_passes: int#
class exlab_wizard.sync.queue.SyncJobState(*values)[source]#

Bases: StrEnum

State machine for a sync job. Backend Spec §7.1.2.

AWAITING_VERIFY = 'awaiting_verify'#
CLEANED = 'cleaned'#
CLEANUP_ELIGIBLE = 'cleanup_eligible'#
FAILED = 'failed'#
QUEUED = 'queued'#
RUNNING = 'running'#
VERIFIED = 'verified'#
class exlab_wizard.sync.queue.SyncQueue(db_path)[source]#

Bases: object

Async SQLite-backed durable sync queue. Backend Spec §7.1.1.

Use init() once at application startup; the database file is created on demand. After init, all CRUD methods are coroutines.

Parameters:

db_path (Path)

async close()[source]#

Close the underlying connection (idempotent).

Return type:

None

property db_path: Path#

The on-disk path of the queue database file.

async delete(job_id)[source]#

Remove a job row entirely. Used by the cleanup reaper after CLEANED.

Parameters:

job_id (str)

Return type:

None

async get_by_id(job_id)[source]#

Return the row with the given job_id or None.

Parameters:

job_id (str)

Return type:

SyncJobRow | None

async get_by_run_path(run_path)[source]#

Return the row whose run_path matches, or None.

Parameters:

run_path (Path)

Return type:

SyncJobRow | None

async init()[source]#

Open the connection, ensure the schema, and replay in-flight rows.

Replay semantics (§7.1.2): any RUNNING row at startup gets downgraded to QUEUED (the worker died mid-transfer); any AWAITING_VERIFY row is left as-is so the verifier picks it up.

Return type:

None
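The replay rule for RUNNING rows boils down to one UPDATE at startup. A sketch under stated assumptions: plain sqlite3 stands in for the async driver, and the table is trimmed to the columns involved.

```python
import sqlite3

def replay_in_flight(conn: sqlite3.Connection) -> int:
    """Downgrade RUNNING rows to QUEUED; AWAITING_VERIFY rows are left alone."""
    cur = conn.execute("UPDATE jobs SET state = 'queued' WHERE state = 'running'")
    conn.commit()
    return cur.rowcount  # number of rows re-queued
```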

async insert(*, run_path, equipment_id, nas_path=None, job_id=None)[source]#

Insert a new QUEUED row for run_path.

Raises aiosqlite.IntegrityError (via the UNIQUE constraint on run_path) if a row already exists for the same path.

Parameters:

  • run_path (Path)

  • equipment_id (str)

  • nas_path (str | None)

  • job_id (str | None)

Return type:

SyncJobRow
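The dedup behaviour is worth seeing in miniature: the UNIQUE constraint on run_path is what turns a double-enqueue into an error. A sketch assuming plain sqlite3 in place of aiosqlite, with the table trimmed to the two relevant columns:

```python
import sqlite3

def make_queue() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, run_path TEXT NOT NULL UNIQUE)")
    return conn

def enqueue(conn: sqlite3.Connection, job_id: str, run_path: str) -> None:
    # A second row with the same run_path raises sqlite3.IntegrityError.
    conn.execute("INSERT INTO jobs VALUES (?, ?)", (job_id, run_path))
```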

static is_terminal(state)[source]#

Return True if the state is a terminal state (no further work).

Parameters:

state (SyncJobState)

Return type:

bool

async list_all()[source]#

Return every row in the queue ordered by enqueued_at.

Return type:

list[SyncJobRow]

async list_in_state(state)[source]#

Return every row currently in state ordered by enqueued_at.

Parameters:

state (SyncJobState)

Return type:

list[SyncJobRow]
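The "ordered by enqueued_at" contract corresponds to a query along these lines (a sketch only: sqlite3 stands in for aiosqlite, and the selected columns are trimmed for brevity).

```python
import sqlite3

def list_in_state(conn: sqlite3.Connection, state: str) -> list[tuple]:
    """IDs of rows in the given state, oldest enqueued first."""
    return conn.execute(
        "SELECT id FROM jobs WHERE state = ? ORDER BY enqueued_at",
        (state,),
    ).fetchall()
```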

async record_failure(job_id, error, *, terminal=False, now=None)[source]#

Record a transport failure on job_id.

If terminal is True (auth failure, local file vanished) the job goes straight to FAILED with no backoff. Otherwise:

  • increment attempts;

  • if attempts >= MAX_ATTEMPTS: go terminal FAILED;

  • otherwise: stay in QUEUED with next_attempt_at set per the backoff schedule.

Parameters:

  • job_id (str)

  • error (str)

  • terminal (bool)

  • now (optional)

Return type:

SyncJobRow

async reset_to_queued(job_id)[source]#

Reset a FAILED job back to QUEUED for a manual retry.

Per §7.1.5 the Problems-tab Retry action re-enqueues a failed job. attempts and last_error are cleared so the backoff schedule starts fresh.

Parameters:

job_id (str)

Return type:

SyncJobRow

async transition(job_id, new_state, *, last_error=None, increment_attempts=False, increment_verify_passes=False, verified_at=None, next_attempt_at=None, last_attempt_at=None, nas_path=None)[source]#

Transition a job to new_state and patch the auxiliary columns.

The patch is a single UPDATE statement, so either every column changes or none does. Raises ValueError if no job with job_id exists.

Parameters:

  • job_id (str)

  • new_state (SyncJobState)

  • last_error (str | None)

  • increment_attempts (bool)

  • increment_verify_passes (bool)

  • verified_at (str | None)

  • next_attempt_at (str | None)

  • last_attempt_at (str | None)

  • nas_path (str | None)

Return type:

SyncJobRow