From 0a092b59d55e2071ecb450b40deb9e277c709974 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Wed, 14 Aug 2024 20:25:06 -0700 Subject: [PATCH] add a new status SKIPPED for skipped jobs and flows --- .../gobblin/service/ExecutionStatus.pdl | 5 ++++ ...bblin.service.flowexecutions.snapshot.json | 5 ++-- ...gobblin.service.flowstatuses.snapshot.json | 5 ++-- .../MySqlDagManagementStateStore.java | 2 ++ .../orchestration/proc/DagProcUtils.java | 23 ++++++++++++++++++- .../orchestration/proc/KillDagProcTest.java | 19 ++++++++++++++- 6 files changed, 53 insertions(+), 6 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl index ed9a59f889..09aa5f2931 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl @@ -49,4 +49,9 @@ enum ExecutionStatus { * Flow cancelled. 
*/ CANCELLED + + /** + * Flow or job is skipped + */ + SKIPPED } \ No newline at end of file diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json index b2cdddc5e3..83689212a1 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json @@ -13,7 +13,7 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ], + "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ], "symbolDocs" : { "CANCELLED" : "Flow cancelled.", "COMPILED" : "Flow compiled to jobs.", @@ -23,7 +23,8 @@ "PENDING" : "Flow or job is in pending state.", "PENDING_RESUME" : "Flow or job is currently resuming.", "PENDING_RETRY" : "Flow or job is pending retry.", - "RUNNING" : "Flow or job is currently executing" + "RUNNING" : "Flow or job is currently executing.", + "SKIPPED" : "Flow or job is skipped." 
} }, { "type" : "record", diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json index e6a52bc835..302b691557 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json @@ -13,7 +13,7 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ], + "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ], "symbolDocs" : { "CANCELLED" : "Flow cancelled.", "COMPILED" : "Flow compiled to jobs.", @@ -23,7 +23,8 @@ "PENDING" : "Flow or job is in pending state.", "PENDING_RESUME" : "Flow or job is currently resuming.", "PENDING_RETRY" : "Flow or job is pending retry.", - "RUNNING" : "Flow or job is currently executing" + "RUNNING" : "Flow or job is currently executing.", + "SKIPPED" : "Flow or job is skipped." 
} }, { "type" : "record", diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b..18d9ef9a76 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -40,6 +40,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -213,6 +214,7 @@ public Optional getJobStatus(DagNodeId dagNodeId) { @Override public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException { return getDagNodes(dagId).stream() + .filter(node -> !node.getValue().getExecutionStatus().equals(ExecutionStatus.SKIPPED)) .anyMatch(node -> !FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name())); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 289454502f..1c0e068bd1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -55,6 +55,7 @@ import org.apache.gobblin.util.PropertiesUtils; import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; +import static 
org.apache.gobblin.service.ExecutionStatus.SKIPPED; /** @@ -159,6 +160,7 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, Properties cancelJobArgs = new Properties(); DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel); String serializedFuture = null; + Dag dag = dagManagementStateStore.getDag(dagId).get(); if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { cancelJobArgs.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, @@ -173,10 +175,11 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, } else { log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId()); } - DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get(); // add back the dag node with updated states in the store dagNodeToCancel.getValue().setExecutionStatus(CANCELLED); dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId); + traverseAndSetSkippedStatus(dag, dagNodeToCancel, dagManagementStateStore, dagId); + // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get // into inconsistent state. 
@@ -187,6 +190,25 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, } } + /** + * Marks every descendant of {@code node} (excluding {@code node} itself) as {@link ExecutionStatus#SKIPPED} + * and persists each updated descendant in the state store. + * + * @param dag the dag containing {@code node}; used to look up each node's children + * @param node the node whose descendants are skipped; its own status is not modified here + * @param dagManagementStateStore store that receives the updated child states + * @param dagId id under which the child states are stored + * @throws IOException if persisting a child's state fails + */ + private static void traverseAndSetSkippedStatus(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node, + DagManagementStateStore dagManagementStateStore, DagManager.DagId dagId) throws IOException { + for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) { + child.getValue().setExecutionStatus(SKIPPED); + dagManagementStateStore.addDagNodeState(child, dagId); + traverseAndSetSkippedStatus(dag, child, dagManagementStateStore, dagId); + } + } + public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException { List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes(); log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag)); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java index cd34b47cb9..6909feb4fc 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -180,13 +181,14 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("fg")) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class); JobStatus jobStatus = JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId). message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); - doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any()); LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow2", @@ -226,5 +228,20 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap()); Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows)) .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + + Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job + Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job + Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job + 
Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job + Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled + + Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId)); + + dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + + doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any()); + Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId)); } }