Skip to content

Commit

Permalink
Use errorUtils for exec goroutine aggregation
Browse files Browse the repository at this point in the history
This functionality is already used within the project instead of the
custom map function.

Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Oct 16, 2024
1 parent 7abeb52 commit ce41995
Showing 1 changed file with 27 additions and 53 deletions.
80 changes: 27 additions & 53 deletions cmd/crictl/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"errors"
"fmt"
"net/url"
"runtime"
"sync"
"time"

mobyterm "github.com/moby/term"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
errorUtils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
remoteclient "k8s.io/client-go/tools/remotecommand"
internalapi "k8s.io/cri-api/pkg/apis"
Expand Down Expand Up @@ -201,39 +200,37 @@ var runtimeExecCommand = &cli.Command{
transport: c.String(transportFlag),
}

maxParallel := 1
if c.Bool("parallel") {
maxParallel = runtime.NumCPU()
}

results := mapParallel(ids, maxParallel, func(id string) error {
optsCopy := *&opts
optsCopy.id = id
funcs := []func() error{}
for _, id := range ids {
funcs = append(funcs, func() error {
optsCopy := *&opts
optsCopy.id = id

if outputContainerID {
fmt.Println(id + ":")
}
if c.Bool("sync") {
exitCode, err := ExecSync(runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s synchronously: %w", id, err)
}
if exitCode != 0 {
return cli.Exit("non-zero exit code", exitCode)
if outputContainerID {
fmt.Println(id + ":")
}
} else {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
err = Exec(ctx, runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s: %w", id, err)
if c.Bool("sync") {
exitCode, err := ExecSync(runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s synchronously: %w", id, err)
}
if exitCode != 0 {
return cli.Exit("non-zero exit code", exitCode)
}
} else {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
err = Exec(ctx, runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s: %w", id, err)
}
}
}

return nil
})
return nil
})
}

errs := errors.Join(results...)
errs := errorUtils.AggregateGoroutines(funcs...)

if ignoreErrors {
logrus.Debugf("Ignoring errors: %v", errs)
Expand All @@ -244,29 +241,6 @@ var runtimeExecCommand = &cli.Command{
},
}

func mapParallel[T1 any, T2 any](input []T1, maxParallel int, fn func(T1) T2) []T2 {
wg := &sync.WaitGroup{}
wg.Add(len(input))

results := make([]T2, len(input))
maxParallelChan := make(chan struct{}, maxParallel)

for i := range input {
maxParallelChan <- struct{}{}
go func(index int, x T1) {
defer wg.Done()

result := fn(x)
results[index] = result

<-maxParallelChan
}(i, input[i])
}

wg.Wait()
return results
}

// ExecSync sends an ExecSyncRequest to the server, and parses
// the returned ExecSyncResponse. The function returns the corresponding exit
// code beside an general error.
Expand Down

0 comments on commit ce41995

Please sign in to comment.