Skip to content

Commit

Permalink
add a new status SKIPPED for skipped jobs and flows
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Aug 15, 2024
1 parent e857c00 commit 0a092b5
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED

/**
* Flow or job is skipped.
*/
SKIPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
Expand All @@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
Expand All @@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand Down Expand Up @@ -213,6 +214,7 @@ public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
/**
 * Reports whether the dag identified by {@code dagId} still has unfinished work.
 *
 * <p>A node counts as unfinished when its execution status is not {@link ExecutionStatus#SKIPPED}
 * and the status name is not contained in {@link FlowStatusGenerator#FINISHED_STATUSES}.
 *
 * @param dagId id of the dag whose nodes are inspected
 * @return {@code true} if at least one node is unfinished, {@code false} otherwise
 * @throws IOException declared by the signature; the body itself adds no explicit throw
 */
@Override
public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
  return getDagNodes(dagId).stream()
      .map(dagNode -> dagNode.getValue().getExecutionStatus())
      // skipped nodes are ignored; every other node must have a finished status
      .anyMatch(status -> status != ExecutionStatus.SKIPPED
          && !FlowStatusGenerator.FINISHED_STATUSES.contains(status.name()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.gobblin.util.PropertiesUtils;

import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
import static org.apache.gobblin.service.ExecutionStatus.SKIPPED;


/**
Expand Down Expand Up @@ -159,6 +160,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
Properties cancelJobArgs = new Properties();
DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
String serializedFuture = null;
Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(dagId).get();

if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
cancelJobArgs.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
Expand All @@ -173,10 +175,11 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
// add back the dag node with updated states in the store
dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
traverseAndSetSkippedStatus(dag, dagNodeToCancel, dagManagementStateStore, dagId);

// send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
// into inconsistent state.
Expand All @@ -187,6 +190,24 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}

/**
 * Sets the execution status of every node in the subtree below {@code node} (excluding the node
 * itself) to {@link ExecutionStatus#SKIPPED} and stores each updated node in the state store.
 *
 * <p>The state of {@code node} itself is intentionally not written here: the caller stores it with
 * its own status before invoking this method, and each child is stored in the loop below before
 * the recursion descends into it. Writing it again at the top of every call would only duplicate
 * state-store writes.
 *
 * @param dag the dag that supplies the children of each visited node
 * @param node the node whose descendants are marked as skipped
 * @param dagManagementStateStore the store that receives the updated node states
 * @param dagId id of the dag under which the node states are stored
 * @throws IOException if the state store fails to persist a node state; propagated as-is rather
 *     than wrapped in an unchecked exception, since the signature already declares it
 */
private static void traverseAndSetSkippedStatus(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node,
    DagManagementStateStore dagManagementStateStore, DagManager.DagId dagId) throws IOException {
  for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
    child.getValue().setExecutionStatus(SKIPPED);
    dagManagementStateStore.addDagNodeState(child, dagId);
    traverseAndSetSkippedStatus(dag, child, dagManagementStateStore, dagId);
  }
}

public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -180,13 +181,14 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
.withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("fg"))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class);
JobStatus
jobStatus = JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();

doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());

LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow2",
Expand Down Expand Up @@ -226,5 +228,20 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());

Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job
Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job
Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled

Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));

dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);

doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any());
Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
}

0 comments on commit 0a092b5

Please sign in to comment.