# Source code for exlab_wizard.sync.cleanup
"""Cleanup safety interlocks. Backend Spec §7.1.6.
The cleanup reaper deletes local files only when **all** of the
following hold for a job:
1. ``verify_passes >= nas_cleanup.min_verify_passes`` (default 2).
2. Hours since the most recent ``verified_at`` >= ``min_age_hours``
(default 24).
3. The remote NAS path is reachable (the caller passes the result of
a remote ``stat`` as ``remote_stat_ok``).
4. No active ``validation_overrides`` revocation (tombstone) has been
written within the last ``min_age_hours`` -- a revoked override
re-blocks sync, so we don't want to delete locally if the run is
now blocked.
This module is the pure interlock evaluator. The reaper itself lives in
:mod:`exlab_wizard.sync.nas_client`; it consults this helper before
issuing a delete.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
from exlab_wizard.config.models import NASCleanupConfig
from exlab_wizard.logging import get_logger
from exlab_wizard.sync.queue import SyncJobRow
from exlab_wizard.utils.time import parse_utc_iso_or_none
__all__ = [
"cleanup_interlocks_satisfied",
"has_recent_revocation",
]
_log = get_logger(__name__)
def has_recent_revocation(
    overrides: list[dict[str, Any]],
    *,
    now_utc: datetime,
    min_age_hours: int,
) -> bool:
    """Return True if any tombstone was recorded within ``min_age_hours``.

    A tombstone is an override-list entry with ``revoked: True``.  The
    timestamp is read from ``recorded_at``; entries with a missing or
    malformed timestamp are treated as recent (fail-safe: we'd rather
    block cleanup than delete files for a run that may be re-gated).

    Args:
        overrides: Override-list entries.  Entries without
            ``revoked: True`` are ignored entirely (their timestamp is
            never parsed).
        now_utc: Current time; assumed timezone-aware UTC so it compares
            cleanly with ``parse_utc_iso_or_none`` results -- TODO confirm.
        min_age_hours: Lookback window; a revocation recorded strictly
            after ``now_utc - min_age_hours`` counts as recent.

    Returns:
        True if at least one revoked entry is recent or has an
        unparseable ``recorded_at``; False otherwise.
    """
    cutoff = now_utc - timedelta(hours=min_age_hours)
    for entry in overrides:
        if not entry.get("revoked", False):
            continue
        recorded = parse_utc_iso_or_none(entry.get("recorded_at"))
        if recorded is None:
            # Malformed timestamp: be conservative and treat as recent.
            return True
        if recorded > cutoff:
            return True
    return False
def cleanup_interlocks_satisfied(
    *,
    job: SyncJobRow,
    run_path: Any,
    now_utc: datetime,
    config: NASCleanupConfig,
    overrides_active: list[dict[str, Any]],
    remote_stat_ok: bool,
) -> bool:
    """Evaluate every §7.1.6 interlock; return True iff all pass.

    Logs a debug entry naming the failing interlock when one fails so
    the operator can see why a job stayed in ``CLEANUP_ELIGIBLE``.

    Args:
        job: Sync-queue row; ``verify_passes``, ``verified_at`` and
            ``id`` are the fields consulted here.
        run_path: Accepted (but currently unused) so callers can pass
            the run directory through unchanged; future interlocks
            (e.g., size-on-disk threshold) may consult it.
        now_utc: Current time; assumed timezone-aware UTC -- TODO
            confirm against callers.
        config: Cleanup thresholds (``min_verify_passes``,
            ``min_age_hours``).
        overrides_active: Active validation-override entries, checked
            for recent revocation tombstones.
        remote_stat_ok: Result of the caller's remote ``stat`` of the
            NAS path; False blocks cleanup.

    Returns:
        True only when all four interlocks pass; False (with a debug
        log naming the failing interlock) otherwise.
    """
    # 1. verify_passes threshold.
    if job.verify_passes < config.min_verify_passes:
        _log.debug(
            "cleanup blocked: verify_passes=%d < min=%d for job %s",
            job.verify_passes,
            config.min_verify_passes,
            job.id,
        )
        return False
    # 2. min_age_hours since verified_at.  A missing/malformed timestamp
    # blocks cleanup (fail-safe), same policy as has_recent_revocation.
    verified_dt = parse_utc_iso_or_none(job.verified_at)
    if verified_dt is None:
        _log.debug("cleanup blocked: verified_at missing/malformed for job %s", job.id)
        return False
    age = now_utc - verified_dt
    if age < timedelta(hours=config.min_age_hours):
        _log.debug(
            "cleanup blocked: age=%s < min_age=%dh for job %s",
            age,
            config.min_age_hours,
            job.id,
        )
        return False
    # 3. remote NAS reachable -- never delete the local copy unless the
    # caller has just proven the remote copy exists.
    if not remote_stat_ok:
        _log.debug("cleanup blocked: remote stat failed for job %s", job.id)
        return False
    # 4. no recent revocation: a revoked override re-blocks sync, so a
    # fresh tombstone means the run may be re-gated -- keep local files.
    if has_recent_revocation(
        overrides_active,
        now_utc=now_utc,
        min_age_hours=config.min_age_hours,
    ):
        _log.debug("cleanup blocked: recent revocation for job %s", job.id)
        return False
    return True