Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.lake.committer;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.lake.writer.LakeWriteResult;

import javax.annotation.Nullable;

Expand All @@ -34,22 +35,36 @@
* @since 0.7
*/
@PublicEvolving
public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable {
public interface LakeCommitter<WriteResult extends LakeWriteResult, CommittableT>
extends AutoCloseable {

/**
* The property key used to store the file path of lake table bucket offsets in snapshot
* properties.
*/
String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";

/**
* Converts a list of write results to a committable object together with an optional watermark.
*
* @param writeResults the list of write results
* @param watermark the watermark to be committed, or {@code null} if no watermark is available
* @return the committable object
* @throws IOException if an I/O error occurs
*/
CommittableT toCommittable(List<WriteResult> writeResults, @Nullable Long watermark)
throws IOException;

/**
* Converts a list of write results to a committable object.
*
* @param writeResults the list of write results
* @return the committable object
* @throws IOException if an I/O error occurs
*/
CommittableT toCommittable(List<WriteResult> writeResults) throws IOException;
default CommittableT toCommittable(List<WriteResult> writeResults) throws IOException {
return toCommittable(writeResults, null);
}

/**
* Commits the given committable object.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.watermark;

import org.apache.fluss.row.InternalRow;

import javax.annotation.Nullable;

/**
* Extracts watermark values from {@link InternalRow} for lake tiering.
*
* <p>The extracted value should be expressed in epoch milliseconds and should represent the
* watermark derived from the row's event-time field and watermark strategy.
*
* <p>The extraction result is nullable: callers must handle a {@code null} return for rows that
* do not provide a watermark value.
*/
public interface WatermarkExtractor {

/**
* Extracts the watermark for the given row.
*
* @param row the row to extract the watermark from
* @return the watermark in epoch milliseconds, or {@code null} if the row does not provide a
* watermark value
*/
@Nullable
Long currentWatermark(InternalRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
* @since 0.7
*/
@PublicEvolving
public interface LakeTieringFactory<WriteResult, CommittableT> extends Serializable {
public interface LakeTieringFactory<WriteResult extends LakeWriteResult, CommittableT>
extends Serializable {

String FLUSS_LAKE_TIERING_COMMIT_USER = "__fluss_lake_tiering";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.writer;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
* The result produced by a {@link LakeWriter} after writing records to lake storage.
*
* <p>A write result is passed to the lake committer and may carry lake-specific commit information,
* such as files. It can also expose the watermark of the written records so that the tiering commit
* can report the synchronization progress to the lake snapshot.
*
* <p>This interface extends {@link Serializable}, so implementations must be serializable.
*/
public interface LakeWriteResult extends Serializable {

/**
* Returns the maximum watermark of the records included in this write result, in epoch
* milliseconds.
*
* <p>Returns {@code null} if the source table does not define a watermark, no watermark can be
* extracted from the written records, or the lake implementation does not report watermarks.
*
* <p>The default implementation always returns {@code null}; implementations that track
* watermarks should override it.
*
* @return the maximum watermark in epoch milliseconds, or {@code null} if none is available
*/
@Nullable
default Long getWatermark() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* @since 0.7
*/
@PublicEvolving
public interface LakeWriter<WriteResult> extends Closeable {
public interface LakeWriter<WriteResult extends LakeWriteResult> extends Closeable {
/**
* Writes a record to the lake.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.lake.writer;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.lake.watermark.WatermarkExtractor;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -62,4 +63,12 @@ public interface WriterInitContext {
* @return the Fluss table info
*/
TableInfo tableInfo();

/**
* Returns the watermark extractor, or {@code null} if the table does not define a watermark.
*
* @return the watermark extractor, or {@code null} if the table does not define a watermark
*/
@Nullable
WatermarkExtractor watermarkExtractor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriteResult;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
Expand Down Expand Up @@ -74,7 +75,7 @@
* <p>Finally, it will also commit the committed lake snapshot to Fluss cluster to make Fluss aware
* of the tiering progress.
*/
public class TieringCommitOperator<WriteResult, Committable>
public class TieringCommitOperator<WriteResult extends LakeWriteResult, Committable>
extends AbstractStreamOperator<CommittableMessage<Committable>>
implements OneInputStreamOperator<
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
Expand Down Expand Up @@ -232,14 +233,35 @@ private CommitResult commitWriteResults(

Map<TableBucket, Long> logEndOffsets = new HashMap<>();
Map<TableBucket, Long> logMaxTieredTimestamps = new HashMap<>();
Long watermark = null;
for (TableBucketWriteResult<WriteResult> writeResult : nonEmptyResults) {
TableBucket tableBucket = writeResult.tableBucket();
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
logMaxTieredTimestamps.put(tableBucket, writeResult.maxTimestamp());
Long writeResultWatermark = writeResult.writeResult().getWatermark();
if (writeResultWatermark == null) {
continue;
}
watermark =
watermark == null
? writeResultWatermark
: Math.min(watermark, writeResultWatermark);
}

if (nonEmptyResults.size() < committableWriteResults.size()) {
// Empty results mean some splits have not been processed, possibly caused by force
// completion. Do not update the watermark here.
if (watermark != null) {
LOG.warn(
"There are some empty write results for {}, do not update watermark. Watermark of non-empty splits is {}.",
tablePath,
watermark);
}
watermark = null;
}

// to committable
Committable committable = lakeCommitter.toCommittable(writeResults);
Committable committable = lakeCommitter.toCommittable(writeResults, watermark);
// before commit to lake, check fluss not missing any lake snapshot committed by fluss
LakeSnapshot flussCurrentLakeSnapshot = getLatestLakeSnapshot(tablePath);
checkFlussNotMissingLakeSnapshot(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriteResult;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/** The factory to create {@link TieringCommitOperator}. */
public class TieringCommitOperatorFactory<WriteResult, Committable>
public class TieringCommitOperatorFactory<WriteResult extends LakeWriteResult, Committable>
extends AbstractStreamOperatorFactory<CommittableMessage<Committable>>
implements OneInputStreamOperatorFactory<
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.flink.tiering.source;

import org.apache.fluss.lake.writer.LakeWriteResult;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
Expand All @@ -31,7 +32,7 @@
* one round of tiering. It'll be passed to downstream committer operator to collect all the write
* results of a table and do commit.
*/
public class TableBucketWriteResult<WriteResult> implements Serializable {
public class TableBucketWriteResult<WriteResult extends LakeWriteResult> implements Serializable {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.writer.LakeWriteResult;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

/** The emitter to emit {@link TableBucketWriteResult} to downstream {@link LakeCommitter}. */
public class TableBucketWriteResultEmitter<WriteResult>
public class TableBucketWriteResultEmitter<WriteResult extends LakeWriteResult>
implements RecordEmitter<
TableBucketWriteResult<WriteResult>,
TableBucketWriteResult<WriteResult>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.flink.tiering.source;

import org.apache.fluss.lake.writer.LakeWriteResult;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;

Expand All @@ -27,13 +28,13 @@
import java.io.IOException;

/** The serializer for {@link TableBucketWriteResult}. */
public class TableBucketWriteResultSerializer<WriteResult>
public class TableBucketWriteResultSerializer<WriteResult extends LakeWriteResult>
implements SimpleVersionedSerializer<TableBucketWriteResult<WriteResult>> {

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int CURRENT_VERSION = 1;
private static final int CURRENT_VERSION = 2;

private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer<WriteResult>
writeResultSerializer;
Expand Down Expand Up @@ -77,6 +78,7 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
// write -1 to mark write result as null
out.writeInt(-1);
} else {
out.writeInt(writeResultSerializer.getVersion());
byte[] serializeBytes = writeResultSerializer.serialize(writeResult);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
Expand All @@ -99,7 +101,7 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
@Override
public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] serialized)
throws IOException {
if (version != CURRENT_VERSION) {
if (version > CURRENT_VERSION) {
throw new IOException("Unknown version " + version);
}
final DataInputDeserializer in = new DataInputDeserializer(serialized);
Expand All @@ -120,14 +122,28 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);

// deserialize write result
int writeResultLength = in.readInt();
WriteResult writeResult;
if (writeResultLength >= 0) {
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult = writeResultSerializer.deserialize(version, writeResultBytes);
if (version == 1) {
int writeResultLength = in.readInt();
if (writeResultLength >= 0) {
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult = writeResultSerializer.deserialize(version, writeResultBytes);
} else {
writeResult = null;
}
} else {
writeResult = null;
int writeResultVersionOrNull = in.readInt();
if (writeResultVersionOrNull == -1) {
writeResult = null;
} else {
int writeResultLength = in.readInt();
byte[] writeResultBytes = new byte[writeResultLength];
in.readFully(writeResultBytes);
writeResult =
writeResultSerializer.deserialize(
writeResultVersionOrNull, writeResultBytes);
}
}

// deserialize log end offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import org.apache.fluss.flink.adapter.TypeInformationAdapter;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.writer.LakeWriteResult;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.util.function.SerializableSupplier;

/** A {@link TypeInformation} for {@link TableBucketWriteResult}. */
public class TableBucketWriteResultTypeInfo<WriteResult>
public class TableBucketWriteResultTypeInfo<WriteResult extends LakeWriteResult>
extends TypeInformationAdapter<TableBucketWriteResult<WriteResult>> {

private final SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
Expand All @@ -38,9 +39,10 @@ private TableBucketWriteResultTypeInfo(
this.writeResultSerializerFactory = writeResultSerializerFactory;
}

public static <WriteResult> TypeInformation<TableBucketWriteResult<WriteResult>> of(
SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
writeResultSerializerFactory) {
public static <WriteResult extends LakeWriteResult>
TypeInformation<TableBucketWriteResult<WriteResult>> of(
SerializableSupplier<SimpleVersionedSerializer<WriteResult>>
writeResultSerializerFactory) {
return new TableBucketWriteResultTypeInfo<>(writeResultSerializerFactory);
}

Expand Down
Loading