From 726c3569c38ef4340f095d8f4edbd09ca4736c5e Mon Sep 17 00:00:00 2001 From: vainhope <845869847@qq.com> Date: Mon, 17 Apr 2023 18:46:19 +0800 Subject: [PATCH] =?UTF-8?q?[issue=5F1045][taier-schedule]=20self-dependenc?= =?UTF-8?q?e=20day=20tasks=20fill=20data=20lose=E2=80=A6=20(#1046)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … dependencies #1045 --- .../server/builder/AbstractJobBuilder.java | 7 +- .../server/builder/FillDataJobBuilder.java | 83 +++++++++---------- 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/AbstractJobBuilder.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/AbstractJobBuilder.java index 24fb6e2794..6122decbab 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/AbstractJobBuilder.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/AbstractJobBuilder.java @@ -18,8 +18,8 @@ package com.dtstack.taier.scheduler.server.builder; -import com.dtstack.taier.common.enums.EScheduleJobType; import com.dtstack.taier.common.enums.Deleted; +import com.dtstack.taier.common.enums.EScheduleJobType; import com.dtstack.taier.common.enums.Restarted; import com.dtstack.taier.common.env.EnvironmentContext; import com.dtstack.taier.common.exception.TaierDefineException; @@ -32,8 +32,8 @@ import com.dtstack.taier.scheduler.server.ScheduleJobDetails; import com.dtstack.taier.scheduler.server.builder.cron.ScheduleConfManager; import com.dtstack.taier.scheduler.server.builder.cron.ScheduleCorn; -import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency; import com.dtstack.taier.scheduler.server.builder.dependency.DependencyManager; +import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency; import com.dtstack.taier.scheduler.service.ScheduleActionService; import com.dtstack.taier.scheduler.service.ScheduleJobService; import com.dtstack.taier.scheduler.service.ScheduleTaskShadeService; @@ -157,6 +157,9 @@ public List buildJob(ScheduleTaskShade batchTaskShade, Strin * @return 名称 */ private String getName(ScheduleTaskShade scheduleTaskShade, String name, String cycTime) { + if (StringUtils.isBlank(name)) { + return getPrefix() + "_" + scheduleTaskShade.getName() + "_" + cycTime; + } return getPrefix() + "_" + name + "_" + scheduleTaskShade.getName() + "_" + cycTime; } diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/FillDataJobBuilder.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/FillDataJobBuilder.java index 12fd191e32..f43bd88dff 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/FillDataJobBuilder.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/FillDataJobBuilder.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -75,7 +76,6 @@ public class FillDataJobBuilder extends AbstractJobBuilder { * @param endDay 每天时间范围 结束范围 * @throws Exception */ - @Transactional(rollbackFor = Exception.class) public void createFillJob(Set all, Set run, Long fillId, String fillName, String beginTime, String endTime, String startDay, String endDay) throws Exception { Date startDate = DateUtil.parseDate(startDay, DateUtil.DATE_FORMAT, Locale.CHINA); @@ -102,54 +102,52 @@ public void createFillJob(Set all, Set run, Long fillId, String fill * @param endTime 每天时间范围 结束范围 * @throws Exception */ - @Transactional(rollbackFor = Exception.class) public void buildFillDataJobGraph(String fillName, Long fillId, Set all, Set run, String triggerDay, - String beginTime, String endTime) throws Exception { + String beginTime, String endTime) throws Exception { List allList = Lists.newArrayList(all); List> partition = Lists.partition(allList, environmentContext.getJobGraphTaskLimitSize()); AtomicJobSortWorker sortWorker = new AtomicJobSortWorker(); + List saveList = Lists.newArrayList(); + CompletableFuture.allOf(partition.stream() + .map(taskKey -> + CompletableFuture.runAsync(() -> + fillTaskPartition(fillName, fillId, run, triggerDay, beginTime, endTime, allList, sortWorker, saveList, taskKey), + jobGraphBuildPool)) + .toArray(CompletableFuture[]::new)).thenAccept(a -> savaFillJob(saveList)).join(); + } - for (List taskKey : partition) { - jobGraphBuildPool.submit(() -> { - try { - List saveList = Lists.newArrayList(); - for (Long taskId : taskKey) { - try { - ScheduleTaskShade scheduleTaskShade = scheduleTaskService - .lambdaQuery() - .eq(ScheduleTaskShade::getTaskId, taskId) - .eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus()) - .one(); - - if (scheduleTaskShade != null) { - List jobBuilderBeanList = Lists.newArrayList(); - // 非工作流任务子任务 - if (scheduleTaskShade.getFlowId() == 0) { - // 生成补数据实例 - jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker), - environmentContext.getBuildJobErrorRetry(), 200, false); - } else { - Long flowId = scheduleTaskShade.getFlowId(); - if (!allList.contains(flowId)) { - // 生成周期实例 - jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, beginTime, fillId, sortWorker), - environmentContext.getBuildJobErrorRetry(), 200, false); - } - } - - for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) { - addMap(run, saveList, taskId, jobBuilderBean); - } - } - } catch (Exception e) { - LOGGER.error("taskKey : {} error:", taskId, e); + private void fillTaskPartition(String fillName, Long fillId, Set run, String triggerDay, String beginTime, String endTime, List allList, AtomicJobSortWorker sortWorker, List saveList, List taskKey) { + for (Long taskId : taskKey) { + try { + ScheduleTaskShade scheduleTaskShade = scheduleTaskService + .lambdaQuery() + .eq(ScheduleTaskShade::getTaskId, taskId) + .eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus()) + .one(); + + if (scheduleTaskShade != null) { + List jobBuilderBeanList = Lists.newArrayList(); + // 非工作流任务子任务 + if (scheduleTaskShade.getFlowId() == 0) { + // 生成补数据实例 + jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker), + environmentContext.getBuildJobErrorRetry(), 200, false); + } else { + Long flowId = scheduleTaskShade.getFlowId(); + if (!allList.contains(flowId)) { + // 生成周期实例 + jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker), + environmentContext.getBuildJobErrorRetry(), 200, false); } } - savaFillJob(saveList); - } catch (Exception e) { - LOGGER.error("fill error:", e); + + for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) { + addMap(run, saveList, taskId, jobBuilderBean); + } } - }); + } catch (Exception e) { + LOGGER.error("taskKey : {} error:", taskId, e); + } } } @@ -178,7 +176,8 @@ private void addMap(Set run, List saveList, Long taskI * * @param allJobList 所有集合 */ - private void savaFillJob(List allJobList) { + @Transactional(rollbackFor = Exception.class) + public void savaFillJob(List allJobList) { scheduleJobService.insertJobList(allJobList, EScheduleType.FILL_DATA.getType()); Set operatorJobIds = allJobList .stream()