[RFC] Polylithic: Enabling multi-threaded DataLoading through non-monolithic parallelism #1334
Some comments / discussions from earlier:
@knoopx: I was perfectly happy with datapipes; they provided me with simple building blocks that let me optimize heavy-weight processes. I don't understand the need to kill them with no replacement but a promise of a better solution which addresses a completely different problem.
@andrewkho: Thanks @knoopx for the comment. Is there something in particular that you are doing with datapipes that wouldn't be possible with this proposal?
@knoopx: @andrewkho I mostly use iterable-style datapipes; I like the simplicity and being able to easily chain them together and defer execution. I use them for all sorts of things, not just ML. Iterable datapipes feel like Python-esque observables/streams/deferrables/futures/promises to me. The problems the proposal tries to solve are novel, and I'm pretty sure I could accomplish the same things, but IMHO this new API looks like a step backwards in developer experience, and I'm not sure it will solve all the existing pitfalls (like debuggability); after all, parallelism is intrinsically a hard problem (plus Python gotchas), and adding more lower-level abstractions won't make it easier for regular users. I'm just hoping an alternative higher-level API comes later, after you figure out all the necessary building blocks.
@andrewkho: @knoopx thanks for the feedback! Yes, we definitely want dev-ex to be the primary thing we optimize for, alongside efficiency. We do eventually want to have higher-level building blocks built on the lower-level foundational ones, and we'd welcome any contributions if you see gaps! One question in terms of dev-ex: we are currently trying to optimize for eager-first execution so that things like debugging and experimenting are easier. Can you point to examples of how you're currently using datapipes with futures/promises?
@knoopx: You got me wrong; I use iter-datapipes as a more convenient replacement for promises, which makes it easy to turn existing sync code into async. This example also shows auto-tuning is not necessarily useful for every scenario, as the primary bottleneck here is the network:

```python
# async def fetch(url): ...
await asyncio.gather(*[fetch(url) for url in urls])

# def fetch(url): ...
list(IterableWrapper(urls).threadpool_map(fetch))
```

I would also strongly suggest you take some inspiration from the concepts/API design of ReactiveX (hybrid observable+iterable pattern) (https://reactivex.io/).
@andrewkho: @knoopx would something like this work for you? We can iterate on ideas for syntax sugar to make it look more like the IterableWrapper().threadpool_map example you've given, e.g. the sketch below.
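As an illustration of that kind of sugar over more granular nodes, here is a minimal sketch assuming torchdata.nodes-style primitives (IterableWrapper, ParallelMapper, Loader); the exact spelling and signatures of these APIs may differ:

```python
# Sketch only: assumes torchdata.nodes-style building blocks; names and
# signatures here are illustrative, not a confirmed API.
from torchdata.nodes import IterableWrapper, ParallelMapper, Loader

node = IterableWrapper(urls)                  # leaf node over the URL list
node = ParallelMapper(node, map_fn=fetch,     # thread-parallel fetch, the
                      num_workers=8,          # granular analogue of
                      method="thread")        # .threadpool_map(fetch)
results = list(Loader(node))                  # drain the root node
```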
@sehoffmann: Hey @andrewkho, this is very cool! The design makes a lot of sense to me, and the focus on modularity and fine-grained control over parallelism and sharding is much appreciated! I haven't read the RFC in detail yet, so I can't give thorough feedback. However, one point I am missing is a […] which is then used here: […] Otherwise, I am seeing very similar ideas to my own design, so I really like this new direction, e.g. something I had to implement ad hoc as well: […] It sounds like you plan to implement very similar building blocks (nodes), if I read the RFC correctly? Very cool.
@andrewkho: Thanks for the comment @sehoffmann! Yes, we've actually landed some of this already; for the prefetcher, see https://github.com/pytorch/data/blob/main/torchdata/nodes/prefetch.py#L16. For epochs, there's an open PR to add this functionality; PTAL and let me know your thoughts: #1357. One thing I'm contemplating is some global callback mechanism that will traverse the DAG and check […]
This RFC was re-created due to a problem with the original. The comments above are summarized from the previous issue.
🚀 The feature
TL;DR - We want to lean into modular multi-threading/multi-processing instead of the current monolithic multi-processing, and steer users away from the monolithic Dataset-parallelism approach towards composable DataSources and composable IterableDatasets for pre-proc operations, with parallelism configured within each operation. This will enable multi-threaded dataloading (with NoGIL support), auto-tunable parallelism, torch.compile-able and GPU-enabled pre-proc operations, more efficient loading of mixed modalities, and composable dataloading and pre-proc graphs.
Motivation, pitch
Working name for the project: Polylithic (non-monolithic)
Where it will live: torchdata
Multimodal DataLoading is here and torch.utils.data doesn’t support it well
Multi-Modal LLMs are here. Tasks like fine-tuning, alignment, and distillation will require multi-modal dataloading for our users. LLM training often requires reading from 10s-100s of multi-modal datasets, tokenizing them, and packing them into a “token-buffer” where tokens from individual datasets are shuffled and combined into training examples for the model.
Audio, image, and video datasets may also require heavy-weight decoding operations to be performed before tokenization, and data sizes for text, image, and video may differ by orders of magnitude. GPU decoding of images and video is an option for users as well, and libraries like Nvidia DALI will compile the entire pre-proc pipeline into GPU operations, minimizing the overhead of transfers between CPU and GPU memory.
torch.utils.data's Dataset and DataLoader abstractions are extremely popular with users; however, they are not well equipped to handle multi-modal dataloading and accelerated pre-proc because of the monolithic, black-box way in which they treat parallelism with multiprocessing, i.e. running GPU pre-proc under multiprocessing is not currently realistic. While the abstractions are extremely flexible and very easy to experiment with, users are often required to write bespoke classes to create pre-proc pipelines, handle data sharding, and combine multiple datasets. Optimizing is also a challenge because of the lack of control over parallelism.
Existing Context and definitions
torch.utils.data contains the following abstractions today:
- Dataset: map-style datasets that implement __getitem__ (and usually __len__)
- IterableDataset: iterator-style datasets that implement __iter__
- Sampler / BatchSampler: control the order and grouping of indices for map-style datasets
- DataLoader: orchestrates sampling, fetching, collation (collate_fn), and worker parallelism (num_workers) to produce batches
“Monolithic” parallelism
Currently users have a single lever to control parallelism, num_workers. When num_workers > 0, the DataLoader creates background processes and holds a copy of the entire Dataset object in process memory, treating it as a “monolithic” object to be parallelized.
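For example, all of today's parallelism flows through this one knob:

```python
from torch.utils.data import DataLoader, Dataset

class ToyDataset(Dataset):
    # Stands in for a full pipeline: reading, decoding, tokenizing.
    def __len__(self):
        return 1024

    def __getitem__(self, i):
        return i * 2

# One knob: num_workers. Each of the 4 worker processes holds a full copy
# of the Dataset object and runs the entire pipeline end-to-end.
loader = DataLoader(ToyDataset(), batch_size=32, num_workers=4)
```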
Consider the scenario in the figure below, where a user has defined an iterable dataset which combines two text datasets and one image dataset. There is no parallelism in this example.
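A minimal sketch of such a combined dataset; the wiring and mixing logic here are purely illustrative:

```python
import random
from torch.utils.data import IterableDataset

class CombinedDataset(IterableDataset):
    """Illustrative sketch: interleaves two text sources and one image
    source, decoding and tokenizing inline, with no parallelism."""

    def __init__(self, text_a, text_b, images, tokenize, decode_image):
        self.text_sources = [text_a, text_b]
        self.images = images
        self.tokenize = tokenize
        self.decode_image = decode_image

    def __iter__(self):
        text_iters = [iter(s) for s in self.text_sources]
        image_iter = iter(self.images)
        while True:
            try:
                if random.random() < 0.5:       # mix text and image samples
                    yield self.tokenize(next(random.choice(text_iters)))
                else:
                    yield self.decode_image(next(image_iter))
            except StopIteration:
                return
```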
Now consider the common case where only the image decoding and tokenization are a bottleneck, causing GPU starvation. With today's tooling, users simply increase the DataLoader's num_workers. The image below depicts how this is done today: the entire IterableDataset is treated as a monolith that is forked/spawned into worker processes.
Pain-points with Monolithic Parallelism for Multi-Modal LLM training
- Multi-modal dataloading may require different levels of parallelism for different modalities; e.g. text tokenization may require only a single worker, while image decoding may benefit from 4 or more. The "monolithic" approach needlessly parallelizes operators that don't need it, increasing memory and CPU utilization for things like token buffers. Tuning parallelism for performance is difficult because there is only one knob (num_workers) available.
- Enabling GPU pre-proc pipelines (see Nvidia DALI) may improve total training throughput for many users; however, combining multiprocessing (e.g. to parallelize blob-fetching) and GPU pre-proc (e.g. for image decoding/cropping) in the same Dataset is not currently possible.
- Tensor and pipeline parallelism offer opportunities for more efficient and more resilient/correct dataloading; however, the current torch.utils.data.DataLoader is not well equipped to take advantage of them.
- As we gradually move to a NoGIL world and multi-threading becomes a viable method to parallelize, the current monolithic approach requires the entire Dataset (dataloading and pre-proc) and all of its dependencies to be thread-safe, which may hinder adoption.
We also suffer from the usual multiprocessing pain points, e.g. per-worker copies of the Dataset inflating memory, serialization/IPC overhead when passing data between processes, slow worker startup, and hard-to-debug worker failures.
A granular parallelism approach
To fix the monolithic parallelism problem, we want to introduce abstractions and tooling that expose more granular parallelism controls to users. This implies a solution where users construct their dataloading and pre-proc pipelines by defining and stitching together data-source and pre-proc nodes into a graph, in a similar fashion to tf.data and datapipes, with data passing between the nodes. The root of the graph is the node which produces batches that are passed to the model. The leaves are data sources which produce data by reading from local disk, remote storage, or, e.g., random number generators. Intermediate nodes may transform data, perform pre-fetching, combine data from multiple nodes, perform "enrichments" (e.g. fetching images from blob stores), perform decoding, schedule GPU operations, etc.
Requirements and Constraints
To adequately support multi-modal LLM training for PyTorch users, address the above pain points, and give us the best chance of wide adoption, we want our solution to meet the following requirements and constraints: […]
How will we achieve this/what will we build? Plan of Record
We will introduce a new base class, (working name) class PolylithicNode(torch.utils.data.IterableDataset). Nodes in the graph will be instances of subclasses of PolylithicNode. Nodes will define a .iterator() method instead of overriding __iter__(); this is inspired by nn.Module's implementation, where users define .forward() instead of __call__(). This will allow PolylithicNode to instantiate user-defined iterators and wrap them, insert queues for pipeline parallelism, and measure latency. For backwards compatibility, we'll provide a wrapper which takes an existing IterableDataset. Users can compose their datasets by composing PolylithicNodes (i.e. through iter() and next()), as sketched below.
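A minimal sketch of this pattern; PolylithicNode is a working name from this RFC, and the wrapping behavior in __iter__ is an assumption for illustration:

```python
import torch.utils.data

class PolylithicNode(torch.utils.data.IterableDataset):
    """Sketch: subclasses define .iterator() rather than __iter__(),
    mirroring how nn.Module users define .forward() instead of __call__."""

    def iterator(self):
        raise NotImplementedError

    def __iter__(self):
        # The base class owns __iter__, so it can wrap the user-defined
        # iterator here, e.g. to insert queues for pipeline parallelism
        # or to measure per-node latency.
        return iter(self.iterator())

class ListSource(PolylithicNode):
    # Leaf node: produces items from an in-memory list.
    def __init__(self, items):
        self.items = items

    def iterator(self):
        yield from self.items

class Mapper(PolylithicNode):
    # Intermediate node: applies fn to each item from its source node.
    def __init__(self, source, fn):
        self.source, self.fn = source, fn

    def iterator(self):
        for item in self.source:
            yield self.fn(item)
```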
Example of composing iterable datasets to create a multimodal dataloader (sketched below). [Note that we are open to ideas on syntactic sugar.]
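Building on the ListSource and Mapper sketch above, a hypothetical composition might look like the following; ParallelMapper, WeightedMix, TokenPacker, Batcher, and the loading helpers are illustrative names, not a committed API:

```python
# Hypothetical multimodal composition from PolylithicNode subclasses.
# Parallelism is configured per node rather than with one global knob.
text_a = ListSource(load_text_shards("text_a"))    # assumed helper
text_b = ListSource(load_text_shards("text_b"))
images = ListSource(load_image_refs("images"))     # assumed helper

images = ParallelMapper(images, decode_image, num_workers=4)  # heavy decode
text_a = Mapper(text_a, tokenize)    # cheap op: no extra parallelism
text_b = Mapper(text_b, tokenize)
images = Mapper(images, tokenize)

mixed = WeightedMix([text_a, text_b, images], weights=[0.45, 0.45, 0.10])
batches = Batcher(TokenPacker(mixed, max_tokens=4096), batch_size=8)

for batch in batches:  # the root node yields model-ready batches
    ...
```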
More complex diagram
What about DataPipes and DL v2?
DataPipes and DataLoader v2 were designed to address issues like composability, and there is a lot of value in what they built; however, their parallelism and sharding structure is still based on a monolithic approach (e.g. plugging a datapipe into DataLoader v1, or DataLoader v2 with a multiprocess reading service). They required migration/rewrites of datasets, often with no improvement in performance; identifying dataloading/pre-proc bottlenecks was a challenge; and shuffling/sharding pain points weren't adequately addressed.
The proposed approach improves upon DataPipes + DLv2 in the following ways: […]
We want to maintain the composable aspects of datapipes and their eager execution, and continue our partnerships with storage and cloud providers (AWS, Azure, GCP), where they provide high-performance clients, share customer pain points, and provide recommended solutions and examples to their users.