Skip to content

Commit

Permalink
merged dagNodeStateStore and failedDagNodeStateStore tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Pratap Singh committed Aug 19, 2024
1 parent bdbf43a commit 719052f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.service.modules.flowgraph;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class Dag<T> {
// Map to maintain parent to children mapping.
private Map<DagNode, List<DagNode<T>>> parentChildMap;
private List<DagNode<T>> nodes;
@Setter
private boolean isFailedDag;

@Setter
@Deprecated // because this field is not persisted in mysql and contains information in very limited cases
Expand Down Expand Up @@ -259,11 +262,16 @@ public static class DagNode<T> {
private T value;
//List of parent Nodes that are dependencies of this Node.
private List<DagNode<T>> parentNodes;
private boolean isFailedDag;

/**
 * Creates a node wrapping {@code value} that is not flagged as belonging to a failed dag.
 */
public DagNode(T value) {
  this(value, false);
}

/**
 * Creates a node wrapping {@code value} and records whether it belongs to a failed dag.
 */
public DagNode(T value, boolean isFailedDag) {
  this.isFailedDag = isFailedDag;
  this.value = value;
}


public void addParentNode(DagNode<T> node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
* Returns 1 if the dag node is inserted as a new one, 2 if it is updated, and 0 if the new dag node is the same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
*/
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;


/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
/**
 * Marks the dag identified by {@code dagId} as failed.
 * The dag is loaded from {@code dagStateStore}, its failed-dag flag is set, and it is checkpointed
 * through {@code failedDagStateStore}.
 *
 * @param dagId id of the dag to mark as failed
 * @throws IOException declared by the underlying store calls
 */
@Override
public void markDagFailed(DagManager.DagId dagId) throws IOException {
// NOTE(review): dag is dereferenced below without a null check; the Mysql store test asserts that getDag
// yields null once a dag has been cleaned up — confirm this method is never reached for a missing dag.
Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
dag.setFailedDag(true);
this.failedDagStateStore.writeCheckpoint(dag);
this.dagStateStore.cleanUp(dagId);
// todo - updated failedDagStateStore iff cleanup returned 1
// or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed`
log.info("Marked dag failed {}", dagId);
}

Expand All @@ -161,7 +159,7 @@ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) thro
/**
 * Persists the state of a single dag node by delegating to {@code dagStateStore.updateDagNode}.
 * The three-argument call writes the node with the failed-dag flag set to {@code false}.
 *
 * @param dagNode the node whose state is stored
 * @param dagId id of the dag the node belongs to
 * @throws IOException declared by the underlying store call
 */
@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagId, dagNode, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;

// todo add a column that tells if it is a running dag or a failed dag
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
// Table schema: one row per dag node, keyed by dag_node_id and indexed by parent_dag_id.
// is_failed_dag is an INT column defaulting to 0; updateDagNode writes it as 1 or 0.
// NOTE(review): CREATE TABLE IF NOT EXISTS leaves an already-existing table untouched, so a table created
// before is_failed_dag was introduced would not gain the column — confirm a migration path exists.
// NOTE(review): dag_node_index is declared on the same column as the PRIMARY KEY — confirm it is needed.
protected static final String CREATE_TABLE_STATEMENT =
"CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "is_failed_dag INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

// Upsert: on a duplicate key, the stored dag_node and is_failed_dag are overwritten with the new row's values.
protected static final String INSERT_STATEMENT =
"INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
// Both SELECTs return dag_node as column 1 and is_failed_dag as column 2.
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
private final ContextAwareCounter totalDagCount;

Expand All @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());

try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
PreparedStatement createStatement = connection.prepareStatement(
String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
Expand All @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
}

@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
boolean newDag = false;
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (updateDagNode(dagId, dagNode) == 1) {
if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) {
newDag = true;
}
}
Expand All @@ -153,7 +153,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException {
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
}}, true);
}
}, true);
this.totalDagCount.dec();
return true;
}
Expand All @@ -167,7 +168,8 @@ public void cleanUp(String dagId) throws IOException {
/**
 * Unsupported by this store: every call throws {@code NotSupportedException}.
 */
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+ "the DagManager that is replaced by DagProcessingEngine"); }
+ "the DagManager that is replaced by DagProcessingEngine");
}

@Override
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
Expand Down Expand Up @@ -195,33 +197,37 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
}

/**
 * Upserts the row for one dag node using {@code INSERT_STATEMENT}.
 * Parameters are bound in this order: the node id, the parent dag id, the node's value serialized as a
 * single-element list, and the failed-dag flag encoded as 1 or 0.
 *
 * @param parentDagId id of the dag the node belongs to
 * @param dagNode the node to store
 * @param isFailedDag whether the row is written with is_failed_dag set
 * @return the row count reported by {@code executeUpdate()}
 * @throws IOException wrapping any SQLException raised while binding or executing the statement
 */
@Override
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag)
throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
try {
insertStatement.setString(1, dagNodeId);
insertStatement.setString(2, parentDagId.toString());
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
insertStatement.setInt(4, isFailedDag ? 1 : 0);
return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
}}, true);
}
}, true);
}

/**
 * Loads every dag node stored under {@code parentDagId} using {@code GET_DAG_NODES_STATEMENT}.
 * For each row, column 1 is deserialized and its first element becomes the node value; in the
 * two-argument constructor call, column 2 (is_failed_dag) is read as a boolean and passed along.
 *
 * @param parentDagId id of the dag whose nodes are fetched
 * @return the nodes found; the set is empty when no row matches
 * @throws IOException wrapping any SQLException raised while executing the query or reading its rows
 */
@Override
public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
}
}, true);
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName),
getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
// Column 2 is is_failed_dag, selected alongside dag_node by GET_DAG_NODES_STATEMENT.
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
}
}, true);
}

@Override
Expand All @@ -230,7 +236,7 @@ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) t
getStatement.setString(1, dagNodeId.toString());
try (ResultSet rs = getStatement.executeQuery()) {
if (rs.next()) {
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0),rs.getBoolean(2)));
}
return Optional.empty();
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,30 @@ public void testAddGetAndDeleteDag() throws Exception{
Assert.assertNull(this.dagStateStore.getDag(dagId1));
Assert.assertNull(this.dagStateStore.getDag(dagId2));
}

@Test
public void testMarkDagAsFailed() throws Exception {
  // Persist a freshly built dag; nothing has flagged it as failed yet.
  Dag<JobExecutionPlan> testDag = DagTestUtils.buildDag("test_dag", 789L);
  DagManager.DagId testDagId = DagManagerUtils.generateDagId(testDag);
  this.dagStateStore.writeCheckpoint(testDag);

  // The in-memory nodes of the dag just built must start out unflagged.
  testDag.getNodes().forEach(node -> Assert.assertFalse(node.isFailedDag()));

  // Flag the dag as failed and checkpoint it a second time.
  testDag.setFailedDag(true);
  this.dagStateStore.writeCheckpoint(testDag);

  // Every node of the dag read back from the store must now carry the failed flag.
  Dag<JobExecutionPlan> reloadedDag = this.dagStateStore.getDag(testDagId);
  reloadedDag.getNodes().forEach(node -> Assert.assertTrue(node.isFailedDag()));

  // Cleanup: once removed, the dag must no longer be retrievable.
  this.dagStateStore.cleanUp(testDagId);
  Assert.assertNull(this.dagStateStore.getDag(testDagId));
}

}

0 comments on commit 719052f

Please sign in to comment.