Skip to content

Commit 6ac2141

Browse files
committed
Enable a lake table to store multiple lake snapshots; each lake snapshot should contain the readable offset and the tiered offset
1 parent 5db8f4b commit 6ac2141

File tree

19 files changed

+409
-226
lines changed

19 files changed

+409
-226
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public Long getPartitionId() {
5353

5454
@Override
5555
public boolean equals(Object o) {
56+
if (this == o) {
57+
return true;
58+
}
5659
if (o == null || getClass() != o.getClass()) {
5760
return false;
5861
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5656

5757
@Override
5858
public boolean equals(Object o) {
59+
if (this == o) {
60+
return true;
61+
}
5962
if (o == null || getClass() != o.getClass()) {
6063
return false;
6164
}

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -689,20 +689,37 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
689689
* <p>The path contract:
690690
*
691691
* <pre>
692-
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}.snapshot
692+
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
693693
* </pre>
694694
*/
695-
public static FsPath remoteLakeTableSnapshotPath(
696-
String remoteDataDir, TablePath tablePath, long tableId, long snapshotId) {
695+
public static FsPath remoteLakeTableSnapshotMetadataDir(
696+
String remoteDataDir, TablePath tablePath, long tableId) {
697697
return new FsPath(
698698
String.format(
699-
"%s/%s/%s/%s-%d/snapshot/%d.snapshot",
699+
"%s/%s/%s/%s-%d",
700700
remoteDataDir,
701701
REMOTE_LAKE_DIR_NAME,
702702
tablePath.getDatabaseName(),
703703
tablePath.getTableName(),
704-
tableId,
705-
snapshotId));
704+
tableId));
705+
}
706+
707+
/**
708+
* Returns a new remote path for storing lake snapshot metadata required by Fluss for a table; the file name is a freshly generated random UUID, so every call yields a different path.
709+
*
710+
* <p>The path contract:
711+
*
712+
* <pre>
713+
* {$remoteLakeTableSnapshotMetadataDir}/manifest/{uuid}.manifest
714+
* </pre>
715+
*/
716+
public static FsPath remoteLakeTableSnapshotManifestPath(
717+
String remoteDataDir, TablePath tablePath, long tableId) {
718+
return new FsPath(
719+
String.format(
720+
"%s/manifest/%s.manifest",
721+
remoteLakeTableSnapshotMetadataDir(remoteDataDir, tablePath, tableId),
722+
UUID.randomUUID()));
706723
}
707724

708725
/**

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ private void testPartitionedTableTiering() throws Exception {
402402
put(
403403
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
404404
"["
405-
+ "{\"partition_id\":0,\"bucket\":0,\"partition_name\":\"date=2025\",\"offset\":3},"
406-
+ "{\"partition_id\":1,\"bucket\":0,\"partition_name\":\"date=2026\",\"offset\":3}"
405+
+ "{\"partition_id\":0,\"bucket\":0,\"offset\":3},"
406+
+ "{\"partition_id\":1,\"bucket\":0,\"offset\":3}"
407407
+ "]");
408408
}
409409
};

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
335335
put(
336336
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
337337
String.format(
338-
"[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
338+
"[{\"partition_id\":%d,\"bucket\":0,\"offset\":1}]",
339339
partitionId));
340340
}
341341
};
@@ -475,8 +475,7 @@ void testTieringForAlterTable() throws Exception {
475475
}
476476

477477
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
478-
String raw =
479-
"{\"partition_id\":%s,\"bucket\":0,\"partition_name\":\"date=%s\",\"offset\":3}";
478+
String raw = "{\"partition_id\":%s,\"bucket\":0,\"offset\":3}";
480479
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
481480
Collections.sort(partitionIds);
482481
List<String> partitionOffsetStrs = new ArrayList<>();

fluss-server/src/main/java/org/apache/fluss/server/coordinator/RemoteStorageCleaner.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.coordinator;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.exception.FlussRuntimeException;
2223
import org.apache.fluss.fs.FileSystem;
@@ -41,13 +42,16 @@ public class RemoteStorageCleaner {
4142

4243
private final FsPath remoteLogDir;
4344

45+
private final String remoteDataDir;
46+
4447
private final FileSystem remoteFileSystem;
4548

4649
private final ExecutorService ioExecutor;
4750

4851
public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecutor) {
4952
this.remoteKvDir = FlussPaths.remoteKvDir(configuration);
5053
this.remoteLogDir = FlussPaths.remoteLogDir(configuration);
54+
this.remoteDataDir = configuration.getString(ConfigOptions.REMOTE_DATA_DIR);
5155
this.ioExecutor = ioExecutor;
5256
try {
5357
this.remoteFileSystem = remoteKvDir.getFileSystem();
@@ -57,10 +61,16 @@ public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecu
5761
}
5862
}
5963

60-
public void asyncDeleteTableRemoteDir(TablePath tablePath, boolean isKvTable, long tableId) {
64+
public void asyncDeleteTableRemoteDir(
65+
TablePath tablePath, boolean isKvTable, boolean isLakeEnabled, long tableId) {
6166
if (isKvTable) {
6267
asyncDeleteDir(FlussPaths.remoteTableDir(remoteKvDir, tablePath, tableId));
6368
}
69+
if (isLakeEnabled) {
70+
asyncDeleteDir(
71+
FlussPaths.remoteLakeTableSnapshotMetadataDir(
72+
remoteDataDir, tablePath, tableId));
73+
}
6474
asyncDeleteDir(FlussPaths.remoteTableDir(remoteLogDir, tablePath, tableId));
6575
}
6676

fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,10 @@ private void asyncDeleteRemoteDirectory(long tableId) {
275275
TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
276276
if (tableInfo != null) {
277277
remoteStorageCleaner.asyncDeleteTableRemoteDir(
278-
tableInfo.getTablePath(), tableInfo.hasPrimaryKey(), tableId);
278+
tableInfo.getTablePath(),
279+
tableInfo.hasPrimaryKey(),
280+
tableInfo.getTableConfig().isDataLakeEnabled(),
281+
tableId);
279282
}
280283
}
281284

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
4747

4848
@Override
4949
public boolean equals(Object o) {
50+
if (this == o) {
51+
return true;
52+
}
5053
if (o == null || getClass() != o.getClass()) {
5154
return false;
5255
}

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1572,7 +1572,7 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
15721572
}
15731573
}
15741574
lakeTableInfoByTableId.put(
1575-
tableId, new LakeTableSnapshot(snapshotId, tableId, bucketLogEndOffset));
1575+
tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
15761576
}
15771577
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
15781578
}

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,8 @@ public Optional<LakeTable> getLakeTable(long tableId) throws Exception {
10601060
public Optional<LakeTableSnapshot> getLakeTableSnapshot(long tableId) throws Exception {
10611061
Optional<LakeTable> optLakeTable = getLakeTable(tableId);
10621062
if (optLakeTable.isPresent()) {
1063-
return Optional.of(optLakeTable.get().toLakeTableSnapshot());
1063+
// always get the latest snapshot
1064+
return Optional.of(optLakeTable.get().getLatestTableSnapshot());
10641065
} else {
10651066
return Optional.empty();
10661067
}

0 commit comments

Comments
 (0)