initial commit with high-level design for refactoring DagManager #3756

Open
wants to merge 1 commit into
base: master

meethngala
Contributor

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
The idea is to make DagManager less bulky by breaking it into submodules that share its responsibilities. This is one step toward making DagManager stateless, which we need in order to operate in multi-active mode (multiple DagManager leaders).
This PR is for reference only. I will create subsequent PRs with the actual implementation.

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Contributor

umustafi left a comment

Good boilerplate code, but it needs more Javadocs, some pseudocode, and perhaps diagrams to clarify the new interactions you are suggesting. I don't want to go too far into implementation, but I need a few more clues as to what you're imagining.

/**
* Responsible to performing the actual work for a given {@link DagTask}.
* It processes the {@link DagTask} by first initializing its state, performing actions
* like updating {@link DagStateStore} and finally submiting an event to the executor.

Let's differentiate between contacting the executor to carry out an action and submitting status events.



/**
* Responsible to performing the actual work for a given {@link DagTask}.

"performing work and notifying other components/modules of its imminent or completed actions"

Comment on lines +31 to +36
abstract protected S initialize() throws MaybeRetryableException;
abstract protected R act(S state) throws MaybeRetryableException;
abstract protected void sendNotification(R result) throws MaybeRetryableException;

final void process() {
throw new UnsupportedOperationException(" Process unsupported");

Can you elaborate a little, with pseudocode in process, on how S, R, and these functions are used? Who is the user of DagProc? Is it sufficient to call process with the DagTask as input, or does the caller need to initialize, then act, and so on? Or does process do all of the above itself? Does it handle retries? Can you also add some Javadocs to these methods?
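One possible reading of the questions above, as a minimal sketch rather than the PR's actual design: process() acts as a template method that calls initialize, act, and sendNotification in order, so callers only ever invoke process(). The retry budget, the `retryable` flag on the exception, the `throws` clause on process(), and the name `DagProcSketch` are all assumptions made for illustration; the diff does not show them.

```java
// Hypothetical stand-in for the PR's exception type; its real shape is not shown in the diff.
class MaybeRetryableException extends Exception {
  final boolean retryable;

  MaybeRetryableException(String message, boolean retryable) {
    super(message);
    this.retryable = retryable;
  }
}

// Sketch of DagProc as a template method: S is the state loaded by initialize(),
// R is the result produced by act().
abstract class DagProcSketch<S, R> {
  private static final int MAX_ATTEMPTS = 3; // assumed retry budget

  protected abstract S initialize() throws MaybeRetryableException;

  protected abstract R act(S state) throws MaybeRetryableException;

  protected abstract void sendNotification(R result) throws MaybeRetryableException;

  // Callers invoke only process(). It runs the three steps in order and
  // retries the whole sequence while the failure is marked retryable.
  final void process() throws MaybeRetryableException {
    for (int attempt = 1; ; attempt++) {
      try {
        S state = initialize();
        R result = act(state);
        sendNotification(result);
        return;
      } catch (MaybeRetryableException e) {
        if (!e.retryable || attempt >= MAX_ATTEMPTS) {
          throw e; // give up: not retryable, or budget exhausted
        }
      }
    }
  }
}
```

Note that retrying the whole sequence can repeat act() when only sendNotification() failed, so under this reading act() would need to be idempotent.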

public interface DagProcFactory extends DagTaskVisitor<DagProc> {
DagProc meet(LaunchDagTask ldt);
DagProc meet(KillDagTask kdt);
DagProc meet(ResumeDagTask rdt);

Let's add the SLA-type actions from above as well:

  • enforceFlowCompletionDeadline
  • enforceJobStartDeadline
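To make the suggestion concrete, here is a rough sketch of how the visitor could cover all five actions through double dispatch. The two SLA task class names, the `host` method on DagTask, and `LabelingVisitor` (which returns a label where DagProcFactory would return a DagProc) are hypothetical names chosen for illustration, not taken from the PR.

```java
// All types are nested in one holder class so the sketch stays self-contained.
final class DagTaskVisitorSketch {
  interface DagTaskVisitor<T> {
    T meet(LaunchDagTask task);
    T meet(KillDagTask task);
    T meet(ResumeDagTask task);
    // Hypothetical task types for the two SLA actions requested in the review.
    T meet(EnforceFlowCompletionDeadlineDagTask task);
    T meet(EnforceJobStartDeadlineDagTask task);
  }

  abstract static class DagTask {
    // Hypothetical double-dispatch hook: each subclass routes to its own meet overload.
    abstract <T> T host(DagTaskVisitor<T> visitor);
  }

  static final class LaunchDagTask extends DagTask {
    @Override <T> T host(DagTaskVisitor<T> visitor) { return visitor.meet(this); }
  }

  static final class KillDagTask extends DagTask {
    @Override <T> T host(DagTaskVisitor<T> visitor) { return visitor.meet(this); }
  }

  static final class ResumeDagTask extends DagTask {
    @Override <T> T host(DagTaskVisitor<T> visitor) { return visitor.meet(this); }
  }

  static final class EnforceFlowCompletionDeadlineDagTask extends DagTask {
    @Override <T> T host(DagTaskVisitor<T> visitor) { return visitor.meet(this); }
  }

  static final class EnforceJobStartDeadlineDagTask extends DagTask {
    @Override <T> T host(DagTaskVisitor<T> visitor) { return visitor.meet(this); }
  }

  // Stand-in for DagProcFactory: returns a label instead of a real DagProc.
  static final class LabelingVisitor implements DagTaskVisitor<String> {
    @Override public String meet(LaunchDagTask task) { return "launch"; }
    @Override public String meet(KillDagTask task) { return "kill"; }
    @Override public String meet(ResumeDagTask task) { return "resume"; }
    @Override public String meet(EnforceFlowCompletionDeadlineDagTask task) { return "enforceFlowCompletionDeadline"; }
    @Override public String meet(EnforceJobStartDeadlineDagTask task) { return "enforceJobStartDeadline"; }
  }
}
```

With this shape, adding a new task type forces every visitor implementation to handle it at compile time.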


/**
* Defines an individual task or job in a Dag.
* It carries the state information required by {@link DagProc} to for its processing.

extra "to"

Comment on lines +40 to +60
public void launchFlow() {

}

@Override
public void resumeFlow() {

}

@Override
public void killFlow() {

}

@Override
public void enforceFlowCompletionDeadline() {

}

@Override
public void enforceJobStartDeadline() {

Who calls these functions, and why do they take no parameters? I initially thought DagManagement from above would obtain these objects one by one from DagTaskStream, but perhaps DagManager is the user of DagTaskStream and utilizes the DagManagement interface? It would be good to document in these classes how they fit with the other classes (who calls what). A diagram would be very helpful for showing any reader who interacts with whom, and in what capacity.

public interface DagTaskVisitor<T> {
T meet(LaunchDagTask launchDagTask);
T meet(KillDagTask killDagTask);
T meet(ResumeDagTask resumeDagTask);

missing remaining SLA tasks



/**
* An implmentation of {@link DagProc} for killing {@link DagTask}.

Is this only for an API call to kill, or can it be triggered from any other point? It would be good to clarify this in the Javadoc.



/**
* An implmentation of {@link DagProc} for launching {@link DagTask}.

Is this for newly added launches only, or for every time a task needs to be launched on the executor?

Comment on lines +22 to +23
private DagTaskStream dagTaskStream;
private DagProc dagProc;

I need some clarification on how these are used (see questions above). Does run below get called continuously? Once the DagTask is returned, do we call the DagTaskStream methods to launch/kill, etc.?
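For reference, one interaction that would answer these questions might look like the sketch below: a worker repeatedly pulls the next DagTask from the stream, turns it into a DagProc, and calls process() on it. Everything here is an assumption for discussion: the stream is modeled as a plain `Iterator`, the task-to-proc mapping as a `Function` (where the PR holds a single `DagProc` field), and the placeholder `DagTask`/`DagProc` interfaces and the `drain` method are hypothetical.

```java
import java.util.Iterator;
import java.util.function.Function;

final class DagProcessingLoopSketch {
  // Minimal placeholders; the PR's real DagTask and DagProc carry more state.
  interface DagTask {
    String name();
  }

  interface DagProc {
    void process();
  }

  // Pulls tasks until the stream is exhausted. Each task is mapped to a proc
  // and processed immediately. Returns how many tasks were handled.
  static int drain(Iterator<DagTask> dagTaskStream, Function<DagTask, DagProc> dagProcFactory) {
    int processed = 0;
    while (dagTaskStream.hasNext()) {
      DagTask task = dagTaskStream.next();
      dagProcFactory.apply(task).process();
      processed++;
    }
    return processed;
  }
}
```

In this reading the stream only hands out tasks; launching or killing happens inside the DagProc, not through methods on DagTaskStream.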

2 participants