chore: Remove CopyExec [WIP] #2639

Conversation
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##               main    #2639      +/-  ##
============================================
+ Coverage     56.12%   59.19%    +3.06%
- Complexity      976     1449      +473
============================================
  Files           119      147       +28
  Lines         11743    13746     +2003
  Branches       2251     2362      +111
============================================
+ Hits           6591     8137     +1546
- Misses         4012     4387      +375
- Partials       1140     1222       +82
```

☔ View full report in Codecov by Sentry.
This could be very problematic for performance.
Ok, so the previous time I tried to make this PR I ran into something interesting: this essentially consumes the entire input iterator, all the way back to the operator producing the ColumnarBatch on the Scala side. When we use off-heap memory we naturally want to reduce the executor memory, which exacerbates the issue.

Thanks, this is really helpful. I will experiment with this.
Claude's analysis of the PR and this comment:

The Core Problem

Memory Model Mismatch

When Arrow arrays are passed from Spark (JVM) to DataFusion (Rust):

GC Pressure from Buffering Operators

The issue arises with operators like SortExec that consume their entire input:

```rust
futures::stream::once(async move {
    while let Some(batch) = input.next().await {
        sorter.insert_batch(batch).await?; // Accumulates all batches
    }
    sorter.sort().await // Only then produces output
})
```

This pattern:
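The buffering behavior described above can be sketched without async or Arrow at all. The following toy (all names are hypothetical, not Comet or DataFusion APIs) shows why a sort operator must hold every input batch alive until the last one arrives: nothing it consumed can be released before `sort()` runs.

```rust
// Toy "sort operator": drains its entire input before emitting anything,
// so every consumed batch stays resident until the sort completes.
struct Sorter {
    buffered: Vec<Vec<i32>>, // all consumed batches accumulate here
}

impl Sorter {
    fn new() -> Self {
        Sorter { buffered: Vec::new() }
    }

    // Mirrors `sorter.insert_batch(batch)`: accumulate, emit nothing.
    fn insert_batch(&mut self, batch: Vec<i32>) {
        self.buffered.push(batch);
    }

    // Mirrors `sorter.sort()`: output exists only after the whole
    // input has been consumed.
    fn sort(self) -> Vec<i32> {
        let mut all: Vec<i32> = self.buffered.into_iter().flatten().collect();
        all.sort();
        all
    }
}

fn main() {
    let input = vec![vec![3, 1], vec![4, 1], vec![5, 9, 2]];
    let mut sorter = Sorter::new();
    for batch in input {
        sorter.insert_batch(batch); // nothing is freed until sort() runs
    }
    let sorted = sorter.sort();
    println!("{:?}", sorted); // [1, 1, 2, 3, 4, 5, 9]
}
```

If the batches are thin wrappers over JVM-owned buffers rather than owned data, this accumulation pins all of those buffers on the JVM side for the duration of the sort.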
Why It's Worse with Off-Heap Memory

When using off-heap memory for performance, users typically:
Why Deep Copy Solves It (The Paradox)

The comment suggests doing a deep copy of ArrayData before make_array in the scan. This seems counterintuitive but works because:

Without copy: many small wrapper objects → constant GC pressure.
With copy: upfront copy cost → clean memory lifecycle → smooth execution.

Current PR Context

Looking at your PR, you've:
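The ownership difference can be made concrete with a small std-only sketch, using `Rc` as a stand-in for a JVM-owned Arrow buffer exported over FFI (the names and the `Rc` model are illustrative assumptions, not how Arrow FFI is actually implemented). A zero-copy import just adds another reference and keeps the source buffer pinned; a deep copy pays an upfront cost but lets the source be reclaimed immediately.

```rust
use std::rc::Rc;

fn main() {
    // Stand-in for a JVM-owned Arrow buffer exported over FFI.
    let jvm_buffer: Rc<Vec<u8>> = Rc::new(vec![0u8; 1024]);

    // "Zero-copy" import: the native side holds another reference,
    // so the source cannot be reclaimed while an operator buffers it.
    let shallow = Rc::clone(&jvm_buffer);
    assert_eq!(Rc::strong_count(&jvm_buffer), 2); // source is pinned

    // Deep copy: pay the copy cost up front, then drop the reference.
    let deep: Vec<u8> = shallow.as_ref().clone();
    drop(shallow);
    assert_eq!(Rc::strong_count(&jvm_buffer), 1); // source is freeable again

    println!("deep copy owns {} bytes independently", deep.len());
}
```

In the real system the "pinned" state corresponds to small wrapper objects keeping JVM-side memory alive across a buffering operator, which is exactly the GC-pressure pattern described above.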
```rust
let array = if arrow_ffi_safe {
    copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
} else {
    copy_array(&array) // Deep copy
};
```

The commenter is saying: make sure this deep copy happens for all arrays to prevent the GC pressure issue, especially for operators that buffer entire inputs.
I am not sure what to do about this yet, but I have skipped some tests in this PR for now. @mbutrovich fyi
I have created two new PRs to replace this one:
Which issue does this PR close?

This follows on from the work in #2635, where we copy and/or unpack dictionary-encoded arrays in the scan, so we no longer need to insert CopyExec into the plan.

Rationale for this change

What changes are included in this PR?

How are these changes tested?