Skip to content

Add partial result to AggregateCursor continuation #3254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 54 commits into
base: main
Choose a base branch
from

Conversation

pengpeng-lu
Copy link
Contributor

@pengpeng-lu pengpeng-lu commented Mar 17, 2025

  1. Add AggregateCursor.AggregateCursorContinuation, which contains partialAggregationResult.
  2. AggregateCursor returns AggregateCursor.AggregateCursorContinuation
  3. Add tests in record-layer and relational-layer to verify

@pengpeng-lu pengpeng-lu added the enhancement New feature or request label Mar 17, 2025
(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird auto formating, will remove it in the next commit.

@pengpeng-lu pengpeng-lu requested a review from alecgrieser May 5, 2025 22:45
}

@Nonnull
@Override
public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
if (previousResult != null && !previousResult.hasNext()) {
// we are done
return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(new AggregateCursorContinuation(previousResult.getContinuation(), serializationMode),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this also needs to include the partialAggregationResult, though. I'm thinking of a case like:

try (RecordCursor<QueryResult> cursor = getAggregateCursor()) {
   RecordCursor<QueryResult> curr = cursor.getNext();
   while (curr.hasNext()) {
     // do something with curr
     curr = cursor.getNext();
   }

   RecordCursor<QueryResult> last = cursor.getNext();
   assert !last.hasNext(); // should succeed
   assert last.getContinution().equals(curr.getContinuation()); // I think will fail as last is missing any partial aggregation result 
}

The last value there is the second result from the cursor where .hasNext() is false.


@Override
public boolean isEnd() {
return innerContinuation.isEnd();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is something about this that is still a bit suspicious. In general, we should only return continuations where isEnd() returns true if we are also returning RecordCursorResult.exhausted(). In particular, we should never have a RecordCursorResult with a value and a continuation where isEnd is true. If I'm understanding the code correctly, I actually think that this invariant is currently true (and may actually be necessary because of the check on line 113 where we create an AggregateCursorContinuation with the previous continuation value, though we could adjust that to return RecordCursorResult.exhausted() if previousContinuationInGroup.isEnd()).

The issue I have is that I don't think this works well with a slightly different variant we might want to make in the future, where we make the innerContinuation the previous read continuation (with a partial aggregation result) rather than the previous returned continuation, because in that case, we'd expect the final value to wrap an inner end continuation, but for the result to nevertheless have a value. We have to be a little careful in that case to not accidentally return an empty continuation (potentially we need a bool inner_is_end flag in the continuation to guarantee it won't be empty if both the innerContinuation is empty and there's no partialAggregationResult).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed isEnd = innerContinuation.isEnd() && partialAggregationResult == null, for the current implementation, inner_is_end is unnecessary so I didn't add it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I see what is gained here by also checking the partialAggregationResult, if I'm honest.

Yeah, I wouldn't expect inner_is_end to be needed unless we modified this to handle the case where the continuation is always the last key read. It is probably fine not to include it, if we decide that we're unlikely to do that

@pengpeng-lu pengpeng-lu requested a review from alecgrieser May 27, 2025 19:52
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value))
.withGroupCriterion("str_value_indexed")
.withQueryPredicate("num_value_2", Comparisons.Type.LESS_THAN, 0)
.build(false, RecordQueryStreamingAggregationPlan.SerializationMode.TO_NEW);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this one or the other ones with a filter use TO_OLD? I'd expect them to have trouble, and we're not really going to be able to fix them, I don't think, but it might be good to understand what they did

}

@Nonnull
@Override
public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
if (previousResult != null && !previousResult.hasNext()) {
// we are done
return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(new AggregateCursorContinuation(previousResult.getContinuation(), serializationMode),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the partialAggregationResult at the moment. And did you add a test that called .getNext() a second time on a partially completed cursor?


@Override
public boolean isEnd() {
return innerContinuation.isEnd();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I see what is gained here by also checking the partialAggregationResult, if I'm honest.

Yeah, I wouldn't expect inner_is_end to be needed unless we modified this to handle the case where the continuation is always the last key read. It is probably fine not to include it, if we decide that we're unlikely to do that

@pengpeng-lu pengpeng-lu requested a review from alecgrieser June 2, 2025 05:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants