[GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly #4038

Merged
merged 4 commits into from
Aug 29, 2024

arjun4084346 (Contributor) commented Aug 26, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    When a dag is cancelled and multiple ReevaluateDagProcs are processed concurrently for the killed dag nodes, each of them tries to delete the dag and also to insert the dag node state. These interleaved "insert" and "delete" operations may leave the dag state store inconsistent.
    To safeguard against this, let only ReevaluateDagProc update the status of a dag node, and it should only UPDATE the status, NOT try to INSERT the dag node.
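
The difference between an update-only write and an insert-or-update write can be sketched as follows. This is a minimal illustration, not Gobblin's code: `InMemoryDagNodeStore` and all of its methods are hypothetical stand-ins for the dag state store, backed by a plain in-memory map. It shows that an update-only write arriving after a delete is a no-op, whereas an upsert would bring the deleted node back.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a dag state store; not Gobblin's actual API. */
class InMemoryDagNodeStore {
  private final Map<String, String> statusByNodeId = new ConcurrentHashMap<>();

  /** INSERT: meant to be called once, when the dag is first stored. */
  void insert(String nodeId, String status) {
    statusByNodeId.put(nodeId, status);
  }

  /** UPDATE only: changes an existing entry and never creates one. Returns true if an entry was updated. */
  boolean updateStatus(String nodeId, String status) {
    return statusByNodeId.computeIfPresent(nodeId, (id, oldStatus) -> status) != null;
  }

  /** Insert-or-update: the behaviour that can resurrect a node after a concurrent delete. */
  void upsert(String nodeId, String status) {
    statusByNodeId.put(nodeId, status);
  }

  /** Returns true if the entry existed and was removed. */
  boolean delete(String nodeId) {
    return statusByNodeId.remove(nodeId) != null;
  }

  boolean contains(String nodeId) {
    return statusByNodeId.containsKey(nodeId);
  }
}
```

With this sketch, a late `updateStatus` after `delete` returns false and leaves the store empty, while a late `upsert` leaves a stray node behind for a dag that no longer exists.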

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    updated tests

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@@ -174,12 +171,6 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
// add back the dag node with updated states in the store
Contributor Author

addDag happens only in LaunchDagProc, once in the life cycle of a Dag, and updateDagNode happens only after the job is submitted to the SpecProducer (to store the future) or in ReevaluateDagProc (to update the status).

Contributor Author

Updating the status too soon (as here) may cause the dag to be deleted before ReevaluateDagProc is processed.

@arjun4084346 changed the title from "Update node" to "[GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly" Aug 26, 2024

if (this.dagStateStore.cleanUp(dagId)) {
log.info("Deleted dag {}", dagId);
} else {
log.info("Dag deletion was tried but did not happen {}", dagId);
Contributor

Only .info level, not warn or even error? Alternatively, this could arguably be an exception.

Contributor Author

I mean, the deletion not happening may simply be because the element was already absent.
If there is some MySQL exception, it will be thrown; cleanUp will not return a boolean in that case.

Contributor

Should be try/catch in this case with logging?

Contributor Author

I mean, I intend not to swallow exceptions,
and I am fine with cleanUp returning false, so I did not add a try/catch.
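
The contract described in this thread can be sketched as below. This is an illustration under stated assumptions, not Gobblin's implementation: `SketchDagStore`, `CleanUpCaller`, and the `failing` flag are hypothetical names, and the backend failure is simulated with an `IOException`. The point is that "nothing to delete" comes back as `false`, while a backend failure propagates to the caller instead of being caught and logged.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical store illustrating the contract discussed above; not Gobblin's DagStateStore. */
class SketchDagStore {
  private final Set<String> dagIds = new HashSet<>();
  private boolean failing = false; // simulates a backend (for example, database) failure

  void add(String dagId) {
    dagIds.add(dagId);
  }

  void setFailing(boolean failing) {
    this.failing = failing;
  }

  /** Returns true if the dag was deleted, false if it was already absent; throws on backend failure. */
  boolean cleanUp(String dagId) throws IOException {
    if (failing) {
      throw new IOException("simulated backend failure while deleting " + dagId);
    }
    return dagIds.remove(dagId);
  }
}

class CleanUpCaller {
  /** Returns the message that would be logged; deliberately has no try/catch so failures propagate. */
  static String deleteDag(SketchDagStore store, String dagId) throws IOException {
    if (store.cleanUp(dagId)) {
      return "Deleted dag " + dagId;
    }
    return "Dag deletion was tried but did not happen " + dagId;
  }
}
```

Under this contract, a second concurrent deletion of the same dag simply takes the `false` branch, which is why an info-level message is a defensible choice for it.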

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 38.79%. Comparing base (d58bbc0) to head (964add6).
Report is 4 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##             master    #4038   +/-   ##
=========================================
  Coverage     38.79%   38.79%           
  Complexity     1599     1599           
=========================================
  Files           388      388           
  Lines         15998    15998           
  Branches       1585     1585           
=========================================
+ Hits           6206     6207    +1     
  Misses         9293     9293           
+ Partials        499      498    -1     


@arjun4084346 arjun4084346 merged commit 2695699 into apache:master Aug 29, 2024
6 checks passed