Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Parallel image pulls #456

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 63 additions & 36 deletions cmd/mindthegap/create/imagebundle/image_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package imagebundle

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/mesosphere/dkp-cli-runtime/core/output"

Expand All @@ -29,10 +31,11 @@ import (

func NewCommand(out output.Output) *cobra.Command {
var (
configFile string
platforms []platform
outputFile string
overwrite bool
configFile string
platforms []platform
outputFile string
overwrite bool
imagePullConcurrency int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -119,6 +122,15 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort registries for deterministic ordering.
regNames := cfg.SortedRegistryNames()

eg, egCtx := errgroup.WithContext(context.Background())
eg.SetLimit(imagePullConcurrency)

pullGauge := &output.ProgressGauge{}
pullGauge.SetCapacity(cfg.TotalImages())
pullGauge.SetStatus("Pulling requested images")

out.StartOperationWithProgress(pullGauge)

for _, registryName := range regNames {
registryConfig := cfg[registryName]

Expand All @@ -129,6 +141,7 @@ func NewCommand(out output.Output) *cobra.Command {
"",
)
if err != nil {
out.EndOperationWithStatus(output.Failure())
out.Error(err, "error configuring TLS for source registry")
os.Exit(2)
}
Expand All @@ -151,50 +164,62 @@ func NewCommand(out output.Output) *cobra.Command {
// Sort images for deterministic ordering.
imageNames := registryConfig.SortedImageNames()

for _, imageName := range imageNames {
destRemoteOpts = append(destRemoteOpts, remote.WithContext(egCtx))
sourceRemoteOpts = append(sourceRemoteOpts, remote.WithContext(egCtx))

for i := range imageNames {
imageName := imageNames[i]
imageTags := registryConfig.Images[imageName]
for _, imageTag := range imageTags {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)
out.StartOperation(
fmt.Sprintf("Copying %s (platforms: %v)",
srcImageName, platforms,
),
)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}

out.EndOperationWithStatus(output.Success())

for j := range imageTags {
imageTag := imageTags[j]

eg.Go(func() error {
srcImageName := fmt.Sprintf("%s/%s:%s", registryName, imageName, imageTag)

imageIndex, err := images.ManifestListForImage(
srcImageName,
platformsStrings,
sourceRemoteOpts...,
)
if err != nil {
return err
}

destImageName := fmt.Sprintf("%s/%s:%s", reg.Address(), imageName, imageTag)
ref, err := name.ParseReference(destImageName, name.StrictValidation)
if err != nil {
return err
}

if err := remote.WriteIndex(ref, imageIndex, destRemoteOpts...); err != nil {
return err
}

pullGauge.Inc()

return nil
})
}
}

err = eg.Wait()

if tr, ok := sourceTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if tr, ok := destTLSRoundTripper.(*http.Transport); ok {
tr.CloseIdleConnections()
}

if err != nil {
out.EndOperationWithStatus(output.Failure())
return err
}
}

out.EndOperationWithStatus(output.Success())

if err := config.WriteSanitizedImagesConfig(cfg, filepath.Join(tempDir, "images.yaml")); err != nil {
return err
}
Expand All @@ -220,6 +245,8 @@ func NewCommand(out output.Output) *cobra.Command {
StringVar(&outputFile, "output-file", "images.tar", "Output file to write image bundle to")
cmd.Flags().
BoolVar(&overwrite, "overwrite", false, "Overwrite image bundle file if it already exists")
cmd.Flags().
IntVar(&imagePullConcurrency, "image-pull-concurrency", 1, "Image pull concurrency")

return cmd
}
16 changes: 16 additions & 0 deletions config/images_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (rsc RegistrySyncConfig) SortedImageNames() []string {
return imageNames
}

func (rsc RegistrySyncConfig) TotalImages() int {
n := 0
for _, imgTag := range rsc.Images {
n += len(imgTag)
}
return n
}

func (rsc RegistrySyncConfig) Clone() RegistrySyncConfig {
images := make(map[string][]string, len(rsc.Images))
for k, v := range rsc.Images {
Expand Down Expand Up @@ -136,6 +144,14 @@ func (ic ImagesConfig) SortedRegistryNames() []string {
return regNames
}

func (ic ImagesConfig) TotalImages() int {
n := 0
for _, rsc := range ic {
n += rsc.TotalImages()
}
return n
}

func ParseImagesConfigFile(configFile string) (ImagesConfig, error) {
f, yamlParseErr := os.Open(configFile)
if yamlParseErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819
github.com/google/go-containerregistry v0.15.2
github.com/hashicorp/go-getter v1.7.1
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f
github.com/mesosphere/dkp-cli-runtime/core v0.7.2
github.com/mholt/archiver/v3 v3.5.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
Expand All @@ -34,6 +34,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/thediveo/enumflag/v2 v2.0.4
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.12.1
k8s.io/apimachinery v0.27.3
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,8 @@ github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f h1:2pMHmxCU0Xe9HDLWovHajNJ/eyLLs+oi+zhSApqfSLw=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2-0.20230621124959-168f07e7d74f/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2 h1:dWzl9mdIS14DV7GQbTgVy8EyjFTr5re4V4UUzdqNqzA=
github.com/mesosphere/dkp-cli-runtime/core v0.7.2/go.mod h1:hIC+ZZFofDtkRs1v+TnnGxAhFT5IIXuqVvXMe00zOvw=
github.com/mholt/archiver/v3 v3.5.1 h1:rDjOBX9JSF5BvoJGvjqK479aL70qh9DIpZCl+k7Clwo=
github.com/mholt/archiver/v3 v3.5.1/go.mod h1:e3dqJ7H78uzsRSEACH1joayhuSyhnonssnDhppzS1L4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down