diff --git a/pkg/controller/modeladapter/modeladapter_controller.go b/pkg/controller/modeladapter/modeladapter_controller.go index cac7d3b5..b6a073b1 100644 --- a/pkg/controller/modeladapter/modeladapter_controller.go +++ b/pkg/controller/modeladapter/modeladapter_controller.go @@ -200,7 +200,7 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request // the object is being deleted if controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) { // the finalizer is present, so let's unload lora from those inference engines - if err := r.unloadModelAdapter(modelAdapter); err != nil { + if _, err := r.unloadModelAdapter(modelAdapter, int(*modelAdapter.Spec.Replicas)); err != nil { // if fail to delete unload lora here, return the error so it can be retried. return ctrl.Result{}, err } @@ -217,32 +217,26 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } - return r.DoReconcile(ctx, req, modelAdapter) -} - -func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Request, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) { // Let's set the initial status when no status is available - if instance.Status.Conditions == nil || len(instance.Status.Conditions) == 0 { - instance.Status.Phase = modelv1alpha1.ModelAdapterPending - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeInitialized), - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: "Starting reconciliation", - LastTransitionTime: metav1.Now()}) - - if err := r.Status().Update(ctx, instance); err != nil { - klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance)) + if modelAdapter.Status.Conditions == nil || len(modelAdapter.Status.Conditions) == 0 { + modelAdapter.Status.Phase = modelv1alpha1.ModelAdapterPending + condition := 
NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeInitialized), metav1.ConditionUnknown, + "Reconciling", "Starting reconciliation") + if err := r.updateStatus(ctx, modelAdapter, condition); err != nil { + klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(modelAdapter)) return reconcile.Result{}, err } // re-fetch the custom resource after updating the status to avoid 409 error here. - if err := r.Get(ctx, req.NamespacedName, instance); err != nil { + if err := r.Get(ctx, req.NamespacedName, modelAdapter); err != nil { klog.Error(err, "Failed to re-fetch modelAdapter") return ctrl.Result{}, err } } + return r.DoReconcile(ctx, req, modelAdapter) +} +func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Request, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) { oldInstance := instance.DeepCopy() // Step 0: Validate ModelAdapter configurations @@ -250,14 +244,9 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque klog.Error(err, "Failed to validate the ModelAdapter") instance.Status.Phase = modelv1alpha1.ModelAdapterFailed - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "ValidationFailed", - Message: "ModelAdapter resource is not valid", - LastTransitionTime: metav1.Now()}) - - if updateErr := r.Status().Update(ctx, instance); updateErr != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + "ValidationFailed", "ModelAdapter resource is not valid") + if updateErr := r.updateStatus(ctx, instance, condition); updateErr != nil { klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance)) return reconcile.Result{}, updateErr } @@ -265,70 +254,108 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque 
return ctrl.Result{}, err } - // Step 1: Schedule Pod for ModelAdapter - selectedPod := &corev1.Pod{} - existPods := false - var err error + // verify if there's pods matches adapter's selector + backendPods, err := r.findPodsForModelAdapter(ctx, instance) + if err != nil { + klog.Info("failed to list pods", "error", err) + return ctrl.Result{}, err + } + + // verify the pods counts, we have a constraint + // scenario 1: if there's no pod available, it could be wrong label selector. + // scenario 2: if len < replica, due to our constraints, it's not allowed. + // TODO: make constraints like a framework and provides the validate() interface etc. + if len(backendPods) == 0 || len(backendPods) < int(*instance.Spec.Replicas) { + instance.Status.Phase = modelv1alpha1.ModelAdapterFailed + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + "ValidationFailed", "ModelAdapter pod selector can not find any pods or pod count is less than required replicas") + if updateErr := r.updateStatus(ctx, instance, condition); updateErr != nil { + klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance)) + return reconcile.Result{}, updateErr + } + } + + // Step 1: Schedule ModelAdapter across Pods + podsWithModelAdapter := make([]*corev1.Pod, 0) if instance.Status.Instances != nil && len(instance.Status.Instances) != 0 { // model adapter has already been scheduled to some pods // check the scheduled pod first, verify the mapping is still valid. 
// TODO: this needs to be changed once we support multiple lora adapters - selectedPodName := instance.Status.Instances[0] - if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: selectedPodName}, selectedPod); err != nil && apierrors.IsNotFound(err) { - klog.ErrorS(err, "Failed to get selected pod but pod is still in ModelAdapter instance list", "modelAdapter", klog.KObj(instance)) - - // instance.Status.Instances has been outdated, and we need to clear the pod list - // after the pod list is cleaned up, let's reconcile the instance object again in the next loop - return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance) - } else if err != nil { - // failed to fetch the pod, let's requeue - return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err - } else { - // compare instance and model adapter labels. - selector, err := metav1.LabelSelectorAsSelector(instance.Spec.PodSelector) - if err != nil { - // TODO: this should barely happen, let's move this logic to earlier validation logics. - return ctrl.Result{}, fmt.Errorf("failed to convert pod selector: %v", err) - } + for _, selectedPodName := range instance.Status.Instances { + pod := &corev1.Pod{} + if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: selectedPodName}, pod); err != nil && apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to get selected pod but pod is still in ModelAdapter instance list", "modelAdapter", klog.KObj(instance)) + + // instance.Status.Instances has been outdated, and we need to clear the pod list + // after the pod list is cleaned up, let's reconcile the instance object again in the next loop + return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance, selectedPodName) + } else if err != nil { + // failed to fetch the pod, let's requeue + return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err + } else { + // compare instance and model adapter labels. 
+ selector, err := metav1.LabelSelectorAsSelector(instance.Spec.PodSelector) + if err != nil { + // TODO: this should barely happen, let's move this logic to earlier validation logics. + return ctrl.Result{}, fmt.Errorf("failed to convert pod selector: %v", err) + } - if !selector.Matches(labels.Set(selectedPod.Labels)) { - klog.Warning("current assigned pod selector doesn't match model adapter selector") - return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance) - } + if !selector.Matches(labels.Set(pod.Labels)) { + klog.Warning("current assigned pod selector doesn't match model adapter selector") + return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance, selectedPodName) + } - existPods = true + podsWithModelAdapter = append(podsWithModelAdapter, pod) + } } } - if !existPods { + // candidatePods = (all available pods) - (pods already have model adapter scheduled) + candidatePods := getCandidatePods(backendPods, podsWithModelAdapter) + selectedPods := make([]*corev1.Pod, 0) + if int(*instance.Spec.Replicas) > len(podsWithModelAdapter) { // TODO: as we plan to support lora replicas, it needs some corresponding changes. // it should return a list of pods in future, otherwise, it should be invoked by N times. 
- selectedPod, err = r.schedulePod(ctx, instance) + diff := len(backendPods) - int(*instance.Spec.Replicas) + selectedPods, err := r.schedulePods(ctx, instance, candidatePods, diff) if err != nil { klog.ErrorS(err, "Failed to schedule Pod for ModelAdapter", "modelAdapter", instance.Name) return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err } - instance.Status.Phase = modelv1alpha1.ModelAdapterScheduling - instance.Status.Instances = []string{selectedPod.Name} - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), selectedPod.Name), - LastTransitionTime: metav1.Now(), - }) + podNames := ExtractPodNames(selectedPods) - if err := r.Status().Update(ctx, instance); err != nil { + instance.Status.Phase = modelv1alpha1.ModelAdapterScheduling + instance.Status.Instances = podNames + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), metav1.ConditionTrue, + "Reconciling", fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames)) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.InfoS("Got error when updating status", "cluster name", req.Name, "error", err, "ModelAdapter", instance) return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err } + return ctrl.Result{Requeue: true}, nil + } else if len(podsWithModelAdapter) > int(*instance.Spec.Replicas) { + diff := len(podsWithModelAdapter) - int(*instance.Spec.Replicas) + podUnloadedAdapters, err := r.unloadModelAdapter(instance, diff) + if err != nil { + return ctrl.Result{}, err + } + podNames := ExtractPodNames(podsWithModelAdapter) + + instance.Status.Phase = modelv1alpha1.ModelAdapterScaling + instance.Status.Instances = Difference(podNames, podUnloadedAdapters) + condition := 
NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), metav1.ConditionTrue, + "Reconciling", fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames)) + if err = r.updateStatus(ctx, instance, condition); err != nil { + return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err + } + return ctrl.Result{Requeue: true}, nil } // Step 2: Reconcile Loading - if err := r.reconcileLoading(ctx, instance, selectedPod); err != nil { + if err := r.reconcileLoading(ctx, instance, selectedPods); err != nil { // retry any of the failure. instance.Status.Phase = modelv1alpha1.ModelAdapterFailed if err := r.Status().Update(ctx, instance); err != nil { @@ -348,7 +375,7 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque } // Step 4: Reconcile EndpointSlice - if ctrlResult, err := r.reconcileEndpointSlice(ctx, instance, selectedPod); err != nil { + if ctrlResult, err := r.reconcileEndpointSlice(ctx, instance, selectedPods); err != nil { if updateErr := r.updateModelAdapterState(ctx, instance, modelv1alpha1.ModelAdapterFailed); updateErr != nil { klog.ErrorS(updateErr, "ModelAdapter update state error", "cluster name", req.Name) } @@ -359,7 +386,9 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque if r.inconsistentModelAdapterStatus(oldInstance.Status, instance.Status) { klog.InfoS("model adapter reconcile", "Update CR status", req.Name, "status", instance.Status) instance.Status.Phase = modelv1alpha1.ModelAdapterRunning - if err = r.updateStatus(ctx, instance); err != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionReady), metav1.ConditionTrue, + "Reconciling", fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance))) + if err = r.updateStatus(ctx, instance, condition); err != nil { return reconcile.Result{}, fmt.Errorf("update modelAdapter status error: %v", err) } } @@ -367,30 +396,24 @@ func (r *ModelAdapterReconciler) 
DoReconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } -func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionReady), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance)), - LastTransitionTime: metav1.Now(), - }) - +func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter, condition metav1.Condition) error { + meta.SetStatusCondition(&instance.Status.Conditions, condition) return r.Status().Update(ctx, instance) } -func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { - stalePodName := instance.Status.Instances[0] - instance.Status.Instances = []string{} - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionCleanup), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) can not be fetched for model adapter (%s), clean up the list", stalePodName, instance.Name), - LastTransitionTime: metav1.Now(), - }) - - if err := r.Status().Update(ctx, instance); err != nil { +func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Context, instance *modelv1alpha1.ModelAdapter, stalePodName string) error { + // remove stalePodName from the original list + newList := make([]string, 0) + for _, podName := range instance.Status.Instances { + if podName != stalePodName { + newList = append(newList, podName) + } + } + + instance.Status.Instances = newList + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionCleanup), metav1.ConditionTrue, + "Reconciling", fmt.Sprintf("Pod (%s) can not be fetched for model adapter (%s), clean up the list", stalePodName, 
instance.Name)) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.Error(err, "Failed to update modelAdapter status") return err } @@ -398,61 +421,90 @@ func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Conte return nil } -func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter) (*corev1.Pod, error) { - // Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector - // For the sake of example, we will just list the Pods matching the selector and pick the first one - podList := &corev1.PodList{} - listOpts := []client.ListOption{ - client.InNamespace(instance.Namespace), - client.MatchingLabels(instance.Spec.PodSelector.MatchLabels), - } - if err := r.List(ctx, podList, listOpts...); err != nil { - return nil, err +// schedulePods will schedule 'diff' number of Pods by calling schedulePod multiple times, +// keeping track of selected Pods and removing them from the candidates list. +func (r *ModelAdapterReconciler) schedulePods(ctx context.Context, instance *modelv1alpha1.ModelAdapter, candidates []*corev1.Pod, diff int) ([]*corev1.Pod, error) { + var selectedPods []*corev1.Pod + + for i := 0; i < diff; i++ { + if len(candidates) == 0 { + // If we run out of candidate Pods, return an error + // this will barely happen since we did the len(pods) > replica check earlier + return nil, fmt.Errorf("no more candidate pods available to schedule") + } + + // Schedule a pod and remove it from the candidate list + pod, err := r.schedulePod(ctx, instance, candidates) + if err != nil { + return nil, err + } + + // Record the scheduled pod + selectedPods = append(selectedPods, pod) + + // Remove the scheduled pod from candidates + for idx, candidate := range candidates { + if candidate.Name == pod.Name { + // Remove the pod by slicing out the element + candidates = append(candidates[:idx], candidates[idx+1:]...) 
+ break + } + } } - if len(podList.Items) == 0 { - return nil, fmt.Errorf("no pods found matching selector") + // Return all selected pods + return selectedPods, nil +} + +func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter, candidates []*corev1.Pod) (*corev1.Pod, error) { + // Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector + // For the sake of example, we will just list the Pods matching the selector and pick the first one + if len(candidates) == 0 { + return nil, fmt.Errorf("no candidate pods available") } - return r.scheduler.SelectPod(ctx, podList.Items) + // You can implement any specific logic to choose a pod, here just selecting the first candidate + return r.scheduler.SelectPod(ctx, candidates) } -func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) error { +func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter, podToBind []*corev1.Pod) error { // Define the key you want to check key := "DEBUG_MODE" value, exists := getEnvKey(key) - host := fmt.Sprintf("http://%s:8000", pod.Status.PodIP) - if exists && value == "on" { + hostIp := "localhost" + if !exists || value != "on" { // 30080 is the nodePort of the base model service. - host = fmt.Sprintf("http://%s:30080", "localhost") + hostIp = fmt.Sprintf("http://%s:30080", "localhost") } - // Check if the model is already loaded - exists, err := r.modelAdapterExists(host, instance.Name) - if err != nil { - return err - } - if exists { - klog.V(4).Info("LoRA model has been registered previously, skipping registration") - return nil - } + for _, pod := range podToBind { + if !exists || value != "on" { + hostIp = pod.Status.PodIP + } + // TODO: this should be configured by user. 
right now, only supports vLLM + host := fmt.Sprintf("http://%s:8000", hostIp) + // Check if the model is already loaded + exists, err := r.modelAdapterExists(host, instance.Name) + if err != nil { + return err + } + if exists { + klog.V(4).Info("LoRA model has been registered previously, skipping registration") + return nil + } - // Load the Model adapter - err = r.loadModelAdapter(host, instance) - if err != nil { - return err + // Load the Model adapter + err = r.loadModelAdapter(host, instance) + if err != nil { + return err + } } - // Update the instance status + // Update the instance status once all binding has been done. instance.Status.Phase = modelv1alpha1.ModelAdapterBinding - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeScheduled), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance)), - LastTransitionTime: metav1.Now(), - }) - if err := r.Status().Update(ctx, instance); err != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeScheduled), metav1.ConditionTrue, + "Reconciling", fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance))) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.InfoS("Got error when updating status", "error", err, "ModelAdapter", instance) return err } @@ -547,67 +599,79 @@ func (r *ModelAdapterReconciler) loadModelAdapter(host string, instance *modelv1 } // unloadModelAdapter unloads the loras from inference engines -func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error { +func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter, diff int) ([]string, error) { if len(instance.Status.Instances) == 0 { klog.Warning("model adapter has not been deployed to any pods yet, skip unloading") - return nil + return nil, nil } // TODO:(jiaxin.shan) Support 
multiple instances - - podName := instance.Status.Instances[0] - targetPod := &corev1.Pod{} - if err := r.Get(context.TODO(), types.NamespacedName{ - Namespace: instance.Namespace, - Name: podName, - }, targetPod); err != nil { - if apierrors.IsNotFound(err) { - // since the pod doesn't exist, unload is unnecessary - return nil - } - klog.Warning("Error getting Pod from lora instance list", err) - return err - } - - payload := map[string]string{ - "lora_name": instance.Name, - } - payloadBytes, err := json.Marshal(payload) - if err != nil { - return err + var podsToUnload []string + if len(instance.Status.Instances) <= diff { + // If the number of instances is less than or equal to diff, unload all of them + podsToUnload = instance.Status.Instances + } else { + // If there are more instances than diff, unload the first 'diff' instances + podsToUnload = instance.Status.Instances[:diff] } - url := fmt.Sprintf("http://%s:%d/v1/unload_lora_adapter", targetPod.Status.PodIP, 8000) + // this is for debug purpose key := "DEBUG_MODE" value, exists := getEnvKey(key) - if exists && value == "on" { - // 30080 is the nodePort of the base model service. 
- url = "http://localhost:30080/v1/unload_lora_adapter" - } + endpoint := "localhost:30080" + + for i := 0; i < len(podsToUnload); i++ { + podName := podsToUnload[0] + targetPod := &corev1.Pod{} + if err := r.Get(context.TODO(), types.NamespacedName{ + Namespace: instance.Namespace, + Name: podName, + }, targetPod); err != nil { + if apierrors.IsNotFound(err) { + // since the pod doesn't exist, unload is unnecessary + continue + } + klog.Warning("Error getting Pod from lora instance list", "pod", podName) + return nil, err + } - req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") + payload := map[string]string{ + "lora_name": instance.Name, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return err - } - defer func() { - if err := resp.Body.Close(); err != nil { - klog.InfoS("Error closing response body:", err) + if !exists || value != "on" { + endpoint = fmt.Sprintf("%s:%d", targetPod.Status.PodIP, 8000) } - }() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to unload LoRA adapter: %s", body) + url := fmt.Sprintf("http://%s/v1/unload_lora_adapter", endpoint) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer func() { + if err := resp.Body.Close(); err != nil { + klog.InfoS("Error closing response body:", err) + } + }() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to unload LoRA adapter: %s", body) + } } - return nil + return 
podsToUnload, nil } func (r *ModelAdapterReconciler) updateModelAdapterState(ctx context.Context, instance *modelv1alpha1.ModelAdapter, phase modelv1alpha1.ModelAdapterPhase) error { @@ -629,15 +693,9 @@ func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance svc, err := buildModelAdapterService(instance) if err != nil { klog.ErrorS(err, "Failed to define new Service resource for ModelAdapter") - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: fmt.Sprintf("Failed to create Service for the custom resource (%s): (%s)", instance.Name, err), - LastTransitionTime: metav1.Now(), - }) - - if err := r.Status().Update(ctx, instance); err != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + "Reconciling", fmt.Sprintf("Failed to create Service for the custom resource (%s): (%s)", instance.Name, err)) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.Error(err, "Failed to update modelAdapter status") return ctrl.Result{}, err } @@ -667,25 +725,19 @@ func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance return ctrl.Result{}, nil } -func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) (ctrl.Result, error) { +func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, instance *modelv1alpha1.ModelAdapter, pods []*corev1.Pod) (ctrl.Result, error) { // check if the endpoint slice already exists, if not create a new one. 
found := &discoveryv1.EndpointSlice{} err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found) if err != nil && apierrors.IsNotFound(err) { // EndpointSlice does not exist, create it - eps, err := buildModelAdapterEndpointSlice(instance, pod) + eps, err := buildModelAdapterEndpointSlice(instance, pods) if err != nil { klog.ErrorS(err, "Failed to define new EndpointSlice resource for ModelAdapter") instance.Status.Phase = modelv1alpha1.ModelAdapterFailed - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err), - LastTransitionTime: metav1.Now(), - }) - - if err := r.Status().Update(ctx, instance); err != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + "Reconciling", fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err)) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.Error(err, "Failed to update modelAdapter status") return ctrl.Result{}, err } @@ -708,34 +760,57 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins klog.ErrorS(err, "Failed to get EndpointSlice") return ctrl.Result{}, err } else { - // Existing EndpointSlice Found. 
Check if the Pod IP is already in the EndpointSlice - podIP := pod.Status.PodIP - alreadyExists := false + // Step 1: Create a set of Pod IPs from the input Pods list + podIPs := make(map[string]bool) + for _, pod := range pods { + podIPs[pod.Status.PodIP] = true + } + + // Step 2: Create a set of existing IPs from the EndpointSlice + existingIPs := make(map[string]bool) for _, endpoint := range found.Endpoints { for _, address := range endpoint.Addresses { - if address == podIP { - alreadyExists = true - break - } + existingIPs[address] = true } - if alreadyExists { + } + + // Step 3: Compare the Pod IPs with the existing EndpointSlice IPs + needsUpdate := false + + // Check for any missing pod IPs in the EndpointSlice (need to add these) + for podIP := range podIPs { + if !existingIPs[podIP] { + needsUpdate = true break } } - // Append the Pod IP to the EndpointSlice if it doesn't already exist - if !alreadyExists { - found.Endpoints = append(found.Endpoints, discoveryv1.Endpoint{ - Addresses: []string{podIP}, - }) + // Check for any extra IPs in the EndpointSlice that are not in the pods list (need to remove these) + for existingIP := range existingIPs { + if !podIPs[existingIP] { + needsUpdate = true + break + } + } + // Step 4: If there are any changes, update the EndpointSlice + if needsUpdate { + // Clear existing Endpoints and rebuild from the current Pods list + found.Endpoints = []discoveryv1.Endpoint{} + for podIP := range podIPs { + found.Endpoints = append(found.Endpoints, discoveryv1.Endpoint{ + Addresses: []string{podIP}, + }) + } + + // Update the EndpointSlice in Kubernetes if err := r.Update(ctx, found); err != nil { klog.ErrorS(err, "Failed to update EndpointSlice", "EndpointSlice", found.Name) - return ctrl.Result{}, err + return ctrl.Result{}, nil } klog.InfoS("Successfully updated EndpointSlice", "EndpointSlice", found.Name) } else { - klog.InfoS("Pod IP already exists in EndpointSlice", "PodIP", podIP) + klog.InfoS("No changes detected in 
EndpointSlice", "EndpointSlice", found.Name) } } @@ -750,3 +825,16 @@ func (r *ModelAdapterReconciler) inconsistentModelAdapterStatus(oldStatus, newSt return false } + +func (r *ModelAdapterReconciler) findPodsForModelAdapter(ctx context.Context, instance *modelv1alpha1.ModelAdapter) ([]corev1.Pod, error) { + podList := &corev1.PodList{} + listOpts := []client.ListOption{ + client.InNamespace(instance.Namespace), + client.MatchingLabels(instance.Spec.PodSelector.MatchLabels), + } + if err := r.List(ctx, podList, listOpts...); err != nil { + return nil, err + } + + return podList.Items, nil +} diff --git a/pkg/controller/modeladapter/resources.go b/pkg/controller/modeladapter/resources.go index ec238558..f1571724 100644 --- a/pkg/controller/modeladapter/resources.go +++ b/pkg/controller/modeladapter/resources.go @@ -22,24 +22,25 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" ) -func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) (*discoveryv1.EndpointSlice, error) { +func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pods []*corev1.Pod) (*discoveryv1.EndpointSlice, error) { serviceLabels := map[string]string{ "kubernetes.io/service-name": instance.Name, } addresses := []discoveryv1.Endpoint{ { - Addresses: []string{pod.Status.PodIP}, + Addresses: ExtractPodIPs(pods), }, } ports := []discoveryv1.EndpointPort{ { - Name: stringPtr("http"), - Protocol: protocolPtr(corev1.ProtocolTCP), - Port: int32Ptr(8000), + Name: ptr.To("http"), + Protocol: ptr.To(corev1.ProtocolTCP), + Port: ptr.To(int32(8000)), }, } diff --git a/pkg/controller/modeladapter/resources_test.go b/pkg/controller/modeladapter/resources_test.go index 81cc47da..5a4a847d 100644 --- a/pkg/controller/modeladapter/resources_test.go +++ b/pkg/controller/modeladapter/resources_test.go @@ -36,14 +36,16 @@ func TestBuildModelAdapterEndpointSlice(t 
*testing.T) { } // Mock input for Pod - pod := &corev1.Pod{ - Status: corev1.PodStatus{ - PodIP: "192.168.1.1", + pods := []*corev1.Pod{ + { + Status: corev1.PodStatus{ + PodIP: "192.168.1.1", + }, }, } // Call the function to test - endpointSlice, err := buildModelAdapterEndpointSlice(instance, pod) + endpointSlice, err := buildModelAdapterEndpointSlice(instance, pods) // Assert no errors assert.NoError(t, err) diff --git a/pkg/controller/modeladapter/scheduling/leastadapters.go b/pkg/controller/modeladapter/scheduling/leastadapters.go index 6b289cd1..8a39d012 100644 --- a/pkg/controller/modeladapter/scheduling/leastadapters.go +++ b/pkg/controller/modeladapter/scheduling/leastadapters.go @@ -35,8 +35,8 @@ func NewLeastAdapters(c *cache.Cache) Scheduler { } } -func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) { - selectedPod := v1.Pod{} +func (r leastAdapters) SelectPod(ctx context.Context, pods []*v1.Pod) (*v1.Pod, error) { + selectedPod := &v1.Pod{} modelAdapterCountMin := math.MaxInt for _, pod := range pods { @@ -51,5 +51,5 @@ func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, e } klog.Infof("pod selected with least model adapters: %s", selectedPod.Name) - return &selectedPod, nil + return selectedPod, nil } diff --git a/pkg/controller/modeladapter/scheduling/scheduler.go b/pkg/controller/modeladapter/scheduling/scheduler.go index e0246d23..1af1b9b6 100644 --- a/pkg/controller/modeladapter/scheduling/scheduler.go +++ b/pkg/controller/modeladapter/scheduling/scheduler.go @@ -27,7 +27,7 @@ import ( type Scheduler interface { // SelectPod returns the pod to schedule model adapter - SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) + SelectPod(ctx context.Context, pods []*v1.Pod) (*v1.Pod, error) } // NewScheduler leverages the factory method to choose the right scheduler diff --git a/pkg/controller/modeladapter/utils.go b/pkg/controller/modeladapter/utils.go index 04b3b168..81b7a931 
100644 --- a/pkg/controller/modeladapter/utils.go +++ b/pkg/controller/modeladapter/utils.go @@ -19,31 +19,16 @@ package modeladapter import ( "errors" "fmt" + "net/url" "os" "strings" - corev1 "k8s.io/api/core/v1" - modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func stringPtr(s string) *string { - return &s -} - -func protocolPtr(p corev1.Protocol) *corev1.Protocol { - return &p -} - -func int32Ptr(i int32) *int32 { - return &i -} - -func mapPtr(m map[string]string) *map[string]string { - return &m -} - func validateModelAdapter(instance *modelv1alpha1.ModelAdapter) error { if instance.Spec.ArtifactURL == "" { return fmt.Errorf("artifactURL is required") @@ -127,3 +112,72 @@ func extractHuggingFacePath(artifactURL string) (string, error) { return path, nil } + +// ExtractPodNames takes a list of Pods and returns a list of their names. +func ExtractPodNames(pods []*corev1.Pod) []string { + podNames := make([]string, len(pods)) + for i, pod := range pods { + podNames[i] = pod.Name + } + return podNames +} + +// ExtractPodIPs takes a list of Pods and returns a list of their ips. 
+func ExtractPodIPs(pods []*corev1.Pod) []string {
+	// The result is index-aligned with pods: entry i is pods[i].Status.PodIP,
+	// copied as-is, so a pod whose PodIP is unset contributes an empty string.
+	podIPs := make([]string, len(pods))
+	for i, pod := range pods {
+		podIPs[i] = pod.Status.PodIP
+	}
+	return podIPs
+}
+
+// getCandidatePods returns deep copies of the pods in backendPods whose name
+// does not appear in podsWithModelAdapter, preserving the order of backendPods.
+// Pods are matched by Name only. The result is nil when no backend pod qualifies.
+func getCandidatePods(backendPods []corev1.Pod, podsWithModelAdapter []*corev1.Pod) []*corev1.Pod {
+	// Step 1: Create a set of pod names from podsWithModelAdapter
+	modelAdapterPodNames := make(map[string]struct{}, len(podsWithModelAdapter))
+	for _, pod := range podsWithModelAdapter {
+		modelAdapterPodNames[pod.Name] = struct{}{}
+	}
+
+	// Step 2: Iterate through backendPods and find pods that are not in podsWithModelAdapter.
+	// Iterate by index so the Pod struct is not copied on every iteration.
+	var candidatePods []*corev1.Pod
+	for i := range backendPods {
+		pod := &backendPods[i]
+		if _, found := modelAdapterPodNames[pod.Name]; found {
+			continue
+		}
+		// Deep-copy only the pods that are returned, so the result never
+		// aliases the caller's backendPods slice and no copy is wasted on
+		// pods that are filtered out.
+		candidatePods = append(candidatePods, pod.DeepCopy())
+	}
+
+	return candidatePods
+}
+
+// Difference returns the elements in `a` that are not in `b`, in their original
+// order. Duplicates in `a` are kept; the result is nil when nothing remains.
+func Difference(a, b []string) []string {
+	// Create a set (map) to store elements of b for quick lookup
+	bSet := make(map[string]struct{}, len(b))
+	for _, item := range b {
+		bSet[item] = struct{}{}
+	}
+
+	// Iterate over a and keep only elements not in bSet
+	var diff []string
+	for _, item := range a {
+		if _, found := bSet[item]; !found {
+			diff = append(diff, item)
+		}
+	}
+
+	return diff
+}
+
+// NewCondition returns a metav1.Condition populated with the given type, status,
+// reason and message, with LastTransitionTime set to metav1.Now().
+func NewCondition(condType string, status metav1.ConditionStatus, reason, msg string) metav1.Condition { + return metav1.Condition{ + Type: condType, + Status: status, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: msg, + } +} diff --git a/pkg/controller/modeladapter/utils_test.go b/pkg/controller/modeladapter/utils_test.go index 80033ef2..136f31c4 100644 --- a/pkg/controller/modeladapter/utils_test.go +++ b/pkg/controller/modeladapter/utils_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1" ) @@ -36,7 +37,7 @@ func TestValidateModelAdapter(t *testing.T) { PodSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{"app": "test"}, }, - Replicas: int32Ptr(1), + Replicas: ptr.To(int32(1)), }, }