Skip to content

Commit c5a8622

Browse files
committed
[feat_1.2][taier-scheduler] fix operatorRecord id cycle
1 parent 4c545bd commit c5a8622

File tree

3 files changed

+46
-34
lines changed

3 files changed

+46
-34
lines changed

taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,9 @@ public class EnvironmentContext implements InitializingBean {
244244
@Value("${plugin.path:#{systemProperties['user.dir']}/pluginLibs}")
245245
private String pluginPath;
246246

247+
@Value("${stopLimit:100000}")
248+
private Integer stopLimit;
249+
247250
@Value("${logs.limit.num:10000}")
248251
private Integer logsLimitNum;
249252

@@ -562,4 +565,8 @@ public Integer getLogsLimitNum() {
562565
public void setLogsLimitNum(Integer logsLimitNum) {
563566
this.logsLimitNum = logsLimitNum;
564567
}
568+
569+
public int getStopLimit() {
570+
return stopLimit;
571+
}
565572
}

taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ public class JobStopDealer implements InitializingBean, DisposableBean {
8383
@Autowired
8484
private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService;
8585

86-
private static final int JOB_STOP_LIMIT = 1000;
8786
private static final int WAIT_INTERVAL = 3000;
8887
private static final int OPERATOR_EXPIRED_INTERVAL = 60000;
8988
private final int asyncDealStopJobQueueSize = 100;
@@ -93,7 +92,7 @@ public class JobStopDealer implements InitializingBean, DisposableBean {
9392

9493
private final DelayBlockingQueue<StoppedJob<JobElement>> stopJobQueue = new DelayBlockingQueue<StoppedJob<JobElement>>(1000);
9594
private final ExecutorService delayStopProcessorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory("delayStopProcessor"));
96-
private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(asyncDealStopJobPoolSize, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180));
95+
private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(2, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180));
9796
private final ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1, new CustomThreadFactory(this.getClass().getSimpleName()));
9897
private final DelayStopProcessor delayStopProcessor = new DelayStopProcessor();
9998
private final AcquireStopJob acquireStopJob = new AcquireStopJob();
@@ -122,8 +121,8 @@ public int addStopJobs(List<ScheduleJob> scheduleJobList, Integer isForce) {
122121
return 0;
123122
}
124123

125-
if (scheduleJobList.size() > JOB_STOP_LIMIT) {
126-
throw new RdosDefineException("please don't stop too many tasks at once, limit:" + JOB_STOP_LIMIT);
124+
if (scheduleJobList.size() > environmentContext.getStopLimit()) {
125+
throw new RdosDefineException("please don't stop too many tasks at once, limit:" + environmentContext.getStopLimit());
127126
}
128127

129128
// 分离实例是否提交到yarn上,如果提交到yarn上,需要发送请求stop,如果未提交,直接更新db

taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/scheduler/OperatorRecordJobScheduler.java

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,10 @@
1414
import com.dtstack.taier.scheduler.service.ScheduleJobService;
1515
import com.google.common.collect.Lists;
1616
import org.apache.commons.collections.CollectionUtils;
17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
1917
import org.springframework.beans.factory.annotation.Autowired;
2018

2119
import java.util.ArrayList;
20+
import java.util.Comparator;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Set;
@@ -44,39 +43,46 @@ public abstract class OperatorRecordJobScheduler extends AbstractJobSummitSchedu
4443
@Autowired
4544
private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService;
4645

46+
private Long operatorRecordStartId = 0L;
47+
4748
@Override
4849
protected List<ScheduleJobDetails> listExecJob(Long startSort, String nodeAddress, Boolean isEq) {
49-
List<ScheduleJobOperatorRecord> records = scheduleJobOperatorRecordService.listOperatorRecord(startSort, nodeAddress, getOperatorType().getType(), isEq);
50-
51-
if (CollectionUtils.isNotEmpty(records)) {
52-
Set<String> jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet());
53-
List<ScheduleJob> scheduleJobList = getScheduleJob(jobIds);
54-
55-
if (CollectionUtils.isNotEmpty(scheduleJobList)) {
56-
List<String> jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList());
57-
if (jobIds.size() != scheduleJobList.size()) {
58-
// 过滤出来已经提交运行的实例,删除操作记录
59-
List<String> deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList());
60-
removeOperatorRecord(deleteJobIdList);
61-
}
50+
List<ScheduleJobOperatorRecord> records = scheduleJobOperatorRecordService.listOperatorRecord(operatorRecordStartId, nodeAddress, getOperatorType().getType(), isEq);
51+
//empty
52+
if (CollectionUtils.isEmpty(records)) {
53+
operatorRecordStartId = 0L;
54+
return new ArrayList<>();
55+
}
6256

63-
List<String> jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList());
64-
List<ScheduleJobJob> scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys);
65-
Map<String, List<ScheduleJobJob>> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey));
66-
List<ScheduleJobDetails> scheduleJobDetailsList = new ArrayList<>(scheduleJobList.size());
57+
Set<String> jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet());
58+
List<ScheduleJob> scheduleJobList = getScheduleJob(jobIds);
6759

68-
for (ScheduleJob scheduleJob : scheduleJobList) {
69-
ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails();
70-
scheduleJobDetails.setScheduleJob(scheduleJob);
71-
scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey()));
72-
scheduleJobDetailsList.add(scheduleJobDetails);
73-
}
74-
return scheduleJobDetailsList;
75-
} else {
76-
removeOperatorRecord(Lists.newArrayList(jobIds));
77-
}
60+
if (CollectionUtils.isEmpty(scheduleJobList)) {
61+
operatorRecordStartId = 0L;
62+
removeOperatorRecord(Lists.newArrayList(jobIds));
7863
}
79-
return Lists.newArrayList();
64+
65+
//set max
66+
records.stream().max(Comparator.comparing(ScheduleJobOperatorRecord::getId))
67+
.ifPresent(scheduleJobOperatorRecord -> operatorRecordStartId = scheduleJobOperatorRecord.getId());
68+
69+
if (jobIds.size() != scheduleJobList.size()) {
70+
List<String> jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList());
71+
// 过滤出来已经提交运行的实例,删除操作记录
72+
List<String> deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList());
73+
removeOperatorRecord(deleteJobIdList);
74+
}
75+
76+
List<String> jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList());
77+
List<ScheduleJobJob> scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys);
78+
Map<String, List<ScheduleJobJob>> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey));
79+
80+
return scheduleJobList.stream().map(scheduleJob -> {
81+
ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails();
82+
scheduleJobDetails.setScheduleJob(scheduleJob);
83+
scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey()));
84+
return scheduleJobDetails;
85+
}).collect(Collectors.toList());
8086
}
8187

8288
/**

0 commit comments

Comments
 (0)