batch-serial is backwards-compatible with serial, and batch-canary with canary, but they are added as new commands for now so as not to disrupt workflows that rely on the existing commands being bulletproof
1 parent f397879, commit 465254a
Showing 8 changed files with 880 additions and 7 deletions.
@@ -0,0 +1,273 @@
// Copyright 2017 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchcanary

import (
    "context"
    "strings"

    at "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
    "github.com/palantir/bouncer/bouncer"
    "github.com/pkg/errors"
    log "github.com/sirupsen/logrus"
)

// Runner holds data for a particular batch-canary run
// Note that in the batch-canary case, asgs will always be of length 1
type Runner struct {
    bouncer.BaseRunner
    batchSize int32 // This field is set in NewRunner
}

// NewRunner instantiates a new batch-canary runner
func NewRunner(ctx context.Context, opts *bouncer.RunnerOpts) (*Runner, error) {
    br, err := bouncer.NewBaseRunner(ctx, opts)
    if err != nil {
        return nil, errors.Wrap(err, "error getting base runner")
    }

    batchSize := *opts.BatchSize

    if batchSize == 0 {
        if len(strings.Split(opts.AsgString, ",")) > 1 {
            return nil, errors.New("Batch canary mode supports only 1 ASG at a time")
        }

        da, err := bouncer.ExtractDesiredASG(opts.AsgString, nil, nil)
        if err != nil {
            return nil, err
        }

        batchSize = da.DesiredCapacity
    }

    r := Runner{
        BaseRunner: *br,
        batchSize:  batchSize,
    }
    return &r, nil
}

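// Illustrative note (not part of this commit): if the batch-size option is
// left at its zero value and the single ASG argument encodes a desired
// capacity of 6 (assuming the usual name:desired_capacity form), batchSize
// defaults to 6, so after the canary node comes up healthy the remaining old
// nodes are replaced in one full-size batch.
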
// ValidatePrereqs checks that the batch runner is safe to proceed
func (r *Runner) ValidatePrereqs(ctx context.Context) error {
    asgSet, err := r.NewASGSet(ctx)
    if err != nil {
        return errors.Wrap(err, "error building actualASG")
    }

    if len(asgSet.ASGs) > 1 {
        log.WithFields(log.Fields{
            "count given": len(asgSet.ASGs),
        }).Error("Batch Canary mode supports only 1 ASG at a time")
        return errors.New("error validating ASG input")
    }

    for _, actualAsg := range asgSet.ASGs {
        if actualAsg.DesiredASG.DesiredCapacity != *actualAsg.ASG.DesiredCapacity {
            log.WithFields(log.Fields{
                "desired capacity given":  actualAsg.DesiredASG.DesiredCapacity,
                "desired capacity actual": *actualAsg.ASG.DesiredCapacity,
            }).Error("Desired capacity given must be equal to starting desired_capacity of ASG")
            return errors.New("error validating ASG state")
        }

        if actualAsg.DesiredASG.DesiredCapacity < *actualAsg.ASG.MinSize {
            log.WithFields(log.Fields{
                "min size":         *actualAsg.ASG.MinSize,
                "max size":         *actualAsg.ASG.MaxSize,
                "desired capacity": actualAsg.DesiredASG.DesiredCapacity,
            }).Error("Desired capacity given must be greater than or equal to min ASG size")
            return errors.New("error validating ASG state")
        }

        if *actualAsg.ASG.MaxSize < (actualAsg.DesiredASG.DesiredCapacity + r.batchSize) {
            log.WithFields(log.Fields{
                "min size":         *actualAsg.ASG.MinSize,
                "max size":         *actualAsg.ASG.MaxSize,
                "desired capacity": actualAsg.DesiredASG.DesiredCapacity,
                "batch size":       r.batchSize,
            }).Error("Max capacity of ASG must be >= desired capacity + batch size")
            return errors.New("error validating ASG state")
        }
    }

    return nil
}

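// Worked example (illustrative, not from the diff): an ASG with min=2, max=9
// and a starting desired_capacity of 6 passes all three checks for a batch
// size of 3 (6 == 6, 6 >= 2, 9 >= 6+3); a batch size of 4 would fail the last
// check, since the ASG could never grow to the required 10 instances.
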
func min(a, b int32) int32 {
    if a < b {
        return a
    }
    return b
}

// Run has the meat of the batch job
func (r *Runner) Run() error {
    var newDesiredCapacity int32
    decrement := true

    ctx, cancel := r.NewContext()
    defer cancel()

    for {
        // Rebuild the state of the world every iteration of the loop because instance and ASG statuses are changing
        log.Debug("Beginning new batch canary run check")
        asgSet, err := r.NewASGSet(ctx)
        if err != nil {
            return errors.Wrap(err, "error building ASGSet")
        }

        // Since we only support one ASG in batch-canary mode
        asg := asgSet.ASGs[0]
        curDesiredCapacity := *asg.ASG.DesiredCapacity
        finDesiredCapacity := asg.DesiredASG.DesiredCapacity

        oldUnhealthy := asgSet.GetUnHealthyOldInstances()
        newHealthy := asgSet.GetHealthyNewInstances()
        oldHealthy := asgSet.GetHealthyOldInstances()

        newCount := int32(len(asgSet.GetNewInstances()))
        oldCount := int32(len(asgSet.GetOldInstances()))
        healthyCount := int32(len(newHealthy) + len(oldHealthy))

        totalCount := newCount + oldCount

        // Never terminate nodes in a way that would take us below finDesiredCapacity healthy (InService) machines
        extraNodes := healthyCount - finDesiredCapacity

        maxDesiredCapacity := finDesiredCapacity + r.batchSize
        newDesiredCapacity = min(maxDesiredCapacity, finDesiredCapacity+oldCount)

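        // Worked example (illustrative, not from the diff): with finDesiredCapacity=6
        // and batchSize=3, an iteration that finds 3 healthy old and 3 healthy new
        // nodes computes extraNodes=6-6=0, maxDesiredCapacity=9 and
        // newDesiredCapacity=min(9, 6+3)=9, so the branches below scale out by a
        // batch of 3 rather than terminating anything.
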
        // Clean-out old unhealthy instances in P:W now, as they're just wasting time
        oldKilled := false
        for _, oi := range oldUnhealthy {
            if oi.ASGInstance.LifecycleState == at.LifecycleStatePendingWait {
                err := r.KillInstance(ctx, oi, &decrement)
                if err != nil {
                    return errors.Wrap(err, "error killing instance")
                }
                oldKilled = true
            }
        }

        if oldKilled {
            ctx, cancel = r.NewContext()
            defer cancel()
            r.Sleep(ctx)

            continue
        }

        // This check already prints statuses of individual nodes
        if asgSet.IsTransient() {
            log.Info("Waiting for nodes to settle")
            r.Sleep(ctx)
            continue
        }

        // Our exit case - we have exactly the number of nodes we want, they're all new, and they're all InService
        if oldCount == 0 && totalCount == finDesiredCapacity {
            if curDesiredCapacity == finDesiredCapacity {
                log.Info("Didn't find any old instances or ASGs - we're done here!")
                return nil
            }

            // Not sure how this would happen off-hand?
            log.WithFields(log.Fields{
                "Current desired capacity": curDesiredCapacity,
                "Final desired capacity":   finDesiredCapacity,
            }).Error("Capacity mismatch")
            return errors.New("old instance mismatch")
        }

        // If we haven't canaried a new instance yet, let's do that
        if newCount == 0 {
            log.Info("Adding canary node")
            newDesiredCapacity = curDesiredCapacity + 1

            err = r.SetDesiredCapacity(ctx, asg, &newDesiredCapacity)
            if err != nil {
                return errors.Wrap(err, "error setting desired capacity")
            }

            ctx, cancel = r.NewContext()
            defer cancel()
            r.Sleep(ctx)

            continue
        }

        // Scale-out a batch
        if newDesiredCapacity > curDesiredCapacity {
            log.WithFields(log.Fields{
                "Batch size given":       r.batchSize,
                "Old machines remaining": oldCount,
                "Max descap":             maxDesiredCapacity,
                "Current batch size":     newDesiredCapacity - curDesiredCapacity,
            }).Info("Adding a batch of new nodes")

            err = r.SetDesiredCapacity(ctx, asg, &newDesiredCapacity)
            if err != nil {
                return errors.Wrap(err, "error setting desired capacity")
            }

            ctx, cancel = r.NewContext()
            defer cancel()
            r.Sleep(ctx)

            continue
        }

        // Scale-in a batch
        if extraNodes > 0 {
            killed := int32(0)

            log.WithFields(log.Fields{
                "Old nodes":     oldCount,
                "Healthy nodes": healthyCount,
                "Extra nodes":   extraNodes,
            }).Info("Killing a batch of nodes")

            for _, oi := range oldHealthy {
                err := r.KillInstance(ctx, oi, &decrement)
                if err != nil {
                    return errors.Wrap(err, "error killing instance")
                }
                killed++
                if killed == extraNodes {
                    log.WithFields(log.Fields{
                        "Killed Nodes": killed,
                    }).Info("Already killed number of extra nodes to get back to desired capacity, pausing here")
                    break
                }
            }
            ctx, cancel = r.NewContext()
            defer cancel()
            r.Sleep(ctx)

            continue
        }

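        // For the scale-in branch above, illustrative numbers (not from the diff):
        // with healthyCount=9 and finDesiredCapacity=6, extraNodes=3, so exactly 3
        // healthy old nodes are terminated (decrementing desired capacity) before
        // the runner sleeps and re-evaluates.
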
        // Not sure how this would happen off-hand?
        log.WithFields(log.Fields{
            "Current desired capacity": curDesiredCapacity,
            "Final desired capacity":   finDesiredCapacity,
            "Old nodes":                oldCount,
            "Healthy nodes":            healthyCount,
            "Extra nodes":              extraNodes,
        }).Error("Unknown condition hit")
        return errors.New("undefined error")
    }
}
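
For orientation, a minimal driver sketch (not part of this commit) of how the pieces above would typically be chained; the batchcanary import path and the cmd package name are assumptions based on the repository layout, and the *bouncer.RunnerOpts is expected to arrive already populated from bouncer's CLI layer, as for the existing canary command.

// Hypothetical glue code, for illustration only.
package cmd

import (
    "context"

    "github.com/palantir/bouncer/batchcanary" // assumed import path
    "github.com/palantir/bouncer/bouncer"
    "github.com/pkg/errors"
)

// runBatchCanary builds the runner, validates the ASG preconditions, and then
// runs the replacement loop until every old node has been cycled out.
func runBatchCanary(ctx context.Context, opts *bouncer.RunnerOpts) error {
    r, err := batchcanary.NewRunner(ctx, opts)
    if err != nil {
        return errors.Wrap(err, "error creating batch-canary runner")
    }
    if err := r.ValidatePrereqs(ctx); err != nil {
        return err
    }
    return r.Run()
}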