Skip to content

Commit

Permalink
Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo committed Sep 4, 2024
1 parent 6665c6f commit a2d65c4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;

Expand Down Expand Up @@ -48,5 +47,4 @@ default WorkUnitStream handle(WorkUnitStream workUnitStream) throws IOException
* @throws IOException
*/
void close() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.gobblin.destination;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.workunit.WorkUnitStream;
Expand All @@ -38,11 +36,6 @@ public WorkUnitStream handle(WorkUnitStream workUnitSteam) {
});
}

@Override
public List<String> getCleanupResources() {
return new ArrayList<>();
}

@Override
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by
* reading in a {@link WUProcessingSpec} to determine the location of the output task states */
/** Activity for cleaning up a list of resources */
@ActivityInterface
public interface CleanupActivity {
public interface DeleteWorkDirsActivity {
/**
* Clean the list of resources specified in the input
* TODO: Generalize the input to support multiple platforms outside of just HDFS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.temporal.ddm.activity.CleanupActivity;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.CleanupResult;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
Expand All @@ -57,11 +57,12 @@


@Slf4j
public class CleanupActivityImpl implements CleanupActivity {
public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
static String UNDEFINED_JOB_NAME = "<job_name_stub>";

@Override
public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> resourcesToClean) {
//TODO: Emit timers to measure length of cleanup step
Optional<String> optJobName = Optional.empty();
try {
FileSystem fs = Help.loadFileSystem(workSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
Expand Down Expand Up @@ -52,7 +53,8 @@ protected Class<?>[] getWorkflowImplClasses() {

@Override
protected Object[] getActivityImplInstances() {
return new Object[] { new CommitActivityImpl(), new GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() };
return new Object[] { new CommitActivityImpl(), new GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl(),
new DeleteWorkDirsActivityImpl()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.CleanupActivity;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
Expand Down Expand Up @@ -74,7 +74,18 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class,
GEN_WUS_ACTIVITY_OPTS);

private final CleanupActivity cleanupActivityStub = Workflow.newActivityStub(CleanupActivity.class);
private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(3))
.setMaximumInterval(Duration.ofSeconds(100))
.setBackoffCoefficient(2)
.setMaximumAttempts(4)
.build();

private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
.build();
private final DeleteWorkDirsActivity _deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS);

@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
Expand Down Expand Up @@ -107,7 +118,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
);
} finally {
// TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight
cleanupActivityStub.cleanup(wuSpec, eventSubmitterContext, generateWorkUnitResults.getCleanupResources());
_deleteWorkDirsActivityStub.cleanup(wuSpec, eventSubmitterContext, generateWorkUnitResults.getCleanupResources());
}
}

Expand Down

0 comments on commit a2d65c4

Please sign in to comment.