Skip to content

Add support for OAR Scheduler #713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pydra/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
available_workers.append("dask")
if bool(shutil.which("sbatch")):
available_workers.append("slurm")
if bool(shutil.which("oarsub")):
available_workers.append("oar")

Check warning on line 38 in pydra/conftest.py

View check run for this annotation

Codecov / codecov/patch

pydra/conftest.py#L38

Added line #L38 was not covered by tests
else:
available_workers = [only_worker]
# Set the available workers as a parameter to the
Expand Down
4 changes: 4 additions & 0 deletions pydra/engine/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
not (bool(shutil.which("qsub")) and bool(shutil.which("qacct"))),
reason="sge not available",
)
need_oar = pytest.mark.skipif(
not (bool(shutil.which("oarsub")) and bool(shutil.which("oarstat"))),
reason="oar not available",
)


def num_python_cache_roots(cache_path: Path) -> int:
Expand Down
189 changes: 189 additions & 0 deletions pydra/workers/oar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import asyncio
import os
import sys
import json
import re
import typing as ty
from tempfile import gettempdir
from pathlib import Path
from shutil import copyfile
import logging
import attrs
from pydra.engine.job import Job, save
from pydra.workers import base


logger = logging.getLogger("pydra.worker")

if ty.TYPE_CHECKING:
from pydra.engine.result import Result

Check warning on line 19 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L19

Added line #L19 was not covered by tests


@attrs.define
class OarWorker(base.Worker):
"""A worker to execute tasks on OAR systems."""

_cmd = "oarsub"

poll_delay: int = attrs.field(default=1, converter=base.ensure_non_negative)
oarsub_args: str = ""
error: dict[str, ty.Any] = attrs.field(factory=dict)

def __getstate__(self) -> dict[str, ty.Any]:
"""Return state for pickling."""
state = super().__getstate__()
del state["error"]
return state

Check warning on line 36 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L34-L36

Added lines #L34 - L36 were not covered by tests

def __setstate__(self, state: dict[str, ty.Any]):
"""Set state for unpickling."""
state["error"] = {}
super().__setstate__(state)

Check warning on line 41 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L40-L41

Added lines #L40 - L41 were not covered by tests

def _prepare_runscripts(self, job, interpreter="/bin/sh", rerun=False):
if isinstance(job, Job):
cache_root = job.cache_root
ind = None
uid = job.uid

Check warning on line 47 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L45-L47

Added lines #L45 - L47 were not covered by tests
else:
assert isinstance(job, tuple), f"Expecting a job or a tuple, not {job!r}"
assert len(job) == 2, f"Expecting a tuple of length 2, not {job!r}"
ind = job[0]
cache_root = job[-1].cache_root
uid = f"{job[-1].uid}_{ind}"

Check warning on line 53 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L49-L53

Added lines #L49 - L53 were not covered by tests

script_dir = cache_root / f"{self.plugin_name()}_scripts" / uid
script_dir.mkdir(parents=True, exist_ok=True)

Check warning on line 56 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L55-L56

Added lines #L55 - L56 were not covered by tests
if ind is None:
if not (script_dir / "_job.pklz").exists():
save(script_dir, job=job)

Check warning on line 59 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L59

Added line #L59 was not covered by tests
else:
copyfile(job[1], script_dir / "_job.pklz")

Check warning on line 61 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L61

Added line #L61 was not covered by tests

job_pkl = script_dir / "_job.pklz"

Check warning on line 63 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L63

Added line #L63 was not covered by tests
if not job_pkl.exists() or not job_pkl.stat().st_size:
raise Exception("Missing or empty job!")

Check warning on line 65 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L65

Added line #L65 was not covered by tests

batchscript = script_dir / f"batchscript_{uid}.sh"
python_string = (

Check warning on line 68 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L67-L68

Added lines #L67 - L68 were not covered by tests
f"""'from pydra.engine.job import load_and_run; """
f"""load_and_run("{job_pkl}", rerun={rerun}) '"""
)
bcmd = "\n".join(

Check warning on line 72 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L72

Added line #L72 was not covered by tests
(
f"#!{interpreter}",
f"{sys.executable} -c " + python_string,
)
)
with batchscript.open("wt") as fp:
fp.writelines(bcmd)
os.chmod(batchscript, 0o544)
return script_dir, batchscript

Check warning on line 81 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L79-L81

Added lines #L79 - L81 were not covered by tests

async def run(self, job: "Job[base.TaskType]", rerun: bool = False) -> "Result":
"""Worker submission API."""
script_dir, batch_script = self._prepare_runscripts(job, rerun=rerun)

Check warning on line 85 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L85

Added line #L85 was not covered by tests
if (script_dir / script_dir.parts[1]) == gettempdir():
logger.warning("Temporary directories may not be shared across computers")
script_dir = job.cache_root / f"{self.plugin_name()}_scripts" / job.uid
sargs = self.oarsub_args.split()
jobname = re.search(r"(?<=-n )\S+|(?<=--name=)\S+", self.oarsub_args)

Check warning on line 90 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L87-L90

Added lines #L87 - L90 were not covered by tests
if not jobname:
jobname = ".".join((job.name, job.uid))
sargs.append(f"--name={jobname}")
output = re.search(r"(?<=-O )\S+|(?<=--stdout=)\S+", self.oarsub_args)

Check warning on line 94 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L92-L94

Added lines #L92 - L94 were not covered by tests
if not output:
output_file = str(script_dir / "oar-%jobid%.out")
sargs.append(f"--stdout={output_file}")
error = re.search(r"(?<=-E )\S+|(?<=--stderr=)\S+", self.oarsub_args)

Check warning on line 98 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L96-L98

Added lines #L96 - L98 were not covered by tests
if not error:
error_file = str(script_dir / "oar-%jobid%.err")
sargs.append(f"--stderr={error_file}")

Check warning on line 101 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L100-L101

Added lines #L100 - L101 were not covered by tests
else:
error_file = None
sargs.append(str(batch_script))

Check warning on line 104 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L103-L104

Added lines #L103 - L104 were not covered by tests
# TO CONSIDER: add random sleep to avoid overloading calls
rc, stdout, stderr = await base.read_and_display_async(

Check warning on line 106 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L106

Added line #L106 was not covered by tests
self._cmd, *sargs, hide_display=True
)
jobid = re.search(r"OAR_JOB_ID=(\d+)", stdout)

Check warning on line 109 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L109

Added line #L109 was not covered by tests
if rc:
raise RuntimeError(f"Error returned from oarsub: {stderr}")

Check warning on line 111 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L111

Added line #L111 was not covered by tests
elif not jobid:
raise RuntimeError("Could not extract job ID")
jobid = jobid.group(1)

Check warning on line 114 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L113-L114

Added lines #L113 - L114 were not covered by tests
if error_file:
error_file = error_file.replace("%jobid%", jobid)
self.error[jobid] = error_file.replace("%jobid%", jobid)

Check warning on line 117 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L116-L117

Added lines #L116 - L117 were not covered by tests
# intermittent polling
while True:

Check warning on line 119 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L119

Added line #L119 was not covered by tests
# 4 possibilities
# False: job is still pending/working
# Terminated: job is complete
# Error + idempotent: job has been stopped and resubmited with another jobid
# Error: Job failure
done = await self._poll_job(jobid)

Check warning on line 125 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L125

Added line #L125 was not covered by tests
if not done:
await asyncio.sleep(self.poll_delay)

Check warning on line 127 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L127

Added line #L127 was not covered by tests
elif done == "Terminated":
return True

Check warning on line 129 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L129

Added line #L129 was not covered by tests
elif done == "Error" and "idempotent" in self.oarsub_args:
jobid = await self._handle_resubmission(jobid, job)
continue

Check warning on line 132 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L131-L132

Added lines #L131 - L132 were not covered by tests
else:
error_file = self.error[jobid]

Check warning on line 134 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L134

Added line #L134 was not covered by tests
if not Path(error_file).exists():
logger.debug(

Check warning on line 136 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L136

Added line #L136 was not covered by tests
f"No error file for job {jobid}. Checking if job was resubmitted by OAR..."
)
jobid = await self._handle_resubmission(jobid, job)

Check warning on line 139 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L139

Added line #L139 was not covered by tests
if jobid:
continue

Check warning on line 141 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L141

Added line #L141 was not covered by tests
for _ in range(5):
if Path(error_file).exists():
break
await asyncio.sleep(1)

Check warning on line 145 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L144-L145

Added lines #L144 - L145 were not covered by tests
else:
raise RuntimeError(

Check warning on line 147 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L147

Added line #L147 was not covered by tests
f"OAR error file not found: {error_file}, and no resubmission detected."
)
error_line = Path(error_file).read_text().split("\n")[-2]

Check warning on line 150 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L150

Added line #L150 was not covered by tests
if "Exception" in error_line:
error_message = error_line.replace("Exception: ", "")

Check warning on line 152 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L152

Added line #L152 was not covered by tests
elif "Error" in error_line:
error_message = error_line.replace("Error: ", "")

Check warning on line 154 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L154

Added line #L154 was not covered by tests
else:
error_message = "Job failed (unknown reason - TODO)"
raise Exception(error_message)

Check warning on line 157 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L156-L157

Added lines #L156 - L157 were not covered by tests
return True

async def _poll_job(self, jobid):
cmd = ("oarstat", "-J", "-s", "-j", jobid)
logger.debug(f"Polling job {jobid}")
_, stdout, _ = await base.read_and_display_async(*cmd, hide_display=True)

Check warning on line 163 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L161-L163

Added lines #L161 - L163 were not covered by tests
if not stdout:
raise RuntimeError("Job information not found")
status = json.loads(stdout)[jobid]

Check warning on line 166 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L165-L166

Added lines #L165 - L166 were not covered by tests
if status in ["Waiting", "Launching", "Running", "Finishing"]:
return False
return status

Check warning on line 169 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L168-L169

Added lines #L168 - L169 were not covered by tests

async def _handle_resubmission(self, jobid, job):
logger.debug(f"Job {jobid} has been stopped. Looking for its resubmission...")

Check warning on line 172 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L172

Added line #L172 was not covered by tests
# loading info about task with a specific uid
info_file = job.cache_root / f"{job.uid}_info.json"

Check warning on line 174 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L174

Added line #L174 was not covered by tests
if info_file.exists():
checksum = json.loads(info_file.read_text())["checksum"]
lock_file = job.cache_root / f"{checksum}.lock"

Check warning on line 177 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L176-L177

Added lines #L176 - L177 were not covered by tests
if lock_file.exists():
lock_file.unlink()
cmd_re = ("oarstat", "-J", "--sql", f"resubmit_job_id='{jobid}'")
_, stdout, _ = await base.read_and_display_async(*cmd_re, hide_display=True)

Check warning on line 181 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L179-L181

Added lines #L179 - L181 were not covered by tests
if stdout:
return next(iter(json.loads(stdout).keys()), None)

Check warning on line 183 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L183

Added line #L183 was not covered by tests
else:
return None

Check warning on line 185 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L185

Added line #L185 was not covered by tests


# Alias so it can be referred to as oar.Worker
Worker = OarWorker
81 changes: 81 additions & 0 deletions pydra/workers/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
need_sge,
need_slurm,
need_singularity,
need_oar,
BasicWorkflow,
BasicWorkflowWithThreadCount,
BasicWorkflowWithThreadCountConcurrent,
Expand Down Expand Up @@ -602,6 +603,86 @@
assert job_1_endtime > job_2_starttime


@need_oar
def test_oar_wf(tmpdir):
wf = BasicWorkflow(x=1)

Check warning on line 608 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L608

Added line #L608 was not covered by tests
# submit workflow and every task as oar job
with Submitter(worker="oar", cache_root=tmpdir) as sub:
res = sub(wf)

Check warning on line 611 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L611

Added line #L611 was not covered by tests

outputs = res.outputs
assert outputs.out == 5
script_dir = tmpdir / "oar_scripts"
assert script_dir.exists()

Check warning on line 616 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L613-L616

Added lines #L613 - L616 were not covered by tests
# ensure each task was executed with oar
assert len([sd for sd in script_dir.listdir() if sd.isdir()]) == 2


@pytest.mark.skip(
reason=(
"There currently isn't a way to specify a worker to run a whole workflow within "
"a single OAR job"
)
)
@need_oar
def test_oar_wf_cf(tmpdir):
# submit entire workflow as single job executing with cf worker
wf = BasicWorkflow(x=1)

Check warning on line 630 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L630

Added line #L630 was not covered by tests
with Submitter(worker="oar", cache_root=tmpdir) as sub:
res = sub(wf)

Check warning on line 632 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L632

Added line #L632 was not covered by tests

outputs = res.outputs
assert outputs.out == 5
script_dir = tmpdir / "oar_scripts"
assert script_dir.exists()

Check warning on line 637 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L634-L637

Added lines #L634 - L637 were not covered by tests
# ensure only workflow was executed with oar
sdirs = [sd for sd in script_dir.listdir() if sd.isdir()]
assert len(sdirs) == 1

Check warning on line 640 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L640

Added line #L640 was not covered by tests
# oar scripts should be in the dirs that are using uid in the name
assert sdirs[0].basename == wf.uid

Check warning on line 642 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L642

Added line #L642 was not covered by tests


@need_oar
def test_oar_wf_state(tmpdir):
wf = BasicWorkflow().split(x=[5, 6])

Check warning on line 647 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L647

Added line #L647 was not covered by tests
with Submitter(worker="oar", cache_root=tmpdir) as sub:
res = sub(wf)

Check warning on line 649 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L649

Added line #L649 was not covered by tests

outputs = res.outputs
assert outputs.out == [9, 10]
script_dir = tmpdir / "oar_scripts"
assert script_dir.exists()

Check warning on line 654 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L651-L654

Added lines #L651 - L654 were not covered by tests
sdirs = [sd for sd in script_dir.listdir() if sd.isdir()]
assert len(sdirs) == 2 * len(wf.x)

Check warning on line 656 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L656

Added line #L656 was not covered by tests


@need_oar
def test_oar_args_1(tmpdir):
"""testing sbatch_args provided to the submitter"""
task = SleepAddOne(x=1)

Check warning on line 662 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L662

Added line #L662 was not covered by tests
# submit workflow and every task as oar job
with Submitter(worker="oar", cache_root=tmpdir, oarsub_args="-l nodes=2") as sub:
res = sub(task)

Check warning on line 665 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L665

Added line #L665 was not covered by tests

assert res.outputs.out == 2
script_dir = tmpdir / "oar_scripts"
assert script_dir.exists()

Check warning on line 669 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L667-L669

Added lines #L667 - L669 were not covered by tests


@need_oar
def test_oar_args_2(tmpdir):
"""testing oarsub_args provided to the submitter
exception should be raised for invalid options
"""
task = SleepAddOne(x=1)

Check warning on line 677 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L677

Added line #L677 was not covered by tests
# submit workflow and every task as oar job
with pytest.raises(RuntimeError, match="Error returned from oarsub:"):
with Submitter(
worker="oar", cache_root=tmpdir, oarsub_args="-l nodes=2 --invalid"
) as sub:
sub(task)

Check warning on line 683 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L683

Added line #L683 was not covered by tests


def test_hash_changes_in_task_inputs_file(tmp_path):
@python.define
def cache_dir_as_input(out_dir: Directory) -> Directory:
Expand Down
Loading