[GOBBLIN-1910] Refactor dag manager to reduce bulkiness for adhoc REST calls for launch, resume and kill #3776

Open
wants to merge 7 commits into
base: master

Contributor

@meethngala meethngala commented Sep 14, 2023

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    I am creating this PR as part of refactoring the DagManager to make it less bulky and eventually stateless (after getting rid of the in-memory references). This PR handles only adhoc REST client calls for the launch, resume and kill/cancel flows. For now it provides a template for the end-to-end functionality and implements some of the existing DagManager's responsibilities. Subsequent PRs will implement the missing functionality, such as scheduler triggers via the orchestrator, advancing to the next set of nodes in a Dag, and cleaning up Dags.

The method signatures, instantiations and return types currently use broad default types; they will be narrowed once the responsibility of each class is determined.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Contributor

@phet phet left a comment

nice work. this is a massive change and it's complicated.

high-level thoughts:

  • in the future, if there's a way to break functionality into multiple PRs, that makes it much easier to review smaller ones. here you might have created the structure of the classes and their declarations w/o putting all the logic within them... just get them to type check, but otherwise throw UnsupportedOperationException
  • as described in the comments, logic needs reworking between the various methods ITO where it should live. I'll wait for that to settle, before I go over specific logic more exhaustively
  • because there are so many files, look for a way to put subsets into subpackages rather than into the flat orchestration package. maybe orchestration.proc (should also contain DagProcFactory)?
  • still, contrary to that advice, I don't believe our convention is to put exceptions in an exception sub-package. that said, those too could go into orchestration.proc (or whatever name you choose)

@meethngala meethngala force-pushed the refactor-dag-manager-to-reduce-bulkiness branch from 88e1596 to cffd894 Compare September 18, 2023 17:26
Contributor

@phet phet left a comment

for now, first half of second review iteration

Contributor

@phet phet left a comment

for now, finishing part 2 of second review iteration

Comment on lines 58 to 61
LaunchDagProc host(DagTaskVisitor visitor) throws IOException, InstantiationException, IllegalAccessException {

return (LaunchDagProc) visitor.meet(this);
}
Contributor

where does the need arise for an impl of this method to return the specific type of the DagProc? the cast should not be necessary. and do keep in mind that DagTaskVisitor<T> is a generic, hence:

@Override
<T> T host(DagTaskVisitor<T> visitor) {
  return visitor.meet(this);
}

@@ -94,6 +103,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
// instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
this.dagTaskStream = new DagTaskStream();
this.isRefactoredDagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
Contributor

I'd rather you rename this variable as isMultiLeaderDagManagerEnabled otherwise the question becomes why didn't we just modify the base classes.

}

if (dag == null) {
log.error("Dag " + dagIdToResume + " was found in memory but not found in failed dag state store");
Contributor

Wouldn't this be in the above check rather than here for this scenario? If Dag is null then whatever object was sent is invalid

* @param flowName
* @param triggerTimeStamp
*/
void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
Contributor

Leaving it as a note for you in case you want to document behavior in the java doc that adhoc flows triggerTimeStamp will be "0" (see code here), while "-1" indicates some error. All valid values should be >= 0 essentially.

void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);

/**
* Currently, it is handling just the resume of a {@link Dag} request via REST client for adhoc flows
Contributor

For this and other java docs other than LAUNCH, the "for adhoc flows" is not accurate. Rather it's a resume call from REST client, but they could be resuming a scheduled flow.

* @param flowGroup
* @param flowName
* @param flowExecutionId
* @param triggerTimeStamp
Contributor

add the throws exception to be consistent with above

Comment on lines +17 to +19
public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);

public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
Contributor

when are these actions called? do you ever want to updateJobState?

Contributor Author

these will be called when we complete a job in a Dag and based on the job status, we would update it and decide on what should happen using the onJobFinish() method. The impl. for the same will be provided as part of another PR where I add the logic for AdvanceDagProc

* It is invoked after {@link DagProc#process(DagManagementStateStore)} is completed successfully.
* @param multiActiveLeaseArbiter
* @throws IOException
*/
Contributor

This is different from our approach to launch in that we finish the action before completing lease. Can you document the pros and cons of this method? For ex pro we don't have to do lease arbitration twice for attempting the action and doing it. However now the lease validity time needs to be much larger and include the time to contact executor and carry out the action. Let's note these details here or somewhere else appropriate.

Contributor Author

I think there's some confusion here. The lease acquired for a DagTask happens after we have received or processed a message from the CDC stream based on what action needs to be taken on the Dag. Now, we acquire the lease... do our processing on the Dag and upon completion release the lease ... marking it as successful. I agree with the part that lease validity time needs to be increased

Contributor

Ah you're right I am getting this multi-active part confused with phase 2. Let's add some comments or javadoc to denote potential failures and/or steps needed to complete this event ie: contact executor over network and response time

Contributor

@phet phet left a comment

wow, MASSIVE PR... which is looking better and better--great job, meeth!

Comment on lines 25 to 29
public Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> getDagToJobs();

public Map<String, Dag<JobExecutionPlan>> getDagIdToDags();

public Map<String, Long> getDagToSLA();
Contributor

for perf, I doubt we'd want to return such massive collections. is it truly necessary to have all the mappings at once? couldn't we instead return only the one related to a particular dag ID?
e.g.:

Dag<JobExecutionPlan> getDags(String dagId);
Long getSLA(String dagId);

Contributor Author

I agree and it totally makes sense. I have added these additional methods to our DagManagementStateStore interface.
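
A minimal sketch of the per-dag-ID lookups suggested in this thread. Everything here is a hypothetical stand-in: `SketchDagStateStore` is not the PR's `DagManagementStateStore`, `String` takes the place of `Dag<JobExecutionPlan>`, and the "deadline" naming is an assumption borrowed from a later comment in this review.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: String stands in for Dag<JobExecutionPlan>; names are illustrative.
final class SketchDagStateStore {
  private final Map<String, String> dagIdToDag = new ConcurrentHashMap<>();
  private final Map<String, Long> dagIdToDeadlineMillis = new ConcurrentHashMap<>();

  void addDag(String dagId, String dag, long deadlineMillis) {
    dagIdToDag.put(dagId, dag);
    dagIdToDeadlineMillis.put(dagId, deadlineMillis);
  }

  // Targeted lookups keep the backing maps private and avoid handing out whole collections.
  Optional<String> getDag(String dagId) {
    return Optional.ofNullable(dagIdToDag.get(dagId));
  }

  Optional<Long> getDeadlineMillis(String dagId) {
    return Optional.ofNullable(dagIdToDeadlineMillis.get(dagId));
  }
}
```

Returning `Optional` is one way to make a missing dag ID explicit to callers; a nullable return would work equally well if that matches the codebase's convention.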

@@ -108,7 +108,7 @@ static DagManager.DagId generateDagId(String flowGroup, String flowName, long fl
return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
}

static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {
public static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {
Contributor

NBD, but I don't find a compelling reason for this, as:

new DagManager.DagId(x, y, z)

is not much different than:

generateDagId(x, y, z);

Contributor Author

agreed... only difference being one takes String and the other takes long value for flowExecutionId. These are existing methods defined inside DagManagerUtils... not sure if you want me to define just one and handle the case

Contributor

I see... if already widely used, no need to rip out a method you did not add. but if only once or twice, I suggest getting rid of it.

contrary to your reply, I do not observe this impl converting between String and long

@Slf4j
public class DagProcessingEngine {

public static final String DAG_PROCESSING_ENGINE_PREFIX = "gobblin.service.dagProcessingEngine.";
Contributor

doesn't this need to align w/ what's in ServiceConfigKeys? if so, let's define the prefix over there and bring it in here.

Comment on lines 47 to 55
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private DagManagementStateStore dagManagementStateStore;
private ScheduledExecutorService scheduledExecutorPool;
private Config config;
private Integer numThreads;
private Integer pollingInterval;
private DagProcessingEngine.Thread [] threads;
private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
Contributor

many of these seem good candidates for final. what's your take?

*/

@Alpha
public abstract class DagTask<T> {
Contributor

let's make host a generic method rather than DagTask a generic class
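
A rough sketch of what that could look like, using simplified stand-in types (every `Sketch*` name is hypothetical, not one of the PR's classes). The task class stays non-generic and only `host` carries the type parameter, so the visitor decides the result type and no cast is needed.

```java
// Hypothetical, simplified stand-ins for the PR's types; names are illustrative only.
interface SketchDagTaskVisitor<T> {
  T meet(SketchLaunchDagTask task);
  T meet(SketchKillDagTask task);
}

abstract class SketchDagTask {
  // Generic method: the visitor, not the task, decides the result type.
  abstract <T> T host(SketchDagTaskVisitor<T> visitor);
}

final class SketchLaunchDagTask extends SketchDagTask {
  @Override
  <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this); // overload resolved statically; no cast needed
  }
}

final class SketchKillDagTask extends SketchDagTask {
  @Override
  <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

// A visitor that maps each task type to a label, standing in for a DagProc factory.
final class SketchLabelVisitor implements SketchDagTaskVisitor<String> {
  @Override
  public String meet(SketchLaunchDagTask task) {
    return "launch";
  }

  @Override
  public String meet(SketchKillDagTask task) {
    return "kill";
  }
}
```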


@Override
public DagProc meet(ResumeDagTask resumeDagTask) {
throw new UnsupportedOperationException("Currently cannot provide resume proc");
Contributor

although ResumeDagProc might still throw UOE when process() is invoked... isn't enough scaffolding in place yet to write this as:

return new ResumeDagProc(resumeDagTask);

?

public final class KillDagProc extends DagProc {

private KillDagTask killDagTask;
private DagManagementStateStore dagManagementStateStore;
Contributor

the process method already takes this as a param... do we need to store one in addition to what will be passed to us?

Comment on lines 62 to 63
private MetricContext metricContext;
private Optional<EventSubmitter> eventSubmitter;
Contributor

perhaps should these two also be args to process?

*/
@Slf4j
@Alpha
public final class KillDagProc extends DagProc {
Contributor

@phet phet Sep 21, 2023

recall that it's DagProc<S, R>, decide what those types are for kill and then use them in the various method signatures (that currently degraded to Object)

@meethngala meethngala force-pushed the refactor-dag-manager-to-reduce-bulkiness branch from f53e6b3 to e675e50 Compare September 27, 2023 20:04
Contributor

@phet phet left a comment

finishing for now... part 1 (of 2)

Comment on lines +32 to +33
* Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
* and flow completion deadlines
Contributor

rather than "responsible for defining the behavior..." seems more the means by which such tasks are triggered.

that said, may be too much impl. detail to mention DagTask in the javadoc, since I don't see it actually used in the code here

Comment on lines +41 to +43
* The eventTimestamp for adhoc flows will always be 0 (zero), while -1 would indicate failures.
* Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
* Future implementations will cover launch of flows through the scheduler too!
Contributor

rather than documenting this to be merely informative, could we instead rephrase as an interface req/policy?

(overall, I'm uncertain on your overall plan for the impl's validation, so this is based solely on reading the javadoc in isolation...)

e.g. throws IllegalArgException when eventTimestamp < 0

Contributor

Agree with Kip's suggestion here to make it read like an interface
"Handles launching of ... via ..."
@param followed by requirements
@throws if input doesn't match
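
One way the javadoc could read as an interface requirement, with an implementation that enforces it. This is a sketch under assumptions: the interface and class names are hypothetical, and the counter is only a placeholder for whatever the real `launchFlow` would do.

```java
// Hypothetical sketch; the interface and class names are illustrative, not from the PR.
interface SketchDagManagement {
  /**
   * Handles launching of the flow identified by flowGroup and flowName.
   *
   * @param eventTimestamp must be >= 0; adhoc flows pass 0
   * @throws IllegalArgumentException if eventTimestamp is negative
   */
  void launchFlow(String flowGroup, String flowName, long eventTimestamp);
}

final class SketchValidatingDagManagement implements SketchDagManagement {
  private int launches = 0;

  @Override
  public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
    if (eventTimestamp < 0) {
      throw new IllegalArgumentException("eventTimestamp must be >= 0, got " + eventTimestamp);
    }
    launches++; // placeholder for enqueueing the launch
  }

  int getLaunches() {
    return launches;
  }
}
```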



/**
* An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
Contributor

nit: strike "An interface to provide"

Contributor

add more description like for a Dag, allows one to update or extract its job state or ... giving some high level descriptions

Comment on lines +25 to +27
public void addDagSLA(String dagId, Long flowSla);

public Long getDagSLA(String dagId);
Contributor

don't we have two kinds? also, since this has never been an SLA, but rather a deadline, would now be an opportunity to adopt the accurate nomenclature?


public boolean addFailedDagId(String dagId);

public boolean checkFailedDagId(String dagId);
Contributor

unclear: what is meant by "check" - for membership/existence?

Comment on lines +117 to +118
//marks lease success and releases it
dagTaskStream.complete(dagTask);
Contributor

first off, I absolutely LOVE how simple this impl is--even as it segregates responsibility elsewhere for deciding how to process each specific task type.

that said, I wonder whether we might go farther and encapsulate the complete/commit.

e.g. the stream could construct each task w/ a:

@FunctionalInterface
interface Committer {
  void commit();
}

that would internally store the lease. the DagProcFactory would then take that from the task and preserve it in the proc, so the last step of the proc's process could call it
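
A minimal sketch of that flow, assuming hypothetical `SketchTask` and `SketchProc` types in place of the PR's `DagTask` and `DagProc`. Only `Committer` comes from the suggestion above; how the stream would actually capture the lease is left as a lambda supplied by the caller.

```java
// Hypothetical sketch of the suggested Committer; all other names are illustrative.
@FunctionalInterface
interface Committer {
  void commit();
}

// The stream would build each task with a committer that closes over the lease.
final class SketchTask {
  private final Committer committer;

  SketchTask(Committer committer) {
    this.committer = committer;
  }

  Committer getCommitter() {
    return committer;
  }
}

// The factory would pass the task's committer on to the proc it creates.
final class SketchProc {
  private final Committer committer;
  private final StringBuilder log = new StringBuilder();

  SketchProc(SketchTask task) {
    this.committer = task.getCommitter();
  }

  void process() {
    log.append("act;");        // placeholder for the real work
    committer.commit();        // last step: record lease success and release it
    log.append("committed;");
  }

  String getLog() {
    return log.toString();
  }
}
```

With this shape the engine's loop no longer needs to call `complete` on the stream itself, since the proc finishes by invoking the committer it was given.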

* @param <DagProc>
*/
@Alpha
public interface DagTaskVisitor<DagProc> {
Contributor

nit: convention would be T (or at the most DagProcType). naming it after an actual type (even though not imported) is quite confusing

abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;

public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
Contributor

eventually, retries may not be governed by a static count, but rather tracking to how much time remains in the lease
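
As an illustration only, retries bounded by the lease expiry rather than a fixed count might look like the sketch below. The class, method and parameter names are hypothetical, and the clock is injected purely so the behaviour can be exercised without waiting on wall-clock time.

```java
import java.util.concurrent.Callable;
import java.util.function.LongSupplier;

// Hypothetical sketch: retries are bounded by the lease expiry rather than a fixed count.
final class SketchLeaseBoundRetries {
  private SketchLeaseBoundRetries() {}

  static <V> V retryUntil(Callable<V> step, long leaseExpiryMillis, long delayMillis,
      LongSupplier clockMillis) throws Exception {
    Exception last = null;
    // Keep trying only while the (injected) clock says the lease is still valid.
    while (clockMillis.getAsLong() < leaseExpiryMillis) {
      try {
        return step.call();
      } catch (Exception e) {
        last = e; // a real version would retry only exceptions known to be transient
      }
      Thread.sleep(delayMillis);
    }
    throw new IllegalStateException("Lease expired before the step succeeded", last);
  }
}
```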

this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis);
log.info("Successfully processed Dag Request");
} catch (Exception ex) {
throw new RuntimeException("Cannot process Dag Request: ", ex);
Contributor

I use checked exceptions sparingly, but here, one may actually make sense (rather than RuntimeException)

public KillDagTask(DagActionStore.DagAction killAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {

this.killAction = killAction;
this.leaseObtainedStatusStatus = leaseObtainedStatus;
Contributor

should this be initialized by a call to super()?
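
A small sketch of initializing the lease field through `super(...)`, which also lets the field be `private final`. The types are hypothetical stand-ins, with `String` in place of both the lease status and the dag action.

```java
// Hypothetical sketch; String stands in for the lease status and the dag action.
abstract class SketchLeasedTask {
  private final String leaseStatus; // set exactly once, by the base constructor

  protected SketchLeasedTask(String leaseStatus) {
    this.leaseStatus = leaseStatus;
  }

  final String getLeaseStatus() {
    return leaseStatus;
  }
}

final class SketchKillTask extends SketchLeasedTask {
  private final String killAction;

  SketchKillTask(String killAction, String leaseStatus) {
    super(leaseStatus); // the base class owns the field, so subclasses cannot skip or reassign it
    this.killAction = killAction;
  }

  String getKillAction() {
    return killAction;
  }
}
```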

Contributor

@phet phet left a comment

pausing again... part 2

this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));

if(isMultiLeaderDagManagerEnabled) {
DagActionStore.DagAction killAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL);
Contributor

is killFlow called in any other place? if we don't actually have a DagAction already on hand in any of the callsites, not sure it makes sense to use that in the API. perhaps it should be:

killFlow(flowGroup, flowName, flowExecutionId, produceTimestamp)

@@ -80,6 +83,8 @@ public String load(String key) throws Exception {
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
protected FlowCatalog flowCatalog;
private DagTaskStream dagTaskStream;
Contributor

this change monitor shouldn't use a task stream as such (that's exclusively for the DagProcessingEngine.) instead let it write to the DagManagement API

@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
// instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
// this.dagTaskStream = new DagTaskStream();
Contributor

so this isn't initialized anywhere? won't we get an NPE when using it?

Contributor

yea we should use guice to bring it in, see the DagActionStoreChangeMonitorFactory



/**
* Defines an individual task or job in a Dag.
Contributor

"job" already has other connotations as what we run on executors. maybe "defines a singular task in the lifecycle of a managed Dag"?

@Alpha
public abstract class DagTask {

protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus;
Contributor

why not private final?

Comment on lines +56 to +60
private KillDagTask killDagTask;

public KillDagProc(KillDagTask killDagTask) {
this.killDagTask = killDagTask;
}
Contributor

tip: mark this final and implement via @RequiredArgsConstructor


@Override
protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
String dagToCancel = this.killDagTask.getKillDagId().toString();
Contributor

@phet phet Oct 2, 2023

nit: since it's a String, not a dag, I'd name dagId or dagIdToCancel

anyway, for such a short impl, I'd just make a one-liner w/o an intermediate var (unless you're planning to log it)

Comment on lines +70 to +72
* and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor
* or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios,
* it will make sense to update the method signature with its appropriate type.
Contributor

out of date

Comment on lines +81 to +89
String dagToCancel = this.killDagTask.getKillDagId().toString();

log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
killDagNode(dagNodeToCancel);
}
dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request");
dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL);
Contributor

@phet phet Oct 2, 2023

nits:

  1. move dagToCancel init after the for loop (and as observed on the name already, it's a dag ID, not a DAG)
  2. call dagMgmtStateStore.getDag(dagToCancel) only once
  3. decide whether still the need for an intermediate dagToCancel var


@Override
protected void sendNotification(Dag<JobExecutionPlan> dag, EventSubmitter eventSubmitter) throws MaybeRetryableException {
for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) {
Contributor

please update your IDE to catch when you forget the space between for (... this happens all over this PR

Contributor

Is this meant to be a part of your change? was this done or included by mistake?

Comment on lines +180 to +183
binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
binder.bind(DagProcFactory.class).in(Singleton.class);
binder.bind(DagProcessingEngine.class).in(Singleton.class);
binder.bind(DagTaskStream.class).in(Singleton.class);
Contributor

I believe you need to create an OptionalBinder like I've done above OptionalBinder.newOptionalBinder(binder,MultiActiveLeaseArbiter.class); as each of these classes are instantiated only in certain cases.

Comment on lines +25 to +27
public void addDagSLA(String dagId, Long flowSla);

public Long getDagSLA(String dagId);
Contributor

are these dagStart or dagKill Deadlines? We have both configurable so may want to create separate functions for each.


/**
* An implementation of {@link DagProc} that is responsible for cleaning up {@link Dag} that has reached an end state
* likewise: FAILED, COMPLETE or CANCELED
Contributor

nit: ie: or including:

@Override
protected Object initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException {
throw new UnsupportedOperationException("Not supported");

Contributor

extra space

throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag");
}

protected final R actWithRetries(S state, DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) {
Contributor

should we have a generic method that deals with retries and takes a function as parameter? Seems like you are repeating code between these two functions and want to handle them the same way
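
A sketch of such a helper, under assumptions: `SketchRetryableException` is a hypothetical stand-in for the PR's `MaybeRetryableException`, and the helper takes a `Callable` so that both the initialize-style and act-style steps could be passed in as lambdas. Exceptions other than the retryable one propagate immediately.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch; SketchRetryableException stands in for the PR's MaybeRetryableException.
class SketchRetryableException extends Exception {
  SketchRetryableException(String message) {
    super(message);
  }
}

final class SketchRetries {
  private SketchRetries() {}

  // Runs the step up to maxAttempts times, sleeping delayMillis between retryable failures.
  static <V> V withRetries(Callable<V> step, int maxAttempts, long delayMillis) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    SketchRetryableException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return step.call();
      } catch (SketchRetryableException e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delayMillis);
        }
      }
    }
    throw new Exception("Max retry attempts reached", last);
  }
}
```

Each of the existing `...WithRetries` methods could then shrink to a single call that passes its own step as the lambda.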

if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow request");
dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
//TODO: add a flag for if condition only if multi-active is enabled
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
Contributor

the conditional above should be checked first and this should be the else right?

Contributor

or are we trying a "dummy kill in the new refactor" if so let's add a comment that the change below doesn't carry out the action. it's still in testing.

Contributor

@phet phet left a comment

whew... all done w/ this review cycle

import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

import org.jetbrains.annotations.NotNull;
Contributor

looks like the wrong one... are you certain?

Comment on lines +65 to +66
@Getter
private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
Contributor

@phet phet Oct 3, 2023

a. why a public getter? could it not be solved via a more-targeted method/capability?

b. why always initialize w/ the empty queue, when there's also @AllArgsConstructor?
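
One way to read both points, as a rough sketch only: keep the queue private, expose the specific operations callers need, and let a constructor accept the queue instead of always creating an empty one. `ActionQueueSketch`, `addAction`, and `pollAction` are hypothetical names, and `String` stands in for `DagActionStore.DagAction` so the example compiles on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the PR's stream class; String replaces DagActionStore.DagAction.
final class ActionQueueSketch {
    private final BlockingQueue<String> queue;

    // Default: start with an empty, unbounded queue.
    ActionQueueSketch() {
        this(new LinkedBlockingQueue<>());
    }

    // Alternative: the caller (or a test) supplies the queue.
    ActionQueueSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Targeted capability for producers: enqueue one action. */
    boolean addAction(String action) {
        return queue.offer(action);
    }

    /** Returns the next queued action, or null if the queue is empty. */
    String pollAction() {
        return queue.poll();
    }
}
```

With this shape no caller can drain, clear, or replace the queue through a getter.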

Comment on lines +91 to +103
if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
dagTask = createDagTask(dagAction,
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
}
if (dagTask != null) {
break; // Exit the loop when dagTask is non-null
}
} catch (IOException e) {
//TODO: need to handle exceptions gracefully
throw new RuntimeException(e);
}
}
return dagTask;
Contributor

@phet phet Oct 3, 2023

return createDagTask(...)
would be more natural control flow than the DagTask dagTask = null;, the break, and the final return dagTask
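
A small sketch of the suggested control flow, under assumptions: `leaseObtained` and `createTask` are hypothetical stand-ins for the lease check and `createDagTask`, and the loop runs over a plain iterator because the loop condition is not visible in the hunk above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-ins; none of these names come from the PR.
final class EarlyReturnSketch {
    private EarlyReturnSketch() {}

    // Pretend lease check: only non-empty actions "obtain a lease".
    static boolean leaseObtained(String action) {
        return !action.isEmpty();
    }

    static String createTask(String action) {
        return "task:" + action;
    }

    /** Returns a task for the first action whose lease is obtained. */
    static String nextTask(Iterator<String> actions) {
        while (actions.hasNext()) {
            String action = actions.next();
            if (leaseObtained(action)) {
                // Return directly: no null placeholder, no break, no trailing return.
                return createTask(action);
            }
        }
        throw new NoSuchElementException("no action with an obtained lease");
    }
}
```

What to do when no action yields a lease (block, return empty, or throw as here) is a separate decision that this sketch does not settle.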

Comment on lines +108 to +114
private boolean add(DagActionStore.DagAction dagAction) {
return this.dagActionQueue.offer(dagAction);
}

private DagActionStore.DagAction take() {
return this.dagActionQueue.poll();
}
Contributor

rearrange so public methods come prior to these and other private ones

*/

@Override
public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
Contributor

still wondering here... do we presume a timer/reminder has been set elsewhere and just triggered?

it seems this simply evaluates whether or not the deadline is exceeded, but doesn't actually take action to enforce. is that true? if so, whose responsibility is the enforcement?

* Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
*/

protected JobStatus retrieveJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
Contributor

@phet phet Oct 3, 2023

ping here.... still not clear the extent of the "state" management/retrieval going on (and whether appropriate to this class), since getStatus throws UnsupportedOperationException

* Going forward, each of these in-memory references will be read/write from MySQL store.
* Thus, the {@link DagManager} would then be stateless and operate independently.
*/
@Getter(onMethod_={@Synchronized})
Contributor

@phet phet Oct 3, 2023

wondering, since I usually use at field-level, not class level: will it create an accessor for each of the individual constituent collections? I didn't perceive the DagManagementStateStore interface as meant to be quite so low-level... we don't want that do we?
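
For reference, Lombok's class-level @Getter behaves as if every non-static field carried the annotation, so each constituent collection would get its own accessor. The plain-Java sketch below contrasts a hand-written accessor of that kind with narrower operations; `StateStoreSketch`, `setSla`, and `slaFor` are hypothetical names, and it uses the `synchronized` keyword rather than reproducing Lombok's generated locking.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the PR's state store.
final class StateStoreSketch {
    private final Map<String, Long> dagToSla = new HashMap<>();

    // Accessor-style method: hands out the mutable map itself, so callers
    // can modify it outside of any locking done here.
    synchronized Map<String, Long> getDagToSla() {
        return dagToSla;
    }

    // Narrower operations that keep the map private to the store.
    synchronized void setSla(String dagId, long slaMillis) {
        dagToSla.put(dagId, slaMillis);
    }

    /** Returns the SLA for the dag, or null if none is recorded. */
    synchronized Long slaFor(String dagId) {
        return dagToSla.get(dagId);
    }
}
```

If the interface is meant to stay high-level, only methods like the last two would belong on it.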

final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> dagIdstoClean = new HashSet<>();
private Optional<DagActionStore> dagActionStore;
Contributor

why/when would this be empty?

5 participants