From 67e3deeb960889cbf2d1f0e540afe97e5ef0e3bc Mon Sep 17 00:00:00 2001 From: valhayot Date: Sat, 29 Feb 2020 13:22:31 -0500 Subject: [PATCH 1/4] added a pydra wrapper for boutiques tools --- pydra/engine/task.py | 139 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/pydra/engine/task.py b/pydra/engine/task.py index 4872445a0c..9eb35d6d34 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -209,6 +209,145 @@ def _run_task(self): self.output_[output_names[0]] = output +class BoutiquesTask(FunctionTask): + """Wrap a Boutiques callable as a task element.""" + + global boutiques_func + def boutiques_func(descriptor, args, **kwargs): + from boutiques.descriptor2func import function + + tool = function(descriptor) + ret = tool(*args, **kwargs) + + # print formatted output + print(ret) + + if ret.exit_code: + raise RuntimeError(ret.stderr) + + return [out.file_name for out in ret.output_files] + + def __init__( + self, + descriptor: ty.Text, + audit_flags: AuditFlag = AuditFlag.NONE, + cache_dir=None, + cache_locations=None, + input_spec: ty.Optional[SpecInfo] = None, + messenger_args=None, + messengers=None, + name=None, + output_spec: ty.Optional[BaseSpec] = None, + rerun=False, + bosh_args=None, + **kwargs, + ): + """ + Initialize this task. + + Parameters + ---------- + descriptor : :obj:`str` + The filename or zenodo ID of the boutiques descriptor + audit_flags: :obj:`pydra.utils.messenger.AuditFlag` + Auditing configurations + cache_dir : :obj:`os.pathlike` + Cache directory + cache_locations : :obj:`list` of :obj:`os.pathlike` + List of alternative cache locations. + input_spec: :obj:`pydra.engine.specs.SpecInfo` + Specification of inputs. + messenger_args : + TODO + messengers : + TODO + name : :obj:`str` + Name of this task. + output_spec : :obj:`pydra.engine.specs.BaseSpec` + Specification of inputs. + bosh_args : :object:`list` of :obj:`str` + List of arguments to pass to Boutiques + + """ + from boutiques.descriptor2func import function + from types import FunctionType + + func = boutiques_func + self.func = func + self.descriptor = descriptor + self.bosh_args = bosh_args + + if input_spec is None: + func_params = [val for val in inspect.signature(func).parameters.values() if val.name != 'kwargs'] + func_params += kwargs.keys() + input_spec = SpecInfo( + name="Inputs", + fields=[ + ( + val.name, + attr.ib( + default=val.default, + type=val.annotation, + metadata={ + "help_string": f"{val.name} parameter from {func.__name__}" + }, + ), + ) + if hasattr(val, 'default') + else ( + val, + attr.ib( + type=type(kwargs[val]), metadata={"help_string": val} + ), + ) + for val in func_params + ] + + [("_func", attr.ib(default=cp.dumps(func), type=str))], + bases=(BaseSpec,), + ) + + fmt_kwargs={ "descriptor": descriptor, "args": bosh_args } + fmt_kwargs.update(kwargs) + + super(BoutiquesTask, self).__init__( + func, + name=name, + audit_flags=audit_flags, + messengers=messengers, + messenger_args=messenger_args, + cache_dir=cache_dir, + cache_locations=cache_locations, + input_spec=input_spec, + rerun=rerun, + **fmt_kwargs, + ) + + if output_spec is None: + output_spec = SpecInfo(name="Output", fields=[("out", ty.Any)], bases=(BaseSpec,)) + self.output_spec = output_spec + + def _run_task(self): + + inputs = attr.asdict(self.inputs) + del inputs["_func"] + self.output_ = None + + output = cp.loads(self.inputs._func)(**inputs) + if output is not None: + output_names = [el[0] for el in self.output_spec.fields] + self.output_ = {} + if len(output_names) > 1: + if len(output_names) == len(output): + self.output_ = dict(zip(output_names, output)) + else: + raise Exception( + f"expected {len(self.output_spec.fields)} elements, " + f"but {len(output)} were returned" + ) + else: + self.output_[output_names[0]] = output + + class ShellCommandTask(TaskBase): """Wrap a shell command as a task element.""" From c7e0d295ff42bf4e5c66adb83f93278a3341d48c Mon Sep 17 00:00:00 2001 From: valhayot Date: Sat, 29 Feb 2020 13:24:51 -0500 Subject: [PATCH 2/4] removed some unused imports --- pydra/engine/task.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pydra/engine/task.py b/pydra/engine/task.py index 9eb35d6d34..39a4398a8c 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -269,8 +269,6 @@ def __init__( List of arguments to pass to Boutiques """ - from boutiques.descriptor2func import function - from types import FunctionType func = boutiques_func self.func = func From 5a5bfeeab8dc70b3e0f2d524a989ccd52528acc6 Mon Sep 17 00:00:00 2001 From: valhayot Date: Sun, 1 Mar 2020 12:53:03 -0500 Subject: [PATCH 3/4] some reformating and modifications to how input and output specs are handled --- pydra/engine/task.py | 94 ++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/pydra/engine/task.py b/pydra/engine/task.py index 39a4398a8c..faff441588 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -213,6 +213,7 @@ class BoutiquesTask(FunctionTask): """Wrap a Boutiques callable as a task element.""" global boutiques_func + def boutiques_func(descriptor, args, **kwargs): from boutiques.descriptor2func import function @@ -225,7 +226,7 @@ def boutiques_func(descriptor, args, **kwargs): if ret.exit_code: raise RuntimeError(ret.stderr) - return [out.file_name for out in ret.output_files] + return ret.output_files def __init__( self, @@ -272,56 +273,61 @@ def __init__( func = boutiques_func self.func = func - self.descriptor = descriptor - self.bosh_args = bosh_args + default_fields = [ + ( + val.name, + attr.ib( + default=val.default, + type=val.annotation, + metadata={ + "help_string": f"{val.name} parameter from {func.__name__}" + }, + ), + ) + if val.default is not inspect.Signature.empty + else ( + val.name, + attr.ib(type=val.annotation, metadata={"help_string": val.name}), + ) + for val in inspect.signature(func).parameters.values() + if val.name != "kwargs" + ] + + # Adding kwargs here because Boutiques also validates inputs if input_spec is None: - func_params = [val for val in inspect.signature(func).parameters.values() if val.name != 'kwargs'] - func_params += kwargs.keys() input_spec = SpecInfo( name="Inputs", fields=[ - ( - val.name, - attr.ib( - default=val.default, - type=val.annotation, - metadata={ - "help_string": f"{val.name} parameter from {func.__name__}" - }, - ), - ) - if hasattr(val, 'default') - else ( - val, - attr.ib( - type=type(kwargs[val]), metadata={"help_string": val} - ), - ) - for val in func_params - ] - + [("_func", attr.ib(default=cp.dumps(func), type=str))], + (k, attr.ib(type=type(v), metadata={"help_string": k}),) + for k, v in kwargs.items() + ], bases=(BaseSpec,), ) - fmt_kwargs={ "descriptor": descriptor, "args": bosh_args } + # users shouldn't have to add "descriptor" and "args" in their input_spec + input_spec.fields.extend(default_fields) + + fmt_kwargs = {"descriptor": descriptor, "args": bosh_args} fmt_kwargs.update(kwargs) super(BoutiquesTask, self).__init__( - func, - name=name, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - cache_locations=cache_locations, - input_spec=input_spec, - rerun=rerun, - **fmt_kwargs, + func, + name=name, + audit_flags=audit_flags, + messengers=messengers, + messenger_args=messenger_args, + cache_dir=cache_dir, + cache_locations=cache_locations, + input_spec=input_spec, + rerun=rerun, + **fmt_kwargs, ) if output_spec is None: - output_spec = SpecInfo(name="Output", fields=[("out", ty.Any)], bases=(BaseSpec,)) + output_spec = SpecInfo( + name="Output", fields=[("out", ty.Any)], bases=(BaseSpec,) + ) self.output_spec = output_spec def _run_task(self): @@ -334,16 +340,20 @@ def _run_task(self): if output is not None: output_names = [el[0] for el in self.output_spec.fields] self.output_ = {} - if len(output_names) > 1: - if len(output_names) == len(output): - self.output_ = dict(zip(output_names, output)) - else: + if len(output_names) > 1 or output_names[0] != "out": + self.output_ = { + f.boutiques_name: f.file_name + for f in output + if f.boutiques_name in output_names + } + if len(output_names) != len(self.output_.keys()): raise Exception( f"expected {len(self.output_spec.fields)} elements, " f"but {len(output)} were returned" ) else: - self.output_[output_names[0]] = output + # Return all filenames if not specified by the user what to return + self.output_[output_names[0]] = [out.file_name for out in output] class ShellCommandTask(TaskBase): From b2740f7e7c7b3023e2862f617966b17d9965e30a Mon Sep 17 00:00:00 2001 From: valhayot Date: Sun, 1 Mar 2020 13:36:43 -0500 Subject: [PATCH 4/4] changed bosh_args default value --- pydra/engine/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydra/engine/task.py b/pydra/engine/task.py index faff441588..03b8963414 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -240,7 +240,7 @@ def __init__( name=None, output_spec: ty.Optional[BaseSpec] = None, rerun=False, - bosh_args=None, + bosh_args=[], **kwargs, ): """