From 3534cf81ae1686313574318e325c9e8a395a9e8e Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Mon, 26 Aug 2024 11:26:06 -0700 Subject: [PATCH 1/4] avoid adding dag node where update only is required --- .../DagManagementStateStore.java | 2 +- .../DagStateStoreWithDagNodes.java | 8 +-- .../MySqlDagManagementStateStore.java | 11 ++-- .../MysqlDagStateStoreWithDagNodes.java | 64 ++++++++++++------- .../orchestration/proc/DagProcUtils.java | 9 +-- .../orchestration/proc/ReevaluateDagProc.java | 9 ++- .../MySqlDagManagementStateStoreTest.java | 9 ++- .../proc/ReevaluateDagProcTest.java | 2 +- .../orchestration/proc/ResumeDagProcTest.java | 2 +- 9 files changed, 64 insertions(+), 52 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 0a8514f274..5e3c49d13c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -101,7 +101,7 @@ public interface DagManagementStateStore { * @param dagNode dag node to be added * @param dagId dag id of the dag this dag node belongs to */ - void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException; + void updateDagNode(Dag.DagNode dagNode) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520..093b45432b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -35,12 +35,10 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { /** - * Updates a dag node identified by the provided {@link DagManager.DagId} - * with the given {@link Dag.DagNode}. - * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one - * Refer + * Updates the {@link Dag.DagNode} with the provided value. + * Returns 1 if the dag node is updated successfully, 0 otherwise */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(Dag.DagNode dagNode) throws IOException; /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 45ee013c7d..c76c5d15ff 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws IOException { @Override public void deleteDag(DagManager.DagId dagId) throws IOException { - this.dagStateStore.cleanUp(dagId); - log.info("Deleted dag {}", dagId); + if (this.dagStateStore.cleanUp(dagId)) { + log.info("Deleted dag {}", dagId); + } else { + log.info("Dag deletion was tried but did not happen {}", dagId); + } } @Override @@ -158,9 +161,9 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro } @Override - public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) + public synchronized void updateDagNode(Dag.DagNode dagNode) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagNode); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697..98b95e8119 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -87,8 +87,8 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; + protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)"; + protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET dag_node = ? WHERE dag_node_id = ?"; protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; @@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { - DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); - boolean newDag = false; - for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { - newDag = true; + public void writeCheckpoint(Dag dag) throws IOException { + String dagId = DagManagerUtils.generateDagId(dag).toString(); + dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT, tableName), insertStatement -> { + int dagSize = dag.getNodes().size(); + Object[][] data = new Object[dagSize][3]; + + for (int i=0; i dagNode = dag.getNodes().get(i); + data[i][0] = dagNode.getValue().getId().toString(); + data[i][1] = dagId; + data[i][2] =this.serDe.serialize(Collections.singletonList(dagNode.getValue())); } - } - if (newDag) { - this.totalDagCount.inc(); - } + + for (Object[] row : data) { + insertStatement.setObject(1, row[0]); + insertStatement.setObject(2, row[1]); + insertStatement.setObject(3, row[2]); + insertStatement.addBatch(); + } + try { + return insertStatement.executeBatch(); + } catch (SQLException e) { + throw new IOException(String.format("Failure adding dag for %s", dagId), e); + }}, true); + + this.totalDagCount.inc(); } @Override @@ -147,15 +161,18 @@ public void cleanUp(Dag dag) throws IOException { @Override public boolean cleanUp(DagManager.DagId dagId) throws IOException { - dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> { + if (dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> { try { deleteStatement.setString(1, dagId.toString()); return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); - this.totalDagCount.dec(); - return true; + }}, true)) { + this.totalDagCount.dec(); + return true; + } else { + return false; + } } @Override @@ -195,16 +212,15 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public int updateDagNode(Dag.DagNode dagNode) throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); - return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { + return dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT, tableName), updateStatement -> { try { - insertStatement.setString(1, dagNodeId); - insertStatement.setString(2, parentDagId.toString()); - insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); - return insertStatement.executeUpdate(); + updateStatement.setString(1, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + updateStatement.setString(2, dagNodeId); + return updateStatement.executeUpdate(); } catch (SQLException e) { - throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); + throw new IOException(String.format("Failure updating dag node for %s", dagNodeId), e); }}, true); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 0a9f6dcd67..71693705fc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -134,7 +134,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat jobOrchestrationTimer.stop(jobMetadata); log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri); dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode); - dagManagementStateStore.addDagNodeState(dagNode, dagId); + dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); @@ -158,7 +158,6 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat public static void cancelDagNode(Dag.DagNode dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { Properties cancelJobArgs = new Properties(); - DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel); String serializedFuture = null; if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { @@ -175,12 +174,6 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId()); } DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get(); - // add back the dag node with updated states in the store - dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED); - dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId); - // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction - // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get - // into inconsistent state. sendCancellationEvent(dagNodeToCancel); log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture); } catch (Exception e) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index ef554f6418..214b487bfd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -81,6 +81,11 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = dagManagementStateStore.getDag(getDagId()).get(); + dag.getNodes().stream().filter(node -> node.getValue().getId().equals(getDagNodeId())).findFirst().get().getValue() + .setExecutionStatus(executionStatus); updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus); boolean isRetry = jobStatus.isShouldRetry(); @@ -92,8 +97,6 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = dagManagementStateStore.getDag(getDagId()).get(); onJobFinish(dagManagementStateStore, dagNode, dag); if (jobStatus.isShouldRetry()) { @@ -132,7 +135,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dagNode, ExecutionStatus executionStatus) throws IOException { dagNode.getValue().setExecutionStatus(executionStatus); - dagManagementStateStore.addDagNodeState(dagNode, getDagId()); + dagManagementStateStore.updateDagNode(dagNode); } /** diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index 3185943902..27a9dd553d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -100,14 +100,13 @@ public void testAddDag() throws Exception { Dag.DagNode dagNode2 = dag.getNodes().get(1); Dag.DagNode dagNode3 = dag2.getNodes().get(0); DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); - DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2); DagNodeId dagNodeId = DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig()); this.dagManagementStateStore.addDag(dag); this.dagManagementStateStore.addDag(dag2); - this.dagManagementStateStore.addDagNodeState(dagNode, dagId); - this.dagManagementStateStore.addDagNodeState(dagNode2, dagId); - this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2); + this.dagManagementStateStore.updateDagNode(dagNode); + this.dagManagementStateStore.updateDagNode(dagNode2); + this.dagManagementStateStore.updateDagNode(dagNode3); Assert.assertTrue(compareLists(dag.getNodes(), this.dagManagementStateStore.getDag(dagId).get().getNodes())); Assert.assertEquals(dagNode, this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get()); @@ -126,7 +125,7 @@ public void testAddDag() throws Exception { jobExecutionPlan.setJobFuture(Optional.of(future)); Dag.DagNode duplicateDagNode = new Dag.DagNode<>(jobExecutionPlan); - this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId); + this.dagManagementStateStore.updateDagNode(duplicateDagNode); Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index 651cc441df..6a15bf1645 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -123,7 +123,7 @@ public void testOneNextJobToRun() throws Exception { // next job is sent to spec producer Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any()); // there are two invocations, one after setting status and other after sending new job to specProducer - Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).addDagNodeState(any(), any()); + Mockito.verify(this.dagManagementStateStore, Mockito.times(2)).updateDagNode(any()); // assert that the first job is completed Assert.assertEquals(ExecutionStatus.COMPLETE, diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java index 150bed10e6..0623d00288 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java @@ -109,7 +109,7 @@ different methods create different spec executors (e.g. MockedSpecExecutor.creat buildNaiveTopologySpec().getSpecExecutor() respectively. the result will be that after serializing/deserializing the test dag, the spec executor (and producer) type may change */ - Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any()); + Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any()); Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } From 1b58f1a3fd2dfc26710d24f439d2e2b63be80241 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Thu, 29 Aug 2024 10:24:50 -0700 Subject: [PATCH 2/4] address review comments --- .../DagStateStoreWithDagNodes.java | 2 +- .../MysqlDagStateStoreWithDagNodes.java | 49 ++++++------------- 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 093b45432b..568476d5c4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -38,7 +38,7 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Updates the {@link Dag.DagNode} with the provided value. * Returns 1 if the dag node is updated successfully, 0 otherwise */ - int updateDagNode(Dag.DagNode dagNode) throws IOException; + boolean updateDagNode(Dag.DagNode dagNode) throws IOException; /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 98b95e8119..98aa356660 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -47,9 +47,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metastore.MysqlDataSourceFactory; -import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_serde.GsonSerDe; import org.apache.gobblin.service.ServiceConfigKeys; @@ -92,7 +90,6 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; - private final ContextAwareCounter totalDagCount; public MysqlDagStateStoreWithDagNodes(Config config, Map topologySpecMap) throws IOException { if (config.hasPath(CONFIG_PREFIX)) { @@ -121,7 +118,6 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory(); MetricContext metricContext = Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(config)), this.getClass()); - this.totalDagCount = metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT); log.info("Instantiated {}", getClass().getSimpleName()); } @@ -129,29 +125,19 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo public void writeCheckpoint(Dag dag) throws IOException { String dagId = DagManagerUtils.generateDagId(dag).toString(); dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT, tableName), insertStatement -> { - int dagSize = dag.getNodes().size(); - Object[][] data = new Object[dagSize][3]; - for (int i=0; i dagNode = dag.getNodes().get(i); - data[i][0] = dagNode.getValue().getId().toString(); - data[i][1] = dagId; - data[i][2] =this.serDe.serialize(Collections.singletonList(dagNode.getValue())); - } - - for (Object[] row : data) { - insertStatement.setObject(1, row[0]); - insertStatement.setObject(2, row[1]); - insertStatement.setObject(3, row[2]); + for (Dag.DagNode dagNode : dag.getNodes()) { + insertStatement.setObject(1, dagNode.getValue().getId().toString()); + insertStatement.setObject(2, dagId); + insertStatement.setObject(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); insertStatement.addBatch(); } + try { return insertStatement.executeBatch(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag for %s", dagId), e); }}, true); - - this.totalDagCount.inc(); } @Override @@ -161,18 +147,15 @@ public void cleanUp(Dag dag) throws IOException { @Override public boolean cleanUp(DagManager.DagId dagId) throws IOException { - if (dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), deleteStatement -> { - try { - deleteStatement.setString(1, dagId.toString()); - return deleteStatement.executeUpdate() != 0; - } catch (SQLException e) { - throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true)) { - this.totalDagCount.dec(); - return true; - } else { - return false; - } + return dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName), + deleteStatement -> { + try { + deleteStatement.setString(1, dagId.toString()); + return deleteStatement.executeUpdate() != 0; + } catch (SQLException e) { + throw new IOException(String.format("Failure deleting dag for %s", dagId), e); + } + }, true); } @Override @@ -212,13 +195,13 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public boolean updateDagNode(Dag.DagNode dagNode) throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT, tableName), updateStatement -> { try { updateStatement.setString(1, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); updateStatement.setString(2, dagNodeId); - return updateStatement.executeUpdate(); + return updateStatement.executeUpdate() == 1; } catch (SQLException e) { throw new IOException(String.format("Failure updating dag node for %s", dagNodeId), e); }}, true); From 964add63f98408480801b9ae201ba515ac4afc9c Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Thu, 29 Aug 2024 12:06:31 -0700 Subject: [PATCH 3/4] address review comments --- .../modules/orchestration/DagStateStoreWithDagNodes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 568476d5c4..171ce8855e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -36,7 +36,7 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { /** * Updates the {@link Dag.DagNode} with the provided value. - * Returns 1 if the dag node is updated successfully, 0 otherwise + * Returns true if the dag node is updated successfully, false otherwise */ boolean updateDagNode(Dag.DagNode dagNode) throws IOException; From 319aa5943b8b00614ccba1eded30927a6867cdab Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Thu, 29 Aug 2024 14:39:56 -0700 Subject: [PATCH 4/4] address review comments --- .../orchestration/proc/ReevaluateDagProc.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index 214b487bfd..cdfa3d0509 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -81,11 +81,6 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = dagManagementStateStore.getDag(getDagId()).get(); - dag.getNodes().stream().filter(node -> node.getValue().getId().equals(getDagNodeId())).findFirst().get().getValue() - .setExecutionStatus(executionStatus); updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus); boolean isRetry = jobStatus.isShouldRetry(); @@ -97,6 +92,18 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair> dagOptional = dagManagementStateStore.getDag(getDagId()); + if (!dagOptional.isPresent()) { + // This may happen if another ReevaluateDagProc removed the dag after this DagProc updated the dag node status. + // The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs + // are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the + // status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried. + log.warn("Dag not found {}", getDagId()); + return; + } + + Dag dag = dagOptional.get(); onJobFinish(dagManagementStateStore, dagNode, dag); if (jobStatus.isShouldRetry()) {