runner does not work with dask adaptive scaling client #326

Open
Kostusas opened this issue Sep 21, 2021 · 7 comments

@Kostusas

Minimal code to reproduce the error on local Jupyter notebook:

import distributed
import adaptive
adaptive.notebook_extension()

cluster = distributed.LocalCluster()
cluster.adapt(minimum=0, maximum=5)  # fails; manual scaling with cluster.scale(5) works

client = distributed.Client(cluster)

learner = adaptive.Learner1D(lambda x: x, bounds=(-1, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)
runner.live_info()

cluster.close()

This returns the error:

Task exception was never retrieved
future: <Task finished name='Task-327' coro=<live_info.<locals>.update() done, defined at /opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py:217> exception=AssertionError()>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 226, in update
    status.value = _info_html(runner)
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 258, in _info_html
    ("elapsed time", datetime.timedelta(seconds=runner.elapsed_time())),
  File "/opt/conda/lib/python3.9/site-packages/adaptive/runner.py", line 658, in elapsed_time
    assert self.task.cancelled()
AssertionError

The same error occurs on a cluster with manual scaling if the runner is started before the workers have had time to connect. It seems adaptive does not see any workers and terminates the process.

@akhmerov (Contributor)

I think we should change the heuristic that determines how many workers are available: it should inspect the client configuration and scaling strategy.

@basnijholt (Member)

You are running into this error:

adaptive/adaptive/runner.py

Lines 403 to 404 in f28bab0

if self._get_max_tasks() < 1:
    raise RuntimeError("Executor has no workers")

because you start with 0 cores.
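In other words, with minimum=0 the adaptive cluster starts with zero connected workers, so the core count comes out as zero and the check above fires. A pure-Python sketch of that situation (here ncores is a hypothetical stand-in for the dict that client.ncores() would return; no live cluster needed):

```python
def get_max_tasks(ncores):
    # Simplified version of the current heuristic:
    # total cores across all currently connected workers.
    return sum(ncores.values())

ncores = {}  # adaptively scaled cluster, no workers started yet
assert get_max_tasks(ncores) < 1
# ... which is why the Runner raises RuntimeError("Executor has no workers")
```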

If you change your argument from minimum=0 to minimum=1, Adaptive does detect the scaling correctly.

Would this be good enough for you?

@akhmerov

This seems to be a workaround, but I think actually detecting the configuration would be more reliable. Unfortunately I can't quite find the correct API in distributed.

@akhmerov

I've asked whether there's a better way on Stack Overflow (AFAIR that's the preferred channel for dask): https://stackoverflow.com/q/69326568/2217463

@basnijholt

Why would the maximal number of cores matter instead of the currently available cores?

@akhmerov commented Sep 27, 2021

Otherwise it's a chicken-and-egg problem: dask's adaptive scaling won't request new workers if there are no tasks in the queue.
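The deadlock can be illustrated with a toy model (the functions and numbers below are hypothetical simplifications, not the real dask or adaptive APIs): the adaptive scaler sizes the pool from the task queue, while the runner submits tasks only when workers exist, so starting from zero neither side ever moves.

```python
def workers_requested(queued_tasks):
    # Toy model of dask adaptive scaling: one worker per queued task, capped.
    return min(5, queued_tasks)

def tasks_submitted(workers):
    # Toy model of the runner's current behaviour: submit only if workers exist.
    return workers

workers = tasks = 0
for _ in range(10):
    tasks = tasks_submitted(workers)
    workers = workers_requested(tasks)

print((workers, tasks))  # (0, 0): neither side ever moves
```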

@basnijholt commented Sep 27, 2021

Hmm, then we would already request some points that cannot be calculated yet.

Why not change the following

adaptive/adaptive/runner.py

Lines 832 to 833 in a81be7a

elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
    return sum(n for n in ex._client.ncores().values())

to

elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor): 
    ncores = sum(n for n in ex._client.ncores().values()) 
    return max(1, ncores)
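The effect of that fallback can be checked without a live cluster; below, the ncores dict is a hypothetical stand-in for what ex._client.ncores() would return:

```python
def max_tasks(ncores):
    # Proposed heuristic: report at least one free slot, so the runner keeps
    # submitting work and dask's adaptive scaling has a reason to add workers.
    return max(1, sum(ncores.values()))

print(max_tasks({}))                              # 1 instead of 0: no RuntimeError
print(max_tasks({"tcp://w1": 4, "tcp://w2": 4}))  # 8, unchanged when workers exist
```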


3 participants