Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly #4038

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public interface DagManagementStateStore {
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
*/
void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) throws IOException;
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
public interface DagStateStoreWithDagNodes extends DagStateStore {

/**
* Updates a dag node identified by the provided {@link DagManager.DagId}
* with the given {@link Dag.DagNode}.
* Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
* Updates the {@link Dag.DagNode} with the provided value.
* Returns true if the dag node is updated successfully, false otherwise
*/
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws IOException {

@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
this.dagStateStore.cleanUp(dagId);
log.info("Deleted dag {}", dagId);
if (this.dagStateStore.cleanUp(dagId)) {
log.info("Deleted dag {}", dagId);
} else {
log.info("Dag deletion was tried but did not happen {}", dagId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only `.info` level, not warn or even error? Alternatively, this could arguably be an exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, a deletion not happening can really just be because the element was already absent...
If there is some MySQL exception, it would be thrown; it would not return a boolean in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a try/catch with logging in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, I intend not to swallow exceptions.
And I am fine if cleanUp returns false, so I did not add a try/catch.

}
}

@Override
Expand All @@ -158,9 +161,9 @@ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) thro
}

@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagNode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.ServiceConfigKeys;
Expand Down Expand Up @@ -87,12 +85,11 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)";
protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET dag_node = ? WHERE dag_node_id = ?";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
private final ContextAwareCounter totalDagCount;

public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
Expand Down Expand Up @@ -121,23 +118,26 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
MetricContext metricContext =
Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(config)), this.getClass());
this.totalDagCount = metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
log.info("Instantiated {}", getClass().getSimpleName());
}

@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
boolean newDag = false;
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (updateDagNode(dagId, dagNode) == 1) {
newDag = true;
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
String dagId = DagManagerUtils.generateDagId(dag).toString();
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT, tableName), insertStatement -> {

for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
insertStatement.setObject(1, dagNode.getValue().getId().toString());
insertStatement.setObject(2, dagId);
insertStatement.setObject(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
insertStatement.addBatch();
}
}
if (newDag) {
this.totalDagCount.inc();
}

try {
return insertStatement.executeBatch();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag for %s", dagId), e);
}}, true);
}

@Override
Expand All @@ -147,15 +147,15 @@ public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {

@Override
public boolean cleanUp(DagManager.DagId dagId) throws IOException {
dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> {
try {
deleteStatement.setString(1, dagId.toString());
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
}}, true);
this.totalDagCount.dec();
return true;
return dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName),
deleteStatement -> {
try {
deleteStatement.setString(1, dagId.toString());
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
}
}, true);
}

@Override
Expand Down Expand Up @@ -195,16 +195,15 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
}

@Override
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
public boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
return dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT, tableName), updateStatement -> {
try {
insertStatement.setString(1, dagNodeId);
insertStatement.setString(2, parentDagId.toString());
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
return insertStatement.executeUpdate();
updateStatement.setString(1, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
updateStatement.setString(2, dagNodeId);
return updateStatement.executeUpdate() == 1;
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
throw new IOException(String.format("Failure updating dag node for %s", dagNodeId), e);
}}, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
dagManagementStateStore.addDagNodeState(dagNode, dagId);
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
Expand All @@ -158,7 +158,6 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
Properties cancelJobArgs = new Properties();
DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
String serializedFuture = null;

if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
Expand All @@ -175,12 +174,6 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
// add back the dag node with updated states in the store
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addDag is called only by LaunchDagProc, once in the life cycle of a Dag,
and
updateDagNode is called only after submitting the job to the SpecProducer (to store the future) or in ReevaluateDagProc to update the status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the status too soon (like here) may cause a dag to be deleted before the ReevaluateDagProc is processed.

dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
// send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
// into inconsistent state.
sendCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,17 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
}

// get the dag after updating dag node status
Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(getDagId()).get();
Optional<Dag<JobExecutionPlan>> dagOptional = dagManagementStateStore.getDag(getDagId());
if (!dagOptional.isPresent()) {
// This may happen if another ReevaluateDagProc removed the dag after this DagProc updated the dag node status.
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
log.warn("Dag not found {}", getDagId());
return;
}

Dag<JobExecutionPlan> dag = dagOptional.get();
onJobFinish(dagManagementStateStore, dagNode, dag);

if (jobStatus.isShouldRetry()) {
Expand Down Expand Up @@ -132,7 +142,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
private void updateDagNodeStatus(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) throws IOException {
dagNode.getValue().setExecutionStatus(executionStatus);
dagManagementStateStore.addDagNodeState(dagNode, getDagId());
dagManagementStateStore.updateDagNode(dagNode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,13 @@ public void testAddDag() throws Exception {
Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());

this.dagManagementStateStore.addDag(dag);
this.dagManagementStateStore.addDag(dag2);
this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
this.dagManagementStateStore.updateDagNode(dagNode);
this.dagManagementStateStore.updateDagNode(dagNode2);
this.dagManagementStateStore.updateDagNode(dagNode3);

Assert.assertTrue(compareLists(dag.getNodes(), this.dagManagementStateStore.getDag(dagId).get().getNodes()));
Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
Expand All @@ -126,7 +125,7 @@ public void testAddDag() throws Exception {
jobExecutionPlan.setJobFuture(Optional.of(future));

Dag.DagNode<JobExecutionPlan> duplicateDagNode = new Dag.DagNode<>(jobExecutionPlan);
this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
this.dagManagementStateStore.updateDagNode(duplicateDagNode);
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testOneNextJobToRun() throws Exception {
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
// there are two invocations, one after setting status and other after sending new job to specProducer
Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).addDagNodeState(any(), any());
Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).updateDagNode(any());

// assert that the first job is completed
Assert.assertEquals(ExecutionStatus.COMPLETE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ different methods create different spec executors (e.g. MockedSpecExecutor.creat
buildNaiveTopologySpec().getSpecExecutor() respectively.
the result will be that after serializing/deserializing the test dag, the spec executor (and producer) type may change */

Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any());

Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
Expand Down
Loading