Skip to content

Commit

Permalink
[PJRT:GPU] Treat GPU collective memory space as device memory space
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenying-liu committed Sep 9, 2024
1 parent 68a55c7 commit c37f245
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 23 deletions.
1 change: 1 addition & 0 deletions xla/layout.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ class Layout {
static constexpr int64_t kDefaultMemorySpace = 0;
static constexpr int64_t kGenericFastMemorySpace = 1;
static constexpr int64_t kHostMemorySpace = 5;
static constexpr int64_t kCollectiveMemorySpace = 6;
int64_t memory_space() const { return memory_space_; }
Layout& set_memory_space(int64_t value) {
memory_space_ = value;
Expand Down
8 changes: 4 additions & 4 deletions xla/pjrt/gpu/se_gpu_pjrt_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ GetStreamExecutorGpuDeviceAllocator(
allocator_config.preallocate, false, false, true));
allocators.emplace_back(std::move(async_allocator),
ordinal_and_device.second->compute_stream(),
/*memory_space=*/0);
/*memory_space=*/Layout::kDefaultMemorySpace);
}
break;
}
Expand All @@ -950,7 +950,7 @@ GetStreamExecutorGpuDeviceAllocator(
allocator_config.gpu_system_memory_size));
allocators.emplace_back(std::move(bfc_allocator),
ordinal_and_device.second->compute_stream(),
/*memory_space=*/0);
/*memory_space=*/Layout::kDefaultMemorySpace);
}
break;
}
Expand All @@ -977,7 +977,7 @@ GetStreamExecutorGpuDeviceAllocator(
allocator_config.collective_memory_size));
allocators.emplace_back(std::move(collective_bfc_allocator),
ordinal_and_device.second->compute_stream(),
/*memory_space=*/1);
/*memory_space=*/Layout::kCollectiveMemorySpace);
}

for (const auto& ordinal_and_device : addressable_devices) {
Expand All @@ -986,7 +986,7 @@ GetStreamExecutorGpuDeviceAllocator(
allocators.emplace_back(std::move(host_allocator),
ordinal_and_device.second->compute_stream(),
/*memory_space=*/
static_cast<int>(se::MemoryType::kHost));
Layout::kHostMemorySpace);
}

#if defined(GOOGLE_CUDA) && CUDA_VERSION >= 11020
Expand Down
62 changes: 62 additions & 0 deletions xla/pjrt/gpu/se_gpu_pjrt_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,24 @@ constexpr char const* kD2HProgramTupleOutput = R"(
}
)";

// HLO text for a psum-style module, used by the collective-memory-space test
// below: an s32[1,4] parameter is reshaped to s32[4] and combined via
// all-reduce (channel_id=1) whose to_apply is the scalar-add computation
// region_0.3. The all-reduce is what makes buffer assignment place the
// output in the collective memory space when NCCL user buffers are enabled.
constexpr char const* kCollectiveMemorySpaceOutput = R"(
HloModule jit__psum, entry_computation_layout={(s32[1,4]{1,0})->s32[4]{0}}
region_0.3 {
Arg_0.0 = s32[] parameter(0)
Arg_1.0 = s32[] parameter(1)
ROOT add.0 = s32[] add(Arg_0.0, Arg_1.0)
}
ENTRY main.10_spmd {
param = s32[1,4]{1,0} parameter(0)
reshape = s32[4]{0} reshape(param)
ROOT all-reduce = s32[4]{0} all-reduce(reshape), channel_id=1, to_apply=region_0.3
}
)";

} // namespace

TEST(StreamExecutorGpuClientTest, ExecutePinnedHostOutputTest) {
Expand Down Expand Up @@ -1197,6 +1215,50 @@ TEST(StreamExecutorGpuClientTest, ExecutablePinnedHostOutputMemoryKindTest) {
EXPECT_EQ(memory_kinds[0][0], "pinned_host");
}

// Verify the output device memory kind with collective memory space shape when
// NCCL user buffer is enabled. The output buffer should be reported with the
// generic "device" memory kind even though its layout carries
// Layout::kCollectiveMemorySpace.
TEST(StreamExecutorGpuClientTest,
     ExecutableCollectiveMemoryOutputMemoryKindTest) {
  TF_ASSERT_OK_AND_ASSIGN(auto client,
                          GetStreamExecutorGpuClient(GpuClientOptions()));
  xla::CompileOptions options;
  // NCCL user buffers cause collective operands/results to be colored with
  // the collective memory space during buffer assignment.
  options.executable_build_options.mutable_debug_options()
      ->set_xla_gpu_enable_nccl_user_buffers(true);

  TF_ASSERT_OK_AND_ASSIGN(
      auto executable,
      CompileExecutable(kCollectiveMemorySpaceOutput, *client, options));
  std::vector<int32_t> data{1, 2, 3, 4};
  // Build the input shape with the default (device) memory space set.
  Shape shape = ShapeUtil::MakeShapeWithDenseLayout(S32, {1, 4},
                                                    /*major_to_minor=*/{1, 0});
  shape.mutable_layout()->set_memory_space(Layout::kDefaultMemorySpace);

  auto device = client->addressable_devices()[0];
  TF_EXPECT_OK(device->default_memory_space());
  // Fix: the original used TF_ASSIGN_OR_RETURN here, which expands to
  // `return <status>;` on failure — ill-formed inside a void-returning gtest
  // TestBody(). TF_ASSERT_OK_AND_ASSIGN is the correct macro in a TEST, and
  // matches every other status-producing call in this test.
  TF_ASSERT_OK_AND_ASSIGN(
      auto input, client->BufferFromHostBuffer(
                      data.data(), shape.element_type(), shape.dimensions(),
                      /*byte_strides=*/std::nullopt,
                      PjRtClient::HostBufferSemantics::kImmutableOnlyDuringCall,
                      /*on_done_with_host_buffer=*/nullptr, device));
  EXPECT_EQ(input->memory_space()->kind(), "device");

  // The executable's reported output memory kind is the plain device kind...
  TF_ASSERT_OK_AND_ASSIGN(auto memory_kinds,
                          executable->GetOutputMemoryKinds());
  EXPECT_EQ(memory_kinds.size(), 1);
  EXPECT_EQ(memory_kinds[0].size(), 1);
  EXPECT_EQ(memory_kinds[0][0], "device");

  // ...while the on-device shape's layout still records the collective
  // memory space assigned to the all-reduce result.
  TF_ASSERT_OK_AND_ASSIGN(
      auto result, executable->Execute({{input.get()}}, ExecuteOptions()));
  std::vector<std::unique_ptr<xla::PjRtBuffer>>& result_buffers = result[0];
  EXPECT_EQ(result_buffers[0]->memory_space()->kind(), "device");
  Shape result_shape = result_buffers[0]->on_device_shape();
  auto memory_space = result_shape.layout().memory_space();
  EXPECT_EQ(memory_space, Layout::kCollectiveMemorySpace);
}

TEST(StreamExecutorGpuClientTest,
ExecutablePinnedHostTupleOutputMemoryKindTest) {
TF_ASSERT_OK_AND_ASSIGN(auto client,
Expand Down
2 changes: 2 additions & 0 deletions xla/pjrt/pjrt_stream_executor_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,7 @@ absl::StatusOr<std::unique_ptr<PjRtBuffer>> OutputBufferHelper(
device->default_memory_space().value_or(nullptr);
if (shape.has_layout()) {
switch (shape.layout().memory_space()) {
case Layout::kCollectiveMemorySpace:
case Layout::kDefaultMemorySpace:
// Nothing to do, we have already set the default memory space.
break;
Expand Down Expand Up @@ -3334,6 +3335,7 @@ absl::StatusOr<absl::string_view> MemoryKindFromSimpleShape(
switch (shape.layout().memory_space()) {
case Layout::kHostMemorySpace:
return PinnedHostMemorySpace::kKind;
case Layout::kCollectiveMemorySpace:
case Layout::kDefaultMemorySpace:
return default_memory_kind;
default:
Expand Down
6 changes: 3 additions & 3 deletions xla/service/gpu/gpu_memory_space_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.

#include "absl/status/status.h"
#include "xla/hlo/ir/hlo_opcode.h"
#include "xla/layout.h"
#include "xla/service/buffer_assignment.h"
#include "xla/service/hlo_alias_analysis.h"
#include "xla/service/hlo_ordering.h"
Expand All @@ -28,10 +29,9 @@ limitations under the License.
namespace xla {
namespace gpu {

inline constexpr int64_t kCollectiveMemorySpaceColor = 1;
inline constexpr int64_t kTempBufferMemorySpaceColor = 2;

// Set memory space to kCollectiveMemorySpaceColor for all allocations used by
// Set memory space to kCollectiveMemorySpace for all allocations used by
// all-reduce, all-gather, and reduce-scatter. This memory space maps to
// collective memory using ncclMemAlloc in the runtime.
inline BufferAssigner::Colorer CollectiveColorer() {
Expand All @@ -58,7 +58,7 @@ inline BufferAssigner::Colorer CollectiveColorer() {
alias->instruction()->opcode() == HloOpcode::kAsyncDone) &&
kSupportedOpcodes->contains(
alias->instruction()->async_wrapped_opcode()))) {
value->set_color(kCollectiveMemorySpaceColor);
value->set_color(Layout::kCollectiveMemorySpace);
}
}
if (!value->has_color()) {
Expand Down
6 changes: 3 additions & 3 deletions xla/service/gpu/runtime/nccl_collective_thunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ limitations under the License.
#include "mlir/IR/Value.h"
#include "xla/debug_options_flags.h"
#include "xla/hlo/ir/hlo_instructions.h"
#include "xla/layout.h"
#include "xla/layout_util.h"
#include "xla/primitive_util.h"
#include "xla/service/collective_ops_utils.h"
Expand Down Expand Up @@ -66,7 +67,6 @@ namespace xla {
namespace gpu {
namespace {

static constexpr int64_t kCollectiveMemorySpaceColor = 1;
static constexpr NcclStreamId kNoStreamId = NcclStreamId(0);

bool IsTypeSupportedByNccl(PrimitiveType element_type,
Expand Down Expand Up @@ -347,11 +347,11 @@ absl::Status MaybeRegisterBuffers(NcclApi* nccl_api, int device_ordinal,
const std::vector<DeviceBufferPair>& buffers,
NcclApi::NcclCommHandle comm) {
for (int i = 0; i < buffers.size(); ++i) {
if (buffers[i].source_memory_space == kCollectiveMemorySpaceColor) {
if (buffers[i].source_memory_space == Layout::kCollectiveMemorySpace) {
TF_RETURN_IF_ERROR(RegisterBufferOnce(nccl_api, device_ordinal, comm,
buffers[i].source_buffer));
}
if (buffers[i].destination_memory_space == kCollectiveMemorySpaceColor) {
if (buffers[i].destination_memory_space == Layout::kCollectiveMemorySpace) {
TF_RETURN_IF_ERROR(RegisterBufferOnce(nccl_api, device_ordinal, comm,
buffers[i].destination_buffer));
}
Expand Down
1 change: 1 addition & 0 deletions xla/stream_executor/cuda/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ cuda_only_cc_library(
":cuda_platform_id",
":cuda_runtime", # buildcleaner: keep
":cuda_version_parser",
"//xla:shape_util",
"//xla/stream_executor",
"//xla/stream_executor:blas",
"//xla/stream_executor:command_buffer",
Expand Down
3 changes: 2 additions & 1 deletion xla/stream_executor/cuda/cuda_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ limitations under the License.
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
#include "third_party/gpus/cuda/include/cuda.h"
#include "xla/layout.h"
#include "xla/stream_executor/blas.h"
#include "xla/stream_executor/command_buffer.h"
#include "xla/stream_executor/cuda/cuda_diagnostics.h"
Expand Down Expand Up @@ -446,7 +447,7 @@ absl::Status GpuExecutor::GetKernelMetadata(GpuKernel* cuda_kernel,
}

DeviceMemoryBase GpuExecutor::Allocate(uint64_t size, int64_t memory_space) {
if (memory_space == 1) {
if (memory_space == xla::Layout::kCollectiveMemorySpace) {
auto result = GpuCollectives::CollectiveMemoryAllocate(context_, size);
if (!result.ok()) {
LOG(ERROR) << result.status();
Expand Down
1 change: 1 addition & 0 deletions xla/stream_executor/integrations/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ xla_cc_test(
srcs = ["tf_allocator_adapter_test.cc"],
deps = [
":tf_allocator_adapter",
"//xla:shape_util",
"//xla/service:cpu_plugin",
"//xla/service:platform_util",
"//xla/stream_executor",
Expand Down
37 changes: 25 additions & 12 deletions xla/stream_executor/integrations/tf_allocator_adapter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ limitations under the License.
#include "absl/container/node_hash_set.h"
#include "absl/log/check.h"
#include "xla/service/platform_util.h"
#include "xla/layout.h"
#include "xla/stream_executor/device_memory_allocator.h"
#include "xla/stream_executor/platform.h"
#include "xla/stream_executor/stream.h"
Expand Down Expand Up @@ -79,35 +80,46 @@ TEST(MultiDeviceAdapter, UsesCorrectAllocator) {

std::vector<se::MultiDeviceAdapter::AllocatorInfo> infos;
infos.emplace_back(std::make_unique<TestAllocator>(0x1000), stream.get(),
/*memory_space=*/0, /*device_ordinal=*/0);
/*memory_space=*/xla::Layout::kDefaultMemorySpace,
/*device_ordinal=*/0);
infos.emplace_back(std::make_unique<TestAllocator>(0x2000), stream.get(),
/*memory_space=*/0, /*device_ordinal=*/1);
/*memory_space=*/xla::Layout::kDefaultMemorySpace,
/*device_ordinal=*/1);
infos.emplace_back(std::make_unique<TestAllocator>(0x3000), stream.get(),
/*memory_space=*/1, /*device_ordinal=*/0);
/*memory_space=*/xla::Layout::kCollectiveMemorySpace,
/*device_ordinal=*/0);
infos.emplace_back(std::make_unique<TestAllocator>(0x4000), stream.get(),
/*memory_space=*/1, /*device_ordinal=*/1);
/*memory_space=*/xla::Layout::kCollectiveMemorySpace,
/*device_ordinal=*/1);
std::unique_ptr<se::DeviceMemoryAllocator> allocator =
std::make_unique<se::MultiDeviceAdapter>(platform, std::move(infos));

TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff0,
allocator->Allocate(/*device_ordinal=*/0, 4, false, /*memory_space=*/0));
allocator->Allocate(/*device_ordinal=*/0, 4, false,
/*memory_space=*/xla::Layout::kDefaultMemorySpace));
CHECK_EQ(reinterpret_cast<size_t>(buff0->opaque()), 0x1001);
TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff1,
allocator->Allocate(/*device_ordinal=*/0, 4, false, /*memory_space=*/0));
allocator->Allocate(/*device_ordinal=*/0, 4, false,
/*memory_space=*/xla::Layout::kDefaultMemorySpace));
CHECK_EQ(reinterpret_cast<size_t>(buff1->opaque()), 0x1002);
TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff2,
allocator->Allocate(/*device_ordinal=*/0, 4, false, /*memory_space=*/1));
allocator->Allocate(
/*device_ordinal=*/0, 4, false,
/*memory_space=*/xla::Layout::kCollectiveMemorySpace));
CHECK_EQ(reinterpret_cast<size_t>(buff2->opaque()), 0x3001);
TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff3,
allocator->Allocate(/*device_ordinal=*/1, 4, false, /*memory_space=*/0));
allocator->Allocate(/*device_ordinal=*/1, 4, false,
/*memory_space=*/xla::Layout::kDefaultMemorySpace));
CHECK_EQ(reinterpret_cast<size_t>(buff3->opaque()), 0x2001);
TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff4,
allocator->Allocate(/*device_ordinal=*/1, 4, false, /*memory_space=*/1));
allocator->Allocate(
/*device_ordinal=*/1, 4, false,
/*memory_space=*/xla::Layout::kCollectiveMemorySpace));
CHECK_EQ(reinterpret_cast<size_t>(buff4->opaque()), 0x4001);
}

Expand All @@ -123,7 +135,7 @@ TEST(MultiDeviceAdapter, DeallocationWithDifferentAllocator) {
std::vector<se::MultiDeviceAdapter::AllocatorInfo> info_allocator;
info_allocator.emplace_back(
std::make_unique<TestAllocator>(0x1000, allocations), stream.get(),
/*memory_space=*/0, /*device_ordinal=*/0);
/*memory_space=*/xla::Layout::kDefaultMemorySpace, /*device_ordinal=*/0);

std::unique_ptr<se::DeviceMemoryAllocator> allocator =
std::make_unique<se::MultiDeviceAdapter>(platform,
Expand All @@ -132,14 +144,15 @@ TEST(MultiDeviceAdapter, DeallocationWithDifferentAllocator) {
std::vector<se::MultiDeviceAdapter::AllocatorInfo> info_deallocator;
info_deallocator.emplace_back(
std::make_unique<TestAllocator>(0x1000, allocations), stream.get(),
/*memory_space=*/0, /*device_ordinal=*/0);
/*memory_space=*/xla::Layout::kDefaultMemorySpace, /*device_ordinal=*/0);
std::unique_ptr<se::DeviceMemoryAllocator> deallocator =
std::make_unique<se::MultiDeviceAdapter>(platform,
std::move(info_deallocator));

TF_ASSERT_OK_AND_ASSIGN(
se::OwningDeviceMemory buff0,
allocator->Allocate(/*device_ordinal=*/0, 4, false, /*memory_space=*/0));
allocator->Allocate(/*device_ordinal=*/0, 4, false,
/*memory_space=*/xla::Layout::kDefaultMemorySpace));
CHECK_EQ(allocations->size(), 1);
CHECK_EQ(reinterpret_cast<size_t>(buff0->opaque()), 0x1001);

Expand Down

0 comments on commit c37f245

Please sign in to comment.