Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ int updateWorkflowInstanceState(
@Param("originState") WorkflowExecutionStatus originState,
@Param("targetState") WorkflowExecutionStatus targetState);

int forceUpdateWorkflowInstanceState(@Param("id") Integer id, @Param("status") WorkflowExecutionStatus status);

/**
* update workflow instance by tenantCode
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId,
WorkflowExecutionStatus originState,
WorkflowExecutionStatus targetState);

void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status);

/**
* find last scheduler workflow instance in the date interval
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec
}
}

@Override
public void forceUpdateWorkflowInstanceState(Integer id, WorkflowExecutionStatus status) {
mybatisMapper.forceUpdateWorkflowInstanceState(id, status);
}

/**
* find last scheduler process instance in the date interval
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@
where id = #{workflowInstanceId} and state = #{originState}
</update>

<update id="forceUpdateWorkflowInstanceState">
update t_ds_workflow_instance set state = #{status} where id = #{id}
</update>

<update id="updateWorkflowInstanceByTenantCode">
update t_ds_workflow_instance
set tenant_code = #{destTenantCode}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() {
unsupportedOperationException.getMessage());
}

@Test
void forceUpdateWorkflowInstanceState() {
WorkflowInstance workflowInstance = createWorkflowInstance(1L, 1, WorkflowExecutionStatus.RUNNING_EXECUTION);
workflowInstanceDao.insert(workflowInstance);
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(), WorkflowExecutionStatus.FAILURE);
assertEquals(WorkflowExecutionStatus.FAILURE,
workflowInstanceDao.queryById(workflowInstance.getId()).getState());
}

@Test
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
long workflowDefinitionCode = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
Expand All @@ -52,6 +53,7 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
Expand All @@ -75,7 +77,10 @@
@Autowired
private IWorkflowRepository workflowRepository;

@Autowired
private WorkflowInstanceDao workflowInstanceDao;

@Autowired

Check warning on line 83 in dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this field injection and use constructor injection instead.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZrDNDEZbpU4U_eJy9oF&open=AZrDNDEZbpU4U_eJy9oF&pullRequest=17745
private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;

@Autowired
Expand All @@ -84,6 +89,9 @@
@Autowired
private WorkflowEventBusCoordinator workflowEventBusCoordinator;

@Autowired

Check warning on line 92 in dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this field injection and use constructor injection instead.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZrFct1w6K3iolRphEva&open=AZrFct1w6K3iolRphEva&pullRequest=17745
private TransactionTemplate transactionTemplate;

private ExecutorService commandHandleThreadPool;

private boolean flag = false;
Expand Down Expand Up @@ -189,8 +197,18 @@
throwable);
return null;
}
log.error("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));

transactionTemplate.execute(status -> {
log.warn("Failed bootstrap command {} ", JSONUtils.toPrettyJsonString(command), throwable);
final int workflowInstanceId = command.getWorkflowInstanceId();

workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId, WorkflowExecutionStatus.FAILURE);
log.info("Set workflow instance {} state to FAILURE", workflowInstanceId);

commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace(throwable));
log.info("Move command {} to error command table", command.getId());
return null;
});
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WorkflowGraph implements IWorkflowGraph {
Expand All @@ -46,12 +45,20 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
this.predecessors = new HashMap<>();
this.successors = new HashMap<>();

this.taskDefinitionMap = taskDefinitions
.stream()
.collect(Collectors.toMap(TaskDefinition::getName, Function.identity()));
this.taskDefinitionCodeMap = taskDefinitions
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
this.taskDefinitionMap = new HashMap<>(taskDefinitions.size());
this.taskDefinitionCodeMap = new HashMap<>(taskDefinitions.size());
for (TaskDefinition taskDefinition : taskDefinitions) {
if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
throw new IllegalArgumentException(
"Duplicate task name: " + taskDefinition.getName() + " in the workflow");
}
taskDefinitionMap.put(taskDefinition.getName(), taskDefinition);
if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
throw new IllegalArgumentException(
"Duplicate task code: " + taskDefinition.getCode() + " in the workflow");
}
taskDefinitionCodeMap.put(taskDefinition.getCode(), taskDefinition);
}

addTaskNodes(taskDefinitions);
addTaskEdge(workflowTaskRelations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,30 @@
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with two fake task(A) has the same name")
public void testStartWorkflow_contains_duplicateTaskName() {

Check warning on line 129 in dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZrOt5Yh7IxbKjIkne6M&open=AZrOt5Yh7IxbKjIkne6M&pullRequest=17745
final String yaml = "/it/start/workflow_with_duplicate_task_name.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.FAILURE);
assertThat(repository.queryTaskInstance(workflowInstanceId)).isEmpty();
});

masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) using serial wait strategy")
public void testStartWorkflow_with_serialWaitStrategy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_duplicate_task_name
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with two parallel failed tasks
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: A
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -61,14 +60,11 @@ public class CommandServiceImpl implements CommandService {
@Autowired
private ScheduleMapper scheduleMapper;

@Autowired
private WorkflowDefinitionMapper processDefineMapper;

@Override
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
this.commandMapper.deleteById(command.getId());
final ErrorCommand errorCommand = new ErrorCommand(command, message);
errorCommandMapper.insert(errorCommand);
commandMapper.deleteById(command.getId());
}

@Override
Expand Down
Loading