Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,21 @@
*/
package org.apache.pinot.plugin.inputformat.arrow;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
import org.apache.pinot.spi.utils.TimestampUtils;


/// Extracts a single Arrow row into a [GenericRow]. Reader-scoped state ([VectorSchemaRoot] +
Expand Down Expand Up @@ -208,247 +197,10 @@ public GenericRow extract(Record from, GenericRow to) {
ValueVector activeVector = activeVectors[i];
Object rawValue = activeVector.getObject(from._rowId);
to.putValue(fieldVectors[i].getField().getName(),
rawValue != null ? convert(activeVector.getField(), rawValue) : null);
rawValue != null
? ArrowToPinotTypeConverter.toPinotValue(activeVector.getField(), rawValue, _extractRawTimeValues)
: null);
}
return to;
}

/// Converts one Arrow value into its Pinot representation. The conversion is selected by the
/// [ArrowType.ArrowTypeID] of `field`: scalar values are normalized directly, while list, map and
/// struct values recurse using the child [Field]s of `field`.
///
/// @param field Arrow field that describes `value`
/// @param value value read from the vector; the callers visible in this file pass non-null values
/// @return the converted value; `null` only for the Arrow `Null` type
/// @throws IllegalStateException when the type ID has no conversion in this method
@Nullable
private Object convert(Field field, Object value) {
  ArrowType arrowType = field.getType();
  switch (arrowType.getTypeID()) {
    // Null — callers skip null values before dispatching, so this is a defensive return.
    case Null:
      return null;

    // Returned untouched — the boxed Arrow value already has the contract output type
    // (Boolean, Float / Double, BigDecimal, byte[]).
    case Bool:
    case FloatingPoint:
    case Decimal:
    case Binary:
    case LargeBinary:
    case FixedSizeBinary:
      return value;

    // Rendered through `toString()` — text becomes `String`; interval / duration values rely on
    // the `toString()` of their Java type.
    case Utf8:
    case LargeUtf8:
    case Interval:
    case Duration:
      return value.toString();

    // Integers narrower than 32 bits are widened to `Integer`; everything else passes through.
    case Int: {
      if (value instanceof Byte) {
        int widened = ((Byte) value).intValue();
        if (((ArrowType.Int) arrowType).getIsSigned()) {
          // Signed 8-bit: keep the sign-extended value.
          return widened;
        }
        // Unsigned 8-bit: mask to zero-extend.
        return widened & 0xFF;
      }
      if (value instanceof Short) {
        return Integer.valueOf((Short) value);
      }
      if (value instanceof Character) {
        // Unsigned 16-bit surfaces as `Character`; widen to its numeric code.
        return Integer.valueOf(((Character) value).charValue());
      }
      return value;
    }

    // Logical temporal types — the unit declared on the schema type drives the conversion.
    case Timestamp:
      return convertTimestamp((ArrowType.Timestamp) arrowType, value);
    case Date:
      return convertDate((ArrowType.Date) arrowType, value);
    case Time:
      return convertTime((ArrowType.Time) arrowType, value);

    // Multi-value — every list flavour becomes an `Object[]` converted with the element field.
    case List:
    case LargeList:
    case FixedSizeList: {
      Field elementField = field.getChildren().get(0);
      return convertList(elementField, (List<?>) value);
    }

    // Map — the field's only child is the entries struct, whose children are [key, value].
    case Map: {
      Field entries = field.getChildren().get(0);
      Field keyField = entries.getChildren().get(0);
      Field valueField = entries.getChildren().get(1);
      return convertMap(keyField, valueField, (List<?>) value);
    }

    case Struct:
      return convertStruct(field.getChildren(), (Map<?, ?>) value);

    // Union — the selected branch is not recoverable from the value, so dispatch on the value's
    // runtime Java type instead of the schema.
    case Union:
      return convertByRuntimeType(value);

    // `NONE` (placeholder) and any type ID not handled above.
    default:
      throw new IllegalStateException("Unsupported Arrow type: " + arrowType + " for field: " + field.getName());
  }
}

/// Converts an Arrow `Timestamp` value into a [Timestamp], or into the raw epoch count in the
/// schema's unit when [#_extractRawTimeValues] is set.
///
/// Two input shapes are handled: a `LocalDateTime` (interpreted at UTC) and any other value, which
/// is read as a `Number` holding an epoch count in the unit declared by `type`. Microsecond and
/// nanosecond counts are delegated to [TimestampUtils].
///
/// @param type Arrow timestamp type supplying the time unit
/// @param value non-null value read from the vector
/// @return a [Timestamp]; in raw mode, a `long` epoch count for `LocalDateTime` input or `value`
///     itself otherwise
/// @throws ArithmeticException when a `SECOND` count cannot be scaled to milliseconds without
///     overflowing `long`
/// @throws IllegalStateException when the unit is not one of the four handled units
private Object convertTimestamp(ArrowType.Timestamp type, Object value) {
  if (_extractRawTimeValues) {
    if (value instanceof LocalDateTime) {
      // LocalDateTime input — convert back to an epoch `long` in the declared unit.
      Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC);
      return toEpochInUnit(instant, type.getUnit());
    }
    // Any other input is returned as-is: it is already the raw count in the declared unit.
    return value;
  }
  if (value instanceof LocalDateTime) {
    return Timestamp.from(((LocalDateTime) value).toInstant(ZoneOffset.UTC));
  }
  long raw = ((Number) value).longValue();
  switch (type.getUnit()) {
    case SECOND:
      // Checked multiply: an unchecked `raw * 1000L` would wrap silently and yield a wrong
      // timestamp. This matches the overflow handling in `toEpochInUnit`.
      return new Timestamp(Math.multiplyExact(raw, 1000L));
    case MILLISECOND:
      return new Timestamp(raw);
    case MICROSECOND:
      return TimestampUtils.fromMicrosSinceEpoch(raw);
    case NANOSECOND:
      return TimestampUtils.fromNanosSinceEpoch(raw);
    default:
      throw new IllegalStateException("Unsupported Timestamp unit: " + type.getUnit());
  }
}

/// Expresses `instant` as a count since the epoch in the requested Arrow time unit. The
/// microsecond and nanosecond results are built with exact arithmetic, so an overflow raises
/// `ArithmeticException` instead of wrapping.
///
/// @param instant point in time to express
/// @param unit target unit
/// @return the epoch count in `unit`
/// @throws IllegalStateException when `unit` is not one of the four handled units
private static long toEpochInUnit(Instant instant, org.apache.arrow.vector.types.TimeUnit unit) {
  switch (unit) {
    case SECOND:
      return instant.getEpochSecond();
    case MILLISECOND:
      return instant.toEpochMilli();
    case MICROSECOND: {
      long wholeSecondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
      long subSecondMicros = instant.getNano() / 1_000L;
      return Math.addExact(wholeSecondsAsMicros, subSecondMicros);
    }
    case NANOSECOND: {
      long wholeSecondsAsNanos = Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L);
      return Math.addExact(wholeSecondsAsNanos, instant.getNano());
    }
    default:
      throw new IllegalStateException("Unsupported Timestamp unit: " + unit);
  }
}

/// Converts an Arrow `Date` value into a [LocalDate], or into `int` days-since-epoch when
/// [#_extractRawTimeValues] is set.
///
/// The unit declared by `type` selects how `value` is read: a `DAY` value is cast to `Integer`
/// (raw days), and a `MILLISECOND` value is cast to `LocalDateTime`, from which only the date
/// part is used.
///
/// @param type Arrow date type supplying the unit
/// @param value non-null value read from the vector
/// @return a [LocalDate], or an `Integer` day count in raw mode
/// @throws ArithmeticException when a `MILLISECOND` value's epoch day does not fit in an `int`
/// @throws IllegalStateException when the unit is neither `DAY` nor `MILLISECOND`
private Object convertDate(ArrowType.Date type, Object value) {
  int days;
  switch (type.getUnit()) {
    case DAY:
      days = (Integer) value;
      break;
    case MILLISECOND:
      // Checked narrowing: a plain `(int)` cast would truncate an out-of-range epoch day and
      // silently produce a wrong date.
      days = Math.toIntExact(((LocalDateTime) value).toLocalDate().toEpochDay());
      break;
    default:
      throw new IllegalStateException("Unsupported Date unit: " + type.getUnit());
  }
  return _extractRawTimeValues ? days : LocalDate.ofEpochDay(days);
}

/// Converts an Arrow `Time` value into a [LocalTime], or into the raw count in the schema's unit
/// when [#_extractRawTimeValues] is set.
///
/// A `LocalDateTime` input contributes only its time-of-day; in raw mode it is reduced to `int`
/// milliseconds since midnight. Any other input is read as a `Number` count in the unit declared
/// by `type` (and returned unchanged in raw mode).
///
/// @param type Arrow time type supplying the unit
/// @param value non-null value read from the vector
/// @return a [LocalTime], or the raw count in raw mode
/// @throws IllegalStateException when the unit is not one of the four handled units
private Object convertTime(ArrowType.Time type, Object value) {
  boolean isDateTime = value instanceof LocalDateTime;

  if (_extractRawTimeValues) {
    if (!isDateTime) {
      // Numeric input is already the raw count in the declared unit.
      return value;
    }
    // Reduce the time-of-day to whole milliseconds since midnight.
    long nanosOfDay = ((LocalDateTime) value).toLocalTime().toNanoOfDay();
    return (int) (nanosOfDay / 1_000_000L);
  }

  if (isDateTime) {
    return ((LocalDateTime) value).toLocalTime();
  }

  long count = ((Number) value).longValue();
  switch (type.getUnit()) {
    case SECOND:
      return LocalTime.ofSecondOfDay(count);
    case MILLISECOND:
      return LocalTime.ofNanoOfDay(count * 1_000_000L);
    case MICROSECOND:
      return LocalTime.ofNanoOfDay(count * 1_000L);
    case NANOSECOND:
      return LocalTime.ofNanoOfDay(count);
    default:
      throw new IllegalStateException("Unsupported Time unit: " + type.getUnit());
  }
}

/// Converts every element of an Arrow list value with [#convert], producing an `Object[]` of the
/// same length. `null` elements stay `null` and are never passed to [#convert].
///
/// @param elementField Arrow field describing each list element
/// @param list list value read from the vector
/// @return converted elements in iteration order
private Object[] convertList(Field elementField, List<?> list) {
  // A fresh Object[] is all-null, so only non-null elements need to be written.
  Object[] converted = new Object[list.size()];
  int next = 0;
  for (Object item : list) {
    if (item != null) {
      converted[next] = convert(elementField, item);
    }
    next++;
  }
  return converted;
}

/// Flattens the entry list of an Arrow `Map` value into an insertion-ordered
/// `Map<String, Object>`. Each entry is itself a map keyed by `MapVector.KEY_NAME` and
/// `MapVector.VALUE_NAME`; keys are converted with [#convert] and then turned into strings with
/// `stringifyMapKey`, values are converted with [#convert].
///
/// Skipped entries: a `null` entry, an entry whose key is `null`, and an entry whose key converts
/// to `null`. A `null` value is kept as `null`.
///
/// @param keyField Arrow field describing the keys
/// @param valueField Arrow field describing the values
/// @param entries entry list read from the vector
/// @return the flattened map
private Map<String, Object> convertMap(Field keyField, Field valueField, List<?> entries) {
  Map<String, Object> flattened = Maps.newLinkedHashMapWithExpectedSize(entries.size());
  for (Object entry : entries) {
    if (entry == null) {
      continue;
    }
    Map<?, ?> keyValue = (Map<?, ?>) entry;

    Object key = keyValue.get(MapVector.KEY_NAME);
    Object pinotKey = key == null ? null : convert(keyField, key);
    if (pinotKey == null) {
      continue;
    }

    Object val = keyValue.get(MapVector.VALUE_NAME);
    // The key is stringified before the value is converted, as in a single `put(...)` call.
    String name = stringifyMapKey(pinotKey);
    Object pinotValue = null;
    if (val != null) {
      pinotValue = convert(valueField, val);
    }
    flattened.put(name, pinotValue);
  }
  return flattened;
}

/// Converts an Arrow `Struct` value into a `Map<String, Object>` with one entry per child field.
/// The child's name is the key; the value is looked up in `value` by that name and converted with
/// [#convert]. A child that is missing or `null` in `value` maps to `null`.
///
/// @param childFields child fields of the struct, in schema order
/// @param value struct value read from the vector, keyed by child name
/// @return a map containing every child name
private Map<String, Object> convertStruct(List<Field> childFields, Map<?, ?> value) {
  Map<String, Object> converted = Maps.newHashMapWithExpectedSize(childFields.size());
  for (Field child : childFields) {
    String childName = child.getName();
    Object childValue = value.get(childName);
    if (childValue == null) {
      converted.put(childName, null);
    } else {
      converted.put(childName, convert(child, childValue));
    }
  }
  return converted;
}

/// Converts a value by inspecting its runtime Java type instead of an Arrow schema. Used for the
/// `Union` case, where the selected branch cannot be recovered from the value.
///
/// - `Byte` / `Short` widen to `Integer`; every other `Number` is returned unchanged.
/// - `Boolean` and `byte[]` are returned unchanged.
/// - `Character` widens to its `int` code.
/// - `LocalDateTime` could stand for a timestamp, date or time; it is treated as a UTC timestamp.
/// - Anything else, including nested complex values, is rendered with `toString()`.
///
/// @param value non-null value to convert
/// @return the converted value
private static Object convertByRuntimeType(Object value) {
  // Narrow integers first, so the generic Number pass-through below never sees them.
  if (value instanceof Byte || value instanceof Short) {
    return ((Number) value).intValue();
  }
  if (value instanceof Number || value instanceof Boolean || value instanceof byte[]) {
    return value;
  }
  if (value instanceof Character) {
    char code = (Character) value;
    return (int) code;
  }
  if (value instanceof LocalDateTime) {
    // Best effort: the Timestamp / Date / Time distinction is lost here, so assume Timestamp.
    Instant utcInstant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC);
    return Timestamp.from(utcInstant);
  }
  // Text, period / duration values and anything unrecognized.
  return value.toString();
}
}
Loading
Loading