"""aiosqlite-backed cache for LIMS project rows. Backend Spec §7.2.4.
The cache lives at ``<xdg_cache_home>/exlab-wizard/lims_cache.db`` and
holds a flat row-per-project table keyed by ``(lims_endpoint, short_id)``.
Storing the endpoint as part of the primary key means a single workstation
can switch between LIMS instances during testing without colliding rows.
Access pattern:
- :meth:`LIMSCache.upsert_many` is called by ``LIMSClient.list_projects``
on every successful refresh. The ``last_refreshed`` column is stamped
with the wizard's current UTC ISO time at write.
- :meth:`LIMSCache.list_projects` and :meth:`LIMSCache.get_project` are
consulted before the network for §7.2.4 cache-first lookups.
- :meth:`LIMSCache.is_fresh` returns True when the most recent
``last_refreshed`` for the endpoint is within ``ttl_hours``. Callers
use this to decide whether to short-circuit a network refresh.
- The cache is **never** consulted for write paths -- there are no
writes to LIMS in v1.
The rows we materialize back into LIMSProject use the ``fetched_at``
column from ``last_refreshed``. ``metadata`` is round-tripped through
the JSON column ``metadata_json``.
"""
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from typing import Any
import aiosqlite
import msgspec
from exlab_wizard.lims.schemas import LIMSProject
from exlab_wizard.logging import get_logger
from exlab_wizard.utils.time import parse_utc_iso, utc_now
__all__ = ["LIMSCache"]
logger = get_logger(__name__)
# DDL: one flat row per project. The composite primary key
# (lims_endpoint, short_id) lets a single workstation cache several LIMS
# instances side by side without row collisions (see module docstring).
_CREATE_TABLE_SQL: str = """
CREATE TABLE IF NOT EXISTS lims_projects (
lims_endpoint TEXT NOT NULL,
short_id TEXT NOT NULL,
uid TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL,
contact_name TEXT,
owner TEXT NOT NULL,
metadata_json TEXT NOT NULL,
last_refreshed TEXT NOT NULL,
PRIMARY KEY (lims_endpoint, short_id)
);
"""
# Index supporting is_fresh(): MAX(last_refreshed) scoped to one endpoint.
_CREATE_INDEX_SQL: str = """
CREATE INDEX IF NOT EXISTS idx_endpoint_refresh
ON lims_projects(lims_endpoint, last_refreshed);
"""
# Upsert keyed on the primary key; every non-key column is overwritten from
# the incoming row, so a refresh fully replaces stale cached values.
_UPSERT_SQL: str = """
INSERT INTO lims_projects (
lims_endpoint, short_id, uid, name, description, status,
contact_name, owner, metadata_json, last_refreshed
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(lims_endpoint, short_id) DO UPDATE SET
uid=excluded.uid,
name=excluded.name,
description=excluded.description,
status=excluded.status,
contact_name=excluded.contact_name,
owner=excluded.owner,
metadata_json=excluded.metadata_json,
last_refreshed=excluded.last_refreshed
;
"""
# All rows for one endpoint; column order must match _row_to_project().
_SELECT_ALL_SQL: str = """
SELECT uid, short_id, name, description, status, contact_name, owner,
metadata_json, last_refreshed
FROM lims_projects
WHERE lims_endpoint = ?
"""
# Single-row lookup accepting either identifier form (uid or short_id);
# same column order as _SELECT_ALL_SQL so both feed _row_to_project().
_SELECT_ONE_SQL: str = """
SELECT uid, short_id, name, description, status, contact_name, owner,
metadata_json, last_refreshed
FROM lims_projects
WHERE lims_endpoint = ? AND (uid = ? OR short_id = ?)
LIMIT 1
"""
# Newest refresh timestamp for an endpoint; NULL when the endpoint has no
# cached rows (MAX over an empty set).
_MAX_REFRESH_SQL: str = """
SELECT MAX(last_refreshed) FROM lims_projects WHERE lims_endpoint = ?
"""
[docs]
class LIMSCache:
    """SQLite TTL cache for LIMS project rows. Backend Spec §7.2.4.

    The cache is async-native via aiosqlite so that ``LIMSClient`` can
    interleave cache reads with httpx network calls without blocking
    the event loop.
    """

    def __init__(self, db_path: Path, *, ttl_hours: int = 24) -> None:
        # No I/O here: the connection is opened lazily in init() so that
        # constructing the cache object is side-effect free.
        self._db_path = Path(db_path)
        self._ttl_hours = ttl_hours
        self._conn: aiosqlite.Connection | None = None

    async def init(self) -> None:
        """Open the connection and create the table and index if absent.

        Idempotent: a connection left open by a previous call is closed
        before reconnecting, so repeated calls do not leak file handles.
        (The previous implementation reassigned ``self._conn`` without
        closing the old connection.)
        """
        if self._conn is not None:
            await self._conn.close()
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = await aiosqlite.connect(str(self._db_path))
        await self._conn.execute(_CREATE_TABLE_SQL)
        await self._conn.execute(_CREATE_INDEX_SQL)
        await self._conn.commit()

    async def close(self) -> None:
        """Close the underlying aiosqlite connection. Safe to call twice."""
        if self._conn is not None:
            await self._conn.close()
            self._conn = None

    async def upsert_many(self, endpoint: str, projects: list[LIMSProject]) -> None:
        """Insert or update every row for ``endpoint``.

        ``last_refreshed`` is taken from each ``LIMSProject.fetched_at``;
        the caller stamps that value before invoking this method so a
        single refresh batch shares one timestamp.

        Raises:
            RuntimeError: if :meth:`init` has not been called.
        """
        if not projects:
            return  # skip the empty executemany + commit round-trip
        conn = self._require_conn()
        rows = [
            (
                endpoint,
                project.short_id,
                project.uid,
                project.name,
                project.description,
                project.status,
                project.contact_name,
                project.owner,
                # metadata is round-tripped through the JSON text column.
                msgspec.json.encode(project.metadata).decode("utf-8"),
                project.fetched_at,
            )
            for project in projects
        ]
        await conn.executemany(_UPSERT_SQL, rows)
        await conn.commit()

    async def list_projects(
        self, endpoint: str, *, status_filter: list[str] | None = None
    ) -> list[LIMSProject]:
        """Return every cached project for ``endpoint``, optionally
        filtered by ``status_filter`` (an OR of allowed status values).

        Raises:
            RuntimeError: if :meth:`init` has not been called.
        """
        conn = self._require_conn()
        async with conn.execute(_SELECT_ALL_SQL, (endpoint,)) as cursor:
            rows = await cursor.fetchall()
        projects = [self._row_to_project(row) for row in rows]
        if status_filter:
            allowed = set(status_filter)  # O(1) membership per project
            projects = [p for p in projects if p.status in allowed]
        return projects

    async def get_project(self, endpoint: str, uid_or_short_id: str) -> LIMSProject | None:
        """Return one project by uid or short_id, or None if absent.

        Raises:
            RuntimeError: if :meth:`init` has not been called.
        """
        conn = self._require_conn()
        async with conn.execute(
            _SELECT_ONE_SQL, (endpoint, uid_or_short_id, uid_or_short_id)
        ) as cursor:
            row = await cursor.fetchone()
        return self._row_to_project(row) if row else None

    async def is_fresh(self, endpoint: str) -> bool:
        """True iff the most recent ``last_refreshed`` is within
        ``ttl_hours`` of the wizard's current UTC time. False when the
        cache has no rows for ``endpoint``.

        Raises:
            RuntimeError: if :meth:`init` has not been called.
        """
        conn = self._require_conn()
        async with conn.execute(_MAX_REFRESH_SQL, (endpoint,)) as cursor:
            row = await cursor.fetchone()
        # MAX over zero rows yields a single-NULL row; treat both shapes
        # as "no data" and force a network refresh.
        if row is None or row[0] is None:
            return False
        try:
            most_recent = parse_utc_iso(row[0])
        except ValueError:
            # A corrupt timestamp should degrade to a refresh, not crash.
            logger.warning("lims_cache.invalid_timestamp", extra={"value": row[0]})
            return False
        cutoff = utc_now() - timedelta(hours=self._ttl_hours)
        return most_recent >= cutoff

    # ------------------------------------------------------------------
    # internal
    # ------------------------------------------------------------------
    def _require_conn(self) -> aiosqlite.Connection:
        # Guard for every data-path method: fail loudly if the caller
        # forgot the async init() handshake.
        if self._conn is None:
            msg = "LIMSCache.init() has not been called"
            raise RuntimeError(msg)
        return self._conn

    @staticmethod
    def _row_to_project(row: aiosqlite.Row | tuple[Any, ...]) -> LIMSProject:
        """Materialize one SELECT row (column order of _SELECT_*_SQL)
        back into a :class:`LIMSProject`.
        """
        (
            uid,
            short_id,
            name,
            description,
            status,
            contact_name,
            owner,
            metadata_json,
            last_refreshed,
        ) = row
        # metadata_json is NOT NULL in the schema, but tolerate "" anyway.
        metadata = msgspec.json.decode(metadata_json) if metadata_json else {}
        return LIMSProject(
            uid=uid,
            short_id=short_id,
            name=name,
            description=description,
            status=status,
            contact_name=contact_name,
            owner=owner,
            metadata=metadata,
            fetched_at=last_refreshed,
        )