Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,15 @@ enum MetadataKey {
WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
// Needed so that we can track query id in Netty channel response.
QUERY_ID(41, "queryId", MetadataValueType.STRING),
EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING);
EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING),
// Set on a merged-only DataTable when one or more input server DataTables were dropped during the
// merge (e.g., due to a schema conflict). Signals to a downstream consumer that the merge is
// partial; how to react (skip, retry, accept with annotation) is the consumer's policy.
PARTIAL_INTERMEDIATE_RESULT(43, "partialIntermediateResult", MetadataValueType.STRING);
Comment on lines +160 to +163
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a bit odd, aren't all the intermediate results partial? Should it be something like INCOMPLETE_MERGE instead?


// We keep this constant to track the max id added so far for backward compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
private static final int MAX_ID = PARTIAL_INTERMEDIATE_RESULT.getId();

private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.common.datatable;

import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.pinot.common.utils.ArrayListUtils;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.utils.ByteArray;


/**
* Helpers for writing values into a {@link DataTableBuilder}.
*/
public class DataTableBuilderUtils {
private DataTableBuilderUtils() {
}

/**
* Writes a non-null value of the given stored column data type into the {@link DataTableBuilder} at
* the given column. Supports all scalar and array stored types. OBJECT columns are NOT handled here
* (callers serialize them via the owning aggregation function). Used by the group-by result
* serialization on both the server ({@code GroupByResultsBlock}) and the merge-only reduce path.
*/
public static void setColumn(DataTableBuilder dataTableBuilder, ColumnDataType storedColumnDataType,
int columnIndex, Object value)
throws IOException {
switch (storedColumnDataType) {
case INT:
dataTableBuilder.setColumn(columnIndex, (int) value);
break;
case LONG:
dataTableBuilder.setColumn(columnIndex, (long) value);
break;
case FLOAT:
dataTableBuilder.setColumn(columnIndex, (float) value);
break;
case DOUBLE:
dataTableBuilder.setColumn(columnIndex, (double) value);
break;
case BIG_DECIMAL:
dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
break;
case STRING:
dataTableBuilder.setColumn(columnIndex, value.toString());
break;
case BYTES:
dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
break;
case INT_ARRAY:
if (value instanceof IntArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toIntArray((IntArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (int[]) value);
}
break;
case LONG_ARRAY:
if (value instanceof LongArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toLongArray((LongArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (long[]) value);
}
break;
case FLOAT_ARRAY:
if (value instanceof FloatArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toFloatArray((FloatArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (float[]) value);
}
break;
case DOUBLE_ARRAY:
if (value instanceof DoubleArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toDoubleArray((DoubleArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (double[]) value);
}
break;
case BIG_DECIMAL_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
}
break;
case STRING_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (String[]) value);
}
break;
case BYTES_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
}
break;
default:
throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public DataTable getDataTable()
nullBitmaps[i].add(0);
}
assert result != null;
setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result);
}
}
}
Expand Down Expand Up @@ -174,7 +174,7 @@ public DataTable getDataTable()
if (columnDataTypes[i] == ColumnDataType.OBJECT) {
dataTableBuilder.setColumn(i, _aggregationFunctions[i].serializeIntermediateResult(result));
} else {
setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, columnDataTypes[i], i, result);
}
}
}
Expand All @@ -184,36 +184,6 @@ public DataTable getDataTable()
return dataTableBuilder.build();
}

private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
Object result) throws IOException {
ColumnDataType columnDataType = columnDataTypes[index];
switch (columnDataType) {
case INT:
dataTableBuilder.setColumn(index, (int) result);
break;
case LONG:
dataTableBuilder.setColumn(index, (long) result);
break;
case FLOAT:
dataTableBuilder.setColumn(index, (float) result);
break;
case DOUBLE:
dataTableBuilder.setColumn(index, (double) result);
break;
case BIG_DECIMAL:
dataTableBuilder.setColumn(index, (BigDecimal) result);
break;
case STRING:
dataTableBuilder.setColumn(index, result.toString());
break;
case BYTES:
dataTableBuilder.setColumn(index, (ByteArray) result);
break;
default:
throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
}
}

private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
Object result)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks.results;

import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -33,19 +27,18 @@
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.ArrayListUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;


Expand Down Expand Up @@ -243,7 +236,7 @@ public DataTable getDataTable()
nullBitmaps[i].add(rowId);
}
assert value != null;
setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, value);
DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value);
}
}
dataTableBuilder.finishRow();
Expand All @@ -265,7 +258,7 @@ public DataTable getDataTable()
} else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) {
dataTableBuilder.setColumn(i, aggregationFunctions[i - numKeyColumns].serializeIntermediateResult(value));
} else {
setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, value);
DataTableBuilderUtils.setColumn(dataTableBuilder, storedColumnDataTypes[i], i, value);
}
}
dataTableBuilder.finishRow();
Expand All @@ -275,88 +268,6 @@ public DataTable getDataTable()
return dataTableBuilder.build();
}

private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBuilder dataTableBuilder,
int columnIndex, Object value)
throws IOException {
switch (storedColumnDataType) {
case INT:
dataTableBuilder.setColumn(columnIndex, (int) value);
break;
case LONG:
dataTableBuilder.setColumn(columnIndex, (long) value);
break;
case FLOAT:
dataTableBuilder.setColumn(columnIndex, (float) value);
break;
case DOUBLE:
dataTableBuilder.setColumn(columnIndex, (double) value);
break;
case BIG_DECIMAL:
dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
break;
case STRING:
dataTableBuilder.setColumn(columnIndex, value.toString());
break;
case BYTES:
dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
break;
case INT_ARRAY:
if (value instanceof IntArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toIntArray((IntArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (int[]) value);
}
break;
case LONG_ARRAY:
if (value instanceof LongArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toLongArray((LongArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (long[]) value);
}
break;
case FLOAT_ARRAY:
if (value instanceof FloatArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toFloatArray((FloatArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (float[]) value);
}
break;
case DOUBLE_ARRAY:
if (value instanceof DoubleArrayList) {
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toDoubleArray((DoubleArrayList) value));
} else {
dataTableBuilder.setColumn(columnIndex, (double[]) value);
}
break;
case BIG_DECIMAL_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
}
break;
case STRING_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (String[]) value);
}
break;
case BYTES_ARRAY:
if (value instanceof ObjectArrayList) {
//noinspection unchecked
dataTableBuilder.setColumn(columnIndex, ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
} else {
dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
}
break;
default:
throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType);
}
}

@Override
public Map<String, String> getResultsMetadata() {
Expand Down
Loading
Loading