
Soft time limit for workers? #416

Closed
wshanks opened this issue Apr 24, 2020 · 7 comments

Comments
@wshanks

wshanks commented Apr 24, 2020

Question

It would be nice to be able to set a soft lifetime for a worker. When this time is exceeded, no new jobs are submitted to the worker, but existing jobs are allowed to complete. Once they complete, the worker is shut down and recycled, as with the current lifetime option. Is it possible to achieve this kind of behavior with dask/distributed/jobqueue now?

Motivation

I have been working with dask-jobqueue on an LSF cluster with some long-running jobs, and I am wondering what the best option is to avoid hitting the run limit. I have read through #122, and, if I understand correctly (which I might not, since it is a lengthy discussion), the conclusion at the end (as of October 2019) was that dask-jobqueue should automatically use the lifetime parameter introduced in dask/distributed#2892 to set a lifetime a little shorter than the specified walltime, so that dask shuts down and recycles workers before the cluster's scheduler kills them. I think this would be a good addition because on my cluster a user's queue priority is penalized each time they exceed the run limit for the queue.
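For concreteness, here is roughly what that conclusion would look like from the user side today. This is only an illustrative sketch; the 24-hour walltime and the 23-hour lifetime are placeholder values, not anything decided in #122.

import dask_jobqueue

# Illustrative sketch only: ask LSF for a 24-hour run limit but tell
# dask-worker to shut itself down cleanly a bit earlier, so that dask
# (not LSF) retires the worker.
cluster = dask_jobqueue.LSFCluster(
    cores=1,
    processes=1,
    memory="4GB",
    walltime="24:00",             # LSF run limit (HH:MM)
    extra=["--lifetime", "23h"],  # clean shutdown before the run limit
)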

In my testing with the lifetime worker option, I found that, when the lifetime is exceeded for a worker in a cluster using adapt, the worker is stopped and then a new worker is created. So setting lifetime to less than the run limit will keep workers from exceeding the run limit. However, the behavior is not ideal: if my jobs run for about 8 hours and the run limit for the queue is 24 hours, I could end up running two full 8-hour jobs and then having the third be stopped for hitting its lifetime around 7 hours into its run. It would be better to set a soft lifetime of around 15 hours so that the longest a worker could stay alive would be about 23 hours.

Perhaps it is not common to run long jobs with dask? I am solving differential equations. Instead of solving for the full time, I could solve for intermediate time steps and chain jobs together with the intermediate solutions, though adding in this logic would take some work. When jobs are short, losing a worker mid-computation is not as big of a loss.
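As a rough sketch of the kind of chaining I mean, here is how the chunked approach could look. The solve_chunk function below is a stand-in, not my actual solver, and the time grid and initial condition are placeholders.

import numpy as np
import distributed


def solve_chunk(state, t0, t1):
    # Stand-in for integrating the ODE from t0 to t1; the real version would
    # call the actual solver and return the state at t1.
    return state + (t1 - t0) * np.ones_like(state)


def main():
    client = distributed.Client()        # or a Client attached to the LSFCluster
    times = np.linspace(0.0, 100.0, 11)  # split the full solve into 10 chunks
    state = np.zeros(4)                  # placeholder initial condition

    # Each submit takes the previous future as its input, so progress is
    # checkpointed at every chunk boundary; losing a worker only costs the
    # chunk that was in flight.
    for t0, t1 in zip(times[:-1], times[1:]):
        state = client.submit(solve_chunk, state, t0, t1)

    print(state.result())


if __name__ == "__main__":
    main()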

Extra details

In case it is useful to someone else, here is the script I was playing around with to test lifetime and walltime.

Some observations:

  • With sleep_time < lifetime < walltime (comfortably), all jobs complete okay with nothing hitting the walltime.
  • If sleep_time > lifetime, no errors are generated but dask will continually kill workers before they finish a job and restart them.
  • The two points above also hold for the LocalCluster when use_lsf == False.
  • If lifetime > walltime > sleep_time, LSF kills workers and then dask restarts them. All jobs complete okay, just with some killed along the way by LSF.
import math
import time

import distributed
import dask_jobqueue


sleep_time = 10  # seconds each task sleeps
workers = 3
jobs = workers * math.ceil(75 / sleep_time)  # at least ~75 s of work per worker, more than the 1-minute walltime
lifetime = 20  # seconds passed to dask-worker --lifetime
use_lsf = True  # set to False to run the same test against a LocalCluster


def sleep(i):
    time.sleep(sleep_time)
    return i


def main():
    if use_lsf:
        cluster = dask_jobqueue.LSFCluster(
            name="dask-worker",
            cores=1,
            processes=1,
            memory="4GB",
            threads_per_worker=1,
            walltime="00:01",
            extra=["--lifetime", str(lifetime)],
        )
    else:
        cluster = distributed.LocalCluster(
            lifetime=str(lifetime),
            threads_per_worker=1
        )
    cluster.adapt(maximum=workers)


    client = distributed.Client(address=cluster)
    for future in distributed.as_completed(client.map(sleep, range(jobs))):
        print(future.result())


if __name__ == "__main__":
    main()
@guillaumeeb
Member

Hi @willsALMANJ,

Thanks for this very interesting issue. I did not recall the end of the discussion in #122, but the lifetime parameter indeed looks very promising.

I'm not sure what the problem is in this issue, though. Were you able to use this parameter with the current dask-jobqueue version? It seems so, judging from your code snippet above. Or do you need the fix you asked for in #417?

Do you want advice on using this parameter with long-running tasks? I'd say your solution of a soft lifetime at 15h is good enough, isn't it?

In any case, this is a very useful issue, and adding some documentation along the lines of what you describe here (perhaps even with your example) to finally fix #122 would be really welcome!

@lesteve
Member

lesteve commented Apr 27, 2020

@willsALMANJ it looks like you have a good understanding of the issue, and a use case that would motivate you to improve the situation in Dask-Jobqueue, which would be really welcome!

I would advise you to start with the simplest use case first, which is to fix the error mentioned in #122 (comment). There is a snippet which should allow you to reproduce the problem (you need to change the cluster class, but I am sure you will easily figure that out).

@wshanks
Author

wshanks commented Apr 28, 2020

I can look at #122 and at adding to the documentation.

I will try to clarify this issue because "soft time limit" perhaps wasn't clear above. There are two time limits right now for a dask worker created by dask_jobqueue: the lifetime set by using the --lifetime parameter to dask-worker and the walltime limit that might be imposed by the job queuing system. --lifetime causes the dask worker to shut down cleanly when the lifetime limit is reached. The job queuing system might shut down the dask worker more forcefully if its time limit is reached.

In this issue, I was asking about a different time limit. This limit would set the time after which the worker would not be given any more tasks. Once all of the worker's tasks completed, the worker would be shut down, and then the scheduler could create a new worker if the cluster was adaptive. The motivation is that the lifetime parameter interrupts current tasks on the worker, which then have to be started over on a new worker, so any work on those tasks is lost and has to be redone.
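To make the request concrete, here is a purely hypothetical sketch. The --soft-lifetime option does not exist in dask-worker; it is only meant to illustrate the behavior I am describing, next to the existing --lifetime flag.

import dask_jobqueue

cluster = dask_jobqueue.LSFCluster(
    cores=1,
    processes=1,
    memory="4GB",
    walltime="24:00",  # run limit imposed by the job queuing system
    extra=[
        # Existing option: at 23h the worker shuts down even mid-task.
        # "--lifetime", "23h",
        # Hypothetical option (does not exist): after 15h the worker accepts
        # no new tasks, finishes the ones it has, then shuts down and is
        # recycled by the adaptive cluster.
        "--soft-lifetime", "15h",
    ],
)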

I was wondering first if it is already possible to impose a time limit on a worker like this. If not, it might be worth adding depending on how hard it would be to add.

@lesteve
Member

lesteve commented Apr 29, 2020

OK, IIUC you would want something that:

  1. tells the scheduler to stop sending new tasks to the worker
  2. lets the worker finish the tasks it already has
  3. kills the worker once its tasks are finished

There are some things here that remind me of dask/distributed#3564 (in particular, the part where the worker is removed from an "active list" is roughly your step 1).

In general, my very rough feeling is that Dask is more oriented towards killing workers and responding quickly to scaling commands, at the cost of recomputing. I don't know whether there is a philosophical reason behind it.

In general, this kind of base functionality is best discussed in the dask/distributed issue tracker. There are more experts on the Dask scheduler there. I agree that sometimes there is a specific Dask-Jobqueue "touch" on some of these issues, in this particular case the job queue walltime, so the separation is not always clear.

@lesteve
Member

lesteve commented Apr 29, 2020

Having said all of this, --lifetime and --lifetime-restart were added to distributed a while ago specifically for the Dask-Jobqueue walltime use case, so it would be great if we could try to use them to fix the simpler problem in #122 (i.e. the weird error when all the jobs reach their walltime simultaneously).
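Something along these lines (untested, with placeholder walltime and stagger values) is what I have in mind for the #122 case; the --lifetime-stagger option randomizes the shutdown times so the workers do not all retire at the same moment.

import dask_jobqueue

# Untested sketch: each worker retires itself shortly before the queue
# walltime, and the stagger spreads the shutdowns out so all the jobs do not
# reach their lifetime simultaneously.
cluster = dask_jobqueue.LSFCluster(
    cores=1,
    processes=1,
    memory="4GB",
    walltime="01:00",                # 1-hour LSF run limit (HH:MM)
    extra=[
        "--lifetime", "55m",         # clean shutdown before the walltime
        "--lifetime-stagger", "4m",  # spread shutdowns over a few minutes
    ],
)
cluster.adapt(minimum=0, maximum=20)  # replacement jobs are requested as workers retire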

@guillaumeeb
Member

Looks like there is an issue in distributed for the problem you describe: dask/distributed#3141.

I encourage you to weigh in and, if possible, try to propose some improvement.

Should we keep this issue open at least until the simple case (where tasks are short enough and recomputation is not a problem) is documented, close it now, or wait for the fix to be implemented in distributed?

@wshanks
Author

wshanks commented May 1, 2020

Thanks! I had not found that issue in distributed. It describes exactly what I was looking for. I think this issue can be closed since dask-jobqueue will benefit whenever distributed addresses dask/distributed#3141. We just need to address #417 and keep an eye on dask/distributed#3141 in case the API for --lifetime changes to accommodate both the "kill stalled workers immediately" and "close worker's queue and kill worker after all tasks complete" cases.
