Skip to content

Commit 256a8da

Browse files
committed
address comments
1 parent d7a92b2 commit 256a8da

File tree

17 files changed

+818
-354
lines changed

17 files changed

+818
-354
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,14 @@ public class ConfigOptions {
394394
"The rack for the tabletServer. This will be used in rack aware bucket assignment "
395395
+ "for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10`");
396396

397+
/**
 * Size of the IO thread pool the tablet server uses to run blocking operations.
 * Integer option with key {@code tablet-server.io-pool.size}; defaults to 3.
 */
public static final ConfigOption<Integer> TABLET_SERVER_IO_POOL_SIZE =
398+
key("tablet-server.io-pool.size")
399+
.intType()
400+
.defaultValue(3)
401+
.withDescription(
402+
"The size of the IO thread pool to run blocking operations for tablet server. "
403+
+ "The default value is 3.");
404+
397405
public static final ConfigOption<String> DATA_DIR =
398406
key("data.dir")
399407
.stringType()

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -689,15 +689,15 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
689689
* <p>The path contract:
690690
*
691691
* <pre>
692-
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}
692+
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}.snapshot
693693
* </pre>
694694
*/
695695
public static FsPath remoteLakeTableSnapshotPath(
696-
Configuration conf, TablePath tablePath, long tableId, long snapshotId) {
696+
String remoteDataDir, TablePath tablePath, long tableId, long snapshotId) {
697697
return new FsPath(
698698
String.format(
699-
"%s/%s/%s/%s-%d/snapshot/%d",
700-
conf.get(ConfigOptions.REMOTE_DATA_DIR),
699+
"%s/%s/%s/%s-%d/snapshot/%d.snapshot",
700+
remoteDataDir,
701701
REMOTE_LAKE_DIR_NAME,
702702
tablePath.getDatabaseName(),
703703
tablePath.getTableName(),

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import java.util.Optional;
109109
import java.util.Set;
110110
import java.util.concurrent.CompletableFuture;
111+
import java.util.concurrent.ExecutorService;
111112
import java.util.stream.Collectors;
112113

113114
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter;
@@ -144,20 +145,24 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
144145
private long tokenLastUpdateTimeMs = 0;
145146
private ObtainedSecurityToken securityToken = null;
146147

148+
// Executor supplied through the constructor; getLatestLakeSnapshot submits its
// ZooKeeper lookup (zkClient.getLakeTableSnapshot) to it and completes the returned future there.
private final ExecutorService ioExecutor;
149+
147150
public RpcServiceBase(
148151
FileSystem remoteFileSystem,
149152
ServerType provider,
150153
ZooKeeperClient zkClient,
151154
MetadataManager metadataManager,
152155
@Nullable Authorizer authorizer,
153-
DynamicConfigManager dynamicConfigManager) {
156+
DynamicConfigManager dynamicConfigManager,
157+
ExecutorService ioExecutor) {
154158
this.remoteFileSystem = remoteFileSystem;
155159
this.provider = provider;
156160
this.apiManager = new ApiManager(provider);
157161
this.zkClient = zkClient;
158162
this.metadataManager = metadataManager;
159163
this.authorizer = authorizer;
160164
this.dynamicConfigManager = dynamicConfigManager;
165+
this.ioExecutor = ioExecutor;
161166
}
162167

163168
@Override
@@ -430,28 +435,33 @@ public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
430435
TableInfo tableInfo = metadataManager.getTable(tablePath);
431436
// get table id
432437
long tableId = tableInfo.getTableId();
433-
434-
Optional<LakeTableSnapshot> optLakeTableSnapshot;
435-
try {
436-
optLakeTableSnapshot = zkClient.getLakeTableSnapshot(tableId);
437-
} catch (Exception e) {
438-
throw new FlussRuntimeException(
439-
String.format(
440-
"Failed to get lake table snapshot for table: %s, table id: %d",
441-
tablePath, tableId),
442-
e);
443-
}
444-
445-
if (!optLakeTableSnapshot.isPresent()) {
446-
throw new LakeTableSnapshotNotExistException(
447-
String.format(
448-
"Lake table snapshot not exist for table: %s, table id: %d",
449-
tablePath, tableId));
450-
}
451-
452-
LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
453-
return CompletableFuture.completedFuture(
454-
makeGetLatestLakeSnapshotResponse(tableId, lakeTableSnapshot));
438+
CompletableFuture<GetLatestLakeSnapshotResponse> resultFuture = new CompletableFuture<>();
439+
ioExecutor.execute(
440+
() -> {
441+
Optional<LakeTableSnapshot> optLakeTableSnapshot;
442+
try {
443+
optLakeTableSnapshot = zkClient.getLakeTableSnapshot(tableId);
444+
if (!optLakeTableSnapshot.isPresent()) {
445+
resultFuture.completeExceptionally(
446+
new LakeTableSnapshotNotExistException(
447+
String.format(
448+
"Lake table snapshot not exist for table: %s, table id: %d",
449+
tablePath, tableId)));
450+
} else {
451+
LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
452+
resultFuture.complete(
453+
makeGetLatestLakeSnapshotResponse(tableId, lakeTableSnapshot));
454+
}
455+
} catch (Exception e) {
456+
resultFuture.completeExceptionally(
457+
new FlussRuntimeException(
458+
String.format(
459+
"Failed to get lake table snapshot for table: %s, table id: %d",
460+
tablePath, tableId),
461+
e));
462+
}
463+
});
464+
return resultFuture;
455465
}
456466

457467
@Override

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.exception.IneligibleReplicaException;
2929
import org.apache.fluss.exception.InvalidCoordinatorException;
3030
import org.apache.fluss.exception.InvalidUpdateVersionException;
31+
import org.apache.fluss.exception.TableNotExistException;
3132
import org.apache.fluss.exception.TabletServerNotAvailableException;
3233
import org.apache.fluss.exception.UnknownTableOrBucketException;
3334
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -89,6 +90,7 @@
8990
import org.apache.fluss.server.zk.data.TabletServerRegistration;
9091
import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
9192
import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode;
93+
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
9294
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
9395
import org.apache.fluss.utils.types.Tuple2;
9496

@@ -145,6 +147,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
145147
private final String internalListenerName;
146148

147149
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
150+
// Built in the constructor from the ZooKeeper client and the REMOTE_DATA_DIR setting;
// used when committing lake table snapshots (lakeTableHelper.upsertLakeTable).
private final LakeTableHelper lakeTableHelper;
148151

149152
public CoordinatorEventProcessor(
150153
ZooKeeperClient zooKeeperClient,
@@ -205,6 +208,8 @@ public CoordinatorEventProcessor(
205208
this.coordinatorMetricGroup = coordinatorMetricGroup;
206209
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
207210
this.ioExecutor = ioExecutor;
211+
this.lakeTableHelper =
212+
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
208213
}
209214

210215
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -1223,6 +1228,16 @@ private void tryProcessCommitLakeTableSnapshot(
12231228
commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
12241229
Map<Long, LakeTableSnapshot> lakeTableSnapshots =
12251230
commitLakeTableSnapshotData.getLakeTableSnapshot();
1231+
Map<Long, TablePath> tablePathById = new HashMap<>();
1232+
for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
1233+
lakeTableSnapshots.entrySet()) {
1234+
Long tableId = lakeTableSnapshotEntry.getKey();
1235+
TablePath tablePath = coordinatorContext.getTablePathById(tableId);
1236+
if (tablePath != null) {
1237+
tablePathById.put(tableId, tablePath);
1238+
}
1239+
}
1240+
12261241
ioExecutor.execute(
12271242
() -> {
12281243
try {
@@ -1237,15 +1252,16 @@ private void tryProcessCommitLakeTableSnapshot(
12371252
tableResp.setTableId(tableId);
12381253

12391254
try {
1240-
TablePath tablePath = coordinatorContext.getTablePathById(tableId);
1255+
TablePath tablePath = tablePathById.get(tableId);
12411256
if (tablePath == null) {
1242-
throw new RuntimeException(
1243-
String.format(
1244-
"Failed to find table path for table id: %d",
1245-
tableId));
1257+
throw new TableNotExistException(
1258+
"Table "
1259+
+ tableId
1260+
+ " not found in coordinator context.");
12461261
}
1262+
12471263
// this involves IO operation (ZK), so we do it in ioExecutor
1248-
zooKeeperClient.upsertLakeTableSnapshot(
1264+
lakeTableHelper.upsertLakeTable(
12491265
tableId, tablePath, lakeTableSnapshotEntry.getValue());
12501266
} catch (Exception e) {
12511267
ApiError error = ApiError.fromThrowable(e);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@ protected void startServices() throws Exception {
188188

189189
MetadataManager metadataManager =
190190
new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
191+
192+
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
193+
this.ioExecutor =
194+
Executors.newFixedThreadPool(
195+
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
191196
this.coordinatorService =
192197
new CoordinatorService(
193198
conf,
@@ -199,7 +204,8 @@ protected void startServices() throws Exception {
199204
authorizer,
200205
lakeCatalogDynamicLoader,
201206
lakeTableTieringManager,
202-
dynamicConfigManager);
207+
dynamicConfigManager,
208+
ioExecutor);
203209

204210
this.rpcServer =
205211
RpcServer.create(
@@ -225,11 +231,6 @@ protected void startServices() throws Exception {
225231
new AutoPartitionManager(metadataCache, metadataManager, conf);
226232
autoPartitionManager.start();
227233

228-
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
229-
this.ioExecutor =
230-
Executors.newFixedThreadPool(
231-
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
232-
233234
// start coordinator event processor after we register coordinator leader to zk
234235
// so that the event processor can get the coordinator leader node from zk during start
235236
// up.
@@ -366,15 +367,6 @@ CompletableFuture<Void> stopServices() {
366367
exception = ExceptionUtils.firstOrSuppressed(t, exception);
367368
}
368369

369-
try {
370-
if (ioExecutor != null) {
371-
// shutdown io executor
372-
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
373-
}
374-
} catch (Throwable t) {
375-
exception = ExceptionUtils.firstOrSuppressed(t, exception);
376-
}
377-
378370
try {
379371
if (coordinatorEventProcessor != null) {
380372
coordinatorEventProcessor.shutdown();
@@ -407,6 +399,11 @@ CompletableFuture<Void> stopServices() {
407399
exception = ExceptionUtils.firstOrSuppressed(t, exception);
408400
}
409401

402+
// NOTE(review): the other cleanup steps in stopServices() run inside try/catch and fold
// failures into `exception`, and the previous version of this step did too. Here the call is
// unguarded, so if gracefulShutdown throws, the cleanup that follows would be skipped --
// confirm this is intended or restore the try/catch.
if (ioExecutor != null) {
403+
// shutdown io executor
404+
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
405+
}
406+
410407
try {
411408
if (coordinatorContext != null) {
412409
// then reset coordinatorContext

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
import java.util.List;
123123
import java.util.Map;
124124
import java.util.concurrent.CompletableFuture;
125+
import java.util.concurrent.ExecutorService;
125126
import java.util.function.Supplier;
126127
import java.util.stream.Collectors;
127128

@@ -166,14 +167,16 @@ public CoordinatorService(
166167
@Nullable Authorizer authorizer,
167168
LakeCatalogDynamicLoader lakeCatalogDynamicLoader,
168169
LakeTableTieringManager lakeTableTieringManager,
169-
DynamicConfigManager dynamicConfigManager) {
170+
DynamicConfigManager dynamicConfigManager,
171+
ExecutorService ioExecutor) {
170172
super(
171173
remoteFileSystem,
172174
ServerType.COORDINATOR,
173175
zkClient,
174176
metadataManager,
175177
authorizer,
176-
dynamicConfigManager);
178+
dynamicConfigManager,
179+
ioExecutor);
177180
this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
178181
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
179182
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@
5353
import org.apache.fluss.server.zk.data.TabletServerRegistration;
5454
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
5555
import org.apache.fluss.utils.ExceptionUtils;
56+
import org.apache.fluss.utils.ExecutorUtils;
5657
import org.apache.fluss.utils.clock.Clock;
5758
import org.apache.fluss.utils.clock.SystemClock;
59+
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
5860
import org.apache.fluss.utils.concurrent.FlussScheduler;
5961
import org.apache.fluss.utils.concurrent.FutureUtils;
6062
import org.apache.fluss.utils.concurrent.Scheduler;
@@ -70,6 +72,9 @@
7072
import java.util.List;
7173
import java.util.Optional;
7274
import java.util.concurrent.CompletableFuture;
75+
import java.util.concurrent.ExecutorService;
76+
import java.util.concurrent.Executors;
77+
import java.util.concurrent.TimeUnit;
7378
import java.util.concurrent.atomic.AtomicBoolean;
7479

7580
import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS;
@@ -159,6 +164,9 @@ public class TabletServer extends ServerBase {
159164
@GuardedBy("lock")
160165
private CoordinatorGateway coordinatorGateway;
161166

167+
// Fixed-size "tablet-server-io" thread pool: created in startServices() with
// TABLET_SERVER_IO_POOL_SIZE threads, handed to TabletService, and shut down in stopServices().
@GuardedBy("lock")
168+
private ExecutorService ioExecutor;
169+
162170
/**
 * Creates a tablet server from the given configuration by delegating to the two-argument
 * constructor with {@code SystemClock.getInstance()}.
 *
 * @param conf the server configuration
 */
public TabletServer(Configuration conf) {
163171
this(conf, SystemClock.getInstance());
164172
}
@@ -251,6 +259,11 @@ protected void startServices() throws Exception {
251259
tabletServerMetricGroup,
252260
clock);
253261
replicaManager.startup();
262+
int ioExecutorPoolSize = conf.get(ConfigOptions.TABLET_SERVER_IO_POOL_SIZE);
263+
264+
this.ioExecutor =
265+
Executors.newFixedThreadPool(
266+
ioExecutorPoolSize, new ExecutorThreadFactory("tablet-server-io"));
254267

255268
this.tabletService =
256269
new TabletService(
@@ -261,7 +274,8 @@ protected void startServices() throws Exception {
261274
metadataCache,
262275
metadataManager,
263276
authorizer,
264-
dynamicConfigManager);
277+
dynamicConfigManager,
278+
ioExecutor);
265279

266280
RequestsMetrics requestsMetrics =
267281
RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);
@@ -430,6 +444,11 @@ CompletableFuture<Void> stopServices() {
430444
if (zkClient != null) {
431445
zkClient.close();
432446
}
447+
448+
// NOTE(review): this runs after zkClient.close() inside the same try block, so a failure
// while closing ZooKeeper would skip the executor shutdown. Tasks submitted to this executor
// also call zkClient (see RpcServiceBase#getLatestLakeSnapshot) -- consider shutting the
// executor down before closing the client; confirm the intended ordering.
if (ioExecutor != null) {
449+
// shutdown io executor
450+
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
451+
}
433452
} catch (Throwable t) {
434453
exception = ExceptionUtils.firstOrSuppressed(t, exception);
435454
}

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import java.util.Optional;
8888
import java.util.Set;
8989
import java.util.concurrent.CompletableFuture;
90+
import java.util.concurrent.ExecutorService;
9091
import java.util.function.BiFunction;
9192
import java.util.stream.Collectors;
9293

@@ -134,14 +135,16 @@ public TabletService(
134135
TabletServerMetadataCache metadataCache,
135136
MetadataManager metadataManager,
136137
@Nullable Authorizer authorizer,
137-
DynamicConfigManager dynamicConfigManager) {
138+
DynamicConfigManager dynamicConfigManager,
139+
ExecutorService ioExecutor) {
138140
super(
139141
remoteFileSystem,
140142
ServerType.TABLET_SERVER,
141143
zkClient,
142144
metadataManager,
143145
authorizer,
144-
dynamicConfigManager);
146+
dynamicConfigManager,
147+
ioExecutor);
145148
this.serviceName = "server-" + serverId;
146149
this.replicaManager = replicaManager;
147150
this.metadataCache = metadataCache;

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@
165165
import org.apache.fluss.server.metadata.TableMetadata;
166166
import org.apache.fluss.server.zk.data.BucketSnapshot;
167167
import org.apache.fluss.server.zk.data.LeaderAndIsr;
168+
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
168169
import org.apache.fluss.utils.json.DataTypeJsonSerde;
169170
import org.apache.fluss.utils.json.JsonSerdeUtils;
170-
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
171171

172172
import javax.annotation.Nullable;
173173

0 commit comments

Comments
 (0)