Description
Question
It would be nice to be able to set a soft lifetime for a worker. When this time is exceeded, no new jobs are submitted to the worker, but existing jobs are allowed to complete. Once they complete, the worker is shut down and recycled as with the current `lifetime` option. Is it possible to achieve this kind of behavior with dask/distributed/jobqueue now?
Motivation
I have been working with dask-jobqueue on an LSF cluster with some long-running jobs, and I am wondering what the best option is to avoid hitting the run limit. I have read through #122, and, if I understand correctly (which I might not, since it is a lengthy discussion), the conclusion at the end (as of October 2019) was that dask-jobqueue should automatically use the `lifetime` parameter introduced in dask/distributed#2892 to set a lifetime slightly shorter than the specified `walltime`, so that dask shuts down and recycles workers before the cluster's scheduler kills them. I think this would be a good addition because on my cluster a user's queue priority is penalized each time they exceed the run limit for the queue.
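For reference, here is roughly how I would wire that up by hand today, based on my reading of the worker CLI options added in dask/distributed#2892 and dask-jobqueue's `extra` parameter (the 24-hour walltime, 23-hour lifetime, 4-minute stagger, and worker counts are just placeholder numbers):

```python
import dask_jobqueue

# Ask LSF for a 24 h walltime, but tell each dask-worker to retire itself
# a bit earlier so that dask, not LSF, is the one that ends the job.
cluster = dask_jobqueue.LSFCluster(
    cores=1,
    processes=1,
    memory="4GB",
    walltime="24:00",  # LSF run limit (HH:MM)
    extra=[
        "--lifetime", "23h",         # retire before the run limit is hit
        "--lifetime-stagger", "4m",  # jitter so workers do not all retire at once
    ],
)
# With adaptive scaling, replacement workers are requested as retired ones go away.
cluster.adapt(minimum=0, maximum=10)
```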
In my testing with the `lifetime` worker option, I found that, when the `lifetime` is exceeded for a worker in a cluster using `adapt`, the worker is stopped and then a new worker is created. So setting `lifetime` to less than the run limit will keep workers from exceeding the run limit. However, the behavior is not ideal: if my jobs run for about 8 hours and the run limit for the queue is 24 hours, I could end up running two full 8-hour jobs and then having the third be stopped for hitting its lifetime around 7 hours into its run. It would be better to set a soft lifetime of around 15 hours so that the longest a worker could stay alive would be about 23 hours.
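To spell out the arithmetic behind those numbers, here is a back-of-the-envelope comparison of a hard lifetime just under the run limit versus the soft lifetime I am asking about (plain Python, with my 8-hour task and 24-hour run limit hard-coded):

```python
task_hours = 8        # approximate duration of one of my jobs
run_limit_hours = 24  # LSF run limit for the queue

# Hard lifetime just under the run limit: the worker is stopped at hour 23,
# i.e. about 7 hours into its third task, and that partial work is lost.
hard_lifetime = 23
tasks_completed = hard_lifetime // task_hours                 # 2
hours_wasted = hard_lifetime - tasks_completed * task_hours   # 7

# Soft lifetime: stop accepting new tasks at hour 15 and finish the task in
# progress, so the worker never runs longer than 15 + 8 = 23 hours and no
# partial work is thrown away.
soft_lifetime = 15
worst_case_hours = soft_lifetime + task_hours                 # 23

print(tasks_completed, hours_wasted, worst_case_hours)
```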
Perhaps it is not common to run long jobs with dask? I am solving differential equations. Instead of solving for the full time, I could solve for intermediate time steps and chain jobs together with the intermediate solutions, though adding in this logic would take some work. When jobs are short, losing a worker mid-computation is not as big of a loss.
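For what it is worth, this is roughly the chunking/chaining approach I have in mind; `solve_chunk` is a hypothetical stand-in for my actual integration step, and the chunk count is arbitrary:

```python
import distributed


def solve_chunk(state, t_start, t_end):
    # Hypothetical placeholder: integrate the ODE from t_start to t_end,
    # starting from `state`, and return the new state (an intermediate solution).
    ...
    return state


def solve_in_chunks(client, initial_state, t_final, n_chunks):
    """Split one long solve into n_chunks shorter jobs, each resuming from the
    previous chunk's result, so losing a worker only costs the chunk in progress."""
    boundaries = [t_final * i / n_chunks for i in range(n_chunks + 1)]
    future = client.scatter(initial_state)
    for t0, t1 in zip(boundaries[:-1], boundaries[1:]):
        # Each submit depends on the previous future, so dask chains the chunks
        # and can rerun an unfinished chunk on another worker if one is lost.
        future = client.submit(solve_chunk, future, t0, t1)
    return future.result()


# e.g. result = solve_in_chunks(distributed.Client(cluster), y0, t_final=24.0, n_chunks=8)
```

Each chunk is a much shorter task, so a recycled worker only costs the chunk in progress rather than the whole solve.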
Extra details
In case it is useful to someone else, here is the script I was playing around with to test `lifetime` and `walltime`.
Some observations:
- With `sleep_time < lifetime < walltime` (comfortably), all jobs complete okay with nothing hitting the walltime.
- If `sleep_time > lifetime`, no errors are generated, but dask will continually kill workers before they finish a job and restart them.
- The two points above also hold for the `LocalCluster` when `use_lsf == False`.
- If `lifetime > walltime > sleep_time`, LSF kills workers and then dask restarts them. All jobs complete okay, just with some killed along the way by LSF.
```python
import math
import time

import distributed
import dask_jobqueue

sleep_time = 10
workers = 3
jobs = workers * math.ceil(75 / sleep_time)
lifetime = 20
use_lsf = True


def sleep(i):
    time.sleep(sleep_time)
    return i


def main():
    if use_lsf:
        cluster = dask_jobqueue.LSFCluster(
            name="dask-worker",
            cores=1,
            processes=1,
            memory="4GB",
            threads_per_worker=1,
            walltime="00:01",
            extra=["--lifetime", str(lifetime)],
        )
    else:
        cluster = distributed.LocalCluster(
            lifetime=str(lifetime),
            threads_per_worker=1,
        )
    cluster.adapt(maximum=workers)
    client = distributed.Client(address=cluster)
    for future in distributed.as_completed(client.map(sleep, range(jobs))):
        print(future.result())


if __name__ == "__main__":
    main()
```
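As a small follow-up, this is the sanity check I run on the numbers before starting the script to put myself in the "everything completes cleanly" case from the observations above; the `lsf_walltime_to_seconds` helper is my own, not a dask utility:

```python
# Values from the script above.
sleep_time = 10
lifetime = 20
walltime = "00:01"  # LSF-style HH:MM


def lsf_walltime_to_seconds(walltime):
    """Convert an LSF-style "HH:MM" walltime string to seconds."""
    hours, minutes = (int(part) for part in walltime.split(":"))
    return 3600 * hours + 60 * minutes


# Comfortable ordering: each task fits inside the lifetime, and the lifetime
# fits inside the LSF run limit, so dask recycles workers before LSF kills them.
assert sleep_time < lifetime < lsf_walltime_to_seconds(walltime)
```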