Fix: Make instance group label configurable (palantir#66)
* instance group label configurable
laflechejonathan authored and rkaram committed Oct 1, 2019
1 parent 4361666 commit e70fc44
Showing 13 changed files with 128 additions and 94 deletions.
12 changes: 10 additions & 2 deletions cmd/server.go
@@ -69,6 +69,11 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
}
kubeconfig.QPS = install.QPS
kubeconfig.Burst = install.Burst
instanceGroupLabel := install.InstanceGroupLabel
if instanceGroupLabel == "" {
// for back-compat, as instanceGroupLabel was once hard-coded to this value
instanceGroupLabel = "resource_channel"
}

kubeClient, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
@@ -156,25 +161,28 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
podLister,
resourceReservationCache,
nodeLister,
instanceGroupLabel,
)

binpacker := extender.SelectBinpacker(install.BinpackAlgo)

sparkSchedulerExtender := extender.NewExtender(
nodeLister,
extender.NewSparkPodLister(podLister),
extender.NewSparkPodLister(podLister, instanceGroupLabel),
resourceReservationCache,
kubeClient.CoreV1(),
demandCache,
apiExtensionsClient,
install.FIFO,
binpacker,
overheadComputer,
instanceGroupLabel,
)

resourceReporter := metrics.NewResourceReporter(
nodeLister,
resourceReservationCache,
instanceGroupLabel,
)

cacheReporter := metrics.NewCacheMetrics(
@@ -183,7 +191,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
demandCache,
)

queueReporter := metrics.NewQueueReporter(podLister)
queueReporter := metrics.NewQueueReporter(podLister, instanceGroupLabel)

unschedulablePodMarker := extender.NewUnschedulablePodMarker(
nodeLister,
15 changes: 8 additions & 7 deletions config/config.go
@@ -20,13 +20,14 @@ import (

// Install contains the install time configuration of the server and kubernetes dependency
type Install struct {
config.Install `yaml:",inline"`
config.Runtime `yaml:",inline"`
Kubeconfig string `yaml:"kube-config,omitempty"`
FIFO bool `yaml:"fifo,omitempty"`
QPS float32 `yaml:"qps,omitempty"`
Burst int `yaml:"burst,omitempty"`
BinpackAlgo string `yaml:"binpack,omitempty"`
config.Install `yaml:",inline"`
config.Runtime `yaml:",inline"`
Kubeconfig string `yaml:"kube-config,omitempty"`
FIFO bool `yaml:"fifo,omitempty"`
QPS float32 `yaml:"qps,omitempty"`
Burst int `yaml:"burst,omitempty"`
BinpackAlgo string `yaml:"binpack,omitempty"`
InstanceGroupLabel string `yaml:"instance-group-label,omitempty"`

ResourceReservationCRDAnnotations map[string]string `yaml:"resource-reservation-crd-annotations,omitempty"`
}
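For reference, the new field surfaces as an instance-group-label key in the scheduler's install YAML. A minimal sketch of such a config follows; only the key names come from the Install struct above, and every value is an illustrative assumption rather than something taken from this commit:

# Illustrative install config sketch -- values are assumptions
kube-config: /path/to/kubeconfig
fifo: true
qps: 5
burst: 10
binpack: tightly-pack
# New in this commit: the label key that identifies an instance group.
# If omitted, the server falls back to the previously hard-coded
# "resource_channel" (see the cmd/server.go hunk above).
instance-group-label: instance-group

The remaining files below thread this label through every place that previously read the hard-coded constant.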
23 changes: 10 additions & 13 deletions internal/extender/demand.go
@@ -28,11 +28,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

const (
instanceGroupNodeSelector = "resource_channel"
instanceGroupLabel = "instance-group"
)

const (
podDemandCreated v1.PodConditionType = "PodDemandCreated"
)
@@ -78,7 +73,14 @@ func (s *SparkSchedulerExtender) createDemandForApplication(ctx context.Context,
}

func (s *SparkSchedulerExtender) createDemand(ctx context.Context, pod *v1.Pod, demandUnits []demandapi.DemandUnit) {
newDemand, err := newDemand(pod, demandUnits)
instanceGroup, ok := pod.Spec.NodeSelector[s.instanceGroupLabel]
if !ok {
svc1log.FromContext(ctx).Error("No instanceGroup label exists. Cannot map to InstanceGroup. Skipping demand object",
svc1log.SafeParam("expectedLabel", s.instanceGroupLabel))
return
}

newDemand, err := newDemand(pod, instanceGroup, demandUnits)
if err != nil {
svc1log.FromContext(ctx).Error("failed to construct demand object", svc1log.Stacktrace(err))
return
@@ -120,11 +122,7 @@ func (s *SparkSchedulerExtender) removeDemandIfExists(ctx context.Context, pod *
}
}

func newDemand(pod *v1.Pod, units []demandapi.DemandUnit) (*demandapi.Demand, error) {
instanceGroup, ok := pod.Spec.NodeSelector[instanceGroupNodeSelector]
if !ok {
return nil, werror.Error("No resource_channel label exists. Cannot map to InstanceGroup. Skipping demand object")
}
func newDemand(pod *v1.Pod, instanceGroup string, units []demandapi.DemandUnit) (*demandapi.Demand, error) {
appID, ok := pod.Labels[SparkAppIDLabel]
if !ok {
return nil, werror.Error("pod did not contain expected label for AppID", werror.SafeParam("expectedLabel", SparkAppIDLabel))
@@ -135,8 +133,7 @@ func newDemand(pod *v1.Pod, units []demandapi.DemandUnit) (*demandapi.Demand, er
Name: demandName,
Namespace: pod.Namespace,
Labels: map[string]string{
SparkAppIDLabel: appID,
instanceGroupLabel: instanceGroup,
SparkAppIDLabel: appID,
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(pod, podGroupVersionKind),
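Since the extender now reads the instance group from a configurable node-selector key, driver pods must carry that selector. A hypothetical driver pod fragment, assuming instance-group-label is set to "instance-group" (all names and values here are illustrative):

# Hypothetical pod fragment; the nodeSelector key must match the configured
# instance-group-label, otherwise createDemand logs an error and skips the
# demand object (see the hunk above).
apiVersion: v1
kind: Pod
metadata:
  name: example-spark-driver
spec:
  schedulerName: spark-scheduler     # assumed scheduler name, illustration only
  nodeSelector:
    instance-group: batch-medium     # read via pod.Spec.NodeSelector[instanceGroupLabel]
  containers:
    - name: driver
      image: example.invalid/spark:2.4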
6 changes: 5 additions & 1 deletion internal/extender/extendertest/extender_test_utils.go
@@ -71,6 +71,8 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
resourceReservationInformerInterface := sparkSchedulerInformerFactory.Sparkscheduler().V1beta1().ResourceReservations()
resourceReservationInformer := resourceReservationInformerInterface.Informer()

instanceGroupLabel := "resource_channel"

go func() {
kubeInformerFactory.Start(ctx.Done())
}()
@@ -107,21 +109,23 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
podLister,
resourceReservationCache,
nodeLister,
instanceGroupLabel,
)

isFIFO := true
binpacker := extender.SelectBinpacker("tightly-pack")

sparkSchedulerExtender := extender.NewExtender(
nodeLister,
extender.NewSparkPodLister(podLister),
extender.NewSparkPodLister(podLister, instanceGroupLabel),
resourceReservationCache,
fakeKubeClient.CoreV1(),
demandCache,
fakeAPIExtensionsClient,
isFIFO,
binpacker,
overheadComputer,
instanceGroupLabel,
)

unschedulablePodMarker := extender.NewUnschedulablePodMarker(
10 changes: 6 additions & 4 deletions internal/extender/failover.go
@@ -46,11 +46,11 @@ func (s *SparkSchedulerExtender) syncResourceReservationsAndDemands(ctx context.
return err
}
rrs := s.resourceReservations.List()
availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, rrs, nodes, s.overheadComputer.GetOverhead(ctx, nodes))
availableResources, orderedNodes := availableResourcesPerInstanceGroup(ctx, s.instanceGroupLabel, rrs, nodes, s.overheadComputer.GetOverhead(ctx, nodes))
staleSparkPods := unreservedSparkPodsBySparkID(ctx, rrs, pods)
svc1log.FromContext(ctx).Info("starting reconciliation", svc1log.SafeParam("appCount", len(staleSparkPods)))

r := &reconciler{s.podLister, s.resourceReservations, s.demands, availableResources, orderedNodes}
r := &reconciler{s.podLister, s.resourceReservations, s.demands, availableResources, orderedNodes, s.instanceGroupLabel}
for _, sp := range staleSparkPods {
r.syncResourceReservation(ctx, sp)
r.syncDemand(ctx, sp)
@@ -77,6 +77,7 @@ type reconciler struct {
demands *cache.SafeDemandCache
availableResources map[instanceGroup]resources.NodeGroupResources
orderedNodes map[instanceGroup][]*v1.Node
instanceGroupLabel string
}

func (r *reconciler) syncResourceReservation(ctx context.Context, sp *sparkPods) {
@@ -96,7 +97,7 @@ func (r *reconciler) syncResourceReservation(ctx context.Context, sp *sparkPods)
}
} else if sp.inconsistentDriver != nil {
// the driver is stale, a new resource reservation object needs to be created
instanceGroup := instanceGroup(sp.inconsistentDriver.Spec.NodeSelector[instanceGroupNodeSelector])
instanceGroup := instanceGroup(sp.inconsistentDriver.Spec.NodeSelector[r.instanceGroupLabel])
newRR, reservedResources, err := r.constructResourceReservation(ctx, sp.inconsistentDriver, sp.inconsistentExecutors, instanceGroup)
if err != nil {
svc1log.FromContext(ctx).Error("failed to construct resource reservation", svc1log.Stacktrace(err))
@@ -174,6 +175,7 @@ func isNotScheduledSparkPod(pod *v1.Pod) bool {

func availableResourcesPerInstanceGroup(
ctx context.Context,
instanceGroupLabel string,
rrs []*v1beta1.ResourceReservation,
nodes []*v1.Node,
overhead resources.NodeGroupResources) (map[instanceGroup]resources.NodeGroupResources, map[instanceGroup][]*v1.Node) {
@@ -186,7 +188,7 @@ func availableResourcesPerInstanceGroup(
if n.Spec.Unschedulable {
continue
}
instanceGroup := instanceGroup(n.Labels[instanceGroupNodeSelector])
instanceGroup := instanceGroup(n.Labels[instanceGroupLabel])
schedulableNodes[instanceGroup] = append(schedulableNodes[instanceGroup], n)
}
usages := resources.UsageForNodes(rrs)
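Nodes are grouped with the same configurable key: availableResourcesPerInstanceGroup above now reads n.Labels[instanceGroupLabel] instead of the hard-coded resource_channel. A hypothetical node fragment carrying the matching label (name and value are illustrative):

# Hypothetical node fragment; the label key must equal instance-group-label.
apiVersion: v1
kind: Node
metadata:
  name: example-node-1
  labels:
    instance-group: batch-medium     # read via n.Labels[instanceGroupLabel]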
9 changes: 6 additions & 3 deletions internal/extender/overhead.go
@@ -42,6 +42,7 @@ type OverheadComputer struct {
nodeLister corelisters.NodeLister
latestOverhead Overhead
overheadLock *sync.RWMutex
instanceGroupLabel string
}

// Overhead represents the overall overhead in the cluster, indexed by instance groups
@@ -58,12 +59,14 @@ func NewOverheadComputer(
ctx context.Context,
podLister corelisters.PodLister,
resourceReservations *cache.ResourceReservationCache,
nodeLister corelisters.NodeLister) *OverheadComputer {
nodeLister corelisters.NodeLister,
instanceGroupLabel string) *OverheadComputer {
computer := &OverheadComputer{
podLister: podLister,
resourceReservations: resourceReservations,
nodeLister: nodeLister,
overheadLock: &sync.RWMutex{},
instanceGroupLabel: instanceGroupLabel,
}
computer.compute(ctx)
return computer
@@ -114,7 +117,7 @@ func (o *OverheadComputer) compute(ctx context.Context) {
continue
}
// found pod with not associated resource reservation, add to overhead
instanceGroup := node.Labels[instanceGroupNodeSelector]
instanceGroup := node.Labels[o.instanceGroupLabel]
if _, ok := rawOverhead[instanceGroup]; !ok {
rawOverhead[instanceGroup] = resources.NodeGroupResources{}
}
@@ -174,7 +177,7 @@ func (o OverheadComputer) GetOverhead(ctx context.Context, nodes []*v1.Node) res
return res
}
for _, n := range nodes {
instanceGroup := n.Labels[instanceGroupNodeSelector]
instanceGroup := n.Labels[o.instanceGroupLabel]
instanceGroupOverhead := o.latestOverhead[instanceGroup]
if instanceGroupOverhead == nil {
svc1log.FromContext(ctx).Warn("overhead for instance group does not exist", svc1log.SafeParam("instanceGroup", instanceGroup))
18 changes: 11 additions & 7 deletions internal/extender/resource.go
@@ -62,10 +62,11 @@ type SparkSchedulerExtender struct {
demands *cache.SafeDemandCache
apiExtensionsClient apiextensionsclientset.Interface

isFIFO bool
binpacker *Binpacker
overheadComputer *OverheadComputer
lastRequest time.Time
isFIFO bool
binpacker *Binpacker
overheadComputer *OverheadComputer
lastRequest time.Time
instanceGroupLabel string
}

// NewExtender is responsible for creating and initializing a SparkSchedulerExtender
@@ -78,7 +79,8 @@ func NewExtender(
apiExtensionsClient apiextensionsclientset.Interface,
isFIFO bool,
binpacker *Binpacker,
overheadComputer *OverheadComputer) *SparkSchedulerExtender {
overheadComputer *OverheadComputer,
instanceGroupLabel string) *SparkSchedulerExtender {
return &SparkSchedulerExtender{
nodeLister: nodeLister,
podLister: podLister,
Expand All @@ -89,6 +91,7 @@ func NewExtender(
isFIFO: isFIFO,
binpacker: binpacker,
overheadComputer: overheadComputer,
instanceGroupLabel: instanceGroupLabel,
}
}

@@ -97,12 +100,13 @@ func NewExtender(
func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
params := internal.PodSafeParams(args.Pod)
role := args.Pod.Labels[SparkRoleLabel]
instanceGroup := args.Pod.Spec.NodeSelector[s.instanceGroupLabel]
params["podSparkRole"] = role
params["instanceGroup"] = args.Pod.Spec.NodeSelector[instanceGroupNodeSelector]
params["instanceGroup"] = instanceGroup
ctx = svc1log.WithLoggerParams(ctx, svc1log.SafeParams(params))
logger := svc1log.FromContext(ctx)

timer := metrics.NewScheduleTimer(ctx, &args.Pod)
timer := metrics.NewScheduleTimer(ctx, instanceGroup, &args.Pod)
logger.Info("starting scheduling pod")

err := s.reconcileIfNeeded(ctx, timer)
11 changes: 6 additions & 5 deletions internal/extender/sparkpods.go
@@ -61,11 +61,12 @@ type sparkApplicationResources struct {
// SparkPodLister is a PodLister which can also list drivers per node selector
type SparkPodLister struct {
corelisters.PodLister
instanceGroupLabel string
}

// NewSparkPodLister creates and initializes a SparkPodLister
func NewSparkPodLister(delegate corelisters.PodLister) *SparkPodLister {
return &SparkPodLister{delegate}
func NewSparkPodLister(delegate corelisters.PodLister, instanceGroupLabel string) *SparkPodLister {
return &SparkPodLister{delegate, instanceGroupLabel}
}

// ListEarlierDrivers lists earlier driver than the given driver that has the same node selectors
@@ -75,16 +76,16 @@ func (s SparkPodLister) ListEarlierDrivers(driver *v1.Pod) ([]*v1.Pod, error) {
if err != nil {
return nil, err
}
return filterToEarliestAndSort(driver, drivers), nil
return filterToEarliestAndSort(driver, drivers, s.instanceGroupLabel), nil
}

func filterToEarliestAndSort(driver *v1.Pod, allDrivers []*v1.Pod) []*v1.Pod {
func filterToEarliestAndSort(driver *v1.Pod, allDrivers []*v1.Pod, instanceGroupLabel string) []*v1.Pod {
earlierDrivers := make([]*v1.Pod, 0, 10)
for _, p := range allDrivers {
// add only unscheduled drivers with the same instance group and targeted to the same scheduler
if len(p.Spec.NodeName) == 0 &&
p.Spec.SchedulerName == driver.Spec.SchedulerName &&
p.Spec.NodeSelector[instanceGroupNodeSelector] == driver.Spec.NodeSelector[instanceGroupNodeSelector] &&
p.Spec.NodeSelector[instanceGroupLabel] == driver.Spec.NodeSelector[instanceGroupLabel] &&
p.CreationTimestamp.Before(&driver.CreationTimestamp) &&
p.DeletionTimestamp == nil {
earlierDrivers = append(earlierDrivers, p)