Example for using a separate threadpool for CPU bound work (try 3) #16331


Merged: 9 commits, Jun 23, 2025

Conversation

alamb (Contributor) commented Jun 8, 2025

Note: This PR contains an example and supporting code. It has no changes to the core.

Which issue does this PR close?

Rationale for this change

I have heard from multiple people multiple times over multiple years that the specifics of using multiple threadpools for separate CPU and IO work in DataFusion is confusing.

They are not wrong, and it is a key detail for building low latency, high performance engines that process data directly from remote storage, which I think is a key capability for DataFusion.

My past attempts in #13424 and #14286 to make this example have been bogged down trying to get consensus on details of how to transfer results across streams, the wisdom of wrapping streams, and other details. Thankfully, thanks to @tustvold and @ion-elgreco there is now a much better solution in ObjectStore 0.12.1: apache/arrow-rs-object-store#332

What changes are included in this PR?

  1. thread_pools.rs example
  2. Update documentation

Are these changes tested?

Yes, the example is run as part of CI, and there are tests.

Are there any user-facing changes?

Not really
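The pattern the example demonstrates, keeping IO on the main runtime while pushing CPU-bound work to a dedicated pool, can be sketched with only the standard library. This is a simplified, hypothetical stand-in for the example's tokio-based `CpuRuntime`, not the code in the PR:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Std-only stand-in for a dedicated CPU runtime: a fixed pool of worker
/// threads that runs CPU-bound closures off the caller's (IO) thread.
struct CpuPool {
    sender: Option<mpsc::Sender<Box<dyn FnOnce() + Send>>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl CpuPool {
    fn new(num_threads: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..num_threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // Workers pull tasks until the channel is closed
                    let task = receiver.lock().unwrap().recv();
                    match task {
                        Ok(task) => task(),
                        Err(_) => break, // pool dropped: shut down
                    }
                })
            })
            .collect();
        Self { sender: Some(sender), workers }
    }

    /// Run `f` on the CPU pool and block until its result is ready.
    fn run<T: Send + 'static>(&self, f: impl FnOnce() -> T + Send + 'static) -> T {
        let (tx, rx) = mpsc::channel();
        let sender = self.sender.as_ref().expect("pool is shut down");
        sender
            .send(Box::new(move || {
                let _ = tx.send(f());
            }))
            .expect("worker threads exited");
        rx.recv().expect("task panicked")
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        // Close the channel so workers finish queued tasks and exit,
        // then wait for them to finish.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

A caller on the IO side would then write something like `let result = pool.run(|| cpu_heavy_decode(bytes));` (the function name is illustrative) while network IO stays on the original runtime.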

@github-actions github-actions bot added the core Core DataFusion crate label Jun 8, 2025
@alamb alamb added the documentation Improvements or additions to documentation label Jun 8, 2025
// systems, including remote catalog access, which is not included in this
// example.
let cpu_runtime = CpuRuntime::try_new()?;
let io_handle = Handle::current();
Contributor

Question: this seems like the inverse of what I would have expected where DF would run on the current runtime and IO would run on a specialized runtime. Is there a reason why that would not work here? I would think it would simplify the code a fair bit.

Contributor Author

I don't think there is any technical reason

The reason I did it this way is that I think most applications / server programs (and the examples from tokio, etc.) use the runtime automatically created by tokio for IO, so I wanted to follow the same pattern.

I'll update the documentation to make this clearer.

Contributor

My guess is that many (most?) systems don't have a way to push IO to a separate runtime, whereas it's often easier to do so with CPU work. However, with ObjectStore at least that isn't the case.

Contributor Author

Yeah. Another consideration is that most uses of tokio are not for CPU bound work, so it makes sense to just use the default pool.

Contributor Author

I updated it to say:

/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.

@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Jun 9, 2025
alamb (Contributor Author) commented Jun 17, 2025

FYI @pepijnve, this is perhaps relevant to you, as you have been working closely with tokio and its runtimes.

pepijnve (Contributor) commented Jun 17, 2025

@alamb thanks for the pointer.

I've been digging through all the work that has already been done, looking particularly at the rayon experiments. Rayon's idea of "yield now" is particularly interesting. Rather than unwinding the stack, it will actually try to run other work immediately underneath the yield_now call. I wonder if this approach could lead to stack overflows when you have lots of yielding tasks.

Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.

alamb (Contributor Author) commented Jun 17, 2025

> @alamb thanks for the pointer.
>
> Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.

I know we have used it at InfluxData.

I vaguely remember others reporting they have too (maybe @adriangb), but I don't know of any writeups (it would be another great doc contribution if you figure it out!) :)

Omega359 (Contributor)

> Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.

I've used Tokio console in the past in my app to see if I could find the cause of some app slowdowns. It worked fine but didn't help me personally find the issue.
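Not a full writeup, but for reference: per the `console-subscriber` crate's own documentation (this is general tokio guidance, not DataFusion-specific, and the version number below is illustrative), the basic setup is roughly:

```toml
# Cargo.toml (version number is illustrative)
[dependencies]
console-subscriber = "0.4"
```

Then call `console_subscriber::init()` near the top of `main`, build with `RUSTFLAGS="--cfg tokio_unstable"` (tokio's task instrumentation is gated behind that cfg), and run the `tokio-console` CLI in another terminal to attach to the running process.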

adriangb (Contributor) left a comment

This looks very nice! FWIW, our internal structure is similar to CpuRuntime, but it also has convenience methods that essentially encapsulate what the JoinSet is doing and reduce the boilerplate of putting work on the CPU runtime.

I left some comments, but since this is just an example and already very nice, I don't think we should impose much of a barrier to moving forward with it.

Comment on lines 297 to 307
// In a production system you also need to ensure that nothing adds new
// tasks to the underlying runtime after this point
if let Some(thread_join_handle) = self.thread_join_handle.take() {
// If the thread is still running, we wait for it to finish
println!("Shutting down CPU runtime thread...");
if let Err(e) = thread_join_handle.join() {
eprintln!("Error joining CPU runtime thread: {e:?}",);
} else {
println!("CPU runtime thread shutdown successfully.");
}
}
Contributor

How complex is that? Would it be worth including in this example? It seems to me that this example is already quite complex, so making it 10% more complex to be "complete" seems worth it.

Contributor Author

I was mostly trying to simplify the shutdown behavior from the DedicatedExecutor in IOx:

https://github.com/influxdata/influxdb3_core/blob/ad3a2b250d92f21c39ad38d60919b127ae5af1fc/executor/src/lib.rs#L342-L351

I'll review this more carefully tomorrow and see if there is some way to make it more complete (or remove the mysterious text)

Comment on lines 322 to 327
let thread_join_handle = std::thread::spawn(move || {
cpu_runtime.block_on(async move {
notify_shutdown_captured.notified().await;
});
cpu_runtime.shutdown_timeout(Duration::from_secs(1));
});
Contributor

Can you explain what the timeout is doing here? Is 1s reasonable for all systems?

Contributor Author

It just waits up to 1 second for shutdown (rather than indefinitely). Maybe the example could wait indefinitely...

I'll add some comments explaining
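The behavior under discussion, waiting a bounded time for shutdown instead of forever, can be sketched with only the standard library. This is a hypothetical helper, not the tokio `shutdown_timeout` API itself:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical std-only analogue of a bounded shutdown wait: run `work`
/// on its own thread and wait at most `timeout` for it to signal
/// completion, returning whether it finished in time.
fn shutdown_with_timeout(work: impl FnOnce() + Send + 'static, timeout: Duration) -> bool {
    let (done_tx, done_rx) = mpsc::channel();
    thread::spawn(move || {
        work();
        // Signal completion; ignore the error if the controller gave up
        let _ = done_tx.send(());
    });
    // Ok(()) means a clean shutdown within the deadline; Err means timed out
    done_rx.recv_timeout(timeout).is_ok()
}
```

The trade-off is the same one raised above: a short timeout bounds how long shutdown can block, at the cost of possibly abandoning work that would have finished slightly later.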

alamb (Contributor Author) left a comment

Thank you @adriangb and @Omega359 for the review. I will work on improving the comments tomorrow

alamb (Contributor Author) commented Jun 23, 2025

Thanks @adriangb and @Omega359 for the help with this one (and to @tustvold and @ion-elgreco for the underlying feature).

It's taken a while, but we have made it!

@alamb alamb merged commit fb01049 into apache:main Jun 23, 2025
27 checks passed
@alamb alamb deleted the alamb/threadpool_example4 branch June 23, 2025 10:03
Labels
core Core DataFusion crate
Development

Successfully merging this pull request may close these issues.

Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work)
4 participants