
Setting distributed configuration is not considered in the workers/nanny #679

Closed · Obi-Wan opened this issue Jan 13, 2025 · 10 comments

Obi-Wan commented Jan 13, 2025

Description:
If I set, for instance, the pre-spawn-environ configuration, it is not taken into account when the jobs/workers/nannies are spawned.

Minimal Complete Verifiable Example:

from distributed import Client, get_worker, print as print_dd
from dask_jobqueue import SLURMCluster
from dask import config as dd_config

from tqdm import tqdm

NUM_THREADS = 8
N_TRIES = 16


def get_env(num_threads: int = NUM_THREADS) -> dict[str, str]:
    return {var: f"{num_threads}" for var in ["OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"]}


def get_exports(env: dict[str, str]) -> list[str]:
    return [f"export {var}={val}" for var, val in env.items()]


def op(ii: int) -> None:
    try:
        worker = get_worker()
        print(f"{ii = } - {worker.name = }")
        # Print the configuration as seen from inside the worker
        print_dd(f"{dd_config.get('distributed.nanny.pre-spawn-environ') = }")
    except ValueError:
        # get_worker() raises ValueError when called outside a worker
        print(f"{ii = }")


if __name__ == "__main__":
    dd_config.set({"distributed.nanny.pre-spawn-environ": get_env()})

    with SLURMCluster(
        queue="my_queue",
        cores=1,
        processes=1,
        job_cpu=NUM_THREADS,
        memory="4GB",
        log_directory="tmp",
        job_script_prologue=get_exports(get_env()),
    ) as cluster:
        cluster.scale(jobs=2)
        with Client(cluster) as client:
            futures = [client.submit(op, ii) for ii in range(N_TRIES)]
            res_d = [f.result() for f in tqdm(futures, desc=f"Distributed ({NUM_THREADS})", total=N_TRIES)]

And the output is the default:

{'MALLOC_TRIM_THRESHOLD_': 65536, 'OMP_NUM_THREADS': 1, 'MKL_NUM_THREADS': 1, 'OPENBLAS_NUM_THREADS': 1}

The job_script_prologue does not seem to have an effect either... How are we supposed to pass these variables?

Environment:

  • Dask version: 2024.8.2
  • Python version: 3.11
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): conda
jacobtomlinson (Member) commented Jan 14, 2025

Config from the client is not automatically passed to the workers. I would expect you to use an option like extra_env or job_script_prologue to set environment variables in the job script.

Here's an example of environment variables being set https://jobqueue.dask.org/en/latest/clusters-example-deployments.html#slurm-deployment-low-priority-node-usage
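
Something along these lines (rough sketch, untested; the queue name and thread counts are just placeholders):

from dask_jobqueue import SLURMCluster

# Export the thread-count variables in the generated job script so they are
# present in the environment before the worker process starts.
cluster = SLURMCluster(
    queue="my_queue",  # placeholder
    cores=1,
    memory="4GB",
    job_script_prologue=[
        "export OMP_NUM_THREADS=8",
        "export MKL_NUM_THREADS=8",
        "export OPENBLAS_NUM_THREADS=8",
    ],
)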

Obi-Wan (Author) commented Jan 15, 2025

Ok, thanks. As you can see in my example, I am already doing that, but those values get overridden by the default dask configuration.

The way I'm doing it now is by using the longer variables like DASK_DISTRIBUTED__NANNY__PRE_SPAWN_ENVIRON__OMP_NUM_THREADS (with the small caveat that this needs a tweak to dask's config code to actually work).
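
Concretely, the workaround looks roughly like this (sketch only; the queue name and thread counts are illustrative):

from dask_jobqueue import SLURMCluster

NUM_THREADS = 8

# Set the nested pre-spawn-environ keys through dask's double-underscore
# environment variables, exported in the generated job script. As noted above,
# this currently needs a tweak to dask's config parsing to be picked up correctly.
cluster = SLURMCluster(
    queue="my_queue",  # placeholder
    cores=1,
    memory="4GB",
    job_script_prologue=[
        f"export DASK_DISTRIBUTED__NANNY__PRE_SPAWN_ENVIRON__OMP_NUM_THREADS={NUM_THREADS}",
        f"export DASK_DISTRIBUTED__NANNY__PRE_SPAWN_ENVIRON__MKL_NUM_THREADS={NUM_THREADS}",
        f"export DASK_DISTRIBUTED__NANNY__PRE_SPAWN_ENVIRON__OPENBLAS_NUM_THREADS={NUM_THREADS}",
    ],
)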

However, say I wanted to entirely override a section like distributed.nanny.pre-spawn-environ, because that is the only way to avoid setting MALLOC_TRIM_THRESHOLD_ (which has a very bad performance impact for me). How would I do that?

jacobtomlinson (Member) commented:

Have you considered using the YAML config method instead of environment variables? As you're on a SLURM system, I expect that all nodes will have access to your home directory, so you could put YAML configuration in ~/.config/dask/ to set these parameters. That way, the casing issue you're running into will also not be a factor.
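
For example, a file along these lines in ~/.config/dask/ (the file name is arbitrary, values are illustrative) should be seen by both the client and the workers:

# ~/.config/dask/jobqueue-overrides.yaml (illustrative sketch)
distributed:
  nanny:
    pre-spawn-environ:
      OMP_NUM_THREADS: 8
      MKL_NUM_THREADS: 8
      OPENBLAS_NUM_THREADS: 8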

Obi-Wan (Author) commented Jan 15, 2025

Yes, but unfortunately it is not an option: it would be "system-wide", and we might want to modulate those variables per submission instead.

Obi-Wan (Author) commented Jan 15, 2025

I should also mention that we want to distribute the code to users who won't necessarily be able to change/edit/locate configuration files themselves. This means that we should not mess with their global configuration files, and that they might not even be able to do it themselves if needed.

Moreover, the local dask configuration is visible to LocalCluster (also when using the context managers). Thus, it is either a bug or a missing feature that the same is not true for its sibling classes (i.e. the ones derived from SpecCluster).

Using environment variables is more of a workaround, and it has to work properly for all the configuration options; otherwise, it is not feasible.

jacobtomlinson (Member) commented:

Sure, you don't have to do it in the global config. You could create a separate YAML file and point to it with DASK_CONFIG.

Generally the YAML config is the first-class citizen of Dask's config, with env vars and inline configs being secondary due to their limitations.
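
For example (sketch only, untested; the path and queue are placeholders), you could export DASK_CONFIG in the submitting shell for the client side, and in the job script for the worker side:

from dask_jobqueue import SLURMCluster

# The YAML file lives on a filesystem the compute nodes can see; exporting
# DASK_CONFIG in the job script makes the workers read it before dask starts.
cluster = SLURMCluster(
    queue="my_queue",  # placeholder
    cores=1,
    memory="4GB",
    job_script_prologue=[
        "export DASK_CONFIG=/shared/path/to/my-dask-config.yaml",  # placeholder path
    ],
)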

Obi-Wan (Author) commented Jan 16, 2025

OK, thanks, that is already a better solution, but it is quite tedious to work with in our scenario: we would like to adapt the number of OpenMP threads based on the machine, the available resources, the function, etc. This would require fiddling with temporary config files. It is not too complicated with NamedTemporaryFile from the standard tempfile module (see the sketch below), but it's still not ideal.
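
For completeness, the temporary-file variant I have in mind would be roughly the following (sketch only, untested; the queue name is a placeholder and the working directory is assumed to be on a shared filesystem):

import tempfile

import yaml
from dask_jobqueue import SLURMCluster

NUM_THREADS = 8

# Per-submission overrides, written to a temporary YAML config file.
overrides = {
    "distributed": {
        "nanny": {
            "pre-spawn-environ": {
                var: NUM_THREADS
                for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")
            }
        }
    }
}

# delete=False so the file is still there when the jobs start; dir="." assumes
# the current working directory is visible to the compute nodes.
with tempfile.NamedTemporaryFile("w", suffix=".yaml", dir=".", delete=False) as tmp:
    yaml.safe_dump(overrides, tmp)

cluster = SLURMCluster(
    queue="my_queue",  # placeholder
    cores=1,
    memory="4GB",
    # Point the workers at the temporary config file.
    job_script_prologue=[f"export DASK_CONFIG={tmp.name}"],
)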

Moreover, I also agree that passing the whole configuration through a swarm of environment variables might not be ideal.

In the end, by looking through the documentation more thoroughly, I found a slightly more convenient way for us: using DASK_INTERNAL_INHERIT_CONFIG together with the serialization of the configuration. If used within context managers, it is actually quite neat:

import dask

with dask.config.set({"distributed.nanny.pre-spawn-environ": my_env}):
    # Serialize the (locally modified) configuration and hand it to the workers
    # through the generated job script.
    serialized_conf = dask.config.serialize(dask.config.global_config)
    with SLURMCluster(
        **kwds,
        job_script_prologue=[f"export DASK_INTERNAL_INHERIT_CONFIG={serialized_conf}"],
    ) as cluster:
        ...

It does the job quite nicely, except for the fact that it is still impossible to erase fields from the default configuration.
It would be great if this were handled internally by dask_jobqueue, and I would suggest looking into it.
If you don't see any specific problem with this solution being used on the user side, I will close the issue.

jacobtomlinson (Member) commented Jan 17, 2025

Sure, that can be a good solution. Some other cluster managers like dask-cloudprovider handle this internally, so I see no reason why someone couldn't open a PR to dask-jobqueue to do the same.

We did run into problems like dask/dask-cloudprovider#249 where the config in some cases was too large to send via the job scheduler (in this case EC2 VM launch parameters). Not a blocker but a gotcha to watch out for.

The other confusion can be that changes to the config made after the cluster is created do not get propagated to the workers.

Obi-Wan (Author) commented Jan 17, 2025

OK, thanks, good to know!
I do not think that the max variable length will ever be a problem with the current implementation/usage of dask-jobqueue.

I'll test this internally first. Possibly, down the line, I could work on a simple PR for that. It would be cool to have it out of the box.

Anyway, this closes the issue for me. Thanks for the useful discussion!

Obi-Wan closed this as completed Jan 17, 2025
jacobtomlinson (Member) commented:

Just want to cross-reference that this config sending is also done in SSHCluster; however, there are known issues with doing this on Windows due to environment variable length limits, see dask/distributed#8138.
