Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #40 from everpeace/fix-mutex-in-reserve-cache
Browse files Browse the repository at this point in the history
Fix panic on concurrent access to reserved resource amount cache
  • Loading branch information
everpeace authored Oct 25, 2021
2 parents 211b034 + 2a2d874 commit 44b501a
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ dev-run-debug: dev-scheduler-conf
#
# E2E test
#
export E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=1m
export E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=90s
export E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION=2s
E2E_PAUSE_IMAGE=k8s.gcr.io/pause:3.2
E2E_KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig
Expand Down
30 changes: 30 additions & 0 deletions pkg/controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Shingo Omura under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Shingo Omura licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controllers Suite")
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers

import (
"strings"
"sync"

schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -29,9 +30,14 @@ import (
)

type reservedResourceAmounts struct {
// [Cluster]Throttle Name -> podResourceAmountMap
cache map[types.NamespacedName]podResourceAmountMap
// This protects concurrent accesses to cache itself
sync.RWMutex

// This protects concurrent accesses on the same (Cluster)Throttle's key
keyMutex keymutex.KeyMutex

// [Cluster]Throttle Name -> podResourceAmountMap
cache map[types.NamespacedName]podResourceAmountMap
}

func newReservedResourceAmounts(n int) *reservedResourceAmounts {
Expand All @@ -41,18 +47,25 @@ func newReservedResourceAmounts(n int) *reservedResourceAmounts {
}
}

func (c *reservedResourceAmounts) getPodResourceAmountMap(nn types.NamespacedName) podResourceAmountMap {
c.Lock()
defer c.Unlock()
if _, ok := c.cache[nn]; !ok {
c.cache[nn] = podResourceAmountMap{}
}
return c.cache[nn]
}

func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Pod) bool {
c.keyMutex.LockKey(nn.String())
defer func() {
_ = c.keyMutex.UnlockKey(nn.String())
}()

if _, ok := c.cache[nn]; !ok {
c.cache[nn] = podResourceAmountMap{}
}
m := c.getPodResourceAmountMap(nn)
added := m.add(pod)

added := c.cache[nn].add(pod)
klog.V(5).InfoS("reservedResourceAmounts.addPod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Cache", c.cache)
klog.V(5).InfoS("reservedResourceAmounts.addPod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Cache", m)
return added
}

Expand All @@ -62,12 +75,10 @@ func (c *reservedResourceAmounts) removePod(nn types.NamespacedName, pod *corev1
_ = c.keyMutex.UnlockKey(nn.String())
}()

if _, ok := c.cache[nn]; !ok {
c.cache[nn] = podResourceAmountMap{}
}
m := c.getPodResourceAmountMap(nn)
removed := m.remove(pod)

removed := c.cache[nn].remove(pod)
klog.V(5).InfoS("reservedResourceAmounts.removePod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Removed", removed, "Cache", c.cache)
klog.V(5).InfoS("reservedResourceAmounts.removePod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Removed", removed, "Cache", m)
return removed
}

Expand Down
97 changes: 97 additions & 0 deletions pkg/controllers/reserved_resource_amounts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to Shingo Omura under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Shingo Omura licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
"fmt"
"sync"

. "github.com/onsi/ginkgo"
// . "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("ReservedResourceAmounts", func() {
n := 2000
pod := func(i int) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{},
},
}
}
It("should be threadsafe across different throttle's namespacenames", func() {
r := newReservedResourceAmounts(1024)
add := &sync.WaitGroup{}
add.Add(n)
for i := 0; i < n; i++ {
go func(j int) {
r.addPod(
types.NamespacedName{Name: fmt.Sprintf("%d", j)},
pod(j),
)
add.Done()
}(i)
}
remove := &sync.WaitGroup{}
remove.Add(n)
for i := 0; i < n; i++ {
go func(j int) {
r.removePod(
types.NamespacedName{Name: fmt.Sprintf("%d", j)},
pod(j),
)
remove.Done()
}(i)
}
add.Wait()
remove.Wait()
})
It("should be threadsafe on specific throttle's namespacedname", func() {
r := newReservedResourceAmounts(1024)
add := &sync.WaitGroup{}
add.Add(n)
for i := 0; i < n; i++ {
go func(j int) {
r.addPod(
types.NamespacedName{Name: "test"},
pod(j),
)
add.Done()
}(i)
}
remove := &sync.WaitGroup{}
remove.Add(n)
for i := 0; i < n; i++ {
go func(j int) {
r.removePod(
types.NamespacedName{Name: "test"},
pod(j),
)
remove.Done()
}(i)
}
add.Wait()
remove.Wait()
})
})

0 comments on commit 44b501a

Please sign in to comment.