From ce042d3bc9d7e97ef8d27c89ebc68757e389f359 Mon Sep 17 00:00:00 2001 From: Jimmi Dyson Date: Tue, 11 Jul 2023 11:54:03 +0100 Subject: [PATCH] feat: Parallel image pulls --- .../create/imagebundle/image_bundle.go | 99 ++++++++++++------- config/images_config.go | 16 +++ go.mod | 2 +- 3 files changed, 80 insertions(+), 37 deletions(-) diff --git a/cmd/mindthegap/create/imagebundle/image_bundle.go b/cmd/mindthegap/create/imagebundle/image_bundle.go index cba6f17d..543c2651 100644 --- a/cmd/mindthegap/create/imagebundle/image_bundle.go +++ b/cmd/mindthegap/create/imagebundle/image_bundle.go @@ -4,6 +4,7 @@ package imagebundle import ( + "context" "errors" "fmt" "net/http" @@ -15,6 +16,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "github.com/mesosphere/dkp-cli-runtime/core/output" @@ -29,10 +31,11 @@ import ( func NewCommand(out output.Output) *cobra.Command { var ( - configFile string - platforms []platform - outputFile string - overwrite bool + configFile string + platforms []platform + outputFile string + overwrite bool + imagePullConcurrency int ) cmd := &cobra.Command{ @@ -119,6 +122,15 @@ func NewCommand(out output.Output) *cobra.Command { // Sort registries for deterministic ordering. regNames := cfg.SortedRegistryNames() + eg, egCtx := errgroup.WithContext(context.Background()) + eg.SetLimit(imagePullConcurrency) + + pullGauge := &output.ProgressGauge{} + pullGauge.SetCapacity(cfg.TotalImages()) + pullGauge.SetStatus("Pulling requested images") + + out.StartOperationWithProgress(pullGauge) + for _, registryName := range regNames { registryConfig := cfg[registryName] @@ -129,6 +141,7 @@ func NewCommand(out output.Output) *cobra.Command { "", ) if err != nil { + out.EndOperationWithStatus(output.Failure()) out.Error(err, "error configuring TLS for source registry") os.Exit(2) } @@ -151,50 +164,62 @@ func NewCommand(out output.Output) *cobra.Command { // Sort images for deterministic ordering. imageNames := registryConfig.SortedImageNames() - for _, imageName := range imageNames { + destRemoteOpts = append(destRemoteOpts, remote.WithContext(egCtx)) + sourceRemoteOpts = append(sourceRemoteOpts, remote.WithContext(egCtx)) + + for i := range imageNames { + imageName := imageNames[i] imageTags := registryConfig.Images[imageName] - for _, imageTag := range imageTags { - srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag) - out.StartOperation( - fmt.Sprintf("Copying %s (platforms: %v)", - srcImageName, platforms, - ), - ) - - imageIndex, err := images.ManifestListForImage( - srcImageName, - platformsStrings, - sourceRemoteOpts...) - if err != nil { - out.EndOperationWithStatus(output.Failure()) - return err - } - - destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag) - ref, err := name.ParseReference(destImageName, name.StrictValidation) - if err != nil { - out.EndOperationWithStatus(output.Failure()) - return err - } - - if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil { - out.EndOperationWithStatus(output.Failure()) - return err - } - - out.EndOperationWithStatus(output.Success()) + + for j := range imageTags { + imageTag := imageTags[j] + + eg.Go(func() error { + srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag) + + imageIndex, err := images.ManifestListForImage( + srcImageName, + platformsStrings, + sourceRemoteOpts..., + ) + if err != nil { + return err + } + + destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag) + ref, err := name.ParseReference(destImageName, name.StrictValidation) + if err != nil { + return err + } + + if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil { + return err + } + + pullGauge.Inc() + + return nil + }) } } + err = eg.Wait() + if tr, ok := sourceTLSRoundTripper.(*http.Transport); ok { tr.CloseIdleConnections() } - if tr, ok := destTLSRoundTripper.(*http.Transport); ok { tr.CloseIdleConnections() } + + if err != nil { + out.EndOperationWithStatus(output.Failure()) + return err + } } + out.EndOperationWithStatus(output.Success()) + if err := config.WriteSanitizedImagesConfig(cfg, filepath.Join(tempDir, "images.yaml")); err != nil { return err } @@ -220,6 +245,8 @@ func NewCommand(out output.Output) *cobra.Command { StringVar(&outputFile, "output-file", "images.tar", "Output file to write image bundle to") cmd.Flags(). BoolVar(&overwrite, "overwrite", false, "Overwrite image bundle file if it already exists") + cmd.Flags(). + IntVar(&imagePullConcurrency, "image-pull-concurrency", 1, "Image pull concurrency") return cmd } diff --git a/config/images_config.go b/config/images_config.go index c3f1dc3f..e41796c4 100644 --- a/config/images_config.go +++ b/config/images_config.go @@ -37,6 +37,14 @@ func (rsc RegistrySyncConfig) SortedImageNames() []string { return imageNames } +func (rsc RegistrySyncConfig) TotalImages() int { + n := 0 + for _, imgTag := range rsc.Images { + n += len(imgTag) + } + return n +} + func (rsc RegistrySyncConfig) Clone() RegistrySyncConfig { images := make(map[string][]string, len(rsc.Images)) for k, v := range rsc.Images { @@ -136,6 +144,14 @@ func (ic ImagesConfig) SortedRegistryNames() []string { return regNames } +func (ic ImagesConfig) TotalImages() int { + n := 0 + for _, rsc := range ic { + n += rsc.TotalImages() + } + return n +} + func ParseImagesConfigFile(configFile string) (ImagesConfig, error) { f, yamlParseErr := os.Open(configFile) if yamlParseErr != nil { diff --git a/go.mod b/go.mod index b9e6d630..5ed39c35 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 github.com/thediveo/enumflag/v2 v2.0.4 + golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.12.1 k8s.io/apimachinery v0.27.3 @@ -187,7 +188,6 @@ require ( golang.org/x/mod v0.10.0 // indirect golang.org/x/net v0.11.0 // indirect golang.org/x/oauth2 v0.9.0 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.9.0 // indirect golang.org/x/term v0.9.0 // indirect golang.org/x/text v0.10.0 // indirect