Use node affinity instead of node selector labels from Pod Spec for scheduling (palantir#102)

* partial work to use node affinity instead of labels

* get dependencies right

* Remove occurrences of pod.Spec.NodeSelector

* Expose FindInstanceGroup

* move FindInstanceGroup into internal package

* updated tests

* fixed vendor dependencies

* upgrade k8s-spark-scheduler-lib

* fix unit tests

* running verify

* publish docker snapshot from any branch temporarily

* some refactor

* use local filter instead of all for publishing

* Instead of label match use node affinity predicate

* test the release from my branch

* revert circle config changes after testing

* Fix flaky test

* Address some of Onur's comments

* remove named return type

Co-authored-by: Onur Satici <[email protected]>
codekarma and onursatici authored Feb 3, 2020
1 parent 4d3ea77 commit 0dad8c4
Showing 1,278 changed files with 305,835 additions and 122,453 deletions.
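
The theme across these files is the same: anywhere the extender previously read the instance group out of `pod.Spec.NodeSelector`, pods are now expected to carry the equivalent constraint as required node affinity, and the extender resolves the instance group from those match expressions (see the new helper in `internal/podspec.go` near the end of this diff). As a minimal sketch of the equivalence — the converter function below is hypothetical, but the updated test fixtures in this commit build exactly this structure by hand:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// requiredAffinityFromSelector is a hypothetical illustration of the mapping:
// each key=value entry of a legacy NodeSelector map becomes a required
// node-affinity match expression using the In operator, which is the shape
// the updated test fixtures construct by hand.
func requiredAffinityFromSelector(selector map[string]string) *v1.Affinity {
	expressions := make([]v1.NodeSelectorRequirement, 0, len(selector))
	for key, value := range selector {
		expressions = append(expressions, v1.NodeSelectorRequirement{
			Key:      key,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{value},
		})
	}
	return &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{MatchExpressions: expressions},
				},
			},
		},
	}
}

func main() {
	affinity := requiredAffinityFromSelector(map[string]string{
		"com.palantir.rubix/instance-group": "batch-medium-priority",
	})
	fmt.Printf("%+v\n", affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
```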
379 changes: 350 additions & 29 deletions Gopkg.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions Gopkg.toml
@@ -18,16 +18,20 @@ required = ["k8s.io/code-generator/cmd/client-gen"]

[[constraint]]
name = "github.com/palantir/k8s-spark-scheduler-lib"
version = "0.2.1"
version = "0.2.4"

[[constraint]]
version = "kubernetes-1.14.9"
version = "kubernetes-1.17.2"
name = "k8s.io/client-go"

[[constraint]]
version = "kubernetes-1.14.9"
version = "kubernetes-1.17.2"
name = "k8s.io/apimachinery"

[[constraint]]
version = "kubernetes-1.17.2"
name = "k8s.io/apiextensions-apiserver"

[[constraint]]
version = "v1.1.0"
name = "github.com/palantir/witchcraft-go-logging"
2 changes: 1 addition & 1 deletion cmd/endpoints.go
@@ -22,7 +22,7 @@ import (
werror "github.com/palantir/witchcraft-go-error"
"github.com/palantir/witchcraft-go-server/rest"
"github.com/palantir/witchcraft-go-server/wrouter"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
)

func registerExtenderEndpoints(r wrouter.Router, sparkSchedulerExtender *extender.SparkSchedulerExtender) error {
2 changes: 1 addition & 1 deletion internal/extender/demand.go
@@ -74,7 +74,7 @@ func (s *SparkSchedulerExtender) createDemandForApplication(ctx context.Context,
}

func (s *SparkSchedulerExtender) createDemand(ctx context.Context, pod *v1.Pod, demandUnits []demandapi.DemandUnit) {
instanceGroup, ok := pod.Spec.NodeSelector[s.instanceGroupLabel]
instanceGroup, ok := internal.FindInstanceGroupFromPodSpec(pod.Spec, s.instanceGroupLabel)
if !ok {
svc1log.FromContext(ctx).Error("No instanceGroup label exists. Cannot map to InstanceGroup. Skipping demand object",
svc1log.SafeParam("expectedLabel", s.instanceGroupLabel))
46 changes: 39 additions & 7 deletions internal/extender/extendertest/extender_test_utils.go
@@ -35,7 +35,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
)

const (
@@ -153,7 +153,7 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
// Schedule calls the extender's Predicate method for the given pod and nodes
func (h *Harness) Schedule(pod v1.Pod, nodeNames []string) *schedulerapi.ExtenderFilterResult {
return h.Extender.Predicate(h.Ctx, schedulerapi.ExtenderArgs{
Pod: pod,
Pod: &pod,
NodeNames: &nodeNames,
})
}
@@ -268,9 +268,27 @@ func sparkApplicationPods(sparkApplicationID string, driverAnnotations map[strin
Annotations: driverAnnotations,
},
Spec: v1.PodSpec{
NodeSelector: map[string]string{
"resource_channel": "batch-medium-priority",
"com.palantir.rubix/instance-group": "batch-medium-priority",
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "resource_channel",
Operator: v1.NodeSelectorOpIn,
Values: []string{"batch-medium-priority"},
},
{
Key: "com.palantir.rubix/instance-group",
Operator: v1.NodeSelectorOpIn,
Values: []string{"batch-medium-priority"},
},
},
},
},
},
},
},
},
Status: v1.PodStatus{
@@ -292,8 +310,22 @@ func sparkApplicationPods(sparkApplicationID string, driverAnnotations map[strin
},
},
Spec: v1.PodSpec{
NodeSelector: map[string]string{
"resource_channel": resourceChannel,
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "resource_channel",
Operator: v1.NodeSelectorOpIn,
Values: []string{resourceChannel},
},
},
},
},
},
},
},
},
Status: v1.PodStatus{
3 changes: 2 additions & 1 deletion internal/extender/failover.go
@@ -127,7 +127,8 @@ func (r *reconciler) syncResourceReservations(ctx context.Context, sp *sparkPods
svc1log.SafeParam("appID", sp.appID), svc1log.Stacktrace(err))
return nil
}
instanceGroup := instanceGroup(sp.inconsistentDriver.Spec.NodeSelector[r.instanceGroupLabel])
ig, _ := internal.FindInstanceGroupFromPodSpec(sp.inconsistentDriver.Spec, r.instanceGroupLabel)
instanceGroup := instanceGroup(ig)
endIdx := int(math.Min(float64(len(sp.inconsistentExecutors)), float64(appResources.minExecutorCount)))
executorsUpToMin := sp.inconsistentExecutors[0:endIdx]
extraExecutors = sp.inconsistentExecutors[endIdx:]
14 changes: 9 additions & 5 deletions internal/extender/nodesorting_test.go
@@ -16,6 +16,7 @@ package extender

import (
"testing"
"time"

"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
"k8s.io/apimachinery/pkg/api/resource"
@@ -138,21 +139,24 @@ func TestAZAwareNodeSortingWorksIfZoneLabelIsMissing(t *testing.T) {
CPU: two,
Memory: one,
},
Ready: true,
CreationTimestamp: time.Now(),
Ready: true,
}
node2SchedulingMetadata := &resources.NodeSchedulingMetadata{
AvailableResources: &resources.Resources{
CPU: two,
Memory: one,
Memory: two,
},
Ready: true,
CreationTimestamp: time.Now(),
Ready: true,
}
node3SchedulingMetadata := &resources.NodeSchedulingMetadata{
AvailableResources: &resources.Resources{
CPU: one,
Memory: one,
},
Ready: true,
CreationTimestamp: time.Now(),
Ready: true,
}

nodesSchedulingMetadata := resources.NodeGroupSchedulingMetadata{
@@ -173,7 +177,7 @@ func compareActualToExpected(actualNodes []string, expectedResult []string, t *t
}
for i, expectedNode := range expectedResult {
if expectedNode != actualNodes[i] {
t.Error("Each element in the sorted result should match the expected result. Element unmatched: ", i)
t.Error("Each element in the sorted result should match the expected result. ", expectedResult, actualNodes)
}
}
}
26 changes: 16 additions & 10 deletions internal/extender/resource.go
@@ -32,7 +32,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
)

const (
@@ -106,15 +107,18 @@ func NewExtender(
// Predicate is responsible for returning a filtered list of nodes that qualify to schedule the pod provided in the
// ExtenderArgs
func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
params := internal.PodSafeParams(args.Pod)
params := internal.PodSafeParams(*args.Pod)
role := args.Pod.Labels[SparkRoleLabel]
instanceGroup := args.Pod.Spec.NodeSelector[s.instanceGroupLabel]
params["podSparkRole"] = role
params["instanceGroup"] = instanceGroup
ctx = svc1log.WithLoggerParams(ctx, svc1log.SafeParams(params))
logger := svc1log.FromContext(ctx)
instanceGroup, success := internal.FindInstanceGroupFromPodSpec(args.Pod.Spec, s.instanceGroupLabel)
if !success {
instanceGroup = ""
}
params["podSparkRole"] = role
params["instanceGroup"] = instanceGroup

timer := metrics.NewScheduleTimer(ctx, instanceGroup, &args.Pod)
timer := metrics.NewScheduleTimer(ctx, instanceGroup, args.Pod)
logger.Info("starting scheduling pod")

err := s.reconcileIfNeeded(ctx, timer)
@@ -124,7 +128,7 @@ func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerap
return failWithMessage(ctx, args, msg)
}

nodeName, outcome, err := s.selectNode(ctx, args.Pod.Labels[SparkRoleLabel], &args.Pod, *args.NodeNames)
nodeName, outcome, err := s.selectNode(ctx, args.Pod.Labels[SparkRoleLabel], args.Pod, *args.NodeNames)
timer.Mark(ctx, role, outcome)
if err != nil {
if outcome == failureInternal {
@@ -136,7 +140,7 @@ func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerap
}

if role == Driver {
appResources, err := sparkResources(ctx, &args.Pod)
appResources, err := sparkResources(ctx, args.Pod)
if err != nil {
logger.Error("internal error scheduling pod", svc1log.Stacktrace(err))
return failWithMessage(ctx, args, err.Error())
@@ -145,7 +149,7 @@ func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerap
ctx,
instanceGroup,
args.Pod.Labels[SparkAppIDLabel],
args.Pod,
*args.Pod,
appResources.driverResources,
appResources.executorResources,
appResources.minExecutorCount,
@@ -237,7 +241,9 @@ func (s *SparkSchedulerExtender) selectDriverNode(ctx context.Context, driver *v
svc1log.SafeParam("nodeNames", nodeNames))
return driverReservedNode, success, nil
}
availableNodes, err := s.nodeLister.List(labels.Set(driver.Spec.NodeSelector).AsSelector())
availableNodes, err := s.nodeLister.ListWithPredicate(func(node *v1.Node) bool {
return predicates.PodMatchesNodeSelectorAndAffinityTerms(driver, node)
})
if err != nil {
return "", failureInternal, err
}
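
Node listing changes in the same spirit: here in `resource.go`, and again in `unschedulablepods.go` below, the label selector built from `driver.Spec.NodeSelector` is replaced by a predicate that checks the pod's full node selector and required affinity terms against each node. A rough sketch of the equivalent filtering over a plain node slice, using the `predicates` helper the diff imports (signature assumed from the call sites above):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

// nodesMatchingDriver mirrors what the ListWithPredicate call sites do:
// keep only the nodes whose labels satisfy the driver pod's node selector
// and required node-affinity terms.
func nodesMatchingDriver(driver *v1.Pod, nodes []*v1.Node) []*v1.Node {
	matching := make([]*v1.Node, 0, len(nodes))
	for _, node := range nodes {
		if predicates.PodMatchesNodeSelectorAndAffinityTerms(driver, node) {
			matching = append(matching, node)
		}
	}
	return matching
}

func main() {
	// Empty inputs just to show the call shape; real callers pass the driver
	// pod from ExtenderArgs and the nodes from the lister.
	fmt.Println(len(nodesMatchingDriver(&v1.Pod{}, nil)))
}
```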
4 changes: 3 additions & 1 deletion internal/extender/sparkpods.go
@@ -21,6 +21,7 @@ import (
"strconv"

"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
"github.com/palantir/k8s-spark-scheduler/internal"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
@@ -90,10 +91,11 @@ func (s SparkPodLister) ListEarlierDrivers(driver *v1.Pod) ([]*v1.Pod, error) {
func filterToEarliestAndSort(driver *v1.Pod, allDrivers []*v1.Pod, instanceGroupLabel string) []*v1.Pod {
earlierDrivers := make([]*v1.Pod, 0, 10)
for _, p := range allDrivers {

// add only unscheduled drivers with the same instance group and targeted to the same scheduler
if len(p.Spec.NodeName) == 0 &&
p.Spec.SchedulerName == driver.Spec.SchedulerName &&
p.Spec.NodeSelector[instanceGroupLabel] == driver.Spec.NodeSelector[instanceGroupLabel] &&
internal.MatchPodInstanceGroup(p, driver, instanceGroupLabel) &&
p.CreationTimestamp.Before(&driver.CreationTimestamp) &&
p.DeletionTimestamp == nil {
earlierDrivers = append(earlierDrivers, p)
18 changes: 17 additions & 1 deletion internal/extender/sparkpods_test.go
@@ -114,7 +114,23 @@ func createPod(seconds int64, uid, instanceGroupLabel, instanceGroup string) *v1
CreationTimestamp: metav1.NewTime(time.Unix(seconds, 0)),
},
Spec: v1.PodSpec{
NodeSelector: map[string]string{instanceGroupLabel: instanceGroup},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: instanceGroupLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{instanceGroup},
},
},
},
},
},
},
},
},
}
}
5 changes: 4 additions & 1 deletion internal/extender/unschedulablepods.go
@@ -26,6 +26,7 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

const (
@@ -117,7 +118,9 @@ func (u *UnschedulablePodMarker) scanForUnschedulablePods(ctx context.Context) {

// DoesPodExceedClusterCapacity checks if the provided driver pod could ever fit to the cluster
func (u *UnschedulablePodMarker) DoesPodExceedClusterCapacity(ctx context.Context, driver *v1.Pod) (bool, error) {
nodes, err := u.nodeLister.List(labels.Set(driver.Spec.NodeSelector).AsSelector())
nodes, err := u.nodeLister.ListWithPredicate(func(node *v1.Node) bool {
return predicates.PodMatchesNodeSelectorAndAffinityTerms(driver, node)
})
if err != nil {
return false, err
}
7 changes: 6 additions & 1 deletion internal/metrics/queue.go
@@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/palantir/k8s-spark-scheduler/internal"
"github.com/palantir/pkg/metrics"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
"github.com/palantir/witchcraft-go-logging/wlog/wapp"
@@ -128,7 +129,11 @@ func (p PodHistograms) Inc(key PodTags) {

// MarkTimes inspects pod conditions and marks lifecycle transition times
func (p PodHistograms) MarkTimes(ctx context.Context, pod *v1.Pod, instanceGroupTagLabel string, now time.Time) {
instanceGroupTag := InstanceGroupTag(ctx, pod.Spec.NodeSelector[instanceGroupTagLabel])
ig, success := internal.FindInstanceGroupFromPodSpec(pod.Spec, instanceGroupTagLabel)
if !success {
ig = ""
}
instanceGroupTag := InstanceGroupTag(ctx, ig)
sparkRoleTag := SparkRoleTag(ctx, pod.Labels[sparkRoleLabel])
podConditions := NewSparkPodConditions(pod.Status.Conditions)

18 changes: 17 additions & 1 deletion internal/metrics/queue_test.go
@@ -41,7 +41,23 @@ func createPod(instanceGroupLabel, instanceGroup, sparkRole string, creationTime
CreationTimestamp: metav1.NewTime(creationTimeStamp),
},
Spec: v1.PodSpec{
NodeSelector: map[string]string{instanceGroupLabel: instanceGroup},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: instanceGroupLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{instanceGroup},
},
},
},
},
},
},
},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{},
41 changes: 41 additions & 0 deletions internal/podspec.go
@@ -0,0 +1,41 @@
// Copyright (c) 2019 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
v1 "k8s.io/api/core/v1"
)

// MatchPodInstanceGroup compares instance group label on given Pod specs.
func MatchPodInstanceGroup(pod1 *v1.Pod, pod2 *v1.Pod, instanceGroupLabel string) bool {
instanceGroup1, success1 := FindInstanceGroupFromPodSpec(pod1.Spec, instanceGroupLabel)
instanceGroup2, success2 := FindInstanceGroupFromPodSpec(pod2.Spec, instanceGroupLabel)
return success1 && success2 && instanceGroup1 == instanceGroup2
}

// FindInstanceGroupFromPodSpec extracts the instance group from a Pod spec.
func FindInstanceGroupFromPodSpec(podSpec v1.PodSpec, instanceGroupLabel string) (string, bool) {
for _, nodeSelectorTerm := range podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
for _, matchExpression := range nodeSelectorTerm.MatchExpressions {
if matchExpression.Key == instanceGroupLabel {
if len(matchExpression.Values) == 1 {
return matchExpression.Values[0], true
}
}
}
}
instanceGroup, ok := podSpec.NodeSelector[instanceGroupLabel]
return instanceGroup, ok
}
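
A short usage sketch of the two new helpers, building the same pod spec shape as the updated test fixtures above (label key and value taken from those fixtures). The committed `FindInstanceGroupFromPodSpec` dereferences the affinity chain directly, so this sketch passes a spec with required node affinity populated:

```go
package main

import (
	"fmt"

	"github.com/palantir/k8s-spark-scheduler/internal"
	v1 "k8s.io/api/core/v1"
)

func main() {
	const label = "com.palantir.rubix/instance-group"
	spec := v1.PodSpec{
		Affinity: &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{Key: label, Operator: v1.NodeSelectorOpIn, Values: []string{"batch-medium-priority"}},
							},
						},
					},
				},
			},
		},
	}

	// Resolve the instance group from the required node-affinity terms.
	if instanceGroup, ok := internal.FindInstanceGroupFromPodSpec(spec, label); ok {
		fmt.Println("instance group:", instanceGroup)
	}

	// Compare two pods' instance groups, as filterToEarliestAndSort in
	// sparkpods.go above now does.
	driver, other := &v1.Pod{Spec: spec}, &v1.Pod{Spec: spec}
	fmt.Println("same group:", internal.MatchPodInstanceGroup(driver, other, label))
}
```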