diff --git a/Gopkg.lock b/Gopkg.lock
index 13d6d5c75..02cb622c6 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -907,6 +907,7 @@
     "k8s.io/apimachinery/pkg/types",
     "k8s.io/apimachinery/pkg/util/wait",
     "k8s.io/client-go/informers",
+    "k8s.io/client-go/informers/core/v1",
     "k8s.io/client-go/kubernetes",
     "k8s.io/client-go/kubernetes/fake",
     "k8s.io/client-go/kubernetes/typed/core/v1",
diff --git a/README.md b/README.md
index cb1253dae..afeba9416 100644
--- a/README.md
+++ b/README.md
@@ -54,6 +54,15 @@ As of [f6cc354d83](https://github.com/apache/spark/commit/f6cc354d83), spark sup
 "spark.kubernetes.executor.podTemplateFile": "/path/to/executor.template"
 ```
 
+### Dynamic Allocation
+`k8s-spark-scheduler-extender` also supports running Spark applications in dynamic allocation mode. You can find more information about how to configure Spark to make use of dynamic allocation in the [Spark documentation](http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation).
+To inform `k8s-spark-scheduler-extender` that you are running an application with dynamic allocation enabled, omit the `spark-executor-count` annotation on the driver pod and instead set the following three annotations:
+- `spark-dynamic-allocation-enabled`: "true"
+- `spark-dynamic-allocation-min-executor-count`: minimum number of executors to always reserve resources for. Should be equal to the `spark.dynamicAllocation.minExecutors` value you set in the Spark configuration
+- `spark-dynamic-allocation-max-executor-count`: maximum number of executors to allow your application to request at a given time. Should be equal to the `spark.dynamicAllocation.maxExecutors` value you set in the Spark configuration
+
+If dynamic allocation is enabled, `k8s-spark-scheduler-extender` will guarantee that your application only gets scheduled if the driver and the minimum number of executors fit in the cluster. Resources are not reserved for executors over the minimum; those executors are only scheduled if there is capacity available at the time the application requests them.
+
 ## Configuration
 `k8s-spark-scheduler-extender` is a witchcraft service, and supports configuration options detailed in the [github documentation](https://github.com/palantir/witchcraft-go-server#configuration). Additional configuration options are:
diff --git a/cmd/server.go b/cmd/server.go
index 4b9671b4c..f9c93e18a 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -156,10 +156,13 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
 		return nil, err
 	}
 
+	softReservationStore := cache.NewSoftReservationStore(ctx, podInformerInterface)
+
 	overheadComputer := extender.NewOverheadComputer(
 		ctx,
 		podLister,
 		resourceReservationCache,
+		softReservationStore,
 		nodeLister,
 		instanceGroupLabel,
 	)
@@ -170,6 +173,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
 		nodeLister,
 		extender.NewSparkPodLister(podLister, instanceGroupLabel),
 		resourceReservationCache,
+		softReservationStore,
 		kubeClient.CoreV1(),
 		demandCache,
 		apiExtensionsClient,
diff --git a/internal/cache/softreservations.go b/internal/cache/softreservations.go
new file mode 100644
index 000000000..95b417fb6
--- /dev/null
+++ b/internal/cache/softreservations.go
@@ -0,0 +1,245 @@
+// Copyright (c) 2019 Palantir Technologies. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+	"context"
+	"sync"
+
+	"github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta1"
+	"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
+	werror "github.com/palantir/witchcraft-go-error"
+	"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
+	v1 "k8s.io/api/core/v1"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	clientcache "k8s.io/client-go/tools/cache"
+)
+
+// TODO(rkaram): Move to common place to avoid duplication without causing circular dependency
+const (
+	// SparkSchedulerName is the name of the kube-scheduler instance that talks with the extender
+	SparkSchedulerName = "spark-scheduler"
+	// SparkRoleLabel represents the label key for the spark-role of a pod
+	SparkRoleLabel = "spark-role"
+	// SparkAppIDLabel represents the label key for the spark application ID on a pod
+	SparkAppIDLabel = "spark-app-id" // TODO(onursatici): change this to a spark specific label when spark has one
+	// Driver represents the spark-role label value that identifies a pod as a spark driver
+	Driver = "driver"
+	// Executor represents the spark-role label value that identifies a pod as a spark executor
+	Executor = "executor"
+)
+
+// SoftReservationStore is an in-memory store that keeps track of soft reservations granted to extra executors for applications that support dynamic allocation
+type SoftReservationStore struct {
+	store     map[string]*SoftReservation // SparkAppID -> SoftReservation
+	storeLock sync.RWMutex
+	logger    svc1log.Logger
+}
+
+// SoftReservation is an in-memory reservation for a particular spark application that keeps track of extra executors allocated over the
+// min reservation count
+type SoftReservation struct {
+	// Executor pod name -> Reservation (only valid ones here)
+	Reservations map[string]v1beta1.Reservation
+
+	// Executor pod name -> Reservation valid or not
+	// We keep a history of extra executors we have already granted a Reservation for, even after the executor dies, so that we do not
+	// create a second Reservation for the same executor. This prevents a race between handling the executor's death event and
+	// handling its scheduling request.
+	Status map[string]bool
+}
+
+// NewSoftReservationStore builds and returns a SoftReservationStore and instantiates the needed background informer event handlers to keep the store up to date.
+func NewSoftReservationStore(ctx context.Context, informer coreinformers.PodInformer) *SoftReservationStore { + s := &SoftReservationStore{ + store: make(map[string]*SoftReservation), + logger: svc1log.FromContext(ctx), + } + + informer.Informer().AddEventHandler( + clientcache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + if pod, ok := obj.(*v1.Pod); ok { + _, labelFound := pod.Labels[SparkRoleLabel] + if labelFound && pod.Spec.SchedulerName == SparkSchedulerName { + return true + } + } + return false + }, + Handler: clientcache.ResourceEventHandlerFuncs{ + DeleteFunc: s.onPodDeletion, + }, + }, + ) + return s +} + +// GetSoftReservation returns a copy of the SoftReservation tied to an application if it exists (otherwise, bool returned will be false). +func (s *SoftReservationStore) GetSoftReservation(appID string) (*SoftReservation, bool) { + s.storeLock.RLock() + defer s.storeLock.RUnlock() + appSoftReservation, ok := s.store[appID] + if !ok { + return &SoftReservation{}, ok + } + return s.deepCopySoftReservation(appSoftReservation), ok +} + +// GetAllSoftReservationsCopy returns a copy of the internal store. As this indicates, this method does a deep copy +// which is slow and should only be used for purposes where this is acceptable such as tests. +func (s *SoftReservationStore) GetAllSoftReservationsCopy() map[string]*SoftReservation { + s.storeLock.RLock() + defer s.storeLock.RUnlock() + storeCopy := make(map[string]*SoftReservation, len(s.store)) + for appID, sr := range s.store { + storeCopy[appID] = s.deepCopySoftReservation(sr) + } + return storeCopy +} + +// CreateSoftReservationIfNotExists creates an internal empty soft reservation for a particular application. +// This is a noop if the reservation already exists. +func (s *SoftReservationStore) CreateSoftReservationIfNotExists(appID string) { + s.storeLock.Lock() + defer s.storeLock.Unlock() + _, ok := s.store[appID] + if !ok { + r := make(map[string]v1beta1.Reservation) + sr := &SoftReservation{ + Reservations: r, + Status: make(map[string]bool), + } + s.store[appID] = sr + } +} + +// AddReservationForPod adds a reservation for an extra executor pod, attaching the associated node and resources to it. +// This is a noop if the reservation already exists. +func (s *SoftReservationStore) AddReservationForPod(ctx context.Context, appID string, podName string, reservation v1beta1.Reservation) error { + s.storeLock.Lock() + defer s.storeLock.Unlock() + appSoftReservation, ok := s.store[appID] + if !ok { + return werror.Error("Could not add soft reservation since appID does not exist in reservation store", + werror.SafeParam("appID", appID)) + } + + if _, alreadyThere := appSoftReservation.Status[podName]; alreadyThere { + return nil + } + + appSoftReservation.Reservations[podName] = reservation + appSoftReservation.Status[podName] = true + return nil +} + +// ExecutorHasSoftReservation returns true when the passed executor pod currently has a SoftReservation, false otherwise. 
+func (s *SoftReservationStore) ExecutorHasSoftReservation(ctx context.Context, executor *v1.Pod) bool { + s.storeLock.RLock() + defer s.storeLock.RUnlock() + appID, ok := executor.Labels[SparkAppIDLabel] + if !ok { + svc1log.FromContext(ctx).Error("Cannot get SoftReservation for pod which does not have application ID label set", + svc1log.SafeParam("podName", executor.Name), + svc1log.SafeParam("expectedLabel", SparkAppIDLabel)) + return false + } + if sr, ok := s.GetSoftReservation(appID); ok { + _, ok := sr.Reservations[executor.Name] + return ok + } + return false +} + +// UsedSoftReservationResources returns SoftReservation usage by node. +func (s *SoftReservationStore) UsedSoftReservationResources() resources.NodeGroupResources { + s.storeLock.RLock() + defer s.storeLock.RUnlock() + res := resources.NodeGroupResources(map[string]*resources.Resources{}) + + for _, softReservation := range s.store { + for _, reservationObject := range softReservation.Reservations { + node := reservationObject.Node + if res[node] == nil { + res[node] = resources.Zero() + } + res[node].AddFromReservation(&reservationObject) + } + } + return res +} + +func (s *SoftReservationStore) onPodDeletion(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + s.logger.Error("failed to parse object as pod, trying to get from tombstone") + tombstone, ok := obj.(clientcache.DeletedFinalStateUnknown) + if !ok { + s.logger.Error("failed to get object from tombstone") + return + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + s.logger.Error("failed to get pod from tombstone") + return + } + } + appID := pod.Labels[SparkAppIDLabel] + switch pod.Labels[SparkRoleLabel] { + case Driver: + s.removeDriverReservation(appID) + case Executor: + s.removeExecutorReservation(appID, pod.Name) + } +} + +func (s *SoftReservationStore) removeExecutorReservation(appID string, executorName string) { + s.storeLock.Lock() + defer s.storeLock.Unlock() + sr, found := s.store[appID] + if !found { + return + } + if _, found := sr.Reservations[executorName]; found { + delete(sr.Reservations, executorName) + } + // We always mark this as false to remember that we saw the executor die, and prevent a race between this death event + // and the request to schedule the executor + sr.Status[executorName] = false +} + +func (s *SoftReservationStore) removeDriverReservation(appID string) { + s.storeLock.Lock() + defer s.storeLock.Unlock() + if _, found := s.store[appID]; found { + delete(s.store, appID) + } +} + +func (s *SoftReservationStore) deepCopySoftReservation(reservation *SoftReservation) *SoftReservation { + reservationsCopy := make(map[string]v1beta1.Reservation, len(reservation.Reservations)) + for name, res := range reservation.Reservations { + reservationsCopy[name] = *res.DeepCopy() + } + statusCopy := make(map[string]bool, len(reservation.Status)) + for name, status := range reservation.Status { + statusCopy[name] = status + } + return &SoftReservation{ + Reservations: reservationsCopy, + Status: statusCopy, + } +} diff --git a/internal/extender/demand.go b/internal/extender/demand.go index 35f88eb4e..f8b4ec374 100644 --- a/internal/extender/demand.go +++ b/internal/extender/demand.go @@ -154,7 +154,7 @@ func demandResources(applicationResources *sparkApplicationResources) []demandap Memory: applicationResources.driverResources.Memory, }, { - Count: applicationResources.executorCount, + Count: applicationResources.minExecutorCount, CPU: applicationResources.executorResources.CPU, Memory: 
applicationResources.executorResources.Memory, }, diff --git a/internal/extender/extendertest/extender_test_utils.go b/internal/extender/extendertest/extender_test_utils.go index f5e9964e6..17ee14ef7 100644 --- a/internal/extender/extendertest/extender_test_utils.go +++ b/internal/extender/extendertest/extender_test_utils.go @@ -104,10 +104,13 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) { return nil, err } + softReservationStore := sscache.NewSoftReservationStore(ctx, podInformerInterface) + overheadComputer := extender.NewOverheadComputer( ctx, podLister, resourceReservationCache, + softReservationStore, nodeLister, instanceGroupLabel, ) @@ -119,6 +122,7 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) { nodeLister, extender.NewSparkPodLister(podLister, instanceGroupLabel), resourceReservationCache, + softReservationStore, fakeKubeClient.CoreV1(), demandCache, fakeAPIExtensionsClient, diff --git a/internal/extender/failover.go b/internal/extender/failover.go index 4ab758ce8..f12af1f86 100644 --- a/internal/extender/failover.go +++ b/internal/extender/failover.go @@ -16,6 +16,7 @@ package extender import ( "context" + "math" "sort" "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta1" @@ -27,7 +28,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" - corelisters "k8s.io/client-go/listers/core/v1" ) // SyncResourceReservationsAndDemands gets all resource reservations and pods, @@ -46,14 +46,16 @@ func (s *SparkSchedulerExtender) syncResourceReservationsAndDemands(ctx context. return err } rrs := s.resourceReservations.List() - availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, s.instanceGroupLabel, rrs, nodes, s.overheadComputer.GetOverhead(ctx, nodes)) - staleSparkPods := unreservedSparkPodsBySparkID(ctx, rrs, pods) + overhead := s.overheadComputer.GetOverhead(ctx, nodes) + softReservationOverhead := s.softReservationStore.UsedSoftReservationResources() + availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, s.instanceGroupLabel, rrs, nodes, overhead, softReservationOverhead) + staleSparkPods := unreservedSparkPodsBySparkID(ctx, rrs, s.softReservationStore, pods) svc1log.FromContext(ctx).Info("starting reconciliation", svc1log.SafeParam("appCount", len(staleSparkPods))) - r := &reconciler{s.podLister, s.resourceReservations, s.demands, availableResources, orderedNodes, s.instanceGroupLabel} + r := &reconciler{s.podLister, s.resourceReservations, s.softReservationStore, s.demands, availableResources, orderedNodes, s.instanceGroupLabel} for _, sp := range staleSparkPods { - r.syncResourceReservation(ctx, sp) - r.syncDemand(ctx, sp) + r.syncResourceReservations(ctx, sp) + r.syncDemands(ctx, sp) } // recompute overhead to account for newly created resource reservations s.overheadComputer.compute(ctx) @@ -72,17 +74,26 @@ type sparkPods struct { type instanceGroup string type reconciler struct { - podLister corelisters.PodLister + podLister *SparkPodLister resourceReservations *cache.ResourceReservationCache + softReservations *cache.SoftReservationStore demands *cache.SafeDemandCache availableResources map[instanceGroup]resources.NodeGroupResources orderedNodes map[instanceGroup][]*v1.Node instanceGroupLabel string } -func (r *reconciler) syncResourceReservation(ctx context.Context, sp *sparkPods) { +func (r *reconciler) syncResourceReservations(ctx context.Context, sp *sparkPods) { // if the driver is nil it already has an 
associated reservation, get the resource // reservation object and update it so it has reservations for each stale executor + appResources, err := r.getAppResources(ctx, sp) + if err != nil { + svc1log.FromContext(ctx).Error("could not get application resources for application", + svc1log.SafeParam("appID", sp.appID), svc1log.Stacktrace(err)) + return + } + extraExecutors := make([]*v1.Pod, 0, len(sp.inconsistentExecutors)) + if sp.inconsistentDriver == nil && len(sp.inconsistentExecutors) > 0 { exec := sp.inconsistentExecutors[0] rr, ok := r.resourceReservations.Get(exec.Namespace, sp.appID) @@ -95,10 +106,24 @@ func (r *reconciler) syncResourceReservation(ctx context.Context, sp *sparkPods) logRR(ctx, "resource reservation deleted, ignoring", exec.Namespace, sp.appID) return } + + podsWithRR := make(map[string]bool, len(rr.Status.Pods)) + for _, podName := range rr.Status.Pods { + podsWithRR[podName] = true + } + for _, executor := range sp.inconsistentExecutors { + if _, ok := podsWithRR[executor.Name]; !ok { + extraExecutors = append(extraExecutors, executor) + } + } } else if sp.inconsistentDriver != nil { // the driver is stale, a new resource reservation object needs to be created instanceGroup := instanceGroup(sp.inconsistentDriver.Spec.NodeSelector[r.instanceGroupLabel]) - newRR, reservedResources, err := r.constructResourceReservation(ctx, sp.inconsistentDriver, sp.inconsistentExecutors, instanceGroup) + endIdx := int(math.Min(float64(len(sp.inconsistentExecutors)), float64(appResources.minExecutorCount))) + executorsUpToMin := sp.inconsistentExecutors[0:endIdx] + extraExecutors = sp.inconsistentExecutors[endIdx:] + + newRR, reservedResources, err := r.constructResourceReservation(ctx, sp.inconsistentDriver, executorsUpToMin, instanceGroup) if err != nil { svc1log.FromContext(ctx).Error("failed to construct resource reservation", svc1log.Stacktrace(err)) return @@ -114,9 +139,31 @@ func (r *reconciler) syncResourceReservation(ctx context.Context, sp *sparkPods) } r.availableResources[instanceGroup].Sub(reservedResources) } + + // Create soft reservation object for drivers that can have extra executors even if they don't at the moment + if appResources.maxExecutorCount > appResources.minExecutorCount { + r.softReservations.CreateSoftReservationIfNotExists(sp.appID) + } + // Create soft reservations for the extra executors + if len(extraExecutors) > 0 { + for i, extraExecutor := range extraExecutors { + if i >= (appResources.maxExecutorCount - appResources.minExecutorCount) { + break + } + err := r.softReservations.AddReservationForPod(ctx, sp.appID, extraExecutor.Name, v1beta1.Reservation{ + Node: extraExecutor.Spec.NodeName, + CPU: appResources.executorResources.CPU, + Memory: appResources.executorResources.Memory, + }) + if err != nil { + svc1log.FromContext(ctx).Error("failed to add soft reservation for executor on failover. 
skipping...", svc1log.Stacktrace(err)) + } + } + } + } -func (r *reconciler) syncDemand(ctx context.Context, sp *sparkPods) { +func (r *reconciler) syncDemands(ctx context.Context, sp *sparkPods) { if sp.inconsistentDriver != nil { r.deleteDemandIfExists(sp.inconsistentDriver.Namespace, demandResourceName(sp.inconsistentDriver)) } @@ -135,6 +182,7 @@ func (r *reconciler) deleteDemandIfExists(namespace, name string) { func unreservedSparkPodsBySparkID( ctx context.Context, rrs []*v1beta1.ResourceReservation, + softReservationStore *cache.SoftReservationStore, pods []*v1.Pod, ) map[string]*sparkPods { podsWithRRs := make(map[string]bool, len(rrs)) @@ -146,7 +194,8 @@ func unreservedSparkPodsBySparkID( appIDToPods := make(map[string]*sparkPods) for _, pod := range pods { - if isNotScheduledSparkPod(pod) || podsWithRRs[pod.Name] { + if isNotScheduledSparkPod(pod) || podsWithRRs[pod.Name] || + (pod.Labels[SparkRoleLabel] == Executor && softReservationStore.ExecutorHasSoftReservation(ctx, pod)) { continue } appID := pod.Labels[SparkAppIDLabel] @@ -178,7 +227,8 @@ func availableResourcesPerInstanceGroup( instanceGroupLabel string, rrs []*v1beta1.ResourceReservation, nodes []*v1.Node, - overhead resources.NodeGroupResources) (map[instanceGroup]resources.NodeGroupResources, map[instanceGroup][]*v1.Node) { + overhead resources.NodeGroupResources, + softReservationOverhead resources.NodeGroupResources) (map[instanceGroup]resources.NodeGroupResources, map[instanceGroup][]*v1.Node) { sort.Slice(nodes, func(i, j int) bool { return nodes[j].CreationTimestamp.Before(&nodes[i].CreationTimestamp) }) @@ -193,6 +243,7 @@ func availableResourcesPerInstanceGroup( } usages := resources.UsageForNodes(rrs) usages.Add(overhead) + usages.Add(softReservationOverhead) availableResources := make(map[instanceGroup]resources.NodeGroupResources) for instanceGroup, ns := range schedulableNodes { availableResources[instanceGroup] = resources.AvailableForNodes(ns, usages) @@ -219,6 +270,7 @@ func (r *reconciler) patchResourceReservation(execs []*v1.Pod, rr *v1beta1.Resou } } } + return r.resourceReservations.Update(rr) } @@ -238,13 +290,18 @@ func (r *reconciler) constructResourceReservation( return nil, nil, werror.Error("instance group not found", werror.SafeParam("instanceGroup", instanceGroup)) } - executorCountToAssignNodes := applicationResources.executorCount - len(executors) - reservedNodeNames, reservedResources := findNodes(executorCountToAssignNodes, applicationResources.executorResources, availableResources, nodes) - if len(reservedNodeNames) < executorCountToAssignNodes { - svc1log.FromContext(ctx).Error("could not reserve space for all executors", - svc1log.SafeParams(internal.PodSafeParams(*driver))) + var reservedNodeNames []string + var reservedResources resources.NodeGroupResources + executorCountToAssignNodes := applicationResources.minExecutorCount - len(executors) + if executorCountToAssignNodes > 0 { + reservedNodeNames, reservedResources = findNodes(executorCountToAssignNodes, applicationResources.executorResources, availableResources, nodes) + if len(reservedNodeNames) < executorCountToAssignNodes { + svc1log.FromContext(ctx).Error("could not reserve space for all executors", + svc1log.SafeParams(internal.PodSafeParams(*driver))) + } } - executorNodes := make([]string, 0, applicationResources.executorCount) + + executorNodes := make([]string, 0, applicationResources.minExecutorCount) for _, e := range executors { executorNodes = append(executorNodes, e.Spec.NodeName) } @@ -261,6 +318,23 @@ func (r 
*reconciler) constructResourceReservation( return rr, reservedResources, nil } +func (r *reconciler) getAppResources(ctx context.Context, sp *sparkPods) (*sparkApplicationResources, error) { + var driver *v1.Pod + if sp.inconsistentDriver != nil { + driver = sp.inconsistentDriver + } else if len(sp.inconsistentExecutors) > 0 { + d, err := r.podLister.getDriverPod(ctx, sp.inconsistentExecutors[0]) + if err != nil { + logRR(ctx, "error getting driver pod for executor", sp.inconsistentExecutors[0].Namespace, sp.appID) + return nil, err + } + driver = d + } else { + return nil, werror.Error("no inconsistent driver or executor") + } + return sparkResources(ctx, driver) +} + // findNodes reserves space for n executors, picks nodes by the iterating // through nodes with the given order. // TODO: replace this with the binpack function once it can return partial results diff --git a/internal/extender/overhead.go b/internal/extender/overhead.go index 7a1b32808..c804f8af4 100644 --- a/internal/extender/overhead.go +++ b/internal/extender/overhead.go @@ -39,6 +39,7 @@ var ( type OverheadComputer struct { podLister corelisters.PodLister resourceReservations *cache.ResourceReservationCache + softReservationStore *cache.SoftReservationStore nodeLister corelisters.NodeLister latestOverhead Overhead overheadLock *sync.RWMutex @@ -59,11 +60,13 @@ func NewOverheadComputer( ctx context.Context, podLister corelisters.PodLister, resourceReservations *cache.ResourceReservationCache, + softReservationStore *cache.SoftReservationStore, nodeLister corelisters.NodeLister, instanceGroupLabel string) *OverheadComputer { computer := &OverheadComputer{ podLister: podLister, resourceReservations: resourceReservations, + softReservationStore: softReservationStore, nodeLister: nodeLister, overheadLock: &sync.RWMutex{}, instanceGroupLabel: instanceGroupLabel, @@ -102,11 +105,17 @@ func (o *OverheadComputer) compute(ctx context.Context) { podsWithRRs[podName] = true } } + // TODO(rkaram): separate between regular overhead and dynamic allocation/spark overhead rawOverhead := map[string]resources.NodeGroupResources{} for _, p := range pods { if podsWithRRs[p.Name] { continue } + if role, ok := p.Labels[SparkRoleLabel]; ok { + if role == Executor && o.softReservationStore.ExecutorHasSoftReservation(ctx, p) { + continue + } + } if p.Spec.NodeName == "" || p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed { // pending pod or pod succeeded or failed continue diff --git a/internal/extender/resource.go b/internal/extender/resource.go index c36bddb6b..8a20e14c4 100644 --- a/internal/extender/resource.go +++ b/internal/extender/resource.go @@ -36,14 +36,16 @@ import ( ) const ( - failureUnbound = "failure-unbound" - failureInternal = "failure-internal" - failureFit = "failure-fit" - failureEarlierDriver = "failure-earlier-driver" - failureNonSparkPod = "failure-non-spark-pod" - success = "success" - successRescheduled = "success-rescheduled" - successAlreadyBound = "success-already-bound" + failureUnbound = "failure-unbound" + failureInternal = "failure-internal" + failureFit = "failure-fit" + failureEarlierDriver = "failure-earlier-driver" + failureNonSparkPod = "failure-non-spark-pod" + success = "success" + successRescheduled = "success-rescheduled" + successAlreadyBound = "success-already-bound" + successScheduledExtraExecutor = "success-scheduled-extra-executor" + failureFitExtraExecutor = "failure-fit-extra-executor" // TODO: make this configurable // leaderElectionInterval is the default LeaseDuration for core 
clients. // obtained from k8s.io/component-base/config/v1alpha1 @@ -57,6 +59,7 @@ type SparkSchedulerExtender struct { nodeLister corelisters.NodeLister podLister *SparkPodLister resourceReservations *cache.ResourceReservationCache + softReservationStore *cache.SoftReservationStore coreClient corev1.CoreV1Interface demands *cache.SafeDemandCache @@ -74,6 +77,7 @@ func NewExtender( nodeLister corelisters.NodeLister, podLister *SparkPodLister, resourceReservations *cache.ResourceReservationCache, + softReservationStore *cache.SoftReservationStore, coreClient corev1.CoreV1Interface, demands *cache.SafeDemandCache, apiExtensionsClient apiextensionsclientset.Interface, @@ -85,6 +89,7 @@ func NewExtender( nodeLister: nodeLister, podLister: podLister, resourceReservations: resourceReservations, + softReservationStore: softReservationStore, coreClient: coreClient, demands: demands, apiExtensionsClient: apiExtensionsClient, @@ -181,7 +186,7 @@ func (s *SparkSchedulerExtender) fitEarlierDrivers( ctx, applicationResources.driverResources, applicationResources.executorResources, - applicationResources.executorCount, + applicationResources.minExecutorCount, nodeNames, executorNodeNames, availableResources) if !hasCapacity { svc1log.FromContext(ctx).Warn("failed to fit one of the earlier drivers", @@ -242,13 +247,14 @@ func (s *SparkSchedulerExtender) selectDriverNode(ctx context.Context, driver *v ctx, applicationResources.driverResources, applicationResources.executorResources, - applicationResources.executorCount, + applicationResources.minExecutorCount, driverNodeNames, executorNodeNames, availableResources) svc1log.FromContext(ctx).Debug("binpacking result", svc1log.SafeParam("availableResources", availableResources), svc1log.SafeParam("driverResources", applicationResources.driverResources), svc1log.SafeParam("executorResources", applicationResources.executorResources), - svc1log.SafeParam("executorCount", applicationResources.executorCount), + svc1log.SafeParam("minExecutorCount", applicationResources.minExecutorCount), + svc1log.SafeParam("maxExecutorCount", applicationResources.maxExecutorCount), svc1log.SafeParam("hasCapacity", hasCapacity), svc1log.SafeParam("candidateDriverNodes", nodeNames), svc1log.SafeParam("candidateExecutorNodes", executorNodeNames), @@ -261,7 +267,12 @@ func (s *SparkSchedulerExtender) selectDriverNode(ctx context.Context, driver *v } s.removeDemandIfExists(ctx, driver) metrics.ReportCrossZoneMetric(ctx, driverNode, executorNodes, availableNodes) - return s.createResourceReservations(ctx, driver, applicationResources, driverNode, executorNodes) + reservedDriverNode, outcome, err := s.createResourceReservations(ctx, driver, applicationResources, driverNode, executorNodes) + if outcome == success && applicationResources.maxExecutorCount > applicationResources.minExecutorCount { + // only create soft reservations for applications which can request extra executors + s.softReservationStore.CreateSoftReservationIfNotExists(driver.Labels[SparkAppIDLabel]) + } + return reservedDriverNode, outcome, err } func (s *SparkSchedulerExtender) potentialNodes(availableNodes []*v1.Node, driver *v1.Pod, nodeNames []string) (driverNodes, executorNodes []string) { @@ -293,9 +304,41 @@ func (s *SparkSchedulerExtender) selectExecutorNode(ctx context.Context, executo if !ok { return "", failureInternal, werror.Error("failed to get resource reservations") } - unboundReservations, outcome, err := s.findUnboundReservations(ctx, executor, resourceReservation) - if err != nil { - return "", 
outcome, err + unboundReservations, outcome, unboundResErr := s.findUnboundReservations(ctx, executor, resourceReservation) + if unboundResErr != nil { + extraExecutorCount := 0 + if sr, ok := s.softReservationStore.GetSoftReservation(executor.Labels[SparkAppIDLabel]); ok { + extraExecutorCount = len(sr.Reservations) + } + driver, err := s.podLister.getDriverPod(ctx, executor) + if err != nil { + return "", failureInternal, err + } + sparkResources, err := sparkResources(ctx, driver) + if err != nil { + return "", failureInternal, err + } + if outcome == failureUnbound && (sparkResources.minExecutorCount+extraExecutorCount) < sparkResources.maxExecutorCount { + // dynamic allocation case where driver is requesting more executors than min but less than max + node, outcome, err := s.rescheduleExecutor(ctx, executor, nodeNames, sparkResources, false) + if err != nil { + if outcome == failureFit { + return node, failureFitExtraExecutor, werror.Error("not enough capacity to schedule the extra executor") + } + return node, outcome, err + } + softReservation := v1beta1.Reservation{ + Node: node, + CPU: sparkResources.executorResources.CPU, + Memory: sparkResources.executorResources.Memory, + } + err = s.softReservationStore.AddReservationForPod(ctx, driver.Labels[SparkAppIDLabel], executor.Name, softReservation) + if err != nil { + return "", failureInternal, err + } + return node, successScheduledExtraExecutor, nil + } + return "", outcome, unboundResErr } // the reservation to be selected for the current executor needs to be on a node that the extender has received from kube-scheduler nodeToReservation := make(map[string]string, len(unboundReservations)) @@ -316,7 +359,15 @@ func (s *SparkSchedulerExtender) selectExecutorNode(ctx context.Context, executo // try to reschedule the executor, breaking FIFO, but preventing executor starvation // we are guaranteed len(unboundResourceReservations) > 0 unboundReservation = unboundReservations[0] - node, outcome, err := s.rescheduleExecutor(ctx, executor, nodeNames, unboundReservation, resourceReservation) + driver, err := s.podLister.getDriverPod(ctx, executor) + if err != nil { + return "", failureInternal, err + } + sparkResources, err := sparkResources(ctx, driver) + if err != nil { + return "", failureInternal, err + } + node, outcome, err := s.rescheduleExecutor(ctx, executor, nodeNames, sparkResources, true) if err != nil { return "", outcome, err } @@ -327,7 +378,7 @@ func (s *SparkSchedulerExtender) selectExecutorNode(ctx context.Context, executo } copyResourceReservation.Status.Pods[unboundReservation] = executor.Name - err = s.resourceReservations.Update(copyResourceReservation) + err := s.resourceReservations.Update(copyResourceReservation) if err != nil { return "", failureInternal, werror.Wrap(err, "failed to update resource reservation") } @@ -351,7 +402,9 @@ func (s *SparkSchedulerExtender) getNodes(ctx context.Context, nodeNames []strin func (s *SparkSchedulerExtender) usedResources(nodeNames []string) (resources.NodeGroupResources, error) { resourceReservations := s.resourceReservations.List() - return resources.UsageForNodes(resourceReservations), nil + usage := resources.UsageForNodes(resourceReservations) + usage.Add(s.softReservationStore.UsedSoftReservationResources()) + return usage, nil } func (s *SparkSchedulerExtender) createResourceReservations( @@ -370,9 +423,8 @@ func (s *SparkSchedulerExtender) createResourceReservations( return driverNode, success, nil } -func (s *SparkSchedulerExtender) rescheduleExecutor(ctx 
context.Context, executor *v1.Pod, nodeNames []string, reservationName string, rr *v1beta1.ResourceReservation) (string, string, error) { - reservation := rr.Spec.Reservations[reservationName] - executorResources := &resources.Resources{CPU: reservation.CPU, Memory: reservation.Memory} +func (s *SparkSchedulerExtender) rescheduleExecutor(ctx context.Context, executor *v1.Pod, nodeNames []string, applicationResources *sparkApplicationResources, createDemandIfNoFit bool) (string, string, error) { + executorResources := &resources.Resources{CPU: applicationResources.executorResources.CPU, Memory: applicationResources.executorResources.Memory} availableNodes := s.getNodes(ctx, nodeNames) usages, err := s.usedResources(nodeNames) if err != nil { @@ -385,7 +437,10 @@ func (s *SparkSchedulerExtender) rescheduleExecutor(ctx context.Context, executo return name, successRescheduled, nil } } - s.createDemandForExecutor(ctx, executor, executorResources) + + if createDemandIfNoFit { + s.createDemandForExecutor(ctx, executor, executorResources) + } return "", failureFit, werror.Error("not enough capacity to reschedule the executor") } @@ -427,7 +482,6 @@ func (s *SparkSchedulerExtender) findUnboundReservations(ctx context.Context, ex relocatableReservations = append(relocatableReservations, name) } } - if len(relocatableReservations) == 0 { return nil, failureUnbound, werror.Error("failed to find unbound resource reservation", werror.SafeParams(logging.RRSafeParam(resourceReservation))) } diff --git a/internal/extender/sparkpods.go b/internal/extender/sparkpods.go index 5f23d515a..e009ea545 100644 --- a/internal/extender/sparkpods.go +++ b/internal/extender/sparkpods.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sort" + "strconv" "github.com/palantir/k8s-spark-scheduler-lib/pkg/resources" v1 "k8s.io/api/core/v1" @@ -48,14 +49,21 @@ const ( ExecutorCPU = "spark-executor-cpu" // ExecutorMemory represents the key of an annotation that describes how much memory a spark executor requires ExecutorMemory = "spark-executor-mem" - // ExecutorCount represents the key of an annotation that describes how many executors a spark job requires + // DynamicAllocationEnabled sets whether dynamic allocation is enabled for this spark application (false by default) + DynamicAllocationEnabled = "spark-dynamic-allocation-enabled" + // ExecutorCount represents the key of an annotation that describes how many executors a spark application requires (required if DynamicAllocationEnabled is false) ExecutorCount = "spark-executor-count" + // DAMinExecutorCount represents the lower bound on the number of executors a spark application requires if dynamic allocation is enabled (required if DynamicAllocationEnabled is true) + DAMinExecutorCount = "spark-dynamic-allocation-min-executor-count" + // DAMaxExecutorCount represents the upper bound on the number of executors a spark application can have if dynamic allocation is enabled (required if DynamicAllocationEnabled is true) + DAMaxExecutorCount = "spark-dynamic-allocation-max-executor-count" ) type sparkApplicationResources struct { driverResources *resources.Resources executorResources *resources.Resources - executorCount int + minExecutorCount int + maxExecutorCount int } // SparkPodLister is a PodLister which can also list drivers per node selector @@ -99,10 +107,26 @@ func filterToEarliestAndSort(driver *v1.Pod, allDrivers []*v1.Pod, instanceGroup func sparkResources(ctx context.Context, pod *v1.Pod) (*sparkApplicationResources, error) { parsedResources := 
map[string]resource.Quantity{} + dynamicAllocationEnabled := false + if daLabel, ok := pod.Annotations[DynamicAllocationEnabled]; ok { + da, err := strconv.ParseBool(daLabel) + if err != nil { + return nil, fmt.Errorf("annotation DynamicAllocationEnabled could not be parsed as a boolean") + } + dynamicAllocationEnabled = da + } - for _, a := range []string{DriverCPU, DriverMemory, ExecutorCPU, ExecutorMemory, ExecutorCount} { + for _, a := range []string{DriverCPU, DriverMemory, ExecutorCPU, ExecutorMemory, ExecutorCount, DAMinExecutorCount, DAMaxExecutorCount} { value, ok := pod.Annotations[a] if !ok { + switch { + case dynamicAllocationEnabled == false && a == ExecutorCount: + return nil, fmt.Errorf("annotation ExecutorCount is required when DynamicAllocationEnabled is false") + case dynamicAllocationEnabled == true && (a == DAMinExecutorCount || a == DAMaxExecutorCount): + return nil, fmt.Errorf("annotation %v is required when DynamicAllocationEnabled is true", a) + case a == ExecutorCount || a == DAMinExecutorCount || a == DAMaxExecutorCount: + continue + } return nil, fmt.Errorf("annotation %v is missing from driver", a) } quantity, err := resource.ParseQuantity(value) @@ -112,9 +136,19 @@ func sparkResources(ctx context.Context, pod *v1.Pod) (*sparkApplicationResource parsedResources[a] = quantity } - executorCountQuantity := parsedResources[ExecutorCount] - // justification for casting to int from int64: executor count is small (<1000) - executorCount := int(executorCountQuantity.Value()) + var minExecutorCount int + var maxExecutorCount int + if dynamicAllocationEnabled { + // justification for casting to int from int64: executor count is small (<1000) + parsedMinExecutorCount := parsedResources[DAMinExecutorCount] + parsedMaxExecutorCount := parsedResources[DAMaxExecutorCount] + minExecutorCount = int(parsedMinExecutorCount.Value()) + maxExecutorCount = int(parsedMaxExecutorCount.Value()) + } else { + parsedExecutorCount := parsedResources[ExecutorCount] + minExecutorCount = int(parsedExecutorCount.Value()) + maxExecutorCount = int(parsedExecutorCount.Value()) + } driverResources := &resources.Resources{ CPU: parsedResources[DriverCPU], @@ -124,7 +158,7 @@ func sparkResources(ctx context.Context, pod *v1.Pod) (*sparkApplicationResource CPU: parsedResources[ExecutorCPU], Memory: parsedResources[ExecutorMemory], } - return &sparkApplicationResources{driverResources, executorResources, executorCount}, nil + return &sparkApplicationResources{driverResources, executorResources, minExecutorCount, maxExecutorCount}, nil } func sparkResourceUsage(driverResources, executorResources *resources.Resources, driverNode string, executorNodes []string) resources.NodeGroupResources { @@ -135,3 +169,12 @@ func sparkResourceUsage(driverResources, executorResources *resources.Resources, } return res } + +func (s SparkPodLister) getDriverPod(ctx context.Context, executor *v1.Pod) (*v1.Pod, error) { + selector := labels.Set(map[string]string{SparkAppIDLabel: executor.Labels[SparkAppIDLabel], SparkRoleLabel: Driver}).AsSelector() + driver, err := s.Pods(executor.Namespace).List(selector) + if err != nil || len(driver) != 1 { + return nil, err + } + return driver[0], nil +} diff --git a/internal/extender/sparkpods_test.go b/internal/extender/sparkpods_test.go index d0cd07465..19d2b6b90 100644 --- a/internal/extender/sparkpods_test.go +++ b/internal/extender/sparkpods_test.go @@ -40,7 +40,7 @@ func TestSparkResources(t *testing.T) { pod v1.Pod expectedApplicationResources *sparkApplicationResources }{{ - 
name: "parses pod annotations into resources", + name: "parses static allocation pod annotations into resources", pod: v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -55,7 +55,29 @@ func TestSparkResources(t *testing.T) { expectedApplicationResources: &sparkApplicationResources{ driverResources: createResources(1, 2432*1024*1024), executorResources: createResources(2, 6758*1024*1024), - executorCount: 2, + minExecutorCount: 2, + maxExecutorCount: 2, + }, + }, { + name: "parses dynamic allocation pod annotations into resources", + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + DriverCPU: "1", + DriverMemory: "2432Mi", + ExecutorCPU: "2", + ExecutorMemory: "6758Mi", + DynamicAllocationEnabled: "true", + DAMinExecutorCount: "2", + DAMaxExecutorCount: "5", + }, + }, + }, + expectedApplicationResources: &sparkApplicationResources{ + driverResources: createResources(1, 2432*1024*1024), + executorResources: createResources(2, 6758*1024*1024), + minExecutorCount: 2, + maxExecutorCount: 5, }, }} @@ -73,9 +95,13 @@ func TestSparkResources(t *testing.T) { t.Fatalf("executorResources are not equal, expected: %v, got: %v", test.expectedApplicationResources.executorResources, applicationResources.executorResources) } - if applicationResources.executorCount != test.expectedApplicationResources.executorCount { - t.Fatalf("executorCount are not equal, expected: %v, got: %v", - test.expectedApplicationResources.executorCount, applicationResources.executorCount) + if applicationResources.minExecutorCount != test.expectedApplicationResources.minExecutorCount { + t.Fatalf("minExecutorCount not equal to ExecutorCount in static allocation, expected: %v, got: %v", + test.expectedApplicationResources.minExecutorCount, applicationResources.minExecutorCount) + } + if applicationResources.maxExecutorCount != test.expectedApplicationResources.maxExecutorCount { + t.Fatalf("maxExecutorCount not equal to ExecutorCount in static allocation, expected: %v, got: %v", + test.expectedApplicationResources.maxExecutorCount, applicationResources.maxExecutorCount) } }) } diff --git a/internal/extender/unschedulablepods.go b/internal/extender/unschedulablepods.go index ef1c5d772..03696ef32 100644 --- a/internal/extender/unschedulablepods.go +++ b/internal/extender/unschedulablepods.go @@ -139,7 +139,7 @@ func (u *UnschedulablePodMarker) DoesPodExceedClusterCapacity(ctx context.Contex ctx, applicationResources.driverResources, applicationResources.executorResources, - applicationResources.executorCount, + applicationResources.minExecutorCount, nodeNames, nodeNames, availableResources)