From 7a02a8dd8a652a43616e24ffa67a44e918c8e8fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=BD=87?= Date: Tue, 26 May 2026 10:36:35 +0800 Subject: [PATCH] [lake/tiering] Support reporting watermark to Paimon snapshot --- .../fluss/lake/committer/LakeCommitter.java | 19 +- .../lake/watermark/WatermarkExtractor.java | 41 +++ .../fluss/lake/writer/LakeTieringFactory.java | 3 +- .../fluss/lake/writer/LakeWriteResult.java | 45 ++++ .../apache/fluss/lake/writer/LakeWriter.java | 2 +- .../fluss/lake/writer/WriterInitContext.java | 9 + .../committer/TieringCommitOperator.java | 26 +- .../TieringCommitOperatorFactory.java | 3 +- .../source/TableBucketWriteResult.java | 3 +- .../source/TableBucketWriteResultEmitter.java | 3 +- .../TableBucketWriteResultSerializer.java | 34 ++- .../TableBucketWriteResultTypeInfo.java | 10 +- .../flink/tiering/source/TieringSource.java | 5 +- .../source/TieringSourceFetcherManager.java | 3 +- .../tiering/source/TieringSourceReader.java | 19 +- .../tiering/source/TieringSplitReader.java | 10 +- .../source/TieringWriterInitContext.java | 12 +- .../watermark/SimpleWatermarkExtractor.java | 203 ++++++++++++++ .../tiering/TestingLakeTieringFactory.java | 10 +- .../flink/tiering/TestingWriteResult.java | 20 +- .../committer/TieringCommitOperatorTest.java | 127 ++++++++- .../SimpleWatermarkExtractorTest.java | 251 ++++++++++++++++++ .../tiering/TestingValuesLakeCommitter.java | 3 +- .../tiering/TestingValuesLakeWriter.java | 4 +- .../iceberg/tiering/IcebergLakeCommitter.java | 3 +- .../iceberg/tiering/IcebergWriteResult.java | 5 +- .../iceberg/tiering/IcebergTieringTest.java | 7 + .../lance/tiering/LanceLakeCommitter.java | 4 +- .../lake/lance/tiering/LanceWriteResult.java | 5 +- .../lake/lance/tiering/LanceTieringTest.java | 7 + .../paimon/tiering/PaimonLakeCommitter.java | 5 +- .../lake/paimon/tiering/PaimonLakeWriter.java | 15 +- .../paimon/tiering/PaimonWriteResult.java | 18 +- .../tiering/PaimonWriteResultSerializer.java | 65 ++++- 
.../paimon/tiering/PaimonTieringTest.java | 134 ++++++++++ .../PaimonWriteResultSerializerTest.java | 82 ++++++ .../lakehouse/TestingPaimonStoragePlugin.java | 6 +- 37 files changed, 1150 insertions(+), 71 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriteResult.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java create mode 100644 fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializerTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java index 33e89afc9a..2de7185f29 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.committer; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.lake.writer.LakeWriteResult; import javax.annotation.Nullable; @@ -34,7 +35,8 @@ * @since 0.7 */ @PublicEvolving -public interface LakeCommitter extends AutoCloseable { +public interface LakeCommitter + extends AutoCloseable { /** * The property key used to store the file path of lake table bucket offsets in snapshot @@ -42,6 +44,17 @@ public interface LakeCommitter extends AutoCloseable */ String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets"; + /** + * Converts a list of write results to a committable object with watermark. 
+ * + * @param writeResults the list of write results + * @param watermark watermark to be committed + * @return the committable object + * @throws IOException if an I/O error occurs + */ + CommittableT toCommittable(List writeResults, @Nullable Long watermark) + throws IOException; + /** * Converts a list of write results to a committable object. * @@ -49,7 +62,9 @@ public interface LakeCommitter extends AutoCloseable * @return the committable object * @throws IOException if an I/O error occurs */ - CommittableT toCommittable(List writeResults) throws IOException; + default CommittableT toCommittable(List writeResults) throws IOException { + return toCommittable(writeResults, null); + } /** * Commits the given committable object. diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java b/fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java new file mode 100644 index 0000000000..79ddc1696f --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.watermark; + +import org.apache.fluss.row.InternalRow; + +import javax.annotation.Nullable; + +/** + * Extracts watermark values from {@link InternalRow} for lake tiering. + * + *

The extracted value should be expressed in epoch milliseconds and should represent the + * watermark derived from the row's event-time field and watermark strategy. + */ +public interface WatermarkExtractor { + + /** + * Extracts the watermark for the given row. + * + * @param row the row to extract the watermark from + * @return the watermark in epoch milliseconds, or {@code null} if the row does not provide a + * watermark value + */ + @Nullable + Long currentWatermark(InternalRow row); +} diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeTieringFactory.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeTieringFactory.java index 955e8b7581..412e26693a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeTieringFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeTieringFactory.java @@ -35,7 +35,8 @@ * @since 0.7 */ @PublicEvolving -public interface LakeTieringFactory extends Serializable { +public interface LakeTieringFactory + extends Serializable { String FLUSS_LAKE_TIERING_COMMIT_USER = "__fluss_lake_tiering"; diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriteResult.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriteResult.java new file mode 100644 index 0000000000..239c040726 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriteResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.writer; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * The result produced by a {@link LakeWriter} after writing records to lake storage. + * + *

<p>A write result is passed to the lake committer and may carry lake-specific commit information, + * such as files. It can also expose the watermark of the written records so that the tiering commit + * can report the synchronization progress to the lake snapshot. + */ +public interface LakeWriteResult extends Serializable { + + /** + * Returns the maximum watermark of the records included in this write result, in epoch + * milliseconds. + * + *
<p>
Returns {@code null} if the source table does not define a watermark, no watermark can be + * extracted from the written records, or the lake implementation does not report watermarks. + */ + @Nullable + default Long getWatermark() { + return null; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java index c3ab3a352a..fce39753a5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java @@ -31,7 +31,7 @@ * @since 0.7 */ @PublicEvolving -public interface LakeWriter extends Closeable { +public interface LakeWriter extends Closeable { /** * Writes a record to the lake. * diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java index 6c9c2f4086..bdb652f669 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.writer; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; @@ -62,4 +63,12 @@ public interface WriterInitContext { * @return the Fluss table info */ TableInfo tableInfo(); + + /** + * Returns the watermark extractor, or null if table does not define watermark. 
+ * + * @return the watermark extractor, or null if table does not define watermark + */ + @Nullable + WatermarkExtractor watermarkExtractor(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java index 2f3a69a7ea..442217c065 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -32,6 +32,7 @@ import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.committer.TieringStats; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; @@ -74,7 +75,7 @@ *

Finally, it will also commit the committed lake snapshot to Fluss cluster to make Fluss aware * of the tiering progress. */ -public class TieringCommitOperator +public class TieringCommitOperator extends AbstractStreamOperator> implements OneInputStreamOperator< TableBucketWriteResult, CommittableMessage> { @@ -232,14 +233,35 @@ private CommitResult commitWriteResults( Map logEndOffsets = new HashMap<>(); Map logMaxTieredTimestamps = new HashMap<>(); + Long watermark = null; for (TableBucketWriteResult writeResult : nonEmptyResults) { TableBucket tableBucket = writeResult.tableBucket(); logEndOffsets.put(tableBucket, writeResult.logEndOffset()); logMaxTieredTimestamps.put(tableBucket, writeResult.maxTimestamp()); + Long writeResultWatermark = writeResult.writeResult().getWatermark(); + if (writeResultWatermark == null) { + continue; + } + watermark = + watermark == null + ? writeResultWatermark + : Math.min(watermark, writeResultWatermark); + } + + if (nonEmptyResults.size() < committableWriteResults.size()) { + // Empty results means some splits has not been processed, possibly caused by force + // completion. Do not update watermark here. + if (watermark != null) { + LOG.warn( + "There are some empty write results for {}, do not update watermark. 
Watermark of non-empty splits is {}.", + tablePath, + watermark); + } + watermark = null; } // to committable - Committable committable = lakeCommitter.toCommittable(writeResults); + Committable committable = lakeCommitter.toCommittable(writeResults, watermark); // before commit to lake, check fluss not missing any lake snapshot committed by fluss LakeSnapshot flussCurrentLakeSnapshot = getLatestLakeSnapshot(tablePath); checkFlussNotMissingLakeSnapshot( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java index efced7aeab..db978c4e32 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.source.TableBucketWriteResult; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; @@ -27,7 +28,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; /** The factory to create {@link TieringCommitOperator}. 
*/ -public class TieringCommitOperatorFactory +public class TieringCommitOperatorFactory extends AbstractStreamOperatorFactory> implements OneInputStreamOperatorFactory< TableBucketWriteResult, CommittableMessage> { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java index abec3c6c21..e097565508 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.tiering.source; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -31,7 +32,7 @@ * one round of tiering. It'll be passed to downstream committer operator to collect all the write * results of a table and do commit. 
*/ -public class TableBucketWriteResult implements Serializable { +public class TableBucketWriteResult implements Serializable { private static final long serialVersionUID = 1L; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultEmitter.java index 2b12337f84..c182752edf 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultEmitter.java @@ -19,12 +19,13 @@ import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.committer.LakeCommitter; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; /** The emitter to emit {@link TableBucketWriteResult} to downstream {@link LakeCommitter}. 
*/ -public class TableBucketWriteResultEmitter +public class TableBucketWriteResultEmitter implements RecordEmitter< TableBucketWriteResult, TableBucketWriteResult, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 3651760955..d5025e3a33 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.tiering.source; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -27,13 +28,13 @@ import java.io.IOException; /** The serializer for {@link TableBucketWriteResult}. 
*/ -public class TableBucketWriteResultSerializer +public class TableBucketWriteResultSerializer implements SimpleVersionedSerializer> { private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer writeResultSerializer; @@ -77,6 +78,7 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu // write -1 to mark write result as null out.writeInt(-1); } else { + out.writeInt(writeResultSerializer.getVersion()); byte[] serializeBytes = writeResultSerializer.serialize(writeResult); out.writeInt(serializeBytes.length); out.write(serializeBytes); @@ -99,7 +101,7 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu @Override public TableBucketWriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { + if (version > CURRENT_VERSION) { throw new IOException("Unknown version " + version); } final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -120,14 +122,28 @@ public TableBucketWriteResult deserialize(int version, byte[] seria TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); // deserialize write result - int writeResultLength = in.readInt(); WriteResult writeResult; - if (writeResultLength >= 0) { - byte[] writeResultBytes = new byte[writeResultLength]; - in.readFully(writeResultBytes); - writeResult = writeResultSerializer.deserialize(version, writeResultBytes); + if (version == 1) { + int writeResultLength = in.readInt(); + if (writeResultLength >= 0) { + byte[] writeResultBytes = new byte[writeResultLength]; + in.readFully(writeResultBytes); + writeResult = writeResultSerializer.deserialize(version, writeResultBytes); + } else { + writeResult = null; + } } else { - writeResult = null; + int writeResultVersionOrNull = 
in.readInt(); + if (writeResultVersionOrNull == -1) { + writeResult = null; + } else { + int writeResultLength = in.readInt(); + byte[] writeResultBytes = new byte[writeResultLength]; + in.readFully(writeResultBytes); + writeResult = + writeResultSerializer.deserialize( + writeResultVersionOrNull, writeResultBytes); + } } // deserialize log end offset diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java index 424673c26c..3580dcc1b1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java @@ -19,6 +19,7 @@ import org.apache.fluss.flink.adapter.TypeInformationAdapter; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -26,7 +27,7 @@ import org.apache.flink.util.function.SerializableSupplier; /** A {@link TypeInformation} for {@link TableBucketWriteResult} . 
*/ -public class TableBucketWriteResultTypeInfo +public class TableBucketWriteResultTypeInfo extends TypeInformationAdapter> { private final SerializableSupplier> @@ -38,9 +39,10 @@ private TableBucketWriteResultTypeInfo( this.writeResultSerializerFactory = writeResultSerializerFactory; } - public static TypeInformation> of( - SerializableSupplier> - writeResultSerializerFactory) { + public static + TypeInformation> of( + SerializableSupplier> + writeResultSerializerFactory) { return new TableBucketWriteResultTypeInfo<>(writeResultSerializerFactory); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index 649b758704..0a36552061 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -26,6 +26,7 @@ import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorStateSerializer; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.shaded.guava32.com.google.common.hash.HashFunction; import org.apache.fluss.shaded.guava32.com.google.common.hash.Hasher; import org.apache.fluss.shaded.guava32.com.google.common.hash.Hashing; @@ -53,7 +54,7 @@ * * @param the type of write lake result. */ -public class TieringSource +public class TieringSource implements Source< TableBucketWriteResult, TieringSplit, TieringSourceEnumeratorState> { @@ -129,7 +130,7 @@ private static byte[] generateOperatorHash() { } /** Builder for {@link TieringSource}. 
*/ - public static class Builder { + public static class Builder { private final Configuration flussConf; private final LakeTieringFactory lakeTieringFactory; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java index ac72aad664..bb323ea529 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -20,6 +20,7 @@ import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter; import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -38,7 +39,7 @@ * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches * the max duration of tiering to {@link TieringSplitReader}. 
*/ -public class TieringSourceFetcherManager +public class TieringSourceFetcherManager extends SingleThreadFetcherManagerAdapter< TableBucketWriteResult, TieringSplit> { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index 6f0fc43b95..c8eddace0b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -26,6 +26,7 @@ import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; @@ -44,7 +45,7 @@ /** A {@link SourceReader} that read records from Fluss and write to lake. 
*/ @Internal -public final class TieringSourceReader +public final class TieringSourceReader extends SingleThreadMultiplexSourceReaderBaseAdapter< TableBucketWriteResult, TableBucketWriteResult, @@ -82,13 +83,15 @@ public TieringSourceReader( this.connection = connection; } - private static TieringSourceFetcherManager createFetcherManager( - FutureCompletingBlockingQueue>> - elementsQueue, - SourceReaderContext context, - Connection connection, - LakeTieringFactory lakeTieringFactory, - Duration pollTimeout) { + private static + TieringSourceFetcherManager createFetcherManager( + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); return new TieringSourceFetcherManager<>( elementsQueue, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index d59787e15d..6351183cae 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -29,7 +29,10 @@ import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.watermark.SimpleWatermarkExtractor; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.metadata.TableBucket; import 
org.apache.fluss.metadata.TableInfo; @@ -62,7 +65,7 @@ import static org.apache.fluss.utils.Preconditions.checkState; /** The {@link SplitReader} implementation which will read Fluss and write to lake. */ -public class TieringSplitReader +public class TieringSplitReader implements SplitReader, TieringSplit> { private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class); @@ -93,6 +96,7 @@ public class TieringSplitReader @Nullable private TablePath currentTablePath; @Nullable private LogScanner currentLogScanner; @Nullable private Table currentTable; + @Nullable private WatermarkExtractor currentTableWatermarkExtractor; private final Queue currentPendingSnapshotSplits; @Nullable private BoundedSplitReader currentSnapshotSplitReader; @@ -272,6 +276,7 @@ private Table getOrMoveToTable(TieringSplit split) { currentTableId = split.getTableBucket().getTableId(); currentTableNumberOfSplits = split.getNumberOfSplits(); TableInfo currentTableInfo = checkNotNull(currentTable).getTableInfo(); + currentTableWatermarkExtractor = SimpleWatermarkExtractor.create(currentTableInfo); // check currentTable's id for the table path is same with table id of the tiering // split, if not, it means the tiering split is for a previous dropped table. 
let's fail // directly @@ -431,7 +436,8 @@ private LakeWriter getOrCreateLakeWriter( currentTablePath, bucket, partitionName, - currentTable.getTableInfo())); + currentTable.getTableInfo(), + currentTableWatermarkExtractor)); lakeWriters.put(bucket, lakeWriter); } return lakeWriter; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java index 121c4fb9cb..1cf6dc828a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.tiering.source; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; @@ -31,16 +32,19 @@ public class TieringWriterInitContext implements WriterInitContext { private final TableBucket tableBucket; @Nullable private final String partition; private final TableInfo tableInfo; + @Nullable private final WatermarkExtractor watermarkExtractor; public TieringWriterInitContext( TablePath tablePath, TableBucket tableBucket, @Nullable String partition, - TableInfo tableInfo) { + TableInfo tableInfo, + @Nullable WatermarkExtractor watermarkExtractor) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partition = partition; this.tableInfo = tableInfo; + this.watermarkExtractor = watermarkExtractor; } @Override @@ -63,4 +67,10 @@ public String partition() { public TableInfo tableInfo() { return tableInfo; } + + @Nullable + @Override + public WatermarkExtractor watermarkExtractor() { + return watermarkExtractor; + } } diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java
new file mode 100644
index 0000000000..a73e0d7670
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source.watermark;
+
+import org.apache.fluss.lake.watermark.WatermarkExtractor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Extracts epoch-millis watermark values from {@link InternalRow} by parsing Flink watermark
+ * definitions stored in table properties. Only two expression formats are supported:
+ *
+ * <ul>
+ *   <li>{@code WATERMARK FOR ts AS ts} — direct column reference, zero delay
+ *   <li>{@code WATERMARK FOR ts AS ts - INTERVAL '5' SECOND} — column minus interval delay
+ * </ul>
+ *
+ * <p>The rowtime column must be a physical column (computed columns are not supported). Returns
+ * {@code null} from {@link #create(TableInfo)} if the watermark configuration cannot be parsed.
+ */
+public class SimpleWatermarkExtractor implements WatermarkExtractor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleWatermarkExtractor.class);
+
+    private static final String WATERMARK_PROPERTY_PREFIX = "schema.watermark.";
+    private static final String WATERMARK_ROWTIME_SUFFIX = ".rowtime";
+    private static final String WATERMARK_STRATEGY_EXPR_SUFFIX = ".strategy.expr";
+    private static final String WATERMARK_STRATEGY_DATA_TYPE_SUFFIX = ".strategy.data-type";
+
+    /** Index of the only watermark definition this extractor handles. */
+    private static final String SUPPORTED_WATERMARK_INDEX = "0";
+
+    /** Matches a bare column reference, optionally quoted with backticks. */
+    private static final Pattern WATERMARK_EXPR_SIMPLE_COLUMN_PATTERN =
+            Pattern.compile("(`\\w+`|\\w+)");
+
+    /** Matches a column minus an interval literal; group 2 is the amount, group 3 the unit. */
+    private static final Pattern WATERMARK_EXPR_COLUMN_MINUS_INTERVAL_PATTERN =
+            Pattern.compile(
+                    "(`\\w+`|\\w+)\\s+-\\s+INTERVAL\\s+'(\\d+\\.?\\d*)'\\s+(SECOND|MINUTE|HOUR|DAY)",
+                    Pattern.CASE_INSENSITIVE);
+
+    /** Captures the precision of a {@code TIMESTAMP(p)} or {@code TIMESTAMP_LTZ(p)} type. */
+    private static final Pattern TIMESTAMP_TYPE_PATTERN =
+            Pattern.compile("TIMESTAMP(?:_LTZ)?\\((\\d+)\\)", Pattern.CASE_INSENSITIVE);
+
+    private final int fieldIndex;
+    private final int precision;
+    private final boolean isTimestampLtz;
+    private final long delayMillis;
+
+    private SimpleWatermarkExtractor(
+            int fieldIndex, int precision, boolean isTimestampLtz, long delayMillis) {
+        this.fieldIndex = fieldIndex;
+        this.precision = precision;
+        this.isTimestampLtz = isTimestampLtz;
+        this.delayMillis = delayMillis;
+    }
+
+    /**
+     * Creates a {@link SimpleWatermarkExtractor} from the table's custom properties.
+     *
+     * @param tableInfo the table whose custom properties and row type are inspected
+     * @return the extractor, or {@code null} if no watermark is defined, a watermark with an
+     *     index other than 0 is present, the rowtime column is not in the row type, the strategy
+     *     expression is unsupported, or the strategy data type is missing
+     */
+    @Nullable
+    public static SimpleWatermarkExtractor create(TableInfo tableInfo) {
+        Map<String, String> props = tableInfo.getCustomProperties().toMap();
+        RowType rowType = tableInfo.getRowType();
+
+        String rowtimeColumn = null;
+        // Scan every key instead of stopping at the first hit, so that an extra definition is
+        // rejected regardless of the map's iteration order.
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            String key = entry.getKey();
+            if (!isWatermarkRowtimeKey(key)) {
+                continue;
+            }
+            String watermarkIndex =
+                    key.substring(
+                            WATERMARK_PROPERTY_PREFIX.length(),
+                            key.length() - WATERMARK_ROWTIME_SUFFIX.length());
+            if (!SUPPORTED_WATERMARK_INDEX.equals(watermarkIndex)) {
+                LOG.warn(
+                        "There are more than 1 watermark definition for {}, which is not supported for watermark extraction.",
+                        tableInfo.getTablePath());
+                return null;
+            }
+            rowtimeColumn = entry.getValue();
+        }
+
+        if (rowtimeColumn == null) {
+            return null;
+        }
+
+        int fieldIndex = rowType.getFieldIndex(rowtimeColumn);
+        if (fieldIndex < 0) {
+            // Argument order follows the placeholders: column name first, then table path.
+            LOG.warn(
+                    "Watermark rowtime column '{}' not found in row type for {}, "
+                            + "computed column is not supported for watermark extraction.",
+                    rowtimeColumn,
+                    tableInfo.getTablePath());
+            return null;
+        }
+
+        Long watermarkDelayMillis = parseWatermarkDelayMillis(props);
+        if (watermarkDelayMillis == null) {
+            LOG.warn(
+                    "Cannot parse delay millis for {}, complex watermark expression is not supported for watermark extraction.",
+                    tableInfo.getTablePath());
+            return null;
+        }
+
+        String dataTypeValue = props.get(watermarkKey(WATERMARK_STRATEGY_DATA_TYPE_SUFFIX));
+        if (dataTypeValue == null) {
+            LOG.warn(
+                    "Watermark column data type is not defined for {}.",
+                    tableInfo.getTablePath());
+            return null;
+        }
+
+        // Locale.ROOT keeps the comparison stable under locales with special casing rules.
+        String normalizedType = dataTypeValue.trim().toUpperCase(Locale.ROOT);
+        // Accept both the short form and the SQL-standard long form of the local-zoned type.
+        boolean isTimestampLtz =
+                normalizedType.contains("TIMESTAMP_LTZ")
+                        || normalizedType.contains("WITH LOCAL TIME ZONE");
+        // Defaults to precision 3 when the type string carries no explicit precision.
+        int precision = 3;
+        Matcher matcher = TIMESTAMP_TYPE_PATTERN.matcher(normalizedType);
+        if (matcher.find()) {
+            precision = Integer.parseInt(matcher.group(1));
+        }
+
+        return new SimpleWatermarkExtractor(
+                fieldIndex, precision, isTimestampLtz, watermarkDelayMillis);
+    }
+
+    /**
+     * Extracts the epoch-millis watermark from the given row.
+     *
+     * @param row the row to read the rowtime column from
+     * @return the rowtime value in milliseconds minus the configured delay, or {@code null} if the
+     *     rowtime column is null in this row
+     */
+    @Override
+    @Nullable
+    public Long currentWatermark(InternalRow row) {
+        if (row.isNullAt(fieldIndex)) {
+            return null;
+        }
+        if (isTimestampLtz) {
+            return row.getTimestampLtz(fieldIndex, precision).getEpochMillisecond() - delayMillis;
+        } else {
+            return row.getTimestampNtz(fieldIndex, precision).getMillisecond() - delayMillis;
+        }
+    }
+
+    /**
+     * Parses the watermark delay from the strategy expression of the supported watermark
+     * definition. For example, {@code `col` - INTERVAL '5' SECOND} yields 5000 milliseconds.
+     *
+     * @param props the table's custom properties
+     * @return the delay in milliseconds, {@code 0} if no strategy expression is stored, or {@code
+     *     null} if the expression is unsupported
+     */
+    @Nullable
+    private static Long parseWatermarkDelayMillis(Map<String, String> props) {
+        String expr = props.get(watermarkKey(WATERMARK_STRATEGY_EXPR_SUFFIX));
+        if (expr == null || WATERMARK_EXPR_SIMPLE_COLUMN_PATTERN.matcher(expr).matches()) {
+            return 0L;
+        }
+        Matcher matcher = WATERMARK_EXPR_COLUMN_MINUS_INTERVAL_PATTERN.matcher(expr);
+        if (!matcher.matches()) {
+            return null;
+        }
+
+        long unitMillis;
+        switch (matcher.group(3).toUpperCase(Locale.ROOT)) {
+            case "SECOND":
+                unitMillis = 1_000L;
+                break;
+            case "MINUTE":
+                unitMillis = 60_000L;
+                break;
+            case "HOUR":
+                unitMillis = 3_600_000L;
+                break;
+            case "DAY":
+                unitMillis = 86_400_000L;
+                break;
+            default:
+                return null;
+        }
+        // Round instead of truncating: a product such as 1.005 * 1000 is 1004.9999999999999 in
+        // double arithmetic and would otherwise lose a millisecond.
+        return Math.round(Double.parseDouble(matcher.group(2)) * unitMillis);
+    }
+
+    /** Returns whether the key has the form {@code schema.watermark.N.rowtime}. */
+    private static boolean isWatermarkRowtimeKey(String key) {
+        // The length check keeps the prefix and suffix from overlapping on a malformed key.
+        int minLength = WATERMARK_PROPERTY_PREFIX.length() + WATERMARK_ROWTIME_SUFFIX.length();
+        return key.length() >= minLength
+                && key.startsWith(WATERMARK_PROPERTY_PREFIX)
+                && key.endsWith(WATERMARK_ROWTIME_SUFFIX);
+    }
+
+    /** Builds the property key of the supported watermark definition for the given suffix. */
+    private static String watermarkKey(String suffix) {
+        return WATERMARK_PROPERTY_PREFIX + SUPPORTED_WATERMARK_INDEX + suffix;
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java
index 92f7c562a6..feeffeccdd 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java @@ -99,6 +99,7 @@ public static final class TestingLakeCommitter implements LakeCommitter { private long currentSnapshot; + @Nullable private Long watermark; @Nullable private final CommittedLakeSnapshot mockMissingCommittedLakeSnapshot; @@ -110,9 +111,16 @@ public TestingLakeCommitter(CommittedLakeSnapshot mockMissingCommittedLakeSnapsh this.mockMissingCommittedLakeSnapshot = mockMissingCommittedLakeSnapshot; } + @Nullable + public Long getWatermark() { + return watermark; + } + @Override - public TestingCommittable toCommittable(List testingWriteResults) + public TestingCommittable toCommittable( + List testingWriteResults, @Nullable Long watermark) throws IOException { + this.watermark = watermark; List writeResults = new ArrayList<>(); for (TestingWriteResult testingWriteResult : testingWriteResults) { writeResults.add(testingWriteResult.getWriteResult()); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingWriteResult.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingWriteResult.java index fdeefa999f..57322d589c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingWriteResult.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingWriteResult.java @@ -17,16 +17,34 @@ package org.apache.fluss.flink.tiering; +import org.apache.fluss.lake.writer.LakeWriteResult; + +import javax.annotation.Nullable; + /** A WriteResult for testing purpose. 
*/ -public class TestingWriteResult { +public class TestingWriteResult implements LakeWriteResult { + + private static final long serialVersionUID = 1L; private final int writeResult; + @Nullable private final Long watermark; public TestingWriteResult(int writeResult) { + this(writeResult, null); + } + + public TestingWriteResult(int writeResult, @Nullable Long watermark) { this.writeResult = writeResult; + this.watermark = watermark; } public int getWriteResult() { return writeResult; } + + @Override + @Nullable + public Long getWatermark() { + return watermark; + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index 70d86bb2c8..9f99ed55eb 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -194,6 +194,7 @@ void testCommitPartitionedTable() throws Exception { tableBucket, partitionIdAndNameEntry.getKey(), 1, + null, currentOffset, currentTimestamp, numberOfWriteResults)); @@ -356,6 +357,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { tableBucket, partitionName, 3, + null, 3, 3L, numberOfWriteResults)); @@ -389,6 +391,27 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { tableBucket, null, writeResult, + null, + logEndOffset, + maxTimestamp, + numberOfWriteResults); + } + + private StreamRecord> + createTableBucketWriteResultStreamRecord( + TablePath tablePath, + TableBucket tableBucket, + @Nullable Integer writeResult, + @Nullable Long watermark, + long logEndOffset, + long maxTimestamp, + int numberOfWriteResults) { + return createTableBucketWriteResultStreamRecord( + tablePath, + 
tableBucket, + null, + writeResult, + watermark, logEndOffset, maxTimestamp, numberOfWriteResults); @@ -400,6 +423,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { TableBucket tableBucket, @Nullable String partitionName, @Nullable Integer writeResult, + @Nullable Long watermark, long logEndOffset, long maxTimestamp, int numberOfWriteResults) { @@ -408,7 +432,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { tablePath, tableBucket, partitionName, - writeResult == null ? null : new TestingWriteResult(writeResult), + writeResult == null ? null : new TestingWriteResult(writeResult, watermark), logEndOffset, maxTimestamp, numberOfWriteResults); @@ -505,6 +529,107 @@ void testCommitFailsWhenTableRecreated() throws Exception { .contains("dropped and recreated during tiering"); } + @Test + void testCommitWithWatermark() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_commit_with_watermark"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter = + new TestingLakeTieringFactory.TestingLakeCommitter(); + committerOperator = + new TieringCommitOperator<>( + parameters, + FLUSS_CLUSTER_EXTENSION.getClientConfig(), + new org.apache.fluss.config.Configuration(), + new TestingLakeTieringFactory(testingLakeCommitter)); + committerOperator.open(); + + for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + long watermark = 1000L + bucket * 100; + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + bucket + 1, + watermark, + bucket, + bucket, + numberOfWriteResults)); + } + + assertThat(testingLakeCommitter.getWatermark()).isEqualTo(1000L); + } + + @Test + void testCommitWithoutWatermark() throws Exception { + TablePath tablePath = TablePath.of("fluss", 
"test_commit_without_watermark"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter = + new TestingLakeTieringFactory.TestingLakeCommitter(); + committerOperator = + new TieringCommitOperator<>( + parameters, + FLUSS_CLUSTER_EXTENSION.getClientConfig(), + new org.apache.fluss.config.Configuration(), + new TestingLakeTieringFactory(testingLakeCommitter)); + committerOperator.open(); + + for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + bucket + 1, + bucket, + bucket, + numberOfWriteResults)); + } + + assertThat(testingLakeCommitter.getWatermark()).isNull(); + } + + @Test + void testCommitWithEmptyResultResetsWatermark() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_commit_empty_result_resets_watermark"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter = + new TestingLakeTieringFactory.TestingLakeCommitter(); + committerOperator = + new TieringCommitOperator<>( + parameters, + FLUSS_CLUSTER_EXTENSION.getClientConfig(), + new org.apache.fluss.config.Configuration(), + new TestingLakeTieringFactory(testingLakeCommitter)); + committerOperator.open(); + + // bucket 0 and 1 have watermarks + for (int bucket = 0; bucket < 2; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + bucket + 1, + 2000L + bucket * 100, + bucket, + (long) bucket, + numberOfWriteResults)); + } + // bucket 2 is empty (writeResult=null) + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, new 
TableBucket(tableId, 2), null, 2, 2L, numberOfWriteResults)); + + assertThat(testingLakeCommitter.getWatermark()).isNull(); + } + private CommittedLakeSnapshot mockCommittedLakeSnapshot( long tableId, TablePath tablePath, int snapshotId, Map logEndOffsets) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java new file mode 100644 index 0000000000..616e8ef590 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.tiering.source.watermark; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SimpleWatermarkExtractor}. */ +class SimpleWatermarkExtractorTest { + + @Test + void testCreateWithNoWatermarkConfig() { + TableInfo tableInfo = createTableInfoWithoutWatermark(); + assertThat(SimpleWatermarkExtractor.create(tableInfo)).isNull(); + } + + @Test + void testCreateWithSimpleColumn() { + TableInfo tableInfo = createTableInfoWithWatermark("`ts`", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(1000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(1000L); + } + + @Test + void testCreateWithSimpleColumnWithoutBackticks() { + TableInfo tableInfo = createTableInfoWithWatermark("ts", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(2000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(2000L); + } + + @Test + void testCreateWithColumnMinusInterval() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '5' SECOND", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = 
SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(10000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(5000L); + } + + @Test + void testCreateWithDecimalInterval() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '0.001' SECOND", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(1000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(999L); + } + + @Test + void testCreateWithMinuteInterval() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '2' MINUTE", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(200000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(80000L); + } + + @Test + void testCreateWithHourInterval() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '1' HOUR", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(7200000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(3600000L); + } + + @Test + void testCreateWithDayInterval() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '1' DAY", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampNtz.fromMillis(172800000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(86400000L); + } + + @Test + void testCreateWithComputedColumn() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) 
+ .column("ts", DataTypes.TIMESTAMP(3)) + .build(); + Map customProps = new HashMap<>(); + customProps.put("schema.watermark.0.rowtime", "computed_ts"); + customProps.put("schema.watermark.0.strategy.expr", "`computed_ts`"); + customProps.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)"); + TableInfo tableInfo = createTableInfoFromSchema(schema, customProps); + assertThat(SimpleWatermarkExtractor.create(tableInfo)).isNull(); + } + + @Test + void testCreateWithComplexExpression() { + TableInfo tableInfo = createTableInfoWithWatermark("func(`ts`)", "TIMESTAMP(3)"); + assertThat(SimpleWatermarkExtractor.create(tableInfo)).isNull(); + } + + @Test + void testCreateWithTimestampLtz() { + TableInfo tableInfo = + createTableInfoWithWatermark("`ts` - INTERVAL '3' SECOND", "TIMESTAMP_LTZ(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampLtz.fromEpochMillis(10000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(7000L); + } + + @Test + void testCreateWithNoDataType() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("ts", DataTypes.TIMESTAMP(3)) + .build(); + Map customProps = new HashMap<>(); + customProps.put("schema.watermark.0.rowtime", "ts"); + customProps.put("schema.watermark.0.strategy.expr", "`ts`"); + TableInfo tableInfo = createTableInfoFromSchema(schema, customProps); + assertThat(SimpleWatermarkExtractor.create(tableInfo)).isNull(); + } + + @Test + void testCurrentWatermarkWithNullField() { + TableInfo tableInfo = createTableInfoWithWatermark("`ts`", "TIMESTAMP(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, null); + assertThat(extractor.currentWatermark(row)).isNull(); + } + + @Test + void testCurrentWatermarkWithTimestampLtzSimpleColumn() { + TableInfo tableInfo = 
createTableInfoWithWatermark("`ts`", "TIMESTAMP_LTZ(3)"); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = GenericRow.of(1, TimestampLtz.fromEpochMillis(5000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(5000L); + } + + @ParameterizedTest + @CsvSource({ + "TIMESTAMP, 0", + "TIMESTAMP, 1", + "TIMESTAMP, 2", + "TIMESTAMP_LTZ, 0", + "TIMESTAMP_LTZ, 1", + "TIMESTAMP_LTZ, 2", + }) + void testCreateWithTimestampPrecision(String dataTypeName, int precision) { + boolean isTimestampLtz = dataTypeName.equals("TIMESTAMP_LTZ"); + String dataType = dataTypeName + "(" + precision + ")"; + TableInfo tableInfo = createTableInfoWithWatermark("`ts` - INTERVAL '1' SECOND", dataType); + SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo); + assertThat(extractor).isNotNull(); + + GenericRow row = + isTimestampLtz + ? GenericRow.of(1, TimestampLtz.fromEpochMillis(5000L)) + : GenericRow.of(1, TimestampNtz.fromMillis(5000L)); + assertThat(extractor.currentWatermark(row)).isEqualTo(4000L); + } + + private static TableInfo createTableInfoWithWatermark(String expr, String dataType) { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("ts", DataTypes.parse(dataType)) + .build(); + Map customProps = new HashMap<>(); + customProps.put("schema.watermark.0.rowtime", "ts"); + customProps.put("schema.watermark.0.strategy.expr", expr); + customProps.put("schema.watermark.0.strategy.data-type", dataType); + return createTableInfoFromSchema(schema, customProps); + } + + private static TableInfo createTableInfoWithoutWatermark() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("ts", DataTypes.TIMESTAMP(3)) + .build(); + return createTableInfoFromSchema(schema, new HashMap<>()); + } + + private static TableInfo createTableInfoFromSchema( + Schema schema, Map customProps) { + return new TableInfo( + 
TablePath.of("test_db", "test_table"), + 1L, + 0, + schema, + Collections.emptyList(), + Collections.emptyList(), + 1, + new Configuration(), + Configuration.fromMap(customProps), + null, + null, + System.currentTimeMillis(), + System.currentTimeMillis()); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java index 1628199166..35e4da7f3b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java @@ -47,7 +47,8 @@ public TestingValuesLakeCommitter(String tableId) { @Override public TestingValuesCommittable toCommittable( - List valuesWriteResults) + List valuesWriteResults, + @Nullable Long watermark) throws IOException { return new TestingValuesCommittable( valuesWriteResults.stream() diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java index 2764ba0f7c..dc173f8337 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java @@ -21,13 +21,13 @@ import org.apache.fluss.config.ConfigOption; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.lake.values.TestingValuesLake; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.LogRecord; import 
org.apache.fluss.utils.InstantiationUtils; import java.io.IOException; -import java.io.Serializable; import java.time.Duration; import java.util.UUID; @@ -70,7 +70,7 @@ public TestingValuesWriteResult complete() throws IOException { public void close() throws IOException {} /** Write result of {@link TestingValuesLake}. */ - public static class TestingValuesWriteResult implements Serializable { + public static class TestingValuesWriteResult implements LakeWriteResult { private static final long serialVersionUID = 1L; private final String stageId; diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index 06b572760a..543423c42a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -72,7 +72,8 @@ public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, Table } @Override - public IcebergCommittable toCommittable(List icebergWriteResults) { + public IcebergCommittable toCommittable( + List icebergWriteResults, @Nullable Long watermark) { // Aggregate all write results into a single committable IcebergCommittable.Builder builder = IcebergCommittable.builder(); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java index d7e25403f6..59dc00591a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java @@ -18,15 +18,14 @@ package 
org.apache.fluss.lake.iceberg.tiering; import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.iceberg.io.WriteResult; import javax.annotation.Nullable; -import java.io.Serializable; - /** The write result of Iceberg lake writer to pass to committer to commit. */ -public class IcebergWriteResult implements Serializable { +public class IcebergWriteResult implements LakeWriteResult { private static final long serialVersionUID = 1L; diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index d5255005c7..918ef9e931 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.lake.committer.CommitterInitContext; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableBucket; @@ -247,6 +248,12 @@ public String partition() { public TableInfo tableInfo() { return tableInfo; } + + @Nullable + @Override + public WatermarkExtractor watermarkExtractor() { + return null; + } }); } diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java index 4fb7b96f0c..d658a33aa3 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java +++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java @@ -60,8 +60,8 @@ public LanceLakeCommitter(Configuration options, TablePath tablePath) { } @Override - public LanceCommittable toCommittable(List lanceWriteResults) - throws IOException { + public LanceCommittable toCommittable( + List lanceWriteResults, @Nullable Long watermark) throws IOException { List fragments = lanceWriteResults.stream() .map(LanceWriteResult::commitMessage) diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceWriteResult.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceWriteResult.java index 309b1d3891..74daa46805 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceWriteResult.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceWriteResult.java @@ -17,13 +17,14 @@ package org.apache.fluss.lake.lance.tiering; +import org.apache.fluss.lake.writer.LakeWriteResult; + import com.lancedb.lance.FragmentMetadata; -import java.io.Serializable; import java.util.List; /** The write result of Lance lake writer to pass to committer to commit. 
*/ -public class LanceWriteResult implements Serializable { +public class LanceWriteResult implements LakeWriteResult { private static final long serialVersionUID = 1L; private final List commitMessage; diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java index 64dae36962..a90b910c07 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java @@ -27,6 +27,7 @@ import org.apache.fluss.lake.lance.utils.LanceArrowUtils; import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.Schema; @@ -311,6 +312,12 @@ public String partition() { public TableInfo tableInfo() { return tableInfo; } + + @Nullable + @Override + public WatermarkExtractor watermarkExtractor() { + return null; + } }); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java index 339ec2b2b5..f6fe08f112 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java @@ -90,9 +90,10 @@ public PaimonLakeCommitter( } @Override - public PaimonCommittable toCommittable(List paimonWriteResults) + public PaimonCommittable toCommittable( + List paimonWriteResults, @Nullable Long watermark) throws IOException 
{ - ManifestCommittable committable = new ManifestCommittable(COMMIT_IDENTIFIER); + ManifestCommittable committable = new ManifestCommittable(COMMIT_IDENTIFIER, watermark); for (PaimonWriteResult paimonWriteResult : paimonWriteResults) { committable.addFileCommittable(paimonWriteResult.commitMessage()); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8ac7aabe4e..ccac0e15a3 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -19,6 +19,7 @@ import org.apache.fluss.lake.paimon.tiering.append.AppendOnlyWriter; import org.apache.fluss.lake.paimon.tiering.mergetree.MergeTreeWriter; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TablePath; @@ -30,6 +31,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -42,6 +45,8 @@ public class PaimonLakeWriter implements LakeWriter { private final Catalog paimonCatalog; private final RecordWriter recordWriter; + @Nullable private final WatermarkExtractor watermarkExtractor; + @Nullable private Long maxWatermark; public PaimonLakeWriter( PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext) @@ -69,12 +74,20 @@ public PaimonLakeWriter( writerInitContext.partition(), partitionKeys, flussRowType); + + this.watermarkExtractor = writerInitContext.watermarkExtractor(); } @Override public void write(LogRecord record) throws IOException { try { 
recordWriter.write(record); + if (watermarkExtractor != null) { + Long ts = watermarkExtractor.currentWatermark(record.getRow()); + if (ts != null) { + maxWatermark = maxWatermark == null ? ts : Math.max(maxWatermark, ts); + } + } } catch (Exception e) { throw new IOException("Failed to write Fluss record to Paimon.", e); } @@ -88,7 +101,7 @@ public PaimonWriteResult complete() throws IOException { } catch (Exception e) { throw new IOException("Failed to complete Paimon write.", e); } - return new PaimonWriteResult(commitMessage); + return new PaimonWriteResult(commitMessage, maxWatermark); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResult.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResult.java index 70575c00e1..1fada9a2ec 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResult.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResult.java @@ -17,22 +17,32 @@ package org.apache.fluss.lake.paimon.tiering; +import org.apache.fluss.lake.writer.LakeWriteResult; + import org.apache.paimon.table.sink.CommitMessage; -import java.io.Serializable; +import javax.annotation.Nullable; /** The write result of Paimon lake writer to pass to committer to commit. 
*/ -public class PaimonWriteResult implements Serializable { +public class PaimonWriteResult implements LakeWriteResult { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final CommitMessage commitMessage; + @Nullable private final Long maxWatermark; - public PaimonWriteResult(CommitMessage commitMessage) { + public PaimonWriteResult(CommitMessage commitMessage, @Nullable Long maxWatermark) { this.commitMessage = commitMessage; + this.maxWatermark = maxWatermark; } public CommitMessage commitMessage() { return commitMessage; } + + @Nullable + @Override + public Long getWatermark() { + return maxWatermark; + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java index 7efb3d3754..608f9966f3 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java @@ -19,15 +19,19 @@ import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageSerializer; +import java.io.ByteArrayOutputStream; import java.io.IOException; /** The {@link SimpleVersionedSerializer} for {@link PaimonWriteResult}. 
*/ public class PaimonWriteResultSerializer implements SimpleVersionedSerializer { - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; private final CommitMessageSerializer messageSer = new CommitMessageSerializer(); @@ -38,21 +42,58 @@ public int getVersion() { @Override public byte[] serialize(PaimonWriteResult paimonWriteResult) throws IOException { - CommitMessage commitMessage = paimonWriteResult.commitMessage(); - return messageSer.serialize(commitMessage); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + + Long watermark = paimonWriteResult.getWatermark(); + if (watermark == null) { + view.writeBoolean(false); + } else { + view.writeBoolean(true); + view.writeLong(watermark); + } + + byte[] messageBytes = messageSer.serialize(paimonWriteResult.commitMessage()); + view.writeInt(messageBytes.length); + view.write(messageBytes); + + return out.toByteArray(); } @Override public PaimonWriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { - throw new UnsupportedOperationException( - "Expecting PaimonWriteResult version to be " - + CURRENT_VERSION - + ", but found " - + version - + "."); + switch (version) { + case 1: + { + CommitMessage commitMessage = + messageSer.deserialize(messageSer.getVersion(), serialized); + return new PaimonWriteResult(commitMessage, null); + } + case 2: + { + DataInputView view = new DataInputDeserializer(serialized); + + Long watermark = null; + boolean watermarkNonNull = view.readBoolean(); + if (watermarkNonNull) { + watermark = view.readLong(); + } + + int len = view.readInt(); + byte[] messageBytes = new byte[len]; + view.readFully(messageBytes); /* throws EOFException on truncated input */ + CommitMessage commitMessage = + messageSer.deserialize(messageSer.getVersion(), messageBytes); + + return new PaimonWriteResult(commitMessage, watermark); + } + default: + throw new 
UnsupportedOperationException( + "Expecting PaimonWriteResult version to be less than or equal to " + + CURRENT_VERSION + + ", but found " + + version + + "."); } - CommitMessage commitMessage = messageSer.deserialize(messageSer.getVersion(), serialized); - return new PaimonWriteResult(commitMessage); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java index edddc5b555..9b6386ed6e 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java @@ -19,10 +19,12 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.tiering.source.watermark.SimpleWatermarkExtractor; import org.apache.fluss.lake.committer.CommittedLakeSnapshot; import org.apache.fluss.lake.committer.CommitterInitContext; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.fluss.lake.watermark.WatermarkExtractor; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableBucket; @@ -34,6 +36,7 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.utils.types.Tuple2; import org.apache.paimon.CoreOptions; @@ -496,6 +499,122 @@ void testPartitionExpiration( } } + @Test + void testTieringWatermark() throws Exception { + int bucketNum = 3; + TablePath tablePath = TablePath.of("paimon", "test_tiering_watermark"); + createWatermarkTable(tablePath); + + TableDescriptor descriptor = + TableDescriptor.builder() 
+ .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column("c1", org.apache.fluss.types.DataTypes.STRING()) + .column("c2", org.apache.fluss.types.DataTypes.STRING()) + .column( + "event_time", + org.apache.fluss.types.DataTypes.TIMESTAMP(3)) + .build()) + .distributedBy(bucketNum) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperty("schema.watermark.0.rowtime", "event_time") + .customProperty( + "schema.watermark.0.strategy.expr", + "`event_time` - INTERVAL '5' SECOND") + .customProperty("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)") + .build(); + TableInfo tableInfo = + TableInfo.of(tablePath, 0, 1, descriptor, DEFAULT_REMOTE_DATA_DIR, 1L, 1L); + + List paimonWriteResults = new ArrayList<>(); + long[] timestamps = {20_000L, 30_000L, 40_000L}; + + for (int bucket = 0; bucket < bucketNum; bucket++) { + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, bucket, null, null, tableInfo)) { + GenericRow row = new GenericRow(3); + row.setField(0, BinaryString.fromString("val" + bucket)); + row.setField(1, BinaryString.fromString("data")); + row.setField(2, TimestampNtz.fromMillis(timestamps[bucket])); + LogRecord logRecord = + new GenericRecord(0, timestamps[bucket], ChangeType.APPEND_ONLY, row); + lakeWriter.write(logRecord); + PaimonWriteResult writeResult = lakeWriter.complete(); + paimonWriteResults.add(writeResult); + + assertThat(writeResult.getWatermark()).isEqualTo(timestamps[bucket] - 5_000L); + } + } + + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo, new Configuration())) { + PaimonCommittable committable = + lakeCommitter.toCommittable(paimonWriteResults, 15_000L); + Map snapshotProperties = new HashMap<>(); + snapshotProperties.put(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets"); + lakeCommitter.commit(committable, snapshotProperties); + } + + FileStoreTable fileStoreTable = + (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath)); + Long watermark = 
fileStoreTable.snapshotManager().snapshot(1).watermark(); + assertThat(watermark).isEqualTo(15_000L); + } + + @Test + void testTieringWithoutWatermarkDefinition() throws Exception { + int bucketNum = 2; + TablePath tablePath = TablePath.of("paimon", "test_tiering_without_watermark_def"); + createWatermarkTable(tablePath); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column("c1", org.apache.fluss.types.DataTypes.STRING()) + .column("c2", org.apache.fluss.types.DataTypes.STRING()) + .column( + "event_time", + org.apache.fluss.types.DataTypes.TIMESTAMP(3)) + .build()) + .distributedBy(bucketNum) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + TableInfo tableInfo = + TableInfo.of(tablePath, 0, 1, descriptor, DEFAULT_REMOTE_DATA_DIR, 1L, 1L); + + List paimonWriteResults = new ArrayList<>(); + for (int bucket = 0; bucket < bucketNum; bucket++) { + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, bucket, null, null, tableInfo)) { + GenericRow row = new GenericRow(3); + row.setField(0, BinaryString.fromString("val" + bucket)); + row.setField(1, BinaryString.fromString("data")); + row.setField(2, TimestampNtz.fromMillis(5000L)); + LogRecord logRecord = new GenericRecord(0, 5000L, ChangeType.APPEND_ONLY, row); + lakeWriter.write(logRecord); + + PaimonWriteResult writeResult = lakeWriter.complete(); + paimonWriteResults.add(writeResult); + + assertThat(writeResult.getWatermark()).isNull(); + } + } + + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo, new Configuration())) { + PaimonCommittable committable = lakeCommitter.toCommittable(paimonWriteResults, null); + Map snapshotProperties = new HashMap<>(); + snapshotProperties.put(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets"); + lakeCommitter.commit(committable, snapshotProperties); + } + + FileStoreTable fileStoreTable = + (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath)); + 
Long watermark = fileStoreTable.snapshotManager().snapshot(1).watermark(); + assertThat(watermark).isNull(); + } + private void verifyLogTableRecordsMultiPartition( CloseableIterator actualRecords, List expectRecords, @@ -809,6 +928,12 @@ public String partition() { public TableInfo tableInfo() { return tableInfo; } + + @Nullable + @Override + public WatermarkExtractor watermarkExtractor() { + return SimpleWatermarkExtractor.create(tableInfo); + } }); } @@ -872,6 +997,15 @@ private void createTable( doCreatePaimonTable(tablePath, builder); } + private void createWatermarkTable(TablePath tablePath) throws Exception { + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.STRING()) + .column("c2", DataTypes.STRING()) + .column("event_time", DataTypes.TIMESTAMP_MILLIS()); + doCreatePaimonTable(tablePath, builder); + } + private void createMultiPartitionTable(TablePath tablePath) throws Exception { Schema.Builder builder = Schema.newBuilder() diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializerTest.java new file mode 100644 index 0000000000..cfbffed417 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.tiering; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.sink.CommitMessageSerializer; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PaimonWriteResultSerializer}. */ +class PaimonWriteResultSerializerTest { + + @Test + void testSerializeAndDeserializeV2() throws IOException { + PaimonWriteResultSerializer serializer = new PaimonWriteResultSerializer(); + CommitMessage commitMessage = createEmptyCommitMessage(); + + // positive watermark + PaimonWriteResult original = new PaimonWriteResult(commitMessage, 12345L); + byte[] serialized = serializer.serialize(original); + PaimonWriteResult deserialized = + serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserialized.getWatermark()).isEqualTo(12345L); + + // negative watermark + original = new PaimonWriteResult(commitMessage, -12345L); + serialized = serializer.serialize(original); + deserialized = serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserialized.getWatermark()).isEqualTo(-12345L); + + // null watermark + original = new PaimonWriteResult(commitMessage, null); + serialized = serializer.serialize(original); + deserialized = 
serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserialized.getWatermark()).isNull(); + } + + @Test + void testDeserializeV1BackwardCompatibility() throws IOException { + PaimonWriteResultSerializer serializer = new PaimonWriteResultSerializer(); + CommitMessage commitMessage = createEmptyCommitMessage(); + + CommitMessageSerializer messageSer = new CommitMessageSerializer(); + byte[] v1Serialized = messageSer.serialize(commitMessage); + + PaimonWriteResult deserialized = serializer.deserialize(1, v1Serialized); + assertThat(deserialized.getWatermark()).isNull(); + } + + private static CommitMessage createEmptyCommitMessage() { + DataIncrement dataIncrement = + new DataIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + CompactIncrement compactIncrement = + new CompactIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + return new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, null, dataIncrement, compactIncrement); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index 88ad329e04..9b19311957 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -31,6 +31,7 @@ import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriteResult; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.DataLakeFormat; @@ -195,7 +196,7 @@ public void close() throws IOException { } } - private static class TestingPaimonWriteResult { + private static 
class TestingPaimonWriteResult implements LakeWriteResult { private final int writtenRecords; public TestingPaimonWriteResult(int writtenRecords) { @@ -237,7 +238,8 @@ private static class TestingPaimonCommitter @Override public TestPaimonCommittable toCommittable( - List testingPaimonWriteResults) throws IOException { + List testingPaimonWriteResults, @Nullable Long watermark) + throws IOException { return new TestPaimonCommittable(); }