[s3 cache] Faster download with parallelism #5604
e47360e to 8cee779
I will deploy this PR internally to check how it behaves at scale, but a review is always welcome.
So how much memory does this use?
I wonder if, instead of putting this functionality behind the ReadAt call (where it does not really belong, because ReadAt only serves a specific range), the return value could implement https://pkg.go.dev/io#WriterTo. That could be detected on the caller side, where it writes the blob to disk (maybe even via WriteAt()).
	input *S3DownloaderInput
}

type S3DownloaderInput struct {
These don't need to be public if only used by private functions.
Done

func newDownloader(input *S3DownloaderInput) *S3Downloader {
	if input.Parallelism == 0 {
		input.Parallelism = 8
You can test this, but for registry layers, our default is 4.
These defaults are not really used: they are overridden in s3.go, where the default value is 4. I changed them anyway.
Signed-off-by: Bertrand Paquet <[email protected]>
8cee779 to 007f08e
If we assume the solver reads faster than we can download from S3, we will use at most parallelism * part size of memory, so 20MB by default. We can also build a wrapper to "convert" the
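As a quick check of that estimate, the sketch below computes the bound under the assumption that each worker holds at most one part in memory. The 5 MiB part size is inferred from the 20MB figure and the default parallelism of 4; it is not stated in the thread, and maxBufferedBytes is a hypothetical helper.

```go
package main

import "fmt"

// maxBufferedBytes returns the assumed upper bound on buffered data:
// one in-flight part per worker.
func maxBufferedBytes(parallelism int, partSize int64) int64 {
	return int64(parallelism) * partSize
}

func main() {
	const partSize = 5 << 20 // assumed 5 MiB part size (not confirmed by the PR)
	bound := maxBufferedBytes(4, partSize)
	fmt.Printf("%d MiB\n", bound>>20)
}
```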
	if downloadPartSizeInt <= 0 {
		return Config{}, errors.Errorf("download_part_size must be a positive integer")
	}
	downloadParallism = downloadPartSizeInt
-	downloadParallism = downloadPartSizeInt
+	downloadPartSize = downloadPartSizeInt
Right?
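One way to make this kind of copy-paste slip harder is to parse every positive-integer attribute through a single helper. The sketch below is only an illustration: positiveIntAttr is a hypothetical name, the "download_parallelism" key and the part-size default of 5 are assumptions, and it uses fmt.Errorf rather than the errors.Errorf call seen in the diff.

```go
package main

import (
	"fmt"
	"strconv"
)

// positiveIntAttr reads key from attrs, returning def when the key is absent
// and an error when the value is not a positive integer.
func positiveIntAttr(attrs map[string]string, key string, def int) (int, error) {
	v, ok := attrs[key]
	if !ok {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("%s must be an integer: %w", key, err)
	}
	if n <= 0 {
		return 0, fmt.Errorf("%s must be a positive integer", key)
	}
	return n, nil
}

func main() {
	attrs := map[string]string{"download_part_size": "8"}
	// Each setting is assigned from its own call, so the destination
	// variable and the attribute key sit on the same line.
	partSize, err := positiveIntAttr(attrs, "download_part_size", 5)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	parallelism, err := positiveIntAttr(attrs, "download_parallelism", 4)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(partSize, parallelism)
}
```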
	for i := 0; i < r.totalChunk; i++ {
		r.inChan <- i
	}
	for k := 0; k < r.input.Parallelism; k++ {
Nit: since Go 1.22, you can use the shorter range-over-integer form:
-	for i := 0; i < r.totalChunk; i++ {
-		r.inChan <- i
-	}
-	for k := 0; k < r.input.Parallelism; k++ {
+	for i := range r.totalChunk {
+		r.inChan <- i
+	}
+	for range r.input.Parallelism {
	for {
		if r.chunks[r.currentChunk].done {
			break
		}
		err := <-r.outChan
		if err != nil {
			r.Close()
			return 0, err
		}
	}
I would most probably replace done with an unbuffered chan struct{} and go with (untested):
-	for {
-		if r.chunks[r.currentChunk].done {
-			break
-		}
-		err := <-r.outChan
-		if err != nil {
-			r.Close()
-			return 0, err
-		}
-	}
+readyOrConsume:
+	for {
+		select {
+		case <-r.chunks[r.currentChunk].done:
+			// this chunk is ready and can be processed
+			break readyOrConsume
+		case err := <-r.outChan:
+			// process the error, if there is one
+			if err != nil {
+				r.Close()
+				return 0, err
+			}
+		}
+	}
And close the done channel from the downloadChunk function before returning.
Which IMO looks more idiomatic and removes the race on done (even if I assume this particular race wouldn't have any side effect).
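A minimal, runnable sketch of that pattern follows. The chunk, download, and waitChunk names are hypothetical simplifications, not the PR's types. Closing the channel after writing the data gives the reader a happens-before guarantee under the Go memory model, which is what removes the race on a plain boolean flag.

```go
package main

import (
	"errors"
	"fmt"
)

type chunk struct {
	done chan struct{} // closed once the chunk is fully downloaded
	data []byte
}

// download is a hypothetical worker: it either fills the chunk and closes
// done, or reports an error on errs.
func download(c *chunk, payload []byte, fail bool, errs chan<- error) {
	if fail {
		errs <- errors.New("download failed")
		return
	}
	c.data = payload
	close(c.done) // the write to c.data is visible to anyone who sees the close
}

// waitChunk blocks until the chunk is ready or a worker reports an error.
func waitChunk(c *chunk, errs <-chan error) ([]byte, error) {
	for {
		select {
		case <-c.done:
			return c.data, nil
		case err := <-errs:
			if err != nil {
				return nil, err
			}
		}
	}
}

func main() {
	errs := make(chan error, 1)
	c := &chunk{done: make(chan struct{})}
	go download(c, []byte("abc"), false, errs)
	data, err := waitChunk(c, errs)
	fmt.Println(string(data), err)
}
```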
	if err != nil {
		return err
	}
	n, err := io.Copy(&r.chunks[chunk], resp.Body)
Nit: for the sake of performance / brevity, I would suggest going with a one-time copy of the result of io.ReadAll(resp.Body) into r.chunks[chunk]. This would avoid the need to reference r.chunks[chunk] to get a valid io.Writer implementation, along with the need for chunkStatus to even implement io.Writer. Also that would remove the need for a writeOffset.
No strong opinion however if you think this is clearer like that.
	size   int64
	start  int64
	buffer []byte
	io.Writer
Quick note: this actually declares an embedded (anonymous) struct field of type io.Writer, which I believe is not what you're looking for, since this field's Write method is shadowed by that of chunkStatus below anyway.
If what you're trying to do is make explicit and enforce the fact that chunkStatus implements io.Writer, the more idiomatic way is to add this line somewhere in the file (usually just below the struct declaration):
var _ io.Writer = &chunkStatus{}
But since Go's interface implementation is implicit, this is not even necessary.
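The difference between embedding io.Writer and asserting that a type implements it can be shown with a small sketch. withEmbedded and embeddedIsNil are hypothetical names introduced for illustration, and chunkStatus is reduced to a single field.

```go
package main

import (
	"fmt"
	"io"
)

// withEmbedded embeds io.Writer: the struct gains a field named Writer
// (nil by default) whose Write method is promoted to the struct.
type withEmbedded struct {
	io.Writer
}

// chunkStatus implements io.Writer through its own method instead.
type chunkStatus struct {
	buffer []byte
}

func (c *chunkStatus) Write(p []byte) (int, error) {
	c.buffer = append(c.buffer, p...)
	return len(p), nil
}

// Compile-time check: the build fails if *chunkStatus stops satisfying io.Writer.
var _ io.Writer = (*chunkStatus)(nil)

// embeddedIsNil reports whether the embedded field of a zero value is nil.
// Calling Write on such a value would panic with a nil dereference.
func embeddedIsNil() bool {
	var w withEmbedded
	return w.Writer == nil
}

func main() {
	c := &chunkStatus{}
	fmt.Fprint(c, "ab")
	fmt.Println(len(c.buffer), embeddedIsNil())
}
```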
}

type s3Reader struct {
	content.ReaderAt
Same as for io.Writer above.
@@ -141,20 +145,48 @@ func getConfig(attrs map[string]string) (Config, error) {
		uploadParallelism = uploadParallelismInt
	}

	downloadParallism := 4
Also ultra nitpick:
-	downloadParallism := 4
+	downloadParallelism := 4