From 42b96b0066062527781fe12b481e37022f790a19 Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Thu, 28 May 2026 11:46:13 -0700 Subject: [PATCH 1/4] Add merge-only reducer API for intercepting intermediate results Adds DataTableReducer#mergeDataTablesOnly and BrokerReduceService#mergeOnDataTable that perform the cross-server merge step WITHOUT finalizing (no extractFinalResult / result formatting). The returned DataTable carries intermediate, non-finalized state, byte-shape identical to a single server's partial response, so a downstream consumer can intercept it and custom handle the intermediate results. Implemented for AggregationDataTableReducer, GroupByDataTableReducer, and DistinctDataTableReducer. SelectionDataTableReducer and ExplainPlanDataTableReducer inherit the default-throwing implementation. Group-by surfaces native GROUPS_TRIMMED / NUM_GROUPS_LIMIT_REACHED flags on the merged DataTable's metadata so downstream consumers can decide whether the result is complete enough to use. Server-side serialization helpers are consolidated to avoid duplication: - AggregationFunctionUtils.setIntermediateResult (now also used by AggregationResultsBlock) - DataTableBuilderUtils.setColumn (now also used by GroupByResultsBlock) No caller is wired in by this change; the read path is unchanged and tests are added to verify that a normal reduce path finalizes a re-injected intermediate result without modifications. WARNING (on the new methods' Javadoc): the merge-only path is heavyweight (full cross-server merge + re-serialize) and must NOT be invoked in the request-serving path. --- .../pinot/common/datatable/DataTable.java | 8 +- .../datatable/DataTableBuilderUtils.java | 128 ++++ .../results/AggregationResultsBlock.java | 34 +- .../blocks/results/GroupByResultsBlock.java | 95 +-- .../function/AggregationFunctionUtils.java | 39 ++ .../reduce/AggregationDataTableReducer.java | 108 +++- .../query/reduce/BrokerReduceService.java | 235 +++++-- .../core/query/reduce/DataTableReducer.java | 31 + .../reduce/DistinctDataTableReducer.java | 34 +- .../reduce/ExecutionStatsAggregator.java | 105 ++++ .../query/reduce/GroupByDataTableReducer.java | 121 ++++ .../query/reduce/MergeDataTablesOnlyTest.java | 576 ++++++++++++++++++ 12 files changed, 1322 insertions(+), 192 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index bf3f2f86117d..95acc069c228 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -156,11 +156,15 @@ enum MetadataKey { WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING), // Needed so that we can track query id in Netty channel response. QUERY_ID(41, "queryId", MetadataValueType.STRING), - EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING); + EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING), + // Set on a merged-only DataTable when one or more input server DataTables were dropped during the + // merge (e.g., due to a schema conflict). Signals to a downstream consumer that the merge is + // partial; how to react (skip, retry, accept with annotation) is the consumer's policy. + PARTIAL_INTERMEDIATE_RESULT(43, "partialIntermediateResult", MetadataValueType.STRING); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! - private static final int MAX_ID = EARLY_TERMINATION_REASON.getId(); + private static final int MAX_ID = PARTIAL_INTERMEDIATE_RESULT.getId(); private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java new file mode 100644 index 000000000000..d111ba214284 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.common.datatable; + +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.floats.FloatArrayList; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOException; +import java.math.BigDecimal; +import org.apache.pinot.common.utils.ArrayListUtils; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.spi.utils.ByteArray; + + +/** + * Helpers for writing values into a {@link DataTableBuilder}. + */ +public class DataTableBuilderUtils { + private DataTableBuilderUtils() { + } + + /** + * Writes a non-null value of the given stored column data type into the {@link DataTableBuilder} at + * the given column. Supports all scalar and array stored types. OBJECT columns are NOT handled here + * (callers serialize them via the owning aggregation function). Used by the group-by result + * serialization on both the server ({@code GroupByResultsBlock}) and the merge-only reduce path. + */ + public static void setColumn(DataTableBuilder dataTableBuilder, ColumnDataType storedColumnDataType, + int columnIndex, Object value) + throws IOException { + switch (storedColumnDataType) { + case INT: + dataTableBuilder.setColumn(columnIndex, (int) value); + break; + case LONG: + dataTableBuilder.setColumn(columnIndex, (long) value); + break; + case FLOAT: + dataTableBuilder.setColumn(columnIndex, (float) value); + break; + case DOUBLE: + dataTableBuilder.setColumn(columnIndex, (double) value); + break; + case BIG_DECIMAL: + dataTableBuilder.setColumn(columnIndex, (BigDecimal) value); + break; + case STRING: + dataTableBuilder.setColumn(columnIndex, value.toString()); + break; + case BYTES: + dataTableBuilder.setColumn(columnIndex, (ByteArray) value); + break; + case INT_ARRAY: + if (value instanceof IntArrayList) { + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toIntArray((IntArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (int[]) value); + } + break; + case LONG_ARRAY: + if (value instanceof LongArrayList) { + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toLongArray((LongArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (long[]) value); + } + break; + case FLOAT_ARRAY: + if (value instanceof FloatArrayList) { + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toFloatArray((FloatArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (float[]) value); + } + break; + case DOUBLE_ARRAY: + if (value instanceof DoubleArrayList) { + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toDoubleArray((DoubleArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (double[]) value); + } + break; + case BIG_DECIMAL_ARRAY: + if (value instanceof ObjectArrayList) { + //noinspection unchecked + dataTableBuilder.setColumn(columnIndex, + ArrayListUtils.toBigDecimalArray((ObjectArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value); + } + break; + case STRING_ARRAY: + if (value instanceof ObjectArrayList) { + //noinspection unchecked + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toStringArray((ObjectArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (String[]) value); + } + break; + case BYTES_ARRAY: + if (value instanceof ObjectArrayList) { + //noinspection unchecked + dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toBytesArray((ObjectArrayList) value)); + } else { + dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value); + } + break; + default: + throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java index 8f493869e3bc..c28e2a8faa2c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java @@ -146,7 +146,7 @@ public DataTable getDataTable() nullBitmaps[i].add(0); } assert result != null; - setIntermediateResult(dataTableBuilder, columnDataTypes, i, result); + AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result); } } } @@ -174,7 +174,7 @@ public DataTable getDataTable() if (columnDataTypes[i] == ColumnDataType.OBJECT) { dataTableBuilder.setColumn(i, _aggregationFunctions[i].serializeIntermediateResult(result)); } else { - setIntermediateResult(dataTableBuilder, columnDataTypes, i, result); + AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result); } } } @@ -184,36 +184,6 @@ public DataTable getDataTable() return dataTableBuilder.build(); } - private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index, - Object result) throws IOException { - ColumnDataType columnDataType = columnDataTypes[index]; - switch (columnDataType) { - case INT: - dataTableBuilder.setColumn(index, (int) result); - break; - case LONG: - dataTableBuilder.setColumn(index, (long) result); - break; - case FLOAT: - dataTableBuilder.setColumn(index, (float) result); - break; - case DOUBLE: - dataTableBuilder.setColumn(index, (double) result); - break; - case BIG_DECIMAL: - dataTableBuilder.setColumn(index, (BigDecimal) result); - break; - case STRING: - dataTableBuilder.setColumn(index, result.toString()); - break; - case BYTES: - dataTableBuilder.setColumn(index, (ByteArray) result); - break; - default: - throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType); - } - } - private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index, Object result) throws IOException { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index ea1e9bab31f0..5baac68f0994 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -18,13 +18,7 @@ */ package org.apache.pinot.core.operator.blocks.results; -import it.unimi.dsi.fastutil.doubles.DoubleArrayList; -import it.unimi.dsi.fastutil.floats.FloatArrayList; -import it.unimi.dsi.fastutil.ints.IntArrayList; -import it.unimi.dsi.fastutil.longs.LongArrayList; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -33,11 +27,11 @@ import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.utils.ArrayListUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.Table; @@ -45,7 +39,6 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.spi.query.QueryThreadContext; -import org.apache.pinot.spi.utils.ByteArray; import org.roaringbitmap.RoaringBitmap; @@ -243,7 +236,7 @@ public DataTable getDataTable() nullBitmaps[i].add(rowId); } assert value != null; - setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, value); + DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value); } } dataTableBuilder.finishRow(); @@ -265,7 +258,7 @@ public DataTable getDataTable() } else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) { dataTableBuilder.setColumn(i, aggregationFunctions[i - numKeyColumns].serializeIntermediateResult(value)); } else { - setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, value); + DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value); } } dataTableBuilder.finishRow(); @@ -275,88 +268,6 @@ public DataTable getDataTable() return dataTableBuilder.build(); } - private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBuilder dataTableBuilder, - int columnIndex, Object value) - throws IOException { - switch (storedColumnDataType) { - case INT: - dataTableBuilder.setColumn(columnIndex, (int) value); - break; - case LONG: - dataTableBuilder.setColumn(columnIndex, (long) value); - break; - case FLOAT: - dataTableBuilder.setColumn(columnIndex, (float) value); - break; - case DOUBLE: - dataTableBuilder.setColumn(columnIndex, (double) value); - break; - case BIG_DECIMAL: - dataTableBuilder.setColumn(columnIndex, (BigDecimal) value); - break; - case STRING: - dataTableBuilder.setColumn(columnIndex, value.toString()); - break; - case BYTES: - dataTableBuilder.setColumn(columnIndex, (ByteArray) value); - break; - case INT_ARRAY: - if (value instanceof IntArrayList) { - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toIntArray((IntArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (int[]) value); - } - break; - case LONG_ARRAY: - if (value instanceof LongArrayList) { - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toLongArray((LongArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (long[]) value); - } - break; - case FLOAT_ARRAY: - if (value instanceof FloatArrayList) { - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toFloatArray((FloatArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (float[]) value); - } - break; - case DOUBLE_ARRAY: - if (value instanceof DoubleArrayList) { - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toDoubleArray((DoubleArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (double[]) value); - } - break; - case BIG_DECIMAL_ARRAY: - if (value instanceof ObjectArrayList) { - //noinspection unchecked - dataTableBuilder.setColumn(columnIndex, - ArrayListUtils.toBigDecimalArray((ObjectArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value); - } - break; - case STRING_ARRAY: - if (value instanceof ObjectArrayList) { - //noinspection unchecked - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toStringArray((ObjectArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (String[]) value); - } - break; - case BYTES_ARRAY: - if (value instanceof ObjectArrayList) { - //noinspection unchecked - dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toBytesArray((ObjectArrayList) value)); - } else { - dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value); - } - break; - default: - throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType); - } - } @Override public Map getResultsMetadata() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java index 010510d485e8..a9e6f4393204 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java @@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOException; +import java.math.BigDecimal; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +43,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.filter.BaseFilterOperator; @@ -54,6 +57,7 @@ import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.spi.utils.ByteArray; /** @@ -168,6 +172,41 @@ public static Object getIntermediateResult(AggregationFunction aggregationFuncti } } + /** + * Writes a non-OBJECT intermediate result into the {@link DataTableBuilder} at the given column. + * Counterpart of {@link #getIntermediateResult}. OBJECT columns are handled by the caller via + * {@link AggregationFunction#serializeIntermediateResult}, since they need the aggregation function. + */ + public static void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType columnDataType, int colId, + Object result) + throws IOException { + switch (columnDataType) { + case INT: + dataTableBuilder.setColumn(colId, (int) result); + break; + case LONG: + dataTableBuilder.setColumn(colId, (long) result); + break; + case FLOAT: + dataTableBuilder.setColumn(colId, (float) result); + break; + case DOUBLE: + dataTableBuilder.setColumn(colId, (double) result); + break; + case BIG_DECIMAL: + dataTableBuilder.setColumn(colId, (BigDecimal) result); + break; + case STRING: + dataTableBuilder.setColumn(colId, result.toString()); + break; + case BYTES: + dataTableBuilder.setColumn(colId, (ByteArray) result); + break; + default: + throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType); + } + } + /** * Reads the final result from the {@link DataTable}. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java index 8f105d6af714..cc1b2e809c89 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.reduce; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -28,6 +29,8 @@ import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.QueryContext; @@ -83,6 +86,24 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, private void reduceWithIntermediateResult(DataSchema dataSchema, Collection dataTables, BrokerResponseNative brokerResponseNative) { int numAggregationFunctions = _aggregationFunctions.length; + Object[] intermediateResults = mergeIntermediateResults(dataSchema, dataTables); + Object[] finalResults = new Object[numAggregationFunctions]; + for (int i = 0; i < numAggregationFunctions; i++) { + AggregationFunction aggregationFunction = _aggregationFunctions[i]; + Comparable result = aggregationFunction.extractFinalResult(intermediateResults[i]); + finalResults[i] = result != null ? aggregationFunction.getFinalResultColumnType().convert(result) : null; + } + brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema), finalResults)); + } + + /** + * Merges the per-server intermediate aggregation results into a single {@code Object[]} of merged + * intermediate results (one per aggregation function), WITHOUT finalizing. Shared by the normal + * reduce path ({@link #reduceWithIntermediateResult}) and the merge-only path + * ({@link #mergeDataTablesOnly}). + */ + private Object[] mergeIntermediateResults(DataSchema dataSchema, Collection dataTables) { + int numAggregationFunctions = _aggregationFunctions.length; Object[] intermediateResults = new Object[numAggregationFunctions]; for (DataTable dataTable : dataTables) { QueryThreadContext.checkTerminationAndSampleUsage("AggregationDataTableReducer"); @@ -110,13 +131,88 @@ private void reduceWithIntermediateResult(DataSchema dataSchema, Collection dataTableMap, DataTableReducerContext reducerContext, + BrokerMetrics brokerMetrics) { + // When servers are configured to return final aggregate state, the input DataTables hold final + // (not intermediate) values, so the merge-only contract — "produce an intermediate DataTable that + // can be re-merged via the normal reduce path" — cannot be honored. + if (_queryContext.isServerReturnFinalResult()) { + throw new UnsupportedOperationException( + "Merge-only reduction is not supported when servers return final aggregate results " + + "(server.returnFinalResult / isServerReturnFinalResult); input would be final-typed, " + + "not intermediate."); } - brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema), finalResults)); + dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema); + try { + // No data for this bucket: emit an empty (0-row) intermediate DataTable carrying the schema. + if (dataTableMap.isEmpty()) { + return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); + } + Object[] intermediateResults = mergeIntermediateResults(dataSchema, dataTableMap.values()); + return buildIntermediateDataTable(dataSchema, intermediateResults); + } catch (IOException e) { + throw new RuntimeException("Caught IOException while building merged intermediate DataTable for aggregation", e); + } + } + + /** + * Serializes the merged intermediate results into a single-row intermediate {@link DataTable}, + * mirroring the non-final branch of {@code AggregationResultsBlock#getDataTable()} so the output is + * byte-shape identical to a single server's intermediate response. Never finalizes. + */ + private DataTable buildIntermediateDataTable(DataSchema dataSchema, Object[] intermediateResults) + throws IOException { + ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes(); + int numColumns = columnDataTypes.length; + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + if (_queryContext.isNullHandlingEnabled()) { + RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns]; + for (int i = 0; i < numColumns; i++) { + nullBitmaps[i] = new RoaringBitmap(); + } + dataTableBuilder.startRow(); + for (int i = 0; i < numColumns; i++) { + Object result = intermediateResults[i]; + if (columnDataTypes[i] == ColumnDataType.OBJECT) { + if (result == null) { + dataTableBuilder.setNull(i); + } else { + dataTableBuilder.setColumn(i, _aggregationFunctions[i].serializeIntermediateResult(result)); + } + } else { + if (result == null) { + result = columnDataTypes[i].getNullPlaceholder(); + nullBitmaps[i].add(0); + } + AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result); + } + } + dataTableBuilder.finishRow(); + for (RoaringBitmap nullBitmap : nullBitmaps) { + dataTableBuilder.setNullRowIds(nullBitmap); + } + } else { + dataTableBuilder.startRow(); + for (int i = 0; i < numColumns; i++) { + Object result = intermediateResults[i]; + if (result == null) { + dataTableBuilder.setNull(i); + } else { + if (columnDataTypes[i] == ColumnDataType.OBJECT) { + dataTableBuilder.setColumn(i, _aggregationFunctions[i].serializeIntermediateResult(result)); + } else { + AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result); + } + } + } + dataTableBuilder.finishRow(); + } + return dataTableBuilder.build(); } private void processSingleFinalResult(DataSchema dataSchema, DataTable dataTable, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index 07dd8300ee0e..774754ec6a05 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.metrics.BrokerMeter; @@ -75,46 +76,11 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace); BrokerResponseNative brokerResponseNative = new BrokerResponseNative(); - // Cache a data schema from data tables (try to cache one with data rows associated with it). - DataSchema dataSchemaFromEmptyDataTable = null; - DataSchema dataSchemaFromNonEmptyDataTable = null; + // Process server response metadata, drop empty/null-schema/conflicting-schema data tables, and pick + // a data schema (preferring one backed by data rows). List serversWithConflictingDataSchema = new ArrayList<>(); - - // Process server response metadata. - Iterator> iterator = dataTableMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - DataTable dataTable = entry.getValue(); - - // aggregate metrics - aggregator.aggregate(entry.getKey(), dataTable); - - // After processing the metadata, remove data tables without data rows inside. - DataSchema dataSchema = dataTable.getDataSchema(); - if (dataSchema == null) { - iterator.remove(); - } else { - // Try to cache a data table with data rows inside, or cache one with data schema inside. - if (dataTable.getNumberOfRows() == 0) { - if (dataSchemaFromEmptyDataTable == null) { - dataSchemaFromEmptyDataTable = dataSchema; - } - iterator.remove(); - } else { - if (dataSchemaFromNonEmptyDataTable == null) { - dataSchemaFromNonEmptyDataTable = dataSchema; - } else { - // Remove data tables with conflicting data schema. - // NOTE: Only compare the column data types, since the column names (string representation of expression) - // can change across different versions. - if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) { - serversWithConflictingDataSchema.add(entry.getKey()); - iterator.remove(); - } - } - } - } - } + DataSchema cachedDataSchema = + filterDataTablesAndPickSchema(dataTableMap, aggregator, serversWithConflictingDataSchema); String tableName = serverBrokerRequest.getQuerySource().getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableName); @@ -142,34 +108,17 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the // response with metadata only. - DataSchema cachedDataSchema = - dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable; if (cachedDataSchema == null) { return brokerResponseNative; } QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery()); DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext); - - Integer minGroupTrimSizeQueryOption = null; - Integer groupTrimThresholdQueryOption = null; - Integer minInitialIndexedTableCapacityQueryOption = null; - if (queryOptions != null) { - minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions); - groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions); - minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions); - } - int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize; - int groupTrimThreshold = - groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold; - int minInitialIndexedTableCapacity = - minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption - : _minInitialIndexedTableCapacity; + DataTableReducerContext reducerContext = createReducerContext(queryOptions, reduceTimeOutMs); try { dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative, - new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, - groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics); + reducerContext, brokerMetrics); } catch (RuntimeException e) { // First check terminate exception and use it as the results block if exists. We want to return the termination // reason when query is explicitly terminated. @@ -227,6 +176,176 @@ private static boolean isMaterializedViewRewrite(BrokerRequest serverBrokerReque serverBrokerRequest.getPinotQuery().getQueryOptions().get(QueryOptionKey.MATERIALIZED_VIEW_REWRITE)); } + /** + * Merge-only counterpart of {@link #reduceOnDataTable}: merges the per-server DataTables into a single + * intermediate {@link DataTable} WITHOUT finalizing (no {@code extractFinalResult}). Returns + * {@code null} when there is nothing to merge (empty map, or all servers returned no data / errored). + * Reuses the same schema-filtering preamble and reducer/trim resolution as the regular reduce, but + * does NOT aggregate per-server execution stats (the merged intermediate does not carry them) and + * skips the nested-query / gapfill / alias handling (those query shapes are out of scope for + * merge-only). + * + *

The returned DataTable carries intermediate, non-finalized state (byte-shape identical to a + * single server's partial response), so a downstream consumer can intercept it and custom handle + * the intermediate results. + * + *

If one or more input server DataTables are dropped during merge (e.g., due to a schema + * conflict with the first non-empty table), the returned DataTable's metadata carries the + * {@link DataTable.MetadataKey#PARTIAL_INTERMEDIATE_RESULT} flag set to {@code "true"} and the + * {@link BrokerMeter#RESPONSE_MERGE_EXCEPTIONS} meter is incremented. This is symmetric with how + * the regular reduce path surfaces conflicting-schema servers via a response exception and the + * same meter; downstream consumers can read the flag and decide policy (skip, retry, accept). + * + *

Execution stats from the input DataTables are aggregated via {@link ExecutionStatsAggregator} + * and written back onto the merged DataTable: additive longs (e.g. {@code numDocsScanned}, + * {@code numSegments*}, {@code threadCpuTimeNs}), {@code minConsumingFreshnessTimeMs} (MIN-reduced), + * boolean flags ({@code groupsTrimmed}, {@code numGroupsLimitReached}, etc., OR-reduced), + * per-server exceptions, and trace info (JSON-encoded if {@code trace=true}). Unlike the regular + * reduce path, this method does NOT bump broker meters/timers for the input stats — those will + * fire when the result is eventually re-reduced through {@link #reduceOnDataTable}, so a consumer + * that uses both APIs sees one set of increments per logical query, not two. + * + *

Limitations of the round-trip: + *

    + *
  • CPU and memory stats round-trip as a single combined value per key (the wire format has + * no per-tableType variants). On a re-reduce, the downstream aggregator dumps the whole + * value into one bucket — whichever tableType the caller assigned to the synthetic + * server response — so the offline-vs-realtime split surfaced on {@link + * BrokerResponseNative} is lost across the round-trip, even though the total is preserved. + *
  • Exception attribution to original servers is lost; the wire format is {@code Map} so collisions on the same error code are resolved last-write-wins. + *
  • Per-server trace info is JSON-encoded into a single {@code TRACE_INFO} entry; the + * downstream aggregator reads it back as one trace blob under the synthetic server's name. + *
+ * + *

WARNING: this performs a full cross-server merge and re-serializes the result — heavyweight work + * that must be run asynchronously, decoupled from request serving. Invoking it inline while a query is + * being served can severely degrade its latency. Intended for downstream consumers that want to + * intercept the merged intermediate result instead of the finalized one. + * + *

[org.apache.pinot.spi.query.QueryThreadContext] must already be set up before calling this method. + */ + @Nullable + public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest, + Map dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) { + if (dataTableMap.isEmpty()) { + return null; + } + Map queryOptions = serverBrokerRequest.getPinotQuery().getQueryOptions(); + boolean enableTrace = + queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); + // Aggregate stats while filtering so we can write them back onto the merged DataTable's metadata. + // Aggregator.aggregate() runs BEFORE filterDataTablesAndPickSchema removes any entry, matching + // reduceOnDataTable: empty / conflicting-schema servers' stats are still counted. + ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace); + List conflictingServers = new ArrayList<>(); + DataSchema cachedDataSchema = filterDataTablesAndPickSchema(dataTableMap, aggregator, conflictingServers); + if (cachedDataSchema == null) { + // All servers returned no data or encountered exceptions; nothing to merge. + return null; + } + + String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName()); + QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery()); + DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext); + DataTableReducerContext reducerContext = + createReducerContext(serverBrokerRequest.getPinotQuery().getQueryOptions(), reduceTimeOutMs); + DataTable merged = dataTableReducer.mergeDataTablesOnly(rawTableName, cachedDataSchema, dataTableMap, + reducerContext, brokerMetrics); + + if (merged != null) { + // Write accumulated stats (additive longs, booleans, MIN freshness, exceptions, trace) onto the + // merged DataTable so it round-trips through reduceOnDataTable. Unlike setStats() this does NOT + // bump broker meters — those will fire when the result is eventually re-reduced. + aggregator.setStatsOnMergedDataTable(merged); + + // Symmetric with reduceOnDataTable: surface conflicting-schema drops via meter + warn + a metadata + // flag on the merged DataTable so downstream consumers can detect a partial merge. + if (!conflictingServers.isEmpty()) { + LOGGER.warn("Merge-only reduce dropped {} server response(s) for table {} due to data schema " + + "inconsistency: {}", conflictingServers.size(), rawTableName, conflictingServers); + brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1); + merged.getMetadata().put(DataTable.MetadataKey.PARTIAL_INTERMEDIATE_RESULT.getName(), "true"); + } + } + return merged; + } + + /** + * Processes per-server response metadata and filters {@code dataTableMap} in place: drops tables with a + * null schema, drops empty tables (remembering their schema as a fallback), and drops tables whose + * column data types conflict with the first non-empty table (collected into {@code conflictingServers}). + * When an {@code aggregator} is provided, per-table execution stats are aggregated before a table is + * dropped. Returns the remembered data schema (non-empty preferred, else empty-table schema, else + * {@code null}). + */ + private static DataSchema filterDataTablesAndPickSchema(Map dataTableMap, + @Nullable ExecutionStatsAggregator aggregator, List conflictingServers) { + DataSchema dataSchemaFromEmptyDataTable = null; + DataSchema dataSchemaFromNonEmptyDataTable = null; + Iterator> iterator = dataTableMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + DataTable dataTable = entry.getValue(); + + // aggregate metrics + if (aggregator != null) { + aggregator.aggregate(entry.getKey(), dataTable); + } + + // After processing the metadata, remove data tables without data rows inside. + DataSchema dataSchema = dataTable.getDataSchema(); + if (dataSchema == null) { + iterator.remove(); + } else { + // Try to cache a data table with data rows inside, or cache one with data schema inside. + if (dataTable.getNumberOfRows() == 0) { + if (dataSchemaFromEmptyDataTable == null) { + dataSchemaFromEmptyDataTable = dataSchema; + } + iterator.remove(); + } else { + if (dataSchemaFromNonEmptyDataTable == null) { + dataSchemaFromNonEmptyDataTable = dataSchema; + } else { + // Remove data tables with conflicting data schema. + // NOTE: Only compare the column data types, since the column names (string representation of expression) + // can change across different versions. + if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) { + conflictingServers.add(entry.getKey()); + iterator.remove(); + } + } + } + } + } + return dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable; + } + + /** + * Resolves the group-by trim parameters (query option overrides, else broker defaults) and builds the + * {@link DataTableReducerContext}. Shared by the regular reduce and the merge-only path. + */ + private DataTableReducerContext createReducerContext(@Nullable Map queryOptions, + long reduceTimeOutMs) { + Integer minGroupTrimSizeQueryOption = null; + Integer groupTrimThresholdQueryOption = null; + Integer minInitialIndexedTableCapacityQueryOption = null; + if (queryOptions != null) { + minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions); + groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions); + minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions); + } + int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize; + int groupTrimThreshold = + groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold; + int minInitialIndexedTableCapacity = + minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption + : _minInitialIndexedTableCapacity; + return new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, + groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity); + } + public void shutDown() { _reduceExecutorService.shutdownNow(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java index 496df03da850..3ce5e3939a94 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java @@ -42,4 +42,35 @@ public interface DataTableReducer { */ void reduceAndSetResults(String tableName, DataSchema dataSchema, Map dataTableMap, BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics); + + /** + * Merges per-server data tables into a single intermediate {@link DataTable} WITHOUT + * finalizing (no {@code extractFinalResult} / result formatting). The returned DataTable carries + * intermediate state byte-shape identical to a single server's partial response, so a consumer can + * intercept the merged intermediate results and custom handle it. It is expected that the merged + * intermediate result can be reinjected in the normal reduce path. + * + *

WARNING: this performs a full cross-server merge and re-serializes the result — + * heavyweight work that must be run asynchronously, decoupled from request serving. Invoking it + * inline while a query is being served adds that cost to the query and can severely degrade its + * latency. + * + *

Reducers that cannot produce a re-mergeable intermediate (e.g. explain-plan) leave this default + * implementation, which throws {@link UnsupportedOperationException}. Aggregation and group-by + * reducers also throw when the query is configured for server-side final-result return + * ({@code server.returnFinalResult}) because the inputs are then finalized, not intermediate. + * + * @param tableName table name + * @param dataSchema schema from broker reduce service + * @param dataTableMap map of servers to data tables + * @param reducerContext DataTableReducer context + * @param brokerMetrics broker metrics + * @return the merged intermediate DataTable (intermediate, non-finalized state) + */ + default DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, + Map dataTableMap, DataTableReducerContext reducerContext, + BrokerMetrics brokerMetrics) { + throw new UnsupportedOperationException( + getClass().getSimpleName() + " does not support merge-only reduction"); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java index dab95c232275..fe8ac886d269 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.core.query.reduce; +import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.pinot.common.datatable.DataTable; @@ -27,6 +29,7 @@ import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable; import org.apache.pinot.core.query.distinct.table.BytesDistinctTable; import org.apache.pinot.core.query.distinct.table.DistinctTable; @@ -61,8 +64,35 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, brokerResponseNative.setResultTable(new ResultTable(dataSchema, List.of())); return; } + DistinctTable distinctTable = mergeToDistinctTable(dataSchema, dataTableMap.values()); + brokerResponseNative.setResultTable(distinctTable.toResultTable()); + } + + @Override + public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, + Map dataTableMap, DataTableReducerContext reducerContext, + BrokerMetrics brokerMetrics) { + dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, dataSchema); + try { + // No data for this bucket (or LIMIT 0): emit an empty intermediate DataTable carrying the schema. + if (dataTableMap.isEmpty() || _queryContext.getLimit() == 0) { + return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); + } + DistinctTable distinctTable = mergeToDistinctTable(dataSchema, dataTableMap.values()); + // Intermediate form: limit/sorting not applied; re-mergeable on a later reduce. + return distinctTable.toDataTable(); + } catch (IOException e) { + throw new RuntimeException("Caught IOException while building merged intermediate DataTable for distinct", e); + } + } + + /** + * Merges the per-server DataTables into a single {@link DistinctTable} (early-stopping once the + * distinct set is satisfied). Shared by the normal reduce path and the merge-only path. + */ + private DistinctTable mergeToDistinctTable(DataSchema dataSchema, Collection dataTables) { DistinctTable distinctTable = null; - for (DataTable dataTable : dataTableMap.values()) { + for (DataTable dataTable : dataTables) { QueryThreadContext.checkTerminationAndSampleUsage("DistinctDataTableReducer"); if (distinctTable == null) { distinctTable = createDistinctTable(dataSchema, dataTable); @@ -75,7 +105,7 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, } } } - brokerResponseNative.setResultTable(distinctTable.toResultTable()); + return distinctTable; } private DistinctTable createDistinctTable(DataSchema dataSchema, DataTable dataTable) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index 3ca72dd04e6e..e21d56c63d43 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.reduce; +import com.fasterxml.jackson.core.JsonProcessingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,6 +34,7 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.JsonUtils; public class ExecutionStatsAggregator { @@ -346,4 +348,107 @@ private void withNotNullLongMetadata(Map metadata, DataTable.Met consumer.accept(Long.parseLong(strValue)); } } + + /** + * Writes the accumulated execution stats onto the given DataTable's metadata (and exception map), + * so a merged-only DataTable can be re-injected into the regular reduce path with the same + * downstream totals as a direct reduce of the original inputs would have produced. + * + *

Unlike {@link #setStats(String, BrokerResponseNative, BrokerMetrics)}, this method does NOT + * bump broker meters or timers. The merge-only path is expected to run off the request-serving + * path; meter increments fire when the result is eventually re-reduced. + * + *

Limitations of the round-trip via DataTable metadata: + *

    + *
  • CPU and memory stats round-trip as a single combined value per key + * ({@link DataTable.MetadataKey#THREAD_CPU_TIME_NS}, etc.) because the wire format has no + * per-tableType keys. In the standard reduce path the aggregator attributes each server's + * value to offline vs realtime based on {@code routingInstance.getTableType()} and surfaces + * them as separate fields on {@link BrokerResponseNative}; on a re-reduce of the merged + * DataTable the whole combined value lands in one bucket — whichever tableType the caller + * assigned to the synthetic server response. So the per-tableType split visible on + * BrokerResponse is lost across the round-trip, even though the total is preserved. + *
  • Per-server exceptions are written via {@link DataTable#addException(int, String)} which + * backs a {@code Map} keyed by error code; if two inputs reported the + * same error code the merged DataTable carries last-write-wins for the message. + *
  • Per-server trace info is JSON-encoded into a single + * {@link DataTable.MetadataKey#TRACE_INFO} entry; the downstream aggregator reads it back + * as one trace blob attributed to the synthetic server. + *
+ */ + public void setStatsOnMergedDataTable(DataTable dataTable) { + Map metadata = dataTable.getMetadata(); + + // Additive long stats: mirror setStats()'s pattern of unconditional writes. Accumulators are + // initialized to 0, so a 0 here is indistinguishable to downstream from "absent" — the + // downstream aggregator's Long.parseLong("0") + null-check both produce 0. + putLong(metadata, DataTable.MetadataKey.NUM_DOCS_SCANNED, _numDocsScanned); + putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter); + putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_QUERIED, _numSegmentsQueried); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED, _numSegmentsProcessed); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_MATCHED, _numSegmentsMatched); + putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED, _numConsumingSegmentsQueried); + putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED, _numConsumingSegmentsProcessed); + putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED, _numConsumingSegmentsMatched); + putLong(metadata, DataTable.MetadataKey.TOTAL_DOCS, _numTotalDocs); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER, _numSegmentsPrunedByServer); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, _numSegmentsPrunedInvalid); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, _numSegmentsPrunedByLimit); + putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, _numSegmentsPrunedByValue); + putLong(metadata, DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS, + _explainPlanNumEmptyFilterSegments); + putLong(metadata, DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS, + _explainPlanNumMatchAllFilterSegments); + // Collapse offline+realtime decomposition back to the combined wire-format keys. + putLong(metadata, DataTable.MetadataKey.THREAD_CPU_TIME_NS, + _offlineThreadCpuTimeNs + _realtimeThreadCpuTimeNs); + putLong(metadata, DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS, + _offlineSystemActivitiesCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs); + putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS, + _offlineResponseSerializationCpuTimeNs + _realtimeResponseSerializationCpuTimeNs); + putLong(metadata, DataTable.MetadataKey.THREAD_MEM_ALLOCATED_BYTES, + _offlineThreadMemAllocatedBytes + _realtimeThreadMemAllocatedBytes); + putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES, + _offlineResponseSerMemAllocatedBytes + _realtimeResponseSerMemAllocatedBytes); + + // MIN_CONSUMING_FRESHNESS_TIME_MS: sentinel-guarded. Long.MAX_VALUE means "no input had a real + // freshness reading"; writing the sentinel would mislead downstream observability. + if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) { + metadata.put(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), + Long.toString(_minConsumingFreshnessTimeMs)); + } + + // Boolean flags: OR-reduced; only write the key when true (a "false" entry is noise and the + // existing reduce path treats absent as false). + if (_groupsTrimmed) { + metadata.put(DataTable.MetadataKey.GROUPS_TRIMMED.getName(), "true"); + } + if (_numGroupsLimitReached) { + metadata.put(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true"); + } + if (_numGroupsWarningLimitReached) { + metadata.put(DataTable.MetadataKey.NUM_GROUPS_WARNING_LIMIT_REACHED.getName(), "true"); + } + + // Exceptions: copy each accumulated exception onto the DataTable. Last-write-wins on error-code + // collision (wire format is Map). + for (QueryProcessingException qpe : _processingExceptions) { + dataTable.addException(qpe.getErrorCode(), qpe.getMessage()); + } + + // Trace: JSON-encode the per-server map into a single TRACE_INFO metadata entry. On downstream + // readback the aggregator reads it as one string under the synthetic server's name. + if (_enableTrace && !_traceInfo.isEmpty()) { + try { + metadata.put(DataTable.MetadataKey.TRACE_INFO.getName(), JsonUtils.objectToString(_traceInfo)); + } catch (JsonProcessingException e) { + throw new IllegalStateException("Failed to serialize trace info for merged DataTable", e); + } + } + } + + private static void putLong(Map metadata, DataTable.MetadataKey key, long value) { + metadata.put(key.getName(), Long.toString(value)); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 942544dab507..9cd040f9ce90 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; @@ -37,6 +38,7 @@ import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.Utils; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -46,6 +48,9 @@ import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -120,6 +125,52 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, } } + @Override + public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, + Map dataTableMap, DataTableReducerContext reducerContext, + BrokerMetrics brokerMetrics) { + // When servers are configured to return final aggregate state, the input DataTables hold final + // (not intermediate) values, so the merge-only contract — "produce an intermediate DataTable that + // can be re-merged via the normal reduce path" — cannot be honored. + if (_queryContext.isServerReturnFinalResult()) { + throw new UnsupportedOperationException( + "Merge-only reduction is not supported when servers return final aggregate results " + + "(server.returnFinalResult / isServerReturnFinalResult); input would be final-typed, " + + "not intermediate."); + } + dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, dataSchema); + try { + // No data for this bucket: emit an empty (0-row) intermediate DataTable carrying the schema. + if (dataTableMap.isEmpty()) { + return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); + } + Collection dataTables = dataTableMap.values(); + // Reuse the regular reduce's merge: builds the IndexedTable of group keys + intermediate agg state. + IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext); + DataTable mergedDataTable = buildIntermediateDataTable(dataSchema, indexedTable); + // Surface completeness flags so a downstream consumer can decide whether the merged intermediate + // is complete enough to use. No skip policy is enforced here. + if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) { + mergedDataTable.getMetadata().put(MetadataKey.GROUPS_TRIMMED.getName(), "true"); + } + if (anyNumGroupsLimitReached(dataTables)) { + mergedDataTable.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true"); + } + return mergedDataTable; + } catch (IOException e) { + throw new RuntimeException("Caught IOException while building merged intermediate DataTable for group-by", e); + } + } + + private static boolean anyNumGroupsLimitReached(Collection dataTables) { + for (DataTable dataTable : dataTables) { + if (Boolean.parseBoolean(dataTable.getMetadata().get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()))) { + return true; + } + } + return false; + } + /// Reduces group-by results into a [ResultTable] and set it into the [BrokerResponseNative]. private void reduceResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema, Collection dataTables, DataTableReducerContext reducerContext, String rawTableName, @@ -482,4 +533,74 @@ private Object getConvertedKey(DataTable dataTable, ColumnDataType columnDataTyp throw new IllegalStateException("Illegal column data type in group key: " + columnDataType); } } + + /** + * Serializes the merged {@link IndexedTable} back into an intermediate {@link DataTable}, mirroring + * {@code GroupByResultsBlock#getDataTable()} so the output is byte-shape identical to a single + * server's intermediate group-by response. Group-key columns are written by stored type; OBJECT + * aggregate columns via {@link AggregationFunction#serializeIntermediateResult}. No limit / HAVING / + * post-aggregation / formatting is applied. + */ + private DataTable buildIntermediateDataTable(DataSchema dataSchema, IndexedTable indexedTable) + throws IOException { + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes(); + Iterator iterator = indexedTable.iterator(); + if (_queryContext.isNullHandlingEnabled()) { + RoaringBitmap[] nullBitmaps = new RoaringBitmap[_numColumns]; + Object[] nullPlaceholders = new Object[_numColumns]; + for (int colId = 0; colId < _numColumns; colId++) { + nullBitmaps[colId] = new RoaringBitmap(); + nullPlaceholders[colId] = storedColumnDataTypes[colId].getNullPlaceholder(); + } + int rowId = 0; + while (iterator.hasNext()) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId, "GroupByDataTableReducer#merge"); + dataTableBuilder.startRow(); + Object[] values = iterator.next().getValues(); + for (int i = 0; i < _numColumns; i++) { + Object value = values[i]; + if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) { + if (value == null) { + dataTableBuilder.setNull(i); + } else { + dataTableBuilder.setColumn(i, + _aggregationFunctions[i - _numGroupByExpressions].serializeIntermediateResult(value)); + } + } else { + if (value == null) { + value = nullPlaceholders[i]; + nullBitmaps[i].add(rowId); + } + DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value); + } + } + dataTableBuilder.finishRow(); + rowId++; + } + for (RoaringBitmap nullBitmap : nullBitmaps) { + dataTableBuilder.setNullRowIds(nullBitmap); + } + } else { + int rowId = 0; + while (iterator.hasNext()) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId++, "GroupByDataTableReducer#merge"); + dataTableBuilder.startRow(); + Object[] values = iterator.next().getValues(); + for (int i = 0; i < _numColumns; i++) { + Object value = values[i]; + if (value == null) { + dataTableBuilder.setNull(i); + } else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) { + dataTableBuilder.setColumn(i, + _aggregationFunctions[i - _numGroupByExpressions].serializeIntermediateResult(value)); + } else { + DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value); + } + } + dataTableBuilder.finishRow(); + } + } + return dataTableBuilder.build(); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java new file mode 100644 index 000000000000..fa36cc27224c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.CommonConstants.Broker; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.roaringbitmap.RoaringBitmap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + + +/** + * Verifies the core merge-only contract: reducing the raw per-server DataTable map produces the same + * final result as reducing a single re-injected merged intermediate DataTable, i.e. + *
reduceOnDataTable(rawMap) == reduceOnDataTable({ syntheticKey -> mergeOnDataTable(rawMap) })
+ * for the supported reducers (Aggregation, Group-by, Distinct), including the OBJECT-column + * (DISTINCTCOUNT) round-trip. Also checks that the merged DataTable carries the intermediate schema and + * that unsupported reducers (Selection) throw. + */ +public class MergeDataTablesOnlyTest { + private static final long TIMEOUT_MS = 60_000L; + private BrokerReduceService _reduceService; + + @BeforeClass + public void setUp() { + _reduceService = + new BrokerReduceService(new PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + } + + @AfterClass + public void tearDown() { + _reduceService.shutDown(); + } + + @Test + public void testAggregationScalarRoundTrip() { + String query = "SELECT COUNT(*), SUM(m), MIN(m), MAX(m) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)", "sum(m)", "min(m)", "max(m)"}, + new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); + List serverTables = List.of( + buildRow(schema, 3L, 10.0, 2.0, 8.0), + buildRow(schema, 2L, 5.0, 1.0, 9.0), + buildRow(schema, 4L, 20.0, 3.0, 7.0)); + assertRoundTrip(query, serverTables); + } + + @Test + public void testDistinctCountObjectRoundTrip() + throws IOException { + String query = "SELECT DISTINCTCOUNT(col1) FROM testTable"; + AggregationFunction aggFunction = aggFunctions(query)[0]; + DataSchema schema = + new DataSchema(new String[]{"distinctcount(col1)"}, new ColumnDataType[]{ColumnDataType.OBJECT}); + List serverTables = List.of( + buildObjectRow(schema, aggFunction, new IntOpenHashSet(new int[]{1, 2, 3})), + buildObjectRow(schema, aggFunction, new IntOpenHashSet(new int[]{3, 4, 5}))); + assertRoundTrip(query, serverTables); + } + + @Test + public void testGroupByRoundTrip() { + String query = "SELECT col1, COUNT(*), SUM(m) FROM testTable GROUP BY col1"; + DataSchema schema = new DataSchema(new String[]{"col1", "count(*)", "sum(m)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE}); + List serverTables = List.of( + buildGroupBy(schema, new Object[][]{{1, 2L, 10.0}, {2, 1L, 5.0}}), + buildGroupBy(schema, new Object[][]{{1, 3L, 20.0}, {3, 4L, 8.0}})); + assertRoundTrip(query, serverTables); + } + + @Test + public void testDistinctRoundTrip() { + String query = "SELECT DISTINCT col1 FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"col1"}, new ColumnDataType[]{ColumnDataType.INT}); + List serverTables = List.of( + buildGroupBy(schema, new Object[][]{{1}, {2}}), + buildGroupBy(schema, new Object[][]{{2}, {3}})); + assertRoundTrip(query, serverTables); + } + + @Test + public void testAggregationServerReturnFinalResultRejected() { + // Under server.returnFinalResult, the per-server DataTables hold finalized (not intermediate) + // aggregate state; the merge-only contract cannot be honored, so the reducer must throw rather + // than silently merge final-typed values as if they were intermediates. + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable"); + brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true"); + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + Map map = singletonMap(buildRow(schema, 5L)); + assertThrows(UnsupportedOperationException.class, () -> merge(brokerRequest, map)); + } + + @Test + public void testGroupByServerReturnFinalResultRejected() { + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest( + "SELECT col1, COUNT(*) FROM testTable GROUP BY col1"); + brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true"); + DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + Map map = singletonMap(buildGroupBy(schema, new Object[][]{{1, 2L}})); + assertThrows(UnsupportedOperationException.class, () -> merge(brokerRequest, map)); + } + + @Test + public void testGroupByLimitZeroRoundTrip() { + // LIMIT 0 on group-by: mergeDataTablesOnly does NOT apply LIMIT (intermediate is unlimited), + // so a non-empty intermediate DataTable is produced. The re-injected reduce path applies the + // LIMIT and produces an empty final result, matching the direct reduce. + String query = "SELECT col1, COUNT(*) FROM testTable GROUP BY col1 LIMIT 0"; + DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + List serverTables = List.of( + buildGroupBy(schema, new Object[][]{{1, 2L}}), + buildGroupBy(schema, new Object[][]{{2, 3L}})); + assertRoundTrip(query, serverTables); + } + + @Test + public void testMergedDataTableCarriesIntermediateSchema() { + String query = "SELECT COUNT(*), SUM(m), DISTINCTCOUNT(col1) FROM testTable"; + AggregationFunction[] aggFunctions = aggFunctions(query); + // Build intermediate rows: COUNT->LONG, SUM->DOUBLE, DISTINCTCOUNT->OBJECT + DataSchema schema = new DataSchema(new String[]{"count(*)", "sum(m)", "distinctcount(col1)"}, + new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.OBJECT}); + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(schema); + try { + builder.startRow(); + builder.setColumn(0, 5L); + builder.setColumn(1, 12.0); + builder.setColumn(2, aggFunctions[2].serializeIntermediateResult(new IntOpenHashSet(new int[]{1, 2}))); + builder.finishRow(); + } catch (IOException e) { + throw new RuntimeException(e); + } + Map map = singletonMap(builder.build()); + + DataTable merged = merge(query, map); + assertNotNull(merged); + ColumnDataType[] mergedTypes = merged.getDataSchema().getColumnDataTypes(); + for (int i = 0; i < aggFunctions.length; i++) { + assertEquals(mergedTypes[i], aggFunctions[i].getIntermediateResultColumnType(), + "merged column " + i + " must carry the intermediate type, not the finalized type"); + } + } + + @Test + public void testEmptyMapReturnsNull() { + assertNull(merge("SELECT COUNT(*) FROM testTable", new HashMap<>())); + } + + @Test + public void testAllEmptyDataTablesReturnsNull() { + // A metadata-only (0-row, null schema) data table is dropped, leaving nothing to merge. + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable emptyMetadataOnly = DataTableBuilderFactory.getDataTableBuilder(schema).build().toMetadataOnlyDataTable(); + assertNull(merge("SELECT COUNT(*) FROM testTable", singletonMap(emptyMetadataOnly))); + } + + @Test + public void testSelectionIsUnsupported() { + String query = "SELECT col1 FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"col1"}, new ColumnDataType[]{ColumnDataType.INT}); + Map map = singletonMap(buildGroupBy(schema, new Object[][]{{1}, {2}})); + // SelectionDataTableReducer inherits the default-throwing mergeDataTablesOnly (selection is out of + // scope for merge-only reduction). + assertThrows(UnsupportedOperationException.class, () -> merge(query, map)); + } + + @Test + public void testNullHandlingAggregationRoundTrip() + throws IOException { + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT MIN(m) FROM testTable"); + brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true"); + DataSchema schema = new DataSchema(new String[]{"min(m)"}, new ColumnDataType[]{ColumnDataType.DOUBLE}); + // One server has a value, the other contributes a null (e.g. all rows filtered out). + List serverTables = List.of(buildNullableDouble(schema, 5.0), buildNullableDouble(schema, null)); + assertRoundTrip(brokerRequest, serverTables); + } + + @Test + public void testConflictingSchemaSurfacedAsPartialMergeResult() { + // When one server's column types conflict with the first non-empty table's, filterDataTablesAndPickSchema + // drops that server. The merge proceeds on the remaining servers and the returned DataTable carries + // the PARTIAL_INTERMEDIATE_RESULT flag so a downstream consumer can detect the partial result. + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schemaLong = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataSchema schemaInt = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.INT}); + DataTable t1 = buildRow(schemaLong, 5L); + DataTable t2 = buildRow(schemaLong, 7L); + // Conflicting column data type (INT vs LONG) — filterDataTablesAndPickSchema will drop t3. + DataTable t3 = buildRow(schemaInt, 99); + + DataTable merged = merge(query, toMap(List.of(t1, t2, t3))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.PARTIAL_INTERMEDIATE_RESULT.getName()), "true", + "PARTIAL_INTERMEDIATE_RESULT must be surfaced when a server is dropped due to schema conflict"); + } + + @Test + public void testAllSameSchemaDoesNotSetPartialMergeResult() { + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable merged = merge(query, toMap(List.of(buildRow(schema, 5L), buildRow(schema, 7L)))); + assertNotNull(merged); + assertNull(merged.getMetadata().get(MetadataKey.PARTIAL_INTERMEDIATE_RESULT.getName()), + "PARTIAL_INTERMEDIATE_RESULT must not be set on a clean merge"); + } + + @Test + public void testAdditiveStatsAggregatedOntoMergedMetadata() { + // Two inputs carry numDocsScanned + threadCpuTimeNs in their metadata; the merged DataTable + // should sum them so a downstream re-reduce sees the same totals. + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + DataTable t2 = buildRow(schema, 7L); + t1.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100"); + t1.getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), "1000"); + t2.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "250"); + t2.getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), "3000"); + + DataTable merged = merge(query, toMap(List.of(t1, t2))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), "350"); + assertEquals(merged.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()), "4000"); + } + + @Test + public void testAdditiveStatsAbsentFromAllInputsWriteZero() { + // Mirrors setStats()'s unconditional-write pattern: when none of the inputs carry an additive + // stats key, the merged DataTable carries "0" for it. Downstream's Long.parseLong("0") and + // null-check both yield 0, so the user-facing BrokerResponse is identical either way. + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable merged = merge(query, toMap(List.of(buildRow(schema, 5L), buildRow(schema, 7L)))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), "0"); + } + + @Test + public void testMinConsumingFreshnessTimeMsTakesMin() { + // MIN_CONSUMING_FRESHNESS_TIME_MS semantically captures the WORST freshness across servers + // (used by FRESHNESS_LAG_MS); the merge must MIN-reduce, not SUM. + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + DataTable t2 = buildRow(schema, 7L); + t1.getMetadata().put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), "1000"); + t2.getMetadata().put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), "500"); + + DataTable merged = merge(query, toMap(List.of(t1, t2))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName()), "500"); + } + + @Test + public void testStatsFromDroppedZeroRowServersStillCounted() { + // A 0-row metadata-only DataTable is dropped from the merge inputs by filterDataTablesAndPickSchema, + // but its stats must still be aggregated (matches reduceOnDataTable's behavior — the aggregator + // runs BEFORE the iterator.remove()). + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable withData = buildRow(schema, 5L); + withData.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100"); + // A 0-row server (e.g. all rows filtered out) still reports stats. + DataTable emptyRows = DataTableBuilderFactory.getDataTableBuilder(schema).build(); + emptyRows.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "50"); + + DataTable merged = merge(query, toMap(List.of(withData, emptyRows))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), "150"); + } + + @Test + public void testAdditiveStatsRoundTripThroughReduce() { + // End-to-end: aggregating stats on the merged DataTable means a downstream reduce on the + // re-injected merged DataTable sees the same numDocsScanned the direct reduce would. + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable"); + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + DataTable t2 = buildRow(schema, 7L); + t1.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100"); + t2.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "250"); + + BrokerResponseNative direct = reduce(brokerRequest, toMap(List.of(t1, t2))); + DataTable merged = merge(brokerRequest, toMap(List.of(t1, t2))); + assertNotNull(merged); + BrokerResponseNative viaMerge = reduce(brokerRequest, singletonMap(merged)); + + assertEquals(viaMerge.getNumDocsScanned(), direct.getNumDocsScanned(), + "merged DataTable must carry numDocsScanned so a downstream reduce produces the same total"); + assertEquals(viaMerge.getNumDocsScanned(), 350L); + } + + @Test + public void testExceptionsCopiedToMergedDataTable() { + // Per-server exceptions are accumulated by the aggregator and written back onto the merged + // DataTable's exception map, so a downstream reduce surfaces them on the BrokerResponse. + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable"); + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + DataTable t2 = buildRow(schema, 7L); + t1.addException(QueryErrorCode.QUERY_EXECUTION, "boom on server 1"); + t2.addException(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED, "limit on server 2"); + + DataTable merged = merge(brokerRequest, toMap(List.of(t1, t2))); + assertNotNull(merged); + assertEquals(merged.getExceptions().get(QueryErrorCode.QUERY_EXECUTION.getId()), "boom on server 1"); + assertEquals(merged.getExceptions().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId()), + "limit on server 2"); + + BrokerResponseNative viaMerge = reduce(brokerRequest, singletonMap(merged)); + // Downstream reduce surfaces both as response exceptions (one per distinct error code). + assertEquals(viaMerge.getExceptions().size(), 2); + } + + @Test + public void testTraceInfoCopiedToMergedDataTableWhenEnabled() { + // When trace is enabled, per-server trace info is JSON-encoded into the merged DataTable's + // TRACE_INFO metadata entry. The aggregator keys traces by server short-name (hostname-derived), + // so we use distinct hostnames to preserve attribution. + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable"); + brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.TRACE, "true"); + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + DataTable t2 = buildRow(schema, 7L); + t1.getMetadata().put(MetadataKey.TRACE_INFO.getName(), "trace-from-server-1"); + t2.getMetadata().put(MetadataKey.TRACE_INFO.getName(), "trace-from-server-2"); + Map map = new HashMap<>(); + map.put(new ServerRoutingInstance("hostA", 1000, TableType.OFFLINE), t1); + map.put(new ServerRoutingInstance("hostB", 1001, TableType.OFFLINE), t2); + + DataTable merged; + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + merged = _reduceService.mergeOnDataTable(brokerRequest, map, TIMEOUT_MS, mock(BrokerMetrics.class)); + } + assertNotNull(merged); + String mergedTrace = merged.getMetadata().get(MetadataKey.TRACE_INFO.getName()); + assertNotNull(mergedTrace, "TRACE_INFO must be present on merged DataTable when trace is enabled"); + // JSON-encoded map; both inputs' trace strings must be present. + assertTrue(mergedTrace.contains("trace-from-server-1")); + assertTrue(mergedTrace.contains("trace-from-server-2")); + } + + @Test + public void testTraceInfoSkippedWhenTraceDisabled() { + // Without trace=true, the aggregator skips trace collection, so the merged DataTable has no + // TRACE_INFO metadata even if inputs carried it. + String query = "SELECT COUNT(*) FROM testTable"; + DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG}); + DataTable t1 = buildRow(schema, 5L); + t1.getMetadata().put(MetadataKey.TRACE_INFO.getName(), "trace-from-server-1"); + + DataTable merged = merge(query, toMap(List.of(t1))); + assertNotNull(merged); + assertNull(merged.getMetadata().get(MetadataKey.TRACE_INFO.getName())); + } + + @Test + public void testNumGroupsLimitReachedFlagSurfaced() { + String query = "SELECT col1, COUNT(*) FROM testTable GROUP BY col1"; + DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + DataTable t1 = buildGroupBy(schema, new Object[][]{{1, 2L}}); + DataTable t2 = buildGroupBy(schema, new Object[][]{{2, 3L}}); + // Simulate a server that hit numGroupsLimit. + t2.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true"); + + DataTable merged = merge(query, toMap(List.of(t1, t2))); + assertNotNull(merged); + assertEquals(merged.getMetadata().get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()), "true", + "NUM_GROUPS_LIMIT_REACHED from an input server must be surfaced on the merged DataTable"); + } + + // ---- helpers ---- + + /** + * Asserts the round-trip equivalence for the given query and per-server intermediate DataTables. + */ + private void assertRoundTrip(String query, List serverTables) { + assertRoundTrip(CalciteSqlCompiler.compileToBrokerRequest(query), serverTables); + } + + private void assertRoundTrip(BrokerRequest brokerRequest, List serverTables) { + BrokerResponseNative baseline = reduce(brokerRequest, toMap(serverTables)); + + DataTable merged = merge(brokerRequest, toMap(serverTables)); + assertNotNull(merged, "merge produced null"); + + BrokerResponseNative viaMerge = reduce(brokerRequest, singletonMap(merged)); + + assertResultTablesEquivalent(baseline.getResultTable(), viaMerge.getResultTable()); + } + + private BrokerResponseNative reduce(BrokerRequest brokerRequest, Map map) { + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + return _reduceService.reduceOnDataTable(brokerRequest, brokerRequest, map, TIMEOUT_MS, mock(BrokerMetrics.class)); + } + } + + private DataTable merge(String query, Map map) { + return merge(CalciteSqlCompiler.compileToBrokerRequest(query), map); + } + + private DataTable merge(BrokerRequest brokerRequest, Map map) { + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + return _reduceService.mergeOnDataTable(brokerRequest, map, TIMEOUT_MS, mock(BrokerMetrics.class)); + } + } + + private static AggregationFunction[] aggFunctions(String query) { + QueryContext queryContext = + QueryContextConverterUtils.getQueryContext(CalciteSqlCompiler.compileToBrokerRequest(query).getPinotQuery()); + AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); + assertNotNull(aggregationFunctions); + return aggregationFunctions; + } + + private static DataTable buildRow(DataSchema schema, Object... values) { + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(schema); + try { + appendRow(builder, schema, values); + return builder.build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static DataTable buildGroupBy(DataSchema schema, Object[][] rows) { + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(schema); + try { + for (Object[] row : rows) { + appendRow(builder, schema, row); + } + return builder.build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static DataTable buildObjectRow(DataSchema schema, AggregationFunction aggFunction, Object intermediate) + throws IOException { + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(schema); + builder.startRow(); + builder.setColumn(0, aggFunction.serializeIntermediateResult(intermediate)); + builder.finishRow(); + return builder.build(); + } + + /** Builds a single-row, single-DOUBLE-column intermediate DataTable with null-handling encoding. */ + private static DataTable buildNullableDouble(DataSchema schema, Double value) + throws IOException { + DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(schema); + RoaringBitmap nullBitmap = new RoaringBitmap(); + builder.startRow(); + if (value == null) { + builder.setColumn(0, ((Number) ColumnDataType.DOUBLE.getNullPlaceholder()).doubleValue()); + nullBitmap.add(0); + } else { + builder.setColumn(0, (double) value); + } + builder.finishRow(); + builder.setNullRowIds(nullBitmap); + return builder.build(); + } + + private static void appendRow(DataTableBuilder builder, DataSchema schema, Object[] values) + throws IOException { + builder.startRow(); + ColumnDataType[] columnDataTypes = schema.getColumnDataTypes(); + for (int i = 0; i < values.length; i++) { + switch (columnDataTypes[i]) { + case INT: + builder.setColumn(i, (int) values[i]); + break; + case LONG: + builder.setColumn(i, (long) values[i]); + break; + case DOUBLE: + builder.setColumn(i, (double) values[i]); + break; + case STRING: + builder.setColumn(i, (String) values[i]); + break; + default: + throw new IllegalArgumentException("Unsupported test column type: " + columnDataTypes[i]); + } + } + builder.finishRow(); + } + + private static Map toMap(List serverTables) { + Map map = new HashMap<>(); + for (int i = 0; i < serverTables.size(); i++) { + map.put(new ServerRoutingInstance("localhost", 1000 + i, TableType.OFFLINE), serverTables.get(i)); + } + return map; + } + + private static Map singletonMap(DataTable dataTable) { + Map map = new HashMap<>(); + map.put(new ServerRoutingInstance("Server_merge", 0, TableType.OFFLINE), dataTable); + return map; + } + + /** + * Asserts the two result tables have the same schema and the same set of rows (order-independent, so + * the comparison is robust for unordered group-by / distinct results). + */ + private static void assertResultTablesEquivalent(ResultTable expected, ResultTable actual) { + assertNotNull(expected); + assertNotNull(actual); + assertEquals(actual.getDataSchema(), expected.getDataSchema()); + assertEquals(actual.getRows().size(), expected.getRows().size()); + assertTrue(sortedRows(actual).equals(sortedRows(expected)), + "rows differ:\n expected=" + sortedRows(expected) + "\n actual =" + sortedRows(actual)); + } + + private static List sortedRows(ResultTable resultTable) { + List rows = new ArrayList<>(); + for (Object[] row : resultTable.getRows()) { + rows.add(Arrays.deepToString(row)); + } + return rows.stream().sorted().collect(Collectors.toList()); + } +} From 1b8fb99a7a6ec5ea16f1a123639f872319a70062 Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Fri, 29 May 2026 13:28:46 -0700 Subject: [PATCH 2/4] removing superfluous comments --- .../reduce/AggregationDataTableReducer.java | 1 - .../query/reduce/BrokerReduceService.java | 39 +++++-------------- .../reduce/DistinctDataTableReducer.java | 3 +- .../query/reduce/GroupByDataTableReducer.java | 3 -- 4 files changed, 11 insertions(+), 35 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java index cc1b2e809c89..2617cd043e65 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java @@ -149,7 +149,6 @@ public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, } dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema); try { - // No data for this bucket: emit an empty (0-row) intermediate DataTable carrying the schema. if (dataTableMap.isEmpty()) { return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index 774754ec6a05..f4d714d7996b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -180,47 +180,33 @@ private static boolean isMaterializedViewRewrite(BrokerRequest serverBrokerReque * Merge-only counterpart of {@link #reduceOnDataTable}: merges the per-server DataTables into a single * intermediate {@link DataTable} WITHOUT finalizing (no {@code extractFinalResult}). Returns * {@code null} when there is nothing to merge (empty map, or all servers returned no data / errored). - * Reuses the same schema-filtering preamble and reducer/trim resolution as the regular reduce, but - * does NOT aggregate per-server execution stats (the merged intermediate does not carry them) and - * skips the nested-query / gapfill / alias handling (those query shapes are out of scope for - * merge-only). + * Reuses the same schema-filtering preamble and reducer/trim resolution as the regular reduce. * *

The returned DataTable carries intermediate, non-finalized state (byte-shape identical to a - * single server's partial response), so a downstream consumer can intercept it and custom handle - * the intermediate results. + * single server's partial response). * *

If one or more input server DataTables are dropped during merge (e.g., due to a schema * conflict with the first non-empty table), the returned DataTable's metadata carries the * {@link DataTable.MetadataKey#PARTIAL_INTERMEDIATE_RESULT} flag set to {@code "true"} and the - * {@link BrokerMeter#RESPONSE_MERGE_EXCEPTIONS} meter is incremented. This is symmetric with how - * the regular reduce path surfaces conflicting-schema servers via a response exception and the - * same meter; downstream consumers can read the flag and decide policy (skip, retry, accept). + * {@link BrokerMeter#RESPONSE_MERGE_EXCEPTIONS} meter is incremented. The callers can + * read the flag and decide policy (skip, retry, accept). * *

Execution stats from the input DataTables are aggregated via {@link ExecutionStatsAggregator} * and written back onto the merged DataTable: additive longs (e.g. {@code numDocsScanned}, * {@code numSegments*}, {@code threadCpuTimeNs}), {@code minConsumingFreshnessTimeMs} (MIN-reduced), * boolean flags ({@code groupsTrimmed}, {@code numGroupsLimitReached}, etc., OR-reduced), - * per-server exceptions, and trace info (JSON-encoded if {@code trace=true}). Unlike the regular - * reduce path, this method does NOT bump broker meters/timers for the input stats — those will - * fire when the result is eventually re-reduced through {@link #reduceOnDataTable}, so a consumer - * that uses both APIs sees one set of increments per logical query, not two. + * per-server exceptions, and trace info (JSON-encoded if {@code trace=true}). This method does NOT bump + * broker meters/timers for the input stats. * - *

Limitations of the round-trip: + *

Limitations on stats: *

    - *
  • CPU and memory stats round-trip as a single combined value per key (the wire format has - * no per-tableType variants). On a re-reduce, the downstream aggregator dumps the whole - * value into one bucket — whichever tableType the caller assigned to the synthetic - * server response — so the offline-vs-realtime split surfaced on {@link - * BrokerResponseNative} is lost across the round-trip, even though the total is preserved. *
  • Exception attribution to original servers is lost; the wire format is {@code Map} so collisions on the same error code are resolved last-write-wins. - *
  • Per-server trace info is JSON-encoded into a single {@code TRACE_INFO} entry; the - * downstream aggregator reads it back as one trace blob under the synthetic server's name. + *
  • Per-server trace info is JSON-encoded into a single {@code TRACE_INFO} entry *
* *

WARNING: this performs a full cross-server merge and re-serializes the result — heavyweight work - * that must be run asynchronously, decoupled from request serving. Invoking it inline while a query is - * being served can severely degrade its latency. Intended for downstream consumers that want to + * that must be run asynchronously, decoupled from request serving. Intended for callers that want to * intercept the merged intermediate result instead of the finalized one. * *

[org.apache.pinot.spi.query.QueryThreadContext] must already be set up before calling this method. @@ -235,8 +221,6 @@ public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest, boolean enableTrace = queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); // Aggregate stats while filtering so we can write them back onto the merged DataTable's metadata. - // Aggregator.aggregate() runs BEFORE filterDataTablesAndPickSchema removes any entry, matching - // reduceOnDataTable: empty / conflicting-schema servers' stats are still counted. ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace); List conflictingServers = new ArrayList<>(); DataSchema cachedDataSchema = filterDataTablesAndPickSchema(dataTableMap, aggregator, conflictingServers); @@ -255,12 +239,9 @@ public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest, if (merged != null) { // Write accumulated stats (additive longs, booleans, MIN freshness, exceptions, trace) onto the - // merged DataTable so it round-trips through reduceOnDataTable. Unlike setStats() this does NOT - // bump broker meters — those will fire when the result is eventually re-reduced. + // merged DataTable aggregator.setStatsOnMergedDataTable(merged); - // Symmetric with reduceOnDataTable: surface conflicting-schema drops via meter + warn + a metadata - // flag on the merged DataTable so downstream consumers can detect a partial merge. if (!conflictingServers.isEmpty()) { LOGGER.warn("Merge-only reduce dropped {} server response(s) for table {} due to data schema " + "inconsistency: {}", conflictingServers.size(), rawTableName, conflictingServers); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java index fe8ac886d269..c3a1c7d788cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java @@ -74,12 +74,11 @@ public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, BrokerMetrics brokerMetrics) { dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, dataSchema); try { - // No data for this bucket (or LIMIT 0): emit an empty intermediate DataTable carrying the schema. if (dataTableMap.isEmpty() || _queryContext.getLimit() == 0) { return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); } DistinctTable distinctTable = mergeToDistinctTable(dataSchema, dataTableMap.values()); - // Intermediate form: limit/sorting not applied; re-mergeable on a later reduce. + // Intermediate form: limit/sorting not applied return distinctTable.toDataTable(); } catch (IOException e) { throw new RuntimeException("Caught IOException while building merged intermediate DataTable for distinct", e); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 9cd040f9ce90..839948f88bec 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -140,7 +140,6 @@ public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, } dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, dataSchema); try { - // No data for this bucket: emit an empty (0-row) intermediate DataTable carrying the schema. if (dataTableMap.isEmpty()) { return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); } @@ -148,8 +147,6 @@ public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema, // Reuse the regular reduce's merge: builds the IndexedTable of group keys + intermediate agg state. IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext); DataTable mergedDataTable = buildIntermediateDataTable(dataSchema, indexedTable); - // Surface completeness flags so a downstream consumer can decide whether the merged intermediate - // is complete enough to use. No skip policy is enforced here. if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) { mergedDataTable.getMetadata().put(MetadataKey.GROUPS_TRIMMED.getName(), "true"); } From b12a2741d74b698596abee877ebb0ab19d9a065e Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Fri, 29 May 2026 13:37:13 -0700 Subject: [PATCH 3/4] fix exception messages and remove more superfluous comments --- .../query/reduce/AggregationDataTableReducer.java | 12 +++--------- .../core/query/reduce/BrokerReduceService.java | 14 +++++++++----- .../pinot/core/query/reduce/DataTableReducer.java | 4 +--- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java index 2617cd043e65..b8e6f249ce67 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java @@ -98,9 +98,7 @@ private void reduceWithIntermediateResult(DataSchema dataSchema, Collection dataTables) { int numAggregationFunctions = _aggregationFunctions.length; @@ -138,14 +136,10 @@ private Object[] mergeIntermediateResults(DataSchema dataSchema, Collection dataTableMap, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { - // When servers are configured to return final aggregate state, the input DataTables hold final - // (not intermediate) values, so the merge-only contract — "produce an intermediate DataTable that - // can be re-merged via the normal reduce path" — cannot be honored. + // cannot support finalized value types returned by the servers as an intermediate result type if (_queryContext.isServerReturnFinalResult()) { throw new UnsupportedOperationException( - "Merge-only reduction is not supported when servers return final aggregate results " - + "(server.returnFinalResult / isServerReturnFinalResult); input would be final-typed, " - + "not intermediate."); + "Datatable merge to intermediate results cannot be supported when servers return final result"); } dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema); try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index f4d714d7996b..1bc3900f2117 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -253,11 +253,15 @@ public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest, } /** - * Processes per-server response metadata and filters {@code dataTableMap} in place: drops tables with a - * null schema, drops empty tables (remembering their schema as a fallback), and drops tables whose - * column data types conflict with the first non-empty table (collected into {@code conflictingServers}). + * Processes per-server response metadata and filters {@code dataTableMap} in place: + * - drops tables with a null schema + * - drops empty tables (remembering their schema as a fallback) + * - drops tables whose column data types conflict with the first non-empty table + * (collected into {@code conflictingServers}). + * * When an {@code aggregator} is provided, per-table execution stats are aggregated before a table is - * dropped. Returns the remembered data schema (non-empty preferred, else empty-table schema, else + * dropped. + * Returns the remembered data schema (non-empty preferred, else empty-table schema, else * {@code null}). */ private static DataSchema filterDataTablesAndPickSchema(Map dataTableMap, @@ -305,7 +309,7 @@ private static DataSchema filterDataTablesAndPickSchema(Map queryOptions, long reduceTimeOutMs) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java index 3ce5e3939a94..597b048958f1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java @@ -56,9 +56,7 @@ void reduceAndSetResults(String tableName, DataSchema dataSchema, MapReducers that cannot produce a re-mergeable intermediate (e.g. explain-plan) leave this default - * implementation, which throws {@link UnsupportedOperationException}. Aggregation and group-by - * reducers also throw when the query is configured for server-side final-result return - * ({@code server.returnFinalResult}) because the inputs are then finalized, not intermediate. + * implementation, which throws {@link UnsupportedOperationException}. * * @param tableName table name * @param dataSchema schema from broker reduce service From 2ec9e0bfd213f4632ae2ab22f021eab480307446 Mon Sep 17 00:00:00 2001 From: Navina Ramesh Date: Sun, 31 May 2026 14:45:18 -0700 Subject: [PATCH 4/4] Round-trip DISTINCT EARLY_TERMINATION_REASON through setStatsOnMergedDataTable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit aggregate() reads EARLY_TERMINATION_REASON from a per-server DataTable and sets three independent booleans (_maxRowsInDistinctReached, _maxRowsWithoutChangeInDistinctReached, _maxExecutionTimeInDistinctReached); setStatsOnMergedDataTable previously did not write that key back. After re-injection, reduceOnDataTable would treat the merged DISTINCT response as complete and clear the partial-result flags on BrokerResponseNative — silently converting a partial DISTINCT result into one that looks complete. Encode the equivalent reason back onto the merged DataTable's metadata. The wire format is one string per DataTable, so when multiple per-server reasons were OR-reduced into independent booleans the round-trip can encode only one back; pick the first set flag in declaration order so the round-trip is deterministic. Granularity loss when multiple flags are true is inherent to the single-string wire format — the user-visible "DISTINCT is partial" signal is preserved because each of the three flags independently sets partial-ness on BrokerResponseNative. Add ExecutionStatsAggregatorTest pinning: - Each of the three reasons round-trips its own boolean. - Multiple booleans set → first-in-declaration-order encoded (DISTINCT_MAX_ROWS > DISTINCT_MAX_ROWS_WITHOUT_CHANGE > DISTINCT_MAX_EXECUTION_TIME). - No reason set → EARLY_TERMINATION_REASON key absent on merged DataTable (writing a sentinel would break aggregate()'s EarlyTerminationReason.valueOf). - Same reason from multiple inputs round-trips to the same single boolean. Documents the asymmetry on setStatsOnMergedDataTable's javadoc as a fourth "limitation of the round-trip" bullet, mirroring the existing three. --- .../reduce/ExecutionStatsAggregator.java | 26 +++ .../reduce/ExecutionStatsAggregatorTest.java | 179 ++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index e21d56c63d43..d76596987aee 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -374,6 +374,13 @@ private void withNotNullLongMetadata(Map metadata, DataTable.Met *

  • Per-server trace info is JSON-encoded into a single * {@link DataTable.MetadataKey#TRACE_INFO} entry; the downstream aggregator reads it back * as one trace blob attributed to the synthetic server. + *
  • DISTINCT early-termination reasons round-trip as a single enum name + * ({@link DataTable.MetadataKey#EARLY_TERMINATION_REASON}) because the wire format is one + * string per DataTable. The aggregator OR-reduces multiple per-server reasons into three + * independent booleans; on re-injection we encode only the first set flag in declaration + * order. The user-visible "DISTINCT is partial" signal is preserved (each of the three + * flags independently sets partial-ness on {@link BrokerResponseNative}); the exact reason + * granularity is best-effort when multiple flags are true. * */ public void setStatsOnMergedDataTable(DataTable dataTable) { @@ -431,6 +438,25 @@ public void setStatsOnMergedDataTable(DataTable dataTable) { metadata.put(DataTable.MetadataKey.NUM_GROUPS_WARNING_LIMIT_REACHED.getName(), "true"); } + // EARLY_TERMINATION_REASON: 1-string wire format ↔ 3-boolean accumulator. aggregate() OR-reduces + // multiple per-server reasons into the three DISTINCT booleans (each per-server DataTable carries + // at most one enum name); on re-injection we can encode only one reason back. Pick the first set + // flag in declaration order so the round-trip is deterministic. Granularity loss when multiple + // flags are true is inherent to the single-string wire format — the user-visible "DISTINCT is + // partial" signal is preserved because any one of the three flags independently sets + // partial-ness on BrokerResponseNative. + BaseResultsBlock.EarlyTerminationReason distinctReason = null; + if (_maxRowsInDistinctReached) { + distinctReason = BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS; + } else if (_maxRowsWithoutChangeInDistinctReached) { + distinctReason = BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE; + } else if (_maxExecutionTimeInDistinctReached) { + distinctReason = BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME; + } + if (distinctReason != null) { + metadata.put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(), distinctReason.name()); + } + // Exceptions: copy each accumulated exception onto the DataTable. Last-write-wins on error-code // collision (wire format is Map). for (QueryProcessingException qpe : _processingExceptions) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java new file mode 100644 index 000000000000..608287b05bec --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.config.table.TableType; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * Focused tests for {@link ExecutionStatsAggregator#setStatsOnMergedDataTable(DataTable)} — + * specifically the {@link MetadataKey#EARLY_TERMINATION_REASON} round-trip for DISTINCT queries. + * + *

    The aggregator collapses a single-string wire-format reason into three independent booleans + * (one per {@link EarlyTerminationReason}); the round-trip can encode only one reason back. These + * tests pin: + *

      + *
    • Each of the three reasons round-trips its own boolean when it is the only one set. + *
    • When multiple booleans are set (different per-server reasons OR-reduced), the round-trip + * encodes the first set flag in declaration order. The user-visible "DISTINCT is partial" + * signal is preserved because any one flag independently sets partial-ness on + * {@link BrokerResponseNative}. + *
    • When no DISTINCT early-termination booleans are set, the key is absent from the merged + * metadata (so the downstream decoder does not see a stray non-DISTINCT reason and the + * merged DataTable is indistinguishable from one whose inputs all completed normally). + *
    + */ +public class ExecutionStatsAggregatorTest { + + private static final ServerRoutingInstance SERVER = + new ServerRoutingInstance("localhost", 8080, TableType.OFFLINE); + + /** Build an empty DataTable carrying a single {@code EARLY_TERMINATION_REASON} metadata entry. */ + private static DataTable serverTableWithReason(EarlyTerminationReason reason) { + DataTable dt = DataTableBuilderFactory.getEmptyDataTable(); + dt.getMetadata().put(MetadataKey.EARLY_TERMINATION_REASON.getName(), reason.name()); + return dt; + } + + private static DataTable emptyServerTable() { + return DataTableBuilderFactory.getEmptyDataTable(); + } + + /** + * Drives one aggregate-then-readback round-trip and returns the three DISTINCT booleans the + * downstream aggregator would flip from the merged DataTable's metadata. Mirrors the production + * sequence: aggregate per-server inputs → serialize onto merged DataTable → reduce path + * re-aggregates the merged DataTable on a fresh aggregator. + */ + private static boolean[] roundTrip(DataTable... inputs) { + ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false); + for (DataTable dt : inputs) { + producer.aggregate(SERVER, dt); + } + DataTable merged = DataTableBuilderFactory.getEmptyDataTable(); + producer.setStatsOnMergedDataTable(merged); + + ExecutionStatsAggregator consumer = new ExecutionStatsAggregator(false); + consumer.aggregate(SERVER, merged); + BrokerResponseNative response = new BrokerResponseNative(); + consumer.setStats("testTable", response, mock(BrokerMetrics.class)); + return new boolean[]{ + response.isMaxRowsInDistinctReached(), + response.isMaxRowsWithoutChangeInDistinctReached(), + response.isMaxExecutionTimeInDistinctReached() + }; + } + + @Test + public void testDistinctMaxRowsReachedRoundTrip() { + boolean[] flags = roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS)); + assertTrue(flags[0], "_maxRowsInDistinctReached must round-trip"); + assertFalse(flags[1], "other DISTINCT flags must stay false"); + assertFalse(flags[2], "other DISTINCT flags must stay false"); + } + + @Test + public void testDistinctMaxRowsWithoutChangeReachedRoundTrip() { + boolean[] flags = roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE)); + assertFalse(flags[0]); + assertTrue(flags[1], "_maxRowsWithoutChangeInDistinctReached must round-trip"); + assertFalse(flags[2]); + } + + @Test + public void testDistinctMaxExecutionTimeReachedRoundTrip() { + boolean[] flags = roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME)); + assertFalse(flags[0]); + assertFalse(flags[1]); + assertTrue(flags[2], "_maxExecutionTimeInDistinctReached must round-trip"); + } + + /** + * Two inputs hit DIFFERENT DISTINCT early-termination reasons. The aggregator OR-reduces both + * into independent booleans, but the wire format is one string per DataTable. Round-trip can + * encode only one reason back; per the implementation contract, it picks the first set flag in + * declaration order — {@code DISTINCT_MAX_ROWS} when both rows-reached and execution-time are + * set. + * + *

    The user-visible "DISTINCT is partial" signal is still preserved: at least one of the + * three flags is true post-round-trip. The exact reason granularity is best-effort. + */ + @Test + public void testMultipleReasonsEncodeFirstInDeclarationOrder() { + boolean[] flags = roundTrip( + serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME), + serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS)); + assertTrue(flags[0], "DISTINCT_MAX_ROWS wins over DISTINCT_MAX_EXECUTION_TIME in priority order"); + assertFalse(flags[1]); + assertFalse(flags[2], "execution-time flag is dropped — single-string wire format can only carry one reason"); + } + + /** + * No DISTINCT early-termination on any input. The merged DataTable must NOT carry an + * {@code EARLY_TERMINATION_REASON} key — writing one would mislead the downstream decoder, and + * an empty/false value is not part of the wire-format vocabulary (the consumer's + * {@code EarlyTerminationReason.valueOf} would throw on "" or "false" and the + * {@code IllegalArgumentException} would silently mask any other reason we tried to encode). + */ + @Test + public void testNoReasonProducesAbsentMetadataKey() { + ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false); + producer.aggregate(SERVER, emptyServerTable()); + DataTable merged = DataTableBuilderFactory.getEmptyDataTable(); + producer.setStatsOnMergedDataTable(merged); + + assertNull(merged.getMetadata().get(MetadataKey.EARLY_TERMINATION_REASON.getName()), + "no DISTINCT early-termination → key must be absent on merged DataTable"); + + // Sanity: re-aggregation of the merged DataTable leaves all three flags false. + boolean[] flags = roundTrip(emptyServerTable()); + assertFalse(flags[0]); + assertFalse(flags[1]); + assertFalse(flags[2]); + } + + /** + * Same DISTINCT reason reported by multiple inputs round-trips to the same single boolean. The + * aggregator's OR-reduce makes this a trivial case, but pinning it catches a future regression + * where setStatsOnMergedDataTable starts skipping the reason on repeats. + */ + @Test + public void testSameReasonFromMultipleInputsRoundTripsToOneBoolean() { + boolean[] flags = roundTrip( + serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS), + serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS), + serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS)); + assertTrue(flags[0]); + assertFalse(flags[1]); + assertFalse(flags[2]); + } +}