-
Notifications
You must be signed in to change notification settings - Fork 744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job #4049
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,4 +49,9 @@ enum ExecutionStatus { | |
* Flow cancelled. | ||
*/ | ||
CANCELLED | ||
|
||
/** | ||
* Flow or job is skipped | ||
*/ | ||
SKIPPED | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) | |
DagNode<JobExecutionPlan> node = nodesToExpand.poll(); | ||
ExecutionStatus executionStatus = getExecutionStatus(node); | ||
boolean addFlag = true; | ||
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) { | ||
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME || | ||
executionStatus == SKIPPED) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm unclear here: is "skipping" able to be reversed, so the node can later be ready? (I'm equating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no , skipped cannot not be reversed. this diff should not be here, ill change it. i think it might be appropriate in some draft version of this PR, but not anymore |
||
//Add a node to be executed next, only if all of its parent nodes are COMPLETE. | ||
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node); | ||
for (DagNode<JobExecutionPlan> parentNode : parentNodes) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,6 @@ | |
|
||
import org.apache.gobblin.configuration.ConfigurationKeys; | ||
import org.apache.gobblin.metrics.GobblinTrackingEvent; | ||
import org.apache.gobblin.metrics.event.EventSubmitter; | ||
import org.apache.gobblin.metrics.event.TimingEvent; | ||
import org.apache.gobblin.runtime.api.JobSpec; | ||
import org.apache.gobblin.runtime.api.Spec; | ||
|
@@ -158,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat | |
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId); | ||
} | ||
|
||
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { | ||
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException { | ||
Properties cancelJobArgs = new Properties(); | ||
String serializedFuture = null; | ||
|
||
|
@@ -183,12 +182,34 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, | |
} | ||
} | ||
|
||
public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException { | ||
/** | ||
* Emits JOB_SKIPPED GTE for each of the dependent job. | ||
arjun4084346 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) { | ||
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>(); | ||
findDependentJobs(dag, node, dependentJobs); | ||
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) { | ||
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); | ||
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about hard-coding to this |
||
} | ||
} | ||
|
||
private static void findDependentJobs(Dag<JobExecutionPlan> dag, | ||
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) { | ||
arjun4084346 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) { | ||
if (!dependentJobs.contains(child)) { | ||
dependentJobs.add(child); | ||
findDependentJobs(dag, child, dependentJobs); | ||
} | ||
} | ||
} | ||
|
||
public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException { | ||
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes(); | ||
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag)); | ||
|
||
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) { | ||
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); | ||
DagProcUtils.cancelDagNode(dagNodeToCancel); | ||
} | ||
} | ||
|
||
|
@@ -202,7 +223,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo | |
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided | ||
* flow event type. | ||
*/ | ||
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) { | ||
public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) { | ||
arjun4084346 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!dag.isEmpty()) { | ||
// Every dag node will contain the same flow metadata | ||
Config config = DagUtils.getDagJobConfig(dag); | ||
|
@@ -213,7 +234,7 @@ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExe | |
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage()); | ||
} | ||
|
||
eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata); | ||
DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,8 +60,11 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini | |
protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optional<Dag.DagNode<JobExecutionPlan>>, | ||
Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { | ||
if (!dagNodeWithJobStatus.getLeft().isPresent()) { | ||
// one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process | ||
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own | ||
// One of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process | ||
// has cleaned up the Dag, yet did not complete the lease before this current one acquired its own. | ||
// Another reason could be that LaunchDagProc was unable to compile the FlowSpec or the flow cannot run concurrently. | ||
// In these cases FLOW_FAILED and FLOW_SKIPPED events are emitted respectively, which are terminal status and | ||
// create a ReevaluateDagProc. But in these cases Dag was never created or never saved. | ||
log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId); | ||
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); | ||
return; | ||
|
@@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona | |
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs | ||
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the | ||
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried. | ||
// This can also happen when a job is cancelled/failed and dag is cleaned; but we are still processing Reevaluate | ||
// dag actions for SKIPPED dependent jobs | ||
log.warn("Dag not found {}", getDagId()); | ||
return; | ||
} | ||
|
@@ -117,7 +122,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona | |
} else if (DagProcUtils.isDagFinished(dag)) { | ||
String flowEvent = DagProcUtils.calcFlowStatus(dag); | ||
dag.setFlowEvent(flowEvent); | ||
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent); | ||
DagProcUtils.setAndEmitFlowEvent(dag, flowEvent); | ||
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) { | ||
// todo - verify if work from PR#3641 is required | ||
dagManagementStateStore.deleteDag(getDagId()); | ||
|
@@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da | |
dag.setMessage("Flow failed because job " + jobName + " failed"); | ||
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED); | ||
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode); | ||
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering... is this a kind of 'ping-pong'? I'm wondering whether step d.) is necessary, given we setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs. |
||
break; | ||
case CANCELLED: | ||
case SKIPPED: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this is job-level or is arising from a flow-level execution-status of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, yes this needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs. |
||
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED); | ||
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); | ||
break; | ||
case COMPLETE: | ||
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that (fewer than all of) that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it just comes down to how we define things. imo, when a flow execution is skipped when there is already an execution for the same flow is running, status SKIPPED sounds more appropriate.