[RFC] GPU object store support in Ray Core #51173
Labels: core, enhancement, RFC
GPU Support in Ray Core
Authors: @stephanie-wang @edoakes
TLDR: We discuss a design for GPU objects (specifically torch.Tensors) in the Ray Core API.
Requirements
The goal of this API proposal is to add support for GPU “objects” and direct GPU-GPU communication in the Ray Core API.
Goals:
(Current) limitations:
Background
This doc is motivated by recent evidence that Compiled Graphs may have limited applicability to current applications. Its main use cases are online/offline inference (which is currently bottlenecked on vLLM development) and distributed training (which will take a while to develop). Meanwhile, we have other applications such as RLHF that we would like to support. These applications can use the Compiled Graphs API, but doing so requires significant developer effort, and they are structured in such a way that the added performance gain is negligible. See Ray Compiled Graphs Q1 2025 update for more information.
Therefore, our goal is to introduce an “interpreted” version of the Ray Compiled Graphs API that enables direct GPU-GPU movement of torch.Tensors between Ray actors. This has been a common user request for almost as long as Ray has existed. The aim is to support the single-controller dataflow model for orchestrating GPU devices, in contrast to the current options with Ray:
Ultimately, the goal is for this API to be consistent with the existing Ray Compiled Graphs API. Ideally, it should be simple to change between these APIs.
Other useful docs:
APIs
See [PUBLIC] RFC: GPU objects in Ray Core API.
Proposed Design
The actor’s creator will be responsible for coordinating the transfers between actors. For simplicity, we will call this creator process the “driver”, although it may not be the driver of the overall Ray job. The driver will order all transfers between actors to ensure that collective operations are scheduled on actors in a consistent order, to avoid deadlock.
Each actor will locally store the tensors that they are sending/receiving in Python. We will extend each Ray actor with the following Python state:
- `communicators: Dict[CommID, Communicator]`: A map of the (NCCL) communicators that the actor is a participant in
- `tensor_store: Dict[Tuple[ObjectID, tensor_index], Tuple[torch.Tensor, int]]`: A map from ObjectRef to the torch.Tensor and its current reference count. tensor_index is used for objects that may contain multiple torch.Tensors
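For concreteness, a minimal sketch of this per-actor state (class and type names here are illustrative, not a committed implementation):

```python
# Illustrative sketch of the per-actor Python state described above.
from typing import Dict, Tuple

import torch

CommID = str   # placeholder; the real ID type is an implementation detail
ObjectID = str

class GPUActorState:
    def __init__(self):
        # CommID -> communicator handle for each (NCCL) group the actor is in.
        self.communicators: Dict[CommID, object] = {}
        # (ObjectID, tensor_index) -> (tensor, reference count); tensor_index
        # distinguishes multiple tensors nested inside a single object.
        self.tensor_store: Dict[Tuple[ObjectID, int], Tuple[torch.Tensor, int]] = {}
```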
Collective group initialization and destruction
Collective group initialization and destruction is accomplished by having the driver send a creation/destruction task to each actor. For example, if the user code looks like this:
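A hedged sketch (the `init_group` call below is a placeholder name, not an existing Ray API):

```python
# Hypothetical user code; init_group is a placeholder API for this RFC.
import ray

@ray.remote(num_gpus=1)
class Worker:
    def step(self):
        ...

workers = [Worker.remote() for _ in range(4)]
# The driver creates one NCCL communicator group spanning all four actors.
group = init_group(workers, backend="nccl")  # placeholder API
```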
Then, during init_group, the driver will launch a pre-defined task on each actor that creates the NCCL communicator for the group and stores it in `self.communicators`.
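A minimal sketch of that driver-side dispatch, assuming a hypothetical pre-defined `_setup_communicator` task on each actor:

```python
# Sketch only: _setup_communicator is a hypothetical pre-defined actor task.
import uuid
import ray

def init_group(actors, backend="nccl"):
    comm_id = uuid.uuid4().hex
    # Each actor creates its communicator for comm_id and stores it in
    # self.communicators; the driver blocks until all actors are ready.
    ray.get([
        actor._setup_communicator.remote(comm_id, rank, len(actors), backend)
        for rank, actor in enumerate(actors)
    ])
    return comm_id
```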
Example: GPU-GPU communication via NCCL
Suppose we have example code like this that sends a torch.Tensor from actor A to actor B:
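The user code could look roughly like the following (hedged sketch; the shape/dtype declaration API is a placeholder, not a committed interface):

```python
import ray
import torch

@ray.remote(num_gpus=1)
class A:
    # A placeholder annotation, e.g. something like
    # @gpu_object(shape=(1024,), dtype=torch.float16), would declare that the
    # return value contains a GPU tensor of known shape so the driver can
    # schedule the NCCL transfer.
    def produce(self) -> torch.Tensor:
        return torch.ones(1024, dtype=torch.float16, device="cuda")

@ray.remote(num_gpus=1)
class B:
    def consume(self, tensor: torch.Tensor) -> float:
        return tensor.sum().item()

a, b = A.remote(), B.remote()
ref = a.produce.remote()                 # GPUObjectRef; the data stays on A's GPU
result = ray.get(b.consume.remote(ref))  # driver schedules a direct NCCL send A -> B
```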
In this case, the steps on the driver are:
On actor A:
On actor B:
The flow of messages looks something like this. Steps that have the same number can proceed concurrently:
The protocol for collective communication is similar. The only difference is that the driver must dispatch to all actors in the group, and we would use a BeginCollective RPC instead of BeginSend/BeginRecv.
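A rough sketch of this driver-side dispatch (the BeginSend/BeginRecv names follow the text above; everything else is illustrative, and in practice these would be internal C++ RPCs rather than Python calls):

```python
def schedule_transfer(driver, src_actor, dst_actor, object_id):
    # A single driver-assigned sequence number gives every actor the same
    # global order of communication ops, avoiding NCCL deadlocks.
    seq = driver.next_sequence_number()
    # The two RPCs can be issued concurrently; each actor blocks on its own
    # NCCL send/recv, not on the other actor's RPC.
    src_actor.BeginSend(object_id, peer=dst_actor, seq=seq)
    dst_actor.BeginRecv(object_id, peer=src_actor, seq=seq)
```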
WARNING: Ensuring data consistency
One caveat of this approach is that the user may still have a pointer to the tensor while it’s in the tensor_store and pending transfers or collectives to other nodes. This can lead to data inconsistency if the user modifies the tensor while or before it is sent to other actors.
Whether the user still holds a pointer is also hard to detect. Tracking Python references is not sufficient because different torch.Tensors can share the same physical data, etc.
Therefore, the user needs to be careful when sending tensors. Ideally, we should expose an API to allow the user to synchronize with any ongoing sends/collectives, so that they know when it’s safe to write the data. This kind of synchronization would only be possible for actors with concurrency enabled, because otherwise synchronization could hang the actor.
One possibility is to provide a future-like syntax, keyed by torch.Tensor. For example:
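A minimal sketch, assuming a placeholder `wait_tensor_free` call that blocks until all pending sends/collectives that read a given tensor have completed:

```python
import ray
import torch

@ray.remote(num_gpus=1, max_concurrency=2)
class Trainer:
    def produce(self) -> torch.Tensor:
        self.tensor = torch.zeros(1024, device="cuda")
        return self.tensor  # kept in the tensor_store; may be sent to peers later

    def update(self):
        # Placeholder API: block until in-flight sends/collectives that read
        # self.tensor have finished, after which in-place writes are safe.
        wait_tensor_free(self.tensor)
        self.tensor += 1
```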
This program could hang if the GPUObjectRef corresponding to `self.tensor` never goes out of scope at the driver. One way to fix this is to allow copy-on-write: copy self.tensor back into the actor’s tensor storage after a timeout, allowing the user to use the original copy.
WARNING: Deadlock prevention
TODO
Dynamic tensor shapes
If the tensor shape is not known, then the driver needs to wait until A has finished and extracted all GPU tensors before sending to B. This looks something like this:
If there are multiple tensors in the value, the user can specify them using a “key” into the value. For example, if the returned value is a TensorDict, then the user would use the key strings to distinguish different tensors. Also, the tensor shape(s) can be specified on-the-fly, per method invocation instead of per method definition. For example, the following code specifies the shapes of two different tensors that are nested inside one Python object:
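For instance (hedged sketch; the `tensor_shapes` option and the string keys are placeholders, not an existing Ray API):

```python
import torch

# `actor` and `batch` are assumed to be defined as in the earlier examples.
# The returned value is a TensorDict-like object with two nested tensors,
# keyed by "hidden" and "logits".
ref = actor.forward.options(
    tensor_shapes={
        "hidden": ((4096, 1024), torch.float16),
        "logits": ((4096, 32000), torch.float32),
    }
).remote(batch)
```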
Memory management
The protocol must hook into Ray Core’s reference counting protocol (C++). In particular, if the driver’s GPUObjectRef goes out of scope, then we should send DecrementRefCount RPCs to the actor(s) that stored the original copy of this object. We can find these actors by storing weak refs to these actors’ handles inside the GPUObjectRef.
We should support configuration of each actor’s maximum allowed GPU memory for its `self.tensor_store`. If the actor tries to place a GPU object in its store and this would exceed the store’s capacity, then the actor should throw an OutOfMemoryError. This error should get propagated to all downstream tasks.
In the future, we can consider more advanced memory management such as:
The same tensor may be passed as a task argument multiple times to the same actor. If the tensor must be received from a different actor, then we have two options:
We will favor the second option initially since this is simpler, but less efficient if significant data needs to be transferred.
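A minimal sketch of the actor-side bookkeeping described in this section (capacity check plus reference counting; all names are illustrative):

```python
import torch

class TensorStore:
    def __init__(self, capacity_bytes: int):
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0
        self.entries = {}  # (object_id, tensor_index) -> [tensor, refcount]

    def put(self, key, tensor: torch.Tensor):
        nbytes = tensor.element_size() * tensor.numel()
        if self.used_bytes + nbytes > self.capacity_bytes:
            # In the real system this would be Ray's OutOfMemoryError,
            # propagated to all downstream tasks.
            raise MemoryError("GPU object store capacity exceeded")
        self.entries[key] = [tensor, 1]
        self.used_bytes += nbytes

    def decrement(self, key):
        # Called when the driver sends a DecrementRefCount RPC.
        entry = self.entries[key]
        entry[1] -= 1
        if entry[1] == 0:
            tensor, _ = self.entries.pop(key)
            self.used_bytes -= tensor.element_size() * tensor.numel()
```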
Overlapping compute and communication
This is a critical performance feature in most distributed GPU applications. To support this, we can use a similar design as Ray Compiled Graphs: [PUBLIC] Design: Overlapping GPU communication in aDAG. The main difference would be that we cannot rearrange the order of tasks before execution; instead the driver will guarantee a consistent global order by submitting operations one at a time.
To avoid blocking on the CPU, we may need to use Python or potentially C++ multithreading to handle the BeginSend/BeginRecv/BeginCollective RPCs. Also, we may need to rate-limit the pending communication ops to avoid memory buildup.
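A sketch of how an actor could service these RPCs off the main thread, using a dedicated CUDA stream for overlap and a bounded queue for rate limiting (all names are illustrative):

```python
import queue
import threading

import torch

class SendWorker:
    def __init__(self, max_pending: int = 8):
        # Communication runs on its own CUDA stream so it can overlap with
        # compute issued on the default stream.
        self.comm_stream = torch.cuda.Stream()
        # A bounded queue rate-limits pending sends to avoid memory buildup.
        self.pending = queue.Queue(maxsize=max_pending)
        threading.Thread(target=self._run, daemon=True).start()

    def begin_send(self, tensor, send_fn):
        # Called from the RPC handler; blocks if too many sends are in flight.
        self.pending.put((tensor, send_fn))

    def _run(self):
        while True:
            tensor, send_fn = self.pending.get()
            with torch.cuda.stream(self.comm_stream):
                send_fn(tensor)  # e.g., a NCCL send issued on comm_stream
```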
Other communication transports
Intra-actor: Skipping serialization/communication
If a `GPUObjectRef` is passed back to a task on the same actor that created the data, then we can avoid serialization. This optimization is already done in Ray Compiled Graphs but has not been possible in Ray Core because we always serialize the data into the object store.
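A hedged sketch of this fast path (names are illustrative):

```python
def resolve_argument(actor_state, ref):
    # If this actor created the object, hand back the locally stored tensor
    # directly: no serialization, no NCCL transfer, data stays on the GPU.
    if ref.owner_actor_id == actor_state.actor_id:
        tensor, _ = actor_state.tensor_store[(ref.object_id, 0)]
        return tensor
    # Otherwise fall back to the normal receive path (placeholder).
    return receive_via_nccl(actor_state, ref)
```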
Intra-node: CUDA memcpy and IPC
TODO
CPU-GPU
TODO
Driver-specific communication
Dynamic/autoscaling actor groups: RDMA / NVSHMEM / etc
NCCL only supports static actor groups. If the membership of the group needs to be changed, e.g., for autoscaling or upon failure, then the NCCL group needs to be recreated. NCCL group creation is also quite slow. This is a known issue for NCCL users in autoscaling environments.
Initially, we plan to use NCCL because it is the industry standard for NVIDIA GPU communication. However, in the future, we can consider adding support for dynamic actor groups. This includes two different features:
A simple version of Feature 1 is to re-create the NCCL group upon actor addition or deletion. If membership changes are relatively infrequent, the performance overhead is acceptable.
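A sketch of that simple version, reusing the placeholder `init_group` from above and assuming a matching `destroy_group`:

```python
def on_membership_change(old_group, actors):
    # Tear down the existing NCCL communicators on every actor...
    destroy_group(old_group)  # placeholder API
    # ...then build a fresh group with the new membership.
    return init_group(actors, backend="nccl")
```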
Feature 2 is more challenging. NCCL group (re)creation is more likely to be a bottleneck when there is an elastic group of actors and many possible actor-actor pairs. Options:
Deadlock prevention
TODO
Implementation
Ray Compiled Graphs
Our top priority is to support vLLM. The secondary priority is to support distributed training, the development of which is primarily happening at UW.
To support these applications, we must harden the following features, some of which are shared with this current design proposal:
Specifically, this also means that we will de-prioritize the following features that were originally planned. This timeline also matches the current proposal better, in that it gives us more time to develop the current proposal before deciding how the two APIs can be flexibly used together.
Project Timeline
Target applications:
Initial prototype:
Checkpoint: veRL prototype complete, Ray Data GPU-GPU prototype possible?
Remaining features: