diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 0a8514f274..5e3c49d13c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -101,7 +101,7 @@ public interface DagManagementStateStore { * @param dagNode dag node to be added * @param dagId dag id of the dag this dag node belongs to */ - void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException; + void updateDagNode(Dag.DagNode dagNode) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520..171ce8855e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -35,12 +35,10 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { /** - * Updates a dag node identified by the provided {@link DagManager.DagId} - * with the given {@link Dag.DagNode}. - * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one - * Refer + * Updates the {@link Dag.DagNode} with the provided value. + * Returns true if the dag node is updated successfully, false otherwise */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + boolean updateDagNode(Dag.DagNode dagNode) throws IOException; /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 45ee013c7d..c76c5d15ff 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws IOException { @Override public void deleteDag(DagManager.DagId dagId) throws IOException { - this.dagStateStore.cleanUp(dagId); - log.info("Deleted dag {}", dagId); + if (this.dagStateStore.cleanUp(dagId)) { + log.info("Deleted dag {}", dagId); + } else { + log.info("Dag deletion was tried but did not happen {}", dagId); + } } @Override @@ -158,9 +161,9 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro } @Override - public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) + public synchronized void updateDagNode(Dag.DagNode dagNode) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagNode); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697..98aa356660 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -47,9 +47,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metastore.MysqlDataSourceFactory; -import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_serde.GsonSerDe; import org.apache.gobblin.service.ServiceConfigKeys; @@ -87,12 +85,11 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; + protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)"; + protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET dag_node = ? WHERE dag_node_id = ?"; protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; - private final ContextAwareCounter totalDagCount; public MysqlDagStateStoreWithDagNodes(Config config, Map topologySpecMap) throws IOException { if (config.hasPath(CONFIG_PREFIX)) { @@ -121,23 +118,26 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory(); MetricContext metricContext = Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(config)), this.getClass()); - this.totalDagCount = metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT); log.info("Instantiated {}", getClass().getSimpleName()); } @Override - public void writeCheckpoint(Dag dag) - throws IOException { - DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); - boolean newDag = false; - for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { - newDag = true; + public void writeCheckpoint(Dag dag) throws IOException { + String dagId = DagManagerUtils.generateDagId(dag).toString(); + dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT, tableName), insertStatement -> { + + for (Dag.DagNode dagNode : dag.getNodes()) { + insertStatement.setObject(1, dagNode.getValue().getId().toString()); + insertStatement.setObject(2, dagId); + insertStatement.setObject(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.addBatch(); } - } - if (newDag) { - this.totalDagCount.inc(); - } + + try { + return insertStatement.executeBatch(); + } catch (SQLException e) { + throw new IOException(String.format("Failure adding dag for %s", dagId), e); + }}, true); } @Override @@ -147,15 +147,15 @@ public void cleanUp(Dag dag) throws IOException { @Override public boolean cleanUp(DagManager.DagId dagId) throws IOException { - dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> { - try { - deleteStatement.setString(1, dagId.toString()); - return deleteStatement.executeUpdate() != 0; - } catch (SQLException e) { - throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); - this.totalDagCount.dec(); - return true; + return dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), + deleteStatement -> { + try { + deleteStatement.setString(1, dagId.toString()); + return deleteStatement.executeUpdate() != 0; + } catch (SQLException e) { + throw new IOException(String.format("Failure deleting dag for %s", dagId), e); + } + }, true); } @Override @@ -195,16 +195,15 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public boolean updateDagNode(Dag.DagNode dagNode) throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); - return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { + return dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT, tableName), updateStatement -> { try { - insertStatement.setString(1, dagNodeId); - insertStatement.setString(2, parentDagId.toString()); - insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); - return insertStatement.executeUpdate(); + updateStatement.setString(1, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + updateStatement.setString(2, dagNodeId); + return updateStatement.executeUpdate() == 1; } catch (SQLException e) { - throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); + throw new IOException(String.format("Failure updating dag node for %s", dagNodeId), e); }}, true); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 0a9f6dcd67..71693705fc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -134,7 +134,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat jobOrchestrationTimer.stop(jobMetadata); log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri); dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode); - dagManagementStateStore.addDagNodeState(dagNode, dagId); + dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); @@ -158,7 +158,6 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat public static void cancelDagNode(Dag.DagNode dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { Properties cancelJobArgs = new Properties(); - DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel); String serializedFuture = null; if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { @@ -175,12 +174,6 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId()); } DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get(); - // add back the dag node with updated states in the store - dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED); - dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId); - // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction - // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get - // into inconsistent state. sendCancellationEvent(dagNodeToCancel); log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture); } catch (Exception e) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index ef554f6418..cdfa3d0509 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -93,7 +93,17 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = dagManagementStateStore.getDag(getDagId()).get(); + Optional> dagOptional = dagManagementStateStore.getDag(getDagId()); + if (!dagOptional.isPresent()) { + // This may happen if another ReevaluateDagProc removed the dag after this DagProc updated the dag node status. + // The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs + // are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the + // status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried. + log.warn("Dag not found {}", getDagId()); + return; + } + + Dag dag = dagOptional.get(); onJobFinish(dagManagementStateStore, dagNode, dag); if (jobStatus.isShouldRetry()) { @@ -132,7 +142,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dagNode, ExecutionStatus executionStatus) throws IOException { dagNode.getValue().setExecutionStatus(executionStatus); - dagManagementStateStore.addDagNodeState(dagNode, getDagId()); + dagManagementStateStore.updateDagNode(dagNode); } /** diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index 3185943902..27a9dd553d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -100,14 +100,13 @@ public void testAddDag() throws Exception { Dag.DagNode dagNode2 = dag.getNodes().get(1); Dag.DagNode dagNode3 = dag2.getNodes().get(0); DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); - DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); this.dagManagementStateStore.addDag(dag); this.dagManagementStateStore.addDag(dag2); - this.dagManagementStateStore.addDagNodeState(dagNode, dagId); - this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); - this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); + this.dagManagementStateStore.updateDagNode(dagNode); + this.dagManagementStateStore.updateDagNode(dagNode2); + this.dagManagementStateStore.updateDagNode(dagNode3); Assert.assertTrue(compareLists(dag.getNodes(), this.dagManagementStateStore.getDag(dagId).get().getNodes())); Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get()); @@ -126,7 +125,7 @@ public void testAddDag() throws Exception { jobExecutionPlan.setJobFuture(Optional.of(future)); Dag.DagNode duplicateDagNode = new Dag.DagNode<>(jobExecutionPlan); - this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId); + this.dagManagementStateStore.updateDagNode(duplicateDagNode); Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index 651cc441df..6a15bf1645 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -123,7 +123,7 @@ public void testOneNextJobToRun() throws Exception { // next job is sent to spec producer Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any()); // there are two invocations, one after setting status and other after sending new job to specProducer - Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).addDagNodeState(any(), any()); + Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).updateDagNode(any()); // assert that the first job is completed Assert.assertEquals(ExecutionStatus.COMPLETE, diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java index 150bed10e6..0623d00288 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java @@ -109,7 +109,7 @@ different methods create different spec executors (e.g. MockedSpecExecutor.creat buildNaiveTopologySpec().getSpecExecutor() respectively. the result will be that after serializing/deserializing the test dag, the spec executor (and producer) type may change */ - Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any()); + Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any()); Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); }