exlab_wizard.sync.queue#
Durable SQLite-backed sync-job queue. Backend Spec §7.1.1, §7.1.2, §7.1.5.
The queue is a single SQLite file ({state_dir}/sync_queue.db) that
holds one row per NAS-sync job. Jobs persist across server restarts so
in-flight work survives process exits; on startup, RUNNING jobs are
re-queued and AWAITING_VERIFY jobs are re-verified.
State machine (§7.1.2):
QUEUED -> RUNNING -> AWAITING_VERIFY -> VERIFIED -> CLEANUP_ELIGIBLE -> CLEANED
\ \ \ \
FAILED <- FAILED <- FAILED <- FAILED
Backoff (§7.1.5): 30s, 2m, 8m, 30m, 2h. After 5 failed attempts the
job becomes terminal FAILED. Auth failures and local_file_vanished
go straight to FAILED with no backoff.
The schema below mirrors the API contract in the Phase 10 brief:
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
run_path TEXT NOT NULL UNIQUE,
equipment_id TEXT NOT NULL,
state TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
next_attempt_at TEXT,
last_error TEXT,
verify_passes INTEGER NOT NULL DEFAULT 0,
verified_at TEXT,
enqueued_at TEXT NOT NULL,
nas_path TEXT
)
Functions
|
Return the ISO timestamp of the next retry attempt. |
Classes
|
One row in the |
|
State machine for a sync job. |
|
Async SQLite-backed durable sync queue. |
- class exlab_wizard.sync.queue.SyncJobRow(id, run_path, equipment_id, state, attempts=0, last_attempt_at=None, next_attempt_at=None, last_error=None, verify_passes=0, verified_at=None, enqueued_at='', nas_path=None)[source]#
Bases:
objectOne row in the
jobstable. Backend Spec §7.1.1.- Parameters:
- state: SyncJobState#
- class exlab_wizard.sync.queue.SyncJobState(*values)[source]#
Bases:
StrEnumState machine for a sync job. Backend Spec §7.1.2.
- AWAITING_VERIFY = 'awaiting_verify'#
- CLEANED = 'cleaned'#
- CLEANUP_ELIGIBLE = 'cleanup_eligible'#
- FAILED = 'failed'#
- QUEUED = 'queued'#
- RUNNING = 'running'#
- VERIFIED = 'verified'#
- class exlab_wizard.sync.queue.SyncQueue(db_path)[source]#
Bases:
objectAsync SQLite-backed durable sync queue. Backend Spec §7.1.1.
Use
init()once at application startup; the database file is created on demand. After init, all CRUD methods are coroutines.- Parameters:
db_path (
Path)
- async get_by_id(job_id)[source]#
Return the row with the given
job_idorNone.- Parameters:
job_id (
str)- Return type:
- async get_by_run_path(run_path)[source]#
Return the row whose
run_pathmatches, orNone.- Parameters:
run_path (
Path)- Return type:
- async init()[source]#
Open the connection, ensure the schema, and replay in-flight rows.
Replay semantics (§7.1.2): any
RUNNINGrow at startup gets downgraded toQUEUED(the worker died mid-transfer); anyAWAITING_VERIFYrow is left as-is so the verifier picks it up.- Return type:
- async insert(*, run_path, equipment_id, nas_path=None, job_id=None)[source]#
Insert a new
QUEUEDrow forrun_path.Raises
aiosqlite.IntegrityError(via the UNIQUE constraint onrun_path) if a row already exists for the same path.
- static is_terminal(state)[source]#
Return True if the state is a terminal state (no further work).
- Parameters:
state (
SyncJobState)- Return type:
- async list_in_state(state)[source]#
Return every row currently in
stateordered byenqueued_at.- Parameters:
state (
SyncJobState)- Return type:
- async record_failure(job_id, error, *, terminal=False, now=None)[source]#
Record a transport failure on
job_id.If
terminalis True (auth failure, local file vanished) the job goes straight toFAILEDwith no backoff. Otherwise:increment
attemptsif
attempts >= MAX_ATTEMPTS: terminalFAILED.else: stay in
QUEUEDwithnext_attempt_atper backoff.
- async reset_to_queued(job_id)[source]#
Reset a
FAILEDjob back toQUEUEDfor a manual retry.Per §7.1.5 the Problems-tab Retry action re-enqueues a failed job.
attemptsandlast_errorare cleared so the backoff schedule starts fresh.- Parameters:
job_id (
str)- Return type:
- async transition(job_id, new_state, *, last_error=None, increment_attempts=False, increment_verify_passes=False, verified_at=None, next_attempt_at=None, last_attempt_at=None, nas_path=None)[source]#
Transition a job to
new_stateand patch the auxiliary columns.The patch is one
UPDATEstatement so either every column moves or none do. RaisesValueErrorif the job is missing.