Commit

don't kill busy workers

ilya-hontarau committed Aug 12, 2024
1 parent 5959eeb commit e1a0fce
Showing 3 changed files with 120 additions and 7 deletions.
57 changes: 51 additions & 6 deletions internal/auto_scaler.go
@@ -40,6 +40,12 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
return fmt.Errorf("could not get worker pool: %w", err)
}

// remove workers drained in a previous autoscaler run that were still busy at the time
err = s.extractAndKillDrainedWorkers(ctx, logger, workerPool)
if err != nil {
return fmt.Errorf("could not extract and kill already drained workers: %w", err)
}

asg, err := s.controller.GetAutoscalingGroup(ctx)
if err != nil {
return fmt.Errorf("could not get autoscaling group: %w", err)
@@ -111,6 +117,24 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {

idleWorkers := state.IdleWorkers()

err = s.drainWorkers(ctx, decision, idleWorkers, logger)
if err != nil {
return fmt.Errorf("could not drain workers: %w", err)
}

// fetch the workers again and kill only the drained, non-busy ones
workerPool, err = s.controller.GetWorkerPool(ctx)
if err != nil {
return fmt.Errorf("could not get worker pool: %w", err)
}
err = s.extractAndKillDrainedWorkers(ctx, logger, workerPool)
if err != nil {
return fmt.Errorf("could not extract and kill drained workers: %w", err)
}
return nil
}

func (s AutoScaler) drainWorkers(ctx context.Context, decision Decision, idleWorkers []Worker, logger *slog.Logger) error {
for i := 0; i < decision.ScalingSize; i++ {
worker := idleWorkers[i]

@@ -122,20 +146,41 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
)
logger.Info("scaling down ASG and killing worker")

- drained, err := s.controller.DrainWorker(ctx, worker.ID)
+ _, err := s.controller.DrainWorker(ctx, worker.ID)
if err != nil {
return fmt.Errorf("could not drain worker: %w", err)
}
}
return nil
}

- if !drained {
- logger.Warn("worker was busy, stopping the scaling down process")
- return nil
func (s AutoScaler) extractAndKillDrainedWorkers(ctx context.Context, logger *slog.Logger, workerPool *WorkerPool) error {
drainedWorkers := workerPool.ExtractDrainedWorkers()
err := s.killWorkers(ctx, logger, drainedWorkers)
if err != nil {
return err
}
return nil
}

func (s AutoScaler) killWorkers(ctx context.Context, logger *slog.Logger, workers []Worker) error {
for _, worker := range workers {
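// drained but still busy: skip it; a later autoscaler run will reap it once idle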
if worker.Busy {
continue
}
_, instanceID, err := worker.InstanceIdentity()
if err != nil {
logger.Error("could not determine instance ID", "error", err)
continue
}
logger.With(
"worker_id", worker.ID,
"instance_id", instanceID,
).Info("killing drained worker")

if err := s.controller.KillInstance(ctx, string(instanceID)); err != nil {
return fmt.Errorf("could not kill instance: %w", err)
return fmt.Errorf("could not kill drained instance: %w", err)
}
}

return nil
}
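Taken together, Scale now runs a two-pass reap around the drain step: kill drained leftovers from the previous run, drain this run's idle workers, re-fetch the pool, and kill again, skipping anything still busy. Below is a minimal, self-contained sketch of that sequence; killDrained and drainIdle are illustrative stand-ins for extractAndKillDrainedWorkers and drainWorkers, not the package's actual API:

```go
package main

import "fmt"

type Worker struct {
	ID      string
	Drained bool
	Busy    bool
}

// killDrained stands in for extractAndKillDrainedWorkers: drained, non-busy
// workers are removed; a drained worker that is still busy survives this pass.
func killDrained(workers []Worker) []Worker {
	kept := make([]Worker, 0, len(workers))
	for _, w := range workers {
		if w.Drained && !w.Busy {
			fmt.Println("killing worker", w.ID)
			continue
		}
		kept = append(kept, w)
	}
	return kept
}

// drainIdle stands in for drainWorkers: mark up to n idle workers as drained.
func drainIdle(workers []Worker, n int) {
	for i := range workers {
		if n == 0 {
			return
		}
		if !workers[i].Busy && !workers[i].Drained {
			workers[i].Drained = true
			n--
		}
	}
}

func main() {
	pool := []Worker{
		{ID: "leftover", Drained: true, Busy: true}, // drained in a previous run
		{ID: "idle-1"},
		{ID: "busy-1", Busy: true},
	}
	pool = killDrained(pool)             // pass 1: "leftover" skipped, still busy
	drainIdle(pool, 1)                   // scale-down decision drains "idle-1"
	pool = killDrained(pool)             // pass 2: kills "idle-1" only
	fmt.Println("remaining:", len(pool)) // remaining: 2
}
```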
56 changes: 55 additions & 1 deletion internal/auto_scaler_test.go
@@ -42,6 +42,47 @@ func TestAutoScalerScalingNone(t *testing.T) {
require.NoError(t, err)
}

func TestAutoScalerScalingNoneWithDrainedWorkers(t *testing.T) {
var buf bytes.Buffer
h := slog.NewTextHandler(&buf, nil)

cfg := internal.RuntimeConfig{}

ctrl := new(MockController)
defer ctrl.AssertExpectations(t)

scaler := internal.NewAutoScaler(ctrl, slog.New(h))

ctrl.On("GetWorkerPool", mock.Anything).Return(&internal.WorkerPool{
Workers: []internal.Worker{
{
ID: "1",
Metadata: `{"asg_id": "group", "instance_id": "instance"}`,
},
{
ID: "2",
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
Drained: true,
},
{
ID: "3",
Metadata: `{"asg_id": "group", "instance_id": "instance3"}`,
Drained: true,
Busy: true,
},
},
}, nil)
ctrl.On("KillInstance", mock.Anything, "instance2").Return(nil)
ctrl.On("GetAutoscalingGroup", mock.Anything).Return(&types.AutoScalingGroup{
AutoScalingGroupName: ptr("group"),
MinSize: ptr(int32(1)),
MaxSize: ptr(int32(3)),
DesiredCapacity: ptr(int32(2)),
}, nil)
err := scaler.Scale(context.Background(), cfg)
require.NoError(t, err)
}

func TestAutoScalerScalingUp(t *testing.T) {
var buf bytes.Buffer
h := slog.NewTextHandler(&buf, nil)
@@ -100,7 +141,20 @@ func TestAutoScalerScalingDown(t *testing.T) {
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
},
},
- }, nil)
+ }, nil).Once()
ctrl.On("GetWorkerPool", mock.Anything).Return(&internal.WorkerPool{
Workers: []internal.Worker{
{
ID: "1",
Metadata: `{"asg_id": "group", "instance_id": "instance"}`,
Drained: true,
},
{
ID: "2",
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
},
},
}, nil).Once()
ctrl.On("GetAutoscalingGroup", mock.Anything).Return(&types.AutoScalingGroup{
AutoScalingGroupName: ptr("group"),
MinSize: ptr(int32(1)),
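TestAutoScalerScalingDown now registers GetWorkerPool twice with .Once(). In testify, expectations for the same method are consumed in registration order, so the first call returns the pre-drain pool and the second returns the pool with worker "1" marked Drained. A small self-contained sketch of that ordering, assuming only testify's mock package (the fetcher type is a toy, not part of this repository):

```go
package example_test

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// fetcher is a toy mock used only to demonstrate .Once() ordering.
type fetcher struct{ mock.Mock }

func (f *fetcher) Get() string {
	return f.Called().String(0)
}

func TestOnceExpectationsAreConsumedInOrder(t *testing.T) {
	f := new(fetcher)
	f.On("Get").Return("before drain").Once()
	f.On("Get").Return("after drain").Once()

	require.Equal(t, "before drain", f.Get()) // first expectation consumed
	require.Equal(t, "after drain", f.Get())  // second expectation consumed
	f.AssertExpectations(t)                   // both .Once() calls were made
}
```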
14 changes: 14 additions & 0 deletions internal/worker_pool_details.go
@@ -5,6 +5,20 @@ type WorkerPool struct {
Workers []Worker `graphql:"workers" json:"workers"`
}

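// ExtractDrainedWorkers removes drained workers from the pool in place and returns them; callers kill the returned workers separately.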
func (wp *WorkerPool) ExtractDrainedWorkers() []Worker {
drainedWorkers := make([]Worker, 0)
nonDrainedWorkers := make([]Worker, 0)
for _, worker := range wp.Workers {
if worker.Drained {
drainedWorkers = append(drainedWorkers, worker)
continue
}
nonDrainedWorkers = append(nonDrainedWorkers, worker)
}
wp.Workers = nonDrainedWorkers
return drainedWorkers
}

type WorkerPoolDetails struct {
Pool *WorkerPool `graphql:"workerPool(id: $workerPool)"`
}
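Note that ExtractDrainedWorkers mutates the pool in place: drained workers are handed back to the caller and the pool keeps only the rest. A standalone sketch of that contract, re-declaring just the Worker fields the diff relies on (ID, Drained, Busy):

```go
package main

import "fmt"

type Worker struct {
	ID      string
	Drained bool
	Busy    bool
}

type WorkerPool struct{ Workers []Worker }

// Same partition as the method above: drained workers out, the rest stay.
func (wp *WorkerPool) ExtractDrainedWorkers() []Worker {
	drained := make([]Worker, 0)
	remaining := make([]Worker, 0)
	for _, w := range wp.Workers {
		if w.Drained {
			drained = append(drained, w)
			continue
		}
		remaining = append(remaining, w)
	}
	wp.Workers = remaining
	return drained
}

func main() {
	pool := &WorkerPool{Workers: []Worker{
		{ID: "1"},
		{ID: "2", Drained: true},
		{ID: "3", Drained: true, Busy: true},
	}}
	drained := pool.ExtractDrainedWorkers()
	fmt.Println(len(drained), len(pool.Workers)) // 2 1
	// The kill pass would then skip worker "3" because it is still busy.
}
```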
