[DNM] File-based shared memory model #7449

Draft
wants to merge 1 commit into main
Conversation

@crusaderky (Collaborator) commented Jan 4, 2023

This is a new approach to shared memory, alternative to #6503, using bare files on disk instead of third-party libraries. The main advantage is that we retain fine control over performance and thread safety.

Executive summary

As long as you have all of the below:

  1. all your data is numpy, numpy-based (sparse, scipy.sparse, pandas with numerical dtypes), pyarrow, or pandas with pyarrow strings - NOT pure Python, NOT pandas with object strings
  2. you can run multiple worker processes on the same host
  3. you are observing GIL contention if you increase threads per worker
  4. your workflow requires heavy network transfers
  5. your workers run on Linux
  6. you have sudo access to reconfigure your mountpoints

then this has the potential to drastically reduce your RAM requirements (by a factor of 2.5x in the benchmarks below).

Business case

Use multithreading and hit the GIL...

Python is generally hostile to multi-threading because of the GIL. Workloads that use numpy, pandas, and pyarrow data typically release the GIL, thus allowing for multithreading; this notably excludes object-dtype pandas strings, which are the default, as well as a few other operations that should intuitively release the GIL but were implemented upstream in a way that they don't.
Even if your algorithm thoroughly releases the GIL at every step, you still need to go in and out of pure-Python space, which acquires the GIL; this means that you will hit GIL contention if you increase the number of worker threads per process too much or if your chunk size is too small (for the same overall data size, halving the chunk size doubles the time spent in pure-Python space).
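
A minimal sketch (not part of this PR) of the effect described above: a GIL-releasing numpy reduction scales across threads, while an equivalent dose of pure-Python work serializes on the GIL. Names and sizes are arbitrary.

import threading
import time
import numpy as np

a = np.random.random(50_000_000)

def numpy_work():
    a.sum()  # numpy reductions release the GIL, so threads run in parallel

def python_work():
    sum(range(10_000_000))  # pure Python holds the GIL, so threads run serially

for fn in (numpy_work, python_work):
    threads = [threading.Thread(target=fn) for _ in range(4)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(fn.__name__, round(time.perf_counter() - start, 2), "s")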

Or use multiprocessing and duplicate data?

The current way to avoid GIL contention in dask is to start multiple processes on the same host - or, if running on the cloud, to prefer smaller instance sizes. This however carries the problem that data needs to be duplicated across workers, which is expensive in terms of (de)serialization and transfer time and substantially increases overall memory usage. This issue is negligible in embarrassingly parallel problems but can be extremely severe in O(n^2) problems, where each worker can end up needing all the data from all other workers.

A recent benchmark (coiled/benchmarks#551) showed major gains in RAM usage, but severe performance deterioration across the board, when increasing the number of threads per worker from 2 to 4, while keeping the total cluster-wide number of threads constant.

Shared memory model

Shared memory lets you store raw buffers (such as numpy arrays, non-object pandas dataframes, and pyarrow tables) in areas of memory that are accessible to multiple processes on the same host. Data is not duplicated and transferred over a socket, although you still incur a deep copy from private memory to shared memory (very hard to avoid in dask). You still incur serialization costs; however, as long as you don't deal with pure-Python data types - importantly, as long as you don't use object-dtype strings in pandas - the serialization cost is constant and becomes negligible for modest (~64 MiB) chunk sizes.
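
For illustration only, a minimal sketch of the general concept using the standard library's multiprocessing.shared_memory module (which this PR does not use): two processes on the same host can view the same numpy buffer without copying it.

from multiprocessing import shared_memory
import numpy as np

# Producer: allocate a shared segment and build a numpy array on top of it
shm = shared_memory.SharedMemory(create=True, size=8 * 1_000_000)
a = np.ndarray((1_000_000,), dtype="float64", buffer=shm.buf)
a[:] = np.random.random(1_000_000)  # deep copy from private to shared memory

# Consumer (in another process on the same host): zero-copy view of the same data
# shm2 = shared_memory.SharedMemory(name=shm.name)
# b = np.ndarray((1_000_000,), dtype="float64", buffer=shm2.buf)

del a  # release the view before closing the segment
shm.close()
shm.unlink()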

System setup

This PR introduces a file-based shared memory model, which - if merged - will always remain off by default because it needs substantial changes to the host.
The user needs to use a tmpfs mountpoint for the dask-worker-space spill directory. tmpfs is a ramdisk which (a) doesn't consume RAM unless it's actually filled up and (b) automatically swaps out when overall RAM usage exceeds physical RAM. This design is tested on Linux and will never work on Windows. macOS viability is to be explored; you'd need a filesystem with both of the above characteristics.

All Linux distros mount a tmpfs on /dev/shm, which by default is capped at 50% of physical RAM. For large problems, the user would need to create a sizeable swap file and mount a tmpfs that is larger than RAM.
Typically, swap lives on a dedicated partition; however, for the purpose of prototyping/testing you can create a swap file on a regular ext4 filesystem for a small performance penalty:

$ dd if=/dev/zero of=/somewhere/swap bs=1M count=81920
$ mkswap /somewhere/swap
$ sudo swapon /somewhere/swap
$ sudo mount -t tmpfs -o size=112G,mode=1777 dask /somewhere/spill

The above creates an 80 GiB swap file and a 112 GiB tmpfs backed by it (a good sizing if you have 64 GiB of physical RAM).
You'll then need to start the workers with

$ export DASK_DISTRIBUTED__WORKER__MEMORY__SHARED=true
$ dask worker <scheduler address> --local-directory /somewhere/spill

You may also consider reducing your memory_limit and tweaking your distributed.worker.memory.target - read more below.
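
For a LocalCluster, a minimal sketch of the equivalent configuration, assuming the distributed.worker.memory.shared key introduced by this PR; the spill path and sizes are placeholders.

import dask
from distributed import Client, LocalCluster

dask.config.set({
    "distributed.worker.memory.shared": True,  # new key introduced by this PR
    "distributed.worker.memory.target": 0.6,   # or 0 to spill eagerly; read more below
})

cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=2,
    memory_limit="8 GiB",
    local_directory="/somewhere/spill",  # must point at the tmpfs mountpoint
)
client = Client(cluster)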

Finally, this design requires each worker process to keep a lot of file descriptors open; you should check your ulimit -a as some Linux distros use a very low default.
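
If you prefer to check from Python, a short sketch using the standard library resource module; raising the soft limit up to the hard limit does not require root.

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open file descriptors: soft limit={soft}, hard limit={hard}")
if soft < hard:
    # Each unspilled key keeps a file descriptor open, so raise the soft limit
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))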

This PR requires dask/zict#80.

Low level design

Dask doesn't really know, nor care, that it's now spilling to tmpfs. What it does know however is that

  • it should now use memory-mapped I/O when unspilling, and
  • spilling on a worker and unspilling on another worker on the same host is a better idea than transferring everything over the network.

What changes when you enable distributed.worker.memory.shared:

  • Keys are not deleted from data.slow (e.g. the files on disk) when unspilling; only when deleting the key altogether from data.
  • When unspilling, buffers are created on top of memory-mapped I/O to the spill file (see the sketch after this list). This makes unspilling extremely fast; however, if the spilled data is actually on disk (ext4 or swapped-out tmpfs), you'll pay the cost of disk I/O later, when the task execution actually accesses the buffer.
  • Managed memory is recalculated as sizeof(object) minus the size of the memory-mapped buffers.

For example:
a) store a 10 MB numpy array in Worker.data -> it's in private memory, so managed memory is 10,000,105 bytes (105 bytes is the overhead of an empty numpy array in memory)
b) spill it -> managed memory = 0; spilled = 10,000,500 bytes (500 bytes is the overhead of an empty serialized numpy array). If you've spilled to tmpfs, your host-wide memory consumption has remained 10 MB, with a brief flare of 20 MB during the deep copy from private to shared memory.
c) unspill it -> managed memory = 105 bytes; spilled = 10,000,500 bytes. You will only need 105 bytes worth of process memory to accommodate this.

  • spilled memory per worker reported to the scheduler and prometheus is defined as total disk usage on the spill partition / number of workers, so effectively all workers on the same host will report the same amount. This will cause incorrect reporting if the spill directory is not the only thing on the partition, and is not a great design to begin with - but the alternative is to keep granular information on the scheduler side about the spill state of each key on each worker. Note that this alternate spilled memory reporting is only used when distributed.worker.memory.shared=True.

  • For as long as the unspilled object backed by memory-mapped data is referenced on the worker, there will be one matching open file descriptor, which is automatically closed when the mmap.mmap object is garbage-collected. This will happen after the last reference to the object is deleted; notably in case of network transfers to other hosts this may be after the key is deleted from Worker.data.
    Note that this design can cause problems with user-defined objects that contain circular references and is hostile to PyPy. I don't think this is an actual issue, as such data would currently already cause severe memory problems.

  • When a peer worker on the same host performs a get-data request, keys are spilled if they weren't already; the key in fast is deleted (unless it's already backed by shared memory) to avoid data duplication. This overrides the normal LRU algorithm for spilling. Then, instead of the serialized frames, the donating worker sends a triplet of (path on disk, size of non-shareable data, size of shareable buffers).

  • The receiving worker receives the above and creates a hard-link of the spilled file into its own spill directory.
    This keeps the lifecycle of spilled data tied to that of the worker, including emergency cleanup from the nanny in case of worker death, but with the caveat that the space on the spill disk is only freed when (a) all workers have deleted the object from data and (b) the unspilled data is dereferenced. This means that you may transiently have open file descriptors to deleted files, which causes df -h /somewhere/spill to return a larger number than du -sh /somewhere/spill. Both Linux and macOS support deleting files with open file descriptors on them, whereas this upsets Windows greatly.
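
Below is a minimal sketch, with hypothetical paths and a hypothetical key name, of the two host-local mechanisms above: the receiving worker hard-links the donor's spill file into its own spill directory, then unspills it through memory-mapped I/O so that no private copy of the buffer is made.

import mmap
import os
import numpy as np

donor_path = "/somewhere/spill/worker-a/storage/x"     # spill file written by the donating worker
receiver_path = "/somewhere/spill/worker-b/storage/x"  # same inode, different directory

os.link(donor_path, receiver_path)  # hard link: no data is copied, no extra tmpfs space is used

with open(receiver_path, "rb") as fh:
    buf = mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ)

# Rebuild the array on top of the mapped pages: private process memory grows by
# ~100 bytes, while the payload stays in tmpfs and is shared by both workers.
arr = np.frombuffer(buf, dtype="float64")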

Problems

Not compatible with pure-python data

The fundamental assumption of this design is that a serialize->deserialize round-trip is extremely fast and creates a negligible amount of duplicated data. So it will perform very poorly - much worse than the fully-private design on main - if you have large amounts of pure-Python data, namely object-dtype pandas strings.

Transitory duplication between private and shared memory

If you

  1. finish task x on host a; thus storing its output in private memory in worker.data.fast
  2. start task y, which depends on x on the same host; thus acquiring a reference to the private-memory version of the object
  3. while task y is running, task z is scheduled on host b, also depending on x
  4. a will spill x, deleting it from worker.data.fast, but the private memory won't be freed until task y completes. You will thus end up with a temporary duplication in memory usage.

A simple workaround is to set distributed.worker.memory.target=0, so that everything is immediately spilled as soon as it's created. This however means that if a task's output is exclusively used by other tasks on the same worker and never transferred over the network, you'll end up paying serialization, deserialization, and deep-copy costs needlessly. You'll also pay for deserialization every single time you need to use the key. Also, if you have object-dtype pandas strings and you convert them to dtype=string[pyarrow], you'll need to make sure that either the conversion happens within the same task, or that the load and convert tasks are fused together during optimization.
We could investigate more sophisticated heuristics, but this is beyond the scope of this initial ticket.
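
As a concrete illustration of the last point, a hedged sketch (hypothetical loader and column layout) that performs the load and the conversion to Arrow-backed strings inside the same task, so that no object-dtype intermediate ever reaches the spill layer:

import pandas as pd

def load_and_convert(path: str) -> pd.DataFrame:
    # Hypothetical per-partition loader: read, then convert object-dtype string
    # columns to string[pyarrow] before returning, all within one task.
    df = pd.read_parquet(path)
    object_cols = df.select_dtypes(include="object").columns
    return df.astype({col: "string[pyarrow]" for col in object_cols})

A function like this can then be used as the task that produces each partition, instead of splitting the load and the conversion into two tasks that may not get fused.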

Benchmarks

Setup

All benchmarks below ran on a LocalCluster on a single Linux desktop with

  • 64 GiB physical RAM
  • 16 CPUs, 32 hyperthreads
  • 8 workers, 2 threads per worker
  • memory_limit=8 GiB

The problem

I exclusively tested an O(n^2) problem that was expected to benefit from this design. Other use cases were not tested. Embarrassingly parallel problems should receive little benefit, whereas manipulation of pandas DataFrames with object strings is expected to perform very poorly (as discussed above).

import dask.array

n = 32_768  # sized so that a is 8 GiB of float64 (benchmark 1); benchmark 2 uses a ~16 GiB array
a = dask.array.random.random((n, n), chunks="128 MiB")
b = (a * a.T).sum()
b.compute()

All plots below were generated with #6241.

Benchmark 1: problem fits in memory

In this first benchmark the a array is 8 GiB in size and dask on main completes without ever touching the spill file.

Tested configurations:

  • shared memory=false
  • shared memory=true, spilling on tmpfs, distributed.worker.memory.target=0.6
  • shared memory=true, spilling on tmpfs, distributed.worker.memory.target=0

[plot: benchmark 1, 8 GiB problem; one panel per configuration above]

Benchmark 2: problem requires spilling

a is 16 GiB in size and dask on main requires very heavy spilling and unspilling.
Tested configurations:

  • shared memory=false, spilling on NVMe. In the plot below, "spilled" memory is fully on disk.
  • shared memory=false, spilling, pausing, and termination disabled (memory_limit=False), swapping on NVMe through the OS. In the plot below, "managed" memory is partially backed by disk.
  • shared memory=true, spilling on tmpfs, distributed.worker.memory.target=0.6. In the plot below, "spilled" memory is not enough to ever get swapped out by the OS.
  • shared memory=true, spilling on tmpfs, distributed.worker.memory.target=0. In the plot below, "spilled" memory is not enough to ever get swapped out by the OS.

[plot: benchmark 2, 16 GiB problem; one panel per configuration above]

In the third plot from the left, the problem of private-shared memory duplication described in the previous paragraph is quite evident.

Out of scope / future expansions

  • This design exacerbates all existing problems related to spilling mechanics. Namely, Asynchronous Disk Access in Workers #4424 becomes an even higher priority.
  • This design exacerbates the issue of managed memory which becomes unmanaged. It would be straightforward to track and display this phenomenon through weakrefs created by SpillBuffer.__getitem__.
  • To counteract the above, the scheduler could send a hint through annotations that it plans to schedule some dependents of a task on different workers, selectively triggering an immediate spill on completion
  • We could easily extend this design to client.scatter / client.gather when the client and workers are on different processes but on the same host (e.g. LocalCluster). I understand @martindurant would be particularly interested in this.
  • We could explore a heuristic which infers whether the data returned by each task is primarily buffer-based (so with trivial (de)serialization costs and trivial data duplication on round-trip) or primarily pure-Python, and transparently fall back to non-shared memory for the latter (so data is not duplicated between fast and slow).
  • We could explore a more sophisticated design where hybrid pandas dataframes (some columns are object, some are native) are rebuilt as soon as they're created, without pickling/unpickling the object columns. This would require more than one file per key on disk.
  • This could also be used to speed up process pool executors, although it would only benefit tasks that operate on buffers and don't release the GIL, which is somewhat of an edge case.

@crusaderky crusaderky self-assigned this Jan 4, 2023
@crusaderky crusaderky marked this pull request as draft January 4, 2023 13:25
@fjetter (Member) commented Jan 4, 2023

I only skimmed this. IIUC this is not really shared memory from dask's POV but rather implements off-band data communication. In this case the off-band communication uses a mem disk that is mounted by both workers and you're (ab)using the spill implementation for this. Is this a more or less accurate summary?

@martindurant (Member)

Before going through this (and I promise I will), quick question: do you think my plan to go back to the original shm-scatter is still a useful thing? It would be totally orthogonal to the approach here, and require the user to wrap whatever object they want replicated to all workers in an object that pickles it via shm. My very first POC was of this type, rather than the complex serialisation layer I have now abandoned.

@crusaderky (Collaborator, Author)

IIUC this is not really shared memory from dask's POV

It is shared memory, if you consider that the multiprocessing.shared_memory module (which this PR doesn't use) is just a thin, no-magic wrapper around an mmap'ed file on /dev/shm.
It is shared memory, since if something writes back to the input of a task (which is a big no-no in dask), the change will be reflected on all workers holding the same key.
It isn't shared memory, in the sense that it doesn't have to be backed by a ramdisk - but performance will be very poor if it isn't.

but rather implements off-band data communication.

The two processes end up pointing to the exact same memory region, and could potentially write back to it. I would not call it communication.

In this case the off band communication uses a mem disk that is mounted by both workers and you're (ab)using the spill implementation for this. is this a more or less accurate summary?

It is accurate. Again, abusing a ramdisk is exactly the same thing that multiprocessing.shared_memory does.

do you think my plan to go back to the original shm-scatter is still a useful thing? It would be totally orthogonal to the approach here, and require the user to wrap whatever object they want replicated to all workers in an object that pickles it via shm.

Depends. How do you manage the lifecycle of the shared object? Does it rely on the scattering client to clean up after the compute is finished? This PR focuses on worker-to-worker comms with no change needed in the workflows, but can be straightforwardly expanded to client<->worker comms (after compute(), the client would end up holding a file descriptor to a deleted file on the dask-worker-space).

@martindurant (Member)

How do you manage the lifecycle of the shared object?

plasma/vineyard have reference counting, so the thing would be cleaned up when all python processes have released their reference to the memoryview object. A multiprocessing shm-manager version would be plausible too. Perhaps I should just make the implementation for comparison reasons, but I don't really have time right now :). It would be in everyone's interests if we can do all the things in the one framework presented here, of course.


@crusaderky crusaderky force-pushed the shared_memory branch 2 times, most recently from f4a163e to 1cb8c9a Compare January 9, 2023 17:02