diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java index 4c6f7eab9f..ef53b464e5 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java @@ -65,9 +65,7 @@ protected int recordCount(List fetchedRecords) { protected ArrowScanRecords toResult( Map> fetchedRecords, Map consumedUpToOffsets) { - // Arrow scan paths don't need consumedUpToOffsets (issue #2371 is specific to - // row-based tiering), so it's discarded here rather than carried in ArrowScanRecords. - return new ArrowScanRecords(fetchedRecords); + return new ArrowScanRecords(fetchedRecords, consumedUpToOffsets); } @Override diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java index d5cb63a8e3..048eba4e1a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java @@ -24,6 +24,7 @@ import org.apache.fluss.utils.IOUtils; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Iterator; @@ -43,8 +44,18 @@ public class ArrowScanRecords implements Iterable, AutoCloseable private final Map> records; + /** The exclusive upper bound of consumed offsets per polled bucket in this round. */ + private final Map consumedUpToOffsets; + public ArrowScanRecords(Map> records) { + this(records, Collections.emptyMap()); + } + + public ArrowScanRecords( + Map> records, + Map consumedUpToOffsets) { this.records = records; + this.consumedUpToOffsets = consumedUpToOffsets; } /** Get just the Arrow batches for the given bucket. */ @@ -56,11 +67,23 @@ public List records(TableBucket scanBucket) { return Collections.unmodifiableList(recs); } - /** Returns the buckets that contain Arrow batches. */ + /** Returns the buckets that were polled in this round. */ public Set buckets() { return Collections.unmodifiableSet(records.keySet()); } + /** + * Get the exclusive upper bound of offsets consumed for the given bucket in this poll round. + * + * @param bucket the bucket to query + * @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in + * this round + */ + @Nullable + public Long consumedUpToOffset(TableBucket bucket) { + return consumedUpToOffsets.get(bucket); + } + /** Returns the total number of rows in all batches. */ public int count() { int count = 0; diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/batch/ArrowRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/lake/batch/ArrowRecordBatch.java index 78548d69d5..769e6fe9c2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/batch/ArrowRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/batch/ArrowRecordBatch.java @@ -18,11 +18,32 @@ package org.apache.fluss.lake.batch; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.record.ArrowBatchData; /** - * The Arrow implementation of the RecordBatch interface. + * The Arrow implementation of the {@link RecordBatch} interface. + * + *

Wraps an {@link ArrowBatchData} for use by lake writers that support batch writing via {@link + * org.apache.fluss.lake.writer.SupportsRecordBatchWrite}. * * @since 0.7 */ @PublicEvolving -public class ArrowRecordBatch implements RecordBatch {} +public class ArrowRecordBatch implements RecordBatch, AutoCloseable { + + private final ArrowBatchData arrowBatchData; + + public ArrowRecordBatch(ArrowBatchData arrowBatchData) { + this.arrowBatchData = arrowBatchData; + } + + /** Returns the underlying {@link ArrowBatchData}. */ + public ArrowBatchData getArrowBatchData() { + return arrowBatchData; + } + + @Override + public void close() { + arrowBatchData.close(); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java b/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java index 515f73d5aa..c3e55fadf7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java @@ -19,6 +19,8 @@ import org.apache.fluss.annotation.Internal; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import static org.apache.fluss.utils.Preconditions.checkArgument; @@ -38,6 +40,7 @@ public class ArrowBatchData implements AutoCloseable { private final long baseLogOffset; private final long timestamp; private final int schemaId; + private boolean closed; public ArrowBatchData( VectorSchemaRoot vectorSchemaRoot, long baseLogOffset, long timestamp, int schemaId) { @@ -72,6 +75,17 @@ public int getRecordCount() { return vectorSchemaRoot.getRowCount(); } + /** Returns the total size in bytes of the underlying Arrow buffers. */ + public long getSizeInBytes() { + long size = 0; + for (FieldVector vector : vectorSchemaRoot.getFieldVectors()) { + for (ArrowBuf buf : vector.getBuffers(false)) { + size += buf.readableBytes(); + } + } + return size; + } + /** * Creates a new {@link ArrowBatchData} containing a contiguous slice of this batch's rows and * releases the original vector data. @@ -92,12 +106,37 @@ skipRows < getRecordCount(), int remainingRows = getRecordCount() - skipRows; VectorSchemaRoot slicedRoot = vectorSchemaRoot.slice(skipRows, remainingRows); // release original vector buffers; sliced vectors hold independent copies - vectorSchemaRoot.close(); + close(); return new ArrowBatchData(slicedRoot, baseLogOffset + skipRows, timestamp, schemaId); } + /** + * Creates a new {@link ArrowBatchData} containing only the first {@code rowCount} rows and + * releases the original vector data. + * + *

After this method returns, the original {@link ArrowBatchData} instance MUST NOT be used + * or closed. The caller is responsible for closing the returned instance. + * + * @param rowCount the number of leading rows to keep + * @return a new {@link ArrowBatchData} containing the first {@code rowCount} rows + */ + public ArrowBatchData truncateAndTransferOwnership(int rowCount) { + checkArgument(rowCount > 0, "rowCount must be > 0, but is %s", rowCount); + checkArgument( + rowCount <= getRecordCount(), + "rowCount(%s) must be <= recordCount(%s)", + rowCount, + getRecordCount()); + VectorSchemaRoot slicedRoot = vectorSchemaRoot.slice(0, rowCount); + close(); + return new ArrowBatchData(slicedRoot, baseLogOffset, timestamp, schemaId); + } + @Override public void close() { - vectorSchemaRoot.close(); + if (!closed) { + closed = true; + vectorSchemaRoot.close(); + } } } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java index dff13f0405..5ea16bc17d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java @@ -54,7 +54,9 @@ private UnshadedArrowReadUtils() {} public static Schema toArrowSchema(RowType rowType) { org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema shadedSchema = ArrowUtils.toArrowSchema(rowType); - return Schema.deserializeMessage(ByteBuffer.wrap(shadedSchema.serializeAsMessage())); + // Use toByteArray()/deserialize() instead of serializeAsMessage()/deserializeMessage() + // for compatibility with Arrow 12.x (used by Spark 3.4/3.5) + return Schema.deserialize(ByteBuffer.wrap(shadedSchema.toByteArray())); } public static void loadArrowBatch( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index 6f0fc43b95..3e2fa13fd5 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -90,11 +90,16 @@ private static TieringSourceFetcherManager createFetc LakeTieringFactory lakeTieringFactory, Duration pollTimeout) { TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); + ClassLoader userClassLoader = context.getUserCodeClassLoader().asClassLoader(); return new TieringSourceFetcherManager<>( elementsQueue, () -> new TieringSplitReader<>( - connection, lakeTieringFactory, pollTimeout, tieringMetrics), + connection, + lakeTieringFactory, + userClassLoader, + pollTimeout, + tieringMetrics), context.getConfiguration(), (ignore) -> {}); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 335f5114c8..324ce8a9e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -21,7 +21,9 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.log.ArrowScanRecords; import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.LogScannerImpl; import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; @@ -29,12 +31,18 @@ import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.lake.batch.ArrowRecordBatch; import org.apache.fluss.lake.writer.LakeTieringFactory; import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.lake.writer.SupportsRecordBatchWrite; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ArrowBatchData; import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.function.SupplierWithException; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -56,6 +64,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.function.Function; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -98,6 +107,8 @@ public class TieringSplitReader @Nullable private BoundedSplitReader currentSnapshotSplitReader; @Nullable private TieringSnapshotSplit currentSnapshotSplit; @Nullable private Integer currentTableNumberOfSplits; + // whether the current table uses the Arrow record batch path for tiering + @Nullable private Boolean currentTableUseRecordBatchPath; // map from table bucket to split id private final Map currentTableSplitsByBucket; @@ -108,18 +119,21 @@ public class TieringSplitReader private final Set currentEmptySplits; private final TieringMetrics tieringMetrics; + private final boolean unshadedArrowAvailable; public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, + ClassLoader userClassLoader, TieringMetrics tieringMetrics) { - this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT, tieringMetrics); + this(connection, lakeTieringFactory, userClassLoader, DEFAULT_POLL_TIMEOUT, tieringMetrics); } @VisibleForTesting protected TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, + ClassLoader userClassLoader, Duration pollTimeout, TieringMetrics tieringMetrics) { this.lakeTieringFactory = lakeTieringFactory; @@ -136,6 +150,7 @@ protected TieringSplitReader( this.reachTieringMaxDurationTables = new HashSet<>(); this.pollTimeout = pollTimeout; this.tieringMetrics = tieringMetrics; + this.unshadedArrowAvailable = checkUnshadedArrowAvailable(userClassLoader); } @Override @@ -171,8 +186,23 @@ public RecordsWithSplitIds> fetch() throws I if (reachTieringMaxDurationTables.contains(currentTableId)) { return forceCompleteTieringLogRecords(); } - ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); - return forLogRecords(scanRecords); + if (useRecordBatchPath()) { + try (ArrowScanRecords arrowScanRecords = + ((LogScannerImpl) currentLogScanner).pollRecordBatch(pollTimeout)) { + return processLogRecords( + arrowScanRecords.buckets(), + arrowScanRecords::records, + this::handleArrowBatchRecords, + arrowScanRecords::consumedUpToOffset); + } + } else { + ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); + return processLogRecords( + scanRecords.buckets(), + scanRecords::records, + this::handleLogRecords, + scanRecords::consumedUpToOffset); + } } else { return emptyTableBucketWriteResultWithSplitIds(); } @@ -350,59 +380,88 @@ private void mayCreateLogScanner() { return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } - private RecordsWithSplitIds> forLogRecords( - ScanRecords scanRecords) throws IOException { + /** + * Determines whether the current table should use the Arrow record batch path for tiering. The + * batch path is used when the table is an ARROW format append-only (log) table and the lake + * writer supports batch writing. + */ + private boolean useRecordBatchPath() { + if (currentTableUseRecordBatchPath != null) { + return currentTableUseRecordBatchPath; + } + TableInfo tableInfo = checkNotNull(currentTable).getTableInfo(); + + currentTableUseRecordBatchPath = + unshadedArrowAvailable + && !tableInfo.hasPrimaryKey() + && tableInfo.getTableConfig().getLogFormat() == LogFormat.ARROW + && tableInfo.getTableConfig().getDataLakeFormat().orElse(null) + == DataLakeFormat.PAIMON; + return currentTableUseRecordBatchPath; + } + + /** + * Generic template method for processing tiering log records. Encapsulates the shared workflow + * of bucket traversal, stopping offset checks, LakeWriter management, offset/timestamp + * tracking, split completion, and table completion. + * + * @param buckets the set of buckets that have records + * @param recordsExtractor function to extract records for a given bucket + * @param handler callback for processing records within a single bucket + * @param consumedUpToOffsetExtractor function to extract the consumed-up-to offset for a given + * bucket. The offset is used for progress tracking and split completion even when records + * are empty. + * @param the record type + * @return the write results and finished split IDs + * @throws IOException if an I/O error occurs during processing + */ + private RecordsWithSplitIds> processLogRecords( + Set buckets, + Function> recordsExtractor, + BucketRecordsHandler handler, + Function consumedUpToOffsetExtractor) + throws IOException { Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); // Iterate every polled bucket, including those that only advanced their offset. - for (TableBucket bucket : scanRecords.buckets()) { + for (TableBucket bucket : buckets) { Long stoppingOffset = currentTableStoppingOffsets.get(bucket); if (stoppingOffset == null) { continue; } - List records = scanRecords.records(bucket); - LakeWriter lakeWriter = null; - ScanRecord lastRecord = null; - - for (ScanRecord record : records) { - lastRecord = record; - - // The scanner may return records beyond this split's exclusive stopping offset. - // Those records belong to the next split and must not be tiered here. - if (record.logOffset() >= stoppingOffset) { - continue; - } - - if (lakeWriter == null) { - lakeWriter = - getOrCreateLakeWriter( - bucket, - currentTableSplitsByBucket.get(bucket).getPartitionName()); - } - lakeWriter.write(record); - if (record.getSizeInBytes() > 0) { - tieringMetrics.recordBytesRead(record.getSizeInBytes()); - } - } + List records = recordsExtractor.apply(bucket); // consumedUpToOffset is an exclusive upper bound: all offsets before it have been // consumed by the scanner in this poll round. It may advance even when records is - // empty, for example when FIRST_ROW filters duplicate upserts into empty WAL batches. - Long consumedUpToOffset = scanRecords.consumedUpToOffset(bucket); + // empty, e.g. when FIRST_ROW filters duplicate upserts into empty WAL batches. + Long consumedUpToOffset = consumedUpToOffsetExtractor.apply(bucket); checkState( consumedUpToOffset != null, "Missing consumed-up-to offset for polled bucket %s.", bucket); + // Write records to the lake; returns the last written timestamp, + // or UNKNOWN_BUCKET_TIMESTAMP if no records were actually written. + long lastWrittenTimestamp = + handler.handleRecords( + records, + () -> + getOrCreateLakeWriter( + bucket, + currentTableSplitsByBucket + .get(bucket) + .getPartitionName()), + stoppingOffset); + // The split owns offsets before stoppingOffset only. If the scanner consumed past // the split boundary, cap the tiered progress at stoppingOffset so the next split // still owns later data. long tieredLogEndOffset = Math.min(consumedUpToOffset, stoppingOffset); long tieredTimestamp; - if (lastRecord != null) { - tieredTimestamp = lastRecord.timestamp(); + if (lastWrittenTimestamp >= 0) { + tieredTimestamp = lastWrittenTimestamp; } else { LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket); tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP; @@ -410,12 +469,13 @@ private RecordsWithSplitIds> forLogRecords( currentTableTieredOffsetAndTimestamp.put( bucket, new LogOffsetAndTimestamp(tieredLogEndOffset - 1, tieredTimestamp)); - // The split owns offsets below stoppingOffset. If the scanner has not consumed up to - // that exclusive bound yet, keep the split active. + // The split owns offsets below stoppingOffset. If the scanner has not consumed up + // to that exclusive bound yet, keep the split active. if (consumedUpToOffset < stoppingOffset) { continue; } + // Split completion: unsubscribe, remove split, complete lake writer. currentTableStoppingOffsets.remove(bucket); if (bucket.getPartitionId() != null) { currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket()); @@ -447,6 +507,87 @@ private RecordsWithSplitIds> forLogRecords( return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } + /** + * Handles row-based ScanRecord writing for the log path. + * + * @return the timestamp of the last written record, or -1 if no records were written + */ + private long handleLogRecords( + List records, + SupplierWithException, IOException> lakeWriterSupplier, + long stoppingOffset) + throws IOException { + long lastWrittenTimestamp = UNKNOWN_BUCKET_TIMESTAMP; + LakeWriter lakeWriter = null; + for (ScanRecord record : records) { + if (record.logOffset() < stoppingOffset) { + if (lakeWriter == null) { + lakeWriter = lakeWriterSupplier.get(); + } + lakeWriter.write(record); + lastWrittenTimestamp = record.timestamp(); + if (record.getSizeInBytes() > 0) { + tieringMetrics.recordBytesRead(record.getSizeInBytes()); + } + } + } + return lastWrittenTimestamp; + } + + /** + * Handles Arrow batch writing for the record batch path. + * + * @return the timestamp of the last written batch, or -1 if no batches were written + */ + private long handleArrowBatchRecords( + List batches, + SupplierWithException, IOException> lakeWriterSupplier, + long stoppingOffset) + throws IOException { + SupportsRecordBatchWrite batchWriter = null; + long lastWrittenTimestamp = UNKNOWN_BUCKET_TIMESTAMP; + for (ArrowBatchData batch : batches) { + long batchBaseOffset = batch.getBaseLogOffset(); + long batchRecordCount = batch.getRecordCount(); + long batchTimestamp = batch.getTimestamp(); + if (batchBaseOffset >= stoppingOffset) { + batch.close(); + continue; + } + + long writableRowCount = stoppingOffset - batchBaseOffset; + int writableRows = (int) Math.min(batchRecordCount, writableRowCount); + if (writableRows <= 0) { + batch.close(); + continue; + } + + if (batchWriter == null) { + LakeWriter lakeWriter = lakeWriterSupplier.get(); + if (!(lakeWriter instanceof SupportsRecordBatchWrite)) { + throw new IOException( + "LakeWriter does not support RecordBatch writes: " + + lakeWriter.getClass().getName()); + } + batchWriter = (SupportsRecordBatchWrite) lakeWriter; + } + + ArrowBatchData batchToWrite = batch; + if (writableRows < batchRecordCount) { + batchToWrite = batch.truncateAndTransferOwnership(writableRows); + } + long batchSizeInBytes = batchToWrite.getSizeInBytes(); + try (ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(batchToWrite)) { + batchWriter.write(arrowRecordBatch); + } + if (batchSizeInBytes > 0) { + tieringMetrics.recordBytesRead(batchSizeInBytes); + } + lastWrittenTimestamp = batchTimestamp; + } + return lastWrittenTimestamp; + } + private LakeWriter getOrCreateLakeWriter( TableBucket bucket, @Nullable String partitionName) throws IOException { LakeWriter lakeWriter = lakeWriters.get(bucket); @@ -591,6 +732,7 @@ private void finishCurrentTable() throws IOException { currentTableId = null; currentTablePath = null; currentTableNumberOfSplits = null; + currentTableUseRecordBatchPath = null; currentPendingSnapshotSplits.clear(); currentTableStoppingOffsets.clear(); currentTableTieredOffsetAndTimestamp.clear(); @@ -732,6 +874,41 @@ public Set finishedSplits() { } } + /** + * Callback interface for processing records within a single bucket. Encapsulates the + * differences in write strategy between the row-based (ScanRecord) and Arrow batch + * (ArrowBatchData) paths. + * + * @param the record type (ScanRecord or ArrowBatchData) + */ + @FunctionalInterface + private interface BucketRecordsHandler { + + /** + * Processes the records for a bucket and writes them to the lake. + * + * @param records the records for this bucket + * @param lakeWriterSupplier supplier for lazily creating the lake writer + * @param stoppingOffset the stopping offset for this bucket + * @return the timestamp of the last written record, or -1 if no records were written + * @throws IOException if an I/O error occurs during writing + */ + long handleRecords( + List records, + SupplierWithException, IOException> lakeWriterSupplier, + long stoppingOffset) + throws IOException; + } + + private static boolean checkUnshadedArrowAvailable(ClassLoader classLoader) { + try { + Class.forName("org.apache.arrow.vector.VectorSchemaRoot", false, classLoader); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + private static final class LogOffsetAndTimestamp { private final long logOffset; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index 9aad5d1c3d..91c65ffc7e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -431,7 +431,10 @@ private TieringSplitReader createTieringReader(Connection co InternalSourceReaderMetricGroup.mock( new MetricListener().getMetricGroup())); return new TieringSplitReader<>( - connection, new TestingLakeTieringFactory(), tieringMetrics); + connection, + new TestingLakeTieringFactory(), + Thread.currentThread().getContextClassLoader(), + tieringMetrics); } private TieringSplitReader createTieringReader( @@ -440,7 +443,11 @@ private TieringSplitReader createTieringReader( new TieringMetrics( InternalSourceReaderMetricGroup.mock( new MetricListener().getMetricGroup())); - return new TieringSplitReader<>(connection, lakeTieringFactory, tieringMetrics); + return new TieringSplitReader<>( + connection, + lakeTieringFactory, + Thread.currentThread().getContextClassLoader(), + tieringMetrics); } private void verifyTieringRows( diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml index 1deb0fd65a..a1e6d1abca 100644 --- a/fluss-lake/fluss-lake-paimon/pom.xml +++ b/fluss-lake/fluss-lake-paimon/pom.xml @@ -51,6 +51,31 @@ provided + + + org.apache.paimon + paimon-arrow + ${paimon.version} + provided + + + + org.apache.arrow + arrow-vector + ${arrow.version} + provided + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + provided + + org.apache.fluss fluss-client diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8ac7aabe4e..5c2738e233 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -17,9 +17,12 @@ package org.apache.fluss.lake.paimon.tiering; +import org.apache.fluss.lake.batch.ArrowRecordBatch; +import org.apache.fluss.lake.batch.RecordBatch; import org.apache.fluss.lake.paimon.tiering.append.AppendOnlyWriter; import org.apache.fluss.lake.paimon.tiering.mergetree.MergeTreeWriter; import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.lake.writer.SupportsRecordBatchWrite; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.LogRecord; @@ -38,7 +41,7 @@ import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; /** Implementation of {@link LakeWriter} for Paimon. */ -public class PaimonLakeWriter implements LakeWriter { +public class PaimonLakeWriter implements LakeWriter, SupportsRecordBatchWrite { private final Catalog paimonCatalog; private final RecordWriter recordWriter; @@ -80,6 +83,25 @@ public void write(LogRecord record) throws IOException { } } + @Override + public void write(RecordBatch recordBatch) throws IOException { + if (!(recordBatch instanceof ArrowRecordBatch)) { + throw new IllegalArgumentException( + "PaimonLakeWriter only supports ArrowRecordBatch, but got " + + recordBatch.getClass().getSimpleName()); + } + if (!(recordWriter instanceof AppendOnlyWriter)) { + throw new IllegalStateException( + "Arrow record batch writing is only supported for append-only tables."); + } + try { + ((AppendOnlyWriter) recordWriter) + .writeArrowBatch(((ArrowRecordBatch) recordBatch).getArrowBatchData()); + } catch (Exception e) { + throw new IOException("Failed to write Arrow record batch to Paimon.", e); + } + } + @Override public PaimonWriteResult complete() throws IOException { CommitMessage commitMessage; diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyArrowBatchHelper.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyArrowBatchHelper.java new file mode 100644 index 0000000000..4fdb8bce79 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyArrowBatchHelper.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.tiering.append; + +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.record.ArrowBatchData; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.paimon.arrow.ArrowBundleRecords; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * Helper class that encapsulates Arrow-dependent batch writing logic for append-only tables. + * + *

This class is separated from {@link AppendOnlyWriter} to avoid loading Arrow classes when + * Arrow is not on the classpath. It is lazily loaded only when Arrow batch writing is actually + * needed. + */ +class AppendOnlyArrowBatchHelper implements AutoCloseable { + + private final FileStoreTable fileStoreTable; + private final TableWriteImpl tableWrite; + private final RowType tableRowType; + private final int bucket; + + private static final Field BUCKET_FIELD = + new Field( + TableDescriptor.BUCKET_COLUMN_NAME, + new FieldType(false, new ArrowType.Int(32, true), null), + null); + private static final Field OFFSET_FIELD = + new Field( + TableDescriptor.OFFSET_COLUMN_NAME, + new FieldType(false, new ArrowType.Int(64, true), null), + null); + private static final Field TIMESTAMP_FIELD = + new Field( + TableDescriptor.TIMESTAMP_COLUMN_NAME, + new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null), + null); + + // Child allocator for system column vectors, sharing the same root as the batch allocator + @Nullable private BufferAllocator systemColumnAllocator; + + // Reusable resources for enriched VectorSchemaRoot with system columns + @Nullable private VectorSchemaRoot enrichedRoot; + @Nullable private Schema enrichedSchema; + @Nullable private IntVector bucketVector; + @Nullable private BigIntVector offsetVector; + @Nullable private TimeStampMilliVector timestampVector; + + AppendOnlyArrowBatchHelper( + FileStoreTable fileStoreTable, + TableWriteImpl tableWrite, + RowType tableRowType, + int bucket) { + this.fileStoreTable = fileStoreTable; + this.tableWrite = tableWrite; + this.tableRowType = tableRowType; + this.bucket = bucket; + } + + /** + * Writes an Arrow batch directly to Paimon Parquet files. Enriches the VectorSchemaRoot with + * system columns (__bucket, __offset, __timestamp) and uses Paimon's {@link ArrowBundleRecords} + * for efficient batch writing. + */ + void writeArrowBatch(ArrowBatchData arrowBatchData, BinaryRow partition) throws Exception { + int writtenBucket = bucket; + if (fileStoreTable.store().bucketMode() == BucketMode.BUCKET_UNAWARE) { + writtenBucket = 0; + } + + VectorSchemaRoot originalRoot = arrowBatchData.getVectorSchemaRoot(); + long baseOffset = arrowBatchData.getBaseLogOffset(); + long timestamp = arrowBatchData.getTimestamp(); + int rowCount = originalRoot.getRowCount(); + + ensureEnrichedRootInitialized(originalRoot, originalRoot.getVector(0).getAllocator()); + updateEnrichedVectorSchemaRoot(writtenBucket, baseOffset, timestamp, rowCount); + + ArrowBundleRecords arrowBundleRecords = + new ArrowBundleRecords(enrichedRoot, tableRowType, false); + + tableWrite.writeBundle(partition, writtenBucket, arrowBundleRecords); + } + + /** + * Ensures the enriched VectorSchemaRoot is initialized with system column vectors. Reuses + * system column vectors when the root allocator has not changed. The enrichedRoot references + * the current originalRoot's data vectors plus the system column vectors. + * + *

System column vectors must share the same root allocator as the data vectors. Batches from + * different poll rounds may use different root allocators (each {@code CompletedFetch} creates + * its own {@code LogRecordReadContext} with a fresh {@code RootAllocator}), so the system + * column vectors are recreated when the root allocator changes. + */ + private void ensureEnrichedRootInitialized( + VectorSchemaRoot originalRoot, BufferAllocator batchAllocator) { + List originalFields = originalRoot.getSchema().getFields(); + int currentFieldCount = originalFields.size(); + + BufferAllocator currentRoot = batchAllocator.getRoot(); + + // (Re)create system column vectors when the root allocator has changed. + if (bucketVector == null || systemColumnAllocator.getRoot() != currentRoot) { + closeSystemColumns(); + + systemColumnAllocator = + currentRoot.newChildAllocator("system-column-allocator", 0, Long.MAX_VALUE); + bucketVector = new IntVector(BUCKET_FIELD, systemColumnAllocator); + offsetVector = new BigIntVector(OFFSET_FIELD, systemColumnAllocator); + timestampVector = new TimeStampMilliVector(TIMESTAMP_FIELD, systemColumnAllocator); + } + + if (enrichedSchema == null) { + List enrichedFields = new ArrayList<>(originalFields); + enrichedFields.add(BUCKET_FIELD); + enrichedFields.add(OFFSET_FIELD); + enrichedFields.add(TIMESTAMP_FIELD); + enrichedSchema = new Schema(enrichedFields); + } + + // recreate enrichedRoot to reference the current originalRoot's data vectors + List allVectors = new ArrayList<>(); + for (int i = 0; i < currentFieldCount; i++) { + allVectors.add(originalRoot.getVector(i)); + } + allVectors.add(bucketVector); + allVectors.add(offsetVector); + allVectors.add(timestampVector); + + enrichedRoot = new VectorSchemaRoot(enrichedSchema, allVectors, originalRoot.getRowCount()); + } + + private void closeSystemColumns() { + if (bucketVector != null) { + bucketVector.close(); + bucketVector = null; + } + if (offsetVector != null) { + offsetVector.close(); + offsetVector = null; + } + if (timestampVector != null) { + timestampVector.close(); + timestampVector = null; + } + if (systemColumnAllocator != null) { + systemColumnAllocator.close(); + systemColumnAllocator = null; + } + } + + /** + * Updates system column values in the enriched VectorSchemaRoot. Data columns are already + * referenced from the original root. + */ + private void updateEnrichedVectorSchemaRoot( + int bucket, long baseOffset, long timestamp, int rowCount) { + enrichedRoot.setRowCount(rowCount); + + if (bucketVector.getValueCapacity() < rowCount) { + bucketVector.allocateNew(rowCount); + } + if (offsetVector.getValueCapacity() < rowCount) { + offsetVector.allocateNew(rowCount); + } + if (timestampVector.getValueCapacity() < rowCount) { + timestampVector.allocateNew(rowCount); + } + + for (int i = 0; i < rowCount; i++) { + bucketVector.set(i, bucket); + } + bucketVector.setValueCount(rowCount); + + for (int i = 0; i < rowCount; i++) { + offsetVector.set(i, baseOffset + i); + } + offsetVector.setValueCount(rowCount); + + for (int i = 0; i < rowCount; i++) { + timestampVector.set(i, timestamp); + } + timestampVector.setValueCount(rowCount); + } + + @Override + public void close() { + closeSystemColumns(); + enrichedRoot = null; + enrichedSchema = null; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java index a0966dfd4c..0caaed97c4 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java @@ -19,6 +19,7 @@ import org.apache.fluss.lake.paimon.tiering.RecordWriter; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.ArrowBatchData; import org.apache.fluss.record.LogRecord; import org.apache.fluss.types.RowType; @@ -38,6 +39,13 @@ public class AppendOnlyWriter extends RecordWriter { private final FileStoreTable fileStoreTable; + /** + * Lazily-initialized helper for Arrow batch writing. Stored as {@link AutoCloseable} to avoid + * loading Arrow classes when Arrow is not on the classpath. The actual type is {@link + * AppendOnlyArrowBatchHelper} which is only loaded when {@link #writeArrowBatch} is called. + */ + @Nullable private AutoCloseable arrowBatchHelper; + public AppendOnlyWriter( FileStoreTable fileStoreTable, TableBucket tableBucket, @@ -71,4 +79,31 @@ public void write(LogRecord record) throws Exception { } tableWrite.getWrite().write(partition, writtenBucket, flussRecordAsPaimonRow); } + + /** + * Writes an Arrow batch directly to Paimon Parquet files. Delegates to {@link + * AppendOnlyArrowBatchHelper} which is lazily loaded to avoid class loading issues when Arrow + * is not on the classpath. + */ + public void writeArrowBatch(ArrowBatchData arrowBatchData) throws Exception { + AppendOnlyArrowBatchHelper helper; + if (arrowBatchHelper == null) { + helper = + new AppendOnlyArrowBatchHelper( + fileStoreTable, tableWrite, tableRowType, bucket); + arrowBatchHelper = helper; + } else { + helper = (AppendOnlyArrowBatchHelper) arrowBatchHelper; + } + helper.writeArrowBatch(arrowBatchData, partition); + } + + @Override + public void close() throws Exception { + if (arrowBatchHelper != null) { + arrowBatchHelper.close(); + arrowBatchHelper = null; + } + super.close(); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java new file mode 100644 index 0000000000..a8e395725a --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow.converter; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.columnar.ArrayColumnVector; +import org.apache.paimon.data.columnar.BooleanColumnVector; +import org.apache.paimon.data.columnar.ByteColumnVector; +import org.apache.paimon.data.columnar.BytesColumnVector; +import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.ColumnarArray; +import org.apache.paimon.data.columnar.ColumnarMap; +import org.apache.paimon.data.columnar.ColumnarRow; +import org.apache.paimon.data.columnar.DecimalColumnVector; +import org.apache.paimon.data.columnar.DoubleColumnVector; +import org.apache.paimon.data.columnar.FloatColumnVector; +import org.apache.paimon.data.columnar.IntColumnVector; +import org.apache.paimon.data.columnar.LongColumnVector; +import org.apache.paimon.data.columnar.MapColumnVector; +import org.apache.paimon.data.columnar.RowColumnVector; +import org.apache.paimon.data.columnar.ShortColumnVector; +import org.apache.paimon.data.columnar.TimestampColumnVector; +import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; + +import java.util.ArrayList; +import java.util.List; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Convert a {@link FieldVector} to {@link ColumnVector}. + * + *

This is copied from Paimon's {@code Arrow2PaimonVectorConverter} (paimon-arrow) with the + * following bug fixes: + * + *

    + *
  • {@code TimestampType} and {@code LocalZonedTimestampType}: read the raw long value via + * {@code vector.getDataBuffer().getLong()} instead of {@code ((TimeStampVector) + * vector).get(i)} or {@code (long) vector.getObject(i)}, which may return {@code + * LocalDateTime} and cause {@code ClassCastException} for timezone-unaware timestamp vectors. + *
  • {@code TimeType}: handle all Arrow time vector types ({@code TimeMilliVector}, {@code + * TimeMicroVector}, {@code TimeNanoVector}, {@code TimeSecVector}) instead of hardcoding + * {@code TimeMilliVector}. + *
  • {@code BinaryType}: use {@code FixedSizeBinaryVector} instead of {@code VarBinaryVector} to + * match the fixed-length binary Arrow type. + *
+ * + *

TODO: remove this class once https://github.com/apache/paimon/issues/8134 is fixed in upstream + * Paimon. + */ +public interface Arrow2PaimonVectorConverter { + + static Arrow2PaimonVectorConverter construct(DataType type) { + return type.accept(Arrow2PaimonVectorConvertorVisitor.INSTANCE); + } + + static Arrow2PaimonVectorConverter construct( + Arrow2PaimonVectorConvertorVisitor visitor, DataType type) { + return type.accept(visitor); + } + + ColumnVector convertVector(FieldVector vector); + + /** Visitor to create convertor from arrow to paimon. */ + class Arrow2PaimonVectorConvertorVisitor + implements DataTypeVisitor { + + public static final Arrow2PaimonVectorConvertorVisitor INSTANCE = + new Arrow2PaimonVectorConvertorVisitor(); + + @Override + public Arrow2PaimonVectorConverter visit(CharType charType) { + return vector -> + new BytesColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Bytes getBytes(int index) { + byte[] bytes; + if (vector instanceof FixedSizeBinaryVector) { + bytes = ((FixedSizeBinaryVector) vector).get(index); + } else { + bytes = ((VarCharVector) vector).get(index); + } + + return new Bytes(bytes, 0, bytes.length) { + @Override + public byte[] getBytes() { + return bytes; + } + }; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(VarCharType varCharType) { + return vector -> + new BytesColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Bytes getBytes(int index) { + byte[] bytes = ((VarCharVector) vector).get(index); + return new Bytes(bytes, 0, bytes.length) { + @Override + public byte[] getBytes() { + return bytes; + } + }; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(BooleanType booleanType) { + return vector -> + new BooleanColumnVector() { + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public boolean getBoolean(int index) { + return ((BitVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(BinaryType binaryType) { + return vector -> + new BytesColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Bytes getBytes(int index) { + byte[] bytes = ((FixedSizeBinaryVector) vector).get(index); + return new Bytes(bytes, 0, bytes.length) { + @Override + public byte[] getBytes() { + return bytes; + } + }; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(VarBinaryType varBinaryType) { + return vector -> + new BytesColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Bytes getBytes(int index) { + byte[] bytes = ((VarBinaryVector) vector).get(index); + return new Bytes(bytes, 0, bytes.length) { + @Override + public byte[] getBytes() { + return bytes; + } + }; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(DecimalType decimalType) { + return vector -> + new DecimalColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Decimal getDecimal(int index, int precision, int scale) { + return Decimal.fromBigDecimal( + ((DecimalVector) vector).getObject(index), precision, scale); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(TinyIntType tinyIntType) { + return vector -> + new ByteColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public byte getByte(int index) { + return ((TinyIntVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(SmallIntType smallIntType) { + return vector -> + new ShortColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public short getShort(int index) { + return ((SmallIntVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(IntType intType) { + return vector -> + new IntColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public int getInt(int index) { + return ((IntVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(BigIntType bigIntType) { + return vector -> + new LongColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public long getLong(int index) { + return ((BigIntVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(FloatType floatType) { + return vector -> + new FloatColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public float getFloat(int index) { + return ((Float4Vector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(DoubleType doubleType) { + return vector -> + new DoubleColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public double getDouble(int index) { + return ((Float8Vector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(DateType dateType) { + return vector -> + new IntColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public int getInt(int index) { + return ((DateDayVector) vector).getObject(index); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(TimeType timeType) { + // Paimon stores TIME as milliseconds int. Arrow may use different units + // depending on precision, so convert to millis accordingly. + return vector -> + new IntColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public int getInt(int index) { + if (vector instanceof TimeMilliVector) { + return ((TimeMilliVector) vector).get(index); + } else if (vector instanceof TimeMicroVector) { + return (int) (((TimeMicroVector) vector).get(index) / 1_000); + } else if (vector instanceof TimeNanoVector) { + return (int) (((TimeNanoVector) vector).get(index) / 1_000_000); + } else if (vector instanceof TimeSecVector) { + return ((TimeSecVector) vector).get(index) * 1_000; + } + throw new UnsupportedOperationException( + "Unsupported Arrow time vector type: " + + vector.getClass().getName()); + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(TimestampType timestampType) { + return vector -> + new TimestampColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Timestamp getTimestamp(int i, int precision) { + long value = + vector.getDataBuffer() + .getLong((long) i * TimeStampVector.TYPE_WIDTH); + if (precision == 0) { + return Timestamp.fromEpochMillis(value * 1000); + } else if (precision >= 1 && precision <= 3) { + return Timestamp.fromEpochMillis(value); + } else if (precision >= 4 && precision <= 6) { + return Timestamp.fromMicros(value); + } else { + return Timestamp.fromEpochMillis( + value / 1_000_000, (int) (value % 1_000_000)); + } + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(LocalZonedTimestampType localZonedTimestampType) { + return vector -> + new TimestampColumnVector() { + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public Timestamp getTimestamp(int i, int precision) { + long value = + vector.getDataBuffer() + .getLong((long) i * TimeStampVector.TYPE_WIDTH); + if (precision == 0) { + return Timestamp.fromEpochMillis(value * 1000); + } else if (precision >= 1 && precision <= 3) { + return Timestamp.fromEpochMillis(value); + } else if (precision >= 4 && precision <= 6) { + return Timestamp.fromMicros(value); + } else { + return Timestamp.fromEpochMillis( + value / 1_000_000, (int) (value % 1_000_000)); + } + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(VariantType variantType) { + throw new UnsupportedOperationException(); + } + + @Override + public Arrow2PaimonVectorConverter visit(ArrayType arrayType) { + final Arrow2PaimonVectorConverter arrowVectorConvertor = + arrayType.getElementType().accept(this); + + return vector -> + new ArrayColumnVector() { + + private boolean inited = false; + private ColumnVector columnVector; + + private void init() { + if (!inited) { + FieldVector child = ((ListVector) vector).getDataVector(); + this.columnVector = arrowVectorConvertor.convertVector(child); + inited = true; + } + } + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public InternalArray getArray(int index) { + init(); + ListVector listVector = (ListVector) vector; + int start = listVector.getElementStartIndex(index); + int end = listVector.getElementEndIndex(index); + return new ColumnarArray(columnVector, start, end - start); + } + + @Override + public ColumnVector getColumnVector() { + init(); + return columnVector; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(MultisetType multisetType) { + throw new UnsupportedOperationException("Doesn't support MultisetType."); + } + + @Override + public Arrow2PaimonVectorConverter visit(MapType mapType) { + final Arrow2PaimonVectorConverter keyConvertor = mapType.getKeyType().accept(this); + final Arrow2PaimonVectorConverter valueConverter = mapType.getValueType().accept(this); + + return vector -> + new MapColumnVector() { + + private boolean inited = false; + private ListVector mapVector; + private ColumnVector keyColumnVector; + private ColumnVector valueColumnVector; + + private void init() { + if (!inited) { + this.mapVector = (ListVector) vector; + StructVector listVector = (StructVector) mapVector.getDataVector(); + + FieldVector keyVector = listVector.getChildrenFromFields().get(0); + FieldVector valueVector = listVector.getChildrenFromFields().get(1); + + this.keyColumnVector = keyConvertor.convertVector(keyVector); + this.valueColumnVector = valueConverter.convertVector(valueVector); + inited = true; + } + } + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public InternalMap getMap(int index) { + init(); + + int start = mapVector.getElementStartIndex(index); + int end = mapVector.getElementEndIndex(index); + + return new ColumnarMap( + keyColumnVector, valueColumnVector, start, end - start); + } + + @Override + public ColumnVector[] getChildren() { + return new ColumnVector[] {keyColumnVector, valueColumnVector}; + } + }; + } + + @Override + public Arrow2PaimonVectorConverter visit(RowType rowType) { + final List convertors = new ArrayList<>(); + final List names = new ArrayList<>(); + List fields = rowType.getFields(); + for (int i = 0; i < rowType.getFields().size(); i++) { + convertors.add(rowType.getTypeAt(i).accept(this)); + names.add(fields.get(i).name()); + } + + return vector -> + new RowColumnVector() { + + private boolean inited = false; + private VectorizedColumnBatch vectorizedColumnBatch; + + private void init() { + if (!inited) { + List children = + ((StructVector) vector).getChildrenFromFields(); + ColumnVector[] vectors = new ColumnVector[convertors.size()]; + + for (FieldVector child : children) { + int index = names.indexOf(child.getName()); + if (index != -1) { + vectors[index] = convertors.get(index).convertVector(child); + } + } + + this.vectorizedColumnBatch = new VectorizedColumnBatch(vectors); + inited = true; + } + } + + @Override + public boolean isNullAt(int index) { + return vector.isNull(index); + } + + @Override + public InternalRow getRow(int index) { + init(); + return new ColumnarRow(vectorizedColumnBatch, index); + } + + @Override + public VectorizedColumnBatch getBatch() { + init(); + return vectorizedColumnBatch; + } + }; + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index bf92cd5f57..3d4da4fe50 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -366,9 +366,9 @@ void testTieringForAlterTable() throws Exception { for (int i = 0; i < 10; i++) { rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); flussRows.addAll(rows); - // write records - writeRows(t2, rows, true); } + // write records + writeRows(t2, flussRows, true); // check the status of replica after synced; // note: we can't update log start offset for unaware bucket mode log table assertReplicaStatus(t2Bucket, 30); diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml index c565f22db8..3e2de27611 100644 --- a/fluss-spark/fluss-spark-ut/pom.xml +++ b/fluss-spark/fluss-spark-ut/pom.xml @@ -95,6 +95,13 @@ test + + org.apache.paimon + paimon-arrow + ${paimon.version} + test + + org.apache.fluss fluss-lake-iceberg diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 944378b00a..38aeaa9898 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -454,6 +454,7 @@ org.apache.fluss.lake.batch.ArrowRecordBatch org.apache.fluss.lake.committer.CommittedLakeSnapshot org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType + org.apache.paimon.arrow.converter.Arrow2PaimonVectorConverter* org.apache.fluss.lake.lance.*