Add merge-only reducer API for intercepting intermediate results#18621
Add merge-only reducer API for intercepting intermediate results#18621navina wants to merge 3 commits into
Conversation
Adds DataTableReducer#mergeDataTablesOnly and BrokerReduceService#mergeOnDataTable that perform the cross-server merge step WITHOUT finalizing (no extractFinalResult / result formatting). The returned DataTable carries intermediate, non-finalized state, byte-shape identical to a single server's partial response, so a downstream consumer can intercept it and custom handle the intermediate results. Implemented for AggregationDataTableReducer, GroupByDataTableReducer, and DistinctDataTableReducer. SelectionDataTableReducer and ExplainPlanDataTableReducer inherit the default-throwing implementation. Group-by surfaces native GROUPS_TRIMMED / NUM_GROUPS_LIMIT_REACHED flags on the merged DataTable's metadata so downstream consumers can decide whether the result is complete enough to use. Server-side serialization helpers are consolidated to avoid duplication: - AggregationFunctionUtils.setIntermediateResult (now also used by AggregationResultsBlock) - DataTableBuilderUtils.setColumn (now also used by GroupByResultsBlock) No caller is wired in by this change; the read path is unchanged and tests are added to verify that a normal reduce path finalizes a re-injected intermediate result without modifications. WARNING (on the new methods' Javadoc): the merge-only path is heavyweight (full cross-server merge + re-serialize) and must NOT be invoked in the request-serving path.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18621 +/- ##
============================================
+ Coverage 56.79% 64.46% +7.67%
- Complexity 7 1282 +1275
============================================
Files 2567 3354 +787
Lines 149143 207364 +58221
Branches 24111 32387 +8276
============================================
+ Hits 84707 133684 +48977
- Misses 57241 62936 +5695
- Partials 7195 10744 +3549
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| // Set on a merged-only DataTable when one or more input server DataTables were dropped during the | ||
| // merge (e.g., due to a schema conflict). Signals to a downstream consumer that the merge is | ||
| // partial; how to react (skip, retry, accept with annotation) is the consumer's policy. | ||
| PARTIAL_INTERMEDIATE_RESULT(43, "partialIntermediateResult", MetadataValueType.STRING); |
There was a problem hiding this comment.
This looks a bit odd, aren't all the intermediate results partial? Should it be something like INCOMPLETE_MERGE instead?
| * as one trace blob attributed to the synthetic server. | ||
| * </ul> | ||
| */ | ||
| public void setStatsOnMergedDataTable(DataTable dataTable) { |
There was a problem hiding this comment.
Is this dropping EARLY_TERMINATION_REASON for distinct queries that was added recently?
There was a problem hiding this comment.
This is also going to be prone to becoming stale in the future - I'd prefer avoiding this pattern altogether. Maybe we can just carry the execution stats aggregate alongside the data table? Another option is to collapse the two paths (this and setStats) into one by refactoring it into a pattern like forEachStat((MetadataKey key, long value) -> …) that both consumers drive? One writes to the response, the other to metadata.
Summary
Adds a merge-only reduction surface that performs the cross-server merge step
without finalizing — producing a single intermediate
DataTablethatcarries non-finalized aggregation state, byte-shape identical to a single
server's partial response. A downstream consumer can intercept this merged
intermediate, custom-handle it, and later re-inject it through the standard
reduce path for finalization. The read path is unchanged; tests verify a
re-injected intermediate finalizes to the same result as a direct reduce.
No caller is wired in by this change — this is the API surface only.
API
Supported reducers
AggregationDataTableReducer,GroupByDataTableReducer,DistinctDataTableReducer— implementmergeDataTablesOnly.SelectionDataTableReducer,ExplainPlanDataTableReducer— inherit thedefault-throwing implementation (out of scope).
UnsupportedOperationExceptionwhen
_queryContext.isServerReturnFinalResult()is true — inputs are thenfinalized rather than intermediate, so the contract can't be honored.
Surfaced on the merged DataTable's metadata
GROUPS_TRIMMED,NUM_GROUPS_LIMIT_REACHED.PARTIAL_INTERMEDIATE_RESULTflag — set when one or more inputDataTables are dropped during merge (e.g. schema conflict). The
BrokerMeter.RESPONSE_MERGE_EXCEPTIONSmeter is also bumped, symmetricwith how the regular reduce path surfaces conflicting-schema servers.
ExecutionStatsAggregator#setStatsOnMergedDataTable: additive longs(
numDocsScanned,numSegments*,threadCpuTimeNs, ...), MIN-reducedminConsumingFreshnessTimeMs, boolean flags, per-server exceptions, andJSON-encoded trace info (when
trace=true). Method does not bumpbroker meters/timers (the merge path is expected to run off the
request-serving path; metric increments fire on eventual re-reduce).
Limitations of the round-trip
BrokerResponseNative— wire format has only combined keys, so are-reduce dumps the whole value into one bucket based on the synthetic
server's tableType.
same error code are last-write-wins (wire format is
Map<Integer, String>).TRACE_INFOentry; thedownstream aggregator reads it back as one trace blob under the synthetic
server's name.
Server-side consolidations
Two helpers extracted to avoid duplication between the existing server-side
serialization and the new broker-side merge-only path:
AggregationFunctionUtils.setIntermediateResult(now also used byAggregationResultsBlock).DataTableBuilderUtils.setColumn(now also used byGroupByResultsBlock).These are behavior-preserving — private methods promoted to shared statics
with identical bodies.
Wire format
Adds one new
DataTable.MetadataKeyentry (PARTIAL_INTERMEDIATE_RESULT,id=43). Older readers ignore unknown metadata keys, so this is
forward-compatible.
Test plan
MergeDataTablesOnlyTest(23 tests): round-trip equivalence forAggregation/GroupBy/Distinct (including OBJECT-column DISTINCTCOUNT),
LIMIT 0group-by round-trip, intermediate-schema preservation,conflicting-schema partial-merge flag,
serverReturnFinalResultrejection, additive stats end-to-end (
numDocsScannedsurvivesround-trip), MIN freshness reduction, exception/trace propagation,
null-handling round-trip, selection-throws, empty/no-input null returns.
./mvnw -pl pinot-core test-compile, spotless, checkstyle, license all clean.