diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index dee3a559b594..eae9691d1bbd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -138,6 +138,8 @@ int updateWorkflowInstanceState(
@Param("originState") WorkflowExecutionStatus originState,
@Param("targetState") WorkflowExecutionStatus targetState);
+ int forceUpdateWorkflowInstanceState(@Param("id") Integer id, @Param("status") WorkflowExecutionStatus status);
+
/**
* update workflow instance by tenantCode
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index fb1a093bf3c3..52b0770109fe 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId,
WorkflowExecutionStatus originState,
WorkflowExecutionStatus targetState);
+ void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status);
+
/**
* find last scheduler workflow instance in the date interval
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index b9b92f54dd5e..6f35057d4432 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -72,6 +72,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec
}
}
+ @Override
+ public void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status) {
+ mybatisMapper.forceUpdateWorkflowInstanceState(id, status);
+ }
+
/**
* find last scheduler process instance in the date interval
*
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index fc2fb6acc207..906e72137f27 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -158,6 +158,10 @@
where id = #{workflowInstanceId} and state = #{originState}
+
+ update t_ds_workflow_instance set state = #{status} where id = #{id}
+
+
update t_ds_workflow_instance
set tenant_code = #{destTenantCode}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
index 1860b0d3893f..f2f76d8f090e 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
@@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() {
unsupportedOperationException.getMessage());
}
+ @Test
+ void forceUpdateWorkflowInstanceState() {
+ WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1, WorkflowExecutionStatus.RUNNING_EXECUTION);
+ workflowInstanceDao.insert(workflowInstance);
+ workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(), WorkflowExecutionStatus.FAILURE);
+ assertEquals(WorkflowExecutionStatus.FAILURE,
+ workflowInstanceDao.queryById(workflowInstance.getId()).getState());
+ }
+
@Test
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
long workflowDefinitionCode = 1L;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
index 5e84a24a7676..1e2469526413 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
@@ -26,6 +26,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -52,6 +53,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.support.TransactionTemplate;
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
@@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
@Autowired
private IWorkflowRepository workflowRepository;
+ @Autowired
+ private WorkflowInstanceDao workflowInstanceDao;
+
@Autowired
private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;
@@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
@Autowired
private WorkflowEventBusCoordinator workflowEventBusCoordinator;
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
private ExecutorService commandHandleThreadPool;
private boolean flag = false;
@@ -189,8 +197,18 @@ private Void bootstrapError(Command command, Throwable throwable) {
throwable);
return null;
}
- log.error("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
- commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
+
+ transactionTemplate.execute(status -> {
+ log.warn("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
+ final int workflowInstanceId = command.getWorkflowInstanceId();
+
+ workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
+ log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);
+
+ commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
+ log.info("Move command {} to error command table", command.getId());
+ return null;
+ });
return null;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
index 5641822816ba..5245c7c595af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
public class WorkflowGraph implements IWorkflowGraph {
@@ -46,12 +45,20 @@ public WorkflowGraph(List workflowTaskRelations, List();
this.successors = new HashMap<>();
- this.taskDefinitionMap = taskDefinitions
- .stream()
- .collect(Collectors.toMap(TaskDefinition::getName, Function.identity()));
- this.taskDefinitionCodeMap = taskDefinitions
- .stream()
- .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
+ this.taskDefinitionMap = new HashMap<>(taskDefinitions.size());
+ this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size());
+ for (TaskDefinition taskDefinition : taskDefinitions) {
+ if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
+ throw new IllegalArgumentException(
+ "Duplicate task name: " + taskDefinition.getName() + " in the workflow");
+ }
+ taskDefinitionMap.put(taskDefinition.getName(), taskDefinition);
+ if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
+ throw new IllegalArgumentException(
+ "Duplicate task code: " + taskDefinition.getCode() + " in the workflow");
+ }
+ taskDefinitionCodeMap.put(taskDefinition.getCode(), taskDefinition);
+ }
addTaskNodes(taskDefinitions);
addTaskEdge(workflowTaskRelations);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 1ae906aa3ff1..80eddee92319 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -124,6 +124,30 @@ public void testStartWorkflow_with_oneSuccessTaskDryRun() {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow with two fake task(A) has the same name")
+ public void testStartWorkflow_contains_duplicateTaskName() {
+ final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+ .isEqualTo(WorkflowExecutionStatus.FAILURE);
+ assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty();
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one fake task(A) using serial wait strategy")
public void testStartWorkflow_with_serialWaitStrategy() {
diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml
new file mode 100644
index 000000000000..2597c24e07af
--- /dev/null
+++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_duplicate_task_name.yaml
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_duplicate_task_name
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with two parallel failed tasks
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: A
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
index fa7a0a4f1493..98745486156f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
@@ -28,7 +28,6 @@
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.commons.lang3.StringUtils;
@@ -61,14 +60,11 @@ public class CommandServiceImpl implements CommandService {
@Autowired
private ScheduleMapper scheduleMapper;
- @Autowired
- private WorkflowDefinitionMapper processDefineMapper;
-
@Override
public void moveToErrorCommand(Command command, String message) {
- ErrorCommand errorCommand = new ErrorCommand(command, message);
- this.errorCommandMapper.insert(errorCommand);
- this.commandMapper.deleteById(command.getId());
+ final ErrorCommand errorCommand = new ErrorCommand(command, message);
+ errorCommandMapper.insert(errorCommand);
+ commandMapper.deleteById(command.getId());
}
@Override