Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,9 @@ public class LakeSnapshot {
// the specific log offset of the snapshot
private final Map<TableBucket, Long> tableBucketsOffset;

// the partition name by partition id of this lake snapshot if
// is a partitioned table, empty if not a partitioned table
private final Map<Long, String> partitionNameById;

public LakeSnapshot(
long snapshotId,
Map<TableBucket, Long> tableBucketsOffset,
Map<Long, String> partitionNameById) {
public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
this.snapshotId = snapshotId;
this.tableBucketsOffset = tableBucketsOffset;
this.partitionNameById = partitionNameById;
}

public long getSnapshotId() {
Expand All @@ -58,19 +50,13 @@ public Map<TableBucket, Long> getTableBucketsOffset() {
return Collections.unmodifiableMap(tableBucketsOffset);
}

public Map<Long, String> getPartitionNameById() {
return Collections.unmodifiableMap(partitionNameById);
}

@Override
public String toString() {
return "LakeSnapshot{"
+ "snapshotId="
+ snapshotId
+ ", tableBucketsOffset="
+ tableBucketsOffset
+ ", partitionNameById="
+ partitionNameById
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,20 +215,16 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
long snapshotId = response.getSnapshotId();
Map<TableBucket, Long> tableBucketsOffset =
new HashMap<>(response.getBucketSnapshotsCount());
Map<Long, String> partitionNameById = new HashMap<>();
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
Long partitionId =
pbLakeSnapshotForBucket.hasPartitionId()
? pbLakeSnapshotForBucket.getPartitionId()
: null;
TableBucket tableBucket =
new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
if (partitionId != null && pbLakeSnapshotForBucket.hasPartitionName()) {
partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
}
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
}
return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
return new LakeSnapshot(snapshotId, tableBucketsOffset);
}

public static List<FsPathAndFileName> toFsPathAndFileName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ public class ConfigOptions {
"The rack for the tabletServer. This will be used in rack aware bucket assignment "
+ "for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10`");

public static final ConfigOption<Integer> TABLET_SERVER_IO_POOL_SIZE =
key("tablet-server.io-pool.size")
.intType()
.defaultValue(3)
.withDescription(
"The size of the IO thread pool to run blocking operations for tablet server. "
+ "The default value is 3.");

public static final ConfigOption<String> DATA_DIR =
key("data.dir")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,11 @@ public class BucketOffset implements Serializable {
private final long logOffset;
private final int bucket;
private final @Nullable Long partitionId;
private final @Nullable String partitionQualifiedName;

public BucketOffset(
long logOffset,
int bucket,
@Nullable Long partitionId,
@Nullable String partitionQualifiedName) {
public BucketOffset(long logOffset, int bucket, @Nullable Long partitionId) {
this.logOffset = logOffset;
this.bucket = bucket;
this.partitionId = partitionId;
this.partitionQualifiedName = partitionQualifiedName;
}

public long getLogOffset() {
Expand All @@ -57,11 +51,6 @@ public Long getPartitionId() {
return partitionId;
}

@Nullable
public String getPartitionQualifiedName() {
return partitionQualifiedName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -71,9 +60,13 @@ public boolean equals(Object o) {
return false;
}
BucketOffset that = (BucketOffset) o;
return bucket == that.bucket
&& logOffset == that.logOffset
&& Objects.equals(partitionId, that.partitionId)
&& Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
return logOffset == that.logOffset
&& bucket == that.bucket
&& Objects.equals(partitionId, that.partitionId);
}

@Override
public int hashCode() {
return Objects.hash(logOffset, bucket, partitionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ public class CommittedLakeSnapshot {
// partition bucket
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();

// partition id -> partition name, will be empty if is not a partitioned table
// the partition name is a qualified name in the format: key1=value1/key2=value2
private final Map<Long, String> qualifiedPartitionNameById = new HashMap<>();

public CommittedLakeSnapshot(long lakeSnapshotId) {
this.lakeSnapshotId = lakeSnapshotId;
}
Expand All @@ -50,34 +46,30 @@ public void addBucket(int bucketId, long offset) {
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
}

public void addPartitionBucket(
Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
}

public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
return logEndOffsets;
}

public Map<Long, String> getQualifiedPartitionNameById() {
return qualifiedPartitionNameById;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
return lakeSnapshotId == that.lakeSnapshotId
&& Objects.equals(logEndOffsets, that.logEndOffsets)
&& Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
&& Objects.equals(logEndOffsets, that.logEndOffsets);
}

@Override
public int hashCode() {
return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
return Objects.hash(lakeSnapshotId, logEndOffsets);
}

@Override
Expand All @@ -87,8 +79,6 @@ public String toString() {
+ lakeSnapshotId
+ ", logEndOffsets="
+ logEndOffsets
+ ", partitionNameById="
+ qualifiedPartitionNameById
+ '}';
}
}
41 changes: 41 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class FlussPaths {
/** The name of the directory for shared remote snapshot kv files. */
public static final String REMOTE_KV_SNAPSHOT_SHARED_DIR = "shared";

private static final String REMOTE_LAKE_DIR_NAME = "lake";

// ----------------------------------------------------------------------------------------
// LOG/KV Tablet Paths
// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -681,6 +683,45 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId);
}

/**
* Returns the remote path for storing lake snapshot metadata required by Fluss for a table.
*
* <p>The path contract:
*
* <pre>
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
* </pre>
*/
public static FsPath remoteLakeTableSnapshotMetadataDir(
String remoteDataDir, TablePath tablePath, long tableId) {
return new FsPath(
String.format(
"%s/%s/%s/%s-%d",
remoteDataDir,
REMOTE_LAKE_DIR_NAME,
tablePath.getDatabaseName(),
tablePath.getTableName(),
tableId));
}

/**
* Returns a remote path for storing lake snapshot metadata required by Fluss for a table.
*
* <p>The path contract:
*
* <pre>
* {$remoteLakeTableSnapshotMetadataDir}/manifest/{uuid}.manifest
* </pre>
*/
public static FsPath remoteLakeTableSnapshotManifestPath(
String remoteDataDir, TablePath tablePath, long tableId) {
return new FsPath(
String.format(
"%s/manifest/%s.manifest",
remoteLakeTableSnapshotMetadataDir(remoteDataDir, tablePath, tableId),
UUID.randomUUID()));
}

/**
* Returns the remote directory path for storing kv snapshot shared files (SST files with UUID
* prefix).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class BucketOffsetJsonSerde
public static final BucketOffsetJsonSerde INSTANCE = new BucketOffsetJsonSerde();
private static final String PARTITION_ID = "partition_id";
private static final String BUCKET_ID = "bucket";
private static final String PARTITION_NAME = "partition_name";
private static final String LOG_OFFSET = "offset";

@Override
Expand All @@ -39,14 +38,10 @@ public BucketOffset deserialize(JsonNode node) {
Long partitionId = partitionIdNode == null ? null : partitionIdNode.asLong();
int bucketId = node.get(BUCKET_ID).asInt();

// deserialize partition name
JsonNode partitionNameNode = node.get(PARTITION_NAME);
String partitionName = partitionNameNode == null ? null : partitionNameNode.asText();

// deserialize log offset
long logOffset = node.get(LOG_OFFSET).asLong();

return new BucketOffset(logOffset, bucketId, partitionId, partitionName);
return new BucketOffset(logOffset, bucketId, partitionId);
}

@Override
Expand All @@ -59,11 +54,6 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
}
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());

// serialize partition name
if (bucketOffset.getPartitionQualifiedName() != null) {
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionQualifiedName());
}

// serialize bucket offset
generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@ public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {

@Override
protected BucketOffset[] createObjects() {
return new BucketOffset[] {
new BucketOffset(10, 1, 1L, "country=eu-central/year=2023/month=12"),
new BucketOffset(20, 2, null, null)
};
return new BucketOffset[] {new BucketOffset(10, 1, 1L), new BucketOffset(20, 2, null)};
}

@Override
protected String[] expectedJsons() {
return new String[] {
"{\"partition_id\":1,\"bucket\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"offset\":10}",
"{\"bucket\":2,\"offset\":20}"
"{\"partition_id\":1,\"bucket\":1,\"offset\":10}", "{\"bucket\":2,\"offset\":20}"
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.types.Tuple2;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,19 +30,13 @@ class FlussTableLakeSnapshot {

private final long lakeSnapshotId;

// <table_bucket, partition_name> -> log end offsets,
// if the bucket is not of a partition, the partition_name is null
private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;

// <table_bucket, partition_name> -> max timestamps,
// if the bucket is not of a partition, the partition_name is null
private final Map<Tuple2<TableBucket, String>, Long> maxTimestamps;
// table_bucket -> log end offsets,
private final Map<TableBucket, Long> logEndOffsets;

FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
this.tableId = tableId;
this.lakeSnapshotId = lakeSnapshotId;
this.logEndOffsets = new HashMap<>();
this.maxTimestamps = new HashMap<>();
}

public long tableId() {
Expand All @@ -54,27 +47,20 @@ public long lakeSnapshotId() {
return lakeSnapshotId;
}

public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
public Set<TableBucket> tableBuckets() {
return logEndOffsets.keySet();
}

public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, long timestamp) {
logEndOffsets.put(Tuple2.of(bucket, null), offset);
maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
}

public void addPartitionBucketOffsetAndTimestamp(
TableBucket bucket, String partitionName, long offset, long timestamp) {
logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
public void addBucketOffset(TableBucket bucket, long offset) {
logEndOffsets.put(bucket, offset);
}

public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
return logEndOffsets.get(bucketPartition);
public void addPartitionBucketOffset(TableBucket bucket, long offset) {
logEndOffsets.put(bucket, offset);
}

public long getMaxTimestamp(Tuple2<TableBucket, String> bucketPartition) {
return maxTimestamps.get(bucketPartition);
public long getLogEndOffset(TableBucket bucket) {
return logEndOffsets.get(bucket);
}

@Override
Expand All @@ -86,8 +72,6 @@ public String toString() {
+ lakeSnapshotId
+ ", logEndOffsets="
+ logEndOffsets
+ ", maxTimestamps="
+ maxTimestamps
+ '}';
}
}
Loading