
Soft time limit for workers? #416

Closed

Description

@wshanks

Question

It would be nice to be able to set a soft lifetime for a worker. When this time is exceeded, no new jobs are submitted to the worker, but existing jobs are allowed to complete. Once they complete, the worker is shut down and recycled, as with the current lifetime option. Is it possible to achieve this kind of behavior with dask/distributed/jobqueue now?

Motivation

I have been working with dask-jobqueue on an LSF cluster with some long-running jobs, and I am wondering what the best option is to avoid hitting the run limit. I have read through #122 and, if I understand correctly (which I might not, since it is a lengthy discussion), the conclusion there (as of October 2019) was that dask-jobqueue should use the lifetime parameter introduced in dask/distributed#2892 to automatically set a lifetime a little shorter than the specified walltime, so that dask shuts down and recycles workers before the cluster's scheduler kills them. I think this would be a good addition because, on my cluster, a user's queue priority is penalized each time one of their jobs exceeds the queue's run limit.
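
For now I am doing this by hand, roughly like the sketch below: the worker lifetime is set a bit under the walltime via the extra worker arguments. The specific numbers and the use of --lifetime-stagger are just my own guesses at reasonable values, not anything dask-jobqueue does automatically.

import dask_jobqueue

# Hypothetical real-scale configuration: a 24 hour LSF run limit with the
# dask worker lifetime set a bit shorter, so distributed retires the worker
# before LSF kills the job.  The --lifetime* flags come from
# dask/distributed#2892 and are passed straight to the dask-worker CLI.
cluster = dask_jobqueue.LSFCluster(
    cores=1,
    processes=1,
    memory="4GB",
    walltime="24:00",                 # LSF run limit (HH:MM)
    extra=[
        "--lifetime", "23h",          # retire the worker before the run limit
        "--lifetime-stagger", "10m",  # keep workers from all retiring at once
    ],
)
cluster.adapt(maximum=3)              # adapt starts replacement jobs as needed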

In my testing with the lifetime worker option, I found that when a worker in an adaptive cluster exceeds its lifetime, the worker is stopped and a new worker is created. So setting lifetime to less than the run limit will keep workers from exceeding the run limit. However, the behavior is not ideal: if my jobs run for about 8 hours and the run limit for the queue is 24 hours, I could end up running two full 8-hour jobs and then having the third stopped for hitting its lifetime about 7 hours into its run. It would be better to set a soft lifetime of around 15 hours, so that the longest a worker could stay alive would be about 23 hours.
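
To spell out the arithmetic with my numbers (these are just illustrative, nothing dask computes):

# Rough numbers for my case, in hours.
walltime = 24   # queue run limit
task = 8        # typical duration of one of my tasks

# Hard lifetime (what --lifetime does today): the worker is retired at
# the lifetime even if a task is mid-flight.
hard_lifetime = 23
lost_work = hard_lifetime % task           # the third task is killed ~7 h in

# Soft lifetime (what I am asking for): no new task starts after
# soft_lifetime, but a running task is allowed to finish.
soft_lifetime = walltime - task - 1        # 15 h, leaving a 1 h margin
worst_case_alive = soft_lifetime + task    # ~23 h, still under the run limit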

Perhaps it is not common to run long jobs with dask? I am solving differential equations. Instead of solving for the full time span in one task, I could solve for intermediate time steps and chain jobs together through the intermediate solutions, though adding that logic would take some work. When jobs are short, losing a worker mid-computation is not as big of a loss.
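
For reference, the chaining I have in mind would look roughly like this (solve_segment is a stand-in for my actual solver, not code I have written, and client is a distributed.Client):

import numpy as np


def solve_segment(state, t_start, t_end):
    # Stand-in: advance the ODE solution from t_start to t_end starting
    # from `state` and return the state at t_end, which becomes the
    # initial condition of the next segment.
    return state


def solve_chained(client, y0, t_final, n_segments):
    # Break one long solve into shorter tasks so that losing a worker to a
    # lifetime or run limit only loses the segment in progress.
    times = np.linspace(0.0, t_final, n_segments + 1)
    state = y0
    for t_start, t_end in zip(times[:-1], times[1:]):
        # Each segment is its own dask task; the future for one segment is
        # passed directly as the input of the next.
        state = client.submit(solve_segment, state, t_start, t_end)
    return state.result()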

Extra details

In case it is useful to someone else, here is the script I was playing around with to test lifetime and walltime.

Some observations:

  • With sleep_time < lifetime < walltime (comfortably), all jobs complete okay with nothing hitting the walltime.
  • If sleep_time > lifetime, no errors are generated but dask will continually kill workers before they finish a job and restart them.
  • The two points above also hold for the LocalCluster when use_lsf == False.
  • If lifetime > walltime > sleep_time, LSF kills workers and then dask restarts them. All jobs complete okay, just with some killed along the way by LSF.
import math
import time

import distributed
import dask_jobqueue


sleep_time = 10  # seconds each task sleeps
workers = 3  # maximum number of workers under adapt
jobs = workers * math.ceil(75 / sleep_time)  # enough tasks to outlast the lifetime/walltime
lifetime = 20  # worker lifetime in seconds
use_lsf = True  # toggle between LSFCluster and LocalCluster


def sleep(i):
    # Stand-in task: sleep for `sleep_time` seconds and return the index.
    time.sleep(sleep_time)
    return i


def main():
    if use_lsf:
        # Worker jobs get a 1 minute LSF run limit (walltime is HH:MM); the
        # dask worker lifetime is passed through on the dask-worker command line.
        cluster = dask_jobqueue.LSFCluster(
            name="dask-worker",
            cores=1,
            processes=1,
            memory="4GB",
            threads_per_worker=1,
            walltime="00:01",
            extra=["--lifetime", str(lifetime)],
        )
    else:
        # For the local case, the lifetime keyword is passed straight to the workers.
        cluster = distributed.LocalCluster(
            lifetime=str(lifetime),
            threads_per_worker=1,
        )
    cluster.adapt(maximum=workers)


    client = distributed.Client(cluster)
    for future in distributed.as_completed(client.map(sleep, range(jobs))):
        print(future.result())


if __name__ == "__main__":
    main()
