Skip to content

Commit

Permalink
feat: Parallel image pulls
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmidyson committed Jul 11, 2023
1 parent 8fc9958 commit ce042d3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 37 deletions.
99 changes: 63 additions & 36 deletions cmd/mindthegap/create/imagebundle/image_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package imagebundle

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/mesosphere/dkp-cli-runtime/core/output"

Expand All @@ -29,10 +31,11 @@ import (

func NewCommand(out output.Output) *cobra.Command {
var (
configFile string
platforms []platform
outputFile string
overwrite bool
configFile string
platforms []platform
outputFile string
overwrite bool
imagePullConcurrency int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -119,6 +122,15 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort registries for deterministic ordering.
regNames := cfg.SortedRegistryNames()

eg, egCtx := errgroup.WithContext(context.Background())
eg.SetLimit(imagePullConcurrency)

pullGauge := &output.ProgressGauge{}
pullGauge.SetCapacity(cfg.TotalImages())
pullGauge.SetStatus("Pulling requested images")

out.StartOperationWithProgress(pullGauge)

for _, registryName := range regNames {
registryConfig := cfg[registryName]

Expand All @@ -129,6 +141,7 @@ func NewCommand(out output.Output) *cobra.Command {
"",
)
if err != nil {
out.EndOperationWithStatus(output.Failure())
out.Error(err, "error configuring TLS for source registry")
os.Exit(2)
}
Expand All @@ -151,50 +164,62 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort images for deterministic ordering.
imageNames := registryConfig.SortedImageNames()

for _, imageName := range imageNames {
destRemoteOpts = append(destRemoteOpts, remote.WithContext(egCtx))
sourceRemoteOpts = append(sourceRemoteOpts, remote.WithContext(egCtx))

for i := range imageNames {
imageName := imageNames[i]
imageTags := registryConfig.Images[imageName]
for _, imageTag := range imageTags {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)
out.StartOperation(
fmt.Sprintf("Copying %s (platforms: %v)",
srcImageName, platforms,
),
)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

out.EndOperationWithStatus(output.Success())

for j := range imageTags {
imageTag := imageTags[j]

eg.Go(func() error {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...,
)
if err != nil {
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
return err
}

pullGauge.Inc()

return nil
})
}
}

err = eg.Wait()

if tr, ok := sourceTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if tr, ok := destTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}
}

out.EndOperationWithStatus(output.Success())

if err := config.WriteSanitizedImagesConfig(cfg, filepath.Join(tempDir, "images.yaml")); err != nil {
return err
}
Expand All @@ -220,6 +245,8 @@ func NewCommand(out output.Output) *cobra.Command {
StringVar(&outputFile, "output-file", "images.tar", "Output file to write image bundle to")
cmd.Flags().
BoolVar(&overwrite, "overwrite", false, "Overwrite image bundle file if it already exists")
cmd.Flags().
IntVar(&imagePullConcurrency, "image-pull-concurrency", 1, "Image pull concurrency")

return cmd
}
16 changes: 16 additions & 0 deletions config/images_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (rsc RegistrySyncConfig) SortedImageNames() []string {
return imageNames
}

func (rsc RegistrySyncConfig) TotalImages() int {
n := 0
for _, imgTag := range rsc.Images {
n += len(imgTag)
}
return n
}

func (rsc RegistrySyncConfig) Clone() RegistrySyncConfig {
images := make(map[string][]string, len(rsc.Images))
for k, v := range rsc.Images {
Expand Down Expand Up @@ -136,6 +144,14 @@ func (ic ImagesConfig) SortedRegistryNames() []string {
return regNames
}

func (ic ImagesConfig) TotalImages() int {
n := 0
for _, rsc := range ic {
n += rsc.TotalImages()
}
return n
}

func ParseImagesConfigFile(configFile string) (ImagesConfig, error) {
f, yamlParseErr := os.Open(configFile)
if yamlParseErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/thediveo/enumflag/v2 v2.0.4
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.12.1
k8s.io/apimachinery v0.27.3
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
Expand Down

0 comments on commit ce042d3

Please sign in to comment.