
Should we merge Dask HPC Runners in here? #638

Open
jacobtomlinson opened this issue May 9, 2024 · 6 comments

Comments

@jacobtomlinson
Member

For a while I've been playing around with this prototype repo which implements Dask Runners for HPC systems. I'm motivated to reduce the fragmentation and confusion around tooling in the Dask HPC community, so perhaps this new code should live here.

In dask/community#346 I wrote up the difference between Dask Clusters and Dask Runners. The TL;DR is that a Cluster creates the scheduler and worker tasks directly; for example, dask_jobqueue.SLURMCluster submits a job to SLURM for each worker. A Dask Runner is different because it is invoked from within an existing allocation and populates that job with Dask processes. This is the same as how Dask MPI works.
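For contrast with the Runner example below, here is a minimal sketch of the Cluster model using the existing dask_jobqueue.SLURMCluster API (the resource values and job count are placeholders, not recommendations):

# cluster_example.py
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# The Cluster model: this script talks to SLURM itself and submits one batch job
# per worker group, so it is typically run from a login node.
cluster = SLURMCluster(cores=8, memory="32 GiB")  # placeholder resources
cluster.scale(jobs=4)  # submits 4 independent SLURM jobs, each starting workers

with Client(cluster) as client:
    client.wait_for_workers(4)
    assert client.submit(lambda x: x + 1, 10).result() == 11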

SlurmRunner Example

If I write a Python script and call it with srun -n 6 python myscript.py the script will be invoked by Slurm 6 times in parallel on 6 different nodes/cores on the HPC. The Dask Runner class then uses the Slurm process ID environment variable to decide what role each process should play and uses the shared filesystem to bootstrap communications with a scheduler file.

# myscript.py
from dask.distributed import Client
from dask_hpc_runner import SlurmRunner

# When entering the SlurmRunner context manager, processes will decide if they should be
# the client, the scheduler or a worker.
# Only process ID 1 executes the contents of the context manager.
# All other processes start the Dask components and then block here forever.
with SlurmRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json") as runner:

    # The runner object contains the scheduler address info and can be used to construct a client.
    with Client(runner) as client:

        # Wait for all the workers to be ready before continuing.
        client.wait_for_workers(runner.n_workers)

        # Then we can submit some work to the Dask scheduler.
        assert client.submit(lambda x: x + 1, 10).result() == 11
        assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21

# When process ID 1 exits the SlurmRunner context manager it sends a graceful shutdown to the Dask processes.

Should this live in dask-jobqueue?

I'm at the point of trying to decide where this code should live within the Dask ecosystem. So far I have implemented MPIRunner and SlurmRunner as a proof-of-concept. It would be very straightforward to write runners for other batch systems provided it is possible to detect the process ID/rank from the environment.
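As a rough sketch of what that detection could look like (not the actual base class code; the exact environment variables for other launchers are assumptions on my part):

import os

def detect_rank_and_size():
    # Hypothetical helper: each launcher exposes the process rank and world size
    # through different environment variables.
    if "SLURM_PROCID" in os.environ:  # srun
        return int(os.environ["SLURM_PROCID"]), int(os.environ["SLURM_NTASKS"])
    if "OMPI_COMM_WORLD_RANK" in os.environ:  # Open MPI mpirun
        return (int(os.environ["OMPI_COMM_WORLD_RANK"]),
                int(os.environ["OMPI_COMM_WORLD_SIZE"]))
    if "PMI_RANK" in os.environ:  # MPICH-style launchers
        return int(os.environ["PMI_RANK"]), int(os.environ["PMI_SIZE"])
    raise RuntimeError("No supported launcher detected in the environment")

# One rank then becomes the scheduler, one runs the client code inside the
# context manager, and the remaining ranks become workers.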

I can imagine users choosing between SLURMCluster and SlurmRunner depending on their use case and how they want to deploy Dask. There are pros/cons to each deployment model: for example, the cluster can adaptively scale, but the runner only requires a single job submission, which guarantees better node locality. So perhaps it makes sense for SlurmRunner to live here in dask-jobqueue and we can use documentation to help users choose the right one for them? (We can make the name casing more consistent.)
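For example, adaptive scaling is only meaningful for the Cluster model, where Dask can submit and cancel SLURM jobs on demand. Continuing the hypothetical cluster object from the sketch above (the bounds are placeholders):

# A Runner cannot do this, because it lives inside one fixed allocation.
cluster.adapt(minimum_jobs=1, maximum_jobs=10)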

The MPIRunner and SlurmRunner share a common base class, so I'm not sure if that means MPIRunner should also live here, or whether we should accept some code duplication and put it in dask-mpi?

Alternatively my prototype repo could just move to dask-contrib and become a separate project?

Or we could roll all of dask-jobqueue, dask-mpi and the new dask-hpc-runners into a single dask-hpc package? Or pull everything into dask-jobqueue?

The Dask HPC tooling is currently very fragmented and I'm keen to make things better, not worse. But I'm very keen to hear opinions from folks like @guillaumeeb @lesteve @jrbourbeau @kmpaul @mrocklin on what we should do here.

@guillaumeeb
Member

Thanks @jacobtomlinson for sharing this and opening this discussion!

The Dask HPC tooling is currently very fragmented and I'm keen to make things better, not worse.

I totally agree with this statement. I would love to see a single dask-hpc package with all of this, but this would require a non-negligible amount of work, especially in documentation content.

@kmpaul

kmpaul commented May 24, 2024

This is fantastic, @jacobtomlinson! Very cool stuff.

I agree with @guillaumeeb that it would be really nice to have a Dask HPC package with everything in one place. I would volunteer to help maintain that package, since Dask MPI would get rolled into it.

This reminds me that I have had a major PR in Dask MPI waiting for quite a while now. I should finish that and merge it in before moving Dask MPI into any new package.

@jacobtomlinson
Member Author

Awesome thanks @kmpaul. Your new implementation looks interesting, feel free to ping me again for review once the conflicts are worked out.

I think my plan will look like the following:

  • Make a PR to merge my SLURMRunner and MPIRunner in here
  • Make a PR to merge dask-mpi in here (and make mpi4py an extras install)
  • Give the documentation here an overhaul and pull in the dask-mpi documentation
  • Retire dask-mpi
  • Rename this project to dask-hpc
  • Set up redirects for old names and packages to bring folks to the new package

I've submitted an abstract to EuroSciPy to talk about Dask and HPC (and GPUs/UCX) quite broadly, but if it gets accepted that could make a nice deadline to work towards and announce these changes.

@sanghyukmoon

Hello from a new user! I'm putting this here rather than opening a new issue, but let me know if I should do the latter instead.

Following the documentation, I am trying to run my very first "hello dask" script that looks like the following:

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner() as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

When I submit the job using Slurm, I get the following network-related warning:

2025-02-12 16:22:11,565 - distributed.scheduler - INFO - State start
/home/sm69/.conda/envs/pyathena/lib/python3.13/site-packages/distributed/utils.py:189: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to hostname: [Errno 101] Network is unreachable
  warnings.warn(
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   Scheduler at:  tcp://10.33.81.152:35737
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   dashboard at:  http://10.33.81.152:8787/status
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2025-02-12 16:22:11,647 - distributed.scheduler - INFO - Receive client connection: Client-6c2bbb5b-e987-11ef-b579-78ac4413ab38
2025-02-12 16:22:11,647 - distributed.core - INFO - Starting established connection to tcp://10.33.81.152:58686
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -          Listening to:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:38967
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:44313
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42309
2025-02-12 16:22:11,658 - distributed.worker - INFO -           Worker name:                          9
2025-02-12 16:22:11,659 - distributed.worker - INFO -          dashboard at:         10.33.81.152:46699
2025-02-12 16:22:11,659 - distributed.worker - INFO - Waiting to connect to:   tcp://10.33.81.152:35737
2025-02-12 16:22:11,659 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:34517
...

This is followed by StreamClosedError and CommClosedError exceptions.

Before getting into the Runner, I had already tried using the Cluster, e.g.,

ncores = 96
SLURMCluster(cores=ncores, memory='720 GiB', processes=ncores, interface="ib0")

As you can see here, I had to set interface="ib0" (the cluster uses InfiniBand for inter-node communication); otherwise I got a similar error.

This made me think that I have to do something similar to interface="ib0" when using SLURMRunner as well, but I couldn't find such an option in the documentation. Could you guide me on what to do?

Somewhat related feedback from a new user's perspective: it was a surprise to me when I first realized that SLURMCluster does not support multi-node jobs. It was not mentioned explicitly in the documentation, and I had to surf through several issues to realize that this is the case. I think one of the main motivations to use Dask is to overcome the single-node memory bound when analyzing large simulation data, so I naively assumed that dask-jobqueue would support multi-node jobs. It might be very helpful if the documentation explicitly said that SLURMCluster cannot submit multi-node jobs.

@jacobtomlinson
Member Author

@sanghyukmoon I think this mostly should be a new issue. I've opened #681 to continue the conversation around interfaces there.

It was a surprise to me when I first realized that SLURMCluster does not support multi-node jobs. It was not mentioned explicitly in the documentation, and I had to surf through several issues to realize that this is the case. I think one of the main motivations to use Dask is to overcome the single-node memory bound when analyzing large simulation data, so I naively assumed that dask-jobqueue would support multi-node jobs. It might be very helpful if the documentation explicitly said that SLURMCluster cannot submit multi-node jobs.

I think there is some confusion here. The SLURMCluster class supports multi-node Dask clusters, however it achieves this by submitting many single-node jobs to the SLURM scheduler. Dask then uses all of these single-node jobs as a multi-node cluster. This is beneficial on interactive clusters where node locality and a coordinated launch don't matter. It can often be easier to get 10 single-node allocations than one 10-node allocation. In a typical SLURM cluster there are many small gaps in the schedule that never get filled, so launching Dask this way can fill those gaps.

However, there are situations where node locality and coordinated launches matter, or where SLURM schedulers are configured to prioritise larger allocations. In this case you want to use the SLURMRunner to launch the Dask cluster as a single multi-node allocation.

From Dask's perspective it doesn't care whether you launch it as many single-node jobs or one multi-node job. Either way you get the same result. It just depends on which behaviour suits your SLURM cluster's practices best.
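To make that concrete, here is a small sketch (the run_analysis helper is hypothetical): the analysis code is identical under either deployment model, only the object handed to Client differs.

from dask.distributed import Client

def run_analysis(cluster_or_runner, n_workers):
    # Works the same whether it receives a SLURMCluster (many single-node jobs)
    # or a SLURMRunner (one multi-node allocation).
    with Client(cluster_or_runner) as client:
        client.wait_for_workers(n_workers)
        return client.submit(sum, range(100)).result()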

@sanghyukmoon

Thanks @jacobtomlinson for the very clear answer!

however it achieves this by submitting many single-node jobs to the SLURM scheduler.

This was what I intended when I confusingly wrote "does not support multi-node job" (thanks for the clarification!)

Or SLURM schedulers that are configured to prioritise larger allocations.

Yes, this was the reason that drew me to SLURMRunner. Thanks for adding this feature.
