diff --git a/internal/extender/extendertest/extender_test_utils.go b/internal/extender/extendertest/extender_test_utils.go
index 17ee14ef7..a9b3eb7c3 100644
--- a/internal/extender/extendertest/extender_test_utils.go
+++ b/internal/extender/extendertest/extender_test_utils.go
@@ -48,12 +48,14 @@ type Harness struct {
 	UnschedulablePodMarker   *extender.UnschedulablePodMarker
 	PodStore                 cache.Store
 	NodeStore                cache.Store
-	ResourceReservationStore cache.Store
+	ResourceReservationCache *sscache.ResourceReservationCache
+	SoftReservationStore     *sscache.SoftReservationStore
 	Ctx                      context.Context
 }
 
 // NewTestExtender returns a new extender test harness, initialized with the provided k8s objects
 func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
+	wlog.SetDefaultLoggerProvider(wlog.NewNoopLoggerProvider()) // suppressing Witchcraft warning log about logger provider
 	ctx := newLoggingContext()
 	fakeKubeClient := fake.NewSimpleClientset(objects...)
 	fakeSchedulerClient := ssclientset.NewSimpleClientset()
@@ -103,7 +105,6 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
 	if err != nil {
 		return nil, err
 	}
-
 	softReservationStore := sscache.NewSoftReservationStore(ctx, podInformerInterface)
 
 	overheadComputer := extender.NewOverheadComputer(
@@ -144,11 +145,20 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
 		UnschedulablePodMarker:   unschedulablePodMarker,
 		PodStore:                 podInformer.GetStore(),
 		NodeStore:                nodeInformer.GetStore(),
-		ResourceReservationStore: resourceReservationInformer.GetStore(),
+		ResourceReservationCache: resourceReservationCache,
+		SoftReservationStore:     softReservationStore,
 		Ctx:                      ctx,
 	}, nil
 }
 
+// Schedule calls the extender's Predicate method for the given pod and nodes
+func (h *Harness) Schedule(pod v1.Pod, nodeNames []string) *schedulerapi.ExtenderFilterResult {
+	return h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
+		Pod:       pod,
+		NodeNames: &nodeNames,
+	})
+}
+
 // TerminatePod terminates an existing pod
 func (h *Harness) TerminatePod(pod v1.Pod) error {
 	termination := v1.ContainerStateTerminated{
@@ -167,10 +177,7 @@ func (h *Harness) TerminatePod(pod v1.Pod) error {
 
 // AssertSuccessfulSchedule tries to schedule the provided pods and fails the test if not successful
 func (h *Harness) AssertSuccessfulSchedule(t *testing.T, pod v1.Pod, nodeNames []string, errorDetails string) {
-	result := h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
-		Pod:       pod,
-		NodeNames: &nodeNames,
-	})
+	result := h.Schedule(pod, nodeNames)
 	if result.NodeNames == nil {
 		t.Errorf("Scheduling should succeed: %s", errorDetails)
 	}
@@ -178,10 +185,7 @@ func (h *Harness) AssertSuccessfulSchedule(t *testing.T, pod v1.Pod, nodeNames [
 
 // AssertFailedSchedule tries to schedule the provided pods and fails the test if successful
 func (h *Harness) AssertFailedSchedule(t *testing.T, pod v1.Pod, nodeNames []string, errorDetails string) {
-	result := h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
-		Pod:       pod,
-		NodeNames: &nodeNames,
-	})
+	result := h.Schedule(pod, nodeNames)
 	if result.NodeNames != nil {
 		t.Errorf("Scheduling should not succeed: %s", errorDetails)
 	}
@@ -194,8 +198,7 @@ func NewNode(name string) v1.Node {
 			Kind: "node",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: "namespace",
+			Name: name,
 			Labels: map[string]string{
 				"resource_channel":                  "batch-medium-priority",
 				"com.palantir.rubix/instance-group": "batch-medium-priority",
@@ -215,9 +218,37 @@ func NewNode(name string) v1.Node {
 	}
 }
 
-// SparkApplicationPods returns a list of pods corresponding to a Spark Application
-func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod {
-	pods := make([]v1.Pod, 1+numExecutors)
+// StaticAllocationSparkPods returns a list of pods corresponding to a Spark Application with 1 driver and numExecutors executors
+// with the proper static allocation annotations set
+func StaticAllocationSparkPods(sparkApplicationID string, numExecutors int) []v1.Pod {
+	driverAnnotations := map[string]string{
+		"spark-driver-cpu":     "1",
+		"spark-driver-mem":     "1",
+		"spark-executor-cpu":   "1",
+		"spark-executor-mem":   "1",
+		"spark-executor-count": fmt.Sprintf("%d", numExecutors),
+	}
+	return sparkApplicationPods(sparkApplicationID, driverAnnotations, numExecutors)
+}
+
+// DynamicAllocationSparkPods returns a list of pods corresponding to a Spark Application with 1 driver and maxExecutors executors
+// with the proper dynamic allocation annotations set for min and max executor counts
+func DynamicAllocationSparkPods(sparkApplicationID string, minExecutors int, maxExecutors int) []v1.Pod {
+	driverAnnotations := map[string]string{
+		"spark-driver-cpu":                            "1",
+		"spark-driver-mem":                            "1",
+		"spark-executor-cpu":                          "1",
+		"spark-executor-mem":                          "1",
+		"spark-dynamic-allocation-enabled":            "true",
+		"spark-dynamic-allocation-min-executor-count": fmt.Sprintf("%d", minExecutors),
+		"spark-dynamic-allocation-max-executor-count": fmt.Sprintf("%d", maxExecutors),
+	}
+	return sparkApplicationPods(sparkApplicationID, driverAnnotations, maxExecutors)
+}
+
+// sparkApplicationPods returns a list of pods corresponding to a Spark Application
+func sparkApplicationPods(sparkApplicationID string, driverAnnotations map[string]string, maxExecutorCount int) []v1.Pod {
+	pods := make([]v1.Pod, 1+maxExecutorCount)
 	pods[0] = v1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind: "pod",
@@ -229,13 +260,7 @@ func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod
 				"spark-role":   "driver",
 				"spark-app-id": sparkApplicationID,
 			},
-			Annotations: map[string]string{
-				"spark-driver-cpu":     "1",
-				"spark-driver-mem":     "1",
-				"spark-executor-cpu":   "1",
-				"spark-executor-mem":   "1",
-				"spark-executor-count": fmt.Sprintf("%d", numExecutors),
-			},
+			Annotations: driverAnnotations,
 		},
 		Spec: v1.PodSpec{
 			NodeSelector: map[string]string{
@@ -248,7 +273,7 @@ func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod
 		},
 	}
 
-	for i := 0; i < numExecutors; i++ {
+	for i := 0; i < maxExecutorCount; i++ {
 		pods[i+1] = v1.Pod{
 			TypeMeta: metav1.TypeMeta{
 				Kind: "pod",
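Aside: a minimal sketch (not part of this change) of how the new helpers above compose in a test, assuming it sits next to resource_test.go in package extender_test. The test name and application ID are made up for illustration; every other identifier comes from the helpers introduced in this file.

    package extender_test

    import (
        "testing"

        "github.com/palantir/k8s-spark-scheduler/internal/extender/extendertest"
    )

    // Hypothetical example: build dynamic-allocation Spark pods, hand them to the
    // harness, and drive the extender's Predicate through Harness.Schedule.
    func TestHarnessUsageSketch(t *testing.T) {
        node := extendertest.NewNode("node1")
        pods := extendertest.DynamicAllocationSparkPods("sketch-app", 1, 3) // 1 driver + 3 executor pods
        harness, err := extendertest.NewTestExtender(&node, &pods[0], &pods[1])
        if err != nil {
            t.Fatal("Could not setup test extender")
        }
        // Schedule the driver; a nil NodeNames in the result means the extender rejected it.
        if result := harness.Schedule(pods[0], []string{node.Name}); result.NodeNames == nil {
            t.Error("expected the driver to be schedulable")
        }
    }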
diff --git a/internal/extender/resource_test.go b/internal/extender/resource_test.go
index fc4004dbc..51b3e6e91 100644
--- a/internal/extender/resource_test.go
+++ b/internal/extender/resource_test.go
@@ -15,16 +15,19 @@ package extender_test
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/palantir/k8s-spark-scheduler/internal/extender/extendertest"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 )
 
 func TestScheduler(t *testing.T) {
 	node1 := extendertest.NewNode("node1")
 	node2 := extendertest.NewNode("node2")
 	nodeNames := []string{node1.Name, node2.Name}
-	podsToSchedule := extendertest.SparkApplicationPods("2-executor-app", 2)
+	podsToSchedule := extendertest.StaticAllocationSparkPods("2-executor-app", 2)
 
 	testHarness, err := extendertest.NewTestExtender(
 		&node1,
@@ -64,3 +67,130 @@ func TestScheduler(t *testing.T) {
 		nodeNames,
 		"Because an executor is terminated, the new request can replace its reservation")
 }
+
+func TestDynamicAllocationScheduling(t *testing.T) {
+	tests := []struct {
+		name                     string
+		podsToSchedule           []v1.Pod
+		scenario                 func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string)
+		expectedReservations     []string
+		expectedSoftReservations []string
+	}{{
+		name:           "creates a reservation when under min executor count",
+		podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
+		scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
+			harness.Schedule(podsToSchedule[0], nodeNames)
+			harness.Schedule(podsToSchedule[1], nodeNames)
+		},
+		expectedReservations:     []string{executor(0)},
+		expectedSoftReservations: []string{},
+	}, {
+		name:           "creates a soft reservation for an executor over min executor count",
+		podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
+		scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
+			harness.Schedule(podsToSchedule[0], nodeNames)
+			harness.Schedule(podsToSchedule[1], nodeNames)
+			harness.Schedule(podsToSchedule[2], nodeNames)
+		},
+		expectedReservations:     []string{executor(0)},
+		expectedSoftReservations: []string{executor(1)},
+	}, {
+		name:           "does not create any reservation for an executor over the max",
+		podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
+		scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
+			for _, pod := range podsToSchedule {
+				harness.Schedule(pod, nodeNames)
+			}
+			harness.Schedule(podsToSchedule[3], nodeNames) // should not have any reservation
+		},
+		expectedReservations:     []string{executor(0)},
+		expectedSoftReservations: []string{executor(1), executor(2)},
+	}, {
+		name:           "replaces a dead executor's resource reservation before adding a new soft reservation",
+		podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
+		scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
+			harness.Schedule(podsToSchedule[0], nodeNames) // driver
+			harness.Schedule(podsToSchedule[1], nodeNames) // executor-0 with a resource reservation
+			harness.Schedule(podsToSchedule[2], nodeNames) // executor-1 with a soft reservation
+			// kill executor-0
+			if err := harness.TerminatePod(podsToSchedule[1]); err != nil {
+				t.Fatal("Could not terminate pod in test extender")
+			}
+			harness.Schedule(podsToSchedule[3], nodeNames) // executor-2 should have a resource reservation
+		},
+		expectedReservations:     []string{executor(2)},
+		expectedSoftReservations: []string{executor(1)},
+	},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			node1 := extendertest.NewNode("node1")
+			node2 := extendertest.NewNode("node2")
+			nodeNames := []string{node1.Name, node2.Name}
+			harnessArgs := make([]runtime.Object, 0, len(test.podsToSchedule)+2)
+			harnessArgs = append(harnessArgs, &node1, &node2)
+			for i := range test.podsToSchedule {
+				harnessArgs = append(harnessArgs, &test.podsToSchedule[i])
+			}
+			testHarness, err := extendertest.NewTestExtender(harnessArgs...)
+			if err != nil {
+				t.Fatal("Could not setup test extender")
+			}
+
+			test.scenario(testHarness, test.podsToSchedule, nodeNames)
+
+			// Compare expected and actual resource reservations
+			expectedExecutorReservations := make(map[string]bool)
+			for _, expectedRes := range test.expectedReservations {
+				expectedExecutorReservations[expectedRes] = true
+			}
+			extraExecutors := make(map[string]bool)
+			for _, resourceReservation := range testHarness.ResourceReservationCache.List() {
+				for name, podName := range resourceReservation.Status.Pods {
+					if name != "driver" {
+						if _, exists := expectedExecutorReservations[podName]; exists {
+							delete(expectedExecutorReservations, podName)
+						} else {
+							extraExecutors[podName] = true
+						}
+					}
+				}
+			}
+
+			if len(expectedExecutorReservations) > 0 {
+				t.Errorf("expected the following executors to have reservations, but did not: %v", expectedExecutorReservations)
+			}
+			if len(extraExecutors) > 0 {
+				t.Errorf("following executors had reservations, but were not supposed to: %v", extraExecutors)
+			}
+
+			// Compare expected and actual soft reservations
+			expectedSoftReservations := make(map[string]bool)
+			for _, expectedRes := range test.expectedSoftReservations {
+				expectedSoftReservations[expectedRes] = true
+			}
+			extraSoftReservations := make(map[string]bool)
+			for _, softReservation := range testHarness.SoftReservationStore.GetAllSoftReservationsCopy() {
+				for podName := range softReservation.Reservations {
+					if _, exists := expectedSoftReservations[podName]; exists {
+						delete(expectedSoftReservations, podName)
+					} else {
+						extraSoftReservations[podName] = true
+					}
+				}
+			}
+
+			if len(expectedSoftReservations) > 0 {
+				t.Errorf("expected the following executors to have soft reservations, but did not: %v", expectedSoftReservations)
+			}
+			if len(extraSoftReservations) > 0 {
+				t.Errorf("following executors had soft reservations, but were not supposed to: %v", extraSoftReservations)
+			}
+		})
+	}
+}
+
+func executor(i int) string {
+	return fmt.Sprintf("spark-exec-%d", i)
+}
diff --git a/internal/extender/unschedulablepods_test.go b/internal/extender/unschedulablepods_test.go
index 7006ae2aa..87a18f322 100644
--- a/internal/extender/unschedulablepods_test.go
+++ b/internal/extender/unschedulablepods_test.go
@@ -31,7 +31,7 @@ func TestUnschedulablePodMarker(t *testing.T) {
 		t.Fatal("Could not setup test extender")
 	}
 
-	twoExecutorsDriver := extendertest.SparkApplicationPods("2-executor-app", 2)[0]
+	twoExecutorsDriver := extendertest.StaticAllocationSparkPods("2-executor-app", 2)[0]
 	doesExceed, err := testHarness.UnschedulablePodMarker.DoesPodExceedClusterCapacity(testHarness.Ctx, &twoExecutorsDriver)
 	if err != nil {
 		t.Errorf("exceeds capacity check should not cause an error: %s", err)
@@ -40,7 +40,7 @@ func TestUnschedulablePodMarker(t *testing.T) {
 		t.Error("The two executor application should fit to the cluster")
 	}
 
-	hundredExecutorsDriver := extendertest.SparkApplicationPods("100-executor-app", 100)[0]
+	hundredExecutorsDriver := extendertest.StaticAllocationSparkPods("100-executor-app", 100)[0]
 	doesExceed, err = testHarness.UnschedulablePodMarker.DoesPodExceedClusterCapacity(testHarness.Ctx, &hundredExecutorsDriver)
 	if err != nil {
 		t.Errorf("exceeds capacity check should not cause an error: %s", err)
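Aside: the resource-reservation and soft-reservation checks in TestDynamicAllocationScheduling are the same expected-versus-actual set comparison written twice. If that pattern spreads to more tests, it could be factored into a helper along these lines; assertExactPodNames is a hypothetical name, not part of this change, it assumes it lives in the same test file (which already imports "testing"), and the caller would first flatten the cache or store contents into a slice of executor pod names.

    // assertExactPodNames fails the test unless actual holds exactly the expected
    // executor pod names; kind ("reservations" or "soft reservations") is only
    // used in the error messages.
    func assertExactPodNames(t *testing.T, kind string, expected []string, actual []string) {
        missing := make(map[string]bool)
        for _, name := range expected {
            missing[name] = true
        }
        extra := make(map[string]bool)
        for _, name := range actual {
            if missing[name] {
                delete(missing, name)
            } else {
                extra[name] = true
            }
        }
        if len(missing) > 0 {
            t.Errorf("expected the following executors to have %s, but did not: %v", kind, missing)
        }
        if len(extra) > 0 {
            t.Errorf("following executors had %s, but were not supposed to: %v", kind, extra)
        }
    }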