Conversation

@andygrove (Member) commented Oct 23, 2025

Which issue does this PR close?

This follows on from the work in #2635 where we copy and/or unpack dictionary-encoded arrays in the scan, so we no longer need to insert CopyExec into the plan.

Rationale for this change

  • Reduce complexity
  • Remove redundant code

What changes are included in this PR?

How are these changes tested?

@codecov-commenter commented Oct 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 59.19%. Comparing base (f09f8af) to head (5ef1014).
⚠️ Report is 645 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2639      +/-   ##
============================================
+ Coverage     56.12%   59.19%   +3.06%     
- Complexity      976     1449     +473     
============================================
  Files           119      147      +28     
  Lines         11743    13746    +2003     
  Branches       2251     2362     +111     
============================================
+ Hits           6591     8137    +1546     
- Misses         4012     4387     +375     
- Partials       1140     1222      +82     


@EmilyMatt (Contributor) commented:

This could be very problematic for performance.
I'm AFK but will explain further in a bit.

@EmilyMatt (Contributor) commented Oct 23, 2025

OK, so the previous time I wanted to make this PR, I encountered something interesting.
Imagine something like the following code in DataFusion's SortExec:

futures::stream::once(async move {
    while let Some(batch) = input.next().await {
        let batch = batch?;
        sorter.insert_batch(batch).await?;
    }
    sorter.sort().await
})
.try_flatten(),

This will essentially consume the entire input iterator, all the way to the operator producing the ColumnarBatch on the Scala side.
The underlying Arrow arrays can be produced off-heap; it doesn't matter, because the wrapping ArrowArray and ArrowSchema objects are created using heap memory, so any ArrayRef created from the ArrayData of a passed Arrow array will use a small amount of heap memory.
The issue is that code like the above, which consumes the entire input iterator, causes such high GC pressure that performance can decrease by up to 10x compared to Spark.
It will not always show up in local performance runs (I saw the horrible performance when running on an EC2 cluster with a huge amount of data).
The only solution I've found is to do a deep copy of the ArrayData itself.
I know this seems paradoxical, but it's a real-life issue.
The best way I know to handle this is to just do a full copy of the ArrayData before make_array in the scan, for all arrays.
The unpacking can happen before that, I guess, but hopefully DataFusion will have enough support for dictionaries that this could be ignored completely.

When we use off-heap memory, naturally we want to reduce the executor memory, which exacerbates the issue.
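
A minimal sketch of the kind of deep copy being described here, assuming stock arrow-rs APIs (the helper name deep_copy is made up, and this is not necessarily how Comet's copy_array is implemented): MutableArrayData rebuilds the buffers in Rust-owned memory, so the FFI-backed original and its JVM-side wrapper objects can be released right away.

use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};

/// Hypothetical helper: rebuild an array's buffers in Rust-owned memory so the
/// FFI-backed original (and its JVM-side wrapper objects) can be dropped immediately.
fn deep_copy(array: &ArrayRef) -> ArrayRef {
    let data = array.to_data();
    // MutableArrayData copies the underlying buffers instead of sharing them.
    let mut mutable = MutableArrayData::new(vec![&data], false, data.len());
    mutable.extend(0, 0, data.len());
    make_array(mutable.freeze())
}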

@EmilyMatt (Contributor) commented:

Comparison with and without copying the underlying ArrayData:

With: [screenshot]

Without: [screenshot]

@andygrove (Member, Author) commented:

> OK, so the previous time I wanted to make this PR, I encountered something interesting. […] When we use off-heap memory, naturally we want to reduce the executor memory, which exacerbates the issue.

Thanks, this is really helpful. I will experiment with this.

@andygrove changed the title from "chore: Remove CopyExec" to "chore: Remove CopyExec [WIP]" on Oct 24, 2025
@andygrove (Member, Author) commented:

Some native_datafusion tests are failing:

2025-10-23T20:23:31.2697381Z - join (native_datafusion, native shuffle) *** FAILED *** (463 milliseconds)
2025-10-23T20:23:31.2702227Z   org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4382.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4382.0 (TID 10847) (localhost executor driver): org.apache.comet.CometNativeException: Invalid HashJoinExec, partition count mismatch 1!=5,consider using RepartitionExec.

@andygrove (Member, Author) commented:

> OK, so the previous time I wanted to make this PR, I encountered something interesting. […] When we use off-heap memory, naturally we want to reduce the executor memory, which exacerbates the issue.

Claude's analysis of the PR and this comment:

The Core Problem

Memory Model Mismatch

When Arrow arrays are passed from Spark (JVM) to DataFusion (Rust):

  • Data buffers: Can be allocated off-heap (native memory)
  • Wrapper objects (ArrowArray, ArrowSchema): Always allocated on Java heap
  • Even though the actual data is off-heap, each ArrayRef needs these small heap-allocated wrappers
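
For orientation, this is roughly what the import on the native side looks like with the stock arrow-rs C Data Interface (a sketch, not Comet's actual scan code; the function name import_column is made up). The ArrayData produced here borrows the buffers the JVM exported rather than copying them, which is why those heap-allocated wrapper objects have to stay alive as long as the resulting ArrayRef does.

use arrow::array::{make_array, ArrayRef};
use arrow::error::Result;
use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};

/// Sketch only: build an ArrayRef from the C Data Interface structs that the
/// JVM side filled in. No buffers are copied here, so the JVM-side export
/// (and its heap wrappers) must outlive the returned array.
unsafe fn import_column(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result<ArrayRef> {
    let data = from_ffi(array, schema)?; // borrows the exported buffers
    Ok(make_array(data))
}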

GC Pressure from Buffering Operators

The issue arises with operators like SortExec that consume their entire input:

  futures::stream::once(async move {
      while let Some(batch) = input.next().await {
          sorter.insert_batch(batch).await?;  // Accumulates all batches
      }
      sorter.sort().await  // Only then produces output
  })

This pattern:

  1. Consumes the entire input iterator before producing output
  2. Creates many wrapper objects (one set per batch)
  3. Keeps all wrappers alive until sorting completes
  4. Causes severe GC pressure as thousands of small objects accumulate on the heap

Why It's Worse with Off-Heap Memory

When using off-heap memory for performance, users typically:

  • Reduce executor heap size (since data is off-heap)
  • This makes the heap smaller → GC pressure from wrapper objects becomes catastrophic
  • Can cause 10x performance degradation on clusters with large data

Why Deep Copy Solves It (The Paradox)

The comment suggests doing a deep copy of ArrayData before make_array in the scan. This seems counterintuitive but works because:

  1. Immediate Materialization: Deep copy fully materializes data into new arrays
  2. Immediate GC: Original wrapper objects can be garbage collected right away
  3. Clean Boundaries: Each batch owns its data completely
  4. Less GC Thrashing: Even though copying costs CPU, it's cheaper than continuous GC pauses

Without copy: many small wrapper objects → constant GC pressure
With copy: upfront copy cost → clean memory lifecycle → smooth execution

Current PR Context

Looking at your PR, you've:

  • Removed the separate CopyExec operator
  • Moved copy_array and copy_or_unpack_array functions into scan.rs
  • The copy happens at scan time (lines 270-278 in scan.rs):
  let array = if arrow_ffi_safe {
      copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
  } else {
      copy_array(&array)  // Deep copy
  };

The commenter is saying: Make sure this deep copy happens for all arrays to prevent the GC pressure issue, especially for operators that buffer entire inputs.
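
For illustration only, a rough sketch of what an unpack-then-copy step could look like with stock arrow-rs APIs. This is not the PR's actual copy_or_unpack_array / copy_array code, and the name unpack_and_copy is made up; it just shows the two steps discussed above (unpack dictionaries, then rebuild the buffers in Rust-owned memory).

use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::Result;

/// Hypothetical sketch: unpack dictionary-encoded arrays to their value type,
/// then deep-copy the result so downstream operators retain no FFI-backed buffers.
fn unpack_and_copy(array: &ArrayRef) -> Result<ArrayRef> {
    // Unpack dictionaries by casting to the dictionary's value type.
    let array: ArrayRef = match array.data_type() {
        DataType::Dictionary(_, value_type) => cast(array, value_type)?,
        _ => array.clone(),
    };
    // Deep copy: rebuild the buffers in Rust-owned memory.
    let data = array.to_data();
    let mut mutable = MutableArrayData::new(vec![&data], false, data.len());
    mutable.extend(0, 0, data.len());
    Ok(make_array(mutable.freeze()))
}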

@andygrove (Member, Author) commented:

> Some native_datafusion tests are failing: […]

CopyExec was masking an issue when using native_datafusion scans: #2660

I am not sure what to do about this yet, but I have skipped some tests in this PR for now.

@mbutrovich fyi

@andygrove (Member, Author) commented:

I have created two new PRs to replace this one:

@andygrove closed this on Oct 29, 2025