diff --git a/pyproject.toml b/pyproject.toml index 72cb052e..53dae8fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,8 @@ dependencies = [ "termcolor", "pygraphviz", "lxml", - "f90nml" + "f90nml", + "aiida-shell>=0.8.1", ] license = {file = "LICENSE"} @@ -46,7 +47,7 @@ Changelog = "https://github.com/C2SM/Sirocco/blob/main/CHANGELOG.md" [tool.pytest.ini_options] # Configuration for [pytest](https://docs.pytest.org) -addopts = "--pdbcls=IPython.terminal.debugger:TerminalPdb" +addopts = "-s --pdbcls=IPython.terminal.debugger:TerminalPdb" norecursedirs = "tests/cases" markers = [ "slow: slow integration tests which are not recommended to run locally for normal development", @@ -66,7 +67,7 @@ filterwarnings = [ source = ["sirocco"] [tool.ruff] -include = ["src/*py", "tests/*py"] +include = ["src/*py", "tests/*py"] # PRCOMMENT: Do we want to run Ruff via CI on our test files?? target-version = "py310" [tool.ruff.lint] @@ -75,6 +76,14 @@ ignore = [ "TRY003", # write custom error messages for formatting ] +[tool.ruff.lint.per-file-ignores] +"tests/*py" = [ + "SLF001", # Private member accessed + "S101", # Use of assert detected + "T201", # `print` found + "PLR2004", # Magic value used in comparison +] + ## Hatch configurations [tool.hatch.build.targets.sdist] @@ -151,3 +160,7 @@ ignore_missing_imports = true [[tool.mypy.overrides]] module = ["aiida_workgraph.sockets.builtins"] ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = ["termcolor._types"] +ignore_missing_imports = true diff --git a/src/sirocco/core/graph_items.py b/src/sirocco/core/graph_items.py index 03a68655..7666d5d2 100644 --- a/src/sirocco/core/graph_items.py +++ b/src/sirocco/core/graph_items.py @@ -49,6 +49,7 @@ def from_config(cls, config: ConfigBaseData, coordinates: dict) -> AvailableData data_class = AvailableData if isinstance(config, ConfigAvailableData) else GeneratedData return data_class( name=config.name, + computer=config.computer, type=config.type, 
src=config.src, coordinates=coordinates, diff --git a/src/sirocco/parsing/yaml_data_models.py b/src/sirocco/parsing/yaml_data_models.py index e6bda0ed..b00ad31c 100644 --- a/src/sirocco/parsing/yaml_data_models.py +++ b/src/sirocco/parsing/yaml_data_models.py @@ -281,7 +281,16 @@ class ConfigShellTaskSpecs: plugin: ClassVar[Literal["shell"]] = "shell" port_pattern: ClassVar[re.Pattern] = field(default=re.compile(r"{PORT(\[sep=.+\])?::(.+?)}"), repr=False) sep_pattern: ClassVar[re.Pattern] = field(default=re.compile(r"\[sep=(.+)\]"), repr=False) - src: Path | None = None + src: Path | None = field( + default=None, + metadata={ + "description": ( + "If `src` is not absolute, it ends up relative to the root directory of the config file. " + "This should also be solved by registering `Code`s in AiiDA for the required scripts. " + "See issues #60 and #127." + ) + }, + ) command: str env_source_files: list[str] = field(default_factory=list) diff --git a/src/sirocco/workgraph.py b/src/sirocco/workgraph.py index 242f2b00..121c8627 100644 --- a/src/sirocco/workgraph.py +++ b/src/sirocco/workgraph.py @@ -32,7 +32,15 @@ def _execute(self, engine_process, args=None, kwargs=None, var_kwargs=None): # # Workaround starts here # This part is part of the workaround. We need to manually add the outputs from the task. 
# Because kwargs are not populated with outputs - default_outputs = {"remote_folder", "remote_stash", "retrieved", "_outputs", "_wait", "stdout", "stderr"} + default_outputs = { + "remote_folder", + "remote_stash", + "retrieved", + "_outputs", + "_wait", + "stdout", + "stderr", + } task_outputs = set(self.outputs._sockets.keys()) # noqa SLF001 # there so public accessor task_outputs = task_outputs.union(set(inputs.pop("outputs", []))) missing_outputs = task_outputs.difference(default_outputs) @@ -97,6 +105,7 @@ def __init__(self, core_workflow: core.Workflow): for task in self._core_workflow.tasks: if isinstance(task, core.ShellTask): self._set_shelljob_arguments(task) + self._set_shelljob_filenames(task) # link wait on to workgraph tasks for task in self._core_workflow.tasks: @@ -184,7 +193,16 @@ def _add_aiida_input_data_node(self, data: core.Data): except NotExistent as err: msg = f"Could not find computer {data.computer!r} for input {data}." raise ValueError(msg) from err - self._aiida_data_nodes[label] = aiida.orm.RemoteData(remote_path=data.src, label=label, computer=computer) + # `remote_path` must be str not PosixPath to be JSON-serializable + # PRCOMMENT: Hack for now to make the tests pass + if computer.label == "localhost": + self._aiida_data_nodes[label] = aiida.orm.RemoteData( + remote_path=str(data_full_path), label=label, computer=computer + ) + else: + self._aiida_data_nodes[label] = aiida.orm.RemoteData( + remote_path=str(data.src), label=label, computer=computer + ) elif data.type == "file": self._aiida_data_nodes[label] = aiida.orm.SinglefileData(label=label, file=data_full_path) elif data.type == "dir": @@ -229,6 +247,8 @@ def _create_shell_task_node(self, task: core.ShellTask): ] prepend_text = "\n".join([f"source {env_source_path}" for env_source_path in env_source_paths]) metadata["options"] = {"prepend_text": prepend_text} + # NOTE: Hardcoded for now, possibly make user-facing option + metadata["options"]["use_symlinks"] = True ## computer 
if task.computer is not None: @@ -283,7 +303,10 @@ def _link_input_node_to_shelltask(self, task: core.ShellTask, input_: core.Data) socket = getattr(workgraph_task.inputs.nodes, f"{input_label}") socket.value = self.data_from_core(input_) elif isinstance(input_, core.GeneratedData): - self._workgraph.add_link(self.socket_from_core(input_), workgraph_task.inputs[f"nodes.{input_label}"]) + self._workgraph.add_link( + self.socket_from_core(input_), + workgraph_task.inputs[f"nodes.{input_label}"], + ) else: raise TypeError @@ -293,10 +316,10 @@ def _link_wait_on_to_task(self, task: core.Task): self.task_from_core(task).wait = [self.task_from_core(wt) for wt in task.wait_on] def _set_shelljob_arguments(self, task: core.ShellTask): - """set AiiDA ShellJob arguments by replacing port placeholders by aiida labels""" - + """Set AiiDA ShellJob arguments by replacing port placeholders with AiiDA labels.""" workgraph_task = self.task_from_core(task) workgraph_task_arguments: SocketAny = workgraph_task.inputs.arguments + if workgraph_task_arguments is None: msg = ( f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph " @@ -304,10 +327,56 @@ def _set_shelljob_arguments(self, task: core.ShellTask): ) raise ValueError(msg) - input_labels = {port: list(map(self.label_placeholder, task.inputs[port])) for port in task.inputs} + # Build input_labels dictionary for port resolution + input_labels: dict[str, list[str]] = {} + for port_name, input_list in task.inputs.items(): + input_labels[port_name] = [] + for input_ in input_list: + # Use the full AiiDA label as the placeholder content + input_label = self.get_aiida_label_from_graph_item(input_) + input_labels[port_name].append(f"{{{input_label}}}") + + # Resolve the command with port placeholders replaced by input labels _, arguments = self.split_cmd_arg(task.resolve_ports(input_labels)) workgraph_task_arguments.value = arguments + def _set_shelljob_filenames(self, task: core.ShellTask): + 
"""Set AiiDA ShellJob filenames for data entities, including parameterized data.""" + filenames = {} + workgraph_task = self.task_from_core(task) + + if not workgraph_task.inputs.filenames: + return + + # Handle input files + for input_ in task.input_data_nodes(): + input_label = self.get_aiida_label_from_graph_item(input_) + + if task.computer and input_.computer and isinstance(input_, core.AvailableData): + # For RemoteData on the same computer, use just the filename + # (keyed by the input label so it matches the key used in the nodes dict) + filename = Path(input_.src).name + filenames[input_label] = filename + else: + # For other cases (including GeneratedData), we need to handle parameterized data + # Importantly, multiple data nodes with the same base name but different + # coordinates need unique filenames to avoid conflicts in the working directory + + # Count how many inputs have the same base name + same_name_count = sum(1 for inp in task.input_data_nodes() if inp.name == input_.name) + + if same_name_count > 1: + # Multiple data nodes with same base name - use full label as filename + # to ensure uniqueness in working directory + filename = input_label + else: + # Single data node with this name - can use simple filename + filename = Path(input_.src).name if hasattr(input_, "src") else input_.name + + # The key in filenames dict should be the input label (what's used in nodes dict) + filenames[input_label] = filename + + workgraph_task.inputs.filenames.value = filenames + + def run( + self, + inputs: None | dict[str, Any] = None, diff --git a/tests/cases/parameters/config/config.yml b/tests/cases/parameters/config/config.yml index 25fa32d6..92904ebd 100644 --- a/tests/cases/parameters/config/config.yml +++ b/tests/cases/parameters/config/config.yml @@ -1,6 +1,6 @@ --- -start_date: &root_start_date '2026-01-01T00:00' -stop_date: &root_stop_date '2028-01-01T00:00' +start_date: &root_start_date "2026-01-01T00:00" +stop_date: &root_stop_date "2028-01-01T00:00" cycles: - bimonthly_tasks: @@ -49,13 +49,17 @@ cycles: inputs: - 
analysis_foo_bar: target_cycle: - lag: ['P0M', 'P6M'] + lag: ["P0M", "P6M"] port: None outputs: [yearly_analysis] tasks: - icon: plugin: shell + # PRCOMMENT + # Relative path -> except if this cannot be resolved to a registered code + # Probably either enforce absolute path, or provide a code argument + # See: https://github.com/C2SM/Sirocco/pull/153 src: scripts/icon.py command: "icon.py --restart {PORT::restart} --init {PORT::init} --forcing {PORT::forcing}" parameters: [foo, bar] diff --git a/tests/conftest.py b/tests/conftest.py index 67171ce0..a4437c0f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ def __init__(self, url: str, response: requests.Response): def download_file(url: str, file_path: pathlib.Path): - response = requests.get(url) + response = requests.get(url) # noqa: S113 request-without-timeout if not response.ok: raise DownloadError(url, response) @@ -48,7 +48,7 @@ def icon_grid_simple_path(pytestconfig): @pytest.fixture def icon_filepath_executable() -> str: - which_icon = subprocess.run(["which", "icon"], capture_output=True, check=False) + which_icon = subprocess.run(["which", "icon"], capture_output=True, check=False) # noqa: S607 if which_icon.returncode: msg = "Could not find icon executable." 
raise FileNotFoundError(msg) @@ -87,7 +87,7 @@ def minimal_invert_task_io_config() -> models.ConfigWorkflow: ), models.ConfigCycleTask( name="task_a", - inputs=[models.ConfigCycleTaskInput(name="availalble", port="None")], + inputs=[models.ConfigCycleTaskInput(name="available", port="None")], outputs=[models.ConfigCycleTaskOutput(name="output_a")], ), ], @@ -99,7 +99,11 @@ def minimal_invert_task_io_config() -> models.ConfigWorkflow: ], data=models.ConfigData( available=[ - models.ConfigAvailableData(name="availalble", type=models.DataType.FILE, src=pathlib.Path("foo.txt")) + models.ConfigAvailableData( + name="available", + type=models.DataType.FILE, + src=pathlib.Path("foo.txt"), + ) ], generated=[ models.ConfigGeneratedData(name="output_a", type=models.DataType.DIR, src=pathlib.Path("bar")), @@ -154,7 +158,7 @@ def serialize_nml(config_paths: dict[str, pathlib.Path], workflow: workflow.Work def pytest_configure(config): if config.getoption("reserialize"): - print("Regenerating serialized references") # noqa: T201 # this is actual UX, not a debug print + print("Regenerating serialized references") # this is actual UX, not a debug print for config_case in ALL_CONFIG_CASES: config_paths = generate_config_paths(config_case) wf = workflow.Workflow.from_config_file(str(config_paths["yml"])) diff --git a/tests/test_wc_workflow.py b/tests/test_wc_workflow.py index ef808950..6c9359cd 100644 --- a/tests/test_wc_workflow.py +++ b/tests/test_wc_workflow.py @@ -32,6 +32,7 @@ def test_icon(): # configs that are tested for running workgraph @pytest.mark.slow +@pytest.mark.usefixtures("aiida_localhost") @pytest.mark.parametrize( "config_case", [ @@ -39,14 +40,12 @@ def test_icon(): "parameters", ], ) -def test_run_workgraph(config_case, config_paths, aiida_computer): # noqa: ARG001 # config_case is overridden +def test_run_workgraph(config_case, config_paths): # noqa: ARG001 # config_case is overridden """Tests end-to-end the parsing from file up to running the workgraph. 
Automatically uses the aiida_profile fixture to create a new profile. Note to debug the test with your profile please run this in a separate file as the profile is deleted after test finishes. """ - # some configs reference computer "localhost" which we need to create beforehand - aiida_computer("localhost").store() core_workflow = Workflow.from_config_file(str(config_paths["yml"])) aiida_workflow = AiidaWorkGraph(core_workflow) diff --git a/tests/test_workgraph.py b/tests/test_workgraph.py new file mode 100644 index 00000000..4eeea427 --- /dev/null +++ b/tests/test_workgraph.py @@ -0,0 +1,727 @@ +import textwrap + +import pytest +from aiida import orm + +from sirocco.core import GeneratedData, Workflow +from sirocco.parsing import yaml_data_models as models +from sirocco.workgraph import AiidaWorkGraph + + +def test_get_aiida_label_from_graph_item(tmp_path): + """Test that AiiDA labels are generated correctly.""" + + # Mock data nodes with different coordinate combinations + output_path = tmp_path / "output" + data_simple = GeneratedData(name="output", type=models.DataType.FILE, src=output_path, coordinates={}) + + data_with_date = GeneratedData( + name="output", + type=models.DataType.FILE, + src=output_path, + coordinates={"date": "2026-01-01-00:00:00"}, + ) + + data_with_params = GeneratedData( + name="output", + type=models.DataType.FILE, + src=output_path, + coordinates={"foo": 0, "bar": 3.0, "date": "2026-01-01-00:00:00"}, + ) + + # Test label generation + assert AiidaWorkGraph.get_aiida_label_from_graph_item(data_simple) == "output" + assert AiidaWorkGraph.get_aiida_label_from_graph_item(data_with_date) == "output_date_2026_01_01_00_00_00" + assert ( + AiidaWorkGraph.get_aiida_label_from_graph_item(data_with_params) + == "output_foo_0___bar_3_0___date_2026_01_01_00_00_00" + ) + + +def test_filename_conflict_detection(tmp_path): + """Test logic for detecting when unique filenames are needed.""" + + output_path = tmp_path / "output" + other_path = tmp_path / 
"other" + + inputs = [ + GeneratedData( + name="output", + type=models.DataType.FILE, + src=output_path, + coordinates={"foo": 0}, + ), + GeneratedData( + name="output", + type=models.DataType.FILE, + src=output_path, + coordinates={"foo": 1}, + ), + GeneratedData( + name="other", + type=models.DataType.FILE, + src=other_path, + coordinates={}, + ), + ] + + # Test that conflict detection works + output_conflicts = [inp for inp in inputs if inp.name == "output"] + other_conflicts = [inp for inp in inputs if inp.name == "other"] + + assert len(output_conflicts) == 2 # Should need unique filenames + assert len(other_conflicts) == 1 # Should use simple filename + + +@pytest.mark.usefixtures("aiida_localhost") +def test_basic_remote_data_filename(tmp_path): + """Test basic RemoteData filename handling.""" + file_path = tmp_path / "foo.txt" + file_path.touch() + script_path = tmp_path / "script.sh" + script_path.touch() + + config_wf = models.ConfigWorkflow( + name="basic", + rootdir=tmp_path, + cycles=[ + models.ConfigCycle( + name="cycle", + tasks=[ + models.ConfigCycleTask( + name="task", + inputs=[models.ConfigCycleTaskInput(name="data", port="input")], + ) + ], + ), + ], + tasks=[ + models.ConfigShellTask( + name="task", + command="echo {PORT::input}", + src=str(script_path), + computer="localhost", + ), + ], + data=models.ConfigData( + available=[ + models.ConfigAvailableData( + name="data", + type=models.DataType.FILE, + src=str(file_path), + computer="localhost", + ) + ], + ), + ) + + core_wf = Workflow.from_config_workflow(config_wf) + aiida_wf = AiidaWorkGraph(core_wf) + + # Check that RemoteData was created and filename is correct + task = aiida_wf._workgraph.tasks[0] + assert isinstance(task.inputs.nodes["data"].value, orm.RemoteData) + assert task.inputs.filenames.value == {"data": "foo.txt"} + assert task.inputs.arguments.value == "{data}" + + +@pytest.mark.usefixtures("aiida_localhost") +def test_parameterized_filename_conflicts(tmp_path): + """Test 
parameterized data filename handling in various conflict scenarios. + + This test covers: + 1. Parameterized data with conflicts (multiple files with same base name) + 2. Mixed conflict/no-conflict scenarios (some files conflict, others don't) + 3. Proper filename assignment based on conflict detection + """ + yaml_content = textwrap.dedent(f""" + name: test_workflow + cycles: + - simulation_cycle: + tasks: + - simulate: + inputs: + - input_file: + port: input + - shared_config: + port: config + outputs: [simulation_output] + - processing_cycle: + tasks: + - process_data: + inputs: + - simulation_output: + parameters: + foo: all + port: files + outputs: [processed_output] + - analyze: + inputs: + - shared_config: + port: config + - simulation_output: + parameters: + foo: all + port: data + outputs: [analysis_result] + tasks: + - simulate: + plugin: shell + command: "simulate.py {{PORT::input}} --config {{PORT::config}}" + src: {tmp_path}/simulate.py + parameters: [foo] + computer: localhost + - process_data: + plugin: shell + command: "process.py {{PORT::files}}" + src: {tmp_path}/process.py + parameters: [foo] + computer: localhost + - analyze: + plugin: shell + command: "analyze.py --config {{PORT::config}} --data {{PORT::data}}" + src: {tmp_path}/analyze.py + parameters: [foo] + computer: localhost + data: + available: + - input_file: + type: file + src: {tmp_path}/input.txt + computer: localhost + - shared_config: + type: file + src: {tmp_path}/config.json + computer: localhost + generated: + - simulation_output: + type: file + src: output.dat + parameters: [foo] + - processed_output: + type: file + src: processed.dat + parameters: [foo] + - analysis_result: + type: file + src: result.txt + parameters: [foo] + parameters: + foo: [1, 2] + """) + + config_file = tmp_path / "config.yml" + config_file.write_text(yaml_content) + + # Create required files + (tmp_path / "input.txt").touch() + (tmp_path / "config.json").touch() + (tmp_path / "simulate.py").touch() + 
(tmp_path / "process.py").touch() + (tmp_path / "analyze.py").touch() + + core_wf = Workflow.from_config_file(str(config_file)) + aiida_wf = AiidaWorkGraph(core_wf) + + # Test 1: process_data tasks (from first test) + process_tasks = [task for task in aiida_wf._workgraph.tasks if task.name.startswith("process_data")] + assert len(process_tasks) == 2 # One for each foo value + + for task in process_tasks: + nodes_keys = list(task.inputs.nodes._sockets.keys()) + filenames = task.inputs.filenames.value + arguments = task.inputs.arguments.value + + # Each task should have exactly two simulation_output inputs (foo=1 and foo=2) + sim_output_keys = [k for k in nodes_keys if k.startswith("simulation_output")] + assert len(sim_output_keys) == 2 + + # The filenames should be the full labels (since there are conflicts) + for key in sim_output_keys: + assert filenames[key] == key # Full label used as filename + assert key in arguments # Key appears in arguments + + # Test 2: analyze tasks (from second test - mixed conflict/no-conflict) + analyze_tasks = [task for task in aiida_wf._workgraph.tasks if task.name.startswith("analyze")] + assert len(analyze_tasks) == 2 # One for each foo value + + for task in analyze_tasks: + filenames = task.inputs.filenames.value + + # shared_config should use simple filename (no conflict across tasks) + assert filenames["shared_config"] == "config.json" + + # simulation_output should use full labels (conflict with other analyze tasks) + sim_output_keys = [k for k in filenames if k.startswith("simulation_output")] + assert len(sim_output_keys) == 2 # Should have both foo=1 and foo=2 inputs + + for key in sim_output_keys: + assert filenames[key] == key # Full label as filename + assert "foo_" in key # Contains parameter info + + # Test 3: simulate tasks (should have simple filenames for shared_config) + simulate_tasks = [task for task in aiida_wf._workgraph.tasks if task.name.startswith("simulate")] + assert len(simulate_tasks) == 2 # One for each 
foo value + + for task in simulate_tasks: + filenames = task.inputs.filenames.value + + # Both input_file and shared_config should use simple names (no conflicts) + assert filenames["input_file"] == "input.txt" + assert filenames["shared_config"] == "config.json" + + +@pytest.mark.usefixtures("aiida_localhost") +def test_parameterized_workflow_regression(tmp_path): + """Regression test for exact parameterized workflow output.""" + yaml_str = textwrap.dedent(f""" + start_date: "2026-01-01T00:00" + stop_date: "2026-07-01T00:00" + cycles: + - simulation: + cycling: + start_date: "2026-01-01T00:00" + stop_date: "2026-07-01T00:00" + period: P6M + tasks: + - simulate: + inputs: + - initial_data: + port: input + outputs: [sim_result] + - analysis: + cycling: + start_date: "2026-01-01T00:00" + stop_date: "2026-07-01T00:00" + period: P6M + tasks: + - analyze: + inputs: + - sim_result: + parameters: {{param: all}} + port: data + outputs: [final_result] + tasks: + - simulate: + plugin: shell + command: "simulate.py {{PORT::input}}" + src: {tmp_path}/simulate.py + parameters: [param] + computer: localhost + - analyze: + plugin: shell + command: "analyze.py {{PORT::data}}" + src: {tmp_path}/analyze.py + parameters: [param] + computer: localhost + data: + available: + - initial_data: + type: file + src: {tmp_path}/input.dat + computer: localhost + generated: + - sim_result: + type: file + src: result.dat + parameters: [param] + - final_result: + type: file + src: final.dat + parameters: [param] + parameters: + param: [1, 2] + """) + + config_file = tmp_path / "config.yml" + config_file.write_text(yaml_str) + + # Create minimal required files + (tmp_path / "input.dat").touch() + (tmp_path / "simulate.py").touch() + (tmp_path / "analyze.py").touch() + + core_wf = Workflow.from_config_file(str(config_file)) + aiida_wf = AiidaWorkGraph(core_wf) + + # Regression testing: verify structure + analyze_tasks = [t for t in aiida_wf._workgraph.tasks if t.name.startswith("analyze")] + assert 
len(analyze_tasks) == 2 # One for each param value + + task = analyze_tasks[0] # Test one of the analyze tasks + filenames = task.inputs.filenames.value + arguments = task.inputs.arguments.value + nodes_keys = list(task.inputs.nodes._sockets.keys()) + + # Expected values for regression detection + expected_keys = [ + "sim_result_param_1___date_2026_01_01_00_00_00", + "sim_result_param_2___date_2026_01_01_00_00_00", + ] + expected_filenames = { + "sim_result_param_1___date_2026_01_01_00_00_00": "sim_result_param_1___date_2026_01_01_00_00_00", + "sim_result_param_2___date_2026_01_01_00_00_00": "sim_result_param_2___date_2026_01_01_00_00_00", + } + expected_arguments = ( + "{sim_result_param_1___date_2026_01_01_00_00_00} {sim_result_param_2___date_2026_01_01_00_00_00}" + ) + + assert set(nodes_keys) == set(expected_keys) + assert filenames == expected_filenames + assert arguments == expected_arguments + + +@pytest.mark.usefixtures("aiida_localhost") +def test_comprehensive_parameterized_workflow(tmp_path): + """Test parameterized workflow behavior and properties.""" + yaml_str = textwrap.dedent(f""" + start_date: &start "2026-01-01T00:00" + stop_date: &stop "2026-07-01T00:00" + cycles: + - main: + cycling: + start_date: *start + stop_date: *stop + period: P6M + tasks: + - simulate: + inputs: + - config: + port: cfg + outputs: [sim_output] + - analyze: + inputs: + - sim_output: + parameters: {{foo: all, bar: single}} + port: data + outputs: [analysis] + tasks: + - simulate: + plugin: shell + command: "sim.py {{PORT::cfg}}" + src: {tmp_path}/sim.py + parameters: [foo, bar] + computer: localhost + - analyze: + plugin: shell + command: "analyze.py {{PORT::data}}" + src: {tmp_path}/analyze.py + parameters: [bar] + computer: localhost + data: + available: + - config: + type: file + src: {tmp_path}/config.txt + computer: localhost + generated: + - sim_output: + type: file + src: output.dat + parameters: [foo, bar] + - analysis: + type: file + src: analysis.txt + parameters: 
[bar] + parameters: + foo: [0, 1] + bar: [3.0] + """) + + config_file = tmp_path / "config.yml" + config_file.write_text(yaml_str) + + # Create files + (tmp_path / "config.txt").touch() + (tmp_path / "sim.py").touch() + (tmp_path / "analyze.py").touch() + + core_wf = Workflow.from_config_file(str(config_file)) + aiida_wf = AiidaWorkGraph(core_wf) + + # Verify task structure + sim_tasks = [t for t in aiida_wf._workgraph.tasks if t.name.startswith("simulate")] + analyze_tasks = [t for t in aiida_wf._workgraph.tasks if t.name.startswith("analyze")] + + assert len(sim_tasks) == 2  # 2 foo values, and 1 bar value -> 2 tasks + assert len(analyze_tasks) == 1  # 1 bar value -> 1 task + + # Check simulate tasks (should have simple config filename) + for task in sim_tasks: + filenames = task.inputs.filenames.value + assert filenames["config"] == "config.txt"  # No conflict, simple name + + # Check analyze task (should have complex filenames due to conflicts) + analyze_task = analyze_tasks[0] + filenames = analyze_task.inputs.filenames.value + + # Should have 2 sim_output inputs with full labels as filenames + sim_output_keys = [k for k in filenames if k.startswith("sim_output")] + assert len(sim_output_keys) == 2 + + for key in sim_output_keys: + assert filenames[key] == key  # Full label used as filename + assert "foo_" in key + assert "bar_3_0" in key + + +# PRCOMMENT: Kept this hardcoded, explicit test based on the `parameters` case +# Can probably be removed as the other tests cover the behavior, but wanted to keep for now +@pytest.mark.usefixtures("aiida_localhost") +def test_comprehensive_parameterized_explicit(tmp_path): + import pathlib + + # Get the test cases directory relative to the test file (cases live at tests/cases) + test_dir = pathlib.Path(__file__).parent / "cases" + + yaml_str = textwrap.dedent( + f""" + start_date: &root_start_date "2026-01-01T00:00" + stop_date: &root_stop_date "2028-01-01T00:00" + cycles: + - bimonthly_tasks: + cycling: + start_date: *root_start_date + 
stop_date: *root_stop_date + period: P6M + tasks: + - icon: + inputs: + - initial_conditions: + when: + at: *root_start_date + port: init + - icon_restart: + when: + after: *root_start_date + target_cycle: + lag: -P6M + parameters: + foo: single + bar: single + port: restart + - forcing: + port: forcing + outputs: [icon_output, icon_restart] + - statistics_foo: + inputs: + - icon_output: + parameters: + bar: single + port: None + outputs: [analysis_foo] + - statistics_foo_bar: + inputs: + - analysis_foo: + port: None + outputs: [analysis_foo_bar] + - yearly: + cycling: + start_date: *root_start_date + stop_date: *root_stop_date + period: P1Y + tasks: + - merge: + inputs: + - analysis_foo_bar: + target_cycle: + lag: ["P0M", "P6M"] + port: None + outputs: [yearly_analysis] + tasks: + - icon: + plugin: shell + src: {test_dir}/parameters/config/scripts/icon.py + command: "icon.py --restart {{PORT::restart}} --init {{PORT::init}} --forcing {{PORT::forcing}}" + parameters: [foo, bar] + computer: localhost + - statistics_foo: + plugin: shell + src: {test_dir}/parameters/config/scripts/statistics.py + command: "statistics.py {{PORT::None}}" + parameters: [bar] + computer: localhost + - statistics_foo_bar: + plugin: shell + src: {test_dir}/parameters/config/scripts/statistics.py + command: "statistics.py {{PORT::None}}" + computer: localhost + - merge: + plugin: shell + src: {test_dir}/parameters/config/scripts/merge.py + command: "merge.py {{PORT::None}}" + computer: localhost + data: + available: + - initial_conditions: + type: file + src: {test_dir}/small/config/data/initial_conditions + computer: localhost + - forcing: + type: file + src: {test_dir}/parameters/config/data/forcing + computer: localhost + generated: + - icon_output: + type: file + src: icon_output + parameters: [foo, bar] + - icon_restart: + type: file + src: restart + parameters: [foo, bar] + - analysis_foo: + type: file + src: analysis + parameters: [bar] + - analysis_foo_bar: + type: file + src: 
analysis + - yearly_analysis: + type: file + src: analysis + parameters: + foo: [0, 1] + bar: [3.0] + """ + ) + yaml_file = tmp_path / "config.yml" + yaml_file.write_text(yaml_str) + + core_wf = Workflow.from_config_file(yaml_file) + aiida_wf = AiidaWorkGraph(core_workflow=core_wf) + filenames_list = [task.inputs.filenames.value for task in aiida_wf._workgraph.tasks] + arguments_list = [task.inputs.arguments.value for task in aiida_wf._workgraph.tasks] + nodes_list = [list(task.inputs.nodes._sockets.keys()) for task in aiida_wf._workgraph.tasks] + + expected_filenames_list = [ + {"forcing": "forcing", "initial_conditions": "initial_conditions"}, + {"forcing": "forcing", "initial_conditions": "initial_conditions"}, + { + "forcing": "forcing", + "icon_restart_foo_0___bar_3_0___date_2026_01_01_00_00_00": "restart", + }, + { + "forcing": "forcing", + "icon_restart_foo_1___bar_3_0___date_2026_01_01_00_00_00": "restart", + }, + { + "forcing": "forcing", + "icon_restart_foo_0___bar_3_0___date_2026_07_01_00_00_00": "restart", + }, + { + "forcing": "forcing", + "icon_restart_foo_1___bar_3_0___date_2026_07_01_00_00_00": "restart", + }, + { + "forcing": "forcing", + "icon_restart_foo_0___bar_3_0___date_2027_01_01_00_00_00": "restart", + }, + { + "forcing": "forcing", + "icon_restart_foo_1___bar_3_0___date_2027_01_01_00_00_00": "restart", + }, + { + "icon_output_foo_0___bar_3_0___date_2026_01_01_00_00_00": "icon_output_foo_0___bar_3_0___date_2026_01_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2026_01_01_00_00_00": "icon_output_foo_1___bar_3_0___date_2026_01_01_00_00_00", + }, + { + "icon_output_foo_0___bar_3_0___date_2026_07_01_00_00_00": "icon_output_foo_0___bar_3_0___date_2026_07_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2026_07_01_00_00_00": "icon_output_foo_1___bar_3_0___date_2026_07_01_00_00_00", + }, + { + "icon_output_foo_0___bar_3_0___date_2027_01_01_00_00_00": "icon_output_foo_0___bar_3_0___date_2027_01_01_00_00_00", + 
"icon_output_foo_1___bar_3_0___date_2027_01_01_00_00_00": "icon_output_foo_1___bar_3_0___date_2027_01_01_00_00_00", + }, + { + "icon_output_foo_0___bar_3_0___date_2027_07_01_00_00_00": "icon_output_foo_0___bar_3_0___date_2027_07_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2027_07_01_00_00_00": "icon_output_foo_1___bar_3_0___date_2027_07_01_00_00_00", + }, + {"analysis_foo_bar_3_0___date_2026_01_01_00_00_00": "analysis"}, + {"analysis_foo_bar_3_0___date_2026_07_01_00_00_00": "analysis"}, + {"analysis_foo_bar_3_0___date_2027_01_01_00_00_00": "analysis"}, + {"analysis_foo_bar_3_0___date_2027_07_01_00_00_00": "analysis"}, + { + "analysis_foo_bar_date_2026_01_01_00_00_00": "analysis_foo_bar_date_2026_01_01_00_00_00", + "analysis_foo_bar_date_2026_07_01_00_00_00": "analysis_foo_bar_date_2026_07_01_00_00_00", + }, + { + "analysis_foo_bar_date_2027_01_01_00_00_00": "analysis_foo_bar_date_2027_01_01_00_00_00", + "analysis_foo_bar_date_2027_07_01_00_00_00": "analysis_foo_bar_date_2027_07_01_00_00_00", + }, + ] + + expected_arguments_list = [ + "--restart --init {initial_conditions} --forcing {forcing}", + "--restart --init {initial_conditions} --forcing {forcing}", + "--restart {icon_restart_foo_0___bar_3_0___date_2026_01_01_00_00_00} --init " "--forcing {forcing}", + "--restart {icon_restart_foo_1___bar_3_0___date_2026_01_01_00_00_00} --init " "--forcing {forcing}", + "--restart {icon_restart_foo_0___bar_3_0___date_2026_07_01_00_00_00} --init " "--forcing {forcing}", + "--restart {icon_restart_foo_1___bar_3_0___date_2026_07_01_00_00_00} --init " "--forcing {forcing}", + "--restart {icon_restart_foo_0___bar_3_0___date_2027_01_01_00_00_00} --init " "--forcing {forcing}", + "--restart {icon_restart_foo_1___bar_3_0___date_2027_01_01_00_00_00} --init " "--forcing {forcing}", + "{icon_output_foo_0___bar_3_0___date_2026_01_01_00_00_00} " + "{icon_output_foo_1___bar_3_0___date_2026_01_01_00_00_00}", + "{icon_output_foo_0___bar_3_0___date_2026_07_01_00_00_00} " + 
"{icon_output_foo_1___bar_3_0___date_2026_07_01_00_00_00}", + "{icon_output_foo_0___bar_3_0___date_2027_01_01_00_00_00} " + "{icon_output_foo_1___bar_3_0___date_2027_01_01_00_00_00}", + "{icon_output_foo_0___bar_3_0___date_2027_07_01_00_00_00} " + "{icon_output_foo_1___bar_3_0___date_2027_07_01_00_00_00}", + "{analysis_foo_bar_3_0___date_2026_01_01_00_00_00}", + "{analysis_foo_bar_3_0___date_2026_07_01_00_00_00}", + "{analysis_foo_bar_3_0___date_2027_01_01_00_00_00}", + "{analysis_foo_bar_3_0___date_2027_07_01_00_00_00}", + "{analysis_foo_bar_date_2026_01_01_00_00_00} " "{analysis_foo_bar_date_2026_07_01_00_00_00}", + "{analysis_foo_bar_date_2027_01_01_00_00_00} " "{analysis_foo_bar_date_2027_07_01_00_00_00}", + ] + + expected_nodes_list = [ + ["initial_conditions", "forcing"], + ["initial_conditions", "forcing"], + ["icon_restart_foo_0___bar_3_0___date_2026_01_01_00_00_00", "forcing"], + ["icon_restart_foo_1___bar_3_0___date_2026_01_01_00_00_00", "forcing"], + ["icon_restart_foo_0___bar_3_0___date_2026_07_01_00_00_00", "forcing"], + ["icon_restart_foo_1___bar_3_0___date_2026_07_01_00_00_00", "forcing"], + ["icon_restart_foo_0___bar_3_0___date_2027_01_01_00_00_00", "forcing"], + ["icon_restart_foo_1___bar_3_0___date_2027_01_01_00_00_00", "forcing"], + [ + "icon_output_foo_0___bar_3_0___date_2026_01_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2026_01_01_00_00_00", + ], + [ + "icon_output_foo_0___bar_3_0___date_2026_07_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2026_07_01_00_00_00", + ], + [ + "icon_output_foo_0___bar_3_0___date_2027_01_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2027_01_01_00_00_00", + ], + [ + "icon_output_foo_0___bar_3_0___date_2027_07_01_00_00_00", + "icon_output_foo_1___bar_3_0___date_2027_07_01_00_00_00", + ], + ["analysis_foo_bar_3_0___date_2026_01_01_00_00_00"], + ["analysis_foo_bar_3_0___date_2026_07_01_00_00_00"], + ["analysis_foo_bar_3_0___date_2027_01_01_00_00_00"], + 
["analysis_foo_bar_3_0___date_2027_07_01_00_00_00"], + [ + "analysis_foo_bar_date_2026_01_01_00_00_00", + "analysis_foo_bar_date_2026_07_01_00_00_00", + ], + [ + "analysis_foo_bar_date_2027_01_01_00_00_00", + "analysis_foo_bar_date_2027_07_01_00_00_00", + ], + ] + + assert arguments_list == expected_arguments_list + assert filenames_list == expected_filenames_list + assert nodes_list == expected_nodes_list + + # PRCOMMENT: Introduce this once we can automatically create the codes in a reasonable way. + # Currently, it still fails... + # output_node = aiida_wf.run() + # assert ( + # output_node.is_finished_ok + # ), f"Not successful run. Got exit code {output_node.exit_code} with message {output_node.exit_message}."