Skip to content

Commit 0902fe0

Browse files
committed
enable lake table stores multiple lake snapshot && lake snapshot should contain readable offset and tiered offset
1 parent 5db8f4b commit 0902fe0

14 files changed

Lines changed: 362 additions & 223 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -684,25 +684,25 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
684684
}
685685

686686
/**
687-
* Returns the remote path for storing lake snapshot metadata required by Fluss for a table.
687+
* Returns a remote path for storing lake snapshot metadata required by Fluss for a table.
688688
*
689689
* <p>The path contract:
690690
*
691691
* <pre>
692-
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}.snapshot
692+
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/manifest/{uuid}.manifest
693693
* </pre>
694694
*/
695-
public static FsPath remoteLakeTableSnapshotPath(
696-
String remoteDataDir, TablePath tablePath, long tableId, long snapshotId) {
695+
public static FsPath remoteLakeTableSnapshotManifestPath(
696+
String remoteDataDir, TablePath tablePath, long tableId) {
697697
return new FsPath(
698698
String.format(
699-
"%s/%s/%s/%s-%d/snapshot/%d.snapshot",
699+
"%s/%s/%s/%s-%d/manifest/%s.manifest",
700700
remoteDataDir,
701701
REMOTE_LAKE_DIR_NAME,
702702
tablePath.getDatabaseName(),
703703
tablePath.getTableName(),
704704
tableId,
705-
snapshotId));
705+
UUID.randomUUID()));
706706
}
707707

708708
/**

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ private void testPartitionedTableTiering() throws Exception {
402402
put(
403403
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
404404
"["
405-
+ "{\"partition_id\":0,\"bucket\":0,\"partition_name\":\"date=2025\",\"offset\":3},"
406-
+ "{\"partition_id\":1,\"bucket\":0,\"partition_name\":\"date=2026\",\"offset\":3}"
405+
+ "{\"partition_id\":0,\"bucket\":0,\"offset\":3},"
406+
+ "{\"partition_id\":1,\"bucket\":0,\"offset\":3}"
407407
+ "]");
408408
}
409409
};

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
335335
put(
336336
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
337337
String.format(
338-
"[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
338+
"[{\"partition_id\":%d,\"bucket\":0,\"offset\":1}]",
339339
partitionId));
340340
}
341341
};
@@ -475,8 +475,7 @@ void testTieringForAlterTable() throws Exception {
475475
}
476476

477477
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
478-
String raw =
479-
"{\"partition_id\":%s,\"bucket\":0,\"partition_name\":\"date=%s\",\"offset\":3}";
478+
String raw = "{\"partition_id\":%s,\"bucket\":0,\"offset\":3}";
480479
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
481480
Collections.sort(partitionIds);
482481
List<String> partitionOffsetStrs = new ArrayList<>();

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1572,7 +1572,7 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15721572
}
15731573
}
15741574
lakeTableInfoByTableId.put(
1575-
tableId, new LakeTableSnapshot(snapshotId, tableId, bucketLogEndOffset));
1575+
tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
15761576
}
15771577
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
15781578
}

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ public Optional<LakeTable> getLakeTable(long tableId) throws Exception {
10601060
public Optional<LakeTableSnapshot> getLakeTableSnapshot(long tableId) throws Exception {
10611061
Optional<LakeTable> optLakeTable = getLakeTable(tableId);
10621062
if (optLakeTable.isPresent()) {
1063-
return Optional.of(optLakeTable.get().toLakeTableSnapshot());
1063+
return Optional.of(optLakeTable.get().getLatestTableSnapshot());
10641064
} else {
10651065
return Optional.empty();
10661066
}

fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,8 @@ public static RemoteLogManifestHandle decode(byte[] json) {
574574
*
575575
* <ul>
576576
* <li>Version 1 (legacy): Full snapshot data stored directly in ZK
577-
* <li>Version 2 (current): Only metadata file path stored in ZK, actual data in remote file
577+
* <li>Version 2 (current): A list of snapshot metadata, with metadata file path stored in ZK,
578+
* actual data in remote file
578579
* </ul>
579580
*/
580581
public static final class LakeTableZNode {

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java

Lines changed: 91 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
package org.apache.fluss.server.zk.data.lake;
2020

2121
import org.apache.fluss.fs.FSDataInputStream;
22+
import org.apache.fluss.fs.FileSystem;
2223
import org.apache.fluss.fs.FsPath;
2324
import org.apache.fluss.server.zk.data.ZkData;
2425
import org.apache.fluss.utils.IOUtils;
2526

2627
import javax.annotation.Nullable;
2728

2829
import java.io.ByteArrayOutputStream;
30+
import java.io.IOException;
31+
import java.util.Collections;
32+
import java.util.List;
2933

34+
import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
3035
import static org.apache.fluss.utils.Preconditions.checkNotNull;
3136

3237
/**
@@ -36,23 +41,17 @@
3641
*
3742
* <ul>
3843
* <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
39-
* <li>Version 2 (current): Contains only the file paths point to the file storing {@link
40-
* LakeTableSnapshot}.
44+
* <li>Version 2 (current): Contains a list of lake snapshot metadata entries, each recording the
45+
* metadata file paths of one lake snapshot, with the actual metadata stored in files
4146
* </ul>
4247
*
4348
* @see LakeTableJsonSerde for JSON serialization and deserialization
4449
*/
4550
public class LakeTable {
4651

4752
// Version 2 (current):
48-
// the pointer to the file storing the latest known LakeTableSnapshot, will be null in
49-
// version1
50-
@Nullable private final FsPath lakeTableLatestSnapshotFileHandle;
51-
52-
// the pointer to the file storing the latest known compacted LakeTableSnapshot, will be null in
53-
// version1 or no any known compacted LakeTableSnapshot
54-
// todo: support tiering service commit lake table latest compacted snapshot to Fluss
55-
@Nullable private final FsPath lakeTableLatestCompactedSnapshotFileHandle = null;
53+
// a list of lake snapshot metadata, recording the metadata for different lake snapshots
54+
@Nullable private final List<LakeSnapshotMetadata> lakeSnapshotMetadata;
5655

5756
// Version 1 (legacy): the full lake table snapshot info stored in ZK, will be null in version2
5857
@Nullable private final LakeTableSnapshot lakeTableSnapshot;
@@ -67,45 +66,114 @@ public LakeTable(LakeTableSnapshot lakeTableSnapshot) {
6766
}
6867

6968
/**
70-
* Creates a LakeTable with a metadata file path (version 2 format).
69+
* Creates a LakeTable with a lake snapshot metadata (version 2 format).
7170
*
72-
* @param lakeTableSnapshotFileHandle the path to the metadata file containing the snapshot data
71+
* @param lakeSnapshotMetadata the metadata containing the file path to the snapshot data
7372
*/
74-
public LakeTable(@Nullable FsPath lakeTableSnapshotFileHandle) {
75-
this(null, lakeTableSnapshotFileHandle);
73+
public LakeTable(LakeSnapshotMetadata lakeSnapshotMetadata) {
74+
this(null, Collections.singletonList(lakeSnapshotMetadata));
75+
}
76+
77+
/**
78+
* Creates a LakeTable with a list of lake snapshot metadata (version 2 format).
79+
*
80+
* @param lakeSnapshotMetadata the list of lake snapshot metadata
81+
*/
82+
public LakeTable(List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
83+
this(null, lakeSnapshotMetadata);
7684
}
7785

7886
private LakeTable(
7987
@Nullable LakeTableSnapshot lakeTableSnapshot,
80-
@Nullable FsPath lakeTableSnapshotFileHandle) {
88+
List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
8189
this.lakeTableSnapshot = lakeTableSnapshot;
82-
this.lakeTableLatestSnapshotFileHandle = lakeTableSnapshotFileHandle;
90+
this.lakeSnapshotMetadata = lakeSnapshotMetadata;
8391
}
8492

8593
@Nullable
86-
public FsPath getLakeTableLatestSnapshotFileHandle() {
87-
return lakeTableLatestSnapshotFileHandle;
94+
public LakeSnapshotMetadata getLakeTableLatestSnapshot() {
95+
if (lakeSnapshotMetadata != null && !lakeSnapshotMetadata.isEmpty()) {
96+
return lakeSnapshotMetadata.get(0);
97+
}
98+
return null;
99+
}
100+
101+
@Nullable
102+
public List<LakeSnapshotMetadata> getLakeSnapshotMetadata() {
103+
return lakeSnapshotMetadata;
88104
}
89105

90106
/**
91-
* Converts this LakeTable to a LakeTableSnapshot.
107+
* Get the latest table snapshot for the lake table.
92108
*
93109
* <p>If this LakeTable was created from a LakeTableSnapshot (version 1), returns it directly.
94110
* Otherwise, reads the snapshot data from the lake snapshot file.
95111
*
96112
* @return the LakeTableSnapshot
97113
*/
98-
public LakeTableSnapshot toLakeTableSnapshot() throws Exception {
114+
public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
99115
if (lakeTableSnapshot != null) {
100116
return lakeTableSnapshot;
101117
}
118+
FsPath tieredOffsetsFilePath =
119+
checkNotNull(getLakeTableLatestSnapshot()).tieredOffsetsFilePath;
102120
FSDataInputStream inputStream =
103-
checkNotNull(getLakeTableLatestSnapshotFileHandle())
104-
.getFileSystem()
105-
.open(getLakeTableLatestSnapshotFileHandle());
121+
tieredOffsetsFilePath.getFileSystem().open(tieredOffsetsFilePath);
106122
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
107123
IOUtils.copyBytes(inputStream, outputStream, true);
108124
return LakeTableSnapshotJsonSerde.fromJson(outputStream.toByteArray());
109125
}
110126
}
127+
128+
/** The lake snapshot metadata entry stored in zk lake table. */
129+
public static class LakeSnapshotMetadata {
130+
private final long snapshotId;
131+
132+
// the file path to file storing the tiered offsets,
133+
// it points a file storing LakeTableSnapshot which only include tiered offsets
134+
private final FsPath tieredOffsetsFilePath;
135+
136+
// the file path to file storing the readable offsets
137+
private final FsPath readableOffsetsFilePath;
138+
139+
public LakeSnapshotMetadata(
140+
long snapshotId, FsPath tieredOffsetsFilePath, FsPath readableOffsetsFilePath) {
141+
this.snapshotId = snapshotId;
142+
this.tieredOffsetsFilePath = tieredOffsetsFilePath;
143+
this.readableOffsetsFilePath = readableOffsetsFilePath;
144+
}
145+
146+
public long getSnapshotId() {
147+
return snapshotId;
148+
}
149+
150+
public FsPath getTieredOffsetsFilePath() {
151+
return tieredOffsetsFilePath;
152+
}
153+
154+
public FsPath getReadableOffsetsFilePath() {
155+
return readableOffsetsFilePath;
156+
}
157+
158+
public void discard() {
159+
if (tieredOffsetsFilePath != null) {
160+
delete(tieredOffsetsFilePath);
161+
}
162+
if (readableOffsetsFilePath != null
163+
&& readableOffsetsFilePath != tieredOffsetsFilePath) {
164+
delete(readableOffsetsFilePath);
165+
}
166+
}
167+
168+
private void delete(FsPath fsPath) {
169+
try {
170+
FileSystem fileSystem = tieredOffsetsFilePath.getFileSystem();
171+
if (fileSystem.exists(tieredOffsetsFilePath)) {
172+
fileSystem.delete(tieredOffsetsFilePath, false);
173+
}
174+
} catch (IOException e) {
175+
LOG.warn("Error deleting filePath at {}", fsPath, e);
176+
}
177+
}
178+
}
111179
}

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.server.zk.ZooKeeperClient;
2727
import org.apache.fluss.utils.FlussPaths;
2828

29-
import java.io.IOException;
3029
import java.util.HashMap;
3130
import java.util.Map;
3231
import java.util.Optional;
@@ -63,26 +62,44 @@ public void upsertLakeTable(
6362
if (optPreviousLakeTable.isPresent()) {
6463
lakeTableSnapshot =
6564
mergeLakeTable(
66-
optPreviousLakeTable.get().toLakeTableSnapshot(), lakeTableSnapshot);
65+
optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
6766
}
6867

68+
// store the lake table snapshot into a file
6969
FsPath lakeTableSnapshotFsPath =
7070
storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
71-
LakeTable lakeTable = new LakeTable(lakeTableSnapshotFsPath);
71+
72+
LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
73+
new LakeTable.LakeSnapshotMetadata(
74+
lakeTableSnapshot.getSnapshotId(),
75+
// use the lake table snapshot file as the tiered offsets file since
76+
// the table snapshot file will contain the tiered log end offsets
77+
lakeTableSnapshotFsPath,
78+
// currently, readableOffsetsFilePath is always the same as
79+
// tieredOffsetsFilePath, but in the future we'll commit the readable offsets
80+
// separately to mark what the readable offsets of a snapshot are, since
81+
// in a paimon dv table, the tiered log end offsets are not the same as the readable
82+
// offsets
83+
lakeTableSnapshotFsPath);
84+
85+
// currently, we keep only one lake snapshot metadata in zk,
86+
// todo: when solving the paimon dv union read issue #2121, we'll keep multiple lake snapshot
87+
// metadata
88+
LakeTable lakeTable = new LakeTable(lakeSnapshotMetadata);
7289
try {
7390
zkClient.upsertLakeTable(tableId, lakeTable, optPreviousLakeTable.isPresent());
7491
} catch (Exception e) {
7592
LOG.warn("Failed to upsert lake table snapshot to zk.", e);
76-
// delete the new lake table snapshot file
77-
deleteFile(lakeTableSnapshotFsPath);
93+
// discard the new lake snapshot metadata
94+
lakeSnapshotMetadata.discard();
7895
throw e;
7996
}
8097

8198
if (optPreviousLakeTable.isPresent()) {
82-
FsPath previousLakeSnapshotFsPath =
83-
optPreviousLakeTable.get().getLakeTableLatestSnapshotFileHandle();
84-
if (previousLakeSnapshotFsPath != null) {
85-
deleteFile(previousLakeSnapshotFsPath);
99+
LakeTable.LakeSnapshotMetadata previousLakeSnapshotMetadata =
100+
optPreviousLakeTable.get().getLakeTableLatestSnapshot();
101+
if (previousLakeSnapshotMetadata != null) {
102+
previousLakeSnapshotMetadata.discard();
86103
}
87104
}
88105
}
@@ -98,41 +115,27 @@ private LakeTableSnapshot mergeLakeTable(
98115
new HashMap<>(previousLakeTableSnapshot.getBucketLogEndOffset());
99116
bucketLogEndOffset.putAll(newLakeTableSnapshot.getBucketLogEndOffset());
100117

101-
return new LakeTableSnapshot(
102-
newLakeTableSnapshot.getSnapshotId(),
103-
newLakeTableSnapshot.getTableId(),
104-
bucketLogEndOffset);
118+
return new LakeTableSnapshot(newLakeTableSnapshot.getSnapshotId(), bucketLogEndOffset);
105119
}
106120

107121
private FsPath storeLakeTableSnapshot(
108122
long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
109123
throws Exception {
110124
// get the remote file path to store the lake table snapshot information
111-
FsPath remoteLakeTableSnapshotPath =
112-
FlussPaths.remoteLakeTableSnapshotPath(
113-
remoteDataDir, tablePath, tableId, lakeTableSnapshot.getSnapshotId());
125+
FsPath remoteLakeTableSnapshotManifestPath =
126+
FlussPaths.remoteLakeTableSnapshotManifestPath(remoteDataDir, tablePath, tableId);
114127
// check whether the parent directory exists, if not, create the directory
115-
FileSystem fileSystem = remoteLakeTableSnapshotPath.getFileSystem();
116-
if (!fileSystem.exists(remoteLakeTableSnapshotPath.getParent())) {
117-
fileSystem.mkdirs(remoteLakeTableSnapshotPath.getParent());
128+
FileSystem fileSystem = remoteLakeTableSnapshotManifestPath.getFileSystem();
129+
if (!fileSystem.exists(remoteLakeTableSnapshotManifestPath.getParent())) {
130+
fileSystem.mkdirs(remoteLakeTableSnapshotManifestPath.getParent());
118131
}
119132
// serialize table snapshot to json bytes, and write to file
120133
byte[] jsonBytes = LakeTableSnapshotJsonSerde.toJson(lakeTableSnapshot);
121134
try (FSDataOutputStream outputStream =
122-
fileSystem.create(remoteLakeTableSnapshotPath, FileSystem.WriteMode.OVERWRITE)) {
135+
fileSystem.create(
136+
remoteLakeTableSnapshotManifestPath, FileSystem.WriteMode.OVERWRITE)) {
123137
outputStream.write(jsonBytes);
124138
}
125-
return remoteLakeTableSnapshotPath;
126-
}
127-
128-
private void deleteFile(FsPath filePath) {
129-
try {
130-
FileSystem fileSystem = filePath.getFileSystem();
131-
if (fileSystem.exists(filePath)) {
132-
fileSystem.delete(filePath, false);
133-
}
134-
} catch (IOException e) {
135-
LOG.warn("Error deleting filePath at {}", filePath, e);
136-
}
139+
return remoteLakeTableSnapshotManifestPath;
137140
}
138141
}

0 commit comments

Comments
 (0)