Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions adaptive_scheduler/_mock_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class MockScheduler:
def __init__(
self,
*,
startup_delay: int = 3,
startup_delay: int = 1,
max_running_jobs: int = 4,
refresh_interval: float = 0.1,
bash: str = "bash",
Expand Down Expand Up @@ -90,43 +90,53 @@ def queue(
}

def _queue_is_full(self) -> bool:
n_running = sum(info["state"] == "R" for info in self._current_queue.values())
n_running = sum(
info["state"] == "RUNNING" for info in self._current_queue.values()
)
return n_running >= self.max_running_jobs

def _get_new_job_id(self) -> str:
job_id = self._job_id
self._job_id += 1
return str(job_id)

async def _submit_coro(self, job_name: str, job_id: str, fname: str) -> None:
async def _submit_coro(
self,
job_name: str,
job_id: str,
fname: str,
log_fname: str,
) -> None:
await asyncio.sleep(self.startup_delay)
while self._queue_is_full():
await asyncio.sleep(self.refresh_interval)
self._submit(job_name, job_id, fname)
self._submit(job_name, job_id, fname, log_fname)

def _submit(self, job_name: str, job_id: str, fname: str) -> None:
def _submit(self, job_name: str, job_id: str, fname: str, log_fname: str) -> None:
if job_id in self._current_queue:
# job_id could be cancelled before it started
cmd = f"{self.bash} {fname}"
f = open(log_fname, "w") # noqa: PTH123, SIM115
proc = subprocess.Popen(
cmd.split(),
stdout=subprocess.PIPE,
stdout=f,
stderr=subprocess.STDOUT, # Redirect stderr to stdout
env=dict(os.environ, JOB_ID=job_id, NAME=job_name),
preexec_fn=os.setpgrp, # Set a new process group for the process
)
info = self._current_queue[job_id]
info["proc"] = proc
info["state"] = "R"
info["state"] = "RUNNING"

def submit(self, job_name: str, fname: str, log_fname: str) -> str:
    """Register a pending job and schedule its delayed start.

    A new entry with state ``"PENDING"`` and no process is stored in
    ``self._current_queue`` under a fresh job id, and a single
    ``_submit_coro`` task is created on ``self.ioloop``.

    Args:
        job_name: Name stored in the queue entry and passed to the task.
        fname: Script file name passed to the task.
        log_fname: Log file name passed to the task.

    Returns:
        The id assigned to the job.
    """
    job_id = self._get_new_job_id()
    self._current_queue[job_id] = {
        "job_name": job_name,
        "proc": None,
        "state": "PENDING",
        "timestamp": str(datetime.datetime.now()),  # noqa: DTZ005
    }
    # Schedule exactly one start task so the job is never launched twice.
    self.ioloop.create_task(self._submit_coro(job_name, job_id, fname, log_fname))
    return job_id

def cancel(self, job_id: str) -> None:
Expand All @@ -148,8 +158,8 @@ async def _refresh_coro(self) -> Coroutine[None, None, None]:

def _refresh(self) -> None:
for _job_id, info in self._current_queue.items():
if info["state"] == "R" and info["proc"].poll() is not None:
info["state"] = "F"
if info["state"] == "RUNNING" and info["proc"].poll() is not None:
info["state"] = "FINISHED"

async def _command_listener(self) -> Coroutine[None, None, None]:
log.debug("started _command_listener")
Expand All @@ -171,9 +181,14 @@ def _dispatch(
request_type, *request_arg = request
try:
if request_type == "submit":
job_name, fname = request_arg
log.debug("submitting a task", fname=fname, job_name=job_name)
job_id = self.submit(job_name, fname) # type: ignore[arg-type]
job_name, fname, log_fname = request_arg
log.debug(
"submitting a task",
fname=fname,
job_name=job_name,
log_fname=log_fname,
)
job_id = self.submit(job_name, fname, log_fname) # type: ignore[arg-type]
return job_id
if request_type == "cancel":
job_id = request_arg[0] # type: ignore[assignment]
Expand Down
5 changes: 4 additions & 1 deletion adaptive_scheduler/_scheduler/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ def queue(self, *, me_only: bool = True) -> dict[str, dict]: # noqa: ARG002
def start_job(self, name: str) -> None:
    """Start a job.

    The submit command is built from ``self.submit_cmd``, the job name, the
    batch file for the name with its last ``-``-separated part removed, and
    the first entry of ``self.output_fnames(name)``, then handed to
    ``run_submit``.
    """
    name_prefix = name.rsplit("-", 1)[0]
    output_log = self.output_fnames(name)[0]
    submit_cmd = (
        f"{self.submit_cmd} {name} {self.batch_fname(name_prefix)} {output_log}"
    )
    run_submit(submit_cmd, name)

@property
Expand Down