Add merge-only reducer API for intercepting intermediate results #18621

Open
navina wants to merge 3 commits into apache:master from navina:merge-only-reducer

navina (Contributor) commented May 28, 2026

Summary

Adds a merge-only reduction surface that performs the cross-server merge step
without finalizing — producing a single intermediate DataTable that
carries non-finalized aggregation state, byte-shape identical to a single
server's partial response. A downstream consumer can intercept this merged
intermediate, custom-handle it, and later re-inject it through the standard
reduce path for finalization. The read path is unchanged; tests verify a
re-injected intermediate finalizes to the same result as a direct reduce.

No caller is wired in by this change — this is the API surface only.
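
To make the round-trip claim concrete, here is a minimal sketch using a toy AVG aggregation. The names `AvgState`, `mergeOnly`, and `reduce` are hypothetical stand-ins, not Pinot's `DataTable` or reducer classes. The sketch only shows the property the tests check: merging intermediates first and finalizing later gives the same answer as a direct reduce.

```java
import java.util.List;

// Toy model: hypothetical stand-ins, not Pinot's DataTable or reducer classes.
final class AvgRoundTripSketch {
  /** Non-finalized AVG state: a running sum and a row count. */
  record AvgState(double sum, long count) {
    AvgState merge(AvgState other) {
      return new AvgState(sum + other.sum, count + other.count);
    }
  }

  /** Merge-only step: combines per-server states without finalizing; null when there is no input. */
  static AvgState mergeOnly(List<AvgState> serverStates) {
    return serverStates.stream().reduce(AvgState::merge).orElse(null);
  }

  /** Standard reduce: merges, then finalizes to the user-visible average. */
  static double reduce(List<AvgState> serverStates) {
    AvgState merged = mergeOnly(serverStates);
    if (merged == null) {
      throw new IllegalArgumentException("no server responses to reduce");
    }
    return merged.sum() / merged.count();
  }
}
```

Re-injecting the merged state as if it were a single server's response, `reduce(List.of(mergeOnly(states)))`, yields the same value as `reduce(states)` in this model.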

API

// DataTableReducer (new default method, throws by default)
default DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
    Map<ServerRoutingInstance, DataTable> dataTableMap,
    DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics);

// BrokerReduceService (merge-only counterpart of reduceOnDataTable)
@Nullable
public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest,
    Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs,
    BrokerMetrics brokerMetrics);

Supported reducers

  • AggregationDataTableReducer, GroupByDataTableReducer,
    DistinctDataTableReducer — implement mergeDataTablesOnly.
  • SelectionDataTableReducer, ExplainPlanDataTableReducer — inherit the
    default-throwing implementation (out of scope).
  • Aggregation and group-by additionally throw UnsupportedOperationException
    when _queryContext.isServerReturnFinalResult() is true — inputs are then
    finalized rather than intermediate, so the contract can't be honored.
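
The opt-in pattern described in the list above can be sketched as follows. This is a simplified illustration under stated assumptions: `ReducerSketch`, `SelectionReducerSketch`, and `CountReducerSketch` are hypothetical, and the real `DataTableReducer` methods take more parameters than shown here.

```java
import java.util.List;

// Hypothetical, simplified interface; the real DataTableReducer methods take more parameters.
interface ReducerSketch {
  long reduce(List<Long> partialCounts);

  /** Merge-only counterpart; reducers that do not opt in inherit this throwing default. */
  default long mergeOnly(List<Long> partialCounts) {
    throw new UnsupportedOperationException(
        "merge-only is not supported by " + getClass().getSimpleName());
  }
}

/** Stands in for a selection-style reducer: out of scope, so it keeps the default. */
final class SelectionReducerSketch implements ReducerSketch {
  @Override
  public long reduce(List<Long> partialCounts) {
    return partialCounts.size();
  }
}

/** Stands in for an aggregation-style reducer that opts in. */
final class CountReducerSketch implements ReducerSketch {
  private final boolean serverReturnFinalResult;

  CountReducerSketch(boolean serverReturnFinalResult) {
    this.serverReturnFinalResult = serverReturnFinalResult;
  }

  @Override
  public long reduce(List<Long> partialCounts) {
    return mergeCounts(partialCounts);
  }

  @Override
  public long mergeOnly(List<Long> partialCounts) {
    if (serverReturnFinalResult) {
      // Inputs are already finalized, so an intermediate cannot be produced.
      throw new UnsupportedOperationException("inputs are finalized, not intermediate");
    }
    return mergeCounts(partialCounts);
  }

  private static long mergeCounts(List<Long> partialCounts) {
    return partialCounts.stream().mapToLong(Long::longValue).sum();
  }
}
```

Keeping the throwing behavior in a default method means existing reducer implementations compile unchanged and fail loudly only if the merge-only path is actually invoked on them.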

Surfaced on the merged DataTable's metadata

  • Group-by completeness: GROUPS_TRIMMED, NUM_GROUPS_LIMIT_REACHED.
  • New PARTIAL_INTERMEDIATE_RESULT flag — set when one or more input
    DataTables are dropped during merge (e.g. schema conflict). The
    BrokerMeter.RESPONSE_MERGE_EXCEPTIONS meter is also bumped, symmetric
    with how the regular reduce path surfaces conflicting-schema servers.
  • Execution stats round-trip via the new
    ExecutionStatsAggregator#setStatsOnMergedDataTable: additive longs
    (numDocsScanned, numSegments*, threadCpuTimeNs, ...), MIN-reduced
    minConsumingFreshnessTimeMs, boolean flags, per-server exceptions, and
    JSON-encoded trace info (when trace=true). Method does not bump
    broker meters/timers (the merge path is expected to run off the
    request-serving path; metric increments fire on eventual re-reduce).
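
As an illustration of the three reduction rules named above (additive longs, MIN-reduced freshness, boolean flags), here is a minimal sketch. `ServerStats` and `merge` are hypothetical; combining boolean flags with logical OR and using `Long.MAX_VALUE` as the starting freshness value are assumptions of this sketch, not details confirmed by the PR description.

```java
import java.util.List;

// Hypothetical model of per-server execution stats; not Pinot's ExecutionStatsAggregator.
final class MergedStatsSketch {
  record ServerStats(long numDocsScanned, long minConsumingFreshnessTimeMs,
      boolean numGroupsLimitReached) {}

  static ServerStats merge(List<ServerStats> perServer) {
    long docs = 0;
    long minFreshness = Long.MAX_VALUE; // assumption: MAX_VALUE means "no freshness reported"
    boolean limitReached = false;
    for (ServerStats s : perServer) {
      docs += s.numDocsScanned();                                           // additive long
      minFreshness = Math.min(minFreshness, s.minConsumingFreshnessTimeMs()); // MIN-reduced
      limitReached |= s.numGroupsLimitReached();                            // assumption: OR
    }
    return new ServerStats(docs, minFreshness, limitReached);
  }
}
```

Because each rule is associative, merging already-merged stats again on the eventual re-reduce produces the same totals in this model.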

Limitations of the round-trip

  • CPU/mem stats lose the offline-vs-realtime split visible on
    BrokerResponseNative — wire format has only combined keys, so a
    re-reduce dumps the whole value into one bucket based on the synthetic
    server's tableType.
  • Exception attribution to original servers is lost; collisions on the
    same error code are last-write-wins (wire format is Map<Integer, String>).
  • Per-server trace info is JSON-encoded into one TRACE_INFO entry; the
    downstream aggregator reads it back as one trace blob under the synthetic
    server's name.
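
The last-write-wins limitation follows directly from the `Map<Integer, String>` shape. A minimal sketch, assuming per-server exception maps are folded together in iteration order; `mergeExceptions` and the error code `200` are hypothetical and chosen only for illustration:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing why a Map<Integer, String> cannot keep two messages per code.
final class ExceptionMergeSketch {
  static Map<Integer, String> mergeExceptions(List<Map<Integer, String>> perServer) {
    Map<Integer, String> merged = new LinkedHashMap<>();
    for (Map<Integer, String> exceptions : perServer) {
      // putAll overwrites any existing entry with the same error code.
      merged.putAll(exceptions);
    }
    return merged;
  }
}
```

With two servers both reporting code `200`, only the second server's message survives, and nothing in the merged map records which server it came from.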

Server-side consolidations

Two helpers extracted to avoid duplication between the existing server-side
serialization and the new broker-side merge-only path:

  • AggregationFunctionUtils.setIntermediateResult (now also used by
    AggregationResultsBlock).
  • DataTableBuilderUtils.setColumn (now also used by GroupByResultsBlock).

These are behavior-preserving — private methods promoted to shared statics
with identical bodies.

Wire format

Adds one new DataTable.MetadataKey entry (PARTIAL_INTERMEDIATE_RESULT,
id=43). Older readers ignore unknown metadata keys, so this is
forward-compatible.
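
The compatibility argument can be illustrated with a toy reader that skips ids it does not recognize. This is a sketch only: `KnownKey`, its ids `1` and `2`, and `read` are hypothetical, and how Pinot's actual deserializer is structured is not shown in this PR.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical "older" reader; key ids other than 43 are made up for illustration.
final class MetadataReadSketch {
  /** Keys known to a reader that predates id 43. */
  enum KnownKey {
    NUM_DOCS_SCANNED(1),
    GROUPS_TRIMMED(2);

    final int id;

    KnownKey(int id) {
      this.id = id;
    }

    static KnownKey byId(int id) {
      for (KnownKey key : values()) {
        if (key.id == id) {
          return key;
        }
      }
      return null;
    }
  }

  /** Decodes wire entries keyed by id, silently skipping ids this reader does not know. */
  static Map<KnownKey, String> read(Map<Integer, String> wireEntries) {
    Map<KnownKey, String> metadata = new EnumMap<>(KnownKey.class);
    for (Map.Entry<Integer, String> entry : wireEntries.entrySet()) {
      KnownKey key = KnownKey.byId(entry.getKey());
      if (key != null) {
        metadata.put(key, entry.getValue());
      }
    }
    return metadata;
  }
}
```

A payload carrying id `43` decodes without error on this reader; the new flag is simply absent from the result.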

Test plan

  • MergeDataTablesOnlyTest (23 tests): round-trip equivalence for
    Aggregation/GroupBy/Distinct (including OBJECT-column DISTINCTCOUNT),
    LIMIT 0 group-by round-trip, intermediate-schema preservation,
    conflicting-schema partial-merge flag, serverReturnFinalResult
    rejection, additive stats end-to-end (numDocsScanned survives
    round-trip), MIN freshness reduction, exception/trace propagation,
    null-handling round-trip, selection-throws, empty/no-input null returns.
  • ./mvnw -pl pinot-core test-compile, spotless, checkstyle, license all clean.

Adds DataTableReducer#mergeDataTablesOnly and BrokerReduceService#mergeOnDataTable
that perform the cross-server merge step WITHOUT finalizing (no extractFinalResult
/ result formatting). The returned DataTable carries intermediate, non-finalized
state, byte-shape identical to a single server's partial response, so a downstream
consumer can intercept it and custom handle the intermediate results.

Implemented for AggregationDataTableReducer, GroupByDataTableReducer, and
DistinctDataTableReducer. SelectionDataTableReducer and ExplainPlanDataTableReducer
inherit the default-throwing implementation. Group-by surfaces native GROUPS_TRIMMED
/ NUM_GROUPS_LIMIT_REACHED flags on the merged DataTable's metadata so downstream
consumers can decide whether the result is complete enough to use.

Server-side serialization helpers are consolidated to avoid duplication:
- AggregationFunctionUtils.setIntermediateResult (now also used by
  AggregationResultsBlock)
- DataTableBuilderUtils.setColumn (now also used by GroupByResultsBlock)

No caller is wired in by this change; the read path is unchanged, and the added
tests verify that the normal reduce path finalizes a re-injected intermediate
result without modification.

WARNING (on the new methods' Javadoc): the merge-only path is heavyweight
(full cross-server merge + re-serialize) and must NOT be invoked in the
request-serving path.

codecov-commenter commented May 28, 2026

Codecov Report

❌ Patch coverage is 63.60656% with 111 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.46%. Comparing base (4ecb1d4) to head (fe3bce6).
⚠️ Report is 11 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...not/core/query/reduce/GroupByDataTableReducer.java | 41.79% | 34 Missing and 5 partials ⚠️ |
| ...t/core/common/datatable/DataTableBuilderUtils.java | 28.26% | 32 Missing and 1 partial ⚠️ |
| ...core/query/reduce/AggregationDataTableReducer.java | 74.50% | 9 Missing and 4 partials ⚠️ |
| ...aggregation/function/AggregationFunctionUtils.java | 52.94% | 7 Missing and 1 partial ⚠️ |
| ...e/pinot/core/query/reduce/BrokerReduceService.java | 88.52% | 0 Missing and 7 partials ⚠️ |
| ...ot/core/query/reduce/ExecutionStatsAggregator.java | 83.33% | 4 Missing and 3 partials ⚠️ |
| ...ot/core/query/reduce/DistinctDataTableReducer.java | 66.66% | 3 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18621      +/-   ##
============================================
+ Coverage     56.79%   64.46%   +7.67%     
- Complexity        7     1282    +1275     
============================================
  Files          2567     3354     +787     
  Lines        149143   207364   +58221     
  Branches      24111    32387    +8276     
============================================
+ Hits          84707   133684   +48977     
- Misses        57241    62936    +5695     
- Partials       7195    10744    +3549     

| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-21 | 64.46% <63.60%> (+7.67%) ⬆️ |
| temurin | 64.46% <63.60%> (+7.67%) ⬆️ |
| unittests | 64.46% <63.60%> (+7.67%) ⬆️ |
| unittests1 | 56.84% <63.60%> (+0.05%) ⬆️ |
| unittests2 | 37.12% <11.47%> (?) |

Flags with carried forward coverage won't be shown.

Comment on lines +160 to +163
// Set on a merged-only DataTable when one or more input server DataTables were dropped during the
// merge (e.g., due to a schema conflict). Signals to a downstream consumer that the merge is
// partial; how to react (skip, retry, accept with annotation) is the consumer's policy.
PARTIAL_INTERMEDIATE_RESULT(43, "partialIntermediateResult", MetadataValueType.STRING);

Contributor

This looks a bit odd: aren't all intermediate results partial? Should it be something like INCOMPLETE_MERGE instead?

* as one trace blob attributed to the synthetic server.
* </ul>
*/
public void setStatsOnMergedDataTable(DataTable dataTable) {

Contributor

Is this dropping EARLY_TERMINATION_REASON for distinct queries that was added recently?

Contributor

This is also going to be prone to becoming stale in the future - I'd prefer avoiding this pattern altogether. Maybe we can just carry the execution stats aggregate alongside the data table? Another option is to collapse the two paths (this and setStats) into one by refactoring it into a pattern like forEachStat((MetadataKey key, long value) -> …) that both consumers drive? One writes to the response, the other to metadata.
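
One possible shape for the second option, sketched under assumptions: a single `forEachStat` enumeration that both writers drive, so a newly added stat cannot be picked up by one path and missed by the other. `StatsAggregateSketch`, `StatConsumer`, and the string keys are hypothetical; the comment above suggests a `MetadataKey` key type, which is replaced by `String` here to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical aggregate; String keys stand in for the MetadataKey type mentioned above.
final class StatsAggregateSketch {
  @FunctionalInterface
  interface StatConsumer {
    void accept(String key, long value);
  }

  private long numDocsScanned;
  private long numSegmentsQueried;

  void add(long docsScanned, long segmentsQueried) {
    numDocsScanned += docsScanned;
    numSegmentsQueried += segmentsQueried;
  }

  /** The single place that enumerates every stat; both writers below go through it. */
  void forEachStat(StatConsumer consumer) {
    consumer.accept("numDocsScanned", numDocsScanned);
    consumer.accept("numSegmentsQueried", numSegmentsQueried);
  }

  /** Writer 1: string-valued metadata, as on a merged intermediate table. */
  Map<String, String> toMetadata() {
    Map<String, String> metadata = new LinkedHashMap<>();
    forEachStat((key, value) -> metadata.put(key, Long.toString(value)));
    return metadata;
  }

  /** Writer 2: typed values, as on a final response. */
  Map<String, Long> toResponse() {
    Map<String, Long> response = new LinkedHashMap<>();
    forEachStat(response::put);
    return response;
  }
}
```

Adding a new stat then means touching only the fields and `forEachStat`; both output paths pick it up automatically.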
