Skip to content

Commit

Permalink
Fix bug around cleanup dirs, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo committed Sep 6, 2024
1 parent 0290123 commit cce2c6c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext eve
}

/**
 * Entry point for per-task staging data cleanup. Per-task cleanup is not supported here: the method
 * logs that fact and delegates to {@code cleanupStagingDataForEntireJob} with the same arguments.
 *
 * @param jobState passed through unchanged to the job-level cleanup
 * @param resourcesToClean passed through unchanged to the job-level cleanup
 * @return the map returned by the job-level cleanup
 * @throws IOException declared by this method; presumably propagated from the job-level cleanup
 */
private static Map<String, Boolean> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
// NOTE(review): the same message is logged at WARN and then at ERROR on consecutive lines. This looks like
// the before/after pair of a rendered diff hunk -- confirm which single level is intended.
log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
log.error("Clean up staging data by task is not supported, will clean up job level data instead");

return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@
/**
 * Value holder describing the outcome of a work-directory cleanup; it is the return type of the
 * {@code delete(...)} activity call.
 *
 * NOTE(review): both fields below have the same type and appear to be the old and new name of a single
 * field (before/after lines of a rendered diff) -- confirm that only one of them exists in the real class.
 */
@RequiredArgsConstructor
public class CleanupResult {

// Keyed by directory path. The caller looks entries up with get(dir) and treats the Boolean as a per-directory
// flag. NOTE(review): presumably the older name of the field below -- confirm.
@NonNull private Map<String, Boolean> attemptedCleanedDirectories;
// Keyed by directory path; the caller logs "was not cleaned up" when the value for a directory is false,
// i.e. true means the deletion of that directory succeeded.
@NonNull private Map<String, Boolean> deletionSuccessesByDirPath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -178,32 +179,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
try {
CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
cleanupResult.getAttemptedCleanedDirectories().size());
for (String dir : directoriesToClean) {
if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
log.error("Directory {} was not cleaned up, please clean up manually", dir);
}
for (String dir : directoriesToClean) {
if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
log.error("Directory {} was not cleaned up, please clean up manually", dir);
}
}
} catch (Exception e) {
log.error("Failed to cleanup work dirs", e);
}
}

// NOTE(review): this span interleaves two variants of calculateWorkDirsToDelete (a stream/filter variant and a
// loop variant), as rendered from a diff without +/- markers. It is not compilable as shown; confirm which
// lines belong to the live version before relying on the notes below.
/**
 * Computes the set of directories to delete for a job: every directory in {@code workDirsToClean} plus the
 * parent of each of those directories, restricted to paths whose string form contains {@code jobId}.
 *
 * @param jobId identifier that a path must contain (substring match) to be eligible for deletion
 * @param workDirsToClean directory paths requested for cleanup
 * @return the directory paths, as strings, that should be deleted
 */
private Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
protected static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
// We want to delete the job-level directory once the job completes as well, which is the parent of the task staging/output dirs
Set<String> allDirsToClean = workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent()).toString()).collect(
Collectors.toSet());
allDirsToClean.addAll(workDirsToClean);
Set<Path> allDirsToClean =
workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent())).collect(Collectors.toSet());
allDirsToClean.addAll(workDirsToClean.stream().map(workDir -> new Path(workDir)).collect(Collectors.toSet()));

// Only delete directories that are associated with the current job; otherwise skip the directory and log a
// warning (this is what the loop variant below does).
// NOTE(review): in the stream variant the filter lambda logs the warning but then returns true
// unconditionally, so nothing is actually filtered out there.
return allDirsToClean.stream().filter(workDir -> {
if (!workDir.contains(jobId)) {
log.warn("Not deleting work dir {} as it does not contain the jobId {}", workDir, jobId);
Set<String> resultSet = new HashSet<>();
for (Path dir : allDirsToClean) {
if (dir.toString().contains(jobId)) {
resultSet.add(dir.toString());
} else {
log.warn("Skipping deletion of directory {} as it is not associated with job {}", dir, jobId);
}
return true;
}).collect(Collectors.toSet());
}
return resultSet;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;

import java.util.HashSet;
import java.util.Set;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit test for {@code ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete}.
 */
public class ExecuteGobblinWorkflowImplTest {

  /**
   * Feeds a mix of job-scoped directories (some with a trailing slash) and the unrelated {@code /tmp}
   * directory, and expects exactly the four job-scoped paths back, without trailing slashes.
   */
  @Test
  public void testCalculateWorkDirsDeletion() {
    final String jobId = "jobId";
    final String[] requestedDirs = {
        "/tmp/jobId/task-staging/",
        "/tmp/jobId/task-output/",
        "/tmp/jobId/task-output/file",
        "/tmp/jobId",
        "/tmp"
    };
    final String[] expectedDirs = {
        "/tmp/jobId/task-output/file",
        "/tmp/jobId/task-output",
        "/tmp/jobId/task-staging",
        "/tmp/jobId"
    };

    Set<String> dirsToDelete = new HashSet<>();
    for (String requested : requestedDirs) {
      dirsToDelete.add(requested);
    }

    Set<String> result = ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, dirsToDelete);

    // Size is checked first so that an unexpected extra entry (e.g. "/tmp") fails the test.
    Assert.assertEquals(result.size(), 4);
    for (String expected : expectedDirs) {
      Assert.assertTrue(result.contains(expected));
    }
  }
}

0 comments on commit cce2c6c

Please sign in to comment.