Example for using a separate threadpool for CPU bound work (try 3) #16331
Conversation
Pare back example
```rust
// systems, including remote catalog access, which is not included in this
// example.
let cpu_runtime = CpuRuntime::try_new()?;
let io_handle = Handle::current();
```
Question: this seems like the inverse of what I would have expected where DF would run on the current runtime and IO would run on a specialized runtime. Is there a reason why that would not work here? I would think it would simplify the code a fair bit.
I don't think there is any technical reason
The reason I did it this way is that I think most applications / server programs (and the examples from tokio, etc) use the runtime automatically created by tokio for IO and so I wanted to follow the same pattern.
I'll update the documentation to make this clearer.
My guess for that is that many/(most?) systems don't have a way to push IO to a separate runtime whereas it's easier to do so with cpu much of the time. However, with ObjectStore at least that isn't the case.
Yeah. Another consideration is that most uses of tokio are not for CPU bound work, so it makes sense to just use the default pool.
I updated it to say
```rust
/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
```
Co-authored-by: Bruce Ritchie <[email protected]>
FYI @pepijnve this is perhaps relevant to you as you have been working closely with tokio / tokio runtimes
@alamb thanks for the pointer. I've been digging through all the work that has already been done, looking particularly at the rayon experiments. Rayon's idea of "yield now" is particularly interesting: rather than unwinding the stack, it will actually try to run other work immediately underneath the blocked task.

Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.
I know we have used it at InfluxData, and I vaguely remember others reporting they have too (maybe @adriangb), but I don't know of any writeups (it would be another great doc contribution if you figure it out) :)
I've used Tokio console in the past in my app to see if I could find the cause of some app slowdowns. It worked fine but didn't help me personally find the issue.
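For anyone else trying this, a minimal tokio-console setup looks roughly like the following (the crate version is illustrative; `console-subscriber` and the `tokio-console` CLI come from the tokio console project):

```shell
# 1. Add the subscriber to Cargo.toml:
#      console-subscriber = "0.4"
# 2. Call console_subscriber::init() early in main().
# 3. Build with the tokio_unstable cfg so tokio emits task instrumentation:
RUSTFLAGS="--cfg tokio_unstable" cargo run
# 4. In another terminal, attach the console UI:
tokio-console
```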
This looks very nice! FWIW our internal structure is similar to CpuRuntime, but it also has convenience methods to essentially encapsulate what the JoinSet is doing and make it less boilerplate to put work on the CPU runtime.
I left some comments, but since this is just an example and already very nice, I don't think we should impose much of a barrier to moving forward with it.
```rust
// In a production system you also need to ensure that nothing adds new
// tasks to the underlying runtime after this point
if let Some(thread_join_handle) = self.thread_join_handle.take() {
    // If the thread is still running, we wait for it to finish
    println!("Shutting down CPU runtime thread...");
    if let Err(e) = thread_join_handle.join() {
        eprintln!("Error joining CPU runtime thread: {e:?}");
    } else {
        println!("CPU runtime thread shutdown successfully.");
    }
}
```
How complex is that? Would it be worth including in this example? It seems to me that since this example is already quite complex, making it 10% more complex in order to be "complete" is worth it.
I was mostly trying to simplify the shutdown behavior from the DedicatedExecutor in IOx:
I'll review this more carefully tomorrow and see if there is some way to make it more complete (or remove the mysterious text)
```rust
let thread_join_handle = std::thread::spawn(move || {
    cpu_runtime.block_on(async move {
        notify_shutdown_captured.notified().await;
    });
    cpu_runtime.shutdown_timeout(Duration::from_secs(1));
});
```
Can you explain what the timeout is doing here? Is 1s reasonable for all systems?
It just waits up to 1 second for shutdown (rather than indefinitely). Maybe the example could wait indefinitely...
I'll add some comments explaining.
Thanks @adriangb and @Omega359 for the help with this one (and to @tustvold and @ion-elgreco for the underlying feature). It's taken a while but we have made it!
Note: This PR contains an example and supporting code. It has no changes to the core.
Which issue does this PR close?
Closes Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work) #12393
Note this is a new version of Example for using a separate threadpool for CPU bound work (try 2) #14286
Rationale for this change
I have heard from multiple people, multiple times, over multiple years that the specifics of using multiple threadpools to separate CPU and IO work in DataFusion are confusing.
They are not wrong, and it is a key detail for building low latency, high performance engines that process data directly from remote storage, which I think is a key capability for DataFusion.
My past attempts in #13424 and #14286 to make this example have been bogged down trying to get consensus on details of how to transfer results across streams, the wisdom of wrapping streams, and other details. Thankfully, thanks to @tustvold and @ion-elgreco there is now a much better solution in ObjectStore 0.12.1: apache/arrow-rs-object-store#332
What changes are included in this PR?
A new `thread_pools.rs` example.

Are these changes tested?

Yes, the example is run as part of CI and there are tests.
Are there any user-facing changes?
Not really