Skip to content

Commit

Permalink
move execution of success command to repairFinish
Browse files Browse the repository at this point in the history
  • Loading branch information
YZ775 committed Nov 11, 2024
1 parent 5d66e2c commit e278761
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 38 deletions.
16 changes: 10 additions & 6 deletions op/repair_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (
// repairExecuteOp is the operator value returned by RepairExecuteOp. It keeps
// the repair queue entry, the repair step and the cluster that the command
// built in NextCommand is populated from.
// NOTE(review): the entry/step field lines appear twice in this span (the
// pre-change and post-change lines of the rendered diff).
type repairExecuteOp struct {
// finished: presumably marks that the operator has already handed out its
// command; the code that reads/writes it is not visible here — TODO confirm.
finished bool

entry *cke.RepairQueueEntry
step *cke.RepairStep
entry *cke.RepairQueueEntry
step *cke.RepairStep
// cluster is copied into repairExecuteCommand, which forwards it to
// repairFinish.
cluster *cke.Cluster
}

// RepairExecuteOp returns a cke.Operator backed by a *repairExecuteOp that
// stores the given queue entry, repair step and cluster. The pointers are
// stored as-is; nothing is copied.
// NOTE(review): both the old two-argument and the new three-argument
// signature lines are present in this span (rendered diff).
func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep) cke.Operator {
func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep, cluster *cke.Cluster) cke.Operator {
return &repairExecuteOp{
entry: entry,
step: step,
entry: entry,
step: step,
cluster: cluster,
}
}

Expand All @@ -40,6 +42,7 @@ func (o *repairExecuteOp) NextCommand() cke.Commander {
timeoutSeconds: o.step.CommandTimeoutSeconds,
retries: o.step.CommandRetries,
interval: o.step.CommandInterval,
cluster: o.cluster,
}
}

Expand All @@ -53,6 +56,7 @@ type repairExecuteCommand struct {
timeoutSeconds *int
retries *int
interval *int
cluster *cke.Cluster
}

func (c repairExecuteCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
Expand Down Expand Up @@ -110,7 +114,7 @@ RETRY:
"address": c.entry.Address,
"command": strings.Join(c.command, " "),
})
return repairFinish(ctx, inf, c.entry, false)
return repairFinish(ctx, inf, c.entry, false, c.cluster)
}

func (c repairExecuteCommand) Command() cke.Command {
Expand Down
51 changes: 48 additions & 3 deletions op/repair_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
)

// repairFinishOp is the operator value returned by RepairFinishOp. Its entry,
// succeeded and cluster fields are copied into the repairFinishCommand that
// NextCommand returns.
type repairFinishOp struct {
// finished: presumably marks that the operator has already handed out its
// command; the code that reads/writes it is not visible here — TODO confirm.
finished bool

// entry is the repair queue entry whose status is being finalized.
entry *cke.RepairQueueEntry
// succeeded selects which final status repairFinish records for the entry.
succeeded bool
// cluster is forwarded to repairFinish, which passes it to
// entry.GetMatchingRepairOperation when succeeded is true.
cluster *cke.Cluster
}

// RepairFinishOp returns a cke.Operator backed by a *repairFinishOp that
// stores the given queue entry, the outcome flag and the cluster.
// NOTE(review): both the old two-argument and the new three-argument
// signature lines are present in this span (rendered diff).
func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool) cke.Operator {
func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) cke.Operator {
return &repairFinishOp{
entry: entry,
succeeded: succeeded,
cluster: cluster,
}
}

Expand All @@ -34,6 +38,7 @@ func (o *repairFinishOp) NextCommand() cke.Commander {
return repairFinishCommand{
entry: o.entry,
succeeded: o.succeeded,
cluster: o.cluster,
}
}

Expand All @@ -44,10 +49,14 @@ func (o *repairFinishOp) Targets() []string {
// repairFinishCommand is the command produced by repairFinishOp.NextCommand.
// Its Run method passes all three fields to repairFinish.
type repairFinishCommand struct {
// entry is the repair queue entry whose status repairFinish updates.
entry *cke.RepairQueueEntry
// succeeded tells repairFinish whether the repair is treated as successful.
succeeded bool
// cluster is used by repairFinish to look up the repair operation matching
// the entry (via entry.GetMatchingRepairOperation) on the success path.
cluster *cke.Cluster
}

// Run finishes the repair of c.entry by delegating to repairFinish with the
// recorded outcome and cluster, and returns whatever repairFinish returns.
// The third (string) parameter is ignored.
// NOTE(review): the `if c.succeeded` block below has an empty body and has no
// effect; repairFinish performs its own check of the succeeded flag.
func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
return repairFinish(ctx, inf, c.entry, c.succeeded)
if c.succeeded {

}
return repairFinish(ctx, inf, c.entry, c.succeeded, c.cluster)
}

func (c repairFinishCommand) Command() cke.Command {
Expand All @@ -57,8 +66,44 @@ func (c repairFinishCommand) Command() cke.Command {
}
}

func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool) error {
func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) error {
if succeeded {
//execute Success command
err := func() error {
op, err := entry.GetMatchingRepairOperation(cluster)
if err != nil {
return err
}
if op.SuccessCommand != nil {
err := func() error {
ctx := ctx
timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds
if op.SuccessCommandTimeout != nil {
timeout = *op.SuccessCommandTimeout
}
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout))
defer cancel()
}
args := append(op.SuccessCommand[1:], entry.Address)
command := well.CommandContext(ctx, op.SuccessCommand[0], args...)
return command.Run()
}()
if err != nil {
return err
}
}
return nil
}()
if err != nil {
entry.Status = cke.RepairStatusFailed
log.Warn("SuccessCommand failed", map[string]interface{}{
log.FnError: err,
"index": entry.Index,
"address": entry.Address,
})
}
entry.Status = cke.RepairStatusSucceeded
} else {
entry.Status = cke.RepairStatusFailed
Expand Down
25 changes: 0 additions & 25 deletions op/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,31 +613,6 @@ func GetRepairQueueStatus(ctx context.Context, inf cke.Infrastructure, n *cke.No
}
if healthy {
rqs.RepairCompleted[entry.Address] = true
//execute Success command
op, err := entry.GetMatchingRepairOperation(cluster)
if err != nil {
return cke.RepairQueueStatus{}, err
}
if op.SuccessCommand != nil {
err := func() error {
ctx := ctx
timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds
if op.SuccessCommandTimeout != nil {
timeout = *op.SuccessCommandTimeout
}
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout))
defer cancel()
}
args := append(op.SuccessCommand[1:], entry.Address)
command := well.CommandContext(ctx, op.SuccessCommand[0], args...)
return command.Run()
}()
if err != nil {
return cke.RepairQueueStatus{}, err
}
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if rqs.RepairCompleted[entry.Address] {
ops = append(ops, op.RepairFinishOp(entry, true))
ops = append(ops, op.RepairFinishOp(entry, true, c))
continue
}
switch entry.Status {
Expand Down Expand Up @@ -826,7 +826,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
// Though ErrRepairStepOutOfRange may be caused by real misconfiguration,
// e.g., by decreasing "repair_steps" in cluster.yaml, we treat the error
// as the end of the steps for simplicity.
ops = append(ops, op.RepairFinishOp(entry, false))
ops = append(ops, op.RepairFinishOp(entry, false, c))
continue
}

Expand All @@ -838,7 +838,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if !(step.NeedDrain && entry.IsInCluster()) {
ops = append(ops, op.RepairExecuteOp(entry, step))
ops = append(ops, op.RepairExecuteOp(entry, step, c))
continue
}
// DrainBackOffExpire has been confirmed, so start drain now.
Expand All @@ -849,7 +849,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if rqs.DrainCompleted[entry.Address] {
ops = append(ops, op.RepairExecuteOp(entry, step))
ops = append(ops, op.RepairExecuteOp(entry, step, c))
continue
}
if entry.LastTransitionTime.Before(evictionStartLimit) {
Expand Down

0 comments on commit e278761

Please sign in to comment.