[DNM] File-based shared memory model #7449
Conversation
I only skimmed this. IIUC this is not really shared memory from dask's POV but rather implements off-band data communication. In this case the off-band communication uses a RAM disk that is mounted by both workers, and you're (ab)using the spill implementation for this. Is this a more or less accurate summary?
Before going through this (and I promise I will), quick question: do you think my plan to go back to the original shm-scatter is still a useful thing? It would be totally orthogonal from the approach here, and would require the user to wrap whatever object they want replicated to all workers in an object that pickles it via shm. My very first POC was of this type, rather than the complex serialisation layer I have now abandoned.
It is shared memory, if you consider that the multiprocessing.shared_memory module (which this PR doesn't use) is just a thin, no-magic wrapper around a mmap'ed file on /dev/shm.
The two processes end up pointing to the exact same memory region, and could potentially write back to it. I would not call it communication.
It is accurate. Again, abusing a ramdisk is exactly the same thing that multiprocessing.shared_memory does under the hood.
Depends. How do you manage the lifecycle of the shared object? Does it rely on the scattering client to clean up after the compute is finished? This PR focuses on worker-to-worker comms with no change needed in the workflows, but can be straightforwardly expanded to client<->worker comms (after compute(), the client would end up holding a file descriptor to a deleted file on the `dask-worker-space`).
plasma/vineyard have reference counting, so the thing would be cleaned up when all python processes have released their reference to the memoryview object. A multiprocessing shm-manager version would be plausible too. Perhaps I should just make the implementation for comparison reasons, but I don't really have time right now :). It would be in everyone's interests if we can do all the things in the one framework presented here, of course.
Unit Test Results: see the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. 12 files, 12 suites, 4m 19s. For more details on these failures and errors, see this check. Results for commit 00a02f1; comparison against base commit 70abff0.
This is a new approach to shared memory, alternative to #6503, using bare files on disk instead of third-party libraries. The main advantage is that we retain fine control over performance and thread-safety.
Executive summary
As long as you meet all of the requirements described below, this has the potential to drastically reduce your RAM requirements (by a factor of 2.5x in the benchmarks below).
Business case
Use multithreading and hit the GIL...
Python is generally hostile to multi-threading due to the GIL. Workloads that use numpy, pandas, and pyarrow data typically release the GIL, thus allowing for multithreading; this notably excludes object-type pandas strings, which are the default, and a few other operations which should intuitively release the GIL but were implemented upstream in a way that they don't.
Even if your algorithm thoroughly releases the GIL at all steps, you still need to go in and out of pure-Python space, which acquires the GIL; this means that you will get GIL contention if you increase the number of worker threads per process too much or if your chunk size is too small (given the same overall data size, halving the chunk size will double the time you spend in pure-Python space).
Or use multiprocessing and duplicate data?
The current way to avoid GIL contention in dask is to start multiple processes on the same host - or, if running on the cloud, prefer smaller instance sizes. This however carries the problem that data needs to be duplicated across workers, which is expensive in terms of (de)serialization and transfer time and substantially increases overall memory usage. This issue is negligible in embarrassingly parallel problems and can be extremely severe in O(n^2) problems, where each worker can end up needing all the data from all other workers.
A recent benchmark (coiled/benchmarks#551) showed major reductions in RAM usage, but severe performance deterioration across the board, when increasing the number of threads per worker from 2 to 4 while keeping the total cluster-wide number of threads constant.
Shared memory model
Shared memory lets you store raw buffers (such as numpy arrays, non-object pandas dataframes, and pyarrow tables) in areas of memory that are accessible to multiple processes on the same host. Data is not duplicated and transferred over a socket, although you still incur a deep copy from private memory to shared memory (very hard to avoid in dask). You still incur serialization costs; however, as long as you don't deal with pure-Python data types - importantly, you don't use object-type strings in pandas - the serialization cost is constant and becomes negligible for modest (~64 MiB) chunk sizes.
System setup
This PR introduces a file-based shared memory model, which - if merged - will always remain off by default because it needs substantial changes to the host.
The user needs to use a tmpfs mountpoint for the `dask-worker-space` spill directory. tmpfs is a ramdisk which (a) doesn't consume RAM unless it's actually filled up and (b) automatically swaps out when the overall RAM usage exceeds physical RAM. This design is tested on Linux and will never work on Windows. MacOSX viability is to be explored; you'd need a filesystem with both of the above characteristics.

All Linux distros mount a tmpfs on `/dev/shm`, which by default is capped at 50% of the physical RAM. For large problems, the user would need to mount a sizeable swap file and mount a tmpfs that is larger than RAM.

Typically swap files are dedicated partitions; however, for the purpose of prototyping/testing you can create one in a regular file on your ext4 filesystem for a small performance penalty.
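A sketch of such a setup (the paths, sizes, and mount point below are illustrative placeholders, not prescribed by this PR):

```bash
# Create an 80 GiB swap file backed by a regular file on ext4
sudo fallocate -l 80G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile

# Mount a 112 GiB tmpfs to host the dask-worker-space spill directory
sudo mkdir -p /mnt/dask-tmpfs
sudo mount -t tmpfs -o size=112G tmpfs /mnt/dask-tmpfs
```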
The above creates an 80 GiB swap file and a 112 GiB tmpfs on top of it (a good sizing if you have 64 GiB physical RAM).
You'll then need to start the workers with their spill directory on the tmpfs mountpoint; a sketch of such an invocation is shown below.
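A hypothetical invocation, assuming the tmpfs is mounted at `/mnt/dask-tmpfs`, the scheduler listens on `localhost:8786`, and the new config key is switched on via dask's environment-variable override convention (the exact flag values and key name here are assumptions, not taken from this PR):

```bash
# Enable the shared memory code path and point the worker's local directory
# (and therefore its spill directory) at the tmpfs mount.
DASK_DISTRIBUTED__WORKER__MEMORY__SHARED=True \
    dask worker localhost:8786 \
    --nworkers 4 \
    --nthreads 2 \
    --local-directory /mnt/dask-tmpfs
```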
You may also consider reducing your `memory_limit` and tweaking your `distributed.worker.memory.target` - read more below.

Finally, this design requires each worker process to keep a lot of file descriptors open; you should check your `ulimit -a`, as some Linux distros use a very low default.

This PR requires dask/zict#80.
Low level design
Dask doesn't really know, nor care, that it's now spilling to tmpfs. What it does know, however, is what changes when you enable `distributed.worker.memory.shared`. For example:
a) store a 10 MB numpy array in `Worker.data` -> it's in private memory, so managed memory is 10,000,105 bytes (105 bytes is the overhead of an empty numpy array in memory)
b) spill it -> managed memory = 0; spilled = 10,000,500 bytes (500 bytes is the overhead of an empty serialized numpy array). If you've spilled to tmpfs, your host-wide memory consumption has remained 10 MB, with a brief flare of 20 MB during the deep-copy from private to shared memory.
c) unspill it -> managed memory = 105 bytes; spilled = 10,000,500 bytes. You will only need 105 bytes' worth of process memory to accommodate this.
Spilled memory per worker reported to the scheduler and Prometheus is defined as total disk usage on the spill partition / number of workers, so effectively all workers on the same host will report the same amount. This will cause incorrect reporting if the spill directory is not the only thing on the partition, and is not a great design to begin with - but the alternative is to keep granular information on the scheduler side about the spill state of each key on each worker. Note that this alternate spilled memory reporting is only used when `distributed.worker.memory.shared=True`.
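A minimal sketch of that accounting rule (the function name and signature are illustrative, not the PR's actual code):

```python
import shutil


def reported_spilled_bytes(spill_partition: str, n_workers_on_host: int) -> int:
    """Illustrative only: each worker on the host reports an equal share of the
    total disk usage of the partition that hosts the spill directory."""
    used = shutil.disk_usage(spill_partition).used
    return used // n_workers_on_host
```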
For as long as the unspilled object backed by memory-mapped data is referenced on the worker, there will be one matching open file descriptor, which is automatically closed when the `mmap.mmap` object is garbage-collected. This will happen after the last reference to the object is deleted; notably, in case of network transfers to other hosts this may be after the key is deleted from `Worker.data`.

Note that this design can cause problems with user-defined objects that contain circular dependencies and is hostile to pypy. I don't think this is an actual issue, as such data would currently already cause severe memory problems.
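A minimal sketch of the mechanism, assuming a spilled file on the tmpfs (the path and dtype are made up): the numpy array is a zero-copy view over the mapped file, and the file descriptor lives exactly as long as the `mmap.mmap` object.

```python
import mmap

import numpy as np

# Assume this file holds a worker's spilled float64 buffer on the tmpfs.
with open("/mnt/dask-tmpfs/example.bin", "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

# Zero-copy, read-only view: no private RAM is used for the buffer itself,
# only for the small numpy object header.
arr = np.frombuffer(mm, dtype=np.float64)

# The mapping (and its file descriptor) is released only once both `arr` and
# `mm` are garbage-collected, which may be well after the key leaves Worker.data.
```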
When a peer worker on the same host performs a `get-data` request, keys are spilled if they weren't already; the key in fast is deleted (unless it's already backed by shared memory) to avoid data duplication. This overrides the normal LRU algorithm for spilling. Then, instead of the serialized frames, the donating worker sends a triplet of (path on disk, size of non-shareable data, size of shareable buffers).

The receiving worker receives the above and creates a hard link of the spilled file into its own spill directory (see the sketch below).
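A minimal sketch of the receiving side (the helper name and paths are hypothetical, not the PR's actual code): a hard link makes the same tmpfs-backed data appear in both workers' spill directories without copying it.

```python
import os


def adopt_spilled_file(donor_path: str, own_spill_dir: str, key: str) -> str:
    """Hypothetical helper: link the donor worker's spilled file into this
    worker's own spill directory. Both directories must live on the same
    filesystem (the shared tmpfs), otherwise os.link fails with EXDEV."""
    dest = os.path.join(own_spill_dir, key)
    os.link(donor_path, dest)  # no data copied; both names point at the same inode
    return dest
```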
This hard-link approach keeps the lifecycle of spilled data tied to that of the worker, including emergency cleanup from the nanny in case of worker death, but with the caveat that the space on the spill disk will be freed when (a) all workers delete the object from `data` and (b) the unspilled data is dereferenced. This means that you may transitorily have open file descriptors to deleted files, which cause `df -h /somewhere/spill` to return a larger number than `du -sh /somewhere/spill`. Both Linux and MacOSX support deleting files with open file descriptors on them, whereas this upsets Windows greatly.

Problems
Not compatible with pure-python data
The fundamental assumption of this design is that a serialize->deserialize round-trip is extremely fast and creates negligible amounts of duplicated data. So it will perform very poorly - much worse than the fully-private design on main - if you have large amounts of pure-Python data, namely object-type pandas strings.
Transitory duplication between private and shared memory
If you
A simple workaround is to set `distributed.worker.memory.target=0`, so that everything is immediately spilled as soon as it's created. This however means that if a task is exclusively used by other tasks on the same worker and never transferred over the network, you'll end up paying serialization, deserialization, and deep-copy costs. You'll also pay for deserialization every single time you need to use the key. Also, if you have object-type pandas strings and you convert them to `dtype=string[pyarrow]`, you'll need to make sure that either the conversion happens within the same task, or that the load and convert tasks are fused together during optimization.

We could investigate more sophisticated heuristics, but this is beyond the scope of this initial ticket.
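For reference, a sketch of the workaround described above via dask's standard config mechanism (assuming `distributed.worker.memory.shared` is the key this PR ends up using):

```python
import dask

# Spill every key as soon as it is stored (target=0), and enable the
# shared memory path proposed by this PR (key name assumed).
dask.config.set({
    "distributed.worker.memory.shared": True,
    "distributed.worker.memory.target": 0,
})
```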
Benchmarks
Setup
All benchmarks below ran on a LocalCluster on a single Linux desktop with
The problem
I exclusively tested an n^2 problem that was expected to benefit from this design. Other use cases were not tested. Embarrassingly parallel problems should receive little benefit, whereas manipulation of pandas DataFrames with object strings is expected to perform very poorly (as discussed above).
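For illustration only, the kind of O(n^2) workload meant here (this is not the actual benchmark code; sizes and operations are placeholders):

```python
import dask.array as da

# ~8 GiB of float64 data split into ~64 MiB chunks.
a = da.random.random((32_768, 32_768), chunks="64 MiB")

# A matrix product against the transpose: every output chunk needs chunks from
# all over `a`, so with multiple workers per host each worker ends up
# requesting most of the data held by its peers.
result = (a @ a.T).sum().compute()
```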
All plots below were generated with #6241.
Benchmark 1: problem fits in memory
In this first benchmark the `a` array is 8 GiB in size and dask on main completes without ever touching the spill file.

Tested configurations:
- `distributed.worker.memory.target=0.6`
- `distributed.worker.memory.target=0`
Benchmark 2: problem requires spilling

`a` is 16 GiB in size and dask on main requires very heavy spilling and unspilling.

Tested configurations:
- `distributed.worker.memory.target=0.6`. In the plot below, "spilled" memory is not enough to ever get swapped out by the OS.
- `distributed.worker.memory.target=0`. In the plot below, "spilled" memory is not enough to ever get swapped out by the OS.

In the third plot from the left, the problem of private-shared memory duplication described in the previous paragraph is quite evident.
Out of scope / future expansions
- `SpillBuffer.__getitem__`