Multithreaded SplitStream creation #95


Draft · wants to merge 11 commits into base: main

Conversation

Collaborator

@Johan-Liebert1 Johan-Liebert1 commented Apr 7, 2025

Related to - #62

With this change we spawn multiple threads for external object writing and for Zstd encoding plus the rolling SHA256sum update. Currently this is only supported for the Oci::Pull operation, and it does not yet handle multiple layers in parallel.

Communication between threads is handled by crossbeam mpmc channels. For now we have a single channel for external object writing, but multiple channels for the Encoders.

Performance gain: ~3x

@Johan-Liebert1 Johan-Liebert1 marked this pull request as draft April 7, 2025 12:20
@allisonkarlitskaya
Collaborator

Suspicious test failures, timing out on pull...

+ ./cfsctl --repo tmp/sysroot/composefs oci pull containers-storage:c22ddccc8dcd7efeee38efdae72773ceff54eb70e47cdb32357c30aca5bfd7c2
Error: The operation was canceled.

src/oci/mod.rs Outdated
self.progress
.println(format!("Fetching config {}", hex::encode(config_sha256)))?;
let raw_config = self.proxy.fetch_config_raw(&self.img).await?;
let config = ImageConfiguration::from_reader(raw_config.as_slice())?;

let (done_chan_sender, done_chan_recver, object_sender) = self.spawn_threads(&config);
Collaborator

Having played with things related to this in the past, my recommendation is to instead try tokio's spawn_blocking.

Here's some example code that does this, using a JoinSet both to keep track of the tasks and to act as a cap on maximum concurrency.

See
https://github.com/bootc-dev/bootc/blob/2de8e0d23fb89bed76722ff1466614afacec64b3/lib/src/fsck.rs#L195

Among the major benefits of this model is that these are just async tasks, which means cancellation works naturally: dropping the future that holds the JoinSet cancels them.
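The linked bootc code uses tokio's JoinSet; since pulling tokio into a snippet would make it non-self-contained, here is a std-only analog of the same "cap on maximum concurrency" idea (all names hypothetical, the doubling stands in for real per-layer work):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Process `jobs` with at most `max_workers` running concurrently,
/// analogous to pushing tasks into a bounded tokio JoinSet.
fn process_capped(jobs: Vec<u64>, max_workers: usize) -> Vec<u64> {
    let (job_tx, job_rx) = mpsc::channel::<u64>();
    let (res_tx, res_rx) = mpsc::channel::<u64>();
    // Workers share the single receiver behind a mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));

    let mut handles = Vec::new();
    for _ in 0..max_workers {
        let job_rx = Arc::clone(&job_rx);
        let res_tx = res_tx.clone();
        handles.push(thread::spawn(move || loop {
            let job = match job_rx.lock().unwrap().recv() {
                Ok(j) => j,
                Err(_) => break, // queue closed and drained
            };
            // Stand-in for the real blocking work (zstd encode + hash).
            res_tx.send(job * 2).unwrap();
        }));
    }
    drop(res_tx); // main thread keeps only the result receiver

    for j in jobs {
        job_tx.send(j).unwrap();
    }
    drop(job_tx); // workers exit once the queue drains

    let mut out: Vec<u64> = res_rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    out.sort_unstable();
    out
}

fn main() {
    assert_eq!(process_capped(vec![1, 2, 3, 4, 5], 2), vec![2, 4, 6, 8, 10]);
    println!("ok");
}
```

Unlike this thread-based sketch, the JoinSet version cancels cleanly when the owning future is dropped, which is the benefit being argued for above.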

Collaborator

I mean, this is a super complex topic; I would say we should turn to rayon if we happened to be doing a lot of computation on in-memory array-like structures and could just use their super-high-level .par_iter() and .reduce() style APIs. But this is far from that.

(Nothing wrong with crossbeam either, but I am not sure we need it versus tokio's dedicated mpsc channels? That concern also goes away if we just use spawn_blocking as our communication mechanism between synchronous threads and async.)
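On the "do we need crossbeam" question: the fan-in shape used here (many encoder threads, one object writer) is multi-producer single-consumer, which the standard library's channel already covers. A minimal std-only sketch, with all names hypothetical:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for an external-object write request.
struct ObjectWrite {
    data: Vec<u8>,
}

// One dedicated writer thread consumes requests from many producers.
// std's channel is multi-producer single-consumer, matching the
// "single channel for external object writing" design.
fn run_writer(num_producers: usize, bytes_each: usize) -> usize {
    let (tx, rx) = mpsc::channel::<ObjectWrite>();

    let writer = thread::spawn(move || {
        let mut total = 0;
        for req in rx {
            // Real code would persist req.data into the repository here.
            total += req.data.len();
        }
        total
    });

    // Multiple producers (e.g. per-layer encoder threads) clone the sender.
    let producers: Vec<_> = (0..num_producers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(ObjectWrite { data: vec![0u8; bytes_each] }).unwrap();
            })
        })
        .collect();
    for p in producers {
        p.join().unwrap();
    }
    drop(tx); // close the channel so the writer's loop ends

    writer.join().unwrap()
}

fn main() {
    assert_eq!(run_writer(3, 100), 300);
    println!("ok");
}
```

crossbeam's mpmc channels would only become necessary if several consumers had to share one receiver without a mutex.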

Collaborator

> Having played with things related to this in the past, my recommendation is to instead try tokio's spawn_blocking.

This was also my first instinct. I feel like this would make things a lot easier to generalize to the "multiple layers at the same time" level, as well...

Collaborator Author

I would agree this became a bit more complicated than anticipated. Converting these into a bunch of async operations was my first approach as well, but there were some issues with some values not being Send.

Thanks for the suggestion; I'll try the async approach again.

Collaborator

> but there were some issues with some values not being Send.

Right, this is https://tmandry.gitlab.io/blog/posts/2023-03-01-scoped-tasks/ (see also https://without.boats/blog/the-scoped-task-trilemma/ )

The simplest thing to do here is to drop the borrowing (making things Send), most usually by operating on owned data. For example, in the case of fetching a container image layer: while the proxy exposes get_blob() -> impl AsyncBufRead + Send + Unpin, in the end it's just a file descriptor, and we should change the proxy to have a get_blob_fd() or so that gives you the underlying pipe file descriptor. Once we have that, it's a File/OwnedFd whose ownership we can directly move into a worker thread.
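The ownership-transfer shape being described can be sketched with a plain File standing in for the pipe fd a hypothetical get_blob_fd() would return (a File owns its descriptor outright, so it is Send with no borrows):

```rust
use std::fs::File;
use std::io::{Read, Write};
use std::thread;

// Because the File is owned data, we can move the descriptor straight
// into a worker thread -- no non-Send borrow of the proxy is involved.
fn read_blob_in_worker(path: &std::path::Path) -> std::io::Result<usize> {
    let blob = File::open(path)?;

    let worker = thread::spawn(move || -> std::io::Result<usize> {
        let mut blob = blob; // ownership moved into the worker
        let mut buf = Vec::new();
        blob.read_to_end(&mut buf)?;
        Ok(buf.len())
    });
    worker.join().unwrap()
}

fn main() -> std::io::Result<()> {
    // Set up a stand-in "blob" on disk.
    let path = std::env::temp_dir().join("blob-fd-demo");
    File::create(&path)?.write_all(b"layer bytes")?;

    let n = read_blob_in_worker(&path)?;
    assert_eq!(n, 11);
    std::fs::remove_file(&path)?;
    println!("ok");
    Ok(())
}
```

The same pattern applies to an OwnedFd wrapping a pipe: move the owned descriptor into the spawn_blocking closure and read it synchronously there.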

Collaborator Author

So, trying out an async approach using tokio's JoinSet results in no performance gains. Weird. I can see the layers being worked on in parallel, as multiple progress bars appear at once.

https://github.com/Johan-Liebert1/composefs-rs/blob/splitstream-tokio-mt/src/oci/mod.rs#L168

The main bottleneck in this case would be writing the external objects, which involves a lot of synchronous syscalls. Wrapping those in their own spawn_blocking call also doesn't seem to have much of an effect on performance.

https://github.com/Johan-Liebert1/composefs-rs/blob/splitstream-tokio-mt/src/repository.rs#L85

Collaborator Author

Running a quick and dirty test, it does seem like the smaller layers are processed first while the other layers are processed in the background, so there is some sort of parallelism going on here. What I find really unintuitive is that it's not better than the vanilla approach.
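One way to sanity-check whether the per-layer work is CPU-bound enough to benefit from parallelism is a tiny harness that times the same workload sequentially and across threads (entirely hypothetical workload; the arithmetic loop stands in for encode + hash):

```rust
use std::thread;
use std::time::Instant;

// Stand-in for per-layer work (encode + hash); purely CPU-bound.
fn busy_work(iters: u64) -> u64 {
    let mut acc = 0u64;
    for i in 0..iters {
        acc = acc.wrapping_mul(31).wrapping_add(i);
    }
    acc
}

fn main() {
    let layers = 4;
    let iters = 5_000_000u64;

    let t = Instant::now();
    for _ in 0..layers {
        std::hint::black_box(busy_work(iters));
    }
    let sequential = t.elapsed();

    let t = Instant::now();
    let handles: Vec<_> = (0..layers)
        .map(|_| thread::spawn(move || std::hint::black_box(busy_work(iters))))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let parallel = t.elapsed();

    // If these come out close, the bottleneck is likely I/O (the
    // synchronous object-writing syscalls), not the CPU-bound encoding.
    println!("sequential: {sequential:?}, parallel: {parallel:?}");
}
```

If the parallel number is not meaningfully lower, that supports the "syscalls dominate" reading of the no-speedup result above.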

@@ -1,2 +1,3 @@
/Cargo.lock
/target
Collaborator

Looks like this change belongs to a different commit...

Collaborator Author

Thanks, moved it to a separate commit


@Johan-Liebert1 Johan-Liebert1 force-pushed the splitstream-multithreaded branch 2 times, most recently from 5c3cfac to 0422b84 Compare April 8, 2025 08:33
Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
Separating out the writer allows us to abstract the parallelism
of writing the splitstream.

Signed-off-by: Pragyan Poudyal <[email protected]>
Add dependencies on Rayon and Crossbeam
Have two modes, single and multi-threaded, for Zstd Encoder

Spawn threads in the splitstream writer for writing external objects and
spawn separate threads for the Zstd Encoder. We handle communication
between these threads using channels.

Any image's layers will be handed off to one of the Encoder threads.
For now we only have one channel for external object writing, but
multiple for Encoders. The reasoning for this is that Encoder threads
are usually CPU bound while the object writer threads are more IO bound.

Signed-off-by: Pragyan Poudyal <[email protected]>
Expose an `update_sha` method from ZstdWriter so we can update the
rolling SHA256 hash value from SplitStreamWriter

Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
If anything goes wrong at any point, short-circuiting and stopping the
thread would be the better option.

Signed-off-by: Pragyan Poudyal <[email protected]>
If we have more threads than we have unprocessed layers, some of the
cloned senders aren't dropped and the main thread hangs on the
result-receiving loop.

We make sure here not to spawn more threads than the number of
unhandled layers.

Signed-off-by: Pragyan Poudyal <[email protected]>
Signed-off-by: Pragyan Poudyal <[email protected]>
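The `update_sha` rolling-hash idea from the commits above can be sketched std-only. Here std's Hasher stands in for SHA-256 (which would really come from the sha2 crate); the type and method names are hypothetical, mirroring the ZstdWriter/SplitStreamWriter split:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// Hypothetical writer that also maintains a running digest, mirroring
// the exposed update_sha method (DefaultHasher stands in for Sha256).
struct HashingWriter {
    hasher: DefaultHasher,
    written: usize,
}

impl HashingWriter {
    fn new() -> Self {
        Self { hasher: DefaultHasher::new(), written: 0 }
    }

    // Counterpart of update_sha: feed bytes into the rolling digest
    // as they are written, without buffering the whole stream.
    fn update_sha(&mut self, chunk: &[u8]) {
        self.hasher.write(chunk);
        self.written += chunk.len();
    }

    fn finalize(self) -> (u64, usize) {
        (self.hasher.finish(), self.written)
    }
}

fn main() {
    // Feeding data in chunks must match feeding it all at once --
    // that property is what makes a rolling update correct.
    let mut a = HashingWriter::new();
    a.update_sha(b"hello ");
    a.update_sha(b"world");

    let mut b = HashingWriter::new();
    b.update_sha(b"hello world");

    assert_eq!(a.finalize(), b.finalize());
    println!("rolling update matches one-shot");
}
```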
@Johan-Liebert1 Johan-Liebert1 force-pushed the splitstream-multithreaded branch from 0422b84 to dc3dd52 Compare April 9, 2025 06:13
@allisonkarlitskaya
Collaborator

#111 lays some groundwork that would probably make this easier to do, particularly if you decide to go with the tokio-based approach (which makes more and more sense to me)...
