Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ protected int recordCount(List<ArrowBatchData> fetchedRecords) {
protected ArrowScanRecords toResult(
        Map<TableBucket, List<ArrowBatchData>> fetchedRecords,
        Map<TableBucket, Long> consumedUpToOffsets) {
    // Forward both the fetched Arrow batches and the per-bucket consumed-up-to offsets;
    // ArrowScanRecords exposes the offsets to callers via consumedUpToOffset(bucket).
    return new ArrowScanRecords(fetchedRecords, consumedUpToOffsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.utils.IOUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -43,8 +44,18 @@ public class ArrowScanRecords implements Iterable<ArrowBatchData>, AutoCloseable

private final Map<TableBucket, List<ArrowBatchData>> records;

/** The exclusive upper bound of consumed offsets per polled bucket in this round. */
private final Map<TableBucket, Long> consumedUpToOffsets;

/**
 * Creates scan records that carry no consumed-offset information.
 *
 * <p>Delegates to the two-argument constructor with an empty offsets map.
 *
 * @param records the Arrow batches, keyed by bucket
 */
public ArrowScanRecords(Map<TableBucket, List<ArrowBatchData>> records) {
    this(records, Collections.<TableBucket, Long>emptyMap());
}

/**
 * Creates scan records together with the consumed-up-to offsets of this poll round.
 *
 * <p>Both maps are stored by reference; no copy is made.
 *
 * @param records the Arrow batches, keyed by bucket
 * @param consumedUpToOffsets the exclusive upper bound of consumed offsets, keyed by bucket
 */
public ArrowScanRecords(
        Map<TableBucket, List<ArrowBatchData>> records,
        Map<TableBucket, Long> consumedUpToOffsets) {
    this.consumedUpToOffsets = consumedUpToOffsets;
    this.records = records;
}

/** Get just the Arrow batches for the given bucket. */
Expand All @@ -56,11 +67,23 @@ public List<ArrowBatchData> records(TableBucket scanBucket) {
return Collections.unmodifiableList(recs);
}

/**
 * Returns the buckets that were polled in this round, as a read-only view of the key set of
 * the records map.
 */
public Set<TableBucket> buckets() {
    Set<TableBucket> bucketKeys = records.keySet();
    return Collections.unmodifiableSet(bucketKeys);
}

/**
 * Looks up the exclusive upper bound of the offsets consumed for {@code bucket} in this poll
 * round.
 *
 * @param bucket the bucket to query
 * @return the exclusive upper bound offset, or {@code null} when no offset is recorded for the
 *     bucket (i.e. it was not polled in this round)
 */
@Nullable
public Long consumedUpToOffset(TableBucket bucket) {
    Long upperBound = consumedUpToOffsets.get(bucket);
    return upperBound;
}

/** Returns the total number of rows in all batches. */
public int count() {
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,32 @@
package org.apache.fluss.lake.batch;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.record.ArrowBatchData;

/**
 * The Arrow implementation of the {@link RecordBatch} interface.
 *
 * <p>Wraps an {@link ArrowBatchData} for use by lake writers that support batch writing via {@link
 * org.apache.fluss.lake.writer.SupportsRecordBatchWrite}.
 *
 * <p>Closing this batch closes the wrapped {@link ArrowBatchData}.
 *
 * @since 0.7
 */
@PublicEvolving
public class ArrowRecordBatch implements RecordBatch, AutoCloseable {

    /** The wrapped Arrow data; closed together with this batch. */
    private final ArrowBatchData arrowBatchData;

    /**
     * Creates a record batch backed by the given Arrow data.
     *
     * @param arrowBatchData the Arrow data to wrap
     */
    public ArrowRecordBatch(ArrowBatchData arrowBatchData) {
        this.arrowBatchData = arrowBatchData;
    }

    /** Returns the underlying {@link ArrowBatchData}. */
    public ArrowBatchData getArrowBatchData() {
        return arrowBatchData;
    }

    /** Closes the wrapped {@link ArrowBatchData}. */
    @Override
    public void close() {
        arrowBatchData.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.fluss.annotation.Internal;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

import static org.apache.fluss.utils.Preconditions.checkArgument;
Expand All @@ -38,6 +40,7 @@ public class ArrowBatchData implements AutoCloseable {
private final long baseLogOffset;
private final long timestamp;
private final int schemaId;
private boolean closed;

public ArrowBatchData(
VectorSchemaRoot vectorSchemaRoot, long baseLogOffset, long timestamp, int schemaId) {
Expand Down Expand Up @@ -72,6 +75,17 @@ public int getRecordCount() {
return vectorSchemaRoot.getRowCount();
}

/**
 * Returns the total size in bytes of the underlying Arrow buffers, computed as the sum of
 * {@code readableBytes()} over every buffer of every field vector in the root.
 */
public long getSizeInBytes() {
    long totalBytes = 0L;
    for (FieldVector fieldVector : vectorSchemaRoot.getFieldVectors()) {
        // Pass false so the buffers are only inspected, not cleared.
        ArrowBuf[] buffers = fieldVector.getBuffers(false);
        for (int i = 0; i < buffers.length; i++) {
            totalBytes += buffers[i].readableBytes();
        }
    }
    return totalBytes;
}

/**
* Creates a new {@link ArrowBatchData} containing a contiguous slice of this batch's rows and
* releases the original vector data.
Expand All @@ -92,12 +106,37 @@ skipRows < getRecordCount(),
int remainingRows = getRecordCount() - skipRows;
VectorSchemaRoot slicedRoot = vectorSchemaRoot.slice(skipRows, remainingRows);
// release original vector buffers; sliced vectors hold independent copies
vectorSchemaRoot.close();
close();
return new ArrowBatchData(slicedRoot, baseLogOffset + skipRows, timestamp, schemaId);
}

/**
 * Creates an {@link ArrowBatchData} containing only the first {@code rowCount} rows and
 * releases the original vector data.
 *
 * <p>After this method returns, the original {@link ArrowBatchData} instance MUST NOT be used
 * or closed through any other reference. The caller is responsible for closing the returned
 * instance.
 *
 * <p>When {@code rowCount} equals the current record count there is nothing to truncate, so
 * this instance itself is returned and nothing is released.
 *
 * @param rowCount the number of leading rows to keep
 * @return an {@link ArrowBatchData} containing the first {@code rowCount} rows
 * @throws IllegalArgumentException if {@code rowCount} is not in {@code (0, recordCount]}
 */
public ArrowBatchData truncateAndTransferOwnership(int rowCount) {
    checkArgument(rowCount > 0, "rowCount must be > 0, but is %s", rowCount);
    int recordCount = getRecordCount();
    checkArgument(
            rowCount <= recordCount,
            "rowCount(%s) must be <= recordCount(%s)",
            rowCount,
            recordCount);
    if (rowCount == recordCount) {
        // NOTE(review): some Arrow releases make VectorSchemaRoot.slice(0, rowCount) return
        // the very same root when the slice spans all rows; closing this batch afterwards
        // would then release the buffers of the batch we hand back. Keep ownership here.
        return this;
    }
    VectorSchemaRoot slicedRoot = vectorSchemaRoot.slice(0, rowCount);
    // release original vector buffers; the sliced root now owns the kept rows
    close();
    return new ArrowBatchData(slicedRoot, baseLogOffset, timestamp, schemaId);
}

/**
 * Closes the underlying {@link VectorSchemaRoot}.
 *
 * <p>The root is closed at most once: the first call closes it, later calls are no-ops.
 */
@Override
public void close() {
    if (closed) {
        return;
    }
    // Flip the flag first so the root is never closed a second time.
    closed = true;
    vectorSchemaRoot.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ private UnshadedArrowReadUtils() {}
public static Schema toArrowSchema(RowType rowType) {
    // Build the schema with the shaded Arrow classes first, then round-trip it through its
    // serialized bytes to obtain the equivalent unshaded Schema.
    org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema shadedSchema =
            ArrowUtils.toArrowSchema(rowType);
    // Use toByteArray()/deserialize() instead of serializeAsMessage()/deserializeMessage()
    // for compatibility with Arrow 12.x (used by Spark 3.4/3.5)
    byte[] serializedSchema = shadedSchema.toByteArray();
    return Schema.deserialize(ByteBuffer.wrap(serializedSchema));
}

public static void loadArrowBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,16 @@ private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetc
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout) {
TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup());
ClassLoader userClassLoader = context.getUserCodeClassLoader().asClassLoader();
return new TieringSourceFetcherManager<>(
elementsQueue,
() ->
new TieringSplitReader<>(
connection, lakeTieringFactory, pollTimeout, tieringMetrics),
connection,
lakeTieringFactory,
userClassLoader,
pollTimeout,
tieringMetrics),
context.getConfiguration(),
(ignore) -> {});
}
Expand Down
Loading