diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 29ffc485e0..0c743c4e2e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.flowgraph; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,6 +49,8 @@ public class Dag { // Map to maintain parent to children mapping. private Map>> parentChildMap; private List> nodes; + @Setter + private boolean isFailedDag; @Setter @Deprecated // because this field is not persisted in mysql and contains information in very limited cases @@ -259,11 +262,16 @@ public static class DagNode { private T value; //List of parent Nodes that are dependencies of this Node. private List> parentNodes; + private boolean isFailedDag; //Constructor public DagNode(T value) { this.value = value; } + public DagNode(T value,boolean isFailedDag) { + this.value = value; + this.isFailedDag = isFailedDag; + } public void addParentNode(DagNode node) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520..64c11f81e1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one * Refer */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException; + /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b..34fc0b15a2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -134,10 +134,8 @@ public void addDag(Dag dag) throws IOException { @Override public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); + dag.setFailedDag(true); this.failedDagStateStore.writeCheckpoint(dag); - this.dagStateStore.cleanUp(dagId); - // todo - updated failedDagStateStore iff cleanup returned 1 - // or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed` log.info("Marked dag failed {}", dagId); } @@ -161,7 +159,7 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagId, dagNode, false); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697..121579fbab 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,20 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - // todo add a column that tells if it is a running dag or a failed dag - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " - + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " - + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " - + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; - protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; - protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag INT NOT NULL DEFAULT 0, " + + "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), " + + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; + protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { + public void writeCheckpoint(Dag dag) throws IOException { DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); boolean newDag = false; for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { + if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) { newDag = true; } } @@ -153,7 +153,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException { return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); + } + }, true); this.totalDagCount.dec(); return true; } @@ -167,7 +168,8 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine"); + } @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -195,33 +197,37 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) + throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { insertStatement.setString(1, dagNodeId); insertStatement.setString(2, parentDagId.toString()); insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.setInt(4, isFailedDag ? 1 : 0); return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); - }}, true); + } + }, true); } @Override public Set> getDagNodes(DagManager.DagId parentDagId) throws IOException { - return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { - getStatement.setString(1, parentDagId.toString()); - HashSet> dagNodes = new HashSet<>(); - try (ResultSet rs = getStatement.executeQuery()) { - while (rs.next()) { - dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); - } - return dagNodes; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); - } - }, true); + return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), + getStatement -> { + getStatement.setString(1, parentDagId.toString()); + HashSet> dagNodes = new HashSet<>(); + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); + } + }, true); } @Override @@ -230,7 +236,7 @@ public Optional> getDagNode(DagNodeId dagNodeId) t getStatement.setString(1, dagNodeId.toString()); try (ResultSet rs = getStatement.executeQuery()) { if (rs.next()) { - return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); + return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0),rs.getBoolean(2))); } return Optional.empty(); } catch (SQLException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java index 131186b3d1..4c10a0dc5f 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java @@ -137,4 +137,30 @@ public void testAddGetAndDeleteDag() throws Exception{ Assert.assertNull(this.dagStateStore.getDag(dagId1)); Assert.assertNull(this.dagStateStore.getDag(dagId2)); } + + @Test + public void testMarkDagAsFailed() throws Exception { + //Set up initial conditions + Dag dag = DagTestUtils.buildDag("test_dag", 789L); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + + this.dagStateStore.writeCheckpoint(dag); + //Check Initial State + for (Dag.DagNode node : dag.getNodes()) { + Assert.assertFalse(node.isFailedDag()); + } + dag.setFailedDag(true); + + this.dagStateStore.writeCheckpoint(dag); + + Dag updatedDag = this.dagStateStore.getDag(dagId); + for (Dag.DagNode node : updatedDag.getNodes()) { + Assert.assertTrue(node.isFailedDag()); + } + + // Cleanup + dagStateStore.cleanUp(dagId); + Assert.assertNull(this.dagStateStore.getDag(dagId)); + } + } \ No newline at end of file