diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java index dee3a559b594..eae9691d1bbd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java @@ -138,6 +138,8 @@ int updateWorkflowInstanceState( @Param("originState") WorkflowExecutionStatus originState, @Param("targetState") WorkflowExecutionStatus targetState); + int forceUpdateWorkflowInstanceState(@Param("id") Integer id, @Param("status") WorkflowExecutionStatus status); + /** * update workflow instance by tenantCode * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java index fb1a093bf3c3..52b0770109fe 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java @@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExecutionStatus originState, WorkflowExecutionStatus targetState); + void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status); + /** * find last scheduler workflow instance in the date interval * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java index b9b92f54dd5e..6f35057d4432 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java @@ -72,6 +72,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec } } + @Override + public void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status) { + mybatisMapper.forceUpdateWorkflowInstanceState(id, status); + } + /** * find last scheduler process instance in the date interval * diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml index fc2fb6acc207..906e72137f27 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml @@ -158,6 +158,10 @@ where id = #{workflowInstanceId} and state = #{originState} + + update t_ds_workflow_instance set state = #{status} where id = #{id} + + update t_ds_workflow_instance set tenant_code = #{destTenantCode} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java index 1860b0d3893f..f2f76d8f090e 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java @@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() { unsupportedOperationException.getMessage()); } + @Test + void forceUpdateWorkflowInstanceState() { + WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1, WorkflowExecutionStatus.RUNNING_EXECUTION); + workflowInstanceDao.insert(workflowInstance); + workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(), WorkflowExecutionStatus.FAILURE); + assertEquals(WorkflowExecutionStatus.FAILURE, + workflowInstanceDao.queryById(workflowInstance.getId()).getState()); + } + @Test void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() { long workflowDefinitionCode = 1L; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java index 5e84a24a7676..1e2469526413 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; @@ -52,6 +53,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; /** * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. @@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable { @Autowired private IWorkflowRepository workflowRepository; + @Autowired + private WorkflowInstanceDao workflowInstanceDao; + @Autowired private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory; @@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable { @Autowired private WorkflowEventBusCoordinator workflowEventBusCoordinator; + @Autowired + private TransactionTemplate transactionTemplate; + private ExecutorService commandHandleThreadPool; private boolean flag = false; @@ -189,8 +197,18 @@ private Void bootstrapError(Command command, Throwable throwable) { throwable); return null; } - log.error("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable); - commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable)); + + transactionTemplate.execute(status -> { + log.warn("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable); + final int workflowInstanceId = command.getWorkflowInstanceId(); + + workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE); + log.info("Set workflow instance {} state to FAILURE", workflowInstanceId); + + commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable)); + log.info("Move command {} to error command table", command.getId()); + return null; + }); return null; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java index 5641822816ba..5245c7c595af 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; public class WorkflowGraph implements IWorkflowGraph { @@ -46,12 +45,20 @@ public WorkflowGraph(List workflowTaskRelations, List(); this.successors = new HashMap<>(); - this.taskDefinitionMap = taskDefinitions - .stream() - .collect(Collectors.toMap(TaskDefinition::getName, Function.identity())); - this.taskDefinitionCodeMap = taskDefinitions - .stream() - .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); + this.taskDefinitionMap = new HashMap<>(taskDefinitions.size()); + this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size()); + for (TaskDefinition taskDefinition : taskDefinitions) { + if (taskDefinitionMap.containsKey(taskDefinition.getName())) { + throw new IllegalArgumentException( + "Duplicate task name: " + taskDefinition.getName() + " in the workflow"); + } + taskDefinitionMap.put(taskDefinition.getName(), taskDefinition); + if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) { + throw new IllegalArgumentException( + "Duplicate task code: " + taskDefinition.getCode() + " in the workflow"); + } + taskDefinitionCodeMap.put(taskDefinition.getCode(), taskDefinition); + } addTaskNodes(taskDefinitions); addTaskEdge(workflowTaskRelations); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index 1ae906aa3ff1..80eddee92319 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -124,6 +124,30 @@ public void testStartWorkflow_with_oneSuccessTaskDryRun() { masterContainer.assertAllResourceReleased(); } + @Test + @DisplayName("Test start a workflow with two fake task(A) has the same name") + public void testStartWorkflow_contains_duplicateTaskName() { + final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.FAILURE); + assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty(); + }); + + masterContainer.assertAllResourceReleased(); + } + @Test @DisplayName("Test start a workflow with one fake task(A) using serial wait strategy") public void testStartWorkflow_with_serialWaitStrategy() { diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml new file mode 100644 index 000000000000..2597c24e07af --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflows: + - name: workflow_with_duplicate_task_name + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with two parallel failed tasks + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: A + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + - name: A + code: 2 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 2 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java index fa7a0a4f1493..98745486156f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; import org.apache.commons.lang3.StringUtils; @@ -61,14 +60,11 @@ public class CommandServiceImpl implements CommandService { @Autowired private ScheduleMapper scheduleMapper; - @Autowired - private WorkflowDefinitionMapper processDefineMapper; - @Override public void moveToErrorCommand(Command command, String message) { - ErrorCommand errorCommand = new ErrorCommand(command, message); - this.errorCommandMapper.insert(errorCommand); - this.commandMapper.deleteById(command.getId()); + final ErrorCommand errorCommand = new ErrorCommand(command, message); + errorCommandMapper.insert(errorCommand); + commandMapper.deleteById(command.getId()); } @Override