-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job #4049
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,4 +49,9 @@ enum ExecutionStatus { | |
* Flow cancelled. | ||
*/ | ||
CANCELLED | ||
|
||
/** | ||
* Flow or job is skipped | ||
*/ | ||
SKIPPED | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,7 +158,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat | |
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId); | ||
} | ||
|
||
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { | ||
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException { | ||
Properties cancelJobArgs = new Properties(); | ||
String serializedFuture = null; | ||
|
||
|
@@ -183,12 +183,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, | |
} | ||
} | ||
|
||
public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException { | ||
/** | ||
* Emits JOB_SKIPPED GTE for each of the dependent jobs. | ||
*/ | ||
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) { | ||
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>(); | ||
findDependentJobs(dag, node, dependentJobs); | ||
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) { | ||
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); | ||
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about hard-coding to this |
||
} | ||
} | ||
|
||
private static void findDependentJobs(Dag<JobExecutionPlan> dag, | ||
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> result) { | ||
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) { | ||
if (!result.contains(child)) { | ||
result.add(child); | ||
findDependentJobs(dag, child, result); | ||
} | ||
} | ||
} | ||
|
||
public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException { | ||
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes(); | ||
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag)); | ||
|
||
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) { | ||
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); | ||
DagProcUtils.cancelDagNode(dagNodeToCancel); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,8 +60,11 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini | |
protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optional<Dag.DagNode<JobExecutionPlan>>, | ||
Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { | ||
if (!dagNodeWithJobStatus.getLeft().isPresent()) { | ||
// one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process | ||
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own | ||
// One of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process | ||
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own. | ||
// Another reason could be that LaunchDagProc was unable to compile the FlowSpec or the flow cannot run concurrently. | ||
// In these cases FLOW_FAILED and FLOW_SKIPPED events are emitted respectively, which are terminal status and | ||
// create a ReevaluateDagProc. But in these cases Dag was never created or never saved. | ||
log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId); | ||
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); | ||
return; | ||
|
@@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona | |
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs | ||
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the | ||
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried. | ||
// This can also happen when a job is cancelled/failed and dag is cleaned; but we are still processing Reevaluate | ||
// dag actions for SKIPPED dependent jobs | ||
log.warn("Dag not found {}", getDagId()); | ||
return; | ||
} | ||
|
@@ -117,7 +122,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona | |
} else if (DagProcUtils.isDagFinished(dag)) { | ||
String flowEvent = DagProcUtils.calcFlowStatus(dag); | ||
dag.setFlowEvent(flowEvent); | ||
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent); | ||
DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, dag, flowEvent); | ||
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) { | ||
// todo - verify if work from PR#3641 is required | ||
dagManagementStateStore.deleteDag(getDagId()); | ||
|
@@ -159,16 +164,21 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da | |
dag.setMessage("Flow failed because job " + jobName + " failed"); | ||
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED); | ||
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode); | ||
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering... is this a kind of 'ping-pong'? I'm wondering whether step d.) is necessary, given that setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs, not for all the dependent jobs. |
||
break; | ||
case CANCELLED: | ||
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED); | ||
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); | ||
break; | ||
case COMPLETE: | ||
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode); | ||
if (!DagProcUtils.isDagFinished(dag)) { // this may fail when dag failure option is finish_running and some dag node has failed | ||
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId()); | ||
} | ||
break; | ||
case SKIPPED: | ||
// no action needed for a skipped job | ||
break; | ||
default: | ||
log.warn("It should not reach here. Job status {} is unexpected.", executionStatus); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by | |
|
||
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList | ||
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, | ||
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, | ||
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is CANCELLED last, and SKIPPED prior to COMPLETE? what of the similar idea that news of job COMPLETE might arrive after we'd already attempted cancellation or skipping? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these four are the terminal statuses and once a job reaches here, further GTEs can be ignored. |
||
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED); | ||
|
||
private final JobIssueEventHandler jobIssueEventHandler; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that, fewer than all of that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it just comes down to how we define things. imo, when a flow execution is skipped because an execution of the same flow is already running, the status SKIPPED sounds more appropriate.