[Dynamic Allocation] Dynamic Allocation tests (palantir#69)
* support dynamic allocation of executors

* Adds a SoftReservationStore, which keeps track of in-memory reservations for extra executors and handles failover between schedulers (see the sketch after this list)

* Removes leftover method

* Fixes tests

* Checkstyle fixes

* Modifies CircleCI config to publish a snapshot release

* Circle branch update

* Updates Circle published image tag

* Revert "Updates Circle published image tag"

This reverts commit 3800676c14ebb3a3243484134ea57626080263fe.

* Addresses some PR comments

* Fixes storeLock removal

* checkstyle fixes

* Modifies Circle config to publish dirty release

* Fixes a bug in failover for stale drivers that specified out-of-range bounds when the executor count is less than the minimum

* Reverts Circle config to develop

* Renames dynamic allocation annotation keys to be more explicit

* Removes the logic for handling Fifo priority from the extra executor code path to reduce complexity for now

It is still correct behavior that we will probably want to reintroduce in the long term, but removing it for now makes dynamic allocation easier to reason about

* Always marks the executor status as false in the SoftReservation object to eliminate a race between the event handling and the executor scheduling request

* Removes redundant debug log line

* Adds tests for different dynamic allocation scenarios the extender might get

* Resets unintentional changes after merge

* Fixes merge conflicts

* stylecheck
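
A minimal sketch of the SoftReservationStore idea referenced in the notes above, assuming illustrative field and method names. Only the SoftReservationStore and SoftReservation type names and the Reservations map keyed by executor pod name appear in this commit's diffs; the store layout and AddReservationForPod method below are hypothetical, not the scheduler's actual API:

// Package sketch illustrates an in-memory soft-reservation store.
// Field layout and method names are assumptions for illustration only.
package sketch

import "sync"

// SoftReservation tracks extra-executor reservations for a single Spark application.
type SoftReservation struct {
	// Reservations maps an extra executor's pod name to the node it is softly reserved on.
	Reservations map[string]string
	// Status records whether that executor pod has been observed as scheduled.
	Status map[string]bool
}

// SoftReservationStore keeps soft reservations in memory, keyed by Spark application ID.
// Because it is in-memory only, reservations are rebuilt on scheduler failover.
type SoftReservationStore struct {
	storeLock    sync.RWMutex
	reservations map[string]*SoftReservation
}

// NewSoftReservationStore returns an empty store.
func NewSoftReservationStore() *SoftReservationStore {
	return &SoftReservationStore{reservations: make(map[string]*SoftReservation)}
}

// AddReservationForPod records a soft reservation for an extra executor.
// Per the commit note above, the scheduled status always starts as false so that
// the event handler, not the scheduling request, is what later flips it to true.
func (s *SoftReservationStore) AddReservationForPod(appID, podName, node string) {
	s.storeLock.Lock()
	defer s.storeLock.Unlock()
	sr, ok := s.reservations[appID]
	if !ok {
		sr = &SoftReservation{
			Reservations: make(map[string]string),
			Status:       make(map[string]bool),
		}
		s.reservations[appID] = sr
	}
	sr.Reservations[podName] = node
	sr.Status[podName] = false
}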
rkaram authored and onursatici committed Oct 11, 2019
1 parent 01b101e commit 613aaed
Showing 3 changed files with 182 additions and 27 deletions.
73 changes: 49 additions & 24 deletions internal/extender/extendertest/extender_test_utils.go
@@ -48,12 +48,14 @@ type Harness struct {
UnschedulablePodMarker *extender.UnschedulablePodMarker
PodStore cache.Store
NodeStore cache.Store
ResourceReservationStore cache.Store
ResourceReservationCache *sscache.ResourceReservationCache
SoftReservationStore *sscache.SoftReservationStore
Ctx context.Context
}

// NewTestExtender returns a new extender test harness, initialized with the provided k8s objects
func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
wlog.SetDefaultLoggerProvider(wlog.NewNoopLoggerProvider()) // suppressing Witchcraft warning log about logger provider
ctx := newLoggingContext()
fakeKubeClient := fake.NewSimpleClientset(objects...)
fakeSchedulerClient := ssclientset.NewSimpleClientset()
@@ -103,7 +105,6 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
if err != nil {
return nil, err
}

softReservationStore := sscache.NewSoftReservationStore(ctx, podInformerInterface)

overheadComputer := extender.NewOverheadComputer(
@@ -144,11 +145,20 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
UnschedulablePodMarker: unschedulablePodMarker,
PodStore: podInformer.GetStore(),
NodeStore: nodeInformer.GetStore(),
ResourceReservationStore: resourceReservationInformer.GetStore(),
ResourceReservationCache: resourceReservationCache,
SoftReservationStore: softReservationStore,
Ctx: ctx,
}, nil
}

// Schedule calls the extender's Predicate method for the given pod and nodes
func (h *Harness) Schedule(pod v1.Pod, nodeNames []string) *schedulerapi.ExtenderFilterResult {
return h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
Pod: pod,
NodeNames: &nodeNames,
})
}

// TerminatePod terminates an existing pod
func (h *Harness) TerminatePod(pod v1.Pod) error {
termination := v1.ContainerStateTerminated{
@@ -167,21 +177,15 @@ func (h *Harness) TerminatePod(pod v1.Pod) error {

// AssertSuccessfulSchedule tries to schedule the provided pods and fails the test if not successful
func (h *Harness) AssertSuccessfulSchedule(t *testing.T, pod v1.Pod, nodeNames []string, errorDetails string) {
result := h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
Pod: pod,
NodeNames: &nodeNames,
})
result := h.Schedule(pod, nodeNames)
if result.NodeNames == nil {
t.Errorf("Scheduling should succeed: %s", errorDetails)
}
}

// AssertFailedSchedule tries to schedule the provided pods and fails the test if successful
func (h *Harness) AssertFailedSchedule(t *testing.T, pod v1.Pod, nodeNames []string, errorDetails string) {
result := h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
Pod: pod,
NodeNames: &nodeNames,
})
result := h.Schedule(pod, nodeNames)
if result.NodeNames != nil {
t.Errorf("Scheduling should not succeed: %s", errorDetails)
}
@@ -194,8 +198,7 @@ func NewNode(name string) v1.Node {
Kind: "node",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "namespace",
Name: name,
Labels: map[string]string{
"resource_channel": "batch-medium-priority",
"com.palantir.rubix/instance-group": "batch-medium-priority",
@@ -215,9 +218,37 @@ func NewNode(name string) v1.Node {
}
}

// SparkApplicationPods returns a list of pods corresponding to a Spark Application
func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod {
pods := make([]v1.Pod, 1+numExecutors)
// StaticAllocationSparkPods returns a list of pods corresponding to a Spark Application with 1 driver and numExecutors executors
// with the proper static allocation annotations set
func StaticAllocationSparkPods(sparkApplicationID string, numExecutors int) []v1.Pod {
driverAnnotations := map[string]string{
"spark-driver-cpu": "1",
"spark-driver-mem": "1",
"spark-executor-cpu": "1",
"spark-executor-mem": "1",
"spark-executor-count": fmt.Sprintf("%d", numExecutors),
}
return sparkApplicationPods(sparkApplicationID, driverAnnotations, numExecutors)
}

// DynamicAllocationSparkPods returns a list of pods corresponding to a Spark Application with 1 driver and maxExecutors executors
// with the proper dynamic allocation annotations set for min and max executor counts
func DynamicAllocationSparkPods(sparkApplicationID string, minExecutors int, maxExecutors int) []v1.Pod {
driverAnnotations := map[string]string{
"spark-driver-cpu": "1",
"spark-driver-mem": "1",
"spark-executor-cpu": "1",
"spark-executor-mem": "1",
"spark-dynamic-allocation-enabled": "true",
"spark-dynamic-allocation-min-executor-count": fmt.Sprintf("%d", minExecutors),
"spark-dynamic-allocation-max-executor-count": fmt.Sprintf("%d", maxExecutors),
}
return sparkApplicationPods(sparkApplicationID, driverAnnotations, maxExecutors)
}

// sparkApplicationPods returns a list of pods corresponding to a Spark Application
func sparkApplicationPods(sparkApplicationID string, driverAnnotations map[string]string, maxExecutorCount int) []v1.Pod {
pods := make([]v1.Pod, 1+maxExecutorCount)
pods[0] = v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "pod",
@@ -229,13 +260,7 @@ func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod
"spark-role": "driver",
"spark-app-id": sparkApplicationID,
},
Annotations: map[string]string{
"spark-driver-cpu": "1",
"spark-driver-mem": "1",
"spark-executor-cpu": "1",
"spark-executor-mem": "1",
"spark-executor-count": fmt.Sprintf("%d", numExecutors),
},
Annotations: driverAnnotations,
},
Spec: v1.PodSpec{
NodeSelector: map[string]string{
@@ -248,7 +273,7 @@ func SparkApplicationPods(sparkApplicationID string, numExecutors int) []v1.Pod
},
}

for i := 0; i < numExecutors; i++ {
for i := 0; i < maxExecutorCount; i++ {
pods[i+1] = v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "pod",
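
As a quick orientation before the test diff below, here is a hedged sketch of how the new helpers fit together: DynamicAllocationSparkPods returns the driver at index 0 followed by maxExecutors executor pods, and Schedule wraps the extender's Predicate call. The helper names come from the diff above; the test function itself is illustrative and not part of the commit.

package extender_test

import (
	"testing"

	"github.com/palantir/k8s-spark-scheduler/internal/extender/extendertest"
)

// TestDynamicAllocationUsageSketch is an illustrative sketch, not part of this commit:
// it shows the intended call pattern for the helpers added above.
func TestDynamicAllocationUsageSketch(t *testing.T) {
	node := extendertest.NewNode("node1")
	nodeNames := []string{node.Name}

	// Driver at index 0, then maxExecutors executor pods (here: min 1, max 3).
	pods := extendertest.DynamicAllocationSparkPods("sketch-app", 1, 3)

	harness, err := extendertest.NewTestExtender(&node, &pods[0], &pods[1], &pods[2])
	if err != nil {
		t.Fatal("Could not setup test extender")
	}

	harness.Schedule(pods[0], nodeNames) // driver
	harness.Schedule(pods[1], nodeNames) // executor-0: resource reservation (under min)
	harness.Schedule(pods[2], nodeNames) // executor-1: soft reservation (over min, under max)
}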
132 changes: 131 additions & 1 deletion internal/extender/resource_test.go
@@ -15,16 +15,19 @@
package extender_test

import (
"fmt"
"testing"

"github.com/palantir/k8s-spark-scheduler/internal/extender/extendertest"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)

func TestScheduler(t *testing.T) {
node1 := extendertest.NewNode("node1")
node2 := extendertest.NewNode("node2")
nodeNames := []string{node1.Name, node2.Name}
podsToSchedule := extendertest.SparkApplicationPods("2-executor-app", 2)
podsToSchedule := extendertest.StaticAllocationSparkPods("2-executor-app", 2)

testHarness, err := extendertest.NewTestExtender(
&node1,
@@ -64,3 +67,130 @@ func TestScheduler(t *testing.T) {
nodeNames,
"Because an executor is terminated, the new request can replace its reservation")
}

func TestDynamicAllocationScheduling(t *testing.T) {
tests := []struct {
name string
podsToSchedule []v1.Pod
scenario func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string)
expectedReservations []string
expectedSoftReservations []string
}{{
name: "creates a reservation when under min executor count",
podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
harness.Schedule(podsToSchedule[0], nodeNames)
harness.Schedule(podsToSchedule[1], nodeNames)
},
expectedReservations: []string{executor(0)},
expectedSoftReservations: []string{},
}, {
name: "creates a soft reservation for an executor over min executor count",
podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
harness.Schedule(podsToSchedule[0], nodeNames)
harness.Schedule(podsToSchedule[1], nodeNames)
harness.Schedule(podsToSchedule[2], nodeNames)
},
expectedReservations: []string{executor(0)},
expectedSoftReservations: []string{executor(1)},
}, {
name: "does not create any reservation for an executor over the max",
podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
for _, pod := range podsToSchedule {
harness.Schedule(pod, nodeNames)
}
harness.Schedule(podsToSchedule[3], nodeNames) // should not have any reservation
},
expectedReservations: []string{executor(0)},
expectedSoftReservations: []string{executor(1), executor(2)},
}, {
name: "replaces a dead executor's resource reservation before adding a new soft reservation",
podsToSchedule: extendertest.DynamicAllocationSparkPods("dynamic-allocation-app", 1, 3),
scenario: func(harness *extendertest.Harness, podsToSchedule []v1.Pod, nodeNames []string) {
harness.Schedule(podsToSchedule[0], nodeNames) // driver
harness.Schedule(podsToSchedule[1], nodeNames) // executor-0 with a resource reservation
harness.Schedule(podsToSchedule[2], nodeNames) // executor-1 with a soft reservation
// kill executor-0
if err := harness.TerminatePod(podsToSchedule[1]); err != nil {
t.Fatal("Could not terminate pod in test extender")
}
harness.Schedule(podsToSchedule[3], nodeNames) // executor-2 should have a resource reservation
},
expectedReservations: []string{executor(2)},
expectedSoftReservations: []string{executor(1)},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node1 := extendertest.NewNode("node1")
node2 := extendertest.NewNode("node2")
nodeNames := []string{node1.Name, node2.Name}
harnessArgs := make([]runtime.Object, 0, len(test.podsToSchedule)+2)
harnessArgs = append(harnessArgs, &node1, &node2)
for i := range test.podsToSchedule {
harnessArgs = append(harnessArgs, &test.podsToSchedule[i])
}
testHarness, err := extendertest.NewTestExtender(harnessArgs...)
if err != nil {
t.Fatal("Could not setup test extender")
}

test.scenario(testHarness, test.podsToSchedule, nodeNames)

// Compare expected and actual resource reservations
expectedExecutorReservations := make(map[string]bool)
for _, expectedRes := range test.expectedReservations {
expectedExecutorReservations[expectedRes] = true
}
extraExecutors := make(map[string]bool)
for _, resourceReservation := range testHarness.ResourceReservationCache.List() {
for name, podName := range resourceReservation.Status.Pods {
if name != "driver" {
if _, exists := expectedExecutorReservations[podName]; exists {
delete(expectedExecutorReservations, podName)
} else {
extraExecutors[podName] = true
}
}
}
}

if len(expectedExecutorReservations) > 0 {
t.Errorf("expected the following executors to have reservations, but did not: %v", expectedExecutorReservations)
}
if len(extraExecutors) > 0 {
t.Errorf("following executors had reservations, but were not supposed to: %v", extraExecutors)
}

// Compare expected and actual soft reservations
expectedSoftReservations := make(map[string]bool)
for _, expectedRes := range test.expectedSoftReservations {
expectedSoftReservations[expectedRes] = true
}
extraSoftReservations := make(map[string]bool)
for _, softReservation := range testHarness.SoftReservationStore.GetAllSoftReservationsCopy() {
for podName := range softReservation.Reservations {
if _, exists := expectedSoftReservations[podName]; exists {
delete(expectedSoftReservations, podName)
} else {
extraSoftReservations[podName] = true
}
}
}

if len(expectedSoftReservations) > 0 {
t.Errorf("expected the following executors to have soft reservations, but did not: %v", expectedSoftReservations)
}
if len(extraSoftReservations) > 0 {
t.Errorf("following executors had soft reservations, but were not supposed to: %v", extraSoftReservations)
}
})
}
}

func executor(i int) string {
return fmt.Sprintf("spark-exec-%d", i)
}
4 changes: 2 additions & 2 deletions internal/extender/unschedulablepods_test.go
@@ -31,7 +31,7 @@ func TestUnschedulablePodMarker(t *testing.T) {
t.Fatal("Could not setup test extender")
}

twoExecutorsDriver := extendertest.SparkApplicationPods("2-executor-app", 2)[0]
twoExecutorsDriver := extendertest.StaticAllocationSparkPods("2-executor-app", 2)[0]
doesExceed, err := testHarness.UnschedulablePodMarker.DoesPodExceedClusterCapacity(testHarness.Ctx, &twoExecutorsDriver)
if err != nil {
t.Errorf("exceeds capacity check should not cause an error: %s", err)
@@ -40,7 +40,7 @@ func TestUnschedulablePodMarker(t *testing.T) {
t.Error("The two executor application should fit to the cluster")
}

hundredExecutorsDriver := extendertest.SparkApplicationPods("100-executor-app", 100)[0]
hundredExecutorsDriver := extendertest.StaticAllocationSparkPods("100-executor-app", 100)[0]
doesExceed, err = testHarness.UnschedulablePodMarker.DoesPodExceedClusterCapacity(testHarness.Ctx, &hundredExecutorsDriver)
if err != nil {
t.Errorf("exceeds capacity check should not cause an error: %s", err)
