Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job #4049

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ private void runJobExecutionLauncher() throws JobException {

try {
if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
// TODO: it should emit a SKIPPED_JOB event that sets the job status to SKIPPED rather than CANCELLED
TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static class LauncherTimings {
public static final String JOB_PENDING_RESUME = "JobPendingResume";
public static final String JOB_ORCHESTRATED = "JobOrchestrated";
public static final String JOB_PREPARE = "JobPrepareTimer";
public static final String JOB_SKIPPED = "JobSkipped";
public static final String JOB_START = "JobStartTimer";
public static final String JOB_RUN = "JobRunTimer";
public static final String JOB_COMMIT = "JobCommitTimer";
Expand Down Expand Up @@ -76,6 +77,7 @@ public static class FlowTimings {
public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded";
public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded";
public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
public static final String FLOW_SKIPPED = "FlowSkipped";
}

public static class FlowEventConstants {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOpe
ImmutableList.of(
createFlowCompiledEvent(),
createJobOrchestratedEvent(1, 2),
createJobSkippedEvent()
createJobSkippedTimeEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
Expand Down Expand Up @@ -836,6 +836,40 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
jobStatusMonitor.shutDown();
}

@Test
public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");

//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createJobSkippedEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});

try {
Thread.sleep(1000);
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);

State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());
Mockito.verify(dagManagementStateStore, Mockito.times(1)).addJobDagAction(
any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));

jobStatusMonitor.shutDown();
}

private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator,
String jobGroup, String jobName) throws IOException {
jobStatusMonitor.processMessage(recordIterator.next());
Expand Down Expand Up @@ -871,11 +905,15 @@ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int
return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
}

private GobblinTrackingEvent createJobSkippedEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap());
}

private GobblinTrackingEvent createJobStartEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
}

private GobblinTrackingEvent createJobSkippedEvent() {
private GobblinTrackingEvent createJobSkippedTimeEvent() {
return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED

/**
* Flow or job is skipped
Copy link
Contributor

@phet phet Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that (fewer than all of) that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it just comes down to how we define things. IMO, when a flow execution is skipped because another execution of the same flow is already running, the status SKIPPED sounds more appropriate.

*/
SKIPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
Expand All @@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
Expand All @@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(),
ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.SKIPPED.name());
public static final int MAX_LOOKBACK = 100;

private final JobStatusRetriever jobStatusRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public interface DagManagementStateStore {
* {@link DagManagementStateStore#addDag}. This call is just an additional identifier which may be used
* for DagNode level operations. In the future, it may be merged with checkpointDag.
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
executionStatus == SKIPPED) {
Copy link
Contributor

@phet phet Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unclear here: is "skipping" able to be reversed, so the node can later be ready? (I'm equating getNext to identifying the set of "ready" nodes.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, skipped cannot be reversed. This diff should not be here; I'll change it. I think it might have been appropriate in some draft version of this PR, but not anymore.

//Add a node to be executed next, only if all of its parent nodes are COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
Expand Down Expand Up @@ -158,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId);
}

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
Properties cancelJobArgs = new Properties();
String serializedFuture = null;

Expand All @@ -183,12 +182,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
/**
* Emits a JOB_SKIPPED GTE for each of the dependent jobs.
arjun4084346 marked this conversation as resolved.
Show resolved Hide resolved
*/
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
findDependentJobs(dag, node, dependentJobs);
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about hard-coding to this static

}
}

private static void findDependentJobs(Dag<JobExecutionPlan> dag,
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
arjun4084346 marked this conversation as resolved.
Show resolved Hide resolved
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
if (!dependentJobs.contains(child)) {
dependentJobs.add(child);
findDependentJobs(dag, child, dependentJobs);
}
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag));

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}
}

Expand All @@ -202,7 +223,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
arjun4084346 marked this conversation as resolved.
Show resolved Hide resolved
if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = DagUtils.getDagJobConfig(dag);
Expand All @@ -213,7 +234,7 @@ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExe
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
}

eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId());

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNode);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
}

dag.get().setMessage("Flow killed by request");
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);

if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
if (dagNodeToCancel.isPresent()) {
DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel.get());
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
DagProcUtils.cancelDag(dag.get());
}
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
log.warn("Dag with id " + getDagId() + " could not be compiled or cannot run concurrently.");
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagUtils.getFlowId(dag.get()),
Dag.FlowState.RUNNING);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini
protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own
// One of the reasons this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own.
// Another reason could be that LaunchDagProc was unable to compile the FlowSpec or the flow cannot run concurrently.
// In these cases FLOW_FAILED and FLOW_SKIPPED events are emitted respectively, which are terminal status and
// create a ReevaluateDagProc. But in these cases Dag was never created or never saved.
log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId);
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
return;
Expand Down Expand Up @@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
// This can also happen when a job is cancelled/failed and dag is cleaned; but we are still processing Reevaluate
// dag actions for SKIPPED dependent jobs
log.warn("Dag not found {}", getDagId());
return;
}
Expand All @@ -117,7 +122,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
} else if (DagProcUtils.isDagFinished(dag)) {
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
DagProcUtils.setAndEmitFlowEvent(dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
Expand Down Expand Up @@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering... is this a kind of 'ping-pong'?
a. a job fails, which emits a GTE
b. the KJSM sees the GTE and then creates a DagActionType.REEVALUATE
c. this ReevaluateDagProc emits a SKIPPED GTE for all dependent jobs
d. the KJSM sees those GTEs and creates a DagActionType.REEVALUATE for each of those

I'm wondering whether step d.) is necessary, given that setting SKIPPED should be a bulk operation on ALL dependent jobs. Does the KJSM really need to create a DagAction for reevaluating those?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.

break;
case CANCELLED:
case SKIPPED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this job-level SKIPPED, due to the "ping-pong" I just described?

Or is it arising from a flow-level execution status of SKIPPED? If the latter, who sets that? I thought it would be only job-level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes this needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
long flowResumeTime = System.currentTimeMillis();

// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
DagProcUtils.setAndEmitFlowEvent(failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);

for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
quotaManager.releaseQuota(dagNode);
}
}
// Send FLOW_FAILED event
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
// Send FLOW_SKIPPED event
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow is skipped because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_SKIPPED).stop(flowMetadata);
return Optional.absent();
}
}
Expand Down
Loading
Loading