Only schedule executors on ready nodes (palantir#100)

* Only include ready nodes when scheduling executors

* Fix tests

* Fix version

* Add node ready handling on failover
ashrayjain authored and onursatici committed Dec 3, 2019
1 parent c45ab2f commit 4d3ea77
Showing 7 changed files with 35 additions and 7 deletions.
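
The whole change hangs off one predicate: a node only receives executors if it reports a NodeReady condition with status True. Below is a minimal, self-contained sketch of that check; the isNodeReady helper name is illustrative, not part of this diff.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// isNodeReady mirrors the loop this commit adds in failover.go: scan the
// node's status conditions for a NodeReady condition with status True.
func isNodeReady(node *v1.Node) bool {
    for _, cond := range node.Status.Conditions {
        if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}

func main() {
    node := &v1.Node{
        Status: v1.NodeStatus{
            Conditions: []v1.NodeCondition{
                {Type: v1.NodeReady, Status: v1.ConditionTrue},
            },
        },
    }
    fmt.Println(isNodeReady(node)) // prints: true
}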
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Gopkg.toml
@@ -18,7 +18,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"]

[[constraint]]
  name = "github.com/palantir/k8s-spark-scheduler-lib"
-  version = "0.2.0"
+  version = "0.2.1"

[[constraint]]
version = "kubernetes-1.14.9"
6 changes: 6 additions & 0 deletions internal/extender/extendertest/extender_test_utils.go
@@ -213,6 +213,12 @@ func NewNode(name string) v1.Node {
                v1.ResourceCPU:    *resource.NewQuantity(8, resource.DecimalSI),
                v1.ResourceMemory: *resource.NewQuantity(8*1024*1024*1024, resource.BinarySI),
            },
+           Conditions: []v1.NodeCondition{
+               {
+                   Type:   v1.NodeReady,
+                   Status: v1.ConditionTrue,
+               },
+           },
        },
    }
}
14 changes: 12 additions & 2 deletions internal/extender/failover.go
@@ -48,7 +48,7 @@ func (s *SparkSchedulerExtender) syncResourceReservationsAndDemands(ctx context.
    rrs := s.resourceReservations.List()
    overhead := s.overheadComputer.GetOverhead(ctx, nodes)
    softReservationOverhead := s.softReservationStore.UsedSoftReservationResources()
-   availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, s.instanceGroupLabel, rrs, nodes, overhead, softReservationOverhead)
+   availableResources, orderedNodes := availableResourcesPerInstanceGroup(s.instanceGroupLabel, rrs, nodes, overhead, softReservationOverhead)
    staleSparkPods := unreservedSparkPodsBySparkID(ctx, rrs, s.softReservationStore, pods)
    svc1log.FromContext(ctx).Info("starting reconciliation", svc1log.SafeParam("appCount", len(staleSparkPods)))

@@ -278,7 +278,6 @@ func isNotScheduledSparkPod(pod *v1.Pod) bool {
}

func availableResourcesPerInstanceGroup(
-   ctx context.Context,
    instanceGroupLabel string,
    rrs []*v1beta1.ResourceReservation,
    nodes []*v1.Node,
@@ -293,6 +292,17 @@
        if n.Spec.Unschedulable {
            continue
        }
+
+       nodeReady := false
+       for _, cond := range n.Status.Conditions {
+           if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
+               nodeReady = true
+           }
+       }
+       if !nodeReady {
+           continue
+       }
+
        instanceGroup := instanceGroup(n.Labels[instanceGroupLabel])
        schedulableNodes[instanceGroup] = append(schedulableNodes[instanceGroup], n)
    }
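
The hunk above is the failover counterpart of the main scheduling path: unschedulable nodes were already skipped, and nodes that are not ready now are too. A hedged test sketch of the combined filter follows; the filterSchedulableNodes helper and the test are illustrative, not this repo's API.

package main

import (
    "testing"

    v1 "k8s.io/api/core/v1"
)

// filterSchedulableNodes applies the same two checks as the updated loop:
// drop unschedulable nodes, then drop nodes lacking NodeReady == True.
func filterSchedulableNodes(nodes []*v1.Node) []*v1.Node {
    var out []*v1.Node
    for _, node := range nodes {
        if node.Spec.Unschedulable {
            continue
        }
        ready := false
        for _, cond := range node.Status.Conditions {
            if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
                ready = true
            }
        }
        if !ready {
            continue
        }
        out = append(out, node)
    }
    return out
}

func TestNotReadyNodesAreExcluded(t *testing.T) {
    readyNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
        {Type: v1.NodeReady, Status: v1.ConditionTrue},
    }}}
    notReadyNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
        {Type: v1.NodeReady, Status: v1.ConditionFalse},
    }}}
    if got := filterSchedulableNodes([]*v1.Node{readyNode, notReadyNode}); len(got) != 1 {
        t.Fatalf("want 1 schedulable node, got %d", len(got))
    }
}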
3 changes: 3 additions & 0 deletions internal/extender/nodesorting_test.go
@@ -138,18 +138,21 @@ func TestAZAwareNodeSortingWorksIfZoneLabelIsMissing(t *testing.T) {
            CPU:    two,
            Memory: one,
        },
+       Ready: true,
    }
    node2SchedulingMetadata := &resources.NodeSchedulingMetadata{
        AvailableResources: &resources.Resources{
            CPU:    two,
            Memory: one,
        },
+       Ready: true,
    }
    node3SchedulingMetadata := &resources.NodeSchedulingMetadata{
        AvailableResources: &resources.Resources{
            CPU:    one,
            Memory: one,
        },
+       Ready: true,
    }

    nodesSchedulingMetadata := resources.NodeGroupSchedulingMetadata{
2 changes: 1 addition & 1 deletion internal/extender/resource.go
@@ -309,7 +309,7 @@ func (s *SparkSchedulerExtender) potentialNodes(availableNodesSchedulingMetadata
        if _, ok := nodeNamesSet[nodeName]; ok {
            driverNodeNames = append(driverNodeNames, nodeName)
        }
-       if !availableNodesSchedulingMetadata[nodeName].Unschedulable {
+       if !availableNodesSchedulingMetadata[nodeName].Unschedulable && availableNodesSchedulingMetadata[nodeName].Ready {
            executorNodeNames = append(executorNodeNames, nodeName)
        }
    }
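
The Ready flag consulted here is a field on NodeSchedulingMetadata from k8s-spark-scheduler-lib, which is what the Gopkg.toml bump from 0.2.0 to 0.2.1 pulls in (see nodesorting_test.go above for the field in use). A sketch follows, under the assumption that the extender derives the flag from node conditions when it builds per-node metadata; toSchedulingMetadata is a hypothetical name, and only the struct fields appear in this diff.

package extender

import (
    "github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"

    v1 "k8s.io/api/core/v1"
)

// toSchedulingMetadata (hypothetical) builds the lib's per-node metadata;
// Ready is assumed to come from the NodeReady condition, next to the
// Unschedulable and AvailableResources fields visible in this diff.
func toSchedulingMetadata(node *v1.Node, available *resources.Resources) *resources.NodeSchedulingMetadata {
    ready := false
    for _, cond := range node.Status.Conditions {
        if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
            ready = true
        }
    }
    return &resources.NodeSchedulingMetadata{
        AvailableResources: available,
        Unschedulable:      node.Spec.Unschedulable,
        Ready:              ready,
    }
}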

Some generated files are not rendered by default.
