Source code for exlab_wizard.lims.keyring_store

"""OS keyring storage with encrypted-at-rest fallback. Backend Spec §7.4.

Two backends, automatic dispatch:

1. **OS keyring** -- the :mod:`keyring` package routes to Keychain
   (macOS), Credential Manager (Windows), or Secret Service (Linux
   desktop) per §7.4. This is the preferred path because the OS
   already owns lifecycle (encryption-at-rest, scoping to the OS user).
2. **Encrypted-at-rest fallback** -- when no keyring backend is
   available (Linux headless acquisition machines per §7.4.4), the
   store writes a single Fernet-encrypted JSON document at
   ``<state_dir>/exlab-wizard/secrets.enc``. The Fernet key is derived
   from a master passphrase via Argon2id (``time_cost=3``,
   ``memory_cost=64 MiB``, ``parallelism=4``). The salt lives next to
   the ciphertext in the same file (the salt is not secret; the
   passphrase is).

The store API is intentionally credential-agnostic: it indexes secrets
by ``(KEYRING_SERVICE, username)`` so the LIMS credential
(``username="lims"``) and per-equipment NAS credentials
(``username="nas:<equipment_id>"``) share one store.

The ``passphrase_provider`` callable is invoked lazily on the first
fallback-mode operation so that working keyring environments never
prompt the operator. The launcher wires it to ``getpass.getpass`` per
§7.4.4 step 3; tests can pass a constant-returning lambda.
"""

from __future__ import annotations

import base64
import os
from collections.abc import Callable
from pathlib import Path
from typing import Any

import keyring
import keyring.errors
import msgspec
from argon2.low_level import Type, hash_secret_raw
from cryptography.fernet import Fernet, InvalidToken

from exlab_wizard.constants import KEYRING_SERVICE, SECRETS_FILE
from exlab_wizard.errors import KeyringUnavailableError
from exlab_wizard.io import atomic_write_bytes
from exlab_wizard.logging import get_logger

# Public surface of this module: only the store class is exported.
__all__ = ["KeyringStore"]

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Argon2id KDF parameters per Backend Spec §7.4.4 step 2.
_ARGON2_TIME_COST: int = 3
_ARGON2_MEMORY_KIB: int = 64 * 1024  # 64 MiB; argon2 takes KiB
_ARGON2_PARALLELISM: int = 4
_ARGON2_HASH_LEN: int = 32  # Fernet wants a 32-byte key (base64-encoded)
# Salt length in bytes for the KDF input; 16 B is the standard salt length.
# NOTE(review): the original note says the 8 KiB figure referenced in the
# spec is per-store, not the salt size -- confirm against §7.4.4.
_ARGON2_SALT_BYTES: int = 16

# Version tag written into the fallback envelope by ``_save_fallback`` and
# checked by ``_decode_envelope``; a mismatch rejects the file.
_FALLBACK_FORMAT_VERSION: int = 1


[docs] class KeyringStore: """OS keyring with encrypted-at-rest fallback. Backend Spec §7.4. Tries the OS keyring first via the :mod:`keyring` module. When that backend raises :class:`keyring.errors.KeyringError` (or one of its subclasses for "no backend installed"), falls back to ``<state_dir>/exlab-wizard/secrets.enc`` encrypted with Fernet (AES-128-CBC + HMAC-SHA256), keyed by an Argon2id KDF over a user-supplied master passphrase. The fallback path requires a ``passphrase_provider`` callable; if the keyring is available this argument may be ``None`` -- it is only invoked when the fallback engages. """ def __init__( self, *, state_dir: Path, passphrase_provider: Callable[[], str] | None = None, ) -> None: self._state_dir = Path(state_dir) self._secrets_path = self._state_dir / SECRETS_FILE self._passphrase_provider = passphrase_provider # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def is_keyring_available(self) -> bool: """Best-effort probe of the OS keyring backend. Implemented as a round-trip ``set`` + ``delete`` of a sentinel value. A backend that raises on either side is considered unavailable. Per §7.4.4 the ``keyring`` package's "fail" backends raise :class:`keyring.errors.KeyringError` from these calls. """ sentinel_username = "_exlab_probe" try: keyring.set_password(KEYRING_SERVICE, sentinel_username, "ok") keyring.delete_password(KEYRING_SERVICE, sentinel_username) except keyring.errors.KeyringError: return False except Exception: return False return True
[docs] def get_password(self, *, username: str) -> str | None: """Look up the secret for ``(KEYRING_SERVICE, username)``. Returns ``None`` when the entry is absent in both backends. Errors from the OS keyring are caught and the fallback is consulted; errors from the fallback propagate as :class:`exlab_wizard.errors.KeyringUnavailableError`. """ try: value = keyring.get_password(KEYRING_SERVICE, username) except keyring.errors.KeyringError: return self._fallback_get(username) if value is not None: return value # Even when the keyring is healthy, an entry can be absent. For # the no-OS-keyring path the fallback file is still consulted so # a partially-migrated store does not appear empty. if self._secrets_path.exists(): return self._fallback_get(username) return None
[docs] def set_password(self, *, username: str, password: str) -> None: """Persist ``password`` under ``(KEYRING_SERVICE, username)``. On a keyring backend error the fallback file is written; on fallback failure this raises :class:`exlab_wizard.errors.KeyringUnavailableError`. """ try: keyring.set_password(KEYRING_SERVICE, username, password) return except keyring.errors.KeyringError: logger.warning("keyring.fallback_engaged", extra={"op": "set"}) self._fallback_set(username, password)
[docs] def delete_password(self, *, username: str) -> None: """Remove the secret stored under ``(KEYRING_SERVICE, username)``. Missing entries are silent in both backends. Keyring errors defer to the fallback. """ try: keyring.delete_password(KEYRING_SERVICE, username) except keyring.errors.PasswordDeleteError: # Entry was never set in the OS keyring. The fallback may # still hold it, so fall through. pass except keyring.errors.KeyringError: logger.warning("keyring.fallback_engaged", extra={"op": "delete"}) if self._secrets_path.exists(): self._fallback_delete(username)
# ------------------------------------------------------------------ # Fallback (encrypted-at-rest) implementation # ------------------------------------------------------------------ def _fallback_get(self, username: str) -> str | None: if not self._secrets_path.exists(): return None store = self._load_fallback() return store.get(username) def _fallback_set(self, username: str, password: str) -> None: store = self._load_fallback() if self._secrets_path.exists() else {} store[username] = password self._save_fallback(store) def _fallback_delete(self, username: str) -> None: if not self._secrets_path.exists(): return store = self._load_fallback() if username in store: del store[username] self._save_fallback(store) def _passphrase(self) -> str: if self._passphrase_provider is None: msg = ( "OS keyring is unavailable and no passphrase provider " "was supplied; cannot reach the encrypted fallback" ) raise KeyringUnavailableError(msg) passphrase = self._passphrase_provider() if not passphrase: msg = "passphrase provider returned an empty value" raise KeyringUnavailableError(msg) return passphrase def _derive_key(self, passphrase: str, salt: bytes) -> bytes: raw = hash_secret_raw( secret=passphrase.encode("utf-8"), salt=salt, time_cost=_ARGON2_TIME_COST, memory_cost=_ARGON2_MEMORY_KIB, parallelism=_ARGON2_PARALLELISM, hash_len=_ARGON2_HASH_LEN, type=Type.ID, ) return base64.urlsafe_b64encode(raw) def _load_fallback(self) -> dict[str, str]: try: raw = self._secrets_path.read_bytes() except OSError as exc: msg = f"could not read encrypted secrets file: {exc}" raise KeyringUnavailableError(msg) from exc envelope = self._decode_envelope(raw) salt = base64.b64decode(envelope["salt"]) ciphertext = envelope["ciphertext"].encode("ascii") key = self._derive_key(self._passphrase(), salt) try: plaintext = Fernet(key).decrypt(ciphertext) except InvalidToken as exc: msg = "encrypted secrets file failed authentication (wrong passphrase?)" raise KeyringUnavailableError(msg) from exc 
decoded = msgspec.json.decode(plaintext) if not isinstance(decoded, dict): msg = "encrypted secrets file does not contain a JSON object" raise KeyringUnavailableError(msg) return {str(k): str(v) for k, v in decoded.items()} def _save_fallback(self, store: dict[str, str]) -> None: self._state_dir.mkdir(parents=True, exist_ok=True) salt = self._existing_salt() or os.urandom(_ARGON2_SALT_BYTES) key = self._derive_key(self._passphrase(), salt) plaintext = msgspec.json.encode(store) ciphertext = Fernet(key).encrypt(plaintext).decode("ascii") envelope = { "version": _FALLBACK_FORMAT_VERSION, "salt": base64.b64encode(salt).decode("ascii"), "ciphertext": ciphertext, } encoded = msgspec.json.encode(envelope) atomic_write_bytes(self._secrets_path, encoded) def _existing_salt(self) -> bytes | None: if not self._secrets_path.exists(): return None try: raw = self._secrets_path.read_bytes() except OSError: return None try: envelope = self._decode_envelope(raw) except KeyringUnavailableError: return None return base64.b64decode(envelope["salt"]) @staticmethod def _decode_envelope(raw: bytes) -> dict[str, Any]: try: decoded = msgspec.json.decode(raw) except msgspec.DecodeError as exc: msg = f"encrypted secrets file is not valid JSON: {exc}" raise KeyringUnavailableError(msg) from exc if not isinstance(decoded, dict): msg = "encrypted secrets file is not a JSON object" raise KeyringUnavailableError(msg) if decoded.get("version") != _FALLBACK_FORMAT_VERSION: msg = ( f"encrypted secrets file version {decoded.get('version')!r} " f"is unsupported; expected {_FALLBACK_FORMAT_VERSION}" ) raise KeyringUnavailableError(msg) if "salt" not in decoded or "ciphertext" not in decoded: msg = "encrypted secrets file missing salt or ciphertext" raise KeyringUnavailableError(msg) return decoded