From 4d3ea7750a75cc7ae55d0ce5563e4042f37a23b8 Mon Sep 17 00:00:00 2001 From: Ashray Jain Date: Tue, 3 Dec 2019 23:48:54 +0000 Subject: [PATCH] Only schedule executors on ready nodes (#100) * Only include ready nodes when scheduling executors * Fix tests * Fix version * Add node ready handling on failover --- Gopkg.lock | 6 +++--- Gopkg.toml | 2 +- .../extender/extendertest/extender_test_utils.go | 6 ++++++ internal/extender/failover.go | 14 ++++++++++++-- internal/extender/nodesorting_test.go | 3 +++ internal/extender/resource.go | 2 +- .../pkg/resources/resources.go | 9 +++++++++ 7 files changed, 35 insertions(+), 7 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 45e9ee8a6..7c13e1720 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -171,7 +171,7 @@ revision = "a1d3024b37d8cdd6c19d748051f7228155f0a0d9" [[projects]] - digest = "1:d0fdcf03fc5df3e8a056ccee0e2ee57cc81c7d4a3b2438fc1261b3051f69f6e5" + digest = "1:eca6161234acc67c6a88635f7da9997cfa121e5aedf8cef03a7277dbc8b2ce63" name = "github.com/palantir/k8s-spark-scheduler-lib" packages = [ "pkg/apis/scaler/v1alpha1", @@ -196,8 +196,8 @@ "pkg/resources", ] pruneopts = "UT" - revision = "5b5188d04f766a14cc503306cc1ed6fab8c79977" - version = "0.2.0" + revision = "3bd80935dd3983c4978df92889e0a87dd128f92e" + version = "0.2.1" [[projects]] digest = "1:adda8befb1b3e6b1f939034d4396a12570348c97412847bd819c4d8ec29e5d9c" diff --git a/Gopkg.toml b/Gopkg.toml index 1feed7d36..cd9fad692 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -18,7 +18,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"] [[constraint]] name = "github.com/palantir/k8s-spark-scheduler-lib" - version = "0.2.0" + version = "0.2.1" [[constraint]] version = "kubernetes-1.14.9" diff --git a/internal/extender/extendertest/extender_test_utils.go b/internal/extender/extendertest/extender_test_utils.go index 19cc4cfbc..7c10092be 100644 --- a/internal/extender/extendertest/extender_test_utils.go +++ b/internal/extender/extendertest/extender_test_utils.go @@ -213,6 +213,12 @@ func NewNode(name string) v1.Node { v1.ResourceCPU: *resource.NewQuantity(8, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(8*1024*1024*1024, resource.BinarySI), }, + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, }, } } diff --git a/internal/extender/failover.go b/internal/extender/failover.go index a1fa86056..27a4fad8b 100644 --- a/internal/extender/failover.go +++ b/internal/extender/failover.go @@ -48,7 +48,7 @@ func (s *SparkSchedulerExtender) syncResourceReservationsAndDemands(ctx context. 
rrs := s.resourceReservations.List() overhead := s.overheadComputer.GetOverhead(ctx, nodes) softReservationOverhead := s.softReservationStore.UsedSoftReservationResources() - availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, s.instanceGroupLabel, rrs, nodes, overhead, softReservationOverhead) + availableResources, orderedNodes := availableResourcesPerInstanceGroup(s.instanceGroupLabel, rrs, nodes, overhead, softReservationOverhead) staleSparkPods := unreservedSparkPodsBySparkID(ctx, rrs, s.softReservationStore, pods) svc1log.FromContext(ctx).Info("starting reconciliation", svc1log.SafeParam("appCount", len(staleSparkPods))) @@ -278,7 +278,6 @@ func isNotScheduledSparkPod(pod *v1.Pod) bool { } func availableResourcesPerInstanceGroup( - ctx context.Context, instanceGroupLabel string, rrs []*v1beta1.ResourceReservation, nodes []*v1.Node, @@ -293,6 +292,17 @@ func availableResourcesPerInstanceGroup( if n.Spec.Unschedulable { continue } + + nodeReady := false + for _, n := range n.Status.Conditions { + if n.Type == v1.NodeReady && n.Status == v1.ConditionTrue { + nodeReady = true + } + } + if !nodeReady { + continue + } + instanceGroup := instanceGroup(n.Labels[instanceGroupLabel]) schedulableNodes[instanceGroup] = append(schedulableNodes[instanceGroup], n) } diff --git a/internal/extender/nodesorting_test.go b/internal/extender/nodesorting_test.go index 83b73682e..b16d1a0c3 100644 --- a/internal/extender/nodesorting_test.go +++ b/internal/extender/nodesorting_test.go @@ -138,18 +138,21 @@ func TestAZAwareNodeSortingWorksIfZoneLabelIsMissing(t *testing.T) { CPU: two, Memory: one, }, + Ready: true, } node2SchedulingMetadata := &resources.NodeSchedulingMetadata{ AvailableResources: &resources.Resources{ CPU: two, Memory: one, }, + Ready: true, } node3SchedulingMetadata := &resources.NodeSchedulingMetadata{ AvailableResources: &resources.Resources{ CPU: one, Memory: one, }, + Ready: true, } nodesSchedulingMetadata := resources.NodeGroupSchedulingMetadata{ diff --git a/internal/extender/resource.go b/internal/extender/resource.go index 476e0b471..69cb876b4 100644 --- a/internal/extender/resource.go +++ b/internal/extender/resource.go @@ -309,7 +309,7 @@ func (s *SparkSchedulerExtender) potentialNodes(availableNodesSchedulingMetadata if _, ok := nodeNamesSet[nodeName]; ok { driverNodeNames = append(driverNodeNames, nodeName) } - if !availableNodesSchedulingMetadata[nodeName].Unschedulable { + if !availableNodesSchedulingMetadata[nodeName].Unschedulable && availableNodesSchedulingMetadata[nodeName].Ready { executorNodeNames = append(executorNodeNames, nodeName) } } diff --git a/vendor/github.com/palantir/k8s-spark-scheduler-lib/pkg/resources/resources.go b/vendor/github.com/palantir/k8s-spark-scheduler-lib/pkg/resources/resources.go index ffb36d9db..4aab51f61 100644 --- a/vendor/github.com/palantir/k8s-spark-scheduler-lib/pkg/resources/resources.go +++ b/vendor/github.com/palantir/k8s-spark-scheduler-lib/pkg/resources/resources.go @@ -66,11 +66,19 @@ func NodeSchedulingMetadataForNodes(nodes []*v1.Node, currentUsage NodeGroupReso if !ok { zoneLabel = zoneLabelPlaceholder } + + nodeReady := false + for _, condition := range node.Status.Conditions { + if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue { + nodeReady = true + } + } nodeGroupSchedulingMetadata[node.Name] = &NodeSchedulingMetadata{ AvailableResources: subtractFromResourceList(node.Status.Allocatable, currentUsageForNode), CreationTimestamp: node.CreationTimestamp.Time, ZoneLabel: zoneLabel, 
Unschedulable: node.Spec.Unschedulable, + Ready: nodeReady, } } return nodeGroupSchedulingMetadata @@ -133,6 +141,7 @@ type NodeSchedulingMetadata struct { CreationTimestamp time.Time ZoneLabel string Unschedulable bool + Ready bool } // Zero returns a Resources object with quantities of zero
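
For reference, the readiness check added above is inlined in both availableResourcesPerInstanceGroup (internal/extender/failover.go) and NodeSchedulingMetadataForNodes (vendored pkg/resources/resources.go), and the new Ready field is then consulted in potentialNodes alongside Unschedulable. A minimal standalone sketch of the same logic follows; the helper names isNodeReady and filterSchedulableNodes are illustrative only and are not part of this change.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // isNodeReady reports whether the node carries a NodeReady condition with
    // status True -- the same condition loop this patch adds before counting a
    // node as available for executors.
    func isNodeReady(node *v1.Node) bool {
        for _, condition := range node.Status.Conditions {
            if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
                return true
            }
        }
        return false
    }

    // filterSchedulableNodes keeps nodes that are both marked schedulable and
    // ready, mirroring the filter applied when selecting executor nodes.
    func filterSchedulableNodes(nodes []*v1.Node) []*v1.Node {
        schedulable := make([]*v1.Node, 0, len(nodes))
        for _, node := range nodes {
            if node.Spec.Unschedulable || !isNodeReady(node) {
                continue
            }
            schedulable = append(schedulable, node)
        }
        return schedulable
    }

    func main() {
        ready := &v1.Node{Status: v1.NodeStatus{
            Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
        }}
        notReady := &v1.Node{} // no NodeReady condition reported yet

        fmt.Println(len(filterSchedulableNodes([]*v1.Node{ready, notReady}))) // prints 1
    }

With the Ready flag carried on NodeSchedulingMetadata, executors are only placed on nodes that are both schedulable and reporting ready, which also covers the failover path where reservations could previously be reconciled onto nodes that had not yet become ready.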