"""Orchestrator-only writer for ``ingest.json``. Backend Spec §13.4, §4.4.5.
The orchestrator writes one ``ingest.json`` per staged run, capturing the
five-state lifecycle (§13.3): ``staging`` -> ``complete`` -> ``sync_queued``
-> ``sync_verified`` -> ``cleared``. State transitions are append-only -- a
new entry is added to the ``history`` array on every transition, and the
top-level ``current_state`` mirrors the latest entry.
Disk-side guarantees per §4.4.5:
* ``msgspec.json`` for typed encode/decode (schema validation in one pass).
* ``filelock.FileLock`` advisory exclusive lock around the read-mutate-write
cycle so concurrent appends never lose entries.
* Atomic write via tempfile + ``fsync`` + ``os.replace``.
Reads enforce the §11.9.2 reader policy: a file whose schema major differs
from the writer's raises ``SchemaMajorMismatchError`` (no silent partial
parse across major boundaries).
"""
from __future__ import annotations
import asyncio
import socket
from pathlib import Path
from typing import Any
import msgspec
from filelock import FileLock
from exlab_wizard.api.schemas import IngestJson
from exlab_wizard.cache import lock_path_for
from exlab_wizard.constants import INGEST_JSON_VERSION, IngestState
from exlab_wizard.io import atomic_write_bytes, read_msgspec_json
from exlab_wizard.logging import get_logger
from exlab_wizard.utils.state import assert_forward_transition
from exlab_wizard.utils.time import utc_now_iso
__all__ = ["IngestWriter"]

_logger = get_logger(__name__)

# One-step forward moves of the §13.3 lifecycle, keyed by source state. The
# lifecycle is strictly linear, so every state except the terminal ``cleared``
# has exactly one successor. This table is handed to
# ``assert_forward_transition`` by ``append_state_transition``; any
# (source, target) pair not listed here is rejected.
_FORWARD_TRANSITIONS: dict[IngestState, frozenset[IngestState]] = {
    IngestState.STAGING: frozenset([IngestState.COMPLETE]),
    IngestState.COMPLETE: frozenset([IngestState.SYNC_QUEUED]),
    IngestState.SYNC_QUEUED: frozenset([IngestState.SYNC_VERIFIED]),
    IngestState.SYNC_VERIFIED: frozenset([IngestState.CLEARED]),
    # Terminal state: nothing is reachable from ``cleared``.
    IngestState.CLEARED: frozenset(),
}

# Schema major the reader accepts: the integer before the first "." in
# ``INGEST_JSON_VERSION`` (the whole string if it has no "."). Every write
# emits this same major.
_EXPECTED_MAJOR: int = int(INGEST_JSON_VERSION.partition(".")[0])
class IngestWriter:
    """Writer for ``ingest.json``. Orchestrator-mode only.

    Backend Spec §13.4. State history is append-only (§13.3) to preserve full
    audit history -- the writer never edits or deletes prior entries.

    All public methods are ``async`` to match the §4.4.5 ``CacheWriter``
    contract; the blocking lock + I/O work is dispatched through
    ``asyncio.to_thread`` so the FastAPI event loop is never blocked.
    """

    async def write_ingest(self, path: Path, payload: IngestJson) -> None:
        """Write ``payload`` to ``path`` atomically under an exclusive lock.

        Reserved for the initial-creation path (no file exists yet); the
        per-file lock is taken defensively so a concurrent initial-write
        attempt serializes rather than races.

        NOTE: no existence check is made here. If ``path`` already exists it
        is overwritten, and of two concurrent initial writes the one that
        acquires the lock last wins. Use ``append_state_transition`` to
        update an existing file.
        """
        await asyncio.to_thread(self._write_ingest_blocking, path, payload)

    async def read_ingest(self, path: Path) -> IngestJson:
        """Read and decode ``path`` into an ``IngestJson``.

        The read is performed under the same per-file lock the writers use.

        Raises ``SchemaMajorMismatchError`` (§11.9.2) when the on-disk file
        carries a different schema major than ``INGEST_JSON_VERSION``.
        """
        return await asyncio.to_thread(self._read_ingest_blocking, path)

    async def append_state_transition(
        self,
        path: Path,
        new_state: IngestState | str,
        *,
        host: str,
        files_received: int | None = None,
        bytes_received: int | None = None,
        nas_path: str | None = None,
        checksum_file: str | None = None,
    ) -> IngestJson:
        """Append a state-transition entry and update ``current_state``.

        File-locked for the entire read-mutate-write cycle so concurrent
        callers never lose entries. The new history entry has the shape::

            {"state": "<new_state>", "at": "<UTC ISO 8601>", "host": "<host>"}

        Per §13.4 the entry carries optional extras when transitioning to
        specific states (each is included only when not ``None``):

        * ``complete`` -- ``files_received`` and ``bytes_received``.
        * ``sync_verified`` -- ``nas_path`` and ``checksum_file``.

        Other state transitions ignore those extras (they are silently dropped
        because the spec does not define their meaning at those states).

        ``new_state`` may be an ``IngestState`` member or its string value;
        it is normalized with ``IngestState(...)`` before the file is locked,
        so an unknown value raises ``ValueError`` without touching the file.

        Raises ``ValueError`` if ``new_state`` is not a permitted forward
        transition from the file's current state. Going backward (e.g.
        ``cleared`` -> ``staging``) is rejected. The full state machine is
        documented in ``_FORWARD_TRANSITIONS`` and §13.3.

        Returns the updated payload exactly as it was written to disk.
        """
        return await asyncio.to_thread(
            self._append_state_transition_blocking,
            path,
            new_state,
            host,
            files_received,
            bytes_received,
            nas_path,
            checksum_file,
        )

    # ---- Blocking helpers (run via asyncio.to_thread) ---------------------

    def _write_ingest_blocking(self, path: Path, payload: IngestJson) -> None:
        """Encode ``payload`` and atomically write it under the per-file lock."""
        with FileLock(lock_path_for(path)):
            atomic_write_bytes(path, msgspec.json.encode(payload))
        _logger.info(
            "ingest.json written: %s (current_state=%s)",
            path,
            payload.current_state,
        )

    def _read_ingest_blocking(self, path: Path) -> IngestJson:
        """Decode ``path`` while holding the per-file lock."""
        with FileLock(lock_path_for(path)):
            return self._decode_ingest_locked(path)

    @staticmethod
    def _decode_ingest_locked(path: Path) -> IngestJson:
        """Decode ``ingest.json`` with the §11.9.2 schema-major gate.

        Caller MUST already hold the per-file ``FileLock``.
        """
        return read_msgspec_json(path, IngestJson, expected_major=_EXPECTED_MAJOR)

    @staticmethod
    def _history_entry(
        state: IngestState,
        host: str,
        *,
        files_received: int | None,
        bytes_received: int | None,
        nas_path: str | None,
        checksum_file: str | None,
    ) -> dict[str, Any]:
        """Build one §13.4 history entry for a transition into ``state``.

        The base keys are ``state``, ``at`` (current UTC ISO timestamp) and
        ``host``. Only the extras defined for ``state`` are attached, and only
        when not ``None``; extras passed for any other state are dropped.
        """
        entry: dict[str, Any] = {
            "state": state.value,
            "at": utc_now_iso(),
            "host": host,
        }
        if state is IngestState.COMPLETE:
            extras: dict[str, Any] = {
                "files_received": files_received,
                "bytes_received": bytes_received,
            }
        elif state is IngestState.SYNC_VERIFIED:
            extras = {"nas_path": nas_path, "checksum_file": checksum_file}
        else:
            extras = {}
        entry.update({key: value for key, value in extras.items() if value is not None})
        return entry

    def _append_state_transition_blocking(
        self,
        path: Path,
        new_state: IngestState | str,
        host: str,
        files_received: int | None,
        bytes_received: int | None,
        nas_path: str | None,
        checksum_file: str | None,
    ) -> IngestJson:
        """Run the locked read-validate-append-write cycle.

        This is the blocking implementation behind ``append_state_transition``.
        """
        # Normalize before taking the lock. An ``IngestState`` member is
        # returned unchanged and a raw value string is converted; an unknown
        # value raises ``ValueError`` here, before the file is touched. A
        # plain string would otherwise fail on ``.value`` further down.
        target = IngestState(new_state)
        with FileLock(lock_path_for(path)):
            payload = self._decode_ingest_locked(path)
            current = IngestState(payload.current_state)
            assert_forward_transition(current, target, _FORWARD_TRANSITIONS)
            entry = self._history_entry(
                target,
                host,
                files_received=files_received,
                bytes_received=bytes_received,
                nas_path=nas_path,
                checksum_file=checksum_file,
            )
            # Build a fresh history list instead of mutating the decoded one:
            # prior entries are carried over verbatim (append-only, §13.3).
            new_payload = msgspec.structs.replace(
                payload,
                current_state=target.value,
                history=[*payload.history, entry],
            )
            atomic_write_bytes(path, msgspec.json.encode(new_payload))
        # Logged after releasing the lock to keep the critical section short.
        _logger.info(
            "ingest.json transition: %s -> %s (host=%s, path=%s)",
            current.value,
            target.value,
            host,
            path,
        )
        return new_payload
# Convenience helper for callers that need a default ``host`` string. It is
# deliberately left out of ``__all__``; it exists for tests and the
# orchestrator session bootstrap.
def default_host() -> str:
    """Return this machine's hostname as reported by ``socket.gethostname()``.

    The orchestrator uses this as its default value for ``host``.
    """
    hostname = socket.gethostname()
    return hostname