Skip to content

Commit 719052f

Browse files
author
Aditya Pratap Singh
committed
merged dagNodeStateStore and failedDagNodeStateStore tables
1 parent bdbf43a commit 719052f

File tree

5 files changed

+79
-40
lines changed

5 files changed

+79
-40
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.service.modules.flowgraph;
1919

20+
import com.google.common.annotations.VisibleForTesting;
2021
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -48,6 +49,8 @@ public class Dag<T> {
4849
// Map to maintain parent to children mapping.
4950
private Map<DagNode, List<DagNode<T>>> parentChildMap;
5051
private List<DagNode<T>> nodes;
52+
@Setter
53+
private boolean isFailedDag;
5154

5255
@Setter
5356
@Deprecated // because this field is not persisted in mysql and contains information in very limited cases
@@ -259,11 +262,16 @@ public static class DagNode<T> {
259262
private T value;
260263
//List of parent Nodes that are dependencies of this Node.
261264
private List<DagNode<T>> parentNodes;
265+
private boolean isFailedDag;
262266

263267
//Constructor
264268
public DagNode(T value) {
265269
this.value = value;
266270
}
271+
public DagNode(T value,boolean isFailedDag) {
272+
this.value = value;
273+
this.isFailedDag = isFailedDag;
274+
}
267275

268276

269277
public void addParentNode(DagNode<T> node) {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
4040
* Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
4141
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
4242
*/
43-
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
43+
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;
44+
4445

4546
/**
4647
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,8 @@ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
134134
@Override
135135
public void markDagFailed(DagManager.DagId dagId) throws IOException {
136136
Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
137+
dag.setFailedDag(true);
137138
this.failedDagStateStore.writeCheckpoint(dag);
138-
this.dagStateStore.cleanUp(dagId);
139-
// todo - updated failedDagStateStore iff cleanup returned 1
140-
// or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed`
141139
log.info("Marked dag failed {}", dagId);
142140
}
143141

@@ -161,7 +159,7 @@ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) thro
161159
@Override
162160
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
163161
throws IOException {
164-
this.dagStateStore.updateDagNode(dagId, dagNode);
162+
this.dagStateStore.updateDagNode(dagId, dagNode, false);
165163
}
166164

167165
@Override

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
7777
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
7878
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;
7979

80-
// todo add a column that tells if it is a running dag or a failed dag
81-
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
82-
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
83-
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
84-
+ "dag_node JSON NOT NULL, "
85-
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
86-
+ "PRIMARY KEY (dag_node_id), "
87-
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
88-
+ "INDEX dag_index (parent_dag_id))";
89-
90-
protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
91-
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
92-
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
93-
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
80+
protected static final String CREATE_TABLE_STATEMENT =
81+
"CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
82+
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83+
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84+
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85+
+ "is_failed_dag INT NOT NULL DEFAULT 0, "
86+
+ "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), "
87+
+ "INDEX dag_index (parent_dag_id))";
88+
89+
protected static final String INSERT_STATEMENT =
90+
"INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
91+
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
92+
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?";
93+
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?";
9494
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
9595
private final ContextAwareCounter totalDagCount;
9696

@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
105105
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
106106

107107
try (Connection connection = dataSource.getConnection();
108-
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
108+
PreparedStatement createStatement = connection.prepareStatement(
109+
String.format(CREATE_TABLE_STATEMENT, tableName))) {
109110
createStatement.executeUpdate();
110111
connection.commit();
111112
} catch (SQLException e) {
@@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
126127
}
127128

128129
@Override
129-
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
130-
throws IOException {
130+
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
131131
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
132132
boolean newDag = false;
133133
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
134-
if (updateDagNode(dagId, dagNode) == 1) {
134+
if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) {
135135
newDag = true;
136136
}
137137
}
@@ -153,7 +153,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException {
153153
return deleteStatement.executeUpdate() != 0;
154154
} catch (SQLException e) {
155155
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
156-
}}, true);
156+
}
157+
}, true);
157158
this.totalDagCount.dec();
158159
return true;
159160
}
@@ -167,7 +168,8 @@ public void cleanUp(String dagId) throws IOException {
167168
@Override
168169
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
169170
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
170-
+ "the DagManager that is replaced by DagProcessingEngine"); }
171+
+ "the DagManager that is replaced by DagProcessingEngine");
172+
}
171173

172174
@Override
173175
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
@@ -195,33 +197,37 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
195197
}
196198

197199
@Override
198-
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
200+
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag)
201+
throws IOException {
199202
String dagNodeId = dagNode.getValue().getId().toString();
200203
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
201204
try {
202205
insertStatement.setString(1, dagNodeId);
203206
insertStatement.setString(2, parentDagId.toString());
204207
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
208+
insertStatement.setInt(4, isFailedDag ? 1 : 0);
205209
return insertStatement.executeUpdate();
206210
} catch (SQLException e) {
207211
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
208-
}}, true);
212+
}
213+
}, true);
209214
}
210215

211216
@Override
212217
public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
213-
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
214-
getStatement.setString(1, parentDagId.toString());
215-
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
216-
try (ResultSet rs = getStatement.executeQuery()) {
217-
while (rs.next()) {
218-
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
219-
}
220-
return dagNodes;
221-
} catch (SQLException e) {
222-
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
223-
}
224-
}, true);
218+
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName),
219+
getStatement -> {
220+
getStatement.setString(1, parentDagId.toString());
221+
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
222+
try (ResultSet rs = getStatement.executeQuery()) {
223+
while (rs.next()) {
224+
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
225+
}
226+
return dagNodes;
227+
} catch (SQLException e) {
228+
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
229+
}
230+
}, true);
225231
}
226232

227233
@Override
@@ -230,7 +236,7 @@ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) t
230236
getStatement.setString(1, dagNodeId.toString());
231237
try (ResultSet rs = getStatement.executeQuery()) {
232238
if (rs.next()) {
233-
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
239+
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0),rs.getBoolean(2)));
234240
}
235241
return Optional.empty();
236242
} catch (SQLException e) {

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,30 @@ public void testAddGetAndDeleteDag() throws Exception{
137137
Assert.assertNull(this.dagStateStore.getDag(dagId1));
138138
Assert.assertNull(this.dagStateStore.getDag(dagId2));
139139
}
140+
141+
@Test
142+
public void testMarkDagAsFailed() throws Exception {
143+
//Set up initial conditions
144+
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
145+
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
146+
147+
this.dagStateStore.writeCheckpoint(dag);
148+
//Check Initial State
149+
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
150+
Assert.assertFalse(node.isFailedDag());
151+
}
152+
dag.setFailedDag(true);
153+
154+
this.dagStateStore.writeCheckpoint(dag);
155+
156+
Dag<JobExecutionPlan> updatedDag = this.dagStateStore.getDag(dagId);
157+
for (Dag.DagNode<JobExecutionPlan> node : updatedDag.getNodes()) {
158+
Assert.assertTrue(node.isFailedDag());
159+
}
160+
161+
// Cleanup
162+
dagStateStore.cleanUp(dagId);
163+
Assert.assertNull(this.dagStateStore.getDag(dagId));
164+
}
165+
140166
}

0 commit comments

Comments
 (0)