Source code for exlab_wizard.orchestrator.cleanup

"""Staging-side cleanup helpers. Backend Spec §13.7.

Once a run is verified on the NAS, the orchestrator deletes the local
staging copy. Two policies are supported:

* ``manual`` (default for v1) -- only an explicit operator action
  advances ``sync_verified`` -> ``cleared``. The watcher never auto-clears.
* ``scheduled`` -- runs whose ``sync_verified_at`` was at least
  ``retain_hours`` ago are auto-cleared by the periodic sweep.

Deletion is logged with file count and bytes freed (§13.7).

The helpers here are pure read-side utilities except :func:`clear_run`,
which performs the on-disk delete and writes the ``cleared`` history
entry. The watcher keeps the responsibility of *deciding* when to call
:func:`clear_run` -- this module only enforces the policy and the
filesystem effect.
"""

from __future__ import annotations

import shutil
from datetime import datetime, timedelta
from pathlib import Path

from exlab_wizard.api.schemas import IngestJson
from exlab_wizard.cache.ingest_writer import IngestWriter, default_host
from exlab_wizard.config.models import Config
from exlab_wizard.constants import (
    IngestState,
    StagingCleanupMode,
)
from exlab_wizard.logging import get_logger
from exlab_wizard.paths import ingest_json_path
from exlab_wizard.utils.time import parse_utc_iso_or_none, utc_now_or

__all__ = ["cleanup_eligible", "clear_run", "freed_bytes_and_count"]

_log = get_logger(__name__)


def cleanup_eligible(
    *,
    ingest: IngestJson,
    config: Config,
    now_utc: datetime | None = None,
) -> bool:
    """Decide whether the run's local staging copy may be cleared now.

    Backend Spec §13.7:

    * ``manual`` -- never eligible; the operator must invoke
      :func:`clear_run` directly (UI button or API).
    * ``scheduled`` -- eligible iff ``current_state == sync_verified``
      AND ``sync_verified_at + retain_hours <= now_utc``.

    Runs in any state other than ``sync_verified`` are never eligible --
    clearing earlier states is a contract violation the watcher must
    avoid.
    """
    if ingest.current_state != IngestState.SYNC_VERIFIED.value:
        return False

    cleanup_cfg = config.orchestrator.staging_cleanup
    # Only the "scheduled" policy ever auto-clears. "manual" -- and,
    # defensively, any future mode added without a code path here --
    # falls through to "not eligible", the safer behaviour. (The
    # Pydantic Literal already constrains the values.)
    if cleanup_cfg.mode != StagingCleanupMode.SCHEDULED.value:
        return False

    verified_at = _find_state_timestamp(ingest, IngestState.SYNC_VERIFIED)
    if verified_at is None:
        return False

    deadline = verified_at + timedelta(hours=cleanup_cfg.retain_hours)
    return deadline <= utc_now_or(now_utc)
async def clear_run(
    run_path: Path,
    *,
    config: Config,
    ingest_writer: IngestWriter,
    host: str | None = None,
) -> tuple[int, int]:
    """Delete the staged run directory and append the ``cleared`` entry.

    The ``(file_count, bytes_freed)`` pair is returned so the caller can
    log/notify accurately. The ingest history entry is appended *before*
    the deletion, so a crash mid-clear still leaves a coherent state
    record. Idempotent: once the directory is gone, calling this again is
    a no-op returning ``(0, 0)`` that appends no duplicate history entry.
    """
    _ = config  # kept on the signature for spec parity / future hooks

    if not run_path.exists():  # noqa: ASYNC240 -- one-shot stat, sync filelock cycle below
        return 0, 0

    n_files, n_bytes = freed_bytes_and_count(run_path)
    marker_path = ingest_json_path(run_path)
    host_label = host or default_host()

    if marker_path.exists():
        await ingest_writer.append_state_transition(
            marker_path,
            IngestState.CLEARED,
            host=host_label,
        )

    # Remove the staged directory in full, ingest.json included.
    # Discarding the just-written ``cleared`` entry is acceptable: §13
    # only requires it to reach the NAS, and the NAS copy already has it
    # via the prior ``sync_verified`` transition.
    shutil.rmtree(run_path, ignore_errors=True)

    _log.info(
        "staging cleared: path=%s files=%d bytes_freed=%d host=%s",
        run_path,
        n_files,
        n_bytes,
        host_label,
    )
    return n_files, n_bytes
def freed_bytes_and_count(run_path: Path) -> tuple[int, int]:
    """Sum file count and byte total under ``run_path``.

    Counts regular files only (directories are not counted as files);
    the ``.exlab-wizard/`` cache subtree is included because
    :func:`clear_run` deletes the whole run.

    Symlinks are excluded entirely: :func:`shutil.rmtree` removes only
    the link itself, never its target, so following links would
    over-report the bytes actually freed -- and could count data that
    lives outside the run directory altogether.
    """
    if not run_path.exists():
        return 0, 0

    files = 0
    total = 0
    for entry in run_path.rglob("*"):
        try:
            # is_file() follows symlinks, so explicitly skip links; for
            # a non-link entry, stat() is then safe to size with.
            if entry.is_file() and not entry.is_symlink():
                files += 1
                total += entry.stat().st_size
        except OSError:
            # Entry vanished or is unreadable mid-walk -- best effort.
            continue
    return files, total
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _find_state_timestamp(ingest: IngestJson, target: IngestState) -> datetime | None:
    """Return the latest history-entry ``at`` for ``target`` or None."""
    wanted = target.value
    # History is append-ordered; scan from the end for the newest match.
    for record in reversed(ingest.history):
        if record.get("state") == wanted:
            return parse_utc_iso_or_none(record.get("at"))
    return None