diff --git a/pydra/engine/boutiques.py b/pydra/engine/boutiques.py new file mode 100644 index 0000000000..af7cb549a8 --- /dev/null +++ b/pydra/engine/boutiques.py @@ -0,0 +1,215 @@ +import typing as ty +import json +import attr +from urllib.request import urlretrieve +from pathlib import Path +from functools import reduce + +from ..utils.messenger import AuditFlag +from ..engine import ShellCommandTask +from ..engine.specs import SpecInfo, ShellSpec, ShellOutSpec, File, attr_fields +from .helpers_file import is_local_file + + +class BoshTask(ShellCommandTask): + """Shell Command Task based on the Boutiques descriptor""" + + def __init__( + self, + zenodo_id=None, + bosh_file=None, + audit_flags: AuditFlag = AuditFlag.NONE, + cache_dir=None, + input_spec_names: ty.Optional[ty.List] = None, + messenger_args=None, + messengers=None, + name=None, + output_spec_names: ty.Optional[ty.List] = None, + rerun=False, + strip=False, + **kwargs, + ): + """ + Initialize this task. + + Parameters + ---------- + zenodo_id: :obj: str + Zenodo ID + bosh_file : : str + json file with the boutiques descriptors + audit_flags : :obj:`pydra.utils.messenger.AuditFlag` + Auditing configuration + cache_dir : :obj:`os.pathlike` + Cache directory + input_spec_names : :obj: list + Input names for input_spec. + messenger_args : + TODO + messengers : + TODO + name : :obj:`str` + Name of this task. + output_spec_names : :obj: list + Output names for output_spec. + strip : :obj:`bool` + TODO + + """ + self.cache_dir = cache_dir + if (bosh_file and zenodo_id) or not (bosh_file or zenodo_id): + raise Exception("either bosh or zenodo_id has to be specified") + elif zenodo_id: + self.bosh_file = self._download_spec(zenodo_id) + else: # bosh_file + self.bosh_file = bosh_file + + with self.bosh_file.open() as f: + self.bosh_spec = json.load(f) + + self.input_spec = self._prepare_input_spec(names_subset=input_spec_names) + self.output_spec = self._prepare_output_spec(names_subset=output_spec_names) + self.bindings = ["-v", f"{self.bosh_file.parent}:{self.bosh_file.parent}:ro"] + + super(BoshTask, self).__init__( + name=name, + input_spec=self.input_spec, + output_spec=self.output_spec, + executable=["bosh", "exec", "launch"], + args=["-s"], + audit_flags=audit_flags, + messengers=messengers, + messenger_args=messenger_args, + cache_dir=self.cache_dir, + strip=strip, + rerun=rerun, + **kwargs, + ) + self.strip = strip + + def _download_spec(self, zenodo_id): + """ + usind boutiques Searcher to find url of zenodo file for a specific id, + and download the file to self.cache_dir + """ + from boutiques.searcher import Searcher + + searcher = Searcher(zenodo_id, exact_match=True) + hits = searcher.zenodo_search().json()["hits"]["hits"] + if len(hits) == 0: + raise Exception(f"can't find zenodo spec for {zenodo_id}") + elif len(hits) > 1: + raise Exception(f"too many hits for {zenodo_id}") + else: + zenodo_url = hits[0]["files"][0]["links"]["self"] + zenodo_file = self.cache_dir / f"zenodo.{zenodo_id}.json" + urlretrieve(zenodo_url, zenodo_file) + return zenodo_file + + def _prepare_input_spec(self, names_subset=None): + """ creating input spec from the zenodo file + if name_subset provided, only names from the subset will be used in the spec + """ + binputs = self.bosh_spec["inputs"] + self._input_spec_keys = {} + fields = [] + for input in binputs: + name = input["id"] + if names_subset is None: + pass + elif name not in names_subset: + continue + else: + names_subset.remove(name) + if input["type"] == "File": + tp = File + elif input["type"] == "String": + tp = str + elif input["type"] == "Number": + tp = float + elif input["type"] == "Flag": + tp = bool + else: + tp = None + # adding list + if tp and "list" in input and input["list"]: + tp = ty.List[tp] + + mdata = { + "help_string": input.get("description", None) or input["name"], + "mandatory": not input["optional"], + "argstr": input.get("command-line-flag", None), + } + fields.append((name, tp, mdata)) + self._input_spec_keys[input["value-key"]] = "{" + f"{name}" + "}" + if names_subset: + raise RuntimeError(f"{names_subset} are not in the zenodo input spec") + spec = SpecInfo(name="Inputs", fields=fields, bases=(ShellSpec,)) + return spec + + def _prepare_output_spec(self, names_subset=None): + """ creating output spec from the zenodo file + if name_subset provided, only names from the subset will be used in the spec + """ + boutputs = self.bosh_spec["output-files"] + fields = [] + for output in boutputs: + name = output["id"] + if names_subset is None: + pass + elif name not in names_subset: + continue + else: + names_subset.remove(name) + path_template = reduce( + lambda s, r: s.replace(*r), + self._input_spec_keys.items(), + output["path-template"], + ) + mdata = { + "help_string": output.get("description", None) or output["name"], + "mandatory": not output["optional"], + "output_file_template": path_template, + } + fields.append((name, attr.ib(type=File, metadata=mdata))) + + if names_subset: + raise RuntimeError(f"{names_subset} are not in the zenodo output spec") + spec = SpecInfo(name="Outputs", fields=fields, bases=(ShellOutSpec,)) + return spec + + def _command_args_single(self, state_ind, ind=None): + """Get command line arguments for a single state""" + input_filepath = self._bosh_invocation_file(state_ind=state_ind, ind=ind) + cmd_list = ( + self.inputs.executable + + [str(self.bosh_file), input_filepath] + + self.inputs.args + + self.bindings + ) + return cmd_list + + def _bosh_invocation_file(self, state_ind, ind=None): + """creating bosh invocation file - json file with inputs values""" + input_json = {} + for f in attr_fields(self.inputs): + if f.name in ["executable", "args"]: + continue + if self.state and f"{self.name}.{f.name}" in state_ind: + value = getattr(self.inputs, f.name)[state_ind[f"{self.name}.{f.name}"]] + else: + value = getattr(self.inputs, f.name) + # adding to the json file if specified by the user + if value is not attr.NOTHING and value != "NOTHING": + if is_local_file(f): + value = Path(value) + self.bindings.extend(["-v", f"{value.parent}:{value.parent}:ro"]) + value = str(value) + + input_json[f.name] = value + + filename = self.cache_dir / f"{self.name}-{ind}.json" + with open(filename, "w") as jsonfile: + json.dump(input_json, jsonfile) + + return str(filename) diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 70a8364467..1b34428a13 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -311,9 +311,8 @@ def collect_additional_outputs(self, input_spec, inputs, output_dir): if fld.type is File: # assuming that field should have either default or metadata, but not both if ( - not (fld.default is None or fld.default == attr.NOTHING) - and fld.metadata - ): + fld.default is None or fld.default == attr.NOTHING + ) and not fld.metadata: # TODO: is it right? raise Exception("File has to have default value or metadata") elif not fld.default == attr.NOTHING: additional_out[fld.name] = self._field_defaultvalue( @@ -360,9 +359,23 @@ def _field_metadata(self, fld, inputs, output_dir): if "value" in fld.metadata: return output_dir / fld.metadata["value"] elif "output_file_template" in fld.metadata: - return output_dir / fld.metadata["output_file_template"].format( - **inputs.__dict__ + sfx_tmpl = (output_dir / fld.metadata["output_file_template"]).suffixes + if sfx_tmpl: + # removing suffix from input field if template has it's own suffix + inputs_templ = { + k: v.split(".")[0] + for k, v in inputs.__dict__.items() + if isinstance(v, str) + } + else: + inputs_templ = { + k: v for k, v in inputs.__dict__.items() if isinstance(v, str) + } + out_path = output_dir / fld.metadata["output_file_template"].format( + **inputs_templ ) + return out_path + elif "callable" in fld.metadata: return fld.metadata["callable"](fld.name, output_dir) else: diff --git a/pydra/engine/task.py b/pydra/engine/task.py index aa7fff5aed..8cb088a23d 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -399,13 +399,16 @@ def _run_task(self): else: args = self.command_args if args: - # removing emty strings + # removing empty strings args = [str(el) for el in args if el not in ["", " "]] keys = ["return_code", "stdout", "stderr"] values = execute(args, strip=self.strip) self.output_ = dict(zip(keys, values)) if self.output_["return_code"]: - raise RuntimeError(self.output_["stderr"]) + if self.output_["stderr"]: + raise RuntimeError(self.output_["stderr"]) + else: + raise RuntimeError(self.output_["stdout"]) class ContainerTask(ShellCommandTask): diff --git a/pydra/engine/tests/data_tests/test.nii.gz b/pydra/engine/tests/data_tests/test.nii.gz new file mode 100644 index 0000000000..1f8a74013b Binary files /dev/null and b/pydra/engine/tests/data_tests/test.nii.gz differ diff --git a/pydra/engine/tests/test_boutiques.py b/pydra/engine/tests/test_boutiques.py new file mode 100644 index 0000000000..fd04fb4531 --- /dev/null +++ b/pydra/engine/tests/test_boutiques.py @@ -0,0 +1,174 @@ +import os, shutil +import subprocess as sp +from pathlib import Path +import pytest + +from ..core import Workflow +from ..task import ShellCommandTask +from ..submitter import Submitter +from ..boutiques import BoshTask +from .utils import result_no_submitter, result_submitter, no_win + +need_bosh_docker = pytest.mark.skipif( + shutil.which("docker") is None + or sp.call(["docker", "info"] or sp.call(["bosh", "version"])), + reason="requires docker and bosh", +) + +if bool(shutil.which("sbatch")): + Plugins = ["cf", "slurm"] +else: + Plugins = ["cf"] + +Infile = Path(__file__).resolve().parent / "data_tests" / "test.nii.gz" + + +@no_win +@need_bosh_docker +@pytest.mark.flaky(reruns=2) +@pytest.mark.parametrize( + "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] +) +@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("plugin", Plugins) +def test_boutiques_1(maskfile, plugin, results_function): + """ simple task to run fsl.bet using BoshTask""" + btask = BoshTask(name="NA", zenodo_id="1482743") + btask.inputs.infile = Infile + btask.inputs.maskfile = maskfile + res = results_function(btask, plugin) + + assert res.output.return_code == 0 + + # checking if the outfile exists and if it has proper name + assert res.output.outfile.name == "test_brain.nii.gz" + assert res.output.outfile.exists() + # other files should also have proper names, but they do not exist + assert res.output.out_outskin_off.name == "test_brain_outskin_mesh.off" + assert not res.output.out_outskin_off.exists() + + +@no_win +@need_bosh_docker +def test_boutiques_spec_1(): + """ testing spec: providing input/output fields names""" + btask = BoshTask( + name="NA", + zenodo_id="1482743", + infile=Infile, + maskfile="test_brain.nii.gz", + input_spec_names=["infile", "maskfile"], + output_spec_names=["outfile", "out_outskin_off"], + ) + + assert len(btask.input_spec.fields) == 2 + assert btask.input_spec.fields[0][0] == "infile" + assert btask.input_spec.fields[1][0] == "maskfile" + assert hasattr(btask.inputs, "infile") + assert hasattr(btask.inputs, "maskfile") + + assert len(btask.output_spec.fields) == 2 + assert btask.output_spec.fields[0][0] == "outfile" + assert btask.output_spec.fields[1][0] == "out_outskin_off" + + +@no_win +@need_bosh_docker +def test_boutiques_spec_2(): + """ testing spec: providing partial input/output fields names""" + btask = BoshTask( + name="NA", + zenodo_id="1482743", + infile=Infile, + maskfile="test_brain.nii.gz", + input_spec_names=["infile"], + output_spec_names=[], + ) + + assert len(btask.input_spec.fields) == 1 + assert btask.input_spec.fields[0][0] == "infile" + assert hasattr(btask.inputs, "infile") + # input doesn't see maskfile + assert not hasattr(btask.inputs, "maskfile") + + assert len(btask.output_spec.fields) == 0 + + +@no_win +@need_bosh_docker +@pytest.mark.parametrize( + "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] +) +@pytest.mark.parametrize("plugin", Plugins) +def test_boutiques_wf_1(maskfile, plugin): + """ wf with one task that runs fsl.bet using BoshTask""" + wf = Workflow(name="wf", input_spec=["maskfile", "infile"]) + wf.inputs.maskfile = maskfile + wf.inputs.infile = Infile + + wf.add( + BoshTask( + name="bet", + zenodo_id="1482743", + infile=wf.lzin.infile, + maskfile=wf.lzin.maskfile, + ) + ) + + wf.set_output([("outfile", wf.bet.lzout.outfile)]) + + with Submitter(plugin=plugin) as sub: + wf(submitter=sub) + + res = wf.result() + assert res.output.outfile.name == "test_brain.nii.gz" + assert res.output.outfile.exists() + + +@no_win +@need_bosh_docker +@pytest.mark.parametrize( + "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] +) +@pytest.mark.parametrize("plugin", Plugins) +def test_boutiques_wf_2(maskfile, plugin): + """ wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask""" + wf = Workflow(name="wf", input_spec=["maskfile", "infile"]) + wf.inputs.maskfile = maskfile + wf.inputs.infile = Infile + + wf.add( + BoshTask( + name="bet", + zenodo_id="1482743", + infile=wf.lzin.infile, + maskfile=wf.lzin.maskfile, + ) + ) + wf.add( + BoshTask( + name="stat", zenodo_id="3240521", input_file=wf.bet.lzout.outfile, v=True + ) + ) + wf.add(ShellCommandTask(name="cat", executable="cat", args=wf.stat.lzout.output)) + + wf.set_output( + [ + ("outfile_bet", wf.bet.lzout.outfile), + ("out_stat", wf.stat.lzout.output), + ("out", wf.cat.lzout.stdout), + ] + ) + + with Submitter(plugin=plugin) as sub: + wf(submitter=sub) + + res = wf.result() + assert res.output.outfile_bet.name == "test_brain.nii.gz" + assert res.output.outfile_bet.exists() + + assert res.output.out_stat.name == "output.txt" + assert res.output.out_stat.exists() + + assert int(res.output.out.rstrip().split()[0]) == 11534336 + assert float(res.output.out.rstrip().split()[1]) == 11534336.0 diff --git a/pydra/engine/tests/test_dockertask.py b/pydra/engine/tests/test_dockertask.py index 73c7f778a0..081059a4fc 100644 --- a/pydra/engine/tests/test_dockertask.py +++ b/pydra/engine/tests/test_dockertask.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -import os, sys, shutil -import subprocess as sp +import os, shutil import pytest import attr @@ -9,21 +8,13 @@ from ..submitter import Submitter from ..core import Workflow from ..specs import ShellOutSpec, SpecInfo, File, DockerSpec +from .utils import no_win, need_docker if bool(shutil.which("sbatch")): Plugins = ["cf", "slurm"] else: Plugins = ["cf"] -need_docker = pytest.mark.skipif( - shutil.which("docker") is None or sp.call(["docker", "info"]), - reason="no docker within the container", -) -no_win = pytest.mark.skipif( - sys.platform.startswith("win"), - reason="docker command not adjusted for windows docker", -) - @no_win @need_docker diff --git a/pydra/engine/tests/test_shelltask.py b/pydra/engine/tests/test_shelltask.py index 671c65a31e..a3ae3663c3 100644 --- a/pydra/engine/tests/test_shelltask.py +++ b/pydra/engine/tests/test_shelltask.py @@ -11,6 +11,7 @@ from ..submitter import Submitter from ..core import Workflow from ..specs import ShellOutSpec, ShellSpec, SpecInfo, File +from .utils import result_no_submitter, result_submitter if sys.platform.startswith("win"): @@ -22,20 +23,6 @@ Plugins = ["cf"] -def result_no_submitter(shell_task, plugin=None): - """ helper function to return result when running without submitter """ - return shell_task() - - -def result_submitter(shell_task, plugin): - """ helper function to return result when running with submitter - with specific plugin - """ - with Submitter(plugin=plugin) as sub: - shell_task(submitter=sub) - return shell_task.result() - - @pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) @pytest.mark.parametrize("plugin", Plugins) def test_shell_cmd_1(plugin, results_function): diff --git a/pydra/engine/tests/utils.py b/pydra/engine/tests/utils.py index e4bec20e4e..247fe6ca70 100644 --- a/pydra/engine/tests/utils.py +++ b/pydra/engine/tests/utils.py @@ -1,13 +1,41 @@ # Tasks for testing import time +import sys, shutil import typing as tp from pathlib import Path +import subprocess as sp +import pytest from ..core import Workflow +from ..submitter import Submitter from ... import mark from ..specs import File +need_docker = pytest.mark.skipif( + shutil.which("docker") is None or sp.call(["docker", "info"]), + reason="no docker within the container", +) +no_win = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="docker command not adjusted for windows docker", +) + + +def result_no_submitter(shell_task, plugin=None): + """ helper function to return result when running without submitter """ + return shell_task() + + +def result_submitter(shell_task, plugin): + """ helper function to return result when running with submitter + with specific plugin + """ + with Submitter(plugin=plugin) as sub: + shell_task(submitter=sub) + return shell_task.result() + + @mark.task def fun_addtwo(a): import time diff --git a/setup.cfg b/setup.cfg index 88cff46654..2a06e56cf8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ test_requires = pytest-cov pytest-env pytest-xdist + pytest-rerunfailures codecov numpy psutil @@ -41,6 +42,7 @@ test_requires = notebook==5.7.8 jupyter jupyter_contrib_nbextensions + boutiques packages = find: include_package_data = True @@ -65,6 +67,7 @@ test = pytest-cov pytest-env pytest-xdist + pytest-rerunfailures codecov numpy pyld @@ -74,6 +77,7 @@ test = notebook==5.7.8 jupyter jupyter_contrib_nbextensions + boutiques tests = %(test)s dev =