Fix symlinking RemoteData nodes for remote-submission
#136
Closed
31 commits
847c96e
Actual implementation changes
GeigerJ2 dabeed4
Add specific tests for filenames argument.
GeigerJ2 a2ca536
Add checks for computer existence for filenames
GeigerJ2 ee3cbff
fix tests
GeigerJ2 996ea7b
Replace None key with src
GeigerJ2 2092627
Add expected arguments list for comparison.
GeigerJ2 48318a2
Verify with nodes
GeigerJ2 5ac98bf
Add minimal CLI interface using typer and rich
GeigerJ2 23eeaaa
Merge remote-tracking branch 'upstream/main' into remote-submission
GeigerJ2 7c9ed55
Merge remote-tracking branch 'upstream/main' into remote-submission
GeigerJ2 9cbac1d
Merge branch 'main' into remote-submission
GeigerJ2 5febd4f
Uncomment out previous implementation and duplicate test
GeigerJ2 00d2eb9
Merge in CLI for easier development
GeigerJ2 4be222f
Implementation seems to work now
GeigerJ2 84677c7
.
GeigerJ2 47e361d
.
GeigerJ2 7d84dd4
.
GeigerJ2 31268f0
.
GeigerJ2 221cfc9
.
GeigerJ2 a51ab9a
.
GeigerJ2 c149a10
.
GeigerJ2 80948e6
.
GeigerJ2 c230667
hatch fmt and types:check pass
GeigerJ2 b4ca8ff
.
GeigerJ2 4807db6
.
GeigerJ2 6a42fb8
.
GeigerJ2 3676b14
Allow and properly resolve relative AvailableData src on localhost to…
GeigerJ2 5237cdf
.
GeigerJ2 7c3bd68
.
GeigerJ2 a565fb1
.
GeigerJ2 83aea88
Remove erroneously introduced output renaming.
GeigerJ2
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,15 @@ def _execute(self, engine_process, args=None, kwargs=None, var_kwargs=None): # | |
| # Workaround starts here | ||
| # This part is part of the workaround. We need to manually add the outputs from the task. | ||
| # Because kwargs are not populated with outputs | ||
| default_outputs = {"remote_folder", "remote_stash", "retrieved", "_outputs", "_wait", "stdout", "stderr"} | ||
| default_outputs = { | ||
| "remote_folder", | ||
| "remote_stash", | ||
| "retrieved", | ||
| "_outputs", | ||
| "_wait", | ||
| "stdout", | ||
| "stderr", | ||
| } | ||
| task_outputs = set(self.outputs._sockets.keys()) # noqa SLF001 # there so public accessor | ||
| task_outputs = task_outputs.union(set(inputs.pop("outputs", []))) | ||
| missing_outputs = task_outputs.difference(default_outputs) | ||
|
|
@@ -97,6 +105,7 @@ def __init__(self, core_workflow: core.Workflow): | |
| for task in self._core_workflow.tasks: | ||
| if isinstance(task, core.ShellTask): | ||
| self._set_shelljob_arguments(task) | ||
| self._set_shelljob_filenames(task) | ||
|
|
||
| # link wait on to workgraph tasks | ||
| for task in self._core_workflow.tasks: | ||
|
|
@@ -184,7 +193,16 @@ def _add_aiida_input_data_node(self, data: core.Data): | |
| except NotExistent as err: | ||
| msg = f"Could not find computer {data.computer!r} for input {data}." | ||
| raise ValueError(msg) from err | ||
| self._aiida_data_nodes[label] = aiida.orm.RemoteData(remote_path=data.src, label=label, computer=computer) | ||
| # `remote_path` must be str not PosixPath to be JSON-serializable | ||
|
Collaborator
Author
This was a bug in the code before this PR that surfaced when actually submitting. |
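For anyone reproducing this outside AiiDA: the sketch below shows only the serialization failure that the `str(...)` conversion avoids, using the standard-library `json` module. The path value is a made-up placeholder, and whether AiiDA goes through exactly this encoder is an assumption; the diff comment only says that `remote_path` must be JSON-serializable.

```python
import json
from pathlib import PurePosixPath

# Hypothetical remote path, for illustration only
src = PurePosixPath("/scratch/user/data/input.nc")

try:
    json.dumps({"remote_path": src})
    path_serializable = True
except TypeError:
    # The stdlib JSON encoder has no rule for pathlib objects
    path_serializable = False

# Converting to str first yields an ordinary JSON string
encoded = json.dumps({"remote_path": str(src)})
```

Here `path_serializable` ends up `False`, while the `str(src)` variant encodes without error.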
||
| # PRCOMMENT: Hack for now to make the tests pass | ||
| if computer.label == "localhost": | ||
| self._aiida_data_nodes[label] = aiida.orm.RemoteData( | ||
| remote_path=str(data_full_path), label=label, computer=computer | ||
| ) | ||
| else: | ||
| self._aiida_data_nodes[label] = aiida.orm.RemoteData( | ||
| remote_path=str(data.src), label=label, computer=computer | ||
| ) | ||
| elif data.type == "file": | ||
| self._aiida_data_nodes[label] = aiida.orm.SinglefileData(label=label, file=data_full_path) | ||
| elif data.type == "dir": | ||
|
|
@@ -229,6 +247,8 @@ def _create_shell_task_node(self, task: core.ShellTask): | |
| ] | ||
| prepend_text = "\n".join([f"source {env_source_path}" for env_source_path in env_source_paths]) | ||
| metadata["options"] = {"prepend_text": prepend_text} | ||
| # NOTE: Hardcoded for now, possibly make user-facing option | ||
| metadata["options"]["use_symlinks"] = True | ||
|
|
||
| ## computer | ||
| if task.computer is not None: | ||
|
|
@@ -283,7 +303,10 @@ def _link_input_node_to_shelltask(self, task: core.ShellTask, input_: core.Data) | |
| socket = getattr(workgraph_task.inputs.nodes, f"{input_label}") | ||
| socket.value = self.data_from_core(input_) | ||
| elif isinstance(input_, core.GeneratedData): | ||
| self._workgraph.add_link(self.socket_from_core(input_), workgraph_task.inputs[f"nodes.{input_label}"]) | ||
| self._workgraph.add_link( | ||
| self.socket_from_core(input_), | ||
| workgraph_task.inputs[f"nodes.{input_label}"], | ||
| ) | ||
| else: | ||
| raise TypeError | ||
|
|
||
|
|
@@ -293,21 +316,67 @@ def _link_wait_on_to_task(self, task: core.Task): | |
| self.task_from_core(task).wait = [self.task_from_core(wt) for wt in task.wait_on] | ||
|
|
||
| def _set_shelljob_arguments(self, task: core.ShellTask): | ||
| """set AiiDA ShellJob arguments by replacing port placeholders by aiida labels""" | ||
|
|
||
| """Set AiiDA ShellJob arguments by replacing port placeholders with AiiDA labels.""" | ||
| workgraph_task = self.task_from_core(task) | ||
| workgraph_task_arguments: SocketAny = workgraph_task.inputs.arguments | ||
|
|
||
| if workgraph_task_arguments is None: | ||
| msg = ( | ||
| f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph " | ||
| f"before linking. This is a bug in the code, please contact developers." | ||
| ) | ||
| raise ValueError(msg) | ||
|
|
||
| input_labels = {port: list(map(self.label_placeholder, task.inputs[port])) for port in task.inputs} | ||
| # Build input_labels dictionary for port resolution | ||
| input_labels: dict[str, list[str]] = {} | ||
| for port_name, input_list in task.inputs.items(): | ||
| input_labels[port_name] = [] | ||
| for input_ in input_list: | ||
| # Use the full AiiDA label as the placeholder content | ||
| input_label = self.get_aiida_label_from_graph_item(input_) | ||
| input_labels[port_name].append(f"{{{input_label}}}") | ||
|
|
||
| # Resolve the command with port placeholders replaced by input labels | ||
| _, arguments = self.split_cmd_arg(task.resolve_ports(input_labels)) | ||
| workgraph_task_arguments.value = arguments | ||
|
|
||
| def _set_shelljob_filenames(self, task: core.ShellTask): | ||
| """Set AiiDA ShellJob filenames for data entities, including parameterized data.""" | ||
| filenames = {} | ||
| workgraph_task = self.task_from_core(task) | ||
|
|
||
| if not workgraph_task.inputs.filenames: | ||
| return | ||
|
|
||
| # Handle input files | ||
| for input_ in task.input_data_nodes(): | ||
| input_label = self.get_aiida_label_from_graph_item(input_) | ||
|
|
||
| if task.computer and input_.computer and isinstance(input_, core.AvailableData): | ||
| # For RemoteData on the same computer, use just the filename | ||
| filename = Path(input_.src).name | ||
| filenames[input_.name] = filename | ||
| else: | ||
| # For other cases (including GeneratedData), we need to handle parameterized data | ||
| # Importantly, multiple data nodes with the same base name but different | ||
| # coordinates need unique filenames to avoid conflicts in the working directory | ||
|
|
||
| # Count how many inputs have the same base name | ||
| same_name_count = sum(1 for inp in task.input_data_nodes() if inp.name == input_.name) | ||
|
|
||
| if same_name_count > 1: | ||
| # Multiple data nodes with same base name - use full label as filename | ||
| # to ensure uniqueness in working directory | ||
| filename = input_label | ||
| else: | ||
| # Single data node with this name - can use simple filename | ||
| filename = Path(input_.src).name if hasattr(input_, "src") else input_.name | ||
|
|
||
| # The key in filenames dict should be the input label (what's used in nodes dict) | ||
| filenames[input_label] = filename | ||
|
|
||
| workgraph_task.inputs.filenames.value = filenames | ||
|
|
||
| def run( | ||
| self, | ||
| inputs: None | dict[str, Any] = None, | ||
|
|
||
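To make the filename rule in `_set_shelljob_filenames` easier to review, here is a minimal standalone sketch of its `else` branch only (the non-remote case). `FakeInput` and `pick_filenames` are hypothetical stand-ins and not part of the PR; `label` plays the role of the value returned by `get_aiida_label_from_graph_item`, and the example names are invented.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class FakeInput:
    """Hypothetical stand-in for a data node; not a class from the PR."""

    name: str
    label: str  # stands in for get_aiida_label_from_graph_item(input_)
    src: Optional[str] = None


def pick_filenames(inputs: list[FakeInput]) -> dict[str, str]:
    """Map each input label to the filename used in the working directory."""
    filenames: dict[str, str] = {}
    for inp in inputs:
        # Count how many inputs share this base name
        same_name_count = sum(1 for other in inputs if other.name == inp.name)
        if same_name_count > 1:
            # Shared base name: fall back to the full label to stay unique
            filenames[inp.label] = inp.label
        else:
            # Unique base name: use the source filename if there is one
            filenames[inp.label] = Path(inp.src).name if inp.src else inp.name
    return filenames


example = pick_filenames(
    [
        FakeInput("icon_output", "icon_output_date_2026_01_01"),
        FakeInput("icon_output", "icon_output_date_2026_02_01"),
        FakeInput("namelist", "namelist", src="config/namelist.nml"),
    ]
)
```

The two `icon_output` entries keep their full labels as filenames, while `namelist` maps to `namelist.nml`. Note that the `RemoteData` branch in the diff keys the dictionary by `input_.name` rather than by the label, which this sketch does not model.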
-s disables output capturing and allows setting breakpoints in test code