Skip to content

Commit

Permalink
names remapping: avoid clashes by using different dirs
Browse files: browse the repository at this point in the history
  • Loading branch information
simleo committed Jul 17, 2023
1 parent 57e7958 commit fc20b61
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/runcrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def cli():
)
@click.option(
"--remap-names",
help="remap file/dir names to the original ones (MAY LEAD TO CLASHES!)",
help="remap file/dir names to the original ones",
is_flag=True
)
def convert(root, output, license, workflow_name, readme, remap_names):
Expand Down
28 changes: 18 additions & 10 deletions src/runcrate/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"null": None,
}

SCATTER_JOB_PATTERN = re.compile(r"^(.+)_\d+$")
SCATTER_JOB_PATTERN = re.compile(r"^(.+)_(\d+)$")

CWLPROV_NONE = "https://w3id.org/cwl/prov#None"

Expand Down Expand Up @@ -215,6 +215,7 @@ def __init__(self, root, workflow_name=None, license=None, readme=None,
self.file_map = {}
self.manifest = self._get_manifest()
self.remap_names = remap_names
self.data_root = "data"

@staticmethod
def _get_step_maps(cwl_defs):
Expand All @@ -240,11 +241,13 @@ def _get_manifest(self):
def _resolve_plan(self, activity):
job_qname = activity.plan()
plan = activity.provenance.entity(job_qname)
scatter_id = None
if not plan:
m = SCATTER_JOB_PATTERN.match(str(job_qname))
if m:
plan = activity.provenance.entity(m.groups()[0])
return plan
scatter_id = m.groups()[1]
return plan, scatter_id

def _get_hash(self, prov_param):
k = prov_param.id.localpart
Expand Down Expand Up @@ -463,9 +466,11 @@ def add_action(self, crate, activity, parent_instrument=None):
"@type": "CreateAction",
"name": activity.label,
}))
plan = self._resolve_plan(activity)
plan, scatter_id = self._resolve_plan(activity)
plan_tag = plan.id.localpart
dest_base = Path(self.data_root)
if plan_tag == "main":
dest_base = dest_base / "main"
assert str(activity.type) == "wfprov:WorkflowRun"
instrument = workflow
self.roc_engine_run["result"] = action
Expand All @@ -480,6 +485,7 @@ def to_wf_p(k):
if parts[0] == "main":
parts[0] = parent_instrument_fragment
plan_tag = "/".join(parts)
dest_base = dest_base / (f"{plan_tag}_{scatter_id}" if scatter_id else f"{plan_tag}")
tool_name = self.step_maps[parent_instrument_fragment][plan_tag]["tool"]
instrument = crate.dereference(f"{workflow.id}#{tool_name}")
control_action = self.control_actions.get(plan_tag)
Expand All @@ -503,12 +509,14 @@ def to_wf_p(k):
action["instrument"] = instrument
action["startTime"] = activity.start().time.isoformat()
action["endTime"] = activity.end().time.isoformat()
action["object"] = self.add_action_params(crate, activity, to_wf_p, "usage")
action["result"] = self.add_action_params(crate, activity, to_wf_p, "generation")
action["object"] = self.add_action_params(crate, activity, to_wf_p, "usage",
dest_base / "in" if self.remap_names else "")
action["result"] = self.add_action_params(crate, activity, to_wf_p, "generation",
dest_base / "out" if self.remap_names else "")
for job in activity.steps():
self.add_action(crate, job, parent_instrument=instrument)

def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
def add_action_params(self, crate, activity, to_wf_p, ptype="usage", dest_base=""):
action_params = []
all_roles = set()
for rel in getattr(activity, ptype)():
Expand All @@ -528,7 +536,7 @@ def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
wf_p = crate.dereference(to_wf_p(k))
k = get_fragment(k)
v = rel.entity()
value = self.convert_param(v, crate)
value = self.convert_param(v, crate, dest_base=dest_base)
if value is None:
continue # param is optional with no default and was not set
if {"ro:Folder", "wf4ever:File"} & set(str(_) for _ in v.types()):
Expand Down Expand Up @@ -565,7 +573,7 @@ def _set_alternate_name(prov_param, action_p, parent=None):
if "alternateName" in parent:
action_p["alternateName"] = (Path(parent["alternateName"]) / basename).as_posix()

def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
def convert_param(self, prov_param, crate, convert_secondary=True, parent=None, dest_base=""):
type_names = frozenset(str(_) for _ in prov_param.types())
secondary_files = [_.generated_entity() for _ in prov_param.derivations()
if str(_.type) == "cwlprov:SecondaryFile"]
Expand All @@ -589,7 +597,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
basename = getattr(prov_param, "basename", hash_)
else:
basename = hash_
dest = Path(parent.id if parent else "") / basename
dest = Path(parent.id if parent else dest_base) / basename
action_p = crate.dereference(dest.as_posix())
if not action_p:
source = self.manifest[hash_]
Expand All @@ -610,7 +618,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
basename = getattr(prov_param, "basename", hash_)
else:
basename = hash_
dest = Path(parent.id if parent else "") / basename
dest = Path(parent.id if parent else dest_base) / basename
action_p = crate.dereference(dest.as_posix())
if not action_p:
action_p = crate.add_directory(dest_path=dest)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def test_cli_convert_remap_names(data_dir, tmpdir):
args = ["convert", str(root), "-o", str(crate_dir), "--remap-names"]
assert runner.invoke(cli, args).exit_code == 0
crate = ROCrate(crate_dir)
assert crate.get("grepucase_in/")
assert (crate_dir / "grepucase_in").is_dir()
assert crate.get("data/main/in/grepucase_in/")
assert (crate_dir / "data" / "main" / "in" / "grepucase_in").is_dir()


def test_cli_report_provenance_minimal(data_dir, caplog):
Expand Down
61 changes: 43 additions & 18 deletions tests/test_cwlprov_crate_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,48 +1174,73 @@ def test_remap_names(data_dir, tmpdir):
assert len(wf_objects) == 2
assert len(wf_results) == 1
wf_objects_map = {_.id: _ for _ in wf_objects}
wf_input_dir = wf_objects_map.get("grepucase_in/")
wf_input_dir = wf_objects_map.get("data/main/in/grepucase_in/")
assert wf_input_dir
wf_output_dir = wf_results[0]
assert wf_output_dir.id == "ucase_out/"
assert wf_output_dir.id == "data/main/out/ucase_out/"
assert set(_.id for _ in wf_input_dir["hasPart"]) == {
"grepucase_in/bar", "grepucase_in/foo"
"data/main/in/grepucase_in/bar", "data/main/in/grepucase_in/foo"
}
assert set(_.id for _ in wf_output_dir["hasPart"]) == {
"ucase_out/bar.out/", "ucase_out/foo.out/"
"data/main/out/ucase_out/bar.out/", "data/main/out/ucase_out/foo.out/"
}
for d in wf_output_dir["hasPart"]:
if d.id == "ucase_out/bar.out/":
assert d["hasPart"][0].id == "ucase_out/bar.out/bar.out.out"
if d.id == "data/main/out/ucase_out/bar.out/":
assert d["hasPart"][0].id == "data/main/out/ucase_out/bar.out/bar.out.out"
else:
assert d["hasPart"][0].id == "ucase_out/foo.out/foo.out.out"
assert d["hasPart"][0].id == "data/main/out/ucase_out/foo.out/foo.out.out"
greptool_action = action_map["packed.cwl#greptool.cwl"]
greptool_objects = greptool_action["object"]
greptool_results = greptool_action["result"]
assert len(greptool_objects) == 2
assert len(greptool_results) == 1
greptool_objects_map = {_.id: _ for _ in greptool_objects}
greptool_input_dir = greptool_objects_map.get("grepucase_in/")
assert greptool_input_dir is wf_input_dir
greptool_input_dir = greptool_objects_map.get("data/main/grep/in/grepucase_in/")
assert greptool_input_dir
assert set(_.id for _ in greptool_input_dir["hasPart"]) == {
"data/main/grep/in/grepucase_in/bar", "data/main/grep/in/grepucase_in/foo"
}
greptool_output_dir = greptool_results[0]
assert greptool_output_dir.id == "grep_out/"
assert greptool_output_dir.id == "data/main/grep/out/grep_out/"
assert set(_.id for _ in greptool_output_dir["hasPart"]) == {
"data/main/grep/out/grep_out/bar.out", "data/main/grep/out/grep_out/foo.out"
}
ucasetool_action = action_map["packed.cwl#ucasetool.cwl"]
ucasetool_objects = ucasetool_action["object"]
ucasetool_results = ucasetool_action["result"]
assert len(ucasetool_objects) == 1
assert len(ucasetool_results) == 1
ucasetool_input_dir = ucasetool_objects[0]
assert ucasetool_input_dir is greptool_output_dir
assert ucasetool_input_dir.id == "data/main/ucase/in/grep_out/"
assert set(_.id for _ in ucasetool_input_dir["hasPart"]) == {
"data/main/ucase/in/grep_out/bar.out", "data/main/ucase/in/grep_out/foo.out"
}
ucasetool_output_dir = ucasetool_results[0]
assert ucasetool_output_dir is wf_output_dir
assert ucasetool_output_dir.id == "data/main/ucase/out/ucase_out/"
assert set(_.id for _ in ucasetool_output_dir["hasPart"]) == {
"data/main/ucase/out/ucase_out/bar.out/", "data/main/ucase/out/ucase_out/foo.out/"
}

for d in ucasetool_output_dir["hasPart"]:
if d.id == "data/main/ucase/out/ucase_out/bar.out/":
assert d["hasPart"][0].id == "data/main/ucase/out/ucase_out/bar.out/bar.out.out"
else:
assert d["hasPart"][0].id == "data/main/ucase/out/ucase_out/foo.out/foo.out.out"

for e in crate.data_entities:
assert "alternateName" not in e
for p in (
"grepucase_in/bar",
"grepucase_in/foo",
"grep_out/bar.out",
"grep_out/foo.out",
"ucase_out/bar.out/bar.out.out",
"ucase_out/foo.out/foo.out.out",
"data/main/in/grepucase_in/bar",
"data/main/in/grepucase_in/foo",
"data/main/out/ucase_out/bar.out/bar.out.out",
"data/main/out/ucase_out/foo.out/foo.out.out",
"data/main/grep/in/grepucase_in/bar",
"data/main/grep/in/grepucase_in/foo",
"data/main/grep/out/grep_out/bar.out",
"data/main/grep/out/grep_out/foo.out",
"data/main/ucase/in/grep_out/bar.out",
"data/main/ucase/in/grep_out/foo.out",
"data/main/ucase/out/ucase_out/bar.out/bar.out.out",
"data/main/ucase/out/ucase_out/foo.out/foo.out.out",
):
assert (output / p).is_file()

0 comments on commit fc20b61

Please sign in to comment.