Source code for spikelab.spike_sorting.guards._preflight

"""Pre-loop resource checks for the spike-sorting pipeline.

:func:`run_preflight` inspects the host before any sorter is spawned
and returns a list of :class:`PreflightFinding` records. Each finding
carries a level (``"warn"`` or ``"fail"``) and a human-readable
remediation hint.

The intent is to surface predictable failure causes — disk full, RAM
already exhausted, GPU saturated by another process, ``HDF5_PLUGIN_PATH``
pointing at a missing directory — at the start of the run rather than
after a long sort has already crashed the workstation.

Default behaviour is permissive: every finding is reported but only
``"fail"``-level findings raise. ``ExecutionConfig.preflight_strict``
flips ``"warn"`` findings into ``"fail"`` for stricter deployments.
"""

from __future__ import annotations

import contextlib
import logging
import math
import os
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple

_logger = logging.getLogger(__name__)

from .._exceptions import (
    EnvironmentSortFailure,
    HDF5PluginMissingError,
    ResourceSortFailure,
)


@dataclass
class PreflightFinding:
    """A single resource-check finding from :func:`run_preflight`.

    Parameters:
        level (str): Either ``"warn"`` or ``"fail"``.
        code (str): Short stable identifier (e.g. ``"low_disk_inter"``,
            ``"low_vram"``).
        message (str): One-line description of what was observed.
        remediation (str or None): Suggested action for the operator.
        category (str): One of ``"resource"`` or ``"environment"`` —
            controls which exception subclass is raised when the
            finding is escalated.
    """

    # Severity of the finding: "warn" or "fail".
    level: str
    # Stable machine-readable identifier for the finding.
    code: str
    # Human-readable, one-line description of the observation.
    message: str
    # Optional operator-facing hint on how to resolve the finding.
    remediation: Optional[str] = None
    # "resource" or "environment"; selects the exception subclass on escalation.
    category: str = "resource"
_GB = 1024**3


@contextlib.contextmanager
def _with_pynvml():
    """Yield the pynvml module after ``nvmlInit()``; call ``nvmlShutdown`` on exit.

    Centralises the init / shutdown lifecycle so callers do not
    independently re-implement the per-call pattern. The helper yields
    ``None`` when pynvml is missing or initialisation fails; callers
    branch on that and fall back to alternative detection
    (``nvidia-smi``, ``torch.cuda``).

    Exceptions raised inside the with-block propagate; the ``finally``
    always runs ``nvmlShutdown`` so the NVML context is never leaked
    even when ``nvmlDeviceGetMemoryInfo`` (or similar) raises with the
    handle still open.

    Yields:
        pynvml (module or None): The imported and initialised pynvml
            module, or ``None`` when unavailable.
    """
    try:
        import pynvml
    except ImportError:
        pynvml = None
    if pynvml is not None:
        try:
            pynvml.nvmlInit()
        except Exception:
            # NVML present but unusable (no driver, permissions, ...).
            pynvml = None
    if pynvml is None:
        # Yield outside any ``except`` handler so an exception raised by
        # the caller's with-body is not chained onto the import/init error.
        yield None
        return
    try:
        yield pynvml
    finally:
        # Best-effort shutdown: a failure here must not mask the
        # exception (if any) propagating out of the with-body.
        with contextlib.suppress(Exception):
            pynvml.nvmlShutdown()


def _disk_free_gb(path: Path) -> Optional[float]:
    """Return free disk space in GB at *path*'s nearest existing parent.

    Parameters:
        path (Path): Target location; it does not need to exist yet.

    Returns:
        free_gb (float or None): Free space in GB, or ``None`` when the
            operating system refuses the query.
    """
    p = Path(path)
    # Walk up until something exists; stop at the filesystem root
    # (where ``parent`` is the path itself).
    while not p.exists() and p.parent != p:
        p = p.parent
    try:
        return shutil.disk_usage(str(p)).free / _GB
    except OSError:
        return None


def _available_ram_gb() -> Optional[float]:
    """Return available host RAM in GB via psutil, or ``None``.

    Returns:
        ram_gb (float or None): Available RAM in GB; ``None`` when
            psutil is not installed.
    """
    try:
        import psutil

        return psutil.virtual_memory().available / _GB
    except ImportError:
        return None


def _free_vram_gb() -> Optional[float]:
    """Return free GPU memory (sum across devices) in GB.

    Tries ``pynvml`` first via :func:`_with_pynvml` (which guarantees
    ``nvmlShutdown`` runs even when a per-device read raises mid-loop),
    falls back to parsing ``nvidia-smi``.

    Returns:
        free_gb (float or None): Summed free VRAM in GB. ``None`` when
            no GPU/driver is detectable, including the case where
            ``nvidia-smi`` runs but prints no parseable value.
    """
    with _with_pynvml() as pynvml:
        if pynvml is not None:
            try:
                free_bytes = 0
                for index in range(pynvml.nvmlDeviceGetCount()):
                    handle = pynvml.nvmlDeviceGetHandleByIndex(index)
                    free_bytes += pynvml.nvmlDeviceGetMemoryInfo(handle).free
                return free_bytes / _GB
            except Exception:
                # ``_with_pynvml`` still runs ``nvmlShutdown`` even though
                # this exception is swallowed; fall through to nvidia-smi.
                pass

    try:
        out = subprocess.check_output(
            [
                "nvidia-smi",
                "--query-gpu=memory.free",
                "--format=csv,noheader,nounits",
            ],
            text=True,
            timeout=5,
        )
    except (subprocess.SubprocessError, OSError):
        # OSError covers a missing binary as well as a non-executable one.
        return None

    free_total_mib = 0.0
    parsed_any = False
    for line in out.strip().splitlines():
        try:
            free_total_mib += float(line.strip())
        except ValueError:
            continue
        parsed_any = True
    if not parsed_any:
        # Output such as "[N/A]" means "unknown", not "0 MiB free".
        return None
    # nvidia-smi reports MiB with ``nounits``.
    return free_total_mib / 1024.0


def _sorter_uses_gpu(config: Any) -> bool:
    """Return True when the configured sorter is GPU-backed.

    KS2 only uses the GPU through Docker (the MATLAB host path is CPU-
    only here); KS4 and RT-Sort always use the GPU.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        uses_gpu (bool): Whether the sorter will run on a GPU.
    """
    # ``or ""`` tolerates ``sorter_name=None`` as well as a missing attribute.
    name = (getattr(config.sorter, "sorter_name", "") or "").lower()
    if name in ("kilosort4", "rt_sort"):
        return True
    return name == "kilosort2" and bool(getattr(config.sorter, "use_docker", False))


# ``memory=<number>[unit]`` anchored to the whole (comment-stripped) line.
_WSL_MEMORY_RE = re.compile(r"^memory\s*=\s*([\d.]+)\s*([a-zA-Z]+)?\s*$")


def _parse_wslconfig_memory_gb(text: str) -> Optional[float]:
    """Parse the ``[wsl2] memory=`` value from a ``.wslconfig`` body.

    Accepts ``memory=8GB``, ``memory=8192MB``, ``memory=8gb`` (any
    case), with or without surrounding whitespace. Returns ``None``
    when the key is absent, malformed, or expressed in an unknown unit.

    Parameters:
        text (str): Full text of ``~/.wslconfig``.

    Returns:
        memory_gb (float or None): Configured WSL2 memory ceiling in
            GB. Returns ``None`` when not configured.
    """
    # Strip a leading BOM if present (Notepad-edited .wslconfig files
    # commonly start with one). The escape is spelled out explicitly: an
    # invisible literal is easily lost, and ``startswith("")`` is always
    # true, which would silently drop the first real character.
    if text.startswith("\ufeff"):
        text = text[1:]

    in_wsl2 = False
    for raw in text.splitlines():
        # Drop inline comments (``memory=8GB ; comment``); the regex is
        # anchored to end-of-line and would otherwise never match.
        stripped = raw
        for marker in (";", "#"):
            idx = stripped.find(marker)
            if idx != -1:
                stripped = stripped[:idx]
        line = stripped.strip()
        if not line:
            continue
        if line.startswith("["):
            in_wsl2 = line.lower().startswith("[wsl2")
            continue
        if not in_wsl2:
            continue
        m = _WSL_MEMORY_RE.match(line)
        if m is None:
            continue
        try:
            value = float(m.group(1))
        except ValueError:
            # ``[\d.]+`` also matches junk such as "1.2.3" or ".".
            return None
        unit = (m.group(2) or "GB").upper()
        if unit in ("GB", "G"):
            return value
        if unit in ("MB", "M"):
            return value / 1024.0
        if unit in ("KB", "K"):
            return value / (1024.0 * 1024.0)
        return None
    return None


def estimate_rt_sort_intermediate_gb(
    *,
    n_channels: int,
    n_samples: int,
) -> float:
    """Project the on-disk size of RT-Sort's intermediate files in GB.

    RT-Sort writes three large per-recording artefacts under the
    intermediate folder during ``detect_sequences``:

    * scaled traces (float32, 4 bytes/sample)
    * model traces (float16, 2 bytes/sample)
    * model outputs (float16, 2 bytes/sample)

    Total per-sample byte cost is therefore ``4 + 2 + 2 = 8`` bytes per
    channel-sample.

    Parameters:
        n_channels (int): Channel count of the recording.
        n_samples (int): Total samples (per channel) over the recording
            duration.

    Returns:
        gb (float): Projected on-disk footprint in GB.
    """
    bytes_per_channel_sample = 4 + 2 + 2
    total_bytes = float(n_channels) * float(n_samples) * float(bytes_per_channel_sample)
    return total_bytes / _GB


def _rt_sort_disk_finding(
    config: Any,
    recording_files: Sequence[Any],
    intermediate_folders: Sequence[Any],
) -> Optional[PreflightFinding]:
    """Warn when RT-Sort's intermediate-folder footprint will not fit.

    Only fires for the RT-Sort sorter. Skips when channel count or
    sample count cannot be determined from the inputs (the recording is
    loaded lazily by ``load_recording`` later, so at preflight we only
    know the path — to keep the check cheap we accept that the estimate
    is ``None`` for inputs given as paths/strings instead of pre-loaded
    recordings).

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.
        recording_files (sequence): Per-recording inputs (paths or
            pre-loaded recordings).
        intermediate_folders (sequence): Intermediate output folders.

    Returns:
        finding (PreflightFinding or None): Warn-level finding when the
            projection exceeds free disk, otherwise ``None``.

    Notes:
        - Compares the *largest* single-recording estimate against the
          *smallest* free-disk among intermediate folders. This is
          correct only when each intermediate folder hosts at most one
          recording at a time. If multiple recordings share an
          intermediate volume, the cumulative footprint may still
          exceed free disk while no individual recording does — a
          per-volume sum would be more precise but is left as a future
          enhancement.
    """
    if (getattr(config.sorter, "sorter_name", "") or "").lower() != "rt_sort":
        return None
    if not recording_files:
        return None

    # Only pre-loaded recordings expose channel / sample counts.
    # Path-only inputs would require loading the file here, which is
    # too expensive for a preflight; those are silently skipped.
    estimates: List[float] = []
    for rec in recording_files:
        try:
            n_ch = int(rec.get_num_channels())
            n_smp = int(rec.get_num_samples())
        except Exception:
            continue
        estimates.append(
            estimate_rt_sort_intermediate_gb(n_channels=n_ch, n_samples=n_smp)
        )
    if not estimates:
        return None

    # Compare the largest estimate to the smallest free-disk among
    # intermediate folders. If free disk cannot be read (e.g. unusual
    # OS), bail out cleanly.
    largest_gb = max(estimates)
    free_gbs = []
    for folder in intermediate_folders:
        free = _disk_free_gb(Path(folder))
        if free is not None:
            free_gbs.append(free)
    if not free_gbs:
        return None
    smallest_free_gb = min(free_gbs)
    if largest_gb <= smallest_free_gb:
        return None

    return PreflightFinding(
        level="warn",
        code="rt_sort_disk_projection",
        category="resource",
        message=(
            f"RT-Sort: intermediate-folder projection of "
            f"{largest_gb:.1f} GB exceeds free disk "
            f"({smallest_free_gb:.1f} GB) on at least one "
            "intermediate path. RT-Sort writes scaled traces, model "
            "traces, and model outputs to disk during sequence "
            "detection."
        ),
        remediation=(
            "Free disk space, point intermediate_folders at a larger "
            "volume, or shorten the recording window via "
            "RTSortConfig.recording_window_ms."
        ),
    )


def _wslconfig_finding(config: Any) -> Optional[PreflightFinding]:
    """Warn when running Docker on Windows without a sane ``.wslconfig``.

    Docker Desktop on Windows runs in a WSL2 VM whose memory ceiling is
    governed by ``%USERPROFILE%\\.wslconfig`` ([wsl2] memory=...). When
    the file is missing or the limit is unset, the VM can grow beyond a
    safe fraction of physical RAM and drag the host into thrash even
    with a Docker ``mem_limit`` configured.

    Only relevant when:

    * Host platform is Windows.
    * The configured sorter uses Docker (KS2/KS4 with
      ``use_docker=True``).

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        finding (PreflightFinding or None): ``None`` when neither
            condition holds, when ``.wslconfig`` is configured with a
            sensible memory ceiling, or when host RAM cannot be
            detected; otherwise a warn-level finding.
    """
    if sys.platform != "win32":
        return None
    if not getattr(config.sorter, "use_docker", False):
        return None

    wslconfig = Path(os.path.expanduser("~")) / ".wslconfig"
    if not wslconfig.is_file():
        return PreflightFinding(
            level="warn",
            code="wslconfig_missing",
            category="environment",
            message=(
                "Docker-on-Windows: ~/.wslconfig is missing. The WSL2 "
                "VM hosting Docker has no host-side memory ceiling, so "
                "a runaway sort can take Windows down even with the "
                "container's mem_limit set."
            ),
            remediation=(
                "Create %USERPROFILE%\\.wslconfig with [wsl2] "
                "memory=<N>GB where <N> is roughly 75% of host RAM, "
                "then run `wsl --shutdown` and restart Docker Desktop."
            ),
        )

    try:
        # Decode explicitly: the Windows locale codec would turn a UTF-8
        # BOM into mojibake and hide the "[wsl2]" header. ``utf-8-sig``
        # strips the BOM; ``errors="replace"`` keeps ANSI files readable.
        text = wslconfig.read_text(encoding="utf-8-sig", errors="replace")
    except OSError:
        return None

    memory_gb = _parse_wslconfig_memory_gb(text)
    if memory_gb is None:
        return PreflightFinding(
            level="warn",
            code="wslconfig_no_memory",
            category="environment",
            message=(
                "Docker-on-Windows: ~/.wslconfig exists but has no "
                "[wsl2] memory= setting. Without it WSL2 can grow to "
                "consume up to half of host RAM by default."
            ),
            remediation=(
                "Add a [wsl2] section with memory=<N>GB (~75% of host "
                "RAM) to ~/.wslconfig, then run `wsl --shutdown`."
            ),
        )

    # Guard against an over-generous setting relative to host RAM.
    try:
        from ..sorting_utils import get_system_ram_bytes

        host_bytes = get_system_ram_bytes()
    except Exception:
        host_bytes = None
    if host_bytes is not None:
        host_gb = host_bytes / _GB
        if memory_gb > 0.85 * host_gb:
            return PreflightFinding(
                level="warn",
                code="wslconfig_memory_too_high",
                category="environment",
                message=(
                    f"Docker-on-Windows: ~/.wslconfig sets WSL2 "
                    f"memory={memory_gb:.1f} GB on a {host_gb:.1f} GB "
                    "host (>85% of physical RAM). A runaway sort can "
                    "still drag Windows into swap."
                ),
                remediation=(
                    "Lower [wsl2] memory= in ~/.wslconfig to ~75% of "
                    "host RAM, then run `wsl --shutdown` and restart "
                    "Docker Desktop."
                ),
            )
    return None


_KNOWN_RECORDING_EXTENSIONS = (".h5", ".nwb", ".dat", ".raw")


def _validate_recording_inputs(
    recording_files: Sequence[Any],
) -> List[PreflightFinding]:
    """Quick existence + extension checks for path-style recording inputs.

    Catches typos and missing files in microseconds rather than the
    seconds-to-minutes that a full ``load_recording`` failure costs.
    Skips entries that are pre-loaded SpikeInterface ``BaseRecording``
    objects — those have already been validated by the loader.

    Returns a list of findings:

    * ``recording_missing`` (level=fail, environment) — path doesn't
      exist on disk.
    * ``recording_extension_unknown`` (level=warn, environment) — file
      extension is not in the known list.

    Parameters:
        recording_files (sequence): Per-recording inputs. Can mix paths
            (``str`` / ``Path``) and pre-loaded ``BaseRecording``
            objects; only path-style entries are checked.

    Returns:
        findings (list[PreflightFinding]): One finding per problem
            recording. Empty when all recordings exist and have a known
            extension.

    Raises:
        ValueError: When an entry is ``None``.
    """
    findings: List[PreflightFinding] = []
    for i, entry in enumerate(recording_files):
        if entry is None:
            raise ValueError(
                f"recording_inputs[{i}] is None (caller bug — pass a path, "
                f"BaseRecording, or omit the entry)"
            )
        if not isinstance(entry, (str, Path)):
            # Pre-loaded recording object — skip.
            continue
        p = Path(entry)
        if not p.exists():
            findings.append(
                PreflightFinding(
                    level="fail",
                    code="recording_missing",
                    category="environment",
                    message=(f"Recording {p!s} does not exist on disk."),
                    remediation=(
                        "Verify the path is correct, the file has not "
                        "been moved, and any networked storage is "
                        "mounted."
                    ),
                )
            )
            continue
        if p.is_dir():
            # Directory inputs are concatenated by the loader; no
            # extension check applies.
            continue
        # Check the full multi-suffix tail (e.g. ``.raw.h5``): *any*
        # known extension in it is accepted.
        suffixes = [s.lower() for s in p.suffixes]
        if not any(s in _KNOWN_RECORDING_EXTENSIONS for s in suffixes):
            ext_str = "".join(p.suffixes) if p.suffixes else "(no extension)"
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="recording_extension_unknown",
                    category="environment",
                    message=(
                        f"Recording {p.name} has unfamiliar extension "
                        f"{ext_str}. Known extensions: "
                        f"{', '.join(_KNOWN_RECORDING_EXTENSIONS)}."
                    ),
                    remediation=(
                        "If the file is genuinely a supported format "
                        "with an unusual extension this warning can be "
                        "ignored; otherwise verify the path."
                    ),
                )
            )
    return findings


# Hard-coded fallbacks used when ``pyproject.toml`` cannot be
# located (installed package may not ship its own pyproject) or
# when the ``[tool.spikelab.tested_versions]`` section is missing.
_FALLBACK_TESTED_SI_VERSION_RANGE = ("0.100.0", "0.110.0")
_FALLBACK_TESTED_KILOSORT4_VERSION_RANGE = ("4.0.0", "5.0.0")


def _coerce_version_range(value: Any, fallback: Tuple[str, str]) -> Tuple[str, str]:
    """Return *value* as a ``(low, high)`` tuple of strings, else *fallback*."""
    if (
        isinstance(value, list)
        and len(value) == 2
        and all(isinstance(item, str) for item in value)
    ):
        return (value[0], value[1])
    return fallback


def _load_tested_version_ranges() -> Tuple[Tuple[str, str], Tuple[str, str]]:
    """Return ``(si_range, ks4_range)`` sourced from pyproject.toml when available.

    Walks parents from this module's location to find the project
    root's ``pyproject.toml`` and reads
    ``[tool.spikelab.tested_versions]``. Falls back to the hard-coded
    constants when the file is not accessible (installed package,
    frozen wheel, etc.) or the section is missing or malformed — keeps
    the version-range source-of-truth aligned with the dependency pins
    without breaking installs that don't ship pyproject.toml.

    Returns:
        ranges (tuple): ``((si_low, si_high), (ks4_low, ks4_high))`` of
            dotted version strings.
    """
    fallback = (
        _FALLBACK_TESTED_SI_VERSION_RANGE,
        _FALLBACK_TESTED_KILOSORT4_VERSION_RANGE,
    )
    try:
        import tomllib  # type: ignore[import-not-found]  # py311+
    except ImportError:
        try:
            import tomli as tomllib  # type: ignore[import-not-found]
        except ImportError:
            return fallback

    # Walk up from this file looking for pyproject.toml — the source
    # layout is ``SpikeLab/src/spikelab/spike_sorting/guards/_preflight.py``
    # and the pyproject lives at ``SpikeLab/pyproject.toml``.
    here = Path(__file__).resolve()
    for directory in here.parents:
        candidate = directory / "pyproject.toml"
        if not candidate.is_file():
            continue
        try:
            with open(candidate, "rb") as f:
                data = tomllib.load(f)
        except Exception:
            # Unreadable or invalid TOML: stop searching, use fallbacks.
            break
        # Descend defensively: this runs at import time, so a
        # non-table value must not raise AttributeError.
        tested: Any = data
        for key in ("tool", "spikelab", "tested_versions"):
            tested = tested.get(key) if isinstance(tested, dict) else None
        if not isinstance(tested, dict):
            tested = {}
        si_range = _coerce_version_range(
            tested.get("spikeinterface"), _FALLBACK_TESTED_SI_VERSION_RANGE
        )
        ks4_range = _coerce_version_range(
            tested.get("kilosort4"), _FALLBACK_TESTED_KILOSORT4_VERSION_RANGE
        )
        # The nearest pyproject.toml wins, even if it lacks the section.
        return si_range, ks4_range
    return fallback


_TESTED_SI_VERSION_RANGE, _TESTED_KILOSORT4_VERSION_RANGE = (
    _load_tested_version_ranges()
)


def _check_kilosort2_host(config: Any) -> List[PreflightFinding]:
    """Probe local Kilosort2 dependencies (host path, no Docker).

    KS2's host path needs MATLAB on PATH plus a checkout of the
    Kilosort2 sources containing ``master_kilosort.m``. The sources
    location is taken from ``SorterConfig.sorter_path`` when set,
    otherwise from the ``KILOSORT_PATH`` environment variable.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        findings (list[PreflightFinding]): Up to two fail-level
            findings — one for missing ``matlab``, plus at most one for
            the source directory (either ``KILOSORT_PATH`` is unset,
            the configured directory does not exist, or the directory
            exists but is missing ``master_kilosort.m``).
    """
    findings: List[PreflightFinding] = []

    if shutil.which("matlab") is None:
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    "Kilosort2 (host) requires MATLAB but `matlab` was "
                    "not found on PATH."
                ),
                remediation=(
                    "Install MATLAB and ensure `matlab` resolves on "
                    "PATH, or switch to Kilosort2 Docker via "
                    "SorterConfig(use_docker=True)."
                ),
            )
        )

    ks_path = getattr(config.sorter, "sorter_path", None) or os.environ.get(
        "KILOSORT_PATH"
    )
    if not ks_path:
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    "Kilosort2 (host) requires the Kilosort2 source "
                    "directory but neither SorterConfig.sorter_path "
                    "nor the KILOSORT_PATH environment variable is set."
                ),
                remediation=(
                    "Clone https://github.com/MouseLand/Kilosort and "
                    "set KILOSORT_PATH (or SorterConfig.sorter_path) "
                    "to the directory containing master_kilosort.m."
                ),
            )
        )
        return findings

    ks_dir = Path(ks_path)
    master_m = ks_dir / "master_kilosort.m"
    if not ks_dir.is_dir():
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"Kilosort2 sources directory {ks_dir!s} does "
                    "not exist."
                ),
                remediation=(
                    "Set KILOSORT_PATH (or SorterConfig.sorter_path) "
                    "to a valid Kilosort2 source directory."
                ),
            )
        )
    elif not master_m.is_file():
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"Kilosort2 sources directory {ks_dir!s} does "
                    "not contain master_kilosort.m."
                ),
                remediation=(
                    "Verify KILOSORT_PATH points to the root of a "
                    "Kilosort2 checkout (the directory holding "
                    "master_kilosort.m)."
                ),
            )
        )
    return findings


def _check_kilosort4_host(config: Any) -> List[PreflightFinding]:
    """Probe local Kilosort4 dependencies (host path, no Docker).

    Verifies the ``kilosort`` package imports and falls inside the
    SpikeLab-tested major-version window. Out-of-range versions warn
    rather than fail because newer KS4 releases sometimes work without
    incident — the warning just makes the operator aware.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration (unused,
            kept for signature symmetry with the other
            ``_check_*_host`` helpers).

    Returns:
        findings (list[PreflightFinding]): Empty when KS4 imports and
            the version is in-range; otherwise one fail- or warn-level
            finding.
    """
    try:
        import kilosort as _ks4
    except ImportError as exc:
        return [
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"Kilosort4 (host) requires the `kilosort` Python "
                    f"package but it is not importable: {exc}."
                ),
                remediation=(
                    "Install Kilosort4 in the active environment "
                    "(`pip install kilosort`), or switch to Kilosort4 "
                    "Docker via SorterConfig(use_docker=True)."
                ),
            )
        ]

    version = getattr(_ks4, "__version__", None)
    if version is None:
        return []
    # ``_parse_version`` is defined elsewhere in this module.
    parsed = _parse_version(version)
    low = _parse_version(_TESTED_KILOSORT4_VERSION_RANGE[0])
    high = _parse_version(_TESTED_KILOSORT4_VERSION_RANGE[1])
    if parsed is None or low is None or high is None:
        # Unparseable version on either side: cannot judge, stay silent.
        return []
    if low <= parsed < high:
        return []
    return [
        PreflightFinding(
            level="warn",
            code="kilosort4_version_outside_tested_range",
            category="environment",
            message=(
                f"Kilosort4 {version} is outside the SpikeLab tested "
                f"range [{_TESTED_KILOSORT4_VERSION_RANGE[0]}, "
                f"{_TESTED_KILOSORT4_VERSION_RANGE[1]})."
            ),
            remediation=(
                f"Pin Kilosort4 to a version inside "
                f"[{_TESTED_KILOSORT4_VERSION_RANGE[0]}, "
                f"{_TESTED_KILOSORT4_VERSION_RANGE[1]}), or run a smoke "
                "test before relying on a new release."
            ),
        )
    ]


@contextlib.contextmanager
def _docker_client():
    """Yield a docker-py client and close it on exit.

    Yields ``None`` when docker-py is not installed so callers can fall
    back to the ``docker`` CLI. Errors from ``docker.from_env()`` are
    raised from the ``with`` statement itself; callers wrap the whole
    ``with`` in their own ``try``.

    Yields:
        client (docker.DockerClient or None): Connected client, or
            ``None`` when docker-py is unavailable.
    """
    try:
        import docker as _docker  # type: ignore[import-not-found]
    except ImportError:
        _docker = None
    if _docker is None:
        yield None
        return
    client = _docker.from_env()
    try:
        yield client
    finally:
        # Release the client's HTTP connection pool; a close failure
        # must never mask the probe result.
        with contextlib.suppress(Exception):
            client.close()


def _ping_docker_daemon() -> Tuple[bool, Optional[PreflightFinding]]:
    """Confirm the Docker daemon is reachable.

    Tries the docker-py path first (when the package is installed),
    falls back to the ``docker info`` subprocess. Either path returns a
    ``(daemon_ok, finding)`` pair: ``daemon_ok`` is True only when the
    daemon responded; ``finding`` is non-None only when a fail-level
    finding should be surfaced.

    Returns:
        result (tuple): ``(daemon_ok, finding)``. The finding is None
            on success.
    """
    try:
        with _docker_client() as client:
            if client is not None:
                client.ping()
                return True, None
    except Exception as exc:
        return False, PreflightFinding(
            level="fail",
            code="sorter_dependency_missing",
            category="environment",
            message=f"Docker daemon ping failed via docker-py: {exc!r}.",
            remediation=(
                "Start Docker Desktop / the docker service, or "
                "switch to a host-path sorter (set "
                "SorterConfig.use_docker=False)."
            ),
        )

    # docker-py is not installed: fall back to the CLI.
    try:
        subprocess.run(
            ["docker", "info"],
            check=True,
            timeout=5,
            capture_output=True,
        )
        return True, None
    except (subprocess.SubprocessError, OSError) as exc:
        # SubprocessError covers non-zero exit and timeout; OSError
        # covers a missing or non-executable ``docker`` binary.
        return False, PreflightFinding(
            level="fail",
            code="sorter_dependency_missing",
            category="environment",
            message=(
                f"Docker daemon is not reachable: `docker info` "
                f"failed ({exc!r})."
            ),
            remediation=(
                "Start Docker Desktop / the docker service, or "
                "switch to a host-path sorter (set "
                "SorterConfig.use_docker=False)."
            ),
        )


def _check_image_cached(image_tag: str) -> bool:
    """Return True when *image_tag* is in the local Docker image cache.

    Mirrors the daemon-ping helper's docker-py-then-subprocess
    fallback. Returns False on any failure mode (image missing, docker
    subprocess error, timeout); the caller decides how to surface that.

    Parameters:
        image_tag (str): Image reference to look up.

    Returns:
        cached (bool): Whether the image is present locally.
    """
    try:
        with _docker_client() as client:
            if client is not None:
                client.images.get(image_tag)
                return True
    except Exception:
        return False

    # docker-py is not installed: fall back to the CLI.
    try:
        subprocess.run(
            ["docker", "image", "inspect", image_tag],
            check=True,
            timeout=5,
            capture_output=True,
        )
        return True
    except (subprocess.SubprocessError, OSError):
        return False


def _read_image_digest(image_tag: str) -> Optional[str]:
    """Return the local image's RepoDigest, or None if unavailable.

    Used to validate against
    ``config.execution.docker_image_expected_digest`` when digest
    pinning is configured. Returns None when docker-py is not
    importable, when the image is missing, when no digests are recorded
    for the local image, or on any other failure — callers should treat
    None as "could not validate".

    Parameters:
        image_tag (str): Image reference to inspect.

    Returns:
        digest (str or None): The digest part of the first RepoDigest
            entry (e.g. ``"sha256:..."``), or ``None``.
    """
    try:
        with _docker_client() as client:
            if client is None:
                return None
            image = client.images.get(image_tag)
    except Exception:
        return None
    digests = getattr(image, "attrs", {}).get("RepoDigests") or []
    if not digests:
        return None
    # Each entry is "repo@sha256:HEX"; return the first digest part.
    first = digests[0]
    return first.split("@", 1)[-1] if "@" in first else first


def _check_docker_sorter(config: Any) -> List[PreflightFinding]:
    """Probe Docker-backed sorter dependencies (daemon + image cache).

    Validates that the Docker daemon is reachable and that the image
    selected by :func:`docker_utils.get_docker_image` for the chosen
    sorter is present in the local image cache. Pull-ability is not
    probed — that requires a registry round-trip and would defeat the
    "milliseconds-cheap" preflight goal; SpikeInterface will pull on
    first use if the image is missing, but we surface a warn-level
    finding so the operator knows ahead of time.

    When ``config.execution.docker_image_expected_digest`` is set, the
    locally-cached image's RepoDigest is validated against the pinned
    value — a mismatch is surfaced as a fail-level finding so a
    tampered or accidentally-replaced image is caught before the sort
    runs.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration. The
            caller is expected to gate on ``use_docker=True`` —
            ``get_docker_image`` may raise for sorters that have no
            Docker image registered (e.g. ``"rt_sort"``) when
            ``use_docker`` is False.

    Returns:
        findings (list[PreflightFinding]): Daemon failures are
            ``level="fail"``; missing local image is ``level="warn"``;
            digest mismatch is ``level="fail"``.
    """
    findings: List[PreflightFinding] = []

    daemon_ok, daemon_finding = _ping_docker_daemon()
    if daemon_finding is not None:
        findings.append(daemon_finding)
    if not daemon_ok:
        return findings

    sorter_name = getattr(config.sorter, "sorter_name", "") or ""

    # Resolve the image tag the sort would actually use, then check
    # the local cache. Tag resolution can raise (e.g. unsupported
    # CUDA version) — surface that as a finding rather than letting
    # it crash the preflight.
    try:
        from ..docker_utils import get_docker_image

        image_tag = get_docker_image(sorter_name)
    except Exception as exc:
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"Could not resolve the Docker image for "
                    f"{getattr(config.sorter, 'sorter_name', '?')}: "
                    f"{exc!r}."
                ),
                remediation=(
                    "Check sorter name and CUDA driver compatibility. "
                    "See docker_utils._IMAGE_REGISTRY for available tags."
                ),
            )
        )
        return findings

    if not _check_image_cached(image_tag):
        findings.append(
            PreflightFinding(
                level="warn",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"Docker image {image_tag!s} is not in the local "
                    "cache. SpikeInterface will attempt to pull it on "
                    "first use, which can take minutes and fails "
                    "without network connectivity to the registry."
                ),
                remediation=(
                    f"Pre-pull the image with `docker pull {image_tag}` "
                    "before launching the sort to fail fast on network "
                    "or auth issues."
                ),
            )
        )
        # Skip digest validation when the image isn't even cached;
        # the warn above is the actionable finding.
        return findings

    execution = getattr(config, "execution", None)
    expected_digest = (
        getattr(execution, "docker_image_expected_digest", None)
        if execution is not None
        else None
    )
    if expected_digest:
        actual_digest = _read_image_digest(image_tag)
        if actual_digest is None:
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="docker_image_digest_unverified",
                    category="environment",
                    message=(
                        f"Docker image {image_tag!s} is cached locally "
                        f"but its RepoDigest could not be read; "
                        f"expected_digest pin "
                        f"({expected_digest}) was not validated."
                    ),
                    remediation=(
                        "Install docker-py (`pip install docker`) so "
                        "preflight can read the image's RepoDigest, or "
                        "drop ``docker_image_expected_digest`` from the "
                        "config to silence the warning."
                    ),
                )
            )
        elif actual_digest != expected_digest:
            findings.append(
                PreflightFinding(
                    level="fail",
                    code="docker_image_digest_mismatch",
                    category="environment",
                    message=(
                        f"Docker image {image_tag!s} digest "
                        f"({actual_digest}) does not match the pinned "
                        f"expected_digest ({expected_digest})."
                    ),
                    remediation=(
                        "The locally-cached image has been replaced or "
                        "tampered with. Re-pull the pinned tag, or "
                        "update ``docker_image_expected_digest`` to "
                        "the new digest if the change is intentional."
                    ),
                )
            )
    return findings


def _check_rt_sort(config: Any) -> List[PreflightFinding]:
    """Probe RT-Sort runtime dependencies.

    RT-Sort needs PyTorch (DL detection model), diptest (amplitude
    unimodality test), scikit-learn (Gaussian mixture clustering), h5py
    (intermediate I/O), and tqdm (progress bars). When the configured
    device is CUDA, also verifies that ``torch.cuda`` is actually
    available so a missing driver does not surface as a cryptic
    kernel-launch error mid-sort.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        findings (list[PreflightFinding]): One fail-level finding per
            missing import; one additional fail when ``device="cuda"``
            but ``torch.cuda.is_available()`` is False or raises.
    """
    # ``importlib.util.find_spec`` checks for importability without
    # actually executing the module. Some optional deps (notably
    # torch) perform lazy CUDA init at import time; ``__import__``
    # would trigger that side effect just to confirm presence.
    import importlib.util as _importutil

    findings: List[PreflightFinding] = []
    torch_missing = False

    required = [
        ("torch", "PyTorch — required for the DL detection model"),
        ("diptest", "amplitude unimodality test in cluster splitting"),
        ("sklearn", "Gaussian mixture clustering"),
        ("h5py", "intermediate scaled-traces I/O"),
        ("tqdm", "progress bars"),
    ]
    for module_name, role in required:
        spec_error: Optional[BaseException] = None
        try:
            spec = _importutil.find_spec(module_name)
        except (ImportError, ValueError) as exc:
            spec = None
            spec_error = exc
        if spec is not None:
            continue
        if module_name == "torch":
            torch_missing = True
        reason = repr(spec_error) if spec_error is not None else "module not found"
        findings.append(
            PreflightFinding(
                level="fail",
                code="sorter_dependency_missing",
                category="environment",
                message=(
                    f"RT-Sort requires `{module_name}` ({role}) but "
                    f"it is not importable: {reason}."
                ),
                remediation=(
                    f"Install the missing dependency, e.g. "
                    f"`pip install {module_name}`."
                ),
            )
        )

    device = str(getattr(config.rt_sort, "device", "") or "")
    # Skip the CUDA probe when torch is already known to be missing —
    # the dependency-missing finding above covers that case.
    if device.startswith("cuda") and not torch_missing:
        try:
            import torch as _torch

            cuda_available = _torch.cuda.is_available()
        except ImportError:
            # find_spec succeeded but the import itself failed; do not
            # pile a misleading "CUDA unavailable" finding on top.
            cuda_available = True
        except Exception as exc:
            # A broken CUDA install can make ``torch.cuda.is_available()``
            # raise (driver/runtime mismatch) rather than return False.
            # Surface it as a fail-level dependency finding so the
            # operator gets actionable detail instead of a raw
            # traceback escaping out of preflight.
            findings.append(
                PreflightFinding(
                    level="fail",
                    code="sorter_dependency_missing",
                    category="environment",
                    message=(
                        f"RT-Sort is configured with device={device!r} but "
                        f"torch.cuda.is_available() raised "
                        f"{type(exc).__name__}: {exc}."
                    ),
                    remediation=(
                        "Check CUDA driver/runtime version compatibility, "
                        "reinstall a matching CUDA-enabled PyTorch build, "
                        "or switch RTSortConfig(device='cpu')."
                    ),
                )
            )
            cuda_available = True  # suppress the follow-up cuda finding
        if not cuda_available:
            findings.append(
                PreflightFinding(
                    level="fail",
                    code="sorter_dependency_missing",
                    category="environment",
                    message=(
                        f"RT-Sort is configured with device={device!r} "
                        "but torch.cuda.is_available() is False."
                    ),
                    remediation=(
                        "Verify the NVIDIA driver, install a CUDA-"
                        "enabled PyTorch build, or switch RTSortConfig"
                        "(device='cpu')."
                    ),
                )
            )
    return findings


def _check_sorter_dependencies(config: Any) -> List[PreflightFinding]:
    """Dispatch to the per-sorter dependency probe for the active config.

    Catches the most common environment-shaped failures (wrong conda
    env, missing CUDA wheels, broken Docker daemon, unset MATLAB path)
    in milliseconds rather than letting them surface as cryptic
    tracebacks deep inside the sort.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        findings (list[PreflightFinding]): All findings produced by the
            per-sorter probe; empty when dependencies look healthy or
            when the sorter is unrecognized.
    """
    sorter_name = (getattr(config.sorter, "sorter_name", "") or "").lower()
    use_docker = bool(getattr(config.sorter, "use_docker", False))

    # The Docker probe takes precedence over the host probes for KS2/KS4.
    if sorter_name in ("kilosort2", "kilosort4") and use_docker:
        return _check_docker_sorter(config)
    if sorter_name == "kilosort2":
        return _check_kilosort2_host(config)
    if sorter_name == "kilosort4":
        return _check_kilosort4_host(config)
    if sorter_name == "rt_sort":
        return _check_rt_sort(config)
    return []


def _resolve_target_device_index(config: Any) -> int:
    """Resolve the GPU device index the configured sorter would use.

    Mirrors :func:`._gpu_watchdog.resolve_active_device` so the
    preflight check sees the same target device the watchdog will
    monitor at run time.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration.

    Returns:
        index (int): Device index (defaults to 0 when the sorter does
            not expose an explicit device).
    """
    from ._gpu_watchdog import _resolve_device_index

    sorter_name = (getattr(config.sorter, "sorter_name", "") or "").lower()
    if sorter_name == "rt_sort":
        return _resolve_device_index(getattr(config.rt_sort, "device", None))
    if sorter_name == "kilosort4":
        params = getattr(config.sorter, "sorter_params", None) or {}
        return _resolve_device_index(params.get("torch_device"))
    return 0


def _detect_gpu_device_count() -> Optional[int]:
    """Return the number of CUDA devices visible to the host.

    Tries pynvml first (cheapest, no torch import cost) via
    :func:`_with_pynvml`, then ``torch.cuda.device_count()``, then
    ``nvidia-smi``. Returns ``None`` when none of the three is
    available — callers should treat that as "cannot validate" and stay
    silent rather than emit noise (the existing ``vram_unknown``
    finding already flags the broader detection gap).

    Returns:
        count (int or None): Visible device count, or ``None`` when it
            cannot be determined.
    """
    with _with_pynvml() as pynvml:
        if pynvml is not None:
            try:
                return int(pynvml.nvmlDeviceGetCount())
            except Exception:
                pass

    try:
        import torch

        if torch.cuda.is_available():
            return int(torch.cuda.device_count())
    except Exception:
        # torch missing or CUDA initialisation broken: try nvidia-smi.
        pass

    try:
        out = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=count", "--format=csv,noheader,nounits"],
            text=True,
            timeout=5,
        ).strip()
    except (subprocess.SubprocessError, OSError):
        return None
    # ``nvidia-smi --query-gpu=count`` repeats the count once per GPU
    # row, so any line is a valid sample.
    for line in out.splitlines():
        try:
            return int(line.strip())
        except ValueError:
            continue
    return None


def _check_gpu_device_present(config: Any) -> Optional[PreflightFinding]:
    """Verify the configured GPU device index actually exists.

    A user setting ``torch_device="cuda:1"`` (or ``RTSortConfig.device=
    "cuda:2"``) on a host with one GPU otherwise discovers the mistake
    a minute or more into the sort, when CUDA reports an opaque
    invalid-device error from the kernel launch.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration. Must
            describe a GPU-backed sorter (caller is expected to gate on
            :func:`_sorter_uses_gpu`).

    Returns:
        finding (PreflightFinding or None): Fail-level finding when the
            resolved index is out of range. ``None`` when the index is
            valid or when the device count cannot be detected (silent
            skip — already covered by ``vram_unknown``).

    Notes:
        - Multi-GPU configs like ``"cuda:0,1"`` are not formally
          supported. ``_resolve_target_device_index`` parses only the
          leading index, so multi-GPU strings silently resolve to
          device 0 and skip the multi-device check. If a future sorter
          needs proper multi-GPU validation, the resolver must be
          updated alongside this helper.
    """
    # NOTE(review): the implementation of this function lies beyond the
    # end of the reviewed excerpt and is intentionally not reproduced.
""" target = _resolve_target_device_index(config) count = _detect_gpu_device_count() if count is None or count <= 0: return None if target < count: return None valid_indices = ", ".join(str(i) for i in range(count)) return PreflightFinding( level="fail", code="gpu_device_not_present", category="environment", message=( f"Configured GPU device index {target} is out of range; " f"host exposes {count} CUDA device(s)." ), remediation=( f"Pick an available device index ({valid_indices}) via " "RTSortConfig.device='cuda:N' or " "SorterConfig.sorter_params={'torch_device': 'cuda:N'}." ), ) # Sample-rate windows beyond which sorter output becomes unreliable. # KS2/KS4: bandpass + drift correction adapt across a wide range, but # below 10 kHz the assumed spike timescales no longer hold and above # 50 kHz the templates and PCA become numerically degenerate. RT-Sort: # the bundled detection model is rate-locked — feeding rates outside # the trained sampling-clock tolerance puts the model out of # distribution and silently degrades quality. _SAMPLE_RATE_RANGES_HZ: Dict[str, Tuple[float, float]] = { "kilosort2": (10_000.0, 50_000.0), "kilosort4": (10_000.0, 50_000.0), } _RT_SORT_NOMINAL_HZ: Dict[str, float] = { "mea": 20_000.0, "neuropixels": 30_000.0, } _RT_SORT_TOLERANCE_FRAC: float = 0.005 # 0.5 % recording-clock jitter def _expected_sample_rate_window(config: Any) -> Optional[Tuple[float, float, str]]: """Return ``(low_hz, high_hz, label)`` for the configured sorter. Returns ``None`` when the sorter has no defined window (e.g. an unrecognized name or an RT-Sort probe variant we don't have nominal rates for). 
""" sorter_name = getattr(config.sorter, "sorter_name", "").lower() if sorter_name in _SAMPLE_RATE_RANGES_HZ: low, high = _SAMPLE_RATE_RANGES_HZ[sorter_name] return low, high, sorter_name if sorter_name == "rt_sort": probe = str(getattr(config.rt_sort, "probe", "") or "").lower() nominal = _RT_SORT_NOMINAL_HZ.get(probe) if nominal is None: return None tol = nominal * _RT_SORT_TOLERANCE_FRAC return nominal - tol, nominal + tol, f"rt_sort/{probe}" return None def _check_recording_sample_rate( config: Any, recording_files: Sequence[Any], ) -> List[PreflightFinding]: """Warn when a pre-loaded recording's rate sits outside the sorter window. Only inspects pre-loaded recordings (entries with ``get_sampling_frequency``). Path-only inputs are skipped — we do not load the recording for preflight; the existing :func:`_validate_recording_inputs` only confirms the file exists. When ``_expected_sample_rate_window`` returns ``None`` for the configured sorter+probe combination (e.g. an RT-Sort probe value we don't have a nominal rate for), the helper logs an INFO line noting that the rate was not checked — silent-skip would leave operators unsure whether validation ran. Parameters: config (SortingPipelineConfig): Pipeline configuration. recording_files (sequence): Per-recording inputs. Returns: findings (list[PreflightFinding]): One ``warn``-level finding per pre-loaded recording whose rate falls outside the sorter-specific window. Strict mode flips warnings into hard failures. 
""" window = _expected_sample_rate_window(config) if window is None: sorter_name = getattr(config.sorter, "sorter_name", "").lower() if sorter_name == "rt_sort": probe = getattr(config.rt_sort, "probe", "") _logger.info( "sample-rate check skipped: no nominal rate defined " "for RT-Sort probe %r.", probe, ) return [] low_hz, high_hz, label = window findings: List[PreflightFinding] = [] for rec in recording_files: get_fs = getattr(rec, "get_sampling_frequency", None) if not callable(get_fs): continue try: fs_hz = float(get_fs()) except Exception as exc: # Broad catch is intentional: preflight is a milliseconds- # cheap probe, so a flaky recording loader should not # block the whole preflight. Logged at debug so genuine # loader bugs are still observable to operators who # enable verbose logging. _logger.debug("skipping sample-rate check for %r: %r", rec, exc) continue # NaN comparisons are always False, so without an explicit # check a NaN sampling rate would silently fall into the # warn branch with a "nan kHz" message. if math.isnan(fs_hz): continue if low_hz <= fs_hz <= high_hz: continue # Category is ``environment`` (not ``resource``) because a # sample-rate mismatch is a recording-vs-model misconfiguration, # not a transient resource shortage. Under preflight_strict # the categorical exception drives retry policy at the caller. findings.append( PreflightFinding( level="warn", code="sample_rate_out_of_window", category="environment", message=( f"Recording sampling rate {fs_hz / 1000.0:.2f} kHz " f"is outside the {label} window " f"[{low_hz / 1000.0:.2f}, {high_hz / 1000.0:.2f}] " "kHz. Sorter output may degrade." ), remediation=( "Resample the recording to within the supported " "window, or pick a sorter whose window matches the " "recording's native rate." ), ) ) return findings def _parse_version_tuple(version: str) -> Optional[Tuple[int, ...]]: """Parse a dotted version string to a comparable 3-tuple of ints. 
The tuple is always padded to length 3 with zeros so single- or two-component versions ("4", "4.0") compare correctly against three-component pins. Without padding, Python's tuple ordering treats ``(4,) < (4, 0, 0)`` as True and a bare "4" would falsely report as below a [4.0.0, 5.0.0) tested range. Per-component, only the LEADING-digit prefix is taken, so ``"1.2.3rc4"`` → ``(1, 2, 3)`` (not ``(1, 2, 34)`` as a naive "strip non-digits" pass would produce — that would order ``"1.2.3rc4"`` *above* ``"1.2.33"``). Returns ``None`` when no component contributes any digit at all (silent skip on truly unparseable input — preserves the behavior expected by :func:`_check_kilosort4_host` and :func:`_check_spikeinterface_version`, which treat ``None`` as "cannot validate, stay silent"). """ try: parts = version.strip().split(".") nums: List[int] = [] had_any_digit = False for p in parts[:3]: m = re.match(r"^\d+", p) if m is None: nums.append(0) else: nums.append(int(m.group(0))) had_any_digit = True except Exception: return None if not had_any_digit: return None nums.extend([0] * (3 - len(nums))) return tuple(nums) def _parse_version(version: str) -> Optional[Tuple[int, ...]]: """Parse a version string to a comparable 3-tuple of ints. Prefers ``packaging.version.Version`` so PEP 440 pre-release, post-release, and dev-tag decorations on otherwise-valid versions (``"4.0.0rc1"``, ``"0.100.0.post1"``) are dropped via ``Version.release`` rather than tripping the manual digit-stripper. Falls back to :func:`_parse_version_tuple` when ``packaging`` is unavailable so the check still works in minimal environments. Parameters: version (str): The version string to parse (e.g. ``"4.1.2"``, ``"4.0.0rc1"``). Returns: parsed (tuple[int, ...] or None): A length-3 tuple of release ints (post/pre/dev tags ignored), or ``None`` when neither parser can extract a version. 
""" try: from packaging.version import InvalidVersion, Version except ImportError: return _parse_version_tuple(version) try: release = Version(version).release except InvalidVersion: return _parse_version_tuple(version) nums = list(release[:3]) nums.extend([0] * (3 - len(nums))) return tuple(nums) def _check_spikeinterface_version() -> Optional[PreflightFinding]: """Warn when SpikeInterface's version is outside the tested range. SpikeLab is verified against a specific SI version window. Older SI may lack APIs we depend on (e.g. ``run_sorter`` keyword arguments, ``ContainerClient`` fields). Newer SI may have introduced incompatibilities we have not yet caught. Returns ``None`` when SI is absent (no preflight to add — the relevant sort backend will fail later with a clearer message) or when the version is inside the tested range. """ try: import spikeinterface as _si except ImportError: return None version = getattr(_si, "__version__", None) if version is None: return None parsed = _parse_version(version) low = _parse_version(_TESTED_SI_VERSION_RANGE[0]) high = _parse_version(_TESTED_SI_VERSION_RANGE[1]) if parsed is None or low is None or high is None: return None if low <= parsed < high: return None return PreflightFinding( level="warn", code="spikeinterface_version_outside_tested_range", category="environment", message=( f"SpikeInterface {version} is outside the SpikeLab tested " f"range [{_TESTED_SI_VERSION_RANGE[0]}, " f"{_TESTED_SI_VERSION_RANGE[1]}). Some sort paths may " "behave unexpectedly." ), remediation=( f"Pin SpikeInterface to a version inside " f"[{_TESTED_SI_VERSION_RANGE[0]}, " f"{_TESTED_SI_VERSION_RANGE[1]}), or run a smoke test to " "verify your sort path before relying on a new release." ), ) def _check_resource_rlimits(config: Any) -> List[PreflightFinding]: """POSIX-only: warn when ``RLIMIT_NOFILE`` / ``RLIMIT_NPROC`` are tight. 
RT-Sort opens many file descriptors during chunked I/O; KS4 spawns multiple worker processes when ``num_processes > 1``. Constrained limits (some CI containers, shared hosts) cause opaque ``OSError [Errno 24] Too many open files`` or ``BlockingIOError`` failures deep inside the sort. Linux thresholds: * ``RLIMIT_NOFILE`` < 4096 → warn (RT-Sort's chunked I/O can hold thousands of FDs at once on dense MEAs). * ``RLIMIT_NPROC`` < 256 → warn (KS4 + RT-Sort spawn workers proportional to ``num_processes``). Returns an empty list on Windows where ``resource`` is unavailable. Parameters: config (SortingPipelineConfig): Pipeline configuration — inspected so RT-Sort's ``num_processes`` informs the NPROC threshold. Returns: findings (list[PreflightFinding]): Up to two warn-level findings. Empty when limits are healthy or the OS does not expose them. """ try: import resource as _resource except ImportError: return [] findings: List[PreflightFinding] = [] try: soft_nofile, _hard = _resource.getrlimit(_resource.RLIMIT_NOFILE) except (ValueError, OSError): soft_nofile = None if soft_nofile is not None and 0 < soft_nofile < 4096: findings.append( PreflightFinding( level="warn", code="low_rlimit_nofile", category="environment", message=( f"RLIMIT_NOFILE soft limit is {soft_nofile} " "(< 4096). RT-Sort's chunked I/O and KS4 worker " "pools may exhaust file descriptors during the sort." ), remediation=( "Raise the limit before launching the sort, e.g. " "`ulimit -n 65536` on bash, or via the systemd " "service unit's LimitNOFILE setting." ), ) ) try: soft_nproc, _hard = _resource.getrlimit(_resource.RLIMIT_NPROC) except (AttributeError, ValueError, OSError): soft_nproc = None # NPROC threshold scales with the configured worker count for # RT-Sort. Default to 256 if no explicit setting is given. 
nproc_needed = 256 rt = getattr(config, "rt_sort", None) if rt is not None: cfg_n = getattr(rt, "num_processes", None) if isinstance(cfg_n, int) and cfg_n > 0: nproc_needed = max(256, 4 * cfg_n) if soft_nproc is not None and 0 < soft_nproc < nproc_needed: findings.append( PreflightFinding( level="warn", code="low_rlimit_nproc", category="environment", message=( f"RLIMIT_NPROC soft limit is {soft_nproc} " f"(< {nproc_needed}). Worker spawning may fail " "with BlockingIOError partway through the sort." ), remediation=( "Raise the limit before launching the sort, e.g. " "`ulimit -u 4096`, or reduce " "``RTSortConfig.num_processes`` if you cannot." ), ) ) return findings def _check_filesystem_writable( folders: Sequence[Any], *, label: str, code_prefix: str, ) -> List[PreflightFinding]: """Verify that *folders* live on writable filesystems. A read-only mount (e.g. an NFS export that flipped to RO after a storage event, or a mistakenly-mounted snapshot) passes the free-disk check but fails on the first write. Catching this in preflight surfaces the misconfiguration in milliseconds rather than seconds-to-minutes into a sort. For folders that do not yet exist, the nearest existing parent is checked instead — the sort will create the folder later. Parameters: folders (sequence of path-like): Folders to validate. label (str): Human-readable folder kind (``"intermediate"`` or ``"results"``) used in the message. code_prefix (str): Stable prefix for the finding code (``"intermediate"`` → ``"intermediate_readonly"``). Returns: findings (list[PreflightFinding]): One ``fail``-level finding per folder whose nearest existing parent is not writable. 
""" findings: List[PreflightFinding] = [] for folder in folders: p = Path(folder) while not p.exists() and p.parent != p: p = p.parent if not p.exists(): continue if os.access(p, os.W_OK): continue findings.append( PreflightFinding( level="fail", code=f"{code_prefix}_readonly", category="environment", message=( f"{label.capitalize()} folder {folder!s} is on a " f"non-writable filesystem (nearest existing " f"parent {p!s} fails W_OK)." ), remediation=( "Pick a writable path or remount the volume " "read-write. Common causes: NFS export flipped " "to RO after a storage event, mounted snapshot, " "or insufficient permissions on a shared drive." ), ) ) return findings def _hdf5_plugin_finding(config: Any) -> Optional[PreflightFinding]: """Validate ``HDF5_PLUGIN_PATH`` when configured. Surfaces the same root cause the post-mortem classifier (:class:`HDF5PluginMissingError`) detects — but before any data is loaded, so an early operator can fix the path without waiting for the sort to fail. """ configured = getattr(config.recording, "hdf5_plugin_path", None) if configured is None: configured = os.environ.get("HDF5_PLUGIN_PATH") if not configured: return None path = Path(configured) if path.is_dir(): return None return PreflightFinding( level="fail", code="hdf5_plugin_missing", category="environment", message=( f"HDF5_PLUGIN_PATH points to {path!s} but the directory " "does not exist." ), remediation=( "Set HDF5_PLUGIN_PATH (via RecordingConfig.hdf5_plugin_path " "or the environment) to a directory containing the HDF5 " "compression plugin needed for the recording." ), )
def run_preflight(
    config: Any,
    recording_files: Sequence[Any],
    intermediate_folders: Sequence[Any],
    results_folders: Sequence[Any],
) -> List[PreflightFinding]:
    """Run pre-loop resource checks; return all findings.

    Findings are not raised by this function — the caller decides
    whether to escalate based on ``ExecutionConfig.preflight_strict``.

    Parameters:
        config (SortingPipelineConfig): Pipeline configuration. Reads
            thresholds from ``config.execution``; the remaining
            sub-configs are consumed by the individual check helpers
            this function dispatches to.
        recording_files (sequence): Recording inputs. Passed to the
            recording-input validation, the sample-rate window check,
            and the RT-Sort intermediate-disk projection.
        intermediate_folders (sequence of path-like): Per-recording
            intermediate folders. Checked for free disk space and for
            filesystem writability.
        results_folders (sequence of path-like): Per-recording results
            folders. Checked for free disk space and for filesystem
            writability.

    Returns:
        findings (list[PreflightFinding]): All findings produced by the
            checks, in check order. May be empty when the host has
            plenty of headroom.

    Raises:
        ValueError: If any of ``config.execution.preflight_min_*_gb`` is
            ``None`` or evaluates to NaN once converted to ``float``.
            The thresholds must be numeric.

    Notes:
        - Empty ``recording_files``, ``intermediate_folders``, or
          ``results_folders`` produce a fail-level "environment" finding
          (codes ``no_recordings``, ``no_intermediate_folders``,
          ``no_results_folders``) but do not short-circuit — the host
          and dependency checks still run.
        - A folder whose free space cannot be determined is skipped
          silently by the disk checks.
    """
    exe = config.execution
    findings: List[PreflightFinding] = []

    threshold_fields = (
        "preflight_min_free_inter_gb",
        "preflight_min_free_results_gb",
        "preflight_min_available_ram_gb",
        "preflight_min_free_vram_gb",
    )
    thresholds: Dict[str, float] = {}
    for field_name in threshold_fields:
        value = getattr(exe, field_name)
        if value is None:
            raise ValueError(
                f"config.execution.{field_name} must be a float, got None. "
                "Set a numeric threshold or rely on the dataclass default."
            )
        # Coerce BEFORE the NaN test. An ``isinstance(value, float)`` guard
        # misses NaN carried by non-``float`` numerics (``Decimal("nan")``,
        # ``numpy.float32("nan")``) or by the string ``"nan"``; those would
        # become NaN at conversion time and silently disable every
        # downstream comparison (``x < NaN`` is always False).
        numeric = float(value)
        if math.isnan(numeric):
            raise ValueError(
                f"config.execution.{field_name} must be a finite float, got NaN. "
                "NaN thresholds silently disable the preflight check; set a "
                "numeric value or rely on the dataclass default."
            )
        thresholds[field_name] = numeric

    min_free_inter = thresholds["preflight_min_free_inter_gb"]
    min_free_results = thresholds["preflight_min_free_results_gb"]
    min_avail_ram = thresholds["preflight_min_available_ram_gb"]
    min_free_vram = thresholds["preflight_min_free_vram_gb"]

    if not recording_files:
        findings.append(
            PreflightFinding(
                level="fail",
                code="no_recordings",
                message="run_preflight called with an empty recording_files sequence.",
                remediation=(
                    "Pass at least one recording (path or pre-loaded "
                    "BaseRecording) to the sort pipeline."
                ),
                category="environment",
            )
        )
    if not intermediate_folders:
        findings.append(
            PreflightFinding(
                level="fail",
                code="no_intermediate_folders",
                message=(
                    "run_preflight called with an empty intermediate_folders sequence."
                ),
                remediation=(
                    "Pass one intermediate folder per recording to the sort "
                    "pipeline."
                ),
                category="environment",
            )
        )
    if not results_folders:
        findings.append(
            PreflightFinding(
                level="fail",
                code="no_results_folders",
                message=(
                    "run_preflight called with an empty results_folders sequence."
                ),
                remediation=(
                    "Pass one results folder per recording to the sort pipeline."
                ),
                category="environment",
            )
        )

    # ---------- Disk -----------------------------------------------------
    for folder in intermediate_folders:
        free_gb = _disk_free_gb(Path(folder))
        if free_gb is None:
            continue
        if free_gb < min_free_inter:
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="low_disk_inter",
                    message=(
                        f"Intermediate folder {folder!s} parent has only "
                        f"{free_gb:.1f} GB free (< {min_free_inter:.1f} GB)."
                    ),
                    remediation=(
                        "Free disk space or point intermediate_folders at "
                        "a larger volume. RT-Sort and Kilosort write large "
                        "temporary files."
                    ),
                )
            )
    for folder in results_folders:
        free_gb = _disk_free_gb(Path(folder))
        if free_gb is None:
            continue
        if free_gb < min_free_results:
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="low_disk_results",
                    message=(
                        f"Results folder {folder!s} parent has only "
                        f"{free_gb:.1f} GB free (< {min_free_results:.1f} GB)."
                    ),
                    remediation=(
                        "Free disk space or point results_folders at a "
                        "larger volume."
                    ),
                )
            )

    # ---------- RAM ------------------------------------------------------
    avail_ram = _available_ram_gb()
    if avail_ram is None:
        findings.append(
            PreflightFinding(
                level="warn",
                code="ram_unknown",
                message=(
                    "psutil not installed — cannot check available host "
                    "RAM. The host-memory watchdog will also be disabled."
                ),
                remediation="Install psutil to enable RAM-based safety checks.",
            )
        )
    elif avail_ram < min_avail_ram:
        findings.append(
            PreflightFinding(
                level="warn",
                code="low_ram",
                message=(
                    f"Only {avail_ram:.1f} GB host RAM available "
                    f"(< {min_avail_ram:.1f} GB). Sort may trigger the "
                    "watchdog or thrash on Windows."
                ),
                remediation=(
                    "Close other applications or shorten the recording "
                    "before sorting."
                ),
            )
        )

    # ---------- GPU VRAM -------------------------------------------------
    # Evaluated once so the VRAM check and the device-existence check
    # below always agree on whether the sorter is GPU-backed.
    uses_gpu = _sorter_uses_gpu(config)
    if uses_gpu:
        free_vram = _free_vram_gb()
        if free_vram is None:
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="vram_unknown",
                    message=(
                        "Sorter requires a GPU but VRAM availability could "
                        "not be detected (no pynvml, no nvidia-smi)."
                    ),
                    remediation=(
                        "Install pynvml or ensure nvidia-smi is on PATH so "
                        "VRAM headroom can be checked before the sort."
                    ),
                )
            )
        elif free_vram < min_free_vram:
            findings.append(
                PreflightFinding(
                    level="warn",
                    code="low_vram",
                    message=(
                        f"Only {free_vram:.1f} GB GPU memory free "
                        f"(< {min_free_vram:.1f} GB). Risk of "
                        "GPUOutOfMemoryError during sort."
                    ),
                    remediation=(
                        "Close other GPU consumers, reduce batch size, or "
                        "switch to a larger-memory GPU."
                    ),
                )
            )

    # ---------- Recording inputs -----------------------------------------
    findings.extend(_validate_recording_inputs(recording_files))

    # ---------- Recording sample-rate window -----------------------------
    findings.extend(_check_recording_sample_rate(config, recording_files))

    # ---------- Sorter dependency probes ---------------------------------
    findings.extend(_check_sorter_dependencies(config))

    # ---------- GPU device existence -------------------------------------
    if uses_gpu:
        gpu_dev = _check_gpu_device_present(config)
        if gpu_dev is not None:
            findings.append(gpu_dev)

    # ---------- POSIX resource limits ------------------------------------
    findings.extend(_check_resource_rlimits(config))

    # ---------- SpikeInterface version range -----------------------------
    si_finding = _check_spikeinterface_version()
    if si_finding is not None:
        findings.append(si_finding)

    # ---------- Filesystem writability -----------------------------------
    findings.extend(
        _check_filesystem_writable(
            intermediate_folders, label="intermediate", code_prefix="intermediate"
        )
    )
    findings.extend(
        _check_filesystem_writable(
            results_folders, label="results", code_prefix="results"
        )
    )

    # ---------- HDF5 plugin path -----------------------------------------
    hdf5 = _hdf5_plugin_finding(config)
    if hdf5 is not None:
        findings.append(hdf5)

    # ---------- .wslconfig (Docker-on-Windows) ---------------------------
    wsl = _wslconfig_finding(config)
    if wsl is not None:
        findings.append(wsl)

    # ---------- RT-Sort intermediate-disk projection ---------------------
    rt_disk = _rt_sort_disk_finding(config, recording_files, intermediate_folders)
    if rt_disk is not None:
        findings.append(rt_disk)

    return findings
def report_findings(
    findings: Sequence[PreflightFinding], *, strict: bool = False
) -> None:
    """Log findings and raise if any escalate to a hard failure.

    Every finding is logged (warnings via ``_logger.warning``, failures
    via ``_logger.error``) before anything is raised, so the operator
    sees the complete list even when the run aborts.

    Parameters:
        findings (sequence[PreflightFinding]): Output of
            :func:`run_preflight`.
        strict (bool): When True, every ``"warn"`` finding is treated as
            ``"fail"``. Defaults to False.

    Raises:
        HDF5PluginMissingError: If the first fatal finding has code
            ``"hdf5_plugin_missing"``. ``configured_path`` is taken from
            the ``HDF5_PLUGIN_PATH`` environment variable.
        EnvironmentSortFailure: If the first fatal finding has category
            ``"environment"``.
        ResourceSortFailure: If the first fatal finding has any other
            category.
        ValueError: If any finding has ``level`` other than ``"warn"``
            or ``"fail"``.

    Notes:
        - A finding is fatal when ``level == "fail"``, or when
          ``level == "warn"`` under *strict*.
        - Only the FIRST fatal finding selects the exception type; the
          summary message reports the total number of fatal findings.
    """
    if not findings:
        _logger.info("all checks passed")
        return

    _logger.info("findings:")
    fatal: List[PreflightFinding] = []
    for f in findings:
        if f.level not in {"warn", "fail"}:
            raise ValueError(
                f"Unknown PreflightFinding level: {f.level!r} "
                f"(code={f.code!r}). Must be 'warn' or 'fail'."
            )
        effective_level = "fail" if (strict and f.level == "warn") else f.level
        log_method = _logger.error if effective_level == "fail" else _logger.warning
        log_method(" [%s] %s: %s", effective_level.upper(), f.code, f.message)
        if f.remediation:
            log_method(" -> %s", f.remediation)
        if effective_level == "fail":
            fatal.append(f)

    if not fatal:
        return

    # Prefer a categorical match for the first fatal finding so callers
    # can branch on EnvironmentSortFailure vs ResourceSortFailure.
    first = fatal[0]
    # Separate code and message with ": " (same shape as the log lines
    # above); without a separator the two run together.
    summary = (
        f"Preflight failed: {len(fatal)} fatal finding(s). "
        f"First: {first.code}: {first.message}"
    )
    if first.code == "hdf5_plugin_missing":
        configured = os.environ.get("HDF5_PLUGIN_PATH")
        raise HDF5PluginMissingError(summary, configured_path=configured)
    if first.category == "environment":
        raise EnvironmentSortFailure(summary)
    raise ResourceSortFailure(summary)