[GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly (#4038)

* avoid adding dag node where update only is required
* address review comments
arjun4084346 committed Aug 29, 2024
1 parent f6ba473 commit 2695699
Showing 9 changed files with 62 additions and 60 deletions.
@@ -101,7 +101,7 @@ public interface DagManagementStateStore {
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
*/
- void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId) throws IOException;
+ void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.
@@ -35,12 +35,10 @@
public interface DagStateStoreWithDagNodes extends DagStateStore {

/**
- * Updates a dag node identified by the provided {@link DagManager.DagId}
- * with the given {@link Dag.DagNode}.
- * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
- * <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+ * Updates the {@link Dag.DagNode} with the provided value.
+ * Returns true if the dag node is updated successfully, false otherwise
*/
- int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+ boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
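For context, a minimal sketch of how this narrowed contract is meant to be used (a hypothetical caller, not part of this commit): the caller mutates the node's status in memory and then persists it with update-only semantics, so a node belonging to an already-deleted dag is never re-inserted.

// Hypothetical caller; types and method signatures follow the interfaces shown in this diff.
static void persistCancellation(DagStateStoreWithDagNodes store,
    Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
  dagNode.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
  boolean updated = store.updateDagNode(dagNode);
  // 'updated' is false when the node's row no longer exists, e.g. because a
  // concurrent ReevaluateDagProc already deleted the dag; the update-only
  // contract guarantees no orphan row is re-created in that case.
}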
@@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws IOException {

@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
- this.dagStateStore.cleanUp(dagId);
- log.info("Deleted dag {}", dagId);
+ if (this.dagStateStore.cleanUp(dagId)) {
+ log.info("Deleted dag {}", dagId);
+ } else {
+ log.info("Dag deletion was tried but did not happen {}", dagId);
+ }
}

@Override
@@ -158,9 +161,9 @@ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) thro
}

@Override
- public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
+ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
- this.dagStateStore.updateDagNode(dagId, dagNode);
+ this.dagStateStore.updateDagNode(dagNode);
}

@Override
@@ -47,9 +47,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
- import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
- import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -87,12 +85,11 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

- protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
- + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
+ protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)";
+ protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET dag_node = ? WHERE dag_node_id = ?";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
- private final ContextAwareCounter totalDagCount;

public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
@@ -121,23 +118,26 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
MetricContext metricContext =
Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(config)), this.getClass());
- this.totalDagCount = metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
log.info("Instantiated {}", getClass().getSimpleName());
}

@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT, tableName), insertStatement -> {
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ insertStatement.setObject(1, dagNode.getValue().getId().toString());
+ insertStatement.setObject(2, dagId);
+ insertStatement.setObject(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+ insertStatement.addBatch();
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ try {
+ return insertStatement.executeBatch();
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure adding dag for %s", dagId), e);
+ }}, true);
}

@Override
@@ -147,15 +147,15 @@ public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {

@Override
public boolean cleanUp(DagManager.DagId dagId) throws IOException {
- dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> {
- try {
- deleteStatement.setString(1, dagId.toString());
- return deleteStatement.executeUpdate() != 0;
- } catch (SQLException e) {
- throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
- }}, true);
- this.totalDagCount.dec();
- return true;
+ return dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName),
+ deleteStatement -> {
+ try {
+ deleteStatement.setString(1, dagId.toString());
+ return deleteStatement.executeUpdate() != 0;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
+ }
+ }, true);
}

@Override
@@ -195,16 +195,15 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
}

@Override
- public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+ public boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
- return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
+ return dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT, tableName), updateStatement -> {
try {
- insertStatement.setString(1, dagNodeId);
- insertStatement.setString(2, parentDagId.toString());
- insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
- return insertStatement.executeUpdate();
+ updateStatement.setString(1, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+ updateStatement.setString(2, dagNodeId);
+ return updateStatement.executeUpdate() == 1;
} catch (SQLException e) {
- throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
+ throw new IOException(String.format("Failure updating dag node for %s", dagNodeId), e);
}}, true);
}

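Taken together, the MySQL store now has two distinct write paths: writeCheckpoint batches plain INSERTs for a dag's nodes, while updateDagNode issues a targeted UPDATE that cannot recreate a deleted row. A rough usage sketch follows; the Config and topologySpecMap wiring is an assumption for illustration, not part of this commit.

// Illustrative only; construction/config wiring is assumed, not taken from this commit.
static void writeThenUpdate(Config config, Map<URI, TopologySpec> topologySpecMap,
    Dag<JobExecutionPlan> dag) throws IOException {
  DagStateStoreWithDagNodes store = new MysqlDagStateStoreWithDagNodes(config, topologySpecMap);

  // First persistence of the dag: one plain INSERT per node, executed as a single JDBC batch.
  store.writeCheckpoint(dag);

  // A later status change touches only the existing row for that node.
  Dag.DagNode<JobExecutionPlan> node = dag.getNodes().get(0);
  node.getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
  boolean updated = store.updateDagNode(node); // false if the row was already deleted
}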
@@ -134,7 +134,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
- dagManagementStateStore.addDagNodeState(dagNode, dagId);
+ dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
@@ -158,7 +158,6 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
Properties cancelJobArgs = new Properties();
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
String serializedFuture = null;

if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -175,12 +174,6 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
- // add back the dag node with updated states in the store
- dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
- dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
- // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
- // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
- // into inconsistent state.
sendCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
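The removal above shifts responsibility for persisting the CANCELLED status out of cancelDagNode. Roughly, and only as an assumed illustration of the post-change flow (not code from this commit):

// Assumed post-change flow for a cancel request; composition is illustrative.
static void cancelFlow(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
    DagManagementStateStore dagManagementStateStore) throws IOException {
  // 1) cancelDagNode (above) now only cancels the job on the executor and sends the
  //    cancellation event; it no longer writes CANCELLED back to the state store.
  cancelDagNode(dagNodeToCancel, dagManagementStateStore);
  // 2) The ReevaluateDagProc triggered by that event persists CANCELLED via
  //    updateDagNode, so a dag deleted concurrently is never re-populated here.
}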
@@ -93,7 +93,17 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
}

// get the dag after updating dag node status
- Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(getDagId()).get();
+ Optional<Dag<JobExecutionPlan>> dagOptional = dagManagementStateStore.getDag(getDagId());
+ if (!dagOptional.isPresent()) {
+ // This may happen if another ReevaluateDagProc removed the dag after this DagProc updated the dag node status.
+ // The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
+ // are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
+ // status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
+ log.warn("Dag not found {}", getDagId());
+ return;
+ }
+
+ Dag<JobExecutionPlan> dag = dagOptional.get();
onJobFinish(dagManagementStateStore, dagNode, dag);

if (jobStatus.isShouldRetry()) {
@@ -132,7 +142,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
private void updateDagNodeStatus(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) throws IOException {
dagNode.getValue().setExecutionStatus(executionStatus);
- dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+ dagManagementStateStore.updateDagNode(dagNode);
}

/**
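The new guard is needed because of the interleaving described in the comment above. As an assumed reconstruction, composed only of methods that appear elsewhere in this diff, the competing ReevaluateDagProc does something like:

// Sketch of the competing proc's path (assumed composition of methods shown in this diff).
static void concurrentReevaluation(DagManagementStateStore store, Dag<JobExecutionPlan> dag,
    DagManager.DagId dagId) throws IOException {
  if (DagProcUtils.isDagFinished(dag)) {
    // Every node is terminal (e.g. all CANCELLED), so the dag is removed; any other
    // ReevaluateDagProc that calls getDag(dagId) afterwards sees Optional.empty()
    // and must simply log and return, as the guard above does.
    store.deleteDag(dagId);
  }
}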
@@ -100,14 +100,13 @@ public void testAddDag() throws Exception {
Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());

this.dagManagementStateStore.addDag(dag);
this.dagManagementStateStore.addDag(dag2);
- this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
- this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
- this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+ this.dagManagementStateStore.updateDagNode(dagNode);
+ this.dagManagementStateStore.updateDagNode(dagNode2);
+ this.dagManagementStateStore.updateDagNode(dagNode3);

Assert.assertTrue(compareLists(dag.getNodes(), this.dagManagementStateStore.getDag(dagId).get().getNodes()));
Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
@@ -126,7 +125,7 @@ public void testAddDag() throws Exception {
jobExecutionPlan.setJobFuture(Optional.of(future));

Dag.DagNode<JobExecutionPlan> duplicateDagNode = new Dag.DagNode<>(jobExecutionPlan);
- this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
+ this.dagManagementStateStore.updateDagNode(duplicateDagNode);
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
}

@@ -123,7 +123,7 @@ public void testOneNextJobToRun() throws Exception {
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
// there are two invocations, one after setting status and other after sending new job to specProducer
- Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).addDagNodeState(any(), any());
+ Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).updateDagNode(any());

// assert that the first job is completed
Assert.assertEquals(ExecutionStatus.COMPLETE,
Expand Down
@@ -109,7 +109,7 @@ different methods create different spec executors (e.g. MockedSpecExecutor.creat
buildNaiveTopologySpec().getSpecExecutor() respectively.
the result will be that after serializing/deserializing the test dag, the spec executor (and producer) type may change */

- Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
+ Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any());

Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
