# Source code for exlab_wizard.sync.transports.rsync_ssh

"""rsync-over-SSH transport driver. Backend Spec §7.1.3.

Shells out to ``rsync -avz --checksum --partial -e "ssh -i <key> -o
BatchMode=yes" --bwlimit=<K> <local> <user>@<host>:<path>``. SSH
authentication is **key-based only**; the spec rejects password auth at
config-validation time (see :class:`exlab_wizard.config.models.RsyncSshTransport`).

The driver is intentionally thin: it calls the binary, captures stdout /
stderr, and translates the exit-code + stderr-substring into one of the
``TransportErrorKind`` retry classes. Hash verification is the
:mod:`exlab_wizard.sync.verifier` module's responsibility, NOT this
driver's.
"""

from __future__ import annotations

import shlex
from pathlib import Path

from exlab_wizard.logging import get_logger
from exlab_wizard.sync.transports import (
    TransportError,
    TransportErrorKind,
    TransportResult,
)
from exlab_wizard.sync.transports._run import run_subprocess

# Public API of this module: only the transport driver class is exported.
__all__ = ["RsyncSshTransport"]

# Module-level logger obtained from the project's ``get_logger`` helper,
# keyed by this module's ``__name__``.
_log = get_logger(__name__)


# rsync exit codes (selection used by the classifier):
# 0  = success
# 5  = error starting client-server protocol (often auth failure)
# 23 = some files could not be transferred (often permission-denied)
# 12 = error in rsync protocol data stream
# 30 = timeout in data send/receive
_RSYNC_AUTH_RETURNCODES: frozenset[int] = frozenset({5, 23})


# Substrings that indicate authentication failure rather than a transient
# network error. Match case-insensitive.
_AUTH_FAILURE_MARKERS: tuple[str, ...] = (
    "auth_error",
    "permission denied (publickey)",
    "permission denied",
    "authentication failed",
    "host key verification failed",
)


def _classify_failure(stderr: str, returncode: int) -> TransportErrorKind:
    """Classify a failed subprocess run into a :class:`TransportErrorKind`.

    Matching is case-insensitive and evaluated in this order:

    1. ``AUTH`` -- stderr contains any entry of ``_AUTH_FAILURE_MARKERS``,
       or the exit code is in ``_RSYNC_AUTH_RETURNCODES`` and stderr
       mentions ``"permission"``.
    2. ``HASH_MISMATCH`` -- stderr mentions a hash or checksum mismatch.
    3. ``NETWORK`` -- any other non-zero exit code.
    4. ``UNKNOWN`` -- exit code 0 with no recognised marker.
    """
    text = stderr.lower()

    # Explicit auth markers win regardless of the exit code.
    explicit_auth = False
    for needle in _AUTH_FAILURE_MARKERS:
        if needle in text:
            explicit_auth = True
            break

    # Auth-leaning exit codes only count when stderr also talks about
    # permissions.
    implied_auth = "permission" in text and returncode in _RSYNC_AUTH_RETURNCODES

    if explicit_auth or implied_auth:
        return TransportErrorKind.AUTH

    if any(phrase in text for phrase in ("hash mismatch", "checksum mismatch")):
        return TransportErrorKind.HASH_MISMATCH

    if returncode == 0:
        return TransportErrorKind.UNKNOWN
    return TransportErrorKind.NETWORK


def _ssh_command(key: Path) -> list[str]:
    """Return the canonical ``ssh`` argv prefix used by both push and hashsum.

    Centralises the ``-i <key> -o BatchMode=yes`` invocation so the push
    path (which embeds it as a single ``-e`` value via ``shlex.join``) and
    the hashsum path (which uses the list directly as the argv prefix)
    cannot drift.
    """
    return ["ssh", "-i", str(key), "-o", "BatchMode=yes"]


class RsyncSshTransport:
    """rsync-over-SSH transport driver. Backend Spec §7.1.3.

    Wraps two subprocess invocations: ``rsync`` for pushing a local path
    to a remote host (:meth:`push`) and ``ssh ... find ... sha256sum`` for
    reading back remote file hashes (:meth:`hashsum`). Both use the SSH
    argv produced by ``_ssh_command`` (key file plus ``BatchMode=yes``).
    """

    def __init__(self, *, binary: str = "rsync") -> None:
        """Store the rsync executable name or path used by :meth:`push`."""
        self._binary = binary

    async def push(
        self,
        local: Path,
        ssh_target: str,
        ssh_key_path: Path,
        remote_path: str,
        *,
        bwlimit_kibps: int | None = None,
    ) -> TransportResult:
        """Run ``rsync -avz --checksum`` from ``local`` to ``ssh_target:remote_path``.

        Args:
            local: Local source path handed to rsync.
            ssh_target: Remote login in ``<user>@<host>`` form.
            ssh_key_path: Private key forwarded via
                ``-e 'ssh -i <key> -o BatchMode=yes'`` so the driver never
                prompts for a password.
            remote_path: Destination path on the remote host.
            bwlimit_kibps: When set to a positive value, passed to rsync
                as ``--bwlimit=<value>``; ``None`` or non-positive values
                leave the option off.

        Returns:
            A :class:`TransportResult`. On exit code 0 it has ``ok=True``;
            otherwise ``ok=False`` with ``error_kind`` set by
            ``_classify_failure`` plus the captured output and exit code.

        Raises:
            TransportError: The rsync binary is missing (no retry will
                help).
        """
        remote_shell = shlex.join(_ssh_command(ssh_key_path))
        # Only emit --bwlimit for a real, positive limit.
        throttle = (
            [f"--bwlimit={bwlimit_kibps}"]
            if bwlimit_kibps is not None and bwlimit_kibps > 0
            else []
        )
        cmd: list[str] = [
            self._binary,
            "-avz",
            "--checksum",
            "--partial",
            "-e",
            remote_shell,
            *throttle,
            str(local),
            f"{ssh_target}:{remote_path}",
        ]
        _log.debug("rsync cmd: %s", shlex.join(cmd))

        try:
            rc, stdout, stderr = await run_subprocess(cmd)
        except FileNotFoundError as exc:
            msg = f"rsync binary not found: {self._binary!r}"
            raise TransportError(msg) from exc

        if rc == 0:
            return TransportResult(ok=True, returncode=0, stdout=stdout, stderr=stderr)

        kind = _classify_failure(stderr, rc)
        _log.warning("rsync failed rc=%d kind=%s", rc, kind.value)
        return TransportResult(
            ok=False,
            error_kind=kind,
            stderr=stderr,
            stdout=stdout,
            returncode=rc,
        )

    async def hashsum(
        self,
        ssh_target: str,
        ssh_key_path: Path,
        remote_path: str,
    ) -> dict[str, str]:
        """Probe ``remote_path`` via ``ssh ... find ... sha256sum`` and parse.

        Args:
            ssh_target: Remote login in ``<user>@<host>`` form.
            ssh_key_path: Private key used for the SSH connection.
            remote_path: Remote file or directory to hash; it is
                shell-quoted before being embedded in the remote command.

        Returns:
            A ``{relative-path: sha256-hex}`` dict on success
            (``rc == 0``). Keys are stripped of the ``remote_path`` prefix
            so they match the local manifest's relative-path convention
            (see :meth:`exlab_wizard.sync.verifier.Verifier.compute_local_manifest`).
            The dict may legitimately be empty if the remote subtree
            contains no files.

        Raises:
            TransportError: On a non-zero exit, with ``error_kind`` set by
                ``_classify_failure`` so the caller (the verifier / queue
                worker) can route via the §7.1.5 retry path: ``AUTH`` is
                terminal FAILED, ``NETWORK`` / ``UNKNOWN`` are retried
                with backoff. Also raised when the ``ssh`` binary is
                missing, in which case no ``error_kind`` is passed so the
                worker treats it as a non-terminal failure.
        """
        # Imported lazily, as in the original; presumably this avoids a
        # circular import with the verifier module -- confirm before moving.
        from exlab_wizard.sync.verifier import parse_manifest

        remote_cmd = f"find {shlex.quote(remote_path)} -type f -exec sha256sum {{}} +"
        cmd: list[str] = [*_ssh_command(ssh_key_path), ssh_target, remote_cmd]
        _log.debug("rsync ssh hashsum cmd: %s", shlex.join(cmd))

        try:
            rc, stdout, stderr = await run_subprocess(cmd)
        except FileNotFoundError as exc:
            msg = "ssh binary not found: ssh"
            raise TransportError(msg) from exc

        if rc != 0:
            kind = _classify_failure(stderr, rc)
            _log.warning("rsync ssh hashsum failed rc=%d kind=%s", rc, kind.value)
            msg = f"rsync ssh hashsum failed rc={rc} kind={kind.value}: {stderr.strip()}"
            raise TransportError(msg, error_kind=kind)

        # Parse, then strip the remote_path prefix so keys are run-relative.
        # The trimmed root and its prefix are loop-invariant, so compute once.
        root = remote_path.rstrip("/")
        prefix = root + "/"

        relative: dict[str, str] = {}
        for path_key, hex_digest in parse_manifest(stdout).items():
            if path_key.startswith(prefix):
                key = path_key[len(prefix) :]
            elif path_key == root:
                # A single-file remote target. Use the basename.
                key = Path(path_key).name
            else:
                # Fallback: keep the original key.
                key = path_key
            relative[key] = hex_digest
        return relative