Enhancement Request - Dask Workers lifetime option not waiting for job to finish #3141
Just to clarify, you want the worker to wait for its running task to finish before it restarts? And does adaptive better suit your needs? |
Lifetime was intended to provide a mechanism for periodically resetting workers, which is useful, for example, when dealing with libraries that leak memory and so benefit from cycling worker processes. It is orthogonal to adaptive.
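For context, a minimal sketch of how those options are set from Python (the `dask-worker` CLI exposes the same settings as `--lifetime`, `--lifetime-stagger`, and `--lifetime-restart`; the scheduler address below is a placeholder):

```python
import asyncio

from distributed import Worker

async def run_worker():
    # Connect to an already running scheduler and cycle this worker roughly
    # every hour, with some jitter so workers don't all restart at once.
    async with Worker(
        "tcp://127.0.0.1:8786",
        lifetime="1 hour",             # close the worker after about an hour
        lifetime_stagger="5 minutes",  # random offset added to the lifetime
        lifetime_restart=True,         # restart afterwards (performed by the nanny,
                                       # which the dask-worker CLI uses by default)
    ) as worker:
        await worker.finished()

# asyncio.run(run_worker())  # requires a scheduler listening on the address above
```
|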
I think that @ameetshah1983's request is in scope, and is probably the intent of the keyword. I think that what you say above, Tom, is correct, and should be the meaning of the current spelling in the CLI. To fix this, we probably want to modify the lifetime code to close the worker to new tasks, wait until the thread pool clears out its current tasks (or until some large timeout), and then close down.
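A minimal sketch of that behaviour, assuming a hypothetical helper outside `Worker` and the `state.executing_count` counter used later in this thread:

```python
import asyncio

async def close_when_idle(worker, timeout=600):
    """Hypothetical helper: wait for running tasks (up to `timeout` seconds), then close."""

    async def drain():
        # executing_count is the number of tasks currently running in the
        # worker's thread pool.
        while worker.state.executing_count:
            await asyncio.sleep(0.1)

    try:
        await asyncio.wait_for(drain(), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # waited long enough; close anyway rather than hang forever
    await worker.close()
```
|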
Not sure what the status is on this feature request, but for what it's worth, I'm using --lifetime to deal with sporadic worker stalls. So, in my case, there is a task stuck on a worker indefinitely, and that is exactly what I'd like to restart. Perhaps, when this feature is added, it could be given the flexibility to either (1) wait for tasks to finish, or (2) restart regardless of the status of tasks. |
Any update on this feature request? |
I don’t see any. Are you interested in working on it? |
Sure! |
What is the safest mechanism for closing the worker? I attempted to write a WorkerPlugin that closed the worker on task completion using the same mechanism; see dask/dask-jobqueue#597 for details.
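A rough sketch of the kind of plugin I mean (the hooks come from the WorkerPlugin interface; the lifetime bookkeeping and the call to close_gracefully here are illustrative, not the actual attempt):

```python
import time

from distributed import WorkerPlugin

class CloseWhenIdle(WorkerPlugin):
    """Close the worker once it is past its lifetime and no tasks are running."""

    def __init__(self, lifetime=3600):
        self.lifetime = lifetime

    def setup(self, worker):
        self.worker = worker
        self.started = time.time()

    def transition(self, key, start, finish, **kwargs):
        # Called on every task state change on this worker.
        expired = time.time() - self.started > self.lifetime
        if expired and not self.worker.state.executing_count:
            # Schedule the close on the worker's event loop instead of
            # closing from inside the transition callback.
            self.worker.loop.add_callback(self.worker.close_gracefully)

# client.register_worker_plugin(CloseWhenIdle(lifetime=3600))
```
|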
Hi all - is there any update on this request? I am also curious about what the best way to do it would be; with some guidance I might be able to implement it. @EvanKomp - did you ever figure out a solution? I am wanting to use this in a SLURM setting. |
Just wanted to add that this would be helpful for my use case too. I am running in a SLURM environment and trying to keep my workers within the walltime limit of the cluster without having jobs fail. |
The current mechanism is to "gracefully downscale" a worker. This typically evicts all data and runnable tasks but does not wait for the currently running one to finish.

Instead of using Worker.close_gracefully here: https://github.com/dask/distributed/blob/fcd921c581162f4536fd92cf2aa81da32462939c/distributed/worker.py#L842-L844

we'd need a method that is almost the same as close_gracefully but that waits until all threads are idle. It could be something like the following:

```python
async def lifetime_close_gracefully(self, ...):
    # Same as close_gracefully, but wait for the threads to be idle
    ...
    while self.state.executing_count:
        await asyncio.sleep(0.01)
    await self.close(...)
```

Anybody is welcome to pick this up and create a PR (with a unit test). If you are struggling to complete it, I suggest opening a draft PR with how far you got and we can help you push it over the finish line. Any volunteers?
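For reference, a hypothetical unit test along those lines (using the helpers in distributed.utils_test; lifetime_close_gracefully is the sketched method above, not an existing API):

```python
import asyncio

from distributed.utils_test import gen_cluster, slowinc

@gen_cluster(client=True)
async def test_lifetime_close_waits_for_running_task(c, s, a, b):
    # Pin a slow task to worker `a` and wait until it is actually executing.
    fut = c.submit(slowinc, 1, delay=0.5, workers=[a.address])
    while not a.state.executing_count:
        await asyncio.sleep(0.01)
    # Ask for the graceful "wait until idle" close sketched above.
    await a.lifetime_close_gracefully()
    # The task was allowed to finish, and its result was migrated to the
    # surviving worker `b` before `a` shut down.
    assert await fut == 2
```
|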
I think I see how it comes together. Thanks very much for the example! I can give it a whirl over this week. Do you think it would be better to build this into the existing close_gracefully method, or separate it out completely as you suggested?
|
Hi all, I was talking offline with @tjgalvin and he implemented a potential solution. The changes are available in https://github.com/AlecThomson/distributed/tree/drainclose. If the maintainers are happy, I can go ahead and open a PR. Here's a demo of firing up a scheduler + worker with a drain option and it handling some work that runs past the lifetime: drain_1080.mov |
BTW - my motivation for using this feature is with |
I'm not thrilled about the drain option and would prefer teaching lifetime / retire_workers not to kill workers that are still running something.

```diff
diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 724bfc189..9c4d53760 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -736,4 +736,4 @@ class RetireWorker(ActiveMemoryManagerPolicy):
         ws = self.manager.scheduler.workers.get(self.address)
         if ws is None:
             return True
-        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what)
+        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what) and not ws.processing
```

I could imagine a bunch of tests tripping over this (and I'd like to see a new one testing this behavior), but generally speaking this addition to graceful downscaling would be nice.
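For illustration, graceful downscaling is driven through retire_workers, so the change above would be exercised by something like the following (addresses are placeholders, and the "left alone until it drains" behaviour is what the diff proposes, not what happens today):

```python
from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address

# retire_workers replicates the retiring worker's data elsewhere before closing
# it; with the change above, a worker that still has tasks in `ws.processing`
# would additionally be left alone until it drains.
client.retire_workers(workers=["tcp://127.0.0.1:34567"], close_workers=True)
```
|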
For what it's worth, I added the [...]. Might be able to look at this problem again. |
Another quick thought on this: the "drain" state exists in batch management systems like Slurm, with the idea that the worker / job finishes its current work but stops accepting new work. This seems like exactly the kind of behaviour @ameetshah1983's request was asking for. |
When applying the workers lifetime option with restart, it looks like the worker still moves ahead with the restart even if it is running a job.
I applied the lifetime restart option for every 60 secs using 1 worker and ran a job which simply sleeps for twice that amount of time. The restart still appears to take place even while the worker is running the job.
For a graceful restart, I thought the worker would wait for a long-running task / job to finish and would then restart itself when idle. That way, even if you have a long-running task, it is not interrupted by the auto-restart option.
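A hedged reproduction sketch of the setup described above (addresses, CLI flags, and timings are illustrative):

```python
import time

from distributed import Client

def long_task():
    # Sleep for roughly twice the configured worker lifetime.
    time.sleep(120)
    return "done"

# Assumes a scheduler plus a single worker started along the lines of:
#   dask-scheduler
#   dask-worker tcp://127.0.0.1:8786 --lifetime 60s --lifetime-restart
client = Client("tcp://127.0.0.1:8786")
future = client.submit(long_task)

# Reported behaviour: the worker restarts at the 60 s mark even though the task
# is still running, so the task is interrupted and rescheduled instead of the
# worker waiting until it is idle.
print(future.result())
```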