fix(functions-aggregate): drain CORR state vectors for streaming aggregation #19669
+312
−28
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Rationale for this change
This change addresses a failure in the
CORRaggregate function when running in streaming mode. TheCorrelationGroupsAccumulator(introduced in PR #13581) was failing to drain its state vectors duringEmitTo::Firstcalls, causing internal state to persist across emissions. This led to memory leaks, incorrect results for subsequent groups, and "length mismatch" errors because the internal vector sizes diverged from the number of emitted groups.Reproducer
Before:
DataFusion error: Arrow error: Invalid argument error: all columns in a record batch must have the same lengthAfter:
What changes are included in this PR?
This PR is structured into two commits: the first adds a failing test case to demonstrate the issue, and the second implements the fix.
The accumulator now uses
emit_to.take_needed()in bothevaluateandstateto properly consume the emitted portions of the state vectors. Additionally, thesize()implementation has been updated to use vector capacity for more accurate memory accounting.Are these changes tested?
Yes, a new test case in
aggregate.slttriggers streaming aggregation via an ordered subquery. This test previously crashed with an Arrow length mismatch error and now produces correct results.Are there any user-facing changes?
Yes, SQL queries that trigger streaming aggregation using
CORR(typically those with specific ordering requirements) will now succeed instead of failing with a length mismatch error.