Skip to content

Commit

Permalink
[feat_1.2][taier-schedule] fix operatorRecord id cycle
Browse files Browse the repository at this point in the history
  • Loading branch information
vainhope committed Jul 25, 2022
1 parent 4c545bd commit c5a8622
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ public class EnvironmentContext implements InitializingBean {
// Injected from the "plugin.path" property; when unset, falls back to "<user.dir>/pluginLibs".
@Value("${plugin.path:#{systemProperties['user.dir']}/pluginLibs}")
private String pluginPath;

// Injected from the "stopLimit" property (default 100000).
// JobStopDealer#addStopJobs rejects a stop request whose job list is larger than this value.
@Value("${stopLimit:100000}")
private Integer stopLimit;

// Injected from the "logs.limit.num" property (default 10000).
// NOTE(review): presumably caps how many log entries are kept/returned - confirm against callers.
@Value("${logs.limit.num:10000}")
private Integer logsLimitNum;

Expand Down Expand Up @@ -562,4 +565,8 @@ public Integer getLogsLimitNum() {
/**
 * Overrides the value injected from the {@code logs.limit.num} property.
 *
 * @param logsLimitNum the new limit; stored as-is, no validation is performed
 */
public void setLogsLimitNum(Integer logsLimitNum) {
this.logsLimitNum = logsLimitNum;
}

/**
 * Returns the configured {@code stopLimit} value as a primitive.
 *
 * <p>The backing field is a boxed {@code Integer} populated from the {@code stopLimit}
 * property (default 100000); it is unboxed here, so a {@code null} field would raise a
 * {@link NullPointerException}.
 *
 * @return the configured stop limit
 */
public int getStopLimit() {
    return this.stopLimit.intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public class JobStopDealer implements InitializingBean, DisposableBean {
@Autowired
private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService;

private static final int JOB_STOP_LIMIT = 1000;
private static final int WAIT_INTERVAL = 3000;
private static final int OPERATOR_EXPIRED_INTERVAL = 60000;
private final int asyncDealStopJobQueueSize = 100;
Expand All @@ -93,7 +92,7 @@ public class JobStopDealer implements InitializingBean, DisposableBean {

private final DelayBlockingQueue<StoppedJob<JobElement>> stopJobQueue = new DelayBlockingQueue<StoppedJob<JobElement>>(1000);
private final ExecutorService delayStopProcessorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory("delayStopProcessor"));
private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(asyncDealStopJobPoolSize, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180));
private final ExecutorService asyncDealStopJobService = new ThreadPoolExecutor(2, asyncDealStopJobPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(asyncDealStopJobQueueSize), new CustomThreadFactory("asyncDealStopJob"), new CustomThreadRunsPolicy("asyncDealStopJob", "stop", 180));
private final ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1, new CustomThreadFactory(this.getClass().getSimpleName()));
private final DelayStopProcessor delayStopProcessor = new DelayStopProcessor();
private final AcquireStopJob acquireStopJob = new AcquireStopJob();
Expand Down Expand Up @@ -122,8 +121,8 @@ public int addStopJobs(List<ScheduleJob> scheduleJobList, Integer isForce) {
return 0;
}

if (scheduleJobList.size() > JOB_STOP_LIMIT) {
throw new RdosDefineException("please don't stop too many tasks at once, limit:" + JOB_STOP_LIMIT);
if (scheduleJobList.size() > environmentContext.getStopLimit()) {
throw new RdosDefineException("please don't stop too many tasks at once, limit:" + environmentContext.getStopLimit());
}

// 分离实例是否提交到yarn上,如果提交到yarn上,需要发送请求stop,如果未提交,直接更新db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import com.dtstack.taier.scheduler.service.ScheduleJobService;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -44,39 +43,46 @@ public abstract class OperatorRecordJobScheduler extends AbstractJobSummitSchedu
@Autowired
private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService;

private Long operatorRecordStartId = 0L;

@Override
protected List<ScheduleJobDetails> listExecJob(Long startSort, String nodeAddress, Boolean isEq) {
List<ScheduleJobOperatorRecord> records = scheduleJobOperatorRecordService.listOperatorRecord(startSort, nodeAddress, getOperatorType().getType(), isEq);

if (CollectionUtils.isNotEmpty(records)) {
Set<String> jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet());
List<ScheduleJob> scheduleJobList = getScheduleJob(jobIds);

if (CollectionUtils.isNotEmpty(scheduleJobList)) {
List<String> jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList());
if (jobIds.size() != scheduleJobList.size()) {
// 过滤出来已经提交运行的实例,删除操作记录
List<String> deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList());
removeOperatorRecord(deleteJobIdList);
}
List<ScheduleJobOperatorRecord> records = scheduleJobOperatorRecordService.listOperatorRecord(operatorRecordStartId, nodeAddress, getOperatorType().getType(), isEq);
//empty
if (CollectionUtils.isEmpty(records)) {
operatorRecordStartId = 0L;
return new ArrayList<>();
}

List<String> jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList());
List<ScheduleJobJob> scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys);
Map<String, List<ScheduleJobJob>> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey));
List<ScheduleJobDetails> scheduleJobDetailsList = new ArrayList<>(scheduleJobList.size());
Set<String> jobIds = records.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toSet());
List<ScheduleJob> scheduleJobList = getScheduleJob(jobIds);

for (ScheduleJob scheduleJob : scheduleJobList) {
ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails();
scheduleJobDetails.setScheduleJob(scheduleJob);
scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey()));
scheduleJobDetailsList.add(scheduleJobDetails);
}
return scheduleJobDetailsList;
} else {
removeOperatorRecord(Lists.newArrayList(jobIds));
}
if (CollectionUtils.isEmpty(scheduleJobList)) {
operatorRecordStartId = 0L;
removeOperatorRecord(Lists.newArrayList(jobIds));
}
return Lists.newArrayList();

//set max
records.stream().max(Comparator.comparing(ScheduleJobOperatorRecord::getId))
.ifPresent(scheduleJobOperatorRecord -> operatorRecordStartId = scheduleJobOperatorRecord.getId());

if (jobIds.size() != scheduleJobList.size()) {
List<String> jodExecIds = scheduleJobList.stream().map(ScheduleJob::getJobId).collect(Collectors.toList());
// 过滤出来已经提交运行的实例,删除操作记录
List<String> deleteJobIdList = jobIds.stream().filter(jobId -> !jodExecIds.contains(jobId)).collect(Collectors.toList());
removeOperatorRecord(deleteJobIdList);
}

List<String> jobKeys = scheduleJobList.stream().map(ScheduleJob::getJobKey).collect(Collectors.toList());
List<ScheduleJobJob> scheduleJobJobList = scheduleJobJobService.listByJobKeys(jobKeys);
Map<String, List<ScheduleJobJob>> jobJobMap = scheduleJobJobList.stream().collect(Collectors.groupingBy(ScheduleJobJob::getJobKey));

return scheduleJobList.stream().map(scheduleJob -> {
ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails();
scheduleJobDetails.setScheduleJob(scheduleJob);
scheduleJobDetails.setJobJobList(jobJobMap.get(scheduleJob.getJobKey()));
return scheduleJobDetails;
}).collect(Collectors.toList());
}

/**
Expand Down

0 comments on commit c5a8622

Please sign in to comment.