[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job #4049

Open
wants to merge 3 commits into
base: master

Conversation

arjun4084346
Contributor

@arjun4084346 arjun4084346 commented Sep 5, 2024

fix merge conflicts
add tests

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    created a SKIPPED execution status
    used it for the jobs that cannot be run because their parent job is cancelled

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    updated tests in ReevaluateDagProcTest

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 41.11%. Comparing base (444f266) to head (8d5589d).
Report is 4 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4049      +/-   ##
============================================
+ Coverage     38.79%   41.11%   +2.31%     
- Complexity     1599     2201     +602     
============================================
  Files           388      480      +92     
  Lines         15998    20360    +4362     
  Branches       1585     2355     +770     
============================================
+ Hits           6207     8371    +2164     
- Misses         9293    11097    +1804     
- Partials        498      892     +394     


Contributor

@phet phet left a comment

nice improvement. it generally looks good, but let's align on whether SKIPPED is only job-level or also flow-level. once we decide, I'll take one more pass through the ReevaluateDagProcTest to read that more closely

@@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED

/**
* Flow or job is skipped
Contributor

@phet phet Sep 9, 2024

how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that (fewer than all of) that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)

Contributor Author

i think it just comes down to how we define things. imo, when a flow execution is skipped because another execution of the same flow is already running, status SKIPPED sounds more appropriate.

@@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
  DagNode<JobExecutionPlan> node = nodesToExpand.poll();
  ExecutionStatus executionStatus = getExecutionStatus(node);
  boolean addFlag = true;
- if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
+ if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
+ executionStatus == SKIPPED) {
Contributor

@phet phet Sep 9, 2024

I'm unclear here: is "skipping" able to be reversed, so the node can later be ready? (I'm equating getNext to identifying the set of "ready" nodes.)

Contributor Author

no, skipped cannot be reversed. this diff should not be here, i'll change it. i think it might have been appropriate in some draft version of this PR, but not anymore
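
To illustrate the point settled in this thread (SKIPPED is not reversible, so it should not be treated like a pending status when picking "ready" nodes), here is a minimal sketch. The class name, the local `Status` enum, and the `mayBecomeReady` helper are hypothetical stand-ins and not Gobblin's actual `getNext` code; only the three pending statuses are taken from the diff above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch, not the real DagManagerUtils.getNext implementation.
class PendingStatusSketch {
  // Local stand-in for Gobblin's ExecutionStatus (assumption: a reduced set of values).
  enum Status { PENDING, PENDING_RETRY, PENDING_RESUME, RUNNING, SKIPPED, COMPLETE, FAILED, CANCELLED }

  // Statuses from which a node can still be picked up for execution,
  // matching the condition on the removed line of the diff.
  static final Set<Status> PENDING_STATUSES =
      EnumSet.of(Status.PENDING, Status.PENDING_RETRY, Status.PENDING_RESUME);

  // SKIPPED is terminal, so it is deliberately absent from PENDING_STATUSES.
  static boolean mayBecomeReady(Status status) {
    return PENDING_STATUSES.contains(status);
  }
}
```

With this shape, `mayBecomeReady(Status.SKIPPED)` is false, which is the behaviour the author agrees to restore by dropping the extra condition.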

@@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by

  private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
  .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
- ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
+ ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
Contributor

why is CANCELLED last, and SKIPPED prior to COMPLETE? what of the similar idea that news of job COMPLETE might arrive after we'd already attempted cancellation or skipping?

Contributor Author

these four are the terminal statuses, and once a job reaches one of them, further GTEs can be ignored.
we also do not expect to see two of them for the same job.
yes, some combinations of events among these four may arrive due to a race condition, but i think in that case it is ok for GaaS to just adhere to any one ordering. I do not want to change the correct ordering, so I added SKIPPED before the other terminal statuses, basically to support the same idea as yours: let it show complete if it was cancelled/skipped earlier.
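
As a rough illustration of how an ordered status list can settle races between late events, the sketch below accepts an incoming status only if it does not move backwards in the list. This is an assumption about the general technique, not the actual `KafkaJobStatusMonitor` logic (which may, for example, treat retries specially). The class name, the local `Status` enum, and `shouldApply` are hypothetical, and the position of `FAILED` between `COMPLETE` and `CANCELLED` is assumed because the diff does not show it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an "only move forward" rule over an ordered status list.
class StatusOrderingSketch {
  // Local stand-in for Gobblin's ExecutionStatus.
  enum Status {
    COMPILED, PENDING, PENDING_RESUME, PENDING_RETRY, ORCHESTRATED, RUNNING,
    SKIPPED, COMPLETE, FAILED, CANCELLED
  }

  // Order up to COMPLETE follows the diff; FAILED's slot is an assumption,
  // CANCELLED last follows the reviewer's remark.
  static final List<Status> ORDERED = Arrays.asList(
      Status.COMPILED, Status.PENDING, Status.PENDING_RESUME, Status.PENDING_RETRY,
      Status.ORCHESTRATED, Status.RUNNING, Status.SKIPPED, Status.COMPLETE,
      Status.FAILED, Status.CANCELLED);

  // Apply the incoming status only if it is at the same position or later in ORDERED.
  static boolean shouldApply(Status current, Status incoming) {
    return ORDERED.indexOf(incoming) >= ORDERED.indexOf(current);
  }
}
```

Under this simplified rule, a COMPLETE event that arrives after a job was marked SKIPPED still replaces the status, while a SKIPPED event arriving after COMPLETE is ignored, which is the effect of placing SKIPPED before COMPLETE.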

findDependentJobs(dag, node, dependentJobs);
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
Contributor

same comment about hard-coding to this static

@@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
Contributor

wondering... is this a kind of 'ping-pong'?
a. a job fails, which emits a GTE
b. the KJSM sees the GTE and then creates a DagActionType.REEVALUATE
c. this ReevaluateDagProc emits a SKIPPED GTE for all dependent jobs
d. the KJSM sees those GTEs and creates a DagActionType.REEVALUATE for each of those

I'm wondering whether step d.) is necessary, given that setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a DagAction for reevaluating those?

Contributor Author

yes (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.
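
The distinction between "child jobs" and "all the dependent jobs" is the reason a single pass can cover every job that needs SKIPPED. A minimal sketch of collecting all transitive dependents with a breadth-first walk follows. It uses a plain map of job name to direct children as a hypothetical stand-in for Gobblin's `Dag` and `DagNode` types, and `findDependents` is an illustrative name, not the PR's `findDependentJobs`.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: gather every job downstream of a failed or cancelled job.
class DependentJobsSketch {
  // children maps a job name to the jobs that directly depend on it (assumed representation).
  static Set<String> findDependents(Map<String, List<String>> children, String start) {
    Set<String> dependents = new LinkedHashSet<>();
    Deque<String> toVisit =
        new ArrayDeque<>(children.getOrDefault(start, Collections.emptyList()));
    while (!toVisit.isEmpty()) {
      String job = toVisit.poll();
      // add() returns false for jobs already seen, so shared descendants are expanded once.
      if (dependents.add(job)) {
        toVisit.addAll(children.getOrDefault(job, Collections.emptyList()));
      }
    }
    return dependents;
  }
}
```

Because the walk already reaches grandchildren and beyond, emitting SKIPPED for each returned job in one pass leaves nothing for a follow-up reevaluation to discover, which is the argument for dropping step (d).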

break;
case CANCELLED:
case SKIPPED:
Contributor

is this job-level SKIPPED, due to the "ping-pong" I just described?

or is it arising from a flow-level execution status of SKIPPED? if the latter, who sets that? I thought it would be only job-level

Contributor Author

ah, yes this needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.

3 participants