Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.exception.FlussException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
Expand All @@ -34,6 +35,7 @@
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.storage.LocalDiskManager;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.PartitionRegistration;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
Expand All @@ -48,7 +50,9 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -386,6 +390,25 @@ private LogTablet loadLog(
PhysicalTablePath physicalTablePath = pathAndBucket.f0;
TablePath tablePath = physicalTablePath.getTablePath();
TableInfo tableInfo = getTableInfo(zkClient, tablePath);

// Table schema exists, but the partition may have been dropped (or dropped
// and recreated with a new ID) while this TS was offline.
// Validate both partition name and partition ID against ZK.
String partitionName = physicalTablePath.getPartitionName();
if (partitionName != null) {
Optional<PartitionRegistration> registration =
zkClient.getPartition(tablePath, partitionName);
if (!registration.isPresent()
|| registration.get().getPartitionId() != tableBucket.getPartitionId()) {
throw new PartitionNotExistException(
String.format(
"Failed to load partition '%s' (partitionId=%d) of table '%s': "
+ "partition not found or partitionId mismatch in "
+ "zookeeper metadata.",
partitionName, tableBucket.getPartitionId(), tablePath));
}
}

LogTablet logTablet =
LogTablet.create(
dataDir,
Expand Down Expand Up @@ -535,17 +558,19 @@ public void run() {
loadLog(dataDir, tabletDir, cleanShutdown, recoveryPoints, conf, clock);
} catch (Exception e) {
LOG.error("Fail to loadLog from {}", tabletDir, e);
if (e instanceof SchemaNotExistException) {
if (e instanceof SchemaNotExistException
|| e instanceof PartitionNotExistException) {
LOG.error(
"schema not exist, table for {} has already been dropped, the residual data will be removed.",
"Table or partition for {} has already been dropped, the residual data will be removed.",
tabletDir,
e);
FileUtils.deleteDirectoryQuietly(tabletDir);

// Also delete corresponding KV tablet directory if it exists
try {
Tuple2<PhysicalTablePath, TableBucket> pathAndBucket =
FlussPaths.parseTabletDir(tabletDir);

// Also delete corresponding KV tablet directory if it exists
File kvTabletDir =
FlussPaths.kvTabletDir(
dataDir, pathAndBucket.f0, pathAndBucket.f1);
Expand All @@ -555,11 +580,24 @@ public void run() {
kvTabletDir);
FileUtils.deleteDirectoryQuietly(kvTabletDir);
}
} catch (Exception kvDeleteException) {

boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null;
File partitionDir = tabletDir.getParentFile();
if (partitionDir != null) {
deleteEmptyDirQuietly(partitionDir);

if (isPartitioned) {
File tableDir = partitionDir.getParentFile();
if (tableDir != null) {
deleteEmptyDirQuietly(tableDir);
}
}
}
} catch (Exception cleanupException) {
LOG.warn(
"Failed to delete corresponding KV tablet directory for log {}: {}",
"Failed to clean up residual KV/parent directories for {}: {}",
tabletDir,
kvDeleteException.getMessage());
cleanupException.getMessage());
}
return;
}
Expand Down Expand Up @@ -607,6 +645,18 @@ private LogRecoveryTask(
}
}

private static void deleteEmptyDirQuietly(File dir) {
try {
Files.delete(dir.toPath());
} catch (DirectoryNotEmptyException e) {
LOG.warn("Directory {} is not empty, skipping deletion.", dir);
} catch (NoSuchFileException ignored) {
// Already gone — fine.
} catch (IOException e) {
LOG.warn("Failed to delete empty directory {}: {}", dir, e.getMessage());
}
}

private static final class LogShutdownTask {
private final File dataDir;
private final List<LogTablet> logs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidRequiredAcksException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.exception.LogOffsetOutOfRangeException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
Expand Down Expand Up @@ -124,7 +125,9 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -145,7 +148,6 @@

import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2;
import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty;
import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.apache.fluss.utils.Preconditions.checkState;
Expand Down Expand Up @@ -978,7 +980,20 @@ public void stopReplicas(
TableBucket tb = data.getTableBucket();
HostedReplica hostedReplica = getReplica(tb);
if (hostedReplica instanceof NoneReplica) {
// do nothing fort this case.
if (data.isDeleteLocal()) {
try {
sweepOrphanTabletDirs(tb, deletedTableIds, deletedPartitionIds);
Comment thread
gyang94 marked this conversation as resolved.
} catch (Exception e) {
LOG.error(
"Failed to sweep orphan tablet directories for {}",
tb,
e);
result.add(
new StopReplicaResultForBucket(
tb, ApiError.fromThrowable(e)));
continue;
}
}
result.add(new StopReplicaResultForBucket(tb));
} else if (hostedReplica instanceof OfflineReplica) {
LOG.warn(
Expand Down Expand Up @@ -1927,6 +1942,59 @@ private StopReplicaResultForBucket stopReplica(
return new StopReplicaResultForBucket(tb);
}

/**
* Remove on-disk tablet directories for a bucket that the in-memory ReplicaManager does not
* know about. This handles the case where a stopReplica(delete=true) arrives after the
* TabletServer was restarted during a delete — LogManager loaded the log at startup but no
* NotifyLeaderAndIsr ever ran, so allReplicas is empty.
*/
private void sweepOrphanTabletDirs(
TableBucket tb, Map<Long, Path> deletedTableIds, Map<Long, Path> deletedPartitionIds) {
Optional<LogTablet> orphanLog = logManager.getLog(tb);
if (!orphanLog.isPresent()) {
return;
}

LogTablet logTablet = orphanLog.get();
File dataDir = logTablet.getDataDir();
PhysicalTablePath physicalTablePath = logTablet.getPhysicalTablePath();
Path tabletParentDir = logManager.getTabletParentDir(dataDir, physicalTablePath, tb);

// Clean KV before log so that if KV cleanup fails, the log is still
// present and a coordinator retry can re-enter this method.
boolean isKvTable = false;
if (kvManager.getKv(tb).isPresent()) {
kvManager.dropKv(tb);
isKvTable = true;
} else {
File kvTabletDir = FlussPaths.kvTabletDir(dataDir, physicalTablePath, tb);
if (kvTabletDir.exists()) {
isKvTable = true;
try {
FileUtils.deleteDirectory(kvTabletDir);
} catch (IOException e) {
throw new KvStorageException(
String.format(
"Failed to delete orphan KV tablet directory %s", kvTabletDir),
e);
}
}
}

logManager.dropLog(tb);

localDiskManager.recordReplicaDelete(dataDir, isKvTable);

if (tb.getPartitionId() != null) {
deletedPartitionIds.put(tb.getPartitionId(), tabletParentDir);
deletedTableIds.put(tb.getTableId(), tabletParentDir.getParent());
} else {
deletedTableIds.put(tb.getTableId(), tabletParentDir);
}

LOG.info("Swept orphan tablet directories for bucket {}", tb);
}

private void truncateToHighWatermark(List<Replica> replicas) {
for (Replica replica : replicas) {
long highWatermark = replica.getLogTablet().getHighWatermark();
Expand Down Expand Up @@ -1960,14 +2028,19 @@ private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, Strin
}

private void dropEmptyTableOrPartitionDir(Path dir, long id, String dirType) {
if (!Files.exists(dir) || !isDirectoryEmpty(dir)) {
return;
}

LOG.info("Drop empty {} dir '{}' of {} id {}.", dirType, dir, dirType, id);
try {
FileUtils.deleteDirectory(dir.toFile());
} catch (Exception e) {
Files.delete(dir);
LOG.info("Dropped empty {} dir '{}' of {} id {}.", dirType, dir, dirType, id);
} catch (DirectoryNotEmptyException e) {
LOG.warn(
"{} dir '{}' of {} id {} is not empty, skipping deletion.",
dirType,
dir,
dirType,
id);
} catch (NoSuchFileException ignored) {
// Already gone — fine.
} catch (IOException e) {
LOG.error("Failed to delete empty {} dir '{}' of {} id {}.", dirType, dir, dirType, e);
}
}
Expand Down
Loading