Source code for spikelab.batch_jobs.models

"""Typed models used by the batch job launcher."""

from __future__ import annotations

from typing import Dict, List, Literal, Optional

from pydantic import BaseModel, Field, field_validator, model_validator


class ContainerSpec(BaseModel):
    """Container runtime details for a single-job pod."""

    # Fully-qualified image reference; an empty string is rejected.
    image: str = Field(min_length=1)
    # Kubernetes image pull policy; "IfNotPresent" avoids redundant pulls.
    image_pull_policy: Literal["Always", "IfNotPresent", "Never"] = "IfNotPresent"
    # Entrypoint override and its arguments; empty means keep image defaults.
    command: List[str] = Field(default_factory=list)
    args: List[str] = Field(default_factory=list)
    # Plain string environment variables injected into the container.
    env: Dict[str, str] = Field(default_factory=dict)
class ResourceSpec(BaseModel):
    """Resource requests/limits for a job container."""

    # CPU/memory use Kubernetes quantity strings (e.g. "1", "2Gi").
    requests_cpu: str = "1"
    requests_memory: str = "2Gi"
    limits_cpu: str = "1"
    limits_memory: str = "2Gi"
    # GPU counts are whole devices and may not be negative.
    requests_gpu: int = Field(default=0, ge=0)
    limits_gpu: int = Field(default=0, ge=0)
    # Optional label constraints on which nodes may run the job.
    node_selector: Dict[str, str] = Field(default_factory=dict)

    @model_validator(mode="after")
    def _validate_gpu_pairing(self) -> "ResourceSpec":
        """Reject specs whose GPU request and limit disagree."""
        if self.requests_gpu == self.limits_gpu:
            return self
        raise ValueError("GPU requests and limits must match")
class VolumeMountSpec(BaseModel):
    """Pod volume + mount target information.

    A volume is backed by exactly one source: a Kubernetes Secret
    (``secret_name``) or a PersistentVolumeClaim (``pvc_name``).
    """

    # Volume name referenced by both the pod volume and the container mount.
    name: str = Field(min_length=1)
    # Absolute path inside the container where the volume is mounted.
    mount_path: str = Field(min_length=1)
    # Optional sub-path within the volume to mount instead of its root.
    sub_path: Optional[str] = None
    secret_name: Optional[str] = None
    pvc_name: Optional[str] = None
    read_only: bool = True

    @model_validator(mode="after")
    def _validate_source(self) -> "VolumeMountSpec":
        """Ensure exactly one backing source is configured.

        Raises:
            ValueError: if neither or both of ``secret_name``/``pvc_name``
                are provided.
        """
        if not self.secret_name and not self.pvc_name:
            raise ValueError("Volume must reference either secret_name or pvc_name")
        # Fix: previously a spec naming BOTH a secret and a PVC passed
        # validation, yielding an ambiguous pod volume definition downstream.
        if self.secret_name and self.pvc_name:
            raise ValueError("Volume must not reference both secret_name and pvc_name")
        return self
class NamespaceHookSpec(BaseModel):
    """Per-namespace overrides applied when a job targets a specific namespace."""

    # When set, replaces the container's pull policy for this namespace.
    image_pull_policy: Optional[Literal["Always", "IfNotPresent", "Never"]] = None
    # Command used when the job spec does not provide one.
    default_command: List[str] = Field(default_factory=list)
    # Volumes that must always be attached in this namespace.
    required_volumes: List[VolumeMountSpec] = Field(default_factory=list)
    # Environment variables merged in as defaults.
    env_defaults: Dict[str, str] = Field(default_factory=dict)
class StoragePathTemplates(BaseModel):
    """Python format-string templates for S3 artifact paths.

    Available placeholders: ``{prefix}``, ``{run_id}``, ``{filename}``.
    """

    # NOTE(review): the published source contained an extraction artifact
    # "(unknown)" where the per-file placeholder belongs; reconstructed here
    # as ``{filename}`` — confirm against the code that .format()s this.
    inputs: str = "{prefix}inputs/{run_id}/{filename}"
    outputs: str = "{prefix}outputs/{run_id}/"
    logs: str = "{prefix}logs/{run_id}/"
class PolicyConfig(BaseModel):
    """Configurable thresholds for the cluster policy engine."""

    # Cap on GPUs an interactive session may claim.
    max_interactive_gpus: int = Field(default=2, ge=0)
    # Hard ceiling on job runtime; default is 14 days expressed in seconds.
    max_runtime_seconds: int = Field(default=1_209_600, ge=1)
    # Refuse containers whose command is an idle "sleep infinity".
    block_sleep_infinity: bool = True
    # Emit a warning when resource requests and limits disagree.
    warn_request_limit_mismatch: bool = True
class JobSpec(BaseModel):
    """High-level description of a Kubernetes batch job."""

    # Prefix for the generated job name; sanitized by the validator below.
    name_prefix: str = "analysis-job"
    namespace: str = "default"
    labels: Dict[str, str] = Field(default_factory=dict)
    container: ContainerSpec
    resources: ResourceSpec
    volumes: List[VolumeMountSpec] = Field(default_factory=list)
    # How long a finished Job object is retained before garbage collection.
    ttl_seconds_after_finished: int = Field(default=3600, ge=0)
    # Number of retries before the job is marked failed; 0 = no retries.
    backoff_limit: int = Field(default=0, ge=0)
    # Optional wall-clock deadline after which the job is terminated.
    active_deadline_seconds: Optional[int] = Field(default=None, ge=1)

    @field_validator("name_prefix")
    @classmethod
    def _validate_name_prefix(cls, value: str) -> str:
        """Normalize the prefix into a safe, lowercase, dash-separated token.

        Raises:
            ValueError: if the prefix is empty or whitespace-only.
        """
        value = value.strip().lower()
        if not value:
            raise ValueError("name_prefix cannot be empty")
        # Replace anything that is not alphanumeric or "-" with "-".
        safe = "".join(ch if ch.isalnum() or ch == "-" else "-" for ch in value)
        # Fix: the original stripped dashes BEFORE truncating, so the [:40]
        # slice could still leave a trailing "-" (invalid in a DNS-1123 name
        # fragment); strip again after the slice.
        safe = safe.strip("-")[:40].strip("-")
        return safe or "analysis-job"
class ClusterProfile(BaseModel):
    """Cluster defaults that can be merged with a JobSpec.

    All organisation-specific configuration (images, secrets, S3 buckets,
    namespace hooks) belongs in profile YAML files, not in Python source.
    """

    # Identity and default placement.
    name: str
    namespace: str = "default"
    labels: Dict[str, str] = Field(default_factory=dict)

    # Storage defaults.
    default_s3_prefix: Optional[str] = None

    # Raw Kubernetes scheduling snippets passed through to the manifest.
    affinity: Dict[str, object] = Field(default_factory=dict)
    tolerations: List[Dict[str, object]] = Field(default_factory=list)

    # Org-level defaults merged into individual job specs.
    default_secrets_mapping: Dict[str, str] = Field(default_factory=dict)
    default_images: Dict[str, str] = Field(default_factory=dict)
    default_volumes: List[VolumeMountSpec] = Field(default_factory=list)
    namespace_hooks: Dict[str, NamespaceHookSpec] = Field(default_factory=dict)

    # Nested configuration sections, each with its own defaults.
    storage: StoragePathTemplates = Field(default_factory=StoragePathTemplates)
    policy: PolicyConfig = Field(default_factory=PolicyConfig)
    resources: ResourceSpec = Field(default_factory=ResourceSpec)

    # Optional S3 endpoint overrides (e.g. for non-AWS object stores).
    endpoint_url: Optional[str] = None
    region_name: Optional[str] = None
class SubmitResult(BaseModel):
    """Result returned by job submission methods."""

    # Final (sanitized, suffixed) Kubernetes job name.
    job_name: str
    # Rendered manifest, kept for auditing/debugging.
    manifest_yaml: str
    run_id: str
    # S3 locations associated with this run.
    uploaded_input_uri: str
    output_prefix: str
    logs_prefix: str
    # Which submission path produced this job.
    job_type: Literal["workspace", "sorting", "prepared"]
class RunConfig(BaseModel):
    """User-facing run config consumed by CLI/session."""

    # Which cluster profile to load.
    profile_name: str = "defaults"
    # Local path of the input artifact to upload.
    input_path: str
    # Optional overrides; None falls back to profile defaults.
    output_prefix: Optional[str] = None
    workspace_id: Optional[str] = None
    namespace: Optional[str] = None
    # Acknowledge policy-engine warnings instead of aborting.
    allow_policy_risk: bool = False
    # Waiting behaviour after submission.
    max_wait_seconds: int = Field(default=3600, ge=1)
    wait_for_completion: bool = False
    follow_logs: bool = False