-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add a SpillingPool to manage collections of spill files #18207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Marking as draft for now. Open to input but needs a bit more work. I'm still familiarizing myself with the spilling infrastructure. |
|
This PR is setting size limit to spill files, when the size exceeds threshold, the spiller rotates to new file. I'm wondering why this design? Now the spill writer and reader is able to do streaming read/write, so a large spill file usually won't be the issue, unless it needs more parallelism somewhere. |
The issue with using a single FIFO file is that you accumulate dead data, bloating disk usage considerably. The idea is to cap that at say 100MB and then start a new file so that once all of the original file has been consumed we can garbage collect it. |
6b801ae to
f7c84fe
Compare
f7c84fe to
c5b40ee
Compare
|
@2010YOUY01 let me know if that makes sense, there's an example of this issue in #18011 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a SpillPool abstraction to centralize the management of spill files with FIFO semantics. The pool handles file rotation, batching multiple record batches into single files up to a configurable size limit, and provides streaming read access to spilled data.
Key changes:
- Adds a new
SpillPoolmodule with FIFO queue semantics for managing spill files - Integrates
SpillPoolintoRepartitionExecto replace the previous one-file-per-batch approach - Adds a new configuration option
max_spill_file_size_bytes(default 100MB) to control when spill files rotate
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/physical-plan/src/spill/spill_pool.rs | New module implementing SpillPool and SpillPoolStream with comprehensive tests |
| datafusion/physical-plan/src/spill/mod.rs | Exports the new spill_pool module |
| datafusion/physical-plan/src/repartition/mod.rs | Refactored to use SpillPool instead of one-file-per-batch spilling |
| datafusion/common/src/config.rs | Adds max_spill_file_size_bytes configuration option |
| docs/source/user-guide/configs.md | Documents the new max_spill_file_size_bytes configuration |
| datafusion/sqllogictest/test_files/information_schema.slt | Updates test expectations to include new configuration option |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
This makes a lot of sense, operators should release disk usage sooner if possible. I will to review it soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, looks good in general.
I think it needs several additional test coverage:
- e2e tests, potentially a query that triggers spilling in
RepartitionExec. I think we can also do a quick benchmark on it to see how things work. - #18207 (comment)
I also left some suggestions to simplify the implementation, but they're optional.
datafusion/common/src/config.rs
Outdated
| /// | ||
| /// A larger value reduces file creation overhead but may hold more disk space. | ||
| /// A smaller value creates more files but allows finer-grained space reclamation | ||
| /// (especially in LIFO mode where files are truncated after reading). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we're reclaiming disk space in the 'chunked file' granularity, perhaps this truncating way don't have to be mentioned, since it don't have a real usage yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep was leftover from a previous implementation
| /// Size of current write file in bytes (estimated) | ||
| current_write_size: usize, | ||
| /// Number of batches written to current file | ||
| current_batch_count: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can track them inside InProgressSpillFile, and expose an API. This approach can simplify SpillPool a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| /// SpillManager for creating files and tracking metrics | ||
| spill_manager: Arc<SpillManager>, | ||
| /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream) | ||
| schema: SchemaRef, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid duplication, the schema inside spill_manager can be used instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻 done
| /// Shared reference to the spill pool | ||
| spill_pool: Arc<Mutex<SpillPool>>, | ||
| /// SpillManager for creating streams from spill files | ||
| spill_manager: Arc<SpillManager>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use the spill_manager inside spill_pool, and eliminate this field?
| // Input finished and no more spilled data - we're done | ||
| return Poll::Ready(None); | ||
| } | ||
| // Otherwise check the channel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PerPartitionStream is for the order-preserving case of RepartitionExec, it seems a bit tricky to get the order correct, I recommend to find the existing tests for order-preserving repartition, and include spilling to it.
|
|
||
| // Append batch to current file | ||
| if let Some(ref mut file) = self.current_write_file { | ||
| file.append_batch(batch)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A potential follow-up to do: #18261
e7c1e6e to
c57fbd8
Compare
|
|
||
| # End repartition on empty columns test | ||
|
|
||
| # Start spilling tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test passes on main but fails on datafusion-cli v50:
❯ datafusion-cli
DataFusion CLI v50.0.0
> CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER,
d INTEGER
)
STORED AS CSV
WITH ORDER (a ASC, b ASC, c ASC)
LOCATION 'datafusion/core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');
SET datafusion.runtime.memory_limit = '12K';
EXPLAIN ANALYZE
SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
SUM(a) OVER(partition by b, a order by c) as sum2,
SUM(a) OVER(partition by a, d order by b) as sum3,
SUM(a) OVER(partition by d order by a) as sum4
FROM annotated_data_infinite2;
0 row(s) fetched.
Elapsed 0.026 seconds.
0 row(s) fetched.
Elapsed 0.001 seconds.
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
RepartitionExec[Merge 11]#85(can spill: false) consumed 1896.0 B, peak 1896.0 B,
RepartitionExec[Merge 5]#73(can spill: false) consumed 1448.0 B, peak 1448.0 B,
RepartitionExec[Merge 3]#59(can spill: false) consumed 1384.0 B, peak 1384.0 B,
RepartitionExec[Merge 1]#56(can spill: false) consumed 1304.0 B, peak 1304.0 B,
RepartitionExec[8]#48(can spill: false) consumed 1216.0 B, peak 1856.0 B.
Error: Failed to allocate additional 240.0 B for RepartitionExec[Merge 6] with 0.0 B already allocated for this reservation - 8.0 B remain available for the total pool
It's not this PR that enabled it to pass, it was #18014, but worth adding anyway.
Addresses #18014 (comment), potentially paves the path to solve #18011 for other operators as well