Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKS-1573: Refactor available hosts and vm migration code #130

Merged
merged 8 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,6 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
switch {
case service.IsCloneVMTask(task):
releaseTicketForCreateVM(ctx.ElfMachine.Name)
case service.IsVMMigrationTask(task):
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
if err != nil {
return false, err
}
releaseTicketForPlacementGroupVMMigration(placementGroupName)
case service.IsMemoryInsufficientError(errorMessage):
setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, true)
message := fmt.Sprintf("Insufficient memory detected for ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
Expand All @@ -858,16 +852,9 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
case models.TaskStatusSUCCESSED:
ctx.Logger.Info("VM task succeeded", "vmRef", vmRef, "taskRef", taskRef, "taskDescription", service.GetTowerString(task.Description))

switch {
case service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task):
if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, false)
releaseTicketForCreateVM(ctx.ElfMachine.Name)
case service.IsVMMigrationTask(task):
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
if err != nil {
return false, err
}
releaseTicketForPlacementGroupVMMigration(placementGroupName)
}
default:
ctx.Logger.Info("Waiting for VM task done", "vmRef", vmRef, "taskRef", taskRef, "taskStatus", service.GetTowerTaskStatus(task.Status), "taskDescription", service.GetTowerString(task.Description))
Expand Down
310 changes: 167 additions & 143 deletions controllers/elfmachine_controller_placement_group.go

Large diffs are not rendered by default.

171 changes: 89 additions & 82 deletions controllers/elfmachine_controller_test.go

Large diffs are not rendered by default.

28 changes: 0 additions & 28 deletions controllers/vm_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ import (
const (
creationTimeout = time.Minute * 6
vmOperationRateLimit = time.Second * 6
vmMigrationTimeout = time.Minute * 20
placementGroupSilenceTime = time.Minute * 30
placementGroupCreationLockKey = "%s:creation"
)

var vmStatusMap = make(map[string]time.Time)
var limiterLock sync.Mutex
var vmOperationMap = make(map[string]time.Time)
var placementGroupVMMigrationMap = make(map[string]time.Time)
var vmOperationLock sync.Mutex

var placementGroupOperationMap = make(map[string]time.Time)
Expand Down Expand Up @@ -91,32 +89,6 @@ func acquireTicketForUpdatingVM(vmName string) bool {
return true
}

// acquireTicketForPlacementGroupVMMigration returns whether virtual machine migration
// of placement group operation can be performed.
func acquireTicketForPlacementGroupVMMigration(groupName string) bool {
vmOperationLock.Lock()
defer vmOperationLock.Unlock()

if status, ok := placementGroupVMMigrationMap[groupName]; ok {
if !time.Now().After(status.Add(vmMigrationTimeout)) {
return false
}
}

placementGroupVMMigrationMap[groupName] = time.Now()

return true
}

// releaseTicketForPlacementGroupVMMigration releases the virtual machine migration
// of placement group being operated.
func releaseTicketForPlacementGroupVMMigration(groupName string) {
vmOperationLock.Lock()
defer vmOperationLock.Unlock()

delete(placementGroupVMMigrationMap, groupName)
}

// acquireTicketForPlacementGroupOperation returns whether placement group operation
// can be performed.
func acquireTicketForPlacementGroupOperation(groupName string) bool {
Expand Down
21 changes: 0 additions & 21 deletions controllers/vm_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,6 @@ var _ = Describe("VM Operation Limiter", func() {
})
})

var _ = Describe("Placement Group VM Migration Limiter", func() {
var groupName string

BeforeEach(func() {
groupName = fake.UUID()
})

It("acquireTicketForPlacementGroupVMMigration", func() {
Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeTrue())
Expect(placementGroupVMMigrationMap).To(HaveKey(groupName))

Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeFalse())
releaseTicketForPlacementGroupVMMigration(groupName)
Expect(placementGroupVMMigrationMap).NotTo(HaveKey(groupName))

placementGroupVMMigrationMap[groupName] = time.Now().Add(-vmMigrationTimeout)
Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeTrue())
Expect(placementGroupVMMigrationMap).To(HaveKey(groupName))
})
})

var _ = Describe("Placement Group Operation Limiter", func() {
var groupName string

Expand Down
161 changes: 161 additions & 0 deletions pkg/service/collections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"fmt"

"github.com/smartxworks/cloudtower-go-sdk/v2/models"
"k8s.io/apimachinery/pkg/util/sets"
)

// Hosts is a set of hosts.
type Hosts map[string]*models.Host

// NewHosts creates a Hosts. from a list of values.
func NewHosts(hosts ...*models.Host) Hosts {
ss := make(Hosts, len(hosts))
ss.Insert(hosts...)
return ss
}

// NewHostsFromList creates a Hosts from the given host slice.
func NewHostsFromList(hosts []*models.Host) Hosts {
ss := make(Hosts, len(hosts))
for i := range hosts {
ss.Insert(hosts[i])
}
return ss
}

func (s Hosts) Insert(hosts ...*models.Host) {
for i := range hosts {
if hosts[i] != nil {
h := hosts[i]
s[*h.ID] = h
}
}
}

func (s Hosts) Contains(hostID string) bool {
_, ok := s[hostID]
return ok
}

// Len returns the size of the set.
func (s Hosts) Len() int {
return len(s)
}

func (s Hosts) IsEmpty() bool {
return len(s) == 0
}

func (s Hosts) String() string {
str := ""

for _, host := range s {
state := ""
if host.HostState != nil {
state = string(*host.HostState.State)
}

Check warning on line 75 in pkg/service/collections.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/collections.go#L74-L75

Added lines #L74 - L75 were not covered by tests

str += fmt.Sprintf("{id: %s,name: %s,memory: %d,status: %s,state: %s},", GetTowerString(host.ID), GetTowerString(host.Name), GetTowerInt64(host.AllocatableMemoryBytes), string(*host.Status), state)
}

return fmt.Sprintf("[%s]", str)
}

// FilterAvailableHostsWithEnoughMemory returns a Hosts containing the available host which has allocatable memory no less than the specified memory.
func (s Hosts) FilterAvailableHostsWithEnoughMemory(memory int64) Hosts {
return s.Filter(func(h *models.Host) bool {
ok, _ := IsAvailableHost(h, memory)
return ok
})
}

// FilterUnavailableHostsOrWithoutEnoughMemory returns a Hosts containing the unavailable hosts or available hosts whose available memory is less than the specified memory.
func (s Hosts) FilterUnavailableHostsOrWithoutEnoughMemory(memory int64) Hosts {
return s.Filter(func(h *models.Host) bool {
ok, _ := IsAvailableHost(h, memory)
return !ok
})
}

// Get returns a Host of the specified host.
func (s Hosts) Get(hostID string) *models.Host {
if host, ok := s[hostID]; ok {
return host
}
return nil
}

// Find returns a Hosts of the specified hosts.
func (s Hosts) Find(targetHosts sets.Set[string]) Hosts {
return s.Filter(func(h *models.Host) bool {
return targetHosts.Has(*h.ID)
})
}

// UnsortedList returns the slice with contents in random order.
func (s Hosts) UnsortedList() []*models.Host {
res := make([]*models.Host, 0, len(s))
for _, value := range s {
res = append(res, value)
}
return res
}

// Difference returns a copy without hosts that are in the given collection.
func (s Hosts) Difference(hosts Hosts) Hosts {
return s.Filter(func(h *models.Host) bool {
_, found := hosts[*h.ID]
return !found
})
}

// newFilteredHostCollection creates a Hosts from a filtered list of values.
func newFilteredHostCollection(filter Func, hosts ...*models.Host) Hosts {
ss := make(Hosts, len(hosts))
for i := range hosts {
h := hosts[i]
if filter(h) {
ss.Insert(h)
}
}
return ss
}

// Filter returns a Hosts containing only the Hosts that match all of the given HostFilters.
func (s Hosts) Filter(filters ...Func) Hosts {
return newFilteredHostCollection(And(filters...), s.UnsortedList()...)
}

// Func is the functon definition for a filter.
type Func func(host *models.Host) bool

// And returns a filter that returns true if all of the given filters returns true.
func And(filters ...Func) Func {
return func(host *models.Host) bool {
for _, f := range filters {
if !f(host) {
return false
}
}
return true
}
}
82 changes: 82 additions & 0 deletions pkg/service/collections_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"fmt"
"testing"

"github.com/onsi/gomega"
"github.com/smartxworks/cloudtower-go-sdk/v2/models"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
)

func TestHostCollection(t *testing.T) {
g := gomega.NewGomegaWithT(t)

t.Run("Find", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1")}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2")}

hosts := NewHosts()
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Len()).To(gomega.Equal(0))

hosts = NewHostsFromList([]*models.Host{host1, host2})
g.Expect(hosts.Get(*host1.ID)).To(gomega.Equal(host1))
g.Expect(hosts.Get(*TowerString("404"))).To(gomega.BeNil())
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Len()).To(gomega.Equal(1))
})

t.Run("Available", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1"), AllocatableMemoryBytes: pointer.Int64(1), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2"), AllocatableMemoryBytes: pointer.Int64(2), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}

hosts := NewHosts()
g.Expect(hosts.FilterAvailableHostsWithEnoughMemory(0).Len()).To(gomega.Equal(0))

hosts = NewHostsFromList([]*models.Host{host1, host2})
availableHosts := hosts.FilterAvailableHostsWithEnoughMemory(2)
g.Expect(availableHosts.Len()).To(gomega.Equal(1))
g.Expect(availableHosts.Contains(*host2.ID)).To(gomega.BeTrue())

hosts = NewHosts()
unavailableHosts := hosts.FilterUnavailableHostsOrWithoutEnoughMemory(0)
g.Expect(unavailableHosts.IsEmpty()).To(gomega.BeTrue())
g.Expect(unavailableHosts.Len()).To(gomega.Equal(0))
g.Expect(unavailableHosts.String()).To(gomega.Equal("[]"))

hosts = NewHostsFromList([]*models.Host{host1, host2})
unavailableHosts = hosts.FilterUnavailableHostsOrWithoutEnoughMemory(2)
g.Expect(unavailableHosts.Len()).To(gomega.Equal(1))
g.Expect(unavailableHosts.Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(unavailableHosts.String()).To(gomega.Equal(fmt.Sprintf("[{id: %s,name: %s,memory: %d,status: %s,state: %s},]", *host1.ID, *host1.Name, *host1.AllocatableMemoryBytes, string(*host1.Status), "")))
})

t.Run("Difference", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1")}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2")}

g.Expect(NewHosts().Difference(NewHosts()).Len()).To(gomega.Equal(0))
g.Expect(NewHosts().Difference(NewHosts(host1)).Len()).To(gomega.Equal(0))
g.Expect(NewHosts(host1).Difference(NewHosts(host1)).Len()).To(gomega.Equal(0))
g.Expect(NewHosts(host1).Difference(NewHosts()).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(NewHosts(host1).Difference(NewHosts(host2)).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(NewHosts(host1, host2).Difference(NewHosts(host2)).Contains(*host1.ID)).To(gomega.BeTrue())
})
}
13 changes: 7 additions & 6 deletions pkg/service/mock_services/vm_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading