6161import org .apache .fluss .server .coordinator .event .FencedCoordinatorEvent ;
6262import org .apache .fluss .server .coordinator .event .NewTabletServerEvent ;
6363import org .apache .fluss .server .coordinator .event .NotifyKvSnapshotOffsetEvent ;
64+ import org .apache .fluss .server .coordinator .event .NotifyLakeTableOffsetEvent ;
6465import org .apache .fluss .server .coordinator .event .NotifyLeaderAndIsrResponseReceivedEvent ;
6566import org .apache .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
6667import org .apache .fluss .server .coordinator .event .watcher .TabletServerChangeWatcher ;
@@ -524,6 +525,8 @@ public void process(CoordinatorEvent event) {
524525 commitKvSnapshotEvent , commitKvSnapshotEvent .getRespCallback ());
525526 } else if (event instanceof NotifyKvSnapshotOffsetEvent ) {
526527 processNotifyKvSnapshotOffsetEvent ((NotifyKvSnapshotOffsetEvent ) event );
528+ } else if (event instanceof NotifyLakeTableOffsetEvent ) {
529+ processNotifyLakeTableOffsetEvent ((NotifyLakeTableOffsetEvent ) event );
527530 } else if (event instanceof CommitRemoteLogManifestEvent ) {
528531 CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
529532 (CommitRemoteLogManifestEvent ) event ;
@@ -533,9 +536,8 @@ public void process(CoordinatorEvent event) {
533536 } else if (event instanceof CommitLakeTableSnapshotEvent ) {
534537 CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
535538 (CommitLakeTableSnapshotEvent ) event ;
536- completeFromCallable (
537- commitLakeTableSnapshotEvent .getRespCallback (),
538- () -> tryProcessCommitLakeTableSnapshot (commitLakeTableSnapshotEvent ));
539+ tryProcessCommitLakeTableSnapshot (
540+ commitLakeTableSnapshotEvent , commitLakeTableSnapshotEvent .getRespCallback ());
539541 } else if (event instanceof ControlledShutdownEvent ) {
540542 ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent ) event ;
541543 completeFromCallable (
@@ -1097,6 +1099,30 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
10971099 coordinatorContext .getCoordinatorEpoch ());
10981100 }
10991101
1102+ private void processNotifyLakeTableOffsetEvent (NotifyLakeTableOffsetEvent event ) {
1103+ Map <Long , LakeTableSnapshot > lakeTableSnapshots = event .getLakeTableSnapshots ();
1104+ coordinatorRequestBatch .newBatch ();
1105+ for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1106+ lakeTableSnapshots .entrySet ()) {
1107+ LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry .getValue ();
1108+ for (Map .Entry <TableBucket , Long > bucketLogEndOffsetEntry :
1109+ lakeTableSnapshot .getBucketLogEndOffset ().entrySet ()) {
1110+ TableBucket tb = bucketLogEndOffsetEntry .getKey ();
1111+ coordinatorContext
1112+ .getBucketLeaderAndIsr (bucketLogEndOffsetEntry .getKey ())
1113+ .ifPresent (
1114+ leaderAndIsr ->
1115+ coordinatorRequestBatch
1116+ .addNotifyLakeTableOffsetRequestForTableServers (
1117+ coordinatorContext .getAssignment (tb ),
1118+ tb ,
1119+ lakeTableSnapshot ));
1120+ }
1121+ }
1122+ coordinatorRequestBatch .sendNotifyLakeTableOffsetRequest (
1123+ coordinatorContext .getCoordinatorEpoch ());
1124+ }
1125+
11001126 private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest (
11011127 CommitRemoteLogManifestEvent event ) {
11021128 CommitRemoteLogManifestData manifestData = event .getCommitRemoteLogManifestData ();
@@ -1146,56 +1172,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
11461172 }
11471173 }
11481174
1149- private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot (
1150- CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent ) {
1175+ private void tryProcessCommitLakeTableSnapshot (
1176+ CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent ,
1177+ CompletableFuture <CommitLakeTableSnapshotResponse > callback ) {
1178+ // commit the lake table snapshot asynchronously
11511179 CommitLakeTableSnapshotData commitLakeTableSnapshotData =
11521180 commitLakeTableSnapshotEvent .getCommitLakeTableSnapshotData ();
1153- CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse ();
11541181 Map <Long , LakeTableSnapshot > lakeTableSnapshots =
11551182 commitLakeTableSnapshotData .getLakeTableSnapshot ();
1156- for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1157- lakeTableSnapshots .entrySet ()) {
1158- Long tableId = lakeTableSnapshotEntry .getKey ();
1159-
1160- PbCommitLakeTableSnapshotRespForTable tableResp = response .addTableResp ();
1161- tableResp .setTableId (tableId );
1162-
1163- try {
1164- TablePath tablePath = coordinatorContext .getTablePathById (tableId );
1165- if (tablePath == null ) {
1166- throw new RuntimeException (
1167- String .format ("Failed to find table path for table id: %d" , tableId ));
1168- }
1169- zooKeeperClient .upsertLakeTableSnapshot (
1170- tableId , tablePath , lakeTableSnapshotEntry .getValue ());
1171- } catch (Exception e ) {
1172- ApiError error = ApiError .fromThrowable (e );
1173- tableResp .setError (error .error ().code (), error .message ());
1174- }
1175- }
1183+ ioExecutor .execute (
1184+ () -> {
1185+ try {
1186+ CommitLakeTableSnapshotResponse response =
1187+ new CommitLakeTableSnapshotResponse ();
1188+ for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1189+ lakeTableSnapshots .entrySet ()) {
1190+ Long tableId = lakeTableSnapshotEntry .getKey ();
1191+
1192+ PbCommitLakeTableSnapshotRespForTable tableResp =
1193+ response .addTableResp ();
1194+ tableResp .setTableId (tableId );
1195+
1196+ try {
1197+ TablePath tablePath = coordinatorContext .getTablePathById (tableId );
1198+ if (tablePath == null ) {
1199+ throw new RuntimeException (
1200+ String .format (
1201+ "Failed to find table path for table id: %d" ,
1202+ tableId ));
1203+ }
1204+ // this involves IO operation (ZK), so we do it in ioExecutor
1205+ zooKeeperClient .upsertLakeTableSnapshot (
1206+ tableId , tablePath , lakeTableSnapshotEntry .getValue ());
1207+ } catch (Exception e ) {
1208+ ApiError error = ApiError .fromThrowable (e );
1209+ tableResp .setError (error .error ().code (), error .message ());
1210+ }
1211+ }
11761212
1177- // send notify lakehouse data request to all replicas.
1178- coordinatorRequestBatch .newBatch ();
1179- for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1180- lakeTableSnapshots .entrySet ()) {
1181- LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry .getValue ();
1182- for (Map .Entry <TableBucket , Long > bucketLogEndOffsetEntry :
1183- lakeTableSnapshot .getBucketLogEndOffset ().entrySet ()) {
1184- TableBucket tb = bucketLogEndOffsetEntry .getKey ();
1185- coordinatorContext
1186- .getBucketLeaderAndIsr (bucketLogEndOffsetEntry .getKey ())
1187- .ifPresent (
1188- leaderAndIsr ->
1189- coordinatorRequestBatch
1190- .addNotifyLakeTableOffsetRequestForTableServers (
1191- coordinatorContext .getAssignment (tb ),
1192- tb ,
1193- lakeTableSnapshot ));
1194- }
1195- }
1196- coordinatorRequestBatch .sendNotifyLakeTableOffsetRequest (
1197- coordinatorContext .getCoordinatorEpoch ());
1198- return response ;
1213+ // send notify lakehouse data request to all replicas via coordinator event
1214+ coordinatorEventManager .put (
1215+ new NotifyLakeTableOffsetEvent (lakeTableSnapshots ));
1216+ callback .complete (response );
1217+ } catch (Exception e ) {
1218+ callback .completeExceptionally (e );
1219+ }
1220+ });
11991221 }
12001222
12011223 private ControlledShutdownResponse tryProcessControlledShutdown (
0 commit comments