Skip to content

Commit

Permalink
feat: Parallel image pulls (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmidyson authored Jul 11, 2023
1 parent 8fc9958 commit 4341db2
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 40 deletions.
99 changes: 63 additions & 36 deletions cmd/mindthegap/create/imagebundle/image_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package imagebundle

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/mesosphere/dkp-cli-runtime/core/output"

Expand All @@ -29,10 +31,11 @@ import (

func NewCommand(out output.Output) *cobra.Command {
var (
configFile string
platforms []platform
outputFile string
overwrite bool
configFile string
platforms []platform
outputFile string
overwrite bool
imagePullConcurrency int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -119,6 +122,15 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort registries for deterministic ordering.
regNames := cfg.SortedRegistryNames()

eg, egCtx := errgroup.WithContext(context.Background())
eg.SetLimit(imagePullConcurrency)

pullGauge := &output.ProgressGauge{}
pullGauge.SetCapacity(cfg.TotalImages())
pullGauge.SetStatus("Pulling requested images")

out.StartOperationWithProgress(pullGauge)

for _, registryName := range regNames {
registryConfig := cfg[registryName]

Expand All @@ -129,6 +141,7 @@ func NewCommand(out output.Output) *cobra.Command {
"",
)
if err != nil {
out.EndOperationWithStatus(output.Failure())
out.Error(err, "error configuring TLS for source registry")
os.Exit(2)
}
Expand All @@ -151,50 +164,62 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort images for deterministic ordering.
imageNames := registryConfig.SortedImageNames()

for _, imageName := range imageNames {
destRemoteOpts = append(destRemoteOpts, remote.WithContext(egCtx))
sourceRemoteOpts = append(sourceRemoteOpts, remote.WithContext(egCtx))

for i := range imageNames {
imageName := imageNames[i]
imageTags := registryConfig.Images[imageName]
for _, imageTag := range imageTags {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)
out.StartOperation(
fmt.Sprintf("Copying %s (platforms: %v)",
srcImageName, platforms,
),
)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

out.EndOperationWithStatus(output.Success())

for j := range imageTags {
imageTag := imageTags[j]

eg.Go(func() error {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...,
)
if err != nil {
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
return err
}

pullGauge.Inc()

return nil
})
}
}

err = eg.Wait()

if tr, ok := sourceTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if tr, ok := destTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}
}

out.EndOperationWithStatus(output.Success())

if err := config.WriteSanitizedImagesConfig(cfg, filepath.Join(tempDir, "images.yaml")); err != nil {
return err
}
Expand All @@ -220,6 +245,8 @@ func NewCommand(out output.Output) *cobra.Command {
StringVar(&outputFile, "output-file", "images.tar", "Output file to write image bundle to")
cmd.Flags().
BoolVar(&overwrite, "overwrite", false, "Overwrite image bundle file if it already exists")
cmd.Flags().
IntVar(&imagePullConcurrency, "image-pull-concurrency", 1, "Image pull concurrency")

return cmd
}
16 changes: 16 additions & 0 deletions config/images_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (rsc RegistrySyncConfig) SortedImageNames() []string {
return imageNames
}

func (rsc RegistrySyncConfig) TotalImages() int {
n := 0
for _, imgTag := range rsc.Images {
n += len(imgTag)
}
return n
}

func (rsc RegistrySyncConfig) Clone() RegistrySyncConfig {
images := make(map[string][]string, len(rsc.Images))
for k, v := range rsc.Images {
Expand Down Expand Up @@ -136,6 +144,14 @@ func (ic ImagesConfig) SortedRegistryNames() []string {
return regNames
}

func (ic ImagesConfig) TotalImages() int {
n := 0
for _, rsc := range ic {
n += rsc.TotalImages()
}
return n
}

func ParseImagesConfigFile(configFile string) (ImagesConfig, error) {
f, yamlParseErr := os.Open(configFile)
if yamlParseErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819
github.com/google/go-containerregistry v0.15.2
github.com/hashicorp/go-getter v1.7.1
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f
github.com/mesosphere/dkp-cli-runtime/core v0.7.2
github.com/mholt/archiver/v3 v3.5.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
Expand All @@ -34,6 +34,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/thediveo/enumflag/v2 v2.0.4
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.12.1
k8s.io/apimachinery v0.27.3
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,8 @@ github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f h1:2pMHmxCU0Xe9HDLWovHajNJ/eyLLs+oi+zhSApqfSLw=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2 h1:dWzl9mdIS14DV7GQbTgVy8EyjFTr5re4V4UUzdqNqzA=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mholt/archiver/v3 v3.5.1 h1:rDjOBX9JSF5BvoJGvjqK479aL70qh9DIpZCl+k7Clwo=
github.com/mholt/archiver/v3 v3.5.1/go.mod h1:e3dqJ7H78uzsRSEACH1joayhuSyhnonssnDhppzS1L4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down

0 comments on commit 4341db2

Please sign in to comment.