Batch Jobs

The spikelab.batch_jobs sub-package provides Kubernetes batch-job launching helpers for submitting SpikeLab analysis workloads to a cluster. It requires the batch-jobs optional dependency group:

pip install "spikelab[batch-jobs]"

See the Batch Jobs guide for usage examples.

Models

Pydantic models defining job specifications, cluster profiles, and related configuration.

Typed models used by the batch job launcher.

class spikelab.batch_jobs.models.ContainerSpec(*args, **kwargs)[source]

Bases: BaseModel

Container runtime details for a single-job pod.

image_pull_policy: Literal['Always', 'IfNotPresent', 'Never'] = 'IfNotPresent'

class spikelab.batch_jobs.models.ResourceSpec(*args, **kwargs)[source]

Bases: BaseModel

Resource requests/limits for a job container.

requests_cpu: str = '1'
requests_memory: str = '2Gi'
limits_cpu: str = '1'
limits_memory: str = '2Gi'
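
The four fields correspond to the requests and limits entries of a Kubernetes container's resources block. The following is a minimal sketch of that mapping, not the launcher's own rendering code: ResourceSpecSketch is a hypothetical stand-in that mirrors the documented fields and defaults, and to_k8s_resources is an illustrative helper that does not exist in spikelab.

```python
from dataclasses import dataclass


@dataclass
class ResourceSpecSketch:
    """Hypothetical stand-in mirroring the documented ResourceSpec fields."""

    requests_cpu: str = "1"
    requests_memory: str = "2Gi"
    limits_cpu: str = "1"
    limits_memory: str = "2Gi"


def to_k8s_resources(spec):
    """Arrange the flat fields into the nested Kubernetes resources layout."""
    return {
        "requests": {"cpu": spec.requests_cpu, "memory": spec.requests_memory},
        "limits": {"cpu": spec.limits_cpu, "memory": spec.limits_memory},
    }


# Raise only the memory limit; every other field keeps its default.
resources = to_k8s_resources(ResourceSpecSketch(limits_memory="4Gi"))
print(resources)
```

Quantities are kept as strings so that Kubernetes suffixes such as Gi pass through unchanged.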
class spikelab.batch_jobs.models.VolumeMountSpec(*args, **kwargs)[source]

Bases: BaseModel

Pod volume and mount target information.

sub_path: Optional[str] = None
secret_name: Optional[str] = None
pvc_name: Optional[str] = None
read_only: bool = True

class spikelab.batch_jobs.models.NamespaceHookSpec(*args, **kwargs)[source]

Bases: BaseModel

Per-namespace overrides applied when a job targets a specific namespace.

image_pull_policy: Optional[Literal['Always', 'IfNotPresent', 'Never']] = None
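
One plausible reading of the None default is "no override": the container's own value is kept unless the namespace hook sets one. The sketch below illustrates that reading only. effective_pull_policy is a hypothetical helper, and the way the library actually merges hooks is not described in this reference.

```python
from typing import Optional


def effective_pull_policy(container_policy: str, hook_policy: Optional[str]) -> str:
    """Prefer the namespace hook's value when set; otherwise keep the container's."""
    return hook_policy if hook_policy is not None else container_policy


# No hook value: the container's policy is used as-is.
unchanged = effective_pull_policy("IfNotPresent", None)
# Hook value present: it replaces the container's policy.
overridden = effective_pull_policy("IfNotPresent", "Always")
print(unchanged, overridden)
```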
class spikelab.batch_jobs.models.StoragePathTemplates(*args, **kwargs)[source]

Bases: BaseModel

Python format-string templates for S3 artifact paths.

Available placeholders: {prefix}, {run_id}, {filename}.

inputs: str = '{prefix}inputs/{run_id}/{filename}'
outputs: str = '{prefix}outputs/{run_id}/'
logs: str = '{prefix}logs/{run_id}/'
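
Because the templates are ordinary Python format strings, they can be expanded with str.format. The sketch below uses the documented defaults. The prefix, run ID, and filename values are made up for illustration, and how the library supplies them internally is not shown in this reference.

```python
# Default templates copied from the StoragePathTemplates fields above.
INPUTS = "{prefix}inputs/{run_id}/{filename}"
OUTPUTS = "{prefix}outputs/{run_id}/"

# Hypothetical values, for illustration only.
values = {
    "prefix": "s3://example-bucket/lab/",
    "run_id": "run-0001",
    "filename": "bundle.zip",
}

# str.format ignores unused keyword arguments, so one mapping can be
# applied to templates that use only some of the placeholders.
input_uri = INPUTS.format(**values)
output_prefix = OUTPUTS.format(**values)

print(input_uri)
print(output_prefix)
```

The defaults place {prefix} directly before the category folder, so in this sketch the prefix carries its own trailing slash. Whether the library normalizes the prefix is an open assumption.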
class spikelab.batch_jobs.models.PolicyConfig(*args, **kwargs)[source]

Bases: BaseModel

Configurable thresholds for the cluster policy engine.

block_sleep_infinity: bool = True
warn_request_limit_mismatch: bool = True

class spikelab.batch_jobs.models.JobSpec(*args, **kwargs)[source]

Bases: BaseModel

High-level description of a Kubernetes batch job.

name_prefix: str = 'analysis-job'
namespace: str = 'default'
container: ContainerSpec
resources: ResourceSpec

class spikelab.batch_jobs.models.ClusterProfile(*args, **kwargs)[source]

Bases: BaseModel

Cluster defaults that can be merged with a JobSpec.

All organisation-specific configuration (images, secrets, S3 buckets, namespace hooks) belongs in profile YAML files, not in Python source.

name: str
namespace: str = 'default'
default_s3_prefix: Optional[str] = None
endpoint_url: Optional[str] = None
region_name: Optional[str] = None

class spikelab.batch_jobs.models.SubmitResult(*args, **kwargs)[source]

Bases: BaseModel

Result returned by job submission methods.

job_name: str
manifest_yaml: str
run_id: str
uploaded_input_uri: str
output_prefix: str
logs_prefix: str
job_type: Literal['workspace', 'sorting', 'prepared']

class spikelab.batch_jobs.models.RunConfig(*args, **kwargs)[source]

Bases: BaseModel

User-facing run config consumed by CLI/session.

profile_name: str = 'defaults'
input_path: str
output_prefix: Optional[str] = None
workspace_id: Optional[str] = None
namespace: Optional[str] = None
allow_policy_risk: bool = False
wait_for_completion: bool = False
follow_logs: bool = False

Policy Engine

Cluster policy preflight checks for job specs, using profile-driven thresholds.

Thresholds are read from the active ClusterProfile so that different clusters can enforce different rules.

class spikelab.batch_jobs.policy.PolicyFinding(code, level, message)[source]

Bases: object

code: str
level: Literal['PASS', 'WARN', 'BLOCK']
message: str
__init__(code, level, message)

spikelab.batch_jobs.policy.evaluate_policy(job_spec, profile)[source]

Evaluate policy checks using profile-driven thresholds.

Return type:

List[PolicyFinding]

spikelab.batch_jobs.policy.summarize_preflight(findings)[source]

Return aggregate level and text summary.

Return type:

tuple[Literal['PASS', 'WARN', 'BLOCK'], str]
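
A natural way to aggregate findings is to report the most severe level present. The sketch below assumes that rule, with severity ordered PASS < WARN < BLOCK; the reference does not spell out the actual behaviour. FindingSketch, summarize_sketch, the finding codes, and the summary format are all hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class FindingSketch:
    """Hypothetical stand-in with the same three fields as PolicyFinding."""

    code: str
    level: str  # 'PASS', 'WARN', or 'BLOCK'
    message: str


# Assumed severity ordering: higher numbers are more severe.
_SEVERITY = {"PASS": 0, "WARN": 1, "BLOCK": 2}


def summarize_sketch(findings: List[FindingSketch]) -> Tuple[str, str]:
    """Return the most severe level and a summary with one line per finding."""
    if not findings:
        return "PASS", "No findings."
    worst = max(findings, key=lambda f: _SEVERITY[f.level]).level
    lines = [f"[{f.level}] {f.code}: {f.message}" for f in findings]
    return worst, "\n".join(lines)


level, text = summarize_sketch([
    FindingSketch("resources.mismatch", "WARN", "requests differ from limits"),
    FindingSketch("command.sleep", "BLOCK", "sleep infinity is not allowed"),
])
print(level)
print(text)
```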

Profile Loading

Load cluster profile presets for job execution.

spikelab.batch_jobs.profiles.load_cluster_profile(path)[source]

Load a profile from an explicit YAML path.

Return type:

ClusterProfile

spikelab.batch_jobs.profiles.load_profile_from_name(name)[source]

Load one of the built-in profile files by name.

Return type:

ClusterProfile

Run Session

High-level orchestration for packaging, uploading, and job submission.

class spikelab.batch_jobs.session.RunSession(*, profile, backend, storage_client, credentials=None)[source]

Bases: object

Coordinates artifact packaging, job submission, and result retrieval.

__init__(*, profile, backend, storage_client, credentials=None)[source]

render_manifest(*, job_name, job_spec, run_id)[source]

Render a Kubernetes Job manifest from a spec and profile.

Return type:

str

submit_workspace_job(*, workspace, script, job_spec, allow_policy_risk=False, bundle_input_paths=None, metadata=None)[source]

Save a workspace, bundle it with a script, and submit a job.

Parameters:
  • workspace (Any) – An AnalysisWorkspace instance or a str path to an existing workspace base path (without extension).

  • script (str) – Path to the analysis script to run inside the container.

  • job_spec (JobSpec) – Kubernetes job specification.

  • allow_policy_risk (bool) – If True, submit the job even when the policy check reports BLOCK findings.

  • bundle_input_paths (iterable[str] | None) – Extra files to include in the bundle.

  • metadata (dict | None) – Arbitrary metadata written into the bundle manifest.

Returns:

Submission details, including the output prefix where the updated workspace will appear.

Return type:

SubmitResult

submit_sorting_job(*, recording_paths, config=None, config_overrides=None, job_spec, allow_policy_risk=False, metadata=None)[source]

Bundle recording files with a sorting config and submit a job.

Parameters:
  • recording_paths (list[str]) – Paths to recording files.

  • config (Optional[Any]) – A SortingPipelineConfig instance, a preset name string (e.g. "kilosort4"), or None for defaults.

  • config_overrides (dict | None) – Flat keyword overrides applied to the config via config.override().

  • job_spec (JobSpec) – Kubernetes job specification.

  • allow_policy_risk (bool) – If True, submit the job even when the policy check reports BLOCK findings.

  • metadata (dict | None) – Arbitrary metadata written into the bundle manifest.

Returns:

Submission details, including the output prefix where sorted results will appear.

Return type:

SubmitResult

submit_prepared_job(*, job_spec, run_id=None, allow_policy_risk=False)[source]

Submit a job without generating bundle artifacts.

Return type:

SubmitResult

retrieve_result(submit_result, local_dir)[source]

Download job outputs and return an AnalysisWorkspace.

Parameters:
  • submit_result (SubmitResult) – The result from a prior submit_workspace_job or submit_sorting_job call.

  • local_dir (str) – Local directory to download outputs into.

Returns:

The workspace produced by the job. For workspace jobs this is the updated workspace; for sorting jobs it contains per-recording namespaces with SpikeData at key "spikedata".

Return type:

AnalysisWorkspace

Notes

  • Call wait_for_completion before calling this method to ensure the job has finished.

wait_for_completion(*, job_name, max_wait_seconds=3600, poll_interval_seconds=10)[source]

Poll the job until it completes, fails, or max_wait_seconds elapses, then return the final state.

Return type:

str
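
The polling behaviour can be sketched as a loop with a deadline. This is an illustration under stated assumptions, not the method's implementation: wait_sketch is a hypothetical function that takes a status callable instead of a job name, the terminal state names are taken from the job_status description in this reference, and returning the last observed state on timeout is a guess.

```python
import time

# State names as listed for KubernetesBatchJobBackend.job_status.
TERMINAL_STATES = {"Complete", "Failed"}


def wait_sketch(get_status, *, max_wait_seconds=3600, poll_interval_seconds=10,
                sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until a terminal state or the deadline is reached."""
    deadline = clock() + max_wait_seconds
    state = get_status()
    while state not in TERMINAL_STATES and clock() < deadline:
        sleep(poll_interval_seconds)
        state = get_status()
    # On timeout this returns the last observed, non-terminal state.
    return state


# Usage with a fake status source that finishes on the third poll.
states = iter(["Pending", "Running", "Complete"])
final_state = wait_sketch(lambda: next(states), max_wait_seconds=5,
                          poll_interval_seconds=0)
print(final_state)
```

Using a monotonic clock for the deadline keeps the timeout unaffected by wall-clock adjustments.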

Kubernetes Backend

Kubernetes backend for batch job submission and monitoring.

class spikelab.batch_jobs.backend_k8s.KubernetesBatchJobBackend(namespace='default', kubeconfig=None, use_kubectl_fallback=True)[source]

Bases: object

Backend wrapper around Kubernetes client with kubectl fallback.

__init__(namespace='default', kubeconfig=None, use_kubectl_fallback=True)[source]

apply_manifest(manifest_path_or_str)[source]

Apply a job manifest by YAML file path or raw YAML string.

Return type:

str

delete_job(name)[source]

Delete a job and its pods.

Return type:

None

job_status(name)[source]

Return one of Pending/Running/Complete/Failed/Unknown.

Return type:

str

pods_for_job(job_name)[source]

Return pod names associated with a job.

Return type:

List[str]

stream_logs(pod_name, follow=True)[source]

Yield log lines from a pod.

Return type:

Iterator[str]

S3 Storage

S3-compatible storage helpers for batch job artifacts.

class spikelab.batch_jobs.storage_s3.S3StorageClient(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]

Bases: object

Thin wrapper around boto3 that handles uploads, downloads, and S3 URI construction.

Path layout is controlled by path_templates (a StoragePathTemplates instance loaded from the active profile).

__init__(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]

build_uri(*, run_id, filename, category='inputs')[source]

Build an S3 URI for a file using the active path templates.

Return type:

str

upload_file(*, local_path, s3_uri)[source]

Upload a local file to S3 and return the URI.

Return type:

str

upload_bundle(*, local_zip, run_id)[source]

Upload a zip bundle to S3 under the inputs path template.

Return type:

str

output_prefix_for_run(run_id)[source]

Return the S3 prefix for a run’s output files.

Return type:

str

logs_prefix_for_run(run_id)[source]

Return the S3 prefix for a run’s log files.

Return type:

str

download_file(*, s3_uri, local_path)[source]

Download a single file from S3.

Parameters:
  • s3_uri (str) – Full s3://bucket/key URI.

  • local_path (str) – Destination path on disk.

Returns:

The same local_path, returned for convenience.

Return type:

str
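
A full s3://bucket/key URI has to be split into bucket and key before it can be handed to boto3, whose transfer calls take the two separately. The sketch below shows one way to do the split with the standard library. split_s3_uri is a hypothetical helper, not part of S3StorageClient, and the bucket and key in the example are made up.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key), rejecting other schemes."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3://bucket/key URI: {s3_uri!r}")
    # The path keeps its leading slash; S3 object keys do not start with one.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://example-bucket/outputs/run-0001/result.bin")
print(bucket)
print(key)
```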

download_output(*, run_id, filename, local_dir)[source]

Download a file from the output prefix of a run.

Parameters:
  • run_id (str) – Run identifier.

  • filename (str) – Name of the file within the output prefix.

  • local_dir (str) – Local directory to save the file into.

Returns:

Absolute path of the downloaded file.

Return type:

str

list_output_files(run_id)[source]

List object keys under the output prefix of a run.

Parameters:

run_id (str) – Run identifier.

Returns:

S3 object keys found under the output prefix.

Return type:

list[str]

Artifact Packager

Create uploadable analysis bundles for batch job execution.

spikelab.batch_jobs.artifact_packager.package_analysis_bundle(*, input_paths, run_id, output_dir, output_format, metadata=None)[source]

Create a run zip bundle and return its absolute path.

Return type:

str
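
The general shape of such a bundle can be sketched with the standard zipfile module. This is an assumption-laden illustration rather than the packager's actual layout: package_sketch is hypothetical, the manifest.json name and contents are invented, files are stored flat by basename, and the output_format parameter is left out because its meaning is not documented here.

```python
import json
import os
import tempfile
import zipfile


def package_sketch(*, input_paths, run_id, output_dir, metadata=None):
    """Zip the input files plus a small JSON manifest; return the absolute path."""
    os.makedirs(output_dir, exist_ok=True)
    bundle_path = os.path.abspath(os.path.join(output_dir, f"{run_id}.zip"))
    with zipfile.ZipFile(bundle_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in input_paths:
            # Store each file under its basename (flat layout, an assumption).
            zf.write(path, arcname=os.path.basename(path))
        manifest = {"run_id": run_id, "metadata": metadata or {}}
        zf.writestr("manifest.json", json.dumps(manifest, indent=2))
    return bundle_path


with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "analysis.py")
    with open(script, "w") as fh:
        fh.write("print('hello')\n")
    bundle = package_sketch(input_paths=[script], run_id="run-0001",
                            output_dir=os.path.join(tmp, "out"))
    with zipfile.ZipFile(bundle) as zf:
        bundle_names = sorted(zf.namelist())

print(bundle_names)
```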

Credentials

Credential resolution and redaction utilities.

class spikelab.batch_jobs.credentials.ResolvedCredentials(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)[source]

Bases: object

kubeconfig: Optional[str]
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
aws_session_token: Optional[str]
__init__(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)

spikelab.batch_jobs.credentials.resolve_credentials(*, kubeconfig=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]

Resolve credentials with explicit args first, then environment.

Return type:

ResolvedCredentials
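
The "explicit arguments first, then environment" order can be sketched as follows. The environment variable names are the conventional ones used by kubectl and the AWS SDKs; whether the library reads exactly these names is an assumption. CredentialsSketch, resolve_sketch, and the env parameter are hypothetical and exist only to make the example testable.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class CredentialsSketch:
    """Hypothetical stand-in with the same fields as ResolvedCredentials."""

    kubeconfig: Optional[str]
    aws_access_key_id: Optional[str]
    aws_secret_access_key: Optional[str]
    aws_session_token: Optional[str]


def resolve_sketch(*, kubeconfig=None, aws_access_key_id=None,
                   aws_secret_access_key=None, aws_session_token=None,
                   env=None):
    """Use each explicit argument if given, else fall back to the environment."""
    env = os.environ if env is None else env

    def pick(explicit, env_name):
        return explicit if explicit is not None else env.get(env_name)

    return CredentialsSketch(
        kubeconfig=pick(kubeconfig, "KUBECONFIG"),
        aws_access_key_id=pick(aws_access_key_id, "AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=pick(aws_secret_access_key, "AWS_SECRET_ACCESS_KEY"),
        aws_session_token=pick(aws_session_token, "AWS_SESSION_TOKEN"),
    )


# A fake environment keeps the example independent of the real one.
fake_env = {"AWS_ACCESS_KEY_ID": "env-key", "KUBECONFIG": "/home/user/.kube/config"}
creds = resolve_sketch(aws_access_key_id="explicit-key", env=fake_env)
print(creds.aws_access_key_id, creds.kubeconfig)
```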

spikelab.batch_jobs.credentials.redact_sensitive_map(values)[source]

Redact common secret values before logging.

Return type:

Dict[str, str]
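
Redaction before logging usually means masking values whose keys look sensitive. The sketch below shows that idea only. The marker list, the "***" mask, and the redact_sketch name are all assumptions, and the rules the library actually applies are not documented in this reference.

```python
# Substrings that mark a key as sensitive (an assumed, illustrative list).
SENSITIVE_MARKERS = ("secret", "token", "password", "key")


def redact_sketch(values):
    """Return a copy with values masked for keys that look sensitive."""
    redacted = {}
    for name, value in values.items():
        if any(marker in name.lower() for marker in SENSITIVE_MARKERS):
            redacted[name] = "***"
        else:
            redacted[name] = str(value)
    return redacted


safe = redact_sketch({
    "aws_secret_access_key": "abc123",
    "region_name": "us-west-2",
})
print(safe)
```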

Validation

Validation helpers for CLI/API job inputs.

spikelab.batch_jobs.validation.validate_job_spec(payload)[source]

Parse and validate a raw job spec payload.

Return type:

JobSpec

spikelab.batch_jobs.validation.validate_run_config(payload)[source]

Parse and validate a run config payload.

Return type:

RunConfig

spikelab.batch_jobs.validation.summarize_validation_error(exc)[source]

Return a concise human-readable validation summary.

Return type:

str