feat: add KubeflowExecutor for Kubeflow Training Operator (PyTorchJob + TrainJob) #462

Open
ko3n1g wants to merge 4 commits into main from feat/pytorchjob-executor

Conversation


ko3n1g commented Mar 12, 2026

Summary

  • Adds KubeflowExecutor that submits distributed training jobs to any Kubernetes cluster running the Kubeflow Training Operator
  • Supports both PyTorchJob (Training Operator v1) and TrainJob (Training Operator v2) via a job_kind toggle
  • Pairs with a TorchX scheduler so jobs integrate with run.run() and run.Experiment
  • Kubernetes config loaded automatically (local kubeconfig → in-cluster fallback)
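The kubeconfig fallback in the last bullet is the standard kubernetes-client loading order; a minimal sketch (the executor's actual helper name is not shown in this PR, so `load_k8s_config` is illustrative):

```python
def load_k8s_config():
    """Try a local kubeconfig first, then fall back to in-cluster credentials."""
    from kubernetes import config  # optional dependency (nemo-run[kubeflow])
    try:
        # Reads ~/.kube/config (or $KUBECONFIG) on a workstation.
        config.load_kube_config()
    except config.ConfigException:
        # Inside a pod: use the mounted service-account token instead.
        config.load_incluster_config()
```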

PyTorchJob vs TrainJob

|            | PyTorchJob                    | TrainJob                        |
|------------|-------------------------------|---------------------------------|
| API        | `kubeflow.org/v1`             | `trainer.kubeflog.org/v1alpha1` |
| Pod config | directly in replica pod spec  | `podTemplateOverrides[].spec`   |
| nproc      | `spec.nprocPerNode`           | `spec.trainer.numProcPerNode`   |
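To make the table concrete, here is an illustrative sketch of where each field lands in the two custom resources. This is not the executor's real manifest builder; key names beyond those in the table (e.g. `pytorchReplicaSpecs`) follow the upstream CRDs but should be treated as assumptions here.

```python
def minimal_manifest(job_kind: str, nproc: int, pod_spec: dict) -> dict:
    """Illustrative only: show where the table's fields land in each CR."""
    if job_kind == "PyTorchJob":
        return {
            "apiVersion": "kubeflow.org/v1",
            "kind": "PyTorchJob",
            "spec": {
                "nprocPerNode": str(nproc),
                "pytorchReplicaSpecs": {
                    # Pod config goes directly into each replica's pod template.
                    "Worker": {"template": {"spec": pod_spec}},
                },
            },
        }
    return {
        "apiVersion": "trainer.kubeflow.org/v1alpha1",
        "kind": "TrainJob",
        "spec": {
            "trainer": {"numProcPerNode": str(nproc)},
            # Pod config is merged into a single podTemplateOverrides entry
            # (targeting the "node" job, per the commit message).
            "podTemplateOverrides": [{"spec": pod_spec}],
        },
    }
```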

Notable fields

  • tolerations, affinity — go into pod spec / podTemplateOverrides automatically
  • env_list — full env var dicts supporting valueFrom / secretKeyRef
  • pod_spec_overrides — arbitrary extra pod spec fields (e.g. resourceClaims for IMEX channels)
  • launch(wait=True) — polls until RUNNING / SUCCEEDED / FAILED
  • cancel(wait=True) — polls until CR gone and all pods terminated
  • UNKNOWN/None status → AppState.PENDING (avoids false failures on transient API errors)
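The `env_list` and `pod_spec_overrides` semantics above amount to a pass-through append plus a shallow merge; a sketch mirroring the bullets (not the real implementation, and the helper name is made up):

```python
def build_pod_spec(base: dict, env_list: list, overrides: dict) -> dict:
    """Illustrative merge of env_list and pod_spec_overrides into a pod spec."""
    spec = dict(base)
    container = dict(spec.get("containers", [{}])[0])
    # env_list entries are passed through verbatim, so valueFrom /
    # secretKeyRef constructs survive untouched.
    container["env"] = container.get("env", []) + list(env_list)
    spec["containers"] = [container]
    # pod_spec_overrides is merged on top of the generated spec,
    # e.g. resourceClaims for IMEX channels.
    spec.update(overrides)
    return spec
```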

Test plan

  • 63 unit tests passing (pytest test/core/execution/test_kubeflow.py test/run/torchx_backend/schedulers/test_kubeflow.py)
  • PyTorchJob e2e verified against AWS EKS (local/example.py): launch → RUNNING → log sentinel → cancel(wait=True)
  • TrainJob e2e pending GKE cluster readiness (local/example_trainjob.py)

🤖 Generated with Claude Code

ko3n1g and others added 3 commits March 12, 2026 16:25
Introduces KubeflowExecutor and a matching TorchX scheduler so users can
deploy distributed training jobs to any Kubernetes cluster running the
Kubeflow Training Operator via run.run() / run.Experiment.

Supported job kinds (toggled via job_kind field):
- PyTorchJob (Training Operator v1, kubeflow.org/v1)
- TrainJob   (Training Operator v2, trainer.kubeflow.org/v1alpha1)

Key features:
- Kubernetes config loaded automatically (local kubeconfig → in-cluster fallback)
- PyTorchJob: builds Master + Worker replica specs with nprocPerNode
- TrainJob: builds spec.trainer + merges all pod-level config (volumes,
  tolerations, affinity, imagePullSecrets, resourceClaims, etc.) into a
  single podTemplateOverrides entry targeting "node"
- env_list field supports full env var dicts (valueFrom / secretKeyRef)
- pod_spec_overrides merges arbitrary extra fields into the pod spec
- launch(wait=True) polls until RUNNING / SUCCEEDED / FAILED
- cancel(wait=True) polls until CR is gone and all pods are terminated
- TorchX scheduler persists job state in ~/.nemo_run/.kubeflow_jobs.json
  and maps KubeflowJobState → AppState (UNKNOWN/None → PENDING to avoid
  false failures on transient API errors)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
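The UNKNOWN/None → PENDING mapping described above boils down to a lookup with a safe default. A sketch (the string constants stand in for torchx's `AppState`, and any state names beyond those mentioned in this PR are assumptions):

```python
# Illustrative KubeflowJobState -> AppState mapping; not the scheduler's
# actual table, which may cover more states.
_STATE_MAP = {
    "Created": "PENDING",
    "Running": "RUNNING",
    "Succeeded": "SUCCEEDED",
    "Failed": "FAILED",
}

def to_app_state(kubeflow_state):
    # UNKNOWN or None (e.g. from a transient API error) defaults to PENDING
    # rather than FAILED, so flaky reads don't mark a healthy job as dead.
    return _STATE_MAP.get(kubeflow_state, "PENDING")
```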
63 tests covering:
- Executor: defaults, kubeconfig fallback, nnodes, nproc_per_node resolution,
  assign, manifest generation for PyTorchJob and TrainJob (structure, resources,
  volumes, env_vars, env_list, labels, image_pull_secrets, tolerations, affinity,
  pod_spec_overrides, spec_kwargs, container_kwargs), launch (success, wait,
  timeout, conflict), status (all states + API errors), cancel (plain, 404,
  wait=True, wait timeout), fetch_logs (no-follow, follow, TrainJob label selector)
- Scheduler: create, dryrun, schedule, describe (all states + UNKNOWN→PENDING
  regression), cancel, log_iter (list + str), persistence (new file, merge,
  missing file), state map

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
- kubernetes import wrapped in try/except; ImportError raised at
  instantiation time with a helpful install message
- New [kubeflow] optional extra in pyproject.toml: pip install nemo-run[kubeflow]
- uv.lock updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
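The deferred-ImportError pattern this commit describes typically looks like the following sketch (the class name and message text here are illustrative, not the PR's actual code):

```python
try:
    import kubernetes  # optional: installed via `pip install nemo-run[kubeflow]`
except ImportError:
    kubernetes = None

class KubeflowExecutorSketch:
    """Raises at instantiation time, not import time, if the extra is missing."""

    def __init__(self):
        if kubernetes is None:
            raise ImportError(
                "KubeflowExecutor requires the kubernetes client; "
                "install it with `pip install nemo-run[kubeflow]`."
            )
```

Guarding the import this way keeps `import nemo_run` working for users who never touch Kubernetes, while still giving an actionable message the moment the executor is actually used.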