Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,50 @@
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow which using null key params")
public void testStartWorkflow_usingNullKeyParam() {

Check warning on line 780 in dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZriDoXafMa30m0oSXlF&open=AZriDoXafMa30m0oSXlF&pullRequest=17711
final String yaml = "/it/start/workflow_with_null_key_param.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder()
.commandParams(Lists.newArrayList(Property.builder()
.prop(null)
.direct(Direct.IN)
.type(DataType.VARCHAR)
.value("commandParam")
.build()))
.build();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(runWorkflowCommandParam)
.build();
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflow))
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.hasSize(2)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTask() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

workflows:
- name: workflow_with_one_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
userId: 1
globalParams: '[{"prop":null,"value":"workflowParam","direct":"IN","type":"VARCHAR"}]'
executionType: PARALLEL

tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: >
{
"localParams": [
{
"prop": "",
"direct": "IN",
"type": "VARCHAR",
"value": ""
}
],
"shellScript": "echo 111",
"resourceList": []
}
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: >
{
"localParams": [
{
"prop": null,
"direct": "IN",
"type": "VARCHAR",
"value": ""
}
],
"shellScript": "echo 111",
"resourceList": []
}
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
taskExecuteType: BATCH



taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -66,10 +66,12 @@
import javax.annotation.Nullable;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CuringParamsServiceImpl implements CuringParamsService {

Expand Down Expand Up @@ -160,7 +162,13 @@ public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, Stri
}

/**
* Generate prepare params include project params, global parameters, local parameters, built-in parameters, varpool, start-up params.
* Prepares the final map of task execution parameters by merging parameters from multiple sources
* in a well-defined priority order. The resulting map is guaranteed to contain only valid entries:
* <ul>
* <li>Keys are non-null and non-blank strings</li>
* <li>Values are non-null {@link Property} objects</li>
* </ul>
*
* <p> The priority of the parameters is as follows:
* <p> varpool > command parameters > local parameters > global parameters > project parameters > built-in parameters
* todo: Use TaskRuntimeParams to represent this.
Expand All @@ -180,87 +188,101 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
String workflowDefinitionName) {
Map<String, Property> prepareParamsMap = new HashMap<>();

// assign value to definedParams here
Map<String, Property> globalParams = parseGlobalParamsMap(workflowInstance);

// combining local and global parameters
Map<String, Property> localParams = parameters.getInputLocalParametersMap();

// stream pass params
List<Property> varPools = parseVarPool(taskInstance);

// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
// If it is a complement, you need to pass in the task instance id
// to locate the time of the process instance complement.
ICommandParam commandParam = JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
if (commandParam == null) {
throw new ServiceException(String.format("Failed to parse command parameter for workflow instance %s",
workflowInstance.getId()));
}
String timeZone = commandParam.getTimeZone();

// built-in params
Map<String, String> builtInParams =
setBuiltInParamsMap(taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);
// 1. Built-in parameters (lowest precedence)
Map<String, String> builtInParams = setBuiltInParamsMap(
taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);
safePutAll(prepareParamsMap, ParameterUtils.getUserDefParamsMap(builtInParams));

// project-level params
// 2. Project-level parameters
Map<String, Property> projectParams = getProjectParameterMap(taskInstance.getProjectCode());
safePutAll(prepareParamsMap, projectParams);

if (MapUtils.isNotEmpty(builtInParams)) {
prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(builtInParams));
}

if (MapUtils.isNotEmpty(projectParams)) {
prepareParamsMap.putAll(projectParams);
}

if (MapUtils.isNotEmpty(globalParams)) {
prepareParamsMap.putAll(globalParams);
}
// 3. Workflow global parameters
Map<String, Property> globalParams = parseGlobalParamsMap(workflowInstance);
safePutAll(prepareParamsMap, globalParams);

if (MapUtils.isNotEmpty(localParams)) {
prepareParamsMap.putAll(localParams);
}
// 4. Task-local parameters
Map<String, Property> localParams = parameters.getInputLocalParametersMap();
safePutAll(prepareParamsMap, localParams);

// 5. Command-line / complement parameters
if (CollectionUtils.isNotEmpty(commandParam.getCommandParams())) {
prepareParamsMap.putAll(commandParam.getCommandParams().stream()
.collect(Collectors.toMap(Property::getProp, Function.identity())));
Map<String, Property> commandParamsMap = commandParam.getCommandParams().stream()
.filter(prop -> StringUtils.isNotBlank(prop.getProp()))
.collect(Collectors.toMap(
Property::getProp,
Function.identity(),
(v1, v2) -> v2 // retain last on duplicate key
));
safePutAll(prepareParamsMap, commandParamsMap);
}

// 6. VarPool: override values only for existing IN-direction parameters
List<Property> varPools = parseVarPool(taskInstance);
if (CollectionUtils.isNotEmpty(varPools)) {
// overwrite the in parameter by varPool
for (Property varPool : varPools) {
Property property = prepareParamsMap.get(varPool.getProp());
if (property == null || property.getDirect() != Direct.IN) {
if (StringUtils.isBlank(varPool.getProp())) {
continue;
}
property.setValue(varPool.getValue());
Property targetParam = prepareParamsMap.get(varPool.getProp());
if (targetParam != null && Direct.IN.equals(targetParam.getDirect())) {
targetParam.setValue(varPool.getValue());
}
}
}

Iterator<Map.Entry<String, Property>> iter = prepareParamsMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();

if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().contains(Constants.FUNCTION_START_WITH)) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();

// handle some chain parameter assign, such as `{"var1": "${var2}", "var2": 1}` should be convert to
// `{"var1": 1, "var2": 1}`
val = convertParameterPlaceholders(val, prepareParamsMap);
property.setValue(val);
}
// 7. Inject business/scheduling parameters (e.g., ${datetime}), which may contain or reference placeholders
Map<String, Property> businessParams = preBuildBusinessParams(workflowInstance);
safePutAll(prepareParamsMap, businessParams);

// 8. Resolve all placeholders (e.g., "${output_dir}") using the current parameter context
resolvePlaceholders(prepareParamsMap);

return prepareParamsMap;
}

/**
* Safely merges entries from the {@code source} map into the {@code target} map,
* skipping any entry with a {@code null}, empty, or blank key, or a {@code null} value.
*
* @param target the destination map to merge into (must not be null)
* @param source the source map whose valid entries will be copied (may be null or empty)
*/
private void safePutAll(Map<String, Property> target, Map<String, Property> source) {
if (MapUtils.isEmpty(source)) {
return;
}
source.forEach((key, value) -> {
if (StringUtils.isNotBlank(key) && value != null) {
target.put(key, value);
} else {
log.warn("Skipped invalid parameter entry: key='{}', value={}", key, value);
}
});
}

// put schedule time param to params map
Map<String, Property> paramsMap = preBuildBusinessParams(workflowInstance);
if (MapUtils.isNotEmpty(paramsMap)) {
prepareParamsMap.putAll(paramsMap);
/**
* Resolves placeholder expressions (e.g., "${var}") in parameter values by substituting them
* with actual values from the current {@code paramsMap}.
*
* @param paramsMap the map of parameters (key: parameter name, value: {@link Property}) to resolve
*/
private void resolvePlaceholders(Map<String, Property> paramsMap) {
for (Property prop : paramsMap.values()) {
String val = prop.getValue();
if (StringUtils.isNotEmpty(val) && val.contains(Constants.FUNCTION_START_WITH)) {
prop.setValue(convertParameterPlaceholders(val, paramsMap));
}
}
return prepareParamsMap;
}

/**
Expand Down
Loading
Loading