
Commit

test
haijianyang committed Feb 20, 2024
1 parent 9869381 commit 3513be2
Showing 10 changed files with 246 additions and 67 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/consts.go
@@ -43,6 +43,10 @@ const (

// HostAgentJobNameAnnotation is the annotation identifying the name of HostOperationJob.
HostAgentJobNameAnnotation = "cape.infrastructure.cluster.x-k8s.io/host-agent-job-name"

// MachineHotUpdateStatusAnnotation is the annotation recording the hot update status of the machine.
// MachineHotUpdateStatusAnnotation is used in KCP and MD.
MachineHotUpdateStatusAnnotation = "cape.infrastructure.cluster.x-k8s.io/machine-hot-update-status"
)

// Labels.
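The sketch below (not part of this commit) shows one way the new annotation could be consumed: a hypothetical setMachineHotUpdateStatus helper serializes a MachineHotUpdateStatus value as JSON into the annotation on a KCP/MD object. The helper name and the use of metav1.ObjectMeta are illustrative assumptions.

package example

import (
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MachineHotUpdateStatus mirrors the type added in api/v1beta1/types.go.
type MachineHotUpdateStatus struct {
	OutdatedReplicas int32 `json:"outdatedReplicas"`
	UpdatingReplicas int32 `json:"updatingReplicas"`
}

// machineHotUpdateStatusAnnotation mirrors the constant added above.
const machineHotUpdateStatusAnnotation = "cape.infrastructure.cluster.x-k8s.io/machine-hot-update-status"

// setMachineHotUpdateStatus is a hypothetical helper that records the hot update
// status of a KCP/MD object as a JSON-encoded annotation value.
func setMachineHotUpdateStatus(obj *metav1.ObjectMeta, status MachineHotUpdateStatus) error {
	data, err := json.Marshal(status)
	if err != nil {
		return err
	}
	if obj.Annotations == nil {
		obj.Annotations = make(map[string]string)
	}
	obj.Annotations[machineHotUpdateStatusAnnotation] = string(data)
	return nil
}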
9 changes: 9 additions & 0 deletions api/v1beta1/elfmachine_types.go
@@ -124,6 +124,10 @@ type ElfMachineStatus struct {
// +optional
GPUDevices []GPUStatus `json:"gpuDevices,omitempty"`

// Resources records the resources allocated for the machine.
// +optional
Resources ResourcesStatus `json:"resources,omitempty"`

// FailureReason will be set in the event that there is a terminal problem
// reconciling the Machine and will contain a succinct value suitable
// for machine interpretation.
@@ -241,6 +245,11 @@ func (m *ElfMachine) IsFailed() bool {
return m.Status.FailureReason != nil || m.Status.FailureMessage != nil
}

// IsResourcesUpToDate returns whether the machine's resources (disk) are as expected.
func (m *ElfMachine) IsResourcesUpToDate() bool {
return m.Spec.DiskGiB == m.Status.Resources.Disk
}

func (m *ElfMachine) SetVMDisconnectionTimestamp(timestamp *metav1.Time) {
if m.Annotations == nil {
m.Annotations = make(map[string]string)
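A minimal, runnable usage sketch of the new IsResourcesUpToDate check, using simplified stand-in types rather than the real ElfMachine API (not part of this commit):

package main

import "fmt"

// ResourcesStatus and ElfMachine are simplified stand-ins for the API types;
// only the fields used by the up-to-date check are modeled here.
type ResourcesStatus struct {
	Disk int32
}

type ElfMachine struct {
	SpecDiskGiB int32
	Resources   ResourcesStatus
}

// isResourcesUpToDate mirrors ElfMachine.IsResourcesUpToDate from this commit:
// the machine is up to date once the recorded disk size matches the spec.
func (m *ElfMachine) isResourcesUpToDate() bool {
	return m.SpecDiskGiB == m.Resources.Disk
}

func main() {
	m := &ElfMachine{SpecDiskGiB: 100, Resources: ResourcesStatus{Disk: 60}}
	// The spec asks for 100 GiB but only 60 GiB is recorded in status,
	// so a disk hot update is still outstanding.
	fmt.Println(m.isResourcesUpToDate()) // false
}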
14 changes: 14 additions & 0 deletions api/v1beta1/types.go
@@ -196,6 +196,20 @@ type GPUStatus struct {
Name string `json:"name,omitempty"`
}

// ResourcesStatus records the resources allocated to the virtual machine.
type ResourcesStatus struct {
Disk int32 `json:"disk,omitempty"`
}

// MachineHotUpdateStatus defines the observed state of machine hot update.
// MachineHotUpdateStatus is used in KCP and MD.
type MachineHotUpdateStatus struct {
// Total number of machines targeted by KCP/MD that are not yet up to date.
OutdatedReplicas int32 `json:"outdatedReplicas"`
// Total number of machines targeted by KCP/MD that are currently updating.
UpdatingReplicas int32 `json:"updatingReplicas"`
}

//+kubebuilder:object:generate=false

// PatchStringValue is for patching resources.
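The following sketch (not part of this commit) illustrates how the OutdatedReplicas/UpdatingReplicas counters of MachineHotUpdateStatus could be aggregated from a list of machines; the machineView projection and aggregateHotUpdateStatus helper are hypothetical.

package example

// machineView is an illustrative projection of the per-machine facts that
// matter for hot update bookkeeping; it is not a type from this commit.
type machineView struct {
	resourcesUpToDate bool // e.g. the result of ElfMachine.IsResourcesUpToDate()
	updating          bool // e.g. ResourcesHotUpdatedCondition is False with an Expanding* reason
}

// aggregateHotUpdateStatus is a hypothetical helper showing how a controller
// could derive MachineHotUpdateStatus counters for a KCP/MD from its machines.
func aggregateHotUpdateStatus(machines []machineView) (outdated, updating int32) {
	for _, m := range machines {
		if !m.resourcesUpToDate {
			outdated++
		}
		if m.updating {
			updating++
		}
	}
	return outdated, updating
}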
31 changes: 31 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -403,6 +403,13 @@ spec:
ready:
description: Ready is true when the provider resource is ready.
type: boolean
resources:
description: Resources records the resources allocated for the machine.
properties:
disk:
format: int32
type: integer
type: object
taskRef:
description: TaskRef is a managed object reference to a Task related
to the machine. This value is set automatically at runtime and should
12 changes: 10 additions & 2 deletions controllers/elfmachine_controller.go
@@ -980,23 +980,31 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx *context.MachineContext
if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
case service.IsUpdateVMDiskTask(task, ctx.ElfMachine.Name):
reason := conditions.GetReason(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMDiskReason || reason == infrav1.ExpandingVMDiskFailedReason {
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, errorMessage)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
}

switch {
case service.IsVMDuplicateError(errorMessage):
setVMDuplicate(ctx.ElfMachine.Name)
case service.IsMemoryInsufficientError(errorMessage):
recordElfClusterMemoryInsufficient(ctx, true)
message := fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
ctx.Logger.Info(message)

return errors.New(message)
case service.IsPlacementGroupError(errorMessage):
if err := recordPlacementGroupPolicyNotSatisfied(ctx, true); err != nil {
return err
}
message := "The placement group policy can not be satisfied"
ctx.Logger.Info(message)

return errors.New(message)
}

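The new IsUpdateVMDiskTask branch above only demotes ResourcesHotUpdatedCondition when the current condition reason already indicates a disk expansion in progress or a failed expansion. A small standalone sketch of that gate (not part of this commit; the reason string values are placeholders for the infrav1 constants):

package example

// Placeholder values standing in for the infrav1 condition reason constants
// referenced in reconcileVMFailedTask; the real string values may differ.
const (
	expandingVMDiskReason       = "ExpandingVMDisk"
	expandingVMDiskFailedReason = "ExpandingVMDiskFailed"
)

// shouldMarkDiskExpansionFailed mirrors the gate above: a failed disk update
// task only flips the condition when the machine is currently expanding its
// disk or has already failed to expand it.
func shouldMarkDiskExpansionFailed(currentReason string) bool {
	return currentReason == expandingVMDiskReason || currentReason == expandingVMDiskFailedReason
}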
145 changes: 82 additions & 63 deletions controllers/elfmachine_controller_resources.go
@@ -30,14 +30,9 @@ import (
"github.com/smartxworks/cluster-api-provider-elf/pkg/hostagent"
"github.com/smartxworks/cluster-api-provider-elf/pkg/service"
annotationsutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/annotations"
machineutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/machine"
)

func (r *ElfMachineReconciler) reconcileVMResources(ctx *context.MachineContext, vm *models.VM) (bool, error) {
if !machineutil.IsUpdatingElfMachineResources(ctx.ElfMachine) {
return true, nil
}

if ok, err := r.reconcieVMVolume(ctx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
return ok, err
}
@@ -49,57 +44,18 @@ func (r *ElfMachineReconciler) reconcileVMResources(ctx *context.MachineContext,
return false, nil
}

kubeClient, err := capiremote.NewClusterClient(ctx, "", ctx.Client, client.ObjectKey{Namespace: ctx.Cluster.Namespace, Name: ctx.Cluster.Name})
if err != nil {
return false, err
}

var agentJob *agentv1.HostOperationJob
agentJobName := annotationsutil.HostAgentJobName(ctx.ElfMachine)
if agentJobName != "" {
agentJob, err = hostagent.GetHostJob(ctx, kubeClient, ctx.ElfMachine.Namespace, agentJobName)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
}
if agentJob == nil {
agentJob, err = hostagent.AddNewDiskCapacityToRoot(ctx, kubeClient, ctx.ElfMachine)
if err != nil {
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

return false, err
}

annotationsutil.AddAnnotations(ctx.ElfMachine, map[string]string{infrav1.HostAgentJobNameAnnotation: agentJob.Name})

conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")

ctx.Logger.Info("Waiting for disk to be added new disk capacity to root", "hostAgentJob", agentJob.Name)

return false, nil
if ok, err := r.expandVMRootPartition(ctx); err != nil || !ok {
return ok, err
}

switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
conditions.MarkTrue(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
ctx.Logger.Info("Add new disk capacity to root succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
ctx.Logger.Info("Add new disk capacity to root failed, will try again", "hostAgentJob", agentJob.Name)

return false, nil
default:
ctx.Logger.Info("Waiting for adding new disk capacity to root job done", "jobStatus", agentJob.Status.Phase)

return false, nil
}
conditions.MarkTrue(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)

return true, nil
}

// reconcieVMVolume ensures that the vm disk size is as expected.
//
// The conditionType param: VMProvisionedCondition/ResourcesHotUpdatedCondition.
func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm *models.VM, conditionType clusterv1.ConditionType) (bool, error) {
vmDiskIDs := make([]string, len(vm.VMDisks))
for i := 0; i < len(vm.VMDisks); i++ {
@@ -108,6 +64,8 @@ func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm

vmDisks, err := ctx.VMService.GetVMDisks(vmDiskIDs)
if err != nil {
return false, errors.Wrapf(err, "failed to get disks for vm %s/%s", *vm.ID, *vm.Name)
} else if len(vmDisks) == 0 {
return false, errors.Errorf("no disks found for vm %s/%s", *vm.ID, *vm.Name)
}

@@ -116,37 +74,98 @@ func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm
return false, err
}

diskSize := service.TowerDisk(ctx.ElfMachine.Spec.DiskGiB)
if *diskSize > *vmVolume.Size {
if service.IsTowerResourcePerformingAnOperation(vmVolume.EntityAsyncStatus) {
ctx.Logger.Info("Waiting for vm volume task done", "volume", fmt.Sprintf("%s/%s", *vmVolume.ID, *vmVolume.Name))
diskSize := service.ByteToGiB(*vmVolume.Size)
ctx.ElfMachine.Status.Resources.Disk = diskSize

return false, nil
}

return false, r.resizeVMVolume(ctx, vmVolume, *diskSize, conditionType)
} else if *diskSize < *vmVolume.Size {
conditions.MarkTrue(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
ctx.Logger.Info(fmt.Sprintf("Current disk capacity is larger than expected, skipping expand vm volume %s/%s", *vmVolume.ID, *vmVolume.Name), "currentSize", *vmVolume.Size, "expectedSize", *diskSize)
if ctx.ElfMachine.Spec.DiskGiB < diskSize {
ctx.Logger.V(3).Info(fmt.Sprintf("Current disk capacity is larger than expected, skipping expand vm volume %s/%s", *vmVolume.ID, *vmVolume.Name), "currentSize", diskSize, "expectedSize", ctx.ElfMachine.Spec.DiskGiB)
} else if ctx.ElfMachine.Spec.DiskGiB > diskSize {
return false, r.resizeVMVolume(ctx, vmVolume, *service.TowerDisk(ctx.ElfMachine.Spec.DiskGiB), conditionType)
}

return true, nil
}
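To summarize the decision above: reconcieVMVolume converts the Tower volume size from bytes to GiB, records it in Status.Resources.Disk, skips shrinking when the current disk is already larger than the spec, and triggers a resize when it is smaller. The sketch below restates that comparison with a locally defined byteToGiB stand-in for service.ByteToGiB (not part of this commit):

package example

// byteToGiB is an illustrative stand-in for service.ByteToGiB, whose
// implementation is not part of this diff.
func byteToGiB(bytes int64) int32 {
	return int32(bytes / (1024 * 1024 * 1024))
}

// diskAction summarizes the comparison made by reconcieVMVolume between the
// desired spec size and the volume size currently reported by Tower.
func diskAction(specGiB int32, volumeSizeBytes int64) string {
	current := byteToGiB(volumeSizeBytes)
	switch {
	case specGiB < current:
		return "skip: current disk is already larger than the spec"
	case specGiB > current:
		return "resize: trigger ResizeVMVolume up to the spec size"
	default:
		return "done: disk size matches the spec"
	}
}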

// resizeVMVolume sets the volume to the specified size.
func (r *ElfMachineReconciler) resizeVMVolume(ctx *context.MachineContext, vmVolume *models.VMVolume, diskSize int64, conditionType clusterv1.ConditionType) error {
reason := conditions.GetReason(ctx.ElfMachine, conditionType)
if reason == "" ||
(reason != infrav1.ExpandingVMDiskReason && reason != infrav1.ExpandingVMDiskFailedReason) {
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskReason, clusterv1.ConditionSeverityInfo, "")

// Save the conditionType first, and then expand the disk capacity.
// This prevents the disk expansion from succeeding but failing to save the
// conditionType, causing ElfMachine to not record the conditionType.
return nil
}

if service.IsTowerResourcePerformingAnOperation(vmVolume.EntityAsyncStatus) {
ctx.Logger.Info("Waiting for vm volume task done", "volume", fmt.Sprintf("%s/%s", *vmVolume.ID, *vmVolume.Name))

return nil
}

withTaskVMVolume, err := ctx.VMService.ResizeVMVolume(*vmVolume.ID, diskSize)
if err != nil {
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskReason, clusterv1.ConditionSeverityWarning, err.Error())
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

return errors.Wrapf(err, "failed to trigger expand size from %d to %d for vm volume %s/%s", *vmVolume.Size, diskSize, *vmVolume.ID, *vmVolume.Name)
}

conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, "")

ctx.ElfMachine.SetTask(*withTaskVMVolume.TaskID)

ctx.Logger.Info(fmt.Sprintf("Waiting for the vm volume %s/%s to be expanded", *vmVolume.ID, *vmVolume.Name), "taskRef", ctx.ElfMachine.Status.TaskRef, "oldSize", *vmVolume.Size, "newSize", diskSize)

return nil
}

func (r *ElfMachineReconciler) expandVMRootPartition(ctx *context.MachineContext) (bool, error) {
reason := conditions.GetReason(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
} else if reason != infrav1.ExpandingVMDiskReason &&
reason != infrav1.ExpandingVMDiskFailedReason &&
reason != infrav1.ExpandingRootPartitionReason &&
reason != infrav1.ExpandingRootPartitionFailedReason {
return true, nil
}

kubeClient, err := capiremote.NewClusterClient(ctx, "", ctx.Client, client.ObjectKey{Namespace: ctx.Cluster.Namespace, Name: ctx.Cluster.Name})
if err != nil {
return false, err
}
var agentJob *agentv1.HostOperationJob
agentJobName := annotationsutil.HostAgentJobName(ctx.ElfMachine)
if agentJobName != "" {
agentJob, err = hostagent.GetHostJob(ctx, kubeClient, ctx.ElfMachine.Namespace, agentJobName)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
}
if agentJob == nil {
agentJob, err = hostagent.AddNewDiskCapacityToRoot(ctx, kubeClient, ctx.ElfMachine)
if err != nil {
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())
return false, err
}
annotationsutil.AddAnnotations(ctx.ElfMachine, map[string]string{infrav1.HostAgentJobNameAnnotation: agentJob.Name})
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")
ctx.Logger.Info("Waiting for disk to be added new disk capacity to root", "hostAgentJob", agentJob.Name)
return false, nil
}
switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
ctx.Logger.Info("Add new disk capacity to root succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
ctx.Logger.Info("Add new disk capacity to root failed, will try again", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)
return false, nil
default:
ctx.Logger.Info("Waiting for adding new disk capacity to root partition job done", "jobStatus", agentJob.Status.Phase)
return false, nil
}

return true, nil
}
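Taken together, reconcileVMResources in this file runs its steps in order (reconcieVMVolume, then expandVMRootPartition) and only marks ResourcesHotUpdatedCondition true once every step reports done. The sketch below (not part of this commit) models that (done, err) step contract with simplified stand-ins:

package example

// reconcileStep models the (done, err) contract used throughout
// elfmachine_controller_resources.go: done=false means "requeue and wait".
type reconcileStep func() (done bool, err error)

// runResourceSteps is an illustrative sketch of the control flow in
// reconcileVMResources: each step (volume resize, root partition expansion)
// must report done before the next one runs, and the hot update condition is
// only marked true after every step has finished.
func runResourceSteps(steps []reconcileStep, markHotUpdated func()) (bool, error) {
	for _, step := range steps {
		ok, err := step()
		if err != nil || !ok {
			return ok, err
		}
	}
	markHotUpdated() // e.g. conditions.MarkTrue(elfMachine, ResourcesHotUpdatedCondition)
	return true, nil
}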
