diff --git a/ray-operator/DEVELOPMENT.md b/ray-operator/DEVELOPMENT.md index 2cf7a71a66c..a46b480633f 100644 --- a/ray-operator/DEVELOPMENT.md +++ b/ray-operator/DEVELOPMENT.md @@ -260,7 +260,7 @@ Run tests on your local environment We use [elastic/crd-ref-docs](https://github.com/elastic/crd-ref-docs) to generate API reference for CRDs of KubeRay. The configuration file of `crd-ref-docs` is located at `hack/config.yaml`. Please refer to the documentation for more details. -Generate API refernece: +Generate API reference: ```bash make api-docs diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index 4ac12c79725..f05452709c8 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -25,6 +25,12 @@ type RayClusterSpec struct { // +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'" // +optional ManagedBy *string `json:"managedBy,omitempty"` + // TTLSeconds specifies the time-to-live for the RayCluster in seconds. + // After this time has elapsed since the RayCluster was created, the cluster will be automatically deleted. + // If not specified or set to 0, the cluster will not have a TTL. + // +kubebuilder:default:=0 + // +optional + TTLSeconds *int32 `json:"ttlSeconds,omitempty"` // AutoscalerOptions specifies optional configuration for the Ray autoscaler. // +optional AutoscalerOptions *AutoscalerOptions `json:"autoscalerOptions,omitempty"` @@ -217,6 +223,11 @@ type RayClusterStatus struct { // LastUpdateTime indicates last update timestamp for this cluster status. // +nullable LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` + // TTLExpirationTime indicates when the RayCluster should be deleted based on its TTL. + // This field is set when TTLSeconds is specified in the RayClusterSpec. + // +nullable + // +optional + TTLExpirationTime *metav1.Time `json:"ttlExpirationTime,omitempty"` // StateTransitionTimes indicates the time of the last state transition for each state. 
// +optional StateTransitionTimes map[ClusterState]*metav1.Time `json:"stateTransitionTimes,omitempty"` diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index b4cb5decf12..963a292c4ed 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -285,6 +285,11 @@ func (in *RayClusterSpec) DeepCopyInto(out *RayClusterSpec) { *out = new(string) **out = **in } + if in.TTLSeconds != nil { + in, out := &in.TTLSeconds, &out.TTLSeconds + *out = new(int32) + **out = **in + } if in.AutoscalerOptions != nil { in, out := &in.AutoscalerOptions, &out.AutoscalerOptions *out = new(AutoscalerOptions) @@ -338,6 +343,10 @@ func (in *RayClusterStatus) DeepCopyInto(out *RayClusterStatus) { in, out := &in.LastUpdateTime, &out.LastUpdateTime *out = (*in).DeepCopy() } + if in.TTLExpirationTime != nil { + in, out := &in.TTLExpirationTime, &out.TTLExpirationTime + *out = (*in).DeepCopy() + } if in.StateTransitionTimes != nil { in, out := &in.StateTransitionTimes, &out.StateTransitionTimes + *out = make(map[ClusterState]*metav1.Time, len(*in)) diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index a8e1f1df8f1..abd3abacaaa 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4324,6 +4324,10 @@ spec: type: string suspend: type: boolean + ttlSeconds: + default: 0 + format: int32 + type: integer workerGroupSpecs: items: properties: @@ -8150,6 +8154,10 @@ spec: format: date-time type: string type: object + ttlExpirationTime: + format: date-time + nullable: true + type: string type: object type: object served: true diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 8f8679ca607..5a16d0d91d0 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4374,6 +4374,10 @@ spec: type: string suspend: type: boolean + ttlSeconds: + default: 0 + format: int32 + type: integer workerGroupSpecs: items: properties: @@ -11912,6 +11916,10 @@ spec: format: date-time type: string type: object + ttlExpirationTime: + format: date-time + nullable: true + type: string type: object rayJobInfo: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index a86457fac1a..2dd84206f88 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4304,6 +4304,10 @@ spec: type: string suspend: type: boolean + ttlSeconds: + default: 0 + format: int32 + type: integer workerGroupSpecs: items: properties: @@ -8366,6 +8370,10 @@ spec: format: date-time type: string type: object + ttlExpirationTime: + format: date-time + nullable: true + type: string type: object type: object conditions: @@ -8549,6 +8557,10 @@ spec: format: date-time type: string type: object + ttlExpirationTime: + format: date-time + nullable: true + type: string type: object type: object serviceStatus: diff --git a/ray-operator/config/samples/ray-cluster-ttl.yaml b/ray-operator/config/samples/ray-cluster-ttl.yaml new file mode 100644 index 00000000000..034f020e655 --- /dev/null +++ b/ray-operator/config/samples/ray-cluster-ttl.yaml @@ -0,0 +1,83 @@ +# This sample demonstrates a RayCluster with TTL (Time-To-Live) functionality. 
+# The cluster will be automatically deleted after the specified TTL period. +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: raycluster-ttl-example +spec: + # TTL in seconds - the cluster will be deleted after this much time has elapsed since creation + # In this example, the cluster will be deleted after 1 hour (3600 seconds) + # Set to 0 or omit this field to disable TTL functionality + ttlSeconds: 3600 + + # Ray version + rayVersion: '2.46.0' + + # Enable autoscaling + enableInTreeAutoscaling: true + + # Head group specification + headGroupSpec: + # Head group pod template + template: + spec: + containers: + - name: ray-head + image: rayproject/ray:2.46.0 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "2" + memory: "4Gi" + requests: + cpu: "2" + memory: "4Gi" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + volumes: + - name: ray-logs + emptyDir: {} + # Service type for the head group + serviceType: ClusterIP + + # Worker group specifications + workerGroupSpecs: + # The number of pod replicas in this worker group + - replicas: 2 + minReplicas: 1 + maxReplicas: 5 + # Logical group name; this example calls it small-group + groupName: small-group + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of KubeRay. + rayStartParams: {} + # Pod template + template: + spec: + containers: + - name: ray-worker + image: rayproject/ray:2.46.0 + lifecycle: + preStop: + exec: + command: ["/bin/sh","-c","ray stop"] + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "1" + memory: "2Gi" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + volumes: + - name: ray-logs + emptyDir: {} diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 51eb06bcca8..0864333316c 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -299,6 +299,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance r.reconcileHeadService, r.reconcileHeadlessService, r.reconcileServeService, + r.reconcileTTL, r.reconcilePods, } @@ -348,8 +349,22 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance ) requeueAfterSeconds = utils.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS } - logger.Info("Unconditional requeue after", "seconds", requeueAfterSeconds) - return ctrl.Result{RequeueAfter: time.Duration(requeueAfterSeconds) * time.Second}, nil + // Check whether the TTL expiration requires an earlier requeue + requeueDuration := time.Duration(requeueAfterSeconds) * time.Second + if newInstance.Spec.TTLSeconds != nil && *newInstance.Spec.TTLSeconds > 0 && newInstance.Status.TTLExpirationTime != nil { + timeUntilExpiration := time.Until(newInstance.Status.TTLExpirationTime.Time) + // Add a small buffer (2 seconds) to ensure we process the deletion promptly + ttlRequeue := timeUntilExpiration + (2 * time.Second) + + // Only use the TTL-based requeue if it is positive and sooner than the default requeue + if ttlRequeue > 0 && ttlRequeue < requeueDuration { + requeueDuration = ttlRequeue + logger.Info("Using TTL-based requeue", "ttlRequeueSeconds", ttlRequeue.Seconds(), "defaultRequeueSeconds", requeueAfterSeconds) + } + } + + logger.Info("Unconditional requeue after", "seconds", 
requeueDuration.Seconds()) + return ctrl.Result{RequeueAfter: requeueDuration}, nil } func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster) error { @@ -1593,3 +1608,60 @@ func setDefaults(instance *rayv1.RayCluster) { } } } + +// reconcileTTL handles the TTL (Time-To-Live) functionality for RayCluster +func (r *RayClusterReconciler) reconcileTTL(ctx context.Context, instance *rayv1.RayCluster) error { + logger := ctrl.LoggerFrom(ctx) + + // If TTLSeconds is not set or is 0, no TTL functionality is needed + if instance.Spec.TTLSeconds == nil || *instance.Spec.TTLSeconds == 0 { + // If TTL was previously set but now removed, clear the expiration time + if instance.Status.TTLExpirationTime != nil { + instance.Status.TTLExpirationTime = nil + } + return nil + } + + ttlSeconds := *instance.Spec.TTLSeconds + creationTime := instance.CreationTimestamp.Time + expirationTime := creationTime.Add(time.Duration(ttlSeconds) * time.Second) + + // Update the TTL expiration time in status if it's not set or different + if instance.Status.TTLExpirationTime == nil || !instance.Status.TTLExpirationTime.Time.Equal(expirationTime) { + instance.Status.TTLExpirationTime = &metav1.Time{Time: expirationTime} + logger.Info("TTL expiration time set", "expirationTime", expirationTime, "ttlSeconds", ttlSeconds) + } + + nowTime := time.Now() + + // Check if the TTL has expired + if expirationTime.Before(nowTime) || expirationTime.Equal(nowTime) { + logger.Info("RayCluster TTL has expired, deleting cluster", + "creationTime", creationTime, + "expirationTime", expirationTime, + "currentTime", nowTime, + "ttlSeconds", ttlSeconds) + + // Delete the RayCluster + if err := r.Client.Delete(ctx, instance); err != nil { + logger.Error(err, "Failed to delete RayCluster due to TTL expiration") + return err + } + + r.Recorder.Eventf(instance, corev1.EventTypeNormal, "TTLExpired", + "RayCluster %s/%s deleted due to TTL expiration after %d seconds", + instance.Namespace, instance.Name, ttlSeconds) + + return nil + } + + // Calculate time until expiration and log it + timeUntilExpiration := time.Until(expirationTime) + logger.V(1).Info("RayCluster TTL status", + "creationTime", creationTime, + "expirationTime", expirationTime, + "timeUntilExpiration", timeUntilExpiration, + "ttlSeconds", ttlSeconds) + + return nil +} diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 8d5352b5c42..dd535573413 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -44,6 +44,10 @@ func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]s } } + if spec.TTLSeconds != nil && *spec.TTLSeconds < 0 { + return fmt.Errorf("ttlSeconds must be a non-negative integer") + } + if annotations[RayFTEnabledAnnotationKey] != "" && spec.GcsFaultToleranceOptions != nil { return fmt.Errorf("%s annotation and GcsFaultToleranceOptions are both set. 
"+ "Please use only GcsFaultToleranceOptions to configure GCS fault tolerance", RayFTEnabledAnnotationKey) diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index b55a8f26c39..73e2827932d 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1193,6 +1193,54 @@ func TestValidateRayServiceMetadata(t *testing.T) { require.NoError(t, err) } +func TestValidateRayClusterSpecTTL(t *testing.T) { + tests := []struct { + name string + ttlSeconds *int32 + expectError bool + }{ + { + name: "TTLSeconds is nil (not set)", + ttlSeconds: nil, + expectError: false, + }, + { + name: "TTLSeconds is 0 (no TTL)", + ttlSeconds: ptr.To(int32(0)), + expectError: false, + }, + { + name: "TTLSeconds is positive", + ttlSeconds: ptr.To(int32(3600)), + expectError: false, + }, + { + name: "TTLSeconds is negative", + ttlSeconds: ptr.To(int32(-1)), + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spec := &rayv1.RayClusterSpec{ + TTLSeconds: tt.ttlSeconds, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, nil), + }, + } + + err := ValidateRayClusterSpec(spec, map[string]string{}) + if tt.expectError { + require.Error(t, err) + require.Contains(t, err.Error(), "ttlSeconds must be a non-negative integer") + } else { + require.NoError(t, err) + } + }) + } +} + func createBasicRayClusterSpec() *rayv1.RayClusterSpec { return &rayv1.RayClusterSpec{ HeadGroupSpec: rayv1.HeadGroupSpec{