"""Durable SQLite-backed sync-job queue. Backend Spec §7.1.1, §7.1.2, §7.1.5.
The queue is a single SQLite file (``{state_dir}/sync_queue.db``) that
holds one row per NAS-sync job. Jobs persist across server restarts so
in-flight work survives process exits; on startup, ``RUNNING`` jobs are
re-queued and ``AWAITING_VERIFY`` jobs are re-verified.
State machine (§7.1.2)::

    QUEUED -> RUNNING -> AWAITING_VERIFY -> VERIFIED -> CLEANUP_ELIGIBLE -> CLEANED

Each in-flight state may also drop directly to ``FAILED``.
Backoff (§7.1.5): ``30s, 2m, 8m, 30m, 2h``. After 5 failed attempts the
job becomes terminal ``FAILED``. Auth failures and ``local_file_vanished``
go straight to ``FAILED`` with no backoff.
The schema below mirrors the API contract in the Phase 10 brief:
.. code-block:: sql
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
run_path TEXT NOT NULL UNIQUE,
equipment_id TEXT NOT NULL,
state TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
next_attempt_at TEXT,
last_error TEXT,
verify_passes INTEGER NOT NULL DEFAULT 0,
verified_at TEXT,
enqueued_at TEXT NOT NULL,
nas_path TEXT
)
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from enum import StrEnum
from pathlib import Path
import aiosqlite
from exlab_wizard.logging import get_logger
from exlab_wizard.utils.time import dt_to_iso, utc_now_iso, utc_now_or
__all__ = [
"BACKOFF_SCHEDULE_SECONDS",
"MAX_ATTEMPTS",
"SyncJobRow",
"SyncJobState",
"SyncQueue",
]
_log = get_logger(__name__)
[docs]
class SyncJobState(StrEnum):
    """State machine for a sync job. Backend Spec §7.1.2."""

    QUEUED = "queued"                      # waiting for (or retrying) a transfer
    RUNNING = "running"                    # a worker is transferring now
    AWAITING_VERIFY = "awaiting_verify"    # transfer finished; verification pending
    VERIFIED = "verified"                  # verification recorded
    CLEANUP_ELIGIBLE = "cleanup_eligible"  # next step in the chain before CLEANED
    CLEANED = "cleaned"                    # terminal: no further work (see _TERMINAL_STATES)
    FAILED = "failed"                      # terminal: no further work (see _TERMINAL_STATES)
# Retry backoff sequence per §7.1.5: 30s, 2m, 8m, 30m, 2h. After 5 attempts
# the job becomes terminal FAILED.
BACKOFF_SCHEDULE_SECONDS: tuple[int, ...] = (30, 120, 480, 1800, 7200)
# Hard cap on transfer attempts; reaching it makes the job terminally FAILED.
MAX_ATTEMPTS: int = 5
# States from which no further work is ever scheduled (see SyncQueue.is_terminal).
_TERMINAL_STATES: frozenset[SyncJobState] = frozenset({SyncJobState.FAILED, SyncJobState.CLEANED})
_CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
run_path TEXT NOT NULL UNIQUE,
equipment_id TEXT NOT NULL,
state TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
next_attempt_at TEXT,
last_error TEXT,
verify_passes INTEGER NOT NULL DEFAULT 0,
verified_at TEXT,
enqueued_at TEXT NOT NULL,
nas_path TEXT
)
"""
_INDEX_SQL = (
"CREATE INDEX IF NOT EXISTS idx_jobs_state ON jobs(state)",
"CREATE INDEX IF NOT EXISTS idx_jobs_next_attempt ON jobs(next_attempt_at)",
)
[docs]
@dataclass(frozen=True, slots=True)
class SyncJobRow:
    """One row in the ``jobs`` table. Backend Spec §7.1.1."""

    # Primary key; a UUID4 string unless the caller supplied an explicit id.
    id: str
    # Local run directory being synced; UNIQUE in the table.
    run_path: str
    equipment_id: str
    # Current position in the state machine (§7.1.2).
    state: SyncJobState
    # Transfer attempts recorded so far; drives the backoff schedule.
    attempts: int = 0
    # ISO-8601 timestamps; None until the corresponding event has happened.
    last_attempt_at: str | None = None
    next_attempt_at: str | None = None
    # Human-readable description of the most recent failure, if any.
    last_error: str | None = None
    # Verification passes counted via transition(increment_verify_passes=True).
    verify_passes: int = 0
    verified_at: str | None = None
    enqueued_at: str = ""
    # Destination path on the NAS once known; None until then.
    nas_path: str | None = None
def _row_to_job(row: aiosqlite.Row | tuple) -> SyncJobRow:
    """Build a :class:`SyncJobRow` from a positional DB row (Row or tuple).

    Column order matches the ``jobs`` table definition. NULLs in the
    counter/timestamp columns are normalized to ``0`` / ``""`` defaults.
    """
    values = tuple(row)
    return SyncJobRow(
        id=values[0],
        run_path=values[1],
        equipment_id=values[2],
        state=SyncJobState(values[3]),
        attempts=values[4] or 0,       # NULL -> 0
        last_attempt_at=values[5],
        next_attempt_at=values[6],
        last_error=values[7],
        verify_passes=values[8] or 0,  # NULL -> 0
        verified_at=values[9],
        enqueued_at=values[10] or "",  # NULL -> ""
        nas_path=values[11],
    )
def compute_next_attempt_at(*, attempts_after: int, now: datetime | None = None) -> str | None:
    """ISO timestamp for the next retry, or ``None`` when no retry is due.

    ``attempts_after`` is the ``attempts`` counter AFTER the current failure
    was recorded. Values outside ``1..MAX_ATTEMPTS`` yield ``None`` — the
    job is either untouched or the backoff schedule is exhausted.
    """
    if not (1 <= attempts_after <= MAX_ATTEMPTS):
        return None
    # attempts_after is 1-based; the schedule tuple is 0-based.
    delay = timedelta(seconds=BACKOFF_SCHEDULE_SECONDS[attempts_after - 1])
    return dt_to_iso(utc_now_or(now) + delay)
[docs]
class SyncQueue:
    """Async SQLite-backed durable sync queue. Backend Spec §7.1.1.

    Use :meth:`init` once at application startup; the database file is
    created on demand. After init, all CRUD methods are coroutines.
    """

    def __init__(self, db_path: Path) -> None:
        # The connection is opened lazily by init(), not in the constructor,
        # so constructing a SyncQueue has no I/O side effects.
        self._db_path = db_path
        self._conn: aiosqlite.Connection | None = None

    @property
    def db_path(self) -> Path:
        """The on-disk path of the queue database file."""
        return self._db_path
[docs]
    async def init(self) -> None:
        """Open the connection, ensure the schema, and replay in-flight rows.

        Replay semantics (§7.1.2): any ``RUNNING`` row at startup gets
        downgraded to ``QUEUED`` (the worker died mid-transfer); any
        ``AWAITING_VERIFY`` row is left as-is so the verifier picks it up.
        """
        # Create the state directory lazily so first launch just works.
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = await aiosqlite.connect(str(self._db_path))
        # Use WAL mode so concurrent readers do not block writers; this is
        # the standard recommendation for SQLite-backed queues.
        await self._conn.execute("PRAGMA journal_mode=WAL")
        await self._conn.execute(_CREATE_TABLE_SQL)
        for stmt in _INDEX_SQL:
            await self._conn.execute(stmt)
        # Re-queue jobs that were RUNNING when the previous process exited.
        await self._conn.execute(
            "UPDATE jobs SET state = ? WHERE state = ?",
            (SyncJobState.QUEUED.value, SyncJobState.RUNNING.value),
        )
        await self._conn.commit()
        _log.debug("SyncQueue init at %s", self._db_path)
[docs]
async def close(self) -> None:
"""Close the underlying connection (idempotent)."""
if self._conn is not None:
await self._conn.close()
self._conn = None
def _require_conn(self) -> aiosqlite.Connection:
if self._conn is None:
msg = "SyncQueue.init() must be called before use"
raise RuntimeError(msg)
return self._conn
async def _require_job(self, job_id: str) -> SyncJobRow:
"""Return the row for ``job_id`` or raise ``ValueError``.
Centralizes the "look up a job by id, raise on miss" pattern that
:meth:`transition`, :meth:`record_failure`, and
:meth:`reset_to_queued` all need.
"""
existing = await self.get_by_id(job_id)
if existing is None:
msg = f"unknown job_id {job_id!r}"
raise ValueError(msg)
return existing
# ----------------------------------------------------------- CRUD
[docs]
async def insert(
self,
*,
run_path: Path,
equipment_id: str,
nas_path: str | None = None,
job_id: str | None = None,
) -> SyncJobRow:
"""Insert a new ``QUEUED`` row for ``run_path``.
Raises :class:`aiosqlite.IntegrityError` (via the UNIQUE constraint
on ``run_path``) if a row already exists for the same path.
"""
conn = self._require_conn()
row = SyncJobRow(
id=job_id or str(uuid.uuid4()),
run_path=str(run_path),
equipment_id=equipment_id,
state=SyncJobState.QUEUED,
enqueued_at=utc_now_iso(),
nas_path=nas_path,
)
await conn.execute(
"""
INSERT INTO jobs (
id, run_path, equipment_id, state, attempts,
last_attempt_at, next_attempt_at, last_error,
verify_passes, verified_at, enqueued_at, nas_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
row.id,
row.run_path,
row.equipment_id,
row.state.value,
row.attempts,
row.last_attempt_at,
row.next_attempt_at,
row.last_error,
row.verify_passes,
row.verified_at,
row.enqueued_at,
row.nas_path,
),
)
await conn.commit()
return row
[docs]
async def get_by_id(self, job_id: str) -> SyncJobRow | None:
"""Return the row with the given ``job_id`` or ``None``."""
conn = self._require_conn()
cursor = await conn.execute(
"SELECT * FROM jobs WHERE id = ?",
(job_id,),
)
row = await cursor.fetchone()
await cursor.close()
if row is None:
return None
return _row_to_job(row)
[docs]
async def get_by_run_path(self, run_path: Path) -> SyncJobRow | None:
"""Return the row whose ``run_path`` matches, or ``None``."""
conn = self._require_conn()
cursor = await conn.execute(
"SELECT * FROM jobs WHERE run_path = ?",
(str(run_path),),
)
row = await cursor.fetchone()
await cursor.close()
if row is None:
return None
return _row_to_job(row)
[docs]
async def list_in_state(self, state: SyncJobState) -> list[SyncJobRow]:
"""Return every row currently in ``state`` ordered by ``enqueued_at``."""
conn = self._require_conn()
cursor = await conn.execute(
"SELECT * FROM jobs WHERE state = ? ORDER BY enqueued_at",
(state.value,),
)
rows = await cursor.fetchall()
await cursor.close()
return [_row_to_job(r) for r in rows]
[docs]
async def list_all(self) -> list[SyncJobRow]:
"""Return every row in the queue ordered by ``enqueued_at``."""
conn = self._require_conn()
cursor = await conn.execute("SELECT * FROM jobs ORDER BY enqueued_at")
rows = await cursor.fetchall()
await cursor.close()
return [_row_to_job(r) for r in rows]
# ----------------------------------------------------------- transitions
[docs]
    async def transition(
        self,
        job_id: str,
        new_state: SyncJobState,
        *,
        last_error: str | None = None,
        increment_attempts: bool = False,
        increment_verify_passes: bool = False,
        verified_at: str | None = None,
        next_attempt_at: str | None = None,
        last_attempt_at: str | None = None,
        nas_path: str | None = None,
    ) -> SyncJobRow:
        """Transition a job to ``new_state`` and patch the auxiliary columns.

        The patch is one ``UPDATE`` statement so either every column moves
        or none do. Raises :class:`ValueError` if the job is missing.

        ``None`` for any optional column means "keep the existing value".
        There is therefore no way to set a column back to SQL NULL through
        this method; callers that need to blank a column pass a non-None
        sentinel (``record_failure`` uses ``""`` for ``next_attempt_at``).
        """
        existing = await self._require_job(job_id)
        new_attempts = existing.attempts + (1 if increment_attempts else 0)
        new_verify_passes = existing.verify_passes + (1 if increment_verify_passes else 0)
        # Build the post-transition row first so the UPDATE below and the
        # returned value cannot disagree about what was written.
        updated = replace(
            existing,
            state=new_state,
            attempts=new_attempts,
            last_attempt_at=last_attempt_at
            if last_attempt_at is not None
            else existing.last_attempt_at,
            next_attempt_at=next_attempt_at
            if next_attempt_at is not None
            else existing.next_attempt_at,
            last_error=last_error if last_error is not None else existing.last_error,
            verify_passes=new_verify_passes,
            verified_at=verified_at if verified_at is not None else existing.verified_at,
            nas_path=nas_path if nas_path is not None else existing.nas_path,
        )
        conn = self._require_conn()
        await conn.execute(
            """
            UPDATE jobs SET
                state = ?,
                attempts = ?,
                last_attempt_at = ?,
                next_attempt_at = ?,
                last_error = ?,
                verify_passes = ?,
                verified_at = ?,
                nas_path = ?
            WHERE id = ?
            """,
            (
                updated.state.value,
                updated.attempts,
                updated.last_attempt_at,
                updated.next_attempt_at,
                updated.last_error,
                updated.verify_passes,
                updated.verified_at,
                updated.nas_path,
                updated.id,
            ),
        )
        await conn.commit()
        return updated
[docs]
    async def record_failure(
        self,
        job_id: str,
        error: str,
        *,
        terminal: bool = False,
        now: datetime | None = None,
    ) -> SyncJobRow:
        """Record a transport failure on ``job_id``.

        If ``terminal`` is True (auth failure, local file vanished) the
        job goes straight to ``FAILED`` with no backoff. Otherwise:
        - increment ``attempts``
        - if ``attempts >= MAX_ATTEMPTS``: terminal ``FAILED``.
        - else: stay in ``QUEUED`` with ``next_attempt_at`` per backoff.

        NOTE(review): ``next_attempt_at=""`` (empty string, not NULL) is
        used to clear the retry time because :meth:`transition` treats
        ``None`` as "keep the existing value". Consumers must treat ``""``
        as "no retry scheduled".
        """
        existing = await self._require_job(job_id)
        now = utc_now_or(now)  # normalize None -> current UTC time
        last_attempt_iso = dt_to_iso(now)
        if terminal:
            # Non-retryable failure classes: straight to FAILED, no backoff.
            return await self.transition(
                job_id,
                SyncJobState.FAILED,
                last_error=error,
                last_attempt_at=last_attempt_iso,
                next_attempt_at="",
            )
        new_attempts = existing.attempts + 1
        if new_attempts >= MAX_ATTEMPTS:
            # Attempt budget exhausted: terminal FAILED.
            # NOTE(review): because of the >= here, the last backoff entry
            # (7200s) is never scheduled through this path — confirm against
            # §7.1.5 whether that is the intended off-by-one.
            return await self.transition(
                job_id,
                SyncJobState.FAILED,
                increment_attempts=True,
                last_error=error,
                last_attempt_at=last_attempt_iso,
                next_attempt_at="",
            )
        # Retryable: stay QUEUED and schedule the next attempt per backoff.
        next_iso = compute_next_attempt_at(attempts_after=new_attempts, now=now) or ""
        return await self.transition(
            job_id,
            SyncJobState.QUEUED,
            increment_attempts=True,
            last_error=error,
            last_attempt_at=last_attempt_iso,
            next_attempt_at=next_iso,
        )
[docs]
async def reset_to_queued(self, job_id: str) -> SyncJobRow:
"""Reset a ``FAILED`` job back to ``QUEUED`` for a manual retry.
Per §7.1.5 the Problems-tab Retry action re-enqueues a failed job.
``attempts`` and ``last_error`` are cleared so the backoff schedule
starts fresh.
"""
# Side-effect: raises ``ValueError`` if the job id is unknown.
await self._require_job(job_id)
conn = self._require_conn()
await conn.execute(
"""
UPDATE jobs SET
state = ?,
attempts = 0,
last_attempt_at = NULL,
next_attempt_at = NULL,
last_error = NULL
WHERE id = ?
""",
(SyncJobState.QUEUED.value, job_id),
)
await conn.commit()
return await self.get_by_id(job_id) # type: ignore[return-value]
[docs]
async def delete(self, job_id: str) -> None:
"""Remove a job row entirely. Used by the cleanup reaper after CLEANED."""
conn = self._require_conn()
await conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
await conn.commit()
[docs]
@staticmethod
def is_terminal(state: SyncJobState) -> bool:
"""Return True if the state is a terminal state (no further work)."""
return state in _TERMINAL_STATES