diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index ae04328202..3c804596af 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -23,6 +23,7 @@ import org.apache.fluss.exception.FlussException; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.LogStorageException; +import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; @@ -34,6 +35,7 @@ import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.PartitionRegistration; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; @@ -48,7 +50,9 @@ import java.io.File; import java.io.IOException; +import java.nio.file.DirectoryNotEmptyException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -386,6 +390,25 @@ private LogTablet loadLog( PhysicalTablePath physicalTablePath = pathAndBucket.f0; TablePath tablePath = physicalTablePath.getTablePath(); TableInfo tableInfo = getTableInfo(zkClient, tablePath); + + // Table schema exists, but the partition may have been dropped (or dropped + // and recreated with a new ID) while this TS was offline. + // Validate both partition name and partition ID against ZK. 
+ String partitionName = physicalTablePath.getPartitionName(); + if (partitionName != null) { + Optional registration = + zkClient.getPartition(tablePath, partitionName); + if (!registration.isPresent() + || registration.get().getPartitionId() != tableBucket.getPartitionId()) { + throw new PartitionNotExistException( + String.format( + "Failed to load partition '%s' (partitionId=%d) of table '%s': " + + "partition not found or partitionId mismatch in " + + "zookeeper metadata.", + partitionName, tableBucket.getPartitionId(), tablePath)); + } + } + LogTablet logTablet = LogTablet.create( dataDir, @@ -535,17 +558,19 @@ public void run() { loadLog(dataDir, tabletDir, cleanShutdown, recoveryPoints, conf, clock); } catch (Exception e) { LOG.error("Fail to loadLog from {}", tabletDir, e); - if (e instanceof SchemaNotExistException) { + if (e instanceof SchemaNotExistException + || e instanceof PartitionNotExistException) { LOG.error( - "schema not exist, table for {} has already been dropped, the residual data will be removed.", + "Table or partition for {} has already been dropped, the residual data will be removed.", tabletDir, e); FileUtils.deleteDirectoryQuietly(tabletDir); - // Also delete corresponding KV tablet directory if it exists try { Tuple2 pathAndBucket = FlussPaths.parseTabletDir(tabletDir); + + // Also delete corresponding KV tablet directory if it exists File kvTabletDir = FlussPaths.kvTabletDir( dataDir, pathAndBucket.f0, pathAndBucket.f1); @@ -555,11 +580,24 @@ public void run() { kvTabletDir); FileUtils.deleteDirectoryQuietly(kvTabletDir); } - } catch (Exception kvDeleteException) { + + boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null; + File partitionDir = tabletDir.getParentFile(); + if (partitionDir != null) { + deleteEmptyDirQuietly(partitionDir); + + if (isPartitioned) { + File tableDir = partitionDir.getParentFile(); + if (tableDir != null) { + deleteEmptyDirQuietly(tableDir); + } + } + } + } catch (Exception cleanupException) 
{ LOG.warn( - "Failed to delete corresponding KV tablet directory for log {}: {}", + "Failed to clean up residual KV/parent directories for {}: {}", tabletDir, - kvDeleteException.getMessage()); + cleanupException.getMessage()); } return; } @@ -607,6 +645,18 @@ private LogRecoveryTask( } } + private static void deleteEmptyDirQuietly(File dir) { + try { + Files.delete(dir.toPath()); + } catch (DirectoryNotEmptyException e) { + LOG.warn("Directory {} is not empty, skipping deletion.", dir); + } catch (NoSuchFileException ignored) { + // Already gone — fine. + } catch (IOException e) { + LOG.warn("Failed to delete empty directory {}: {}", dir, e.getMessage()); + } + } + private static final class LogShutdownTask { private final File dataDir; private final List logs; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 0aaea033a4..280e34124d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidColumnProjectionException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -124,7 +125,9 @@ import java.io.File; import java.io.IOException; +import java.nio.file.DirectoryNotEmptyException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -145,7 +148,6 @@ import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2; import static 
org.apache.fluss.server.TabletManagerBase.getTableInfo; -import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; @@ -978,7 +980,20 @@ public void stopReplicas( TableBucket tb = data.getTableBucket(); HostedReplica hostedReplica = getReplica(tb); if (hostedReplica instanceof NoneReplica) { - // do nothing fort this case. + if (data.isDeleteLocal()) { + try { + sweepOrphanTabletDirs(tb, deletedTableIds, deletedPartitionIds); + } catch (Exception e) { + LOG.error( + "Failed to sweep orphan tablet directories for {}", + tb, + e); + result.add( + new StopReplicaResultForBucket( + tb, ApiError.fromThrowable(e))); + continue; + } + } result.add(new StopReplicaResultForBucket(tb)); } else if (hostedReplica instanceof OfflineReplica) { LOG.warn( @@ -1927,6 +1942,59 @@ private StopReplicaResultForBucket stopReplica( return new StopReplicaResultForBucket(tb); } + /** + * Remove on-disk tablet directories for a bucket that the in-memory ReplicaManager does not + * know about. This handles the case where a stopReplica(delete=true) arrives after the + * TabletServer was restarted during a delete — LogManager loaded the log at startup but no + * NotifyLeaderAndIsr ever ran, so allReplicas is empty. + */ + private void sweepOrphanTabletDirs( + TableBucket tb, Map deletedTableIds, Map deletedPartitionIds) { + Optional orphanLog = logManager.getLog(tb); + if (!orphanLog.isPresent()) { + return; + } + + LogTablet logTablet = orphanLog.get(); + File dataDir = logTablet.getDataDir(); + PhysicalTablePath physicalTablePath = logTablet.getPhysicalTablePath(); + Path tabletParentDir = logManager.getTabletParentDir(dataDir, physicalTablePath, tb); + + // Clean KV before log so that if KV cleanup fails, the log is still + // present and a coordinator retry can re-enter this method. 
+ boolean isKvTable = false; + if (kvManager.getKv(tb).isPresent()) { + kvManager.dropKv(tb); + isKvTable = true; + } else { + File kvTabletDir = FlussPaths.kvTabletDir(dataDir, physicalTablePath, tb); + if (kvTabletDir.exists()) { + isKvTable = true; + try { + FileUtils.deleteDirectory(kvTabletDir); + } catch (IOException e) { + throw new KvStorageException( + String.format( + "Failed to delete orphan KV tablet directory %s", kvTabletDir), + e); + } + } + } + + logManager.dropLog(tb); + + localDiskManager.recordReplicaDelete(dataDir, isKvTable); + + if (tb.getPartitionId() != null) { + deletedPartitionIds.put(tb.getPartitionId(), tabletParentDir); + deletedTableIds.put(tb.getTableId(), tabletParentDir.getParent()); + } else { + deletedTableIds.put(tb.getTableId(), tabletParentDir); + } + + LOG.info("Swept orphan tablet directories for bucket {}", tb); + } + private void truncateToHighWatermark(List replicas) { for (Replica replica : replicas) { long highWatermark = replica.getLogTablet().getHighWatermark(); @@ -1960,14 +2028,20 @@ private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, Strin } + /** Deletes {@code dir} only when it is empty: a non-empty or already-missing dir is skipped, and any other I/O failure is logged rather than thrown. */ private void dropEmptyTableOrPartitionDir(Path dir, long id, String dirType) { - if (!Files.exists(dir) || !isDirectoryEmpty(dir)) { - return; - } - - LOG.info("Drop empty {} dir '{}' of {} id {}.", dirType, dir, dirType, id); try { - FileUtils.deleteDirectory(dir.toFile()); - } catch (Exception e) { + Files.delete(dir); + LOG.info("Dropped empty {} dir '{}' of {} id {}.", dirType, dir, dirType, id); + } catch (DirectoryNotEmptyException e) { + LOG.warn( + "{} dir '{}' of {} id {} is not empty, skipping deletion.", + dirType, + dir, + dirType, + id); + } catch (NoSuchFileException ignored) { + // Already gone — fine. 
+ } catch (IOException e) { - LOG.error("Failed to delete empty {} dir '{}' of {} id {}.", dirType, dir, dirType, e); + LOG.error("Failed to delete empty {} dir '{}' of {} id {}.", dirType, dir, dirType, id, e); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java index 2abec7264b..86e2fb8f5f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java @@ -19,31 +19,42 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.server.testutils.RpcMessageTestUtils; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.File; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import static org.apache.fluss.record.TestData.DATA1; import static 
org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static 
org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest; +import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; import static org.assertj.core.api.Assertions.assertThat; @@ -132,6 +143,133 @@ void testStopReplica(boolean isPkTable) throws Exception { retryUtilReplicaNotExist(tb, isr2, tableDirs2); } + @Test + void testDropTableCleansOrphanDirsOnTabletServerRestart() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_table"); + long tableId = + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + List isr = waitAndGetIsr(tb); + + // Write data so that actual segment files exist on every replica. 
+ int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get(); + + int offlineServerId = isr.get(0); + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager(); + Replica replica = replicaManager.getReplicaOrException(tb); + Path offlineTsTableDir = replica.getTabletParentDir(); + File offlineTsLogDir = replica.getLogTablet().getLogDir(); + assertThat(offlineTsTableDir).exists(); + assertThat(offlineTsLogDir).exists(); + // Verify actual data files (.log, .index) exist under the log directory. + assertThat(offlineTsLogDir.listFiles()).isNotEmpty(); + + FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2); + + coordinatorGateway + .dropTable( + RpcMessageTestUtils.newDropTableRequest( + tablePath.getDatabaseName(), tablePath.getTableName(), false)) + .get(); + assertThat(zkClient.tableExist(tablePath)).isFalse(); + + FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + + // Both the log directory (with data files) and the table parent directory + // should be cleaned at startup via the SchemaNotExistException handler. 
+ retry( + Duration.ofMinutes(1), + () -> { + assertThat(offlineTsLogDir).doesNotExist(); + assertThat(offlineTsTableDir).doesNotExist(); + }); + } + + @Test + void testDropPartitionCleansOrphanDirsOnTabletServerRestart() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_partition"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build()) + .distributedBy(1) + .partitionedBy("b") + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3) + .build(); + long tableId = + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + String partitionName = "p1"; + long partitionId = + RpcMessageTestUtils.createPartition( + FLUSS_CLUSTER_EXTENSION, + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false); + TableBucket tb = new TableBucket(tableId, partitionId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + List isr = waitAndGetIsr(tb); + + // Write data so that actual segment files exist on every replica. 
+ int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get(); + + int offlineServerId = isr.get(0); + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager(); + Replica replica = replicaManager.getReplicaOrException(tb); + Path offlineTsPartitionDir = replica.getTabletParentDir(); + File offlineTsLogDir = replica.getLogTablet().getLogDir(); + assertThat(offlineTsPartitionDir).exists(); + assertThat(offlineTsLogDir).exists(); + assertThat(offlineTsLogDir.listFiles()).isNotEmpty(); + + FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2); + + coordinatorGateway + .dropPartition( + RpcMessageTestUtils.newDropPartitionRequest( + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false)) + .get(); + + FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + + // Both the log directory (with data files) and the partition parent + // directory should be cleaned at startup via the PartitionNotExistException handler. 
+ retry( + Duration.ofMinutes(1), + () -> { + assertThat(offlineTsLogDir).doesNotExist(); + assertThat(offlineTsPartitionDir).doesNotExist(); + }); + } + private List waitAndGetIsr(TableBucket tb) { LeaderAndIsr leaderAndIsr = waitValue( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index eea0d5eb12..ff6b0246ee 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -28,6 +28,7 @@ import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; @@ -69,6 +70,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; +import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.metadata.ClusterMetadata; @@ -96,6 +98,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -2545,4 +2548,41 @@ void testHighWatermarkCheckpointIsWrittenPerDirectory() throws Exception { assertThat(checkpoint2).containsOnlyKeys(new TableBucket(DATA1_TABLE_ID, 1)); assertThat(checkpoint2.get(new TableBucket(DATA1_TABLE_ID, 1))).isEqualTo(10L); } + + @Test + void testStopReplicaSweepsOrphanDirsForNoneReplica() throws Exception { + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + PhysicalTablePath 
physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH); + + // Create a log directly via LogManager without going through ReplicaManager. + // This simulates the state after TS restart where LogManager loaded the log + // but no NotifyLeaderAndIsr arrived (so allReplicas is empty → NoneReplica). + File dataDir = localDiskManager.dataDirs().get(0); + LogTablet logTablet = + logManager.getOrCreateLog( + dataDir, physicalTablePath, tb, LogFormat.ARROW, 1, false); + File logDir = logTablet.getLogDir(); + Path tableDir = logManager.getTabletParentDir(dataDir, physicalTablePath, tb); + assertThat(logDir).exists(); + assertThat(tableDir).exists(); + + // Verify the bucket is NoneReplica (not in allReplicas). + assertThat(replicaManager.getReplica(tb)).isInstanceOf(ReplicaManager.NoneReplica.class); + + // Send stopReplicas with deleteLocal=true. This should hit the NoneReplica + // branch and invoke sweepOrphanTabletDirs to clean up the orphan log. + CompletableFuture> future = new CompletableFuture<>(); + replicaManager.stopReplicas( + INITIAL_COORDINATOR_EPOCH, + Collections.singletonList( + new StopReplicaData(tb, true, false, INITIAL_COORDINATOR_EPOCH, 0)), + future::complete); + + assertThat(future.get()).containsOnly(new StopReplicaResultForBucket(tb)); + // The log directory and table parent directory should be cleaned up. + assertThat(logDir).doesNotExist(); + assertThat(tableDir).doesNotExist(); + // LogManager should no longer hold the log. + assertThat(logManager.getLog(tb)).isNotPresent(); + } }