[s3 cache] Faster download with parallelism #5604

Open
bpaquet wants to merge 1 commit into master from parallel_download
Conversation

@bpaquet (Contributor) commented Dec 17, 2024

No description provided.

@bpaquet force-pushed the parallel_download branch 2 times, most recently from e47360e to 8cee779 on December 17, 2024 at 21:06
@bpaquet changed the title from "Implement a parallel download on S3" to "[s3 cache] Faster download with parallelism" on Dec 17, 2024
@bpaquet (Contributor, Author) commented Dec 17, 2024

I will deploy this PR internally to check how it behaves at scale, but a review is always welcome.

@bpaquet marked this pull request as ready for review on December 17, 2024 at 21:11
@tonistiigi (Member) left a comment:

So how much memory does this use?

I wonder if instead of trying to get this functionality behind a ReadAt call (where it really does not belong, because ReadAt is only for a specific range), this return value could instead implement https://pkg.go.dev/io#WriterTo, which could be detected on the caller side where it writes the blob to disk (maybe even via WriteAt()).
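For context, the caller-side detection being suggested might look roughly like the sketch below; writeBlob, dest, and blob are hypothetical names rather than buildkit's actual code, and io.Copy already performs this WriterTo type assertion internally.

package blobcache

import (
	"io"
	"os"
)

// writeBlob is a hypothetical helper on the consuming side: if the blob
// reader also implements io.WriterTo, let it drive the write (for example by
// streaming parallel part downloads straight to the destination file);
// otherwise fall back to a plain sequential copy.
func writeBlob(dest *os.File, blob io.Reader) (int64, error) {
	if wt, ok := blob.(io.WriterTo); ok {
		return wt.WriteTo(dest)
	}
	return io.Copy(dest, blob)
}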

	input *S3DownloaderInput
}

type S3DownloaderInput struct {
Member: These don't need to be public if only used by private functions.

Author: Done

func newDownloader(input *S3DownloaderInput) *S3Downloader {
	if input.Parallelism == 0 {
		input.Parallelism = 8
Member: You can test this, but for registry layers, our default is 4.

Author: These defaults are not really used; they are overridden in s3.go, where the default value is 4. I changed them anyway.

@bpaquet (Contributor, Author) commented Dec 18, 2024

> So how much memory does this use?
>
> I wonder if instead of trying to get this functionality behind a ReadAt call (where it really does not belong, because ReadAt is only for a specific range), this return value could instead implement https://pkg.go.dev/io#WriterTo, which could be detected on the caller side where it writes the blob to disk (maybe even via WriteAt()).

If we assume the solver reads faster than we can download from S3, we will use parallelism * part size, so 20 MB by default.

We could also build a wrapper to "convert" the WriterAt from the SDK into a ReaderAt, but that seems more complicated to me than this solution, especially from a memory allocation point of view. We can still consider it if you think it's better.
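As a back-of-envelope check on that bound, using the defaults implied in this thread (parallelism 4 and roughly 5 MiB parts; the constants below are illustrative, not taken from the PR):

package blobcache

// Rough upper bound on buffered memory per blob being read, assuming only
// in-flight chunks are kept in memory.
const (
	defaultParallelism   = 4
	defaultPartSizeBytes = 5 * 1024 * 1024
	maxBufferedBlobBytes = defaultParallelism * defaultPartSizeBytes // ~20 MiB
)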

if downloadPartSizeInt <= 0 {
	return Config{}, errors.Errorf("download_part_size must be a positive integer")
}
downloadParallism = downloadPartSizeInt

Suggested change:
-downloadParallism = downloadPartSizeInt
+downloadPartSize = downloadPartSizeInt

Right?

Comment on lines +133 to +136
for i := 0; i < r.totalChunk; i++ {
	r.inChan <- i
}
for k := 0; k < r.input.Parallelism; k++ {

Nit: since Go 1.22, you can use the shorter range-over-integer variant:

Suggested change:
-for i := 0; i < r.totalChunk; i++ {
-	r.inChan <- i
-}
-for k := 0; k < r.input.Parallelism; k++ {
+for i := range r.totalChunk {
+	r.inChan <- i
+}
+for range r.input.Parallelism {

See this playground link
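Since the playground link itself is not reproduced here, a minimal standalone example of the Go 1.22 range-over-integer form (the loop bodies and variable names are illustrative only):

package main

import "fmt"

func main() {
	totalChunk := 3
	parallelism := 2

	// Equivalent to: for i := 0; i < totalChunk; i++
	for i := range totalChunk {
		fmt.Println("queue chunk", i)
	}
	// Equivalent to: for k := 0; k < parallelism; k++ when k is unused
	for range parallelism {
		fmt.Println("start worker")
	}
}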

Comment on lines +178 to +187
for {
	if r.chunks[r.currentChunk].done {
		break
	}
	err := <-r.outChan
	if err != nil {
		r.Close()
		return 0, err
	}
}

I would most probably replace done with an unbuffered chan struct{} and go with (untested):

Suggested change:
-for {
-	if r.chunks[r.currentChunk].done {
-		break
-	}
-	err := <-r.outChan
-	if err != nil {
-		r.Close()
-		return 0, err
-	}
-}
+readyOrConsume:
+for {
+	select {
+	case <-r.chunks[r.currentChunk].done:
+		// this chunk is ready, we can process it
+		break readyOrConsume
+	case err := <-r.outChan:
+		// a download reported an error; propagate it
+		if err != nil {
+			r.Close()
+			return 0, err
+		}
+	}
+}

And close the done channel from the downloadChunk function before returning.

Which IMO looks more idiomatic and removes the race on done (even if I assume this particular race wouldn't have any side effect).
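A sketch of the "close done before returning" part of this suggestion; chunkStatus, downloadChunk, and fetch here are illustrative stand-ins, not the PR's actual code:

package blobcache

// The download goroutine closes done once the chunk's buffer is filled, which
// unblocks the select above; a closed channel is always ready to receive
// from, so later reads of the same chunk do not block either.
type chunkStatus struct {
	buffer []byte
	done   chan struct{}
}

func downloadChunk(cs *chunkStatus, fetch func() ([]byte, error)) error {
	data, err := fetch()
	if err != nil {
		return err // the worker reports this on the error channel instead
	}
	cs.buffer = data
	close(cs.done) // signal "this chunk is ready"
	return nil
}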

if err != nil {
	return err
}
n, err := io.Copy(&r.chunks[chunk], resp.Body)


Nit: for the sake of performance / brevity, I would suggest going with a one-time copy of the result of io.ReadAll(resp.Body) into r.chunks[chunk]. This would avoid the need to reference r.chunks[chunk] to get a valid io.Writer implementation, along with the need for chunkStatus to even implement io.Writer. Also that would remove the need for a writeOffset.

No strong opinion, however, if you think it is clearer as it is.
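For illustration, the io.ReadAll variant could look roughly like this; chunkPart and fillChunk are simplified stand-ins for the PR's chunkStatus and download code, and whether the bytes are assigned or copied into a pre-allocated buffer is an implementation detail:

package blobcache

import "io"

// chunkPart is an illustrative stand-in for the PR's chunkStatus.
type chunkPart struct {
	buffer []byte
}

// fillChunk reads the whole part body once and keeps the resulting slice, so
// the chunk type no longer needs to implement io.Writer or track a write
// offset.
func fillChunk(part *chunkPart, body io.Reader) error {
	data, err := io.ReadAll(body)
	if err != nil {
		return err
	}
	part.buffer = data // a single assignment replaces the io.Copy into a Writer
	return nil
}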

size   int64
start  int64
buffer []byte
io.Writer

Quick note: this actually declares an anonymous struct field of type io.Writer, which I believe is not what you're looking for, since this field's Write method is shadowed by that of chunkStatus below anyway.

If what you're trying to do is to make explicit, and enforce, the fact that chunkStatus implements io.Writer, the more idiomatic way is to add this line somewhere in the file (typically right below the struct declaration itself):

var _ io.Writer = &chunkStatus{}

But Go's interface implementation being implicit, this is not even necessary.
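As a self-contained illustration of the difference (withEmbedded and chunkLike are stand-ins, not the PR's types):

package main

import (
	"fmt"
	"io"
)

// withEmbedded shows what embedding io.Writer actually does: it adds an
// anonymous field of interface type, nil unless explicitly set, and it is
// shadowed as soon as the outer type defines its own Write method.
type withEmbedded struct {
	io.Writer
}

// chunkLike implements io.Writer directly, no embedding needed.
type chunkLike struct {
	buf []byte
}

func (c *chunkLike) Write(p []byte) (int, error) {
	c.buf = append(c.buf, p...)
	return len(p), nil
}

// Optional compile-time check that chunkLike satisfies io.Writer.
var _ io.Writer = &chunkLike{}

func main() {
	var w withEmbedded
	fmt.Println(w.Writer == nil) // true: calling w.Write here would panic

	c := &chunkLike{}
	io.WriteString(c, "hello")
	fmt.Println(string(c.buf)) // hello
}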

}

type s3Reader struct {
	content.ReaderAt

Same as for io.Writer above.

@@ -141,20 +145,48 @@ func getConfig(attrs map[string]string) (Config, error) {
uploadParallelism = uploadParallelismInt
}

downloadParallism := 4

Also ultra nitpick:

Suggested change:
-downloadParallism := 4
+downloadParallelism := 4
