Multithreaded SplitStream creation #95
base: main
Conversation
Suspicious test failures, timing out on pull...
src/oci/mod.rs
Outdated
self.progress
    .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
let raw_config = self.proxy.fetch_config_raw(&self.img).await?;
let config = ImageConfiguration::from_reader(raw_config.as_slice())?;

let (done_chan_sender, done_chan_recver, object_sender) = self.spawn_threads(&config);
Having played with things related to this in the past, my recommendation is to try tokio's spawn_blocking instead.
Here's some example code that does this, and uses a JoinSet both to keep track of the tasks and to act as a cap on maximum concurrency.
Among the major benefits of this model is that these are just async tasks, which means cancellation works naturally: dropping the future holding the JoinSet cancels them.
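For reference, the suggested pattern might look roughly like this minimal sketch; process_layer and the cap of 4 are hypothetical stand-ins, not code from this PR:

use tokio::task::JoinSet;

const MAX_CONCURRENCY: usize = 4; // arbitrary cap for the sketch

// Hypothetical stand-in for the per-layer synchronous splitstream work.
fn process_layer(layer: Vec<u8>) -> anyhow::Result<()> {
    let _ = layer;
    Ok(())
}

async fn process_all_layers(layers: Vec<Vec<u8>>) -> anyhow::Result<()> {
    let mut set = JoinSet::new();
    for layer in layers {
        // The JoinSet doubles as a concurrency cap: once it is full,
        // drain one finished task before spawning the next.
        if set.len() >= MAX_CONCURRENCY {
            set.join_next().await.expect("set is non-empty")??;
        }
        // spawn_blocking moves the synchronous work onto tokio's
        // blocking thread pool while remaining an ordinary tracked
        // task; dropping the JoinSet cancels anything not yet started.
        set.spawn_blocking(move || process_layer(layer));
    }
    // Drain the stragglers.
    while let Some(res) = set.join_next().await {
        res??;
    }
    Ok(())
}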
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, this is a super complex topic; I would say we should turn to rayon if we happened to be doing a lot of computation on in-memory array-like structures and could just use its super-high-level .par_iter() and .reduce() style APIs. But this is far from that.
(Nothing wrong with crossbeam either, but I am not sure we need it versus just the dedicated tokio mpsc channels? That concern goes away too if we just use spawn_blocking as the bridge between synchronous threads and async.)
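For reference, the rayon style being contrasted here looks roughly like this (a toy example, unrelated to this PR's code):

use rayon::prelude::*;

// High-level data parallelism over an in-memory slice: rayon splits
// the work across its thread pool automatically.
fn sum_of_squares(input: &[u64]) -> u64 {
    input.par_iter().map(|x| x * x).reduce(|| 0, |a, b| a + b)
}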
Having played with things related to this in the past, my recommendation is to try tokio's spawn_blocking instead.
This was also my first instinct. I feel like this would make things a lot easier to generalize to the "multiple layers at the same time" level, as well...
I would agree this became a bit more complicated than anticipated. Converting these into a bunch of async operations was my first approach as well, but there were some issues with some values not being sendable.
Thanks for the suggestion, I'll try out the async approach again
but there were some issues with some values not being sendable.
Right, this is https://tmandry.gitlab.io/blog/posts/2023-03-01-scoped-tasks/ (see also https://without.boats/blog/the-scoped-task-trilemma/ )
The simplest thing to do here is to drop the borrowing (making things Send), most usually by operating on owned data. For example, in the case of fetching a container image layer: while the proxy exposes get_blob() -> impl AsyncBufRead + Send + Unpin, honestly in the end it's just a file descriptor, and we should change the proxy to have a get_blob_fd() or so that gives you that underlying pipe file descriptor. Once we have that, it's a File/OwnedFd that we can just directly move ownership of into a worker thread.
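Assuming such a get_blob_fd() existed (it is only proposed here), the ownership transfer might look roughly like this; consume_layer is a hypothetical stand-in:

use std::fs::File;
use std::os::fd::OwnedFd;

async fn import_layer(fd: OwnedFd) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || {
        // The OwnedFd is owned data with no borrows, so the closure is
        // 'static + Send and can move into the worker thread freely.
        let file = File::from(fd);
        consume_layer(file)
    })
    .await?
}

// Hypothetical synchronous worker that reads the layer from the pipe.
fn consume_layer(file: File) -> anyhow::Result<()> {
    let _ = file;
    Ok(())
}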
So, trying out an async approach using tokio's JoinSet results in no performance gains. Weird. The layers are being worked on in parallel, as I can see multiple progress bars at once.
https://github.com/Johan-Liebert1/composefs-rs/blob/splitstream-tokio-mt/src/oci/mod.rs#L168
The main bottleneck in this case would be writing the external objects, which involves a lot of synchronous syscalls. Wrapping them in their own spawn_blocking call also doesn't seem to have much of an effect on performance.
https://github.com/Johan-Liebert1/composefs-rs/blob/splitstream-tokio-mt/src/repository.rs#L85
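For context, the kind of wrapping being described presumably looks something like this sketch; write_external_object and the path are illustrative stand-ins for the linked repository code:

async fn write_external_object(data: Vec<u8>) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
        // The synchronous syscalls run on tokio's blocking pool so they
        // don't stall the executor, but this only buys throughput if
        // the writes can actually proceed in parallel on the disk.
        std::fs::write("/tmp/example-object", &data)?;
        Ok(())
    })
    .await?
}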
Running a quick and dirty test, it does seem like the smaller layers are being processed first while other layers are processed in the background, so there is some sort of parallelism going on here. What I find really unintuitive is that it's no better than the vanilla approach.
@@ -1,2 +1,3 @@
/Cargo.lock
/target
Looks like this change belongs in a different commit...
Thanks, moved it to a separate commit
Force-pushed from 5c3cfac to 0422b84
Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
Separating out the writer allows us to abstract the parallelism of writing the splitstream. Signed-off-by: Pragyan Poudyal <[email protected]>
Add dependencies on Rayon and Crossbeam. Have two modes, single- and multi-threaded, for the Zstd Encoder. Spawn threads in the splitstream writer for writing external objects, and spawn separate threads for the Zstd Encoder; we handle communication between these threads using channels. Any image's layers will be handed off to one of the Encoder threads. For now we only have one channel for external object writing, but multiple for Encoders. The reasoning for this is that Encoder threads are usually CPU-bound while the object writer threads are more IO-bound. Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
Expose an `update_sha` method from ZstdWriter so we can update the rolling SHA256 hash value from SplitStreamWriter Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
If anything goes wrong at any point, short-circuiting and stopping the thread would be the better option. Signed-off-by: Pragyan Poudyal <[email protected]>
If we have more threads than unprocessed layers, some of the cloned senders aren't dropped and the main thread hangs in the result-receiving loop. We make sure here not to spawn more threads than the number of unhandled layers. Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
Force-pushed from 0422b84 to dc3dd52
#111 lays some groundwork that would probably make this easier to do, particularly if you decide to go with the tokio-based approach (which makes more and more sense to me)...
Related to #62
With this we spawn multiple threads for external object writing and for Zstd encoding + rolling SHA256 sum updates. Currently this is only supported for the Oci::Pull operation, and it doesn't yet handle multiple layers in parallel. Communication between threads is handled by crossbeam mpmc channels. For now we only have one channel for external object writing, but multiple for Encoders.
Performance gain: ~3x
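A minimal sketch of the fan-out described above, with illustrative names only (not the PR's actual types):

use crossbeam::channel;
use std::thread;

fn spawn_encoders(num_encoders: usize) -> channel::Sender<Vec<u8>> {
    // One mpmc channel shared by all encoder threads: whichever encoder
    // is free picks up the next chunk.
    let (tx, rx) = channel::unbounded::<Vec<u8>>();
    for _ in 0..num_encoders {
        let rx = rx.clone();
        thread::spawn(move || {
            // CPU-bound work: Zstd encoding + rolling SHA256 update.
            while let Ok(chunk) = rx.recv() {
                let _ = chunk; // encode and hash here
            }
            // recv() errors once every sender is dropped, ending the loop.
        });
    }
    tx
}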