Skip to content

Commit d02dd5f

Browse files
committed
[feat_1.2][taier-service] fix work flow show
1 parent 12b55f6 commit d02dd5f

File tree

5 files changed

+391
-9
lines changed

5 files changed

+391
-9
lines changed

taier-common/src/main/java/com/dtstack/taier/common/exception/ErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public enum ErrorCode implements ExceptionEnums, Serializable {
4949
SFTP_NOT_FOUND(29, "sftp can not found","sftp不存在"),
5050

5151
UPDATE_EXCEPTION(30, "update exception", "更新异常"),
52+
UNSUPPORTED_OPERATION(31, "unSupported operation", "不支持的操作"),
5253
CONFIG_ERROR(51, "config error","配置错误"),
5354

5455
HTTP_CALL_ERROR(64, "http call error", "远程调用失败"),
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.dtstack.taier.develop.enums.develop;
2+
3+
4+
import com.dtstack.taier.common.exception.RdosDefineException;
5+
6+
/**
7+
* Reason:
8+
* Date: 2017/5/4
9+
* Company: www.dtstack.com
10+
* @author xuchao
11+
*/
12+
13+
public enum ESchedulePeriodType {
14+
15+
/**
16+
* 分钟
17+
*/
18+
MIN(0),
19+
20+
/**
21+
* 小时
22+
*/
23+
HOUR(1),
24+
25+
/**
26+
* 天
27+
*/
28+
DAY(2),
29+
30+
/**
31+
* 周
32+
*/
33+
WEEK(3),
34+
35+
/**
36+
* 月
37+
*/
38+
MONTH(4);
39+
40+
private int val;
41+
42+
ESchedulePeriodType(int val){
43+
this.val = val;
44+
}
45+
46+
public int getVal(){
47+
return this.val;
48+
}
49+
50+
51+
public static ESchedulePeriodType getEnumByVal(Integer val) {
52+
for (ESchedulePeriodType periodType : ESchedulePeriodType.values()) {
53+
if (periodType.getVal() == val) {
54+
return periodType;
55+
}
56+
}
57+
throw new RdosDefineException(String.format("val:%s 没有匹配到对应的周期调度类型", val));
58+
}
59+
60+
}
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package com.dtstack.taier.develop.enums.develop;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.dtstack.taier.common.exception.ErrorCode;
5+
import com.dtstack.taier.common.exception.RdosDefineException;
6+
import com.dtstack.taier.dao.domain.Task;
7+
import org.apache.commons.lang.ArrayUtils;
8+
import org.apache.commons.lang.StringUtils;
9+
10+
/**
11+
* 工作流调度属性工具类
12+
*
13+
* @author 昆卡
14+
* @version 4.3.x-SNAPSHOT
15+
* @since 2021/10/25
16+
*/
17+
18+
public enum WorkFlowScheduleConfEnum {
19+
/**
20+
* 分钟
21+
*/
22+
MIN(String.valueOf(ESchedulePeriodType.MIN.getVal())) {
23+
@Override
24+
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
25+
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME,
26+
BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME);
27+
}
28+
29+
@Override
30+
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
31+
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME,
32+
BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME, GAP_MIN_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME,
33+
PERIOD_TYPE);
34+
}
35+
},
36+
37+
/**
38+
* 小时
39+
*/
40+
HOUR(String.valueOf(ESchedulePeriodType.HOUR.getVal())) {
41+
@Override
42+
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
43+
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, BEGIN_HOUR_KEY_NAME,
44+
BEGIN_MIN_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME);
45+
}
46+
47+
@Override
48+
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
49+
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_HOUR_KEY_NAME, BEGIN_MIN_KEY_NAME,
50+
BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, GAP_HOUR_KEY_NAME, END_HOUR_KEY_NAME, END_MIN_KEY_NAME,
51+
PERIOD_TYPE);
52+
}
53+
},
54+
55+
/**
56+
* 天
57+
*/
58+
DAY(String.valueOf(ESchedulePeriodType.DAY.getVal())) {
59+
@Override
60+
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
61+
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME, MIN_KEY_NAME);
62+
}
63+
64+
@Override
65+
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
66+
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, HOUR_KEY_NAME,
67+
MIN_KEY_NAME, PERIOD_TYPE);
68+
}
69+
},
70+
71+
/**
72+
* 周
73+
*/
74+
WEEK(String.valueOf(ESchedulePeriodType.WEEK.getVal())) {
75+
@Override
76+
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
77+
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME, HOUR_KEY_NAME,
78+
MIN_KEY_NAME);
79+
}
80+
81+
@Override
82+
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
83+
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, WEEKDAY_KEY_NAME,
84+
HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE);
85+
}
86+
},
87+
88+
/**
89+
* 月
90+
*/
91+
MONTH(String.valueOf(ESchedulePeriodType.MONTH.getVal())) {
92+
@Override
93+
public void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject) {
94+
validate(oldJsonObject, newJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME, HOUR_KEY_NAME,
95+
MIN_KEY_NAME);
96+
}
97+
98+
@Override
99+
public void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject) {
100+
applyParentScheduleConf(childNodeTask, parentJsonObject, BEGIN_DATE_KEY_NAME, END_DATE_KEY_NAME, DAY_KEY_NAME,
101+
HOUR_KEY_NAME, MIN_KEY_NAME, PERIOD_TYPE);
102+
}
103+
}
104+
;
105+
/**
106+
* 开始小时键名
107+
*/
108+
private static final String BEGIN_HOUR_KEY_NAME = "beginHour";
109+
/**
110+
* 开始分钟键名
111+
*/
112+
private static final String BEGIN_MIN_KEY_NAME = "beginMin";
113+
/**
114+
* 间隔分钟键名
115+
*/
116+
private static final String GAP_MIN_KEY_NAME = "gapMin";
117+
/**
118+
* 结束小时键名
119+
*/
120+
private static final String END_HOUR_KEY_NAME = "endHour";
121+
/**
122+
* 结束分钟键名
123+
*/
124+
private static final String END_MIN_KEY_NAME = "endMin";
125+
/**
126+
* 间隔小时键名
127+
*/
128+
private static final String GAP_HOUR_KEY_NAME = "gapHour";
129+
/**
130+
* 小时键名
131+
*/
132+
private static final String HOUR_KEY_NAME = "hour";
133+
/**
134+
* 分钟键名
135+
*/
136+
private static final String MIN_KEY_NAME = "min";
137+
/**
138+
* 星期键名
139+
*/
140+
private static final String WEEKDAY_KEY_NAME = "weekDay";
141+
/**
142+
* 天键名
143+
*/
144+
private static final String DAY_KEY_NAME = "day";
145+
/**
146+
* CRON表达式键名
147+
*/
148+
private static final String CRON_KEY_NAME = "cron";
149+
/**
150+
* 开始日期键名
151+
*/
152+
private static final String BEGIN_DATE_KEY_NAME = "beginDate";
153+
/**
154+
* 结束日期键名
155+
*/
156+
private static final String END_DATE_KEY_NAME = "endDate";
157+
/**
158+
* 调度周期键名
159+
*/
160+
private static final String PERIOD_TYPE = "periodType";
161+
162+
/**
163+
* 调度周期
164+
*/
165+
private final String periodType;
166+
167+
/**
168+
* 处理工作流子节点调度配置
169+
*
170+
* @param oldJsonObject 老周期配置json对象
171+
* @param newJsonObject 新周期配置json对象
172+
*/
173+
public abstract void checkWorkFlowChildScheduleConf(JSONObject oldJsonObject, JSONObject newJsonObject);
174+
175+
/**
176+
* 处理工作流子节点调度配置,父的属性部分给到子
177+
*
178+
* @param childNodeTask 子节点任务
179+
* @param parentJsonObject 工作流周期配置json对象
180+
*/
181+
public abstract void handleWorkFlowChildScheduleConf(Task childNodeTask, JSONObject parentJsonObject);
182+
183+
/**
184+
* 获取工作流调度周期枚举对象
185+
*
186+
* @param periodType 调度周期
187+
*/
188+
public static String getCurrentPeriodType(String periodType) {
189+
if (StringUtils.isEmpty(periodType)) {
190+
throw new RdosDefineException(ErrorCode.INVALID_PARAMETERS);
191+
}
192+
for (WorkFlowScheduleConfEnum workFlowScheduleConfEnum : WorkFlowScheduleConfEnum.values()) {
193+
if (!workFlowScheduleConfEnum.getPeriodType().equals(periodType)) {
194+
continue;
195+
}
196+
return workFlowScheduleConfEnum.name();
197+
}
198+
throw new RdosDefineException("未知的调度周期");
199+
}
200+
201+
private static void validate(JSONObject oldJsonObject, JSONObject newJsonObject, String... keyNameArray) {
202+
if (ArrayUtils.isEmpty(keyNameArray)) {
203+
return;
204+
}
205+
for (String keyName : keyNameArray) {
206+
if (!String.valueOf(oldJsonObject.getOrDefault(keyName, StringUtils.EMPTY)).equals(newJsonObject.getString(keyName))) {
207+
throw new RdosDefineException(ErrorCode.UNSUPPORTED_OPERATION);
208+
}
209+
}
210+
}
211+
212+
private static void applyParentScheduleConf(Task childNodeTask, JSONObject parentJsonObject, String... keyNameArray) {
213+
if (ArrayUtils.isEmpty(keyNameArray)) {
214+
return;
215+
}
216+
final JSONObject childJsonObject = JSONObject.parseObject(childNodeTask.getScheduleConf());
217+
for (String keyName : keyNameArray) {
218+
if (parentJsonObject.containsKey(keyName)) {
219+
childJsonObject.put(keyName, parentJsonObject.get(keyName));
220+
}
221+
}
222+
childNodeTask.setScheduleConf(childJsonObject.toJSONString());
223+
}
224+
225+
public String getPeriodType() {
226+
return periodType;
227+
}
228+
229+
WorkFlowScheduleConfEnum(String periodType) {
230+
this.periodType = periodType;
231+
}
232+
}

taier-data-develop/src/main/java/com/dtstack/taier/develop/service/develop/impl/DevelopTaskService.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import com.dtstack.taier.develop.enums.develop.FlinkVersion;
7070
import com.dtstack.taier.develop.enums.develop.SourceDTOType;
7171
import com.dtstack.taier.develop.enums.develop.TaskCreateModelType;
72+
import com.dtstack.taier.develop.enums.develop.WorkFlowScheduleConfEnum;
7273
import com.dtstack.taier.develop.mapstruct.vo.TaskDirtyDataManageTransfer;
7374
import com.dtstack.taier.develop.mapstruct.vo.TaskMapstructTransfer;
7475
import com.dtstack.taier.develop.service.console.TenantService;
@@ -1408,8 +1409,8 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc
14081409
.eq(Task::getIsDeleted, Deleted.NORMAL.getStatus())
14091410
.eq(Task::getTenantId, tenantId));
14101411

1412+
Task updateInfo = new Task();
14111413
if (Objects.isNull(taskInfo)) {
1412-
Task updateInfo = new Task();
14131414
updateInfo.setId(taskId);
14141415
updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now()));
14151416
updateInfo.setName(taskName);
@@ -1422,6 +1423,72 @@ public void editTask(Long taskId, String taskName, Long catalogueId, String desc
14221423
if (!taskId.equals(taskInfo.getId())) {
14231424
throw new RdosDefineException(ErrorCode.NAME_ALREADY_EXIST);
14241425
}
1426+
updateInfo.setGmtModified(Timestamp.valueOf(LocalDateTime.now()));
1427+
updateInfo.setName(taskName);
1428+
updateInfo.setNodePid(catalogueId);
1429+
updateInfo.setTaskDesc(desc);
1430+
updateInfo.setComponentVersion(componentVersion);
1431+
developTaskMapper.update(updateInfo, Wrappers.lambdaUpdate(Task.class).eq(Task::getId, taskInfo.getId()));
1432+
1433+
if (Objects.equals(EScheduleJobType.WORK_FLOW.getType(), taskInfo.getTaskType())){
1434+
//更新父节点目录时,同步更新子节点
1435+
if (!taskInfo.getNodePid().equals(catalogueId)) {
1436+
updateSonTaskNodePidByFlowId(taskInfo.getId(), catalogueId);
1437+
}
1438+
}
1439+
}
1440+
1441+
1442+
1443+
/**
1444+
* 更新工作流的调度信息
1445+
* 历史任务若父节点和子节点的周期不一致,则在提交时将子节点自动改为与父节点一致
1446+
*
1447+
* @param flowWorkId 工作流id
1448+
* @param newScheduleConf 新调度信息
1449+
*/
1450+
public void updateSubTaskScheduleConf(final Long flowWorkId, final JSONObject newScheduleConf) {
1451+
Task task = developTaskMapper.selectById(flowWorkId);
1452+
if (task == null) {
1453+
throw new RdosDefineException(ErrorCode.CAN_NOT_FIND_TASK);
1454+
}
1455+
final List<Task> batchTasks = this.getFlowWorkSubTasks(flowWorkId);
1456+
if (CollectionUtils.isEmpty(batchTasks)) {
1457+
return;
1458+
}
1459+
final int periodType = newScheduleConf.getInteger("periodType");
1460+
newScheduleConf.put("selfReliance", 0);
1461+
//工作流更新调度属性时,子任务同步更新
1462+
for (final Task bTask : batchTasks) {
1463+
//工作流配置的自动取消不同步子任务
1464+
newScheduleConf.remove("isExpire");
1465+
JSONObject subTaskScheduleConf = JSON.parseObject(bTask.getScheduleConf());
1466+
Boolean isFailRetry = MapUtils.getBoolean(subTaskScheduleConf, "isFailRetry", true);
1467+
Integer maxRetryNum = MapUtils.getInteger(subTaskScheduleConf, "maxRetryNum", 3);
1468+
newScheduleConf.put("isFailRetry", isFailRetry);
1469+
newScheduleConf.put("maxRetryNum", maxRetryNum);
1470+
WorkFlowScheduleConfEnum.valueOf(WorkFlowScheduleConfEnum.getCurrentPeriodType(String.valueOf(periodType)))
1471+
.handleWorkFlowChildScheduleConf(bTask, newScheduleConf);
1472+
bTask.setPeriodType(periodType);
1473+
}
1474+
for (Task batchTask : batchTasks) {
1475+
Task task1 = new Task();
1476+
task1.setScheduleConf(batchTask.getScheduleConf());
1477+
developTaskMapper.update(task1,Wrappers.lambdaUpdate(Task.class).eq(Task::getId,batchTask.getId()));
1478+
}
1479+
}
1480+
1481+
1482+
/**
1483+
* 根据父任务id,更新所有子任务的目录id
1484+
*
1485+
* @param flowId
1486+
* @param nodePid
1487+
*/
1488+
public void updateSonTaskNodePidByFlowId(Long flowId, Long nodePid) {
1489+
Task task = new Task();
1490+
task.setNodePid(nodePid);
1491+
developTaskMapper.update(task, Wrappers.lambdaUpdate(Task.class).eq(Task::getFlowId, flowId));
14251492
}
14261493

14271494
}

0 commit comments

Comments
 (0)