diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..f91a43f
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,14 @@
+FROM nvcr.io/nvidia/pytorch:23.11-py3
+
+WORKDIR /
+
+# Change the apt download source; comment this out if you don't need the mirror
+COPY sources.list /etc/apt/sources.list
+RUN apt-get update && \
+    apt-get install -y openssh-server vim curl inetutils-ping net-tools telnet lsof
+
+COPY start.sh /start.sh
+COPY sshd_config /etc/ssh/sshd_config
+COPY nccl-tests /nccl-tests
+
+CMD ["/bin/bash", "start.sh"]
\ No newline at end of file
diff --git a/README.md b/README.md
index de5ffbf..f75a3f6 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,73 @@
-# build-nccl-tests-with-pytorch
-This is a dockerfile to build PyTorch executing NCCL-Tests.
+# Build-NCCL-Tests-With-PyTorch
+
+![license](https://img.shields.io/hexpm/l/plug.svg)
+[![docker](https://img.shields.io/docker/pulls/mayooot/nccl-tests-with-pytorch.svg)](https://hub.docker.com/r/mayooot/nccl-tests-with-pytorch)
+
+# Overview
+
+Build [NCCL-Tests](https://github.com/NVIDIA/nccl-tests) and configure SSHD in a PyTorch container to help you test NCCL
+faster!
+
+PyTorch Version: 23.11
+
+# Quick Start
+
+~~~shell
+docker pull mayooot/nccl-tests-with-pytorch:v0.0.1
+~~~
+
+# Build From Source
+
+~~~shell
+git clone https://github.com/mayooot/build-nccl-tests-with-pytorch
+cd build-nccl-tests-with-pytorch
+
+docker build -t nccl-tests-with-pytorch:latest .
+~~~
+
+# Usage
+
+The default values for `PORT` and `PASS` are 12345; you can override them with `-e`.
+
+In addition, you need to mount the host's `id_rsa` and `id_rsa.pub` into the container.
+
+~~~shell
+docker run --name foo \
+    -d -it \
+    --network=host \
+    -e PORT=1998 -e PASS=P@88w0rd \
+    -v /tmp/id_rsa:/root/.ssh/id_rsa \
+    -v /tmp/id_rsa.pub:/root/.ssh/id_rsa.pub \
+    --gpus all --shm-size=1g \
+    --cap-add=IPC_LOCK --device=/dev/infiniband \
+    mayooot/nccl-tests-with-pytorch:v0.0.1
+~~~
+
+The NCCL-Tests code and executables are located in `/nccl-tests`. The following shows how to use them,
+taking `all_reduce_perf` as an example.
+
+Before running `all_reduce_perf`, you need to set up passwordless SSH between the cluster nodes.
+
+~~~shell
+ssh-copy-id -p 1998 root@all_cluster_ip
+~~~
+
+Replace `--host cluster_ip1,cluster_ip2,...` with the real IP addresses of the cluster nodes.
+
+~~~shell
+docker exec -it foo bash
+
+cd /nccl-tests
+
+mpirun --allow-run-as-root \
+    -mca plm_rsh_args "-p 1998" \
+    -x NCCL_DEBUG=INFO \
+    -x NCCL_IB_HCA=mlx5_10,mlx5_11,mlx5_12,mlx5_13,mlx5_14,mlx5_15,mlx5_16,mlx5_17 \
+    --host cluster_ip1,cluster_ip2,... \
+    ./build/all_reduce_perf \
+    -b 1G -e 4G -f 2 -g 8
+~~~
+
+# Contribute
+
+Feel free to open issues and pull requests. Any feedback is highly appreciated!
\ No newline at end of file
diff --git a/nccl-tests/LICENSE.txt b/nccl-tests/LICENSE.txt
new file mode 100644
index 0000000..4573c07
--- /dev/null
+++ b/nccl-tests/LICENSE.txt
@@ -0,0 +1,27 @@
+
+ Copyright (c) 2016-2017, NVIDIA CORPORATION. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions
+ are met:
+  * Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+  * Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in the
+    documentation and/or other materials provided with the distribution.
+ * Neither the name of NVIDIA CORPORATION, nor the names of their + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/nccl-tests/Makefile b/nccl-tests/Makefile new file mode 100644 index 0000000..f652b78 --- /dev/null +++ b/nccl-tests/Makefile @@ -0,0 +1,23 @@ +# +# Copyright (c) 2017, NVIDIA CORPORATION. All rights reserved. +# +# See LICENCE.txt for license information +# + +BUILDDIR ?= build +override BUILDDIR := $(abspath $(BUILDDIR)) + +.PHONY: all clean + +default: src.build + +TARGETS=src + +all: ${TARGETS:%=%.build} +clean: ${TARGETS:%=%.clean} + +%.build: + ${MAKE} -C $* build BUILDDIR=${BUILDDIR} + +%.clean: + ${MAKE} -C $* clean BUILDDIR=${BUILDDIR} diff --git a/nccl-tests/README.md b/nccl-tests/README.md new file mode 100644 index 0000000..4281799 --- /dev/null +++ b/nccl-tests/README.md @@ -0,0 +1,72 @@ +# NCCL Tests + +These tests check both the performance and the correctness of [NCCL](http://github.com/nvidia/nccl) operations. + +## Build + +To build the tests, just type `make`. + +If CUDA is not installed in /usr/local/cuda, you may specify CUDA\_HOME. Similarly, if NCCL is not installed in /usr, you may specify NCCL\_HOME. + +```shell +$ make CUDA_HOME=/path/to/cuda NCCL_HOME=/path/to/nccl +``` + +NCCL tests rely on MPI to work on multiple processes, hence multiple nodes. If you want to compile the tests with MPI support, you need to set MPI=1 and set MPI\_HOME to the path where MPI is installed. + +```shell +$ make MPI=1 MPI_HOME=/path/to/mpi CUDA_HOME=/path/to/cuda NCCL_HOME=/path/to/nccl +``` + +## Usage + +NCCL tests can run on multiple processes, multiple threads, and multiple CUDA devices per thread. The number of process is managed by MPI and is therefore not passed to the tests as argument. The total number of ranks (=CUDA devices) will be equal to (number of processes)\*(number of threads)\*(number of GPUs per thread). + +### Quick examples + +Run on 8 GPUs (`-g 8`), scanning from 8 Bytes to 128MBytes : +```shell +$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 +``` + +Run with MPI on 10 processes (potentially on multiple nodes) with 4 GPUs each, for a total of 40 GPUs: +```shell +$ mpirun -np 10 ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 4 +``` + +### Performance + +See the [Performance](doc/PERFORMANCE.md) page for explanation about numbers, and in particular the "busbw" column. + +### Arguments + +All tests support the same set of arguments : + +* Number of GPUs + * `-t,--nthreads ` number of threads per process. Default : 1. + * `-g,--ngpus ` number of gpus per thread. Default : 1. +* Sizes to scan + * `-b,--minbytes ` minimum size to start with. Default : 32M. + * `-e,--maxbytes ` maximum size to end at. 
Default : 32M. + * Increments can be either fixed or a multiplication factor. Only one of those should be used + * `-i,--stepbytes ` fixed increment between sizes. Default : 1M. + * `-f,--stepfactor ` multiplication factor between sizes. Default : disabled. +* NCCL operations arguments + * `-o,--op ` Specify which reduction operation to perform. Only relevant for reduction operations like Allreduce, Reduce or ReduceScatter. Default : Sum. + * `-d,--datatype ` Specify which datatype to use. Default : Float. + * `-r,--root ` Specify which root to use. Only for operations with a root like broadcast or reduce. Default : 0. +* Performance + * `-n,--iters ` number of iterations. Default : 20. + * `-w,--warmup_iters ` number of warmup iterations (not timed). Default : 5. + * `-m,--agg_iters ` number of operations to aggregate together in each iteration. Default : 1. + * `-a,--average <0/1/2/3>` Report performance as an average across all ranks (MPI=1 only). <0=Rank0,1=Avg,2=Min,3=Max>. Default : 1. +* Test operation + * `-p,--parallel_init <0/1>` use threads to initialize NCCL in parallel. Default : 0. + * `-c,--check ` perform count iterations, checking correctness of results on each iteration. This can be quite slow on large numbers of GPUs. Default : 1. + * `-z,--blocking <0/1>` Make NCCL collective blocking, i.e. have CPUs wait and sync after each collective. Default : 0. + * `-G,--cudagraph ` Capture iterations as a CUDA graph and then replay specified number of times. Default : 0. + +## Copyright + +NCCL tests are provided under the BSD license. All source code and accompanying documentation is copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. + diff --git a/nccl-tests/build/all_gather_perf b/nccl-tests/build/all_gather_perf new file mode 100755 index 0000000..2365e17 Binary files /dev/null and b/nccl-tests/build/all_gather_perf differ diff --git a/nccl-tests/build/all_reduce_perf b/nccl-tests/build/all_reduce_perf new file mode 100755 index 0000000..5e1cac3 Binary files /dev/null and b/nccl-tests/build/all_reduce_perf differ diff --git a/nccl-tests/build/alltoall_perf b/nccl-tests/build/alltoall_perf new file mode 100755 index 0000000..308cf9b Binary files /dev/null and b/nccl-tests/build/alltoall_perf differ diff --git a/nccl-tests/build/broadcast_perf b/nccl-tests/build/broadcast_perf new file mode 100755 index 0000000..2af5a3c Binary files /dev/null and b/nccl-tests/build/broadcast_perf differ diff --git a/nccl-tests/build/gather_perf b/nccl-tests/build/gather_perf new file mode 100755 index 0000000..afaaab0 Binary files /dev/null and b/nccl-tests/build/gather_perf differ diff --git a/nccl-tests/build/hypercube_perf b/nccl-tests/build/hypercube_perf new file mode 100755 index 0000000..c504e14 Binary files /dev/null and b/nccl-tests/build/hypercube_perf differ diff --git a/nccl-tests/build/reduce_perf b/nccl-tests/build/reduce_perf new file mode 100755 index 0000000..b5e90c7 Binary files /dev/null and b/nccl-tests/build/reduce_perf differ diff --git a/nccl-tests/build/reduce_scatter_perf b/nccl-tests/build/reduce_scatter_perf new file mode 100755 index 0000000..8c13291 Binary files /dev/null and b/nccl-tests/build/reduce_scatter_perf differ diff --git a/nccl-tests/build/scatter_perf b/nccl-tests/build/scatter_perf new file mode 100755 index 0000000..4302642 Binary files /dev/null and b/nccl-tests/build/scatter_perf differ diff --git a/nccl-tests/build/sendrecv_perf b/nccl-tests/build/sendrecv_perf new file mode 100755 index 0000000..d432bc9 Binary files /dev/null and 
b/nccl-tests/build/sendrecv_perf differ diff --git a/nccl-tests/build/timer.o b/nccl-tests/build/timer.o new file mode 100644 index 0000000..2b159c4 Binary files /dev/null and b/nccl-tests/build/timer.o differ diff --git a/nccl-tests/build/verifiable/verifiable.o b/nccl-tests/build/verifiable/verifiable.o new file mode 100644 index 0000000..84ab6f1 Binary files /dev/null and b/nccl-tests/build/verifiable/verifiable.o differ diff --git a/nccl-tests/doc/PERFORMANCE.md b/nccl-tests/doc/PERFORMANCE.md new file mode 100644 index 0000000..21fef60 --- /dev/null +++ b/nccl-tests/doc/PERFORMANCE.md @@ -0,0 +1,144 @@ +# Performance reported by NCCL tests + +NCCL tests report the average operation time in ms, and two bandwidths in GB/s : algorithm bandwidth and bus bandwidth. This page explains what those numbers mean and what you should expect depending on the hardware used. + +# Time + +Time is useful with small sizes, to measure the constant overhead (or latency) associated with operations. + +On large sizes, the time becomes linear with the size (since it is roughly equal to overhead + size / bw) and is no longer measuring the latency but +also the bandwidth multiplied by the size. + +Therefore, on large sizes, it makes more sense to look at the bandwidth. + +# Bandwidth + +## Algorithm bandwidth + +Algorithm bandwidth is using the most commonly used formula for bandwidth : size (_S_) / time (_t_). It is useful to compute how much time any large operation would take by simply dividing the size of the operation by the algorithm bandwidth. + +`algbw = S/t` + +## Bus bandwidth + +While the algorithm bandwidth makes sense for point-to-point operations like Send/Receive, it is not always helpful to measure collective operations speed, since the theoretical peak algorithm bandwidth is not equal to the hardware peak bandwidth, usually depending on the number of ranks. +Most benchmarks only provide time measurements, which is hard to interpret for large sizes. Some others also provide algorithms bandwidth, but see that depending on the number of ranks, that bandwidth varies (and decreases as the number of ranks increase). + +To provide a number which reflects how optimally the hardware is used, NCCL tests introduce the notion of "Bus Bandwidth" ("busbw" column in the tests output). +This number is obtained applying a formula to the algorithm bandwidth to reflect the speed of the inter-GPU communication. +Using this bus bandwidth, we can compare it with the hardware peak bandwidth, independently of the number of ranks used. + +The formula depends on the collective operation. + +### AllReduce + +An allreduce operation, for each element of the N arrays (input i_X and output o_X, each situated on rank X), is performing the following operation : + +`o_0 = o_1 = o_2 = ... = o_{n-1} = i_0 + i_1 + i_2 + ... + i_{n-1}` + +**Note : this is independent of the algorithm used (ring, tree, or other) as long as they use point-to-point operations (send/receive).** + +A ring would do that operation in an order which follows the ring : + +`i_0 + i_1 + ... + i_{n-1} -> o_{n-1} -> o_0 -> o_1 -> .. -> o_{n-2}` + +A tree would do it hierarchically : + +`(((((i_{n-1} + i_{n-2}) + (i_{n-3} + i_{n-4})) + ... + (i_1 + i_0))))) -> o_0 -> (o_{n/2} -> (o_{3n/4} ...))` + +In all cases, we need n-1 additions and n assignments for each element. Since every step is on a different rank except potentially one (the last input and the first output), +we need 2(n-1) data transfers (x number of elements) to perform an allReduce operation. 
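As a preview of how this transfer count turns into the "busbw" number (the factor 2*(n-1)/n is derived just below), here is a minimal C sketch of the conversion from measured time to the two reported bandwidths. It mirrors `AllReduceGetBw` in `src/all_reduce.cu`; the helper name and the example numbers are illustrative only.

```c
#include <stdio.h>

/* Illustrative sketch: convert a measured AllReduce time into the algorithm
 * bandwidth (algbw = S/t) and the bus bandwidth (busbw = algbw * 2*(n-1)/n). */
static void allReduceBandwidths(size_t bytes, double sec, int nranks,
                                double *algBw, double *busBw) {
  *algBw = (double)bytes / 1.0E9 / sec;            /* GB/s */
  *busBw = *algBw * (2.0 * (nranks - 1)) / nranks;
}

int main(void) {
  double algBw, busBw;
  /* Example: 1 GB reduced across 8 ranks in 25 ms -> algbw 40 GB/s, busbw 70 GB/s */
  allReduceBandwidths(1000000000, 0.025, 8, &algBw, &busBw);
  printf("algbw=%.1f GB/s busbw=%.1f GB/s\n", algBw, busBw);
  return 0;
}
```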
+ +Considering that each rank has a bandwidth to the outside world of _B_, the time to perform an allReduce operation of _S_ elements is at best : + + `t = (S*2*(n-1)) / (n*B)` + +Indeed, we have _S_ elements, 2*(n-1) operations per element, and _n_ links of bandwidth _B_ to perform them. +Reordering the equation, we find that + + `t = (S/B) * (2*(n-1)/n)` + +Therefore, to get an AllReduce bandwidth measurement which we can compare to the hardware peak bandwidth, we compute : + + `B = S/t * (2*(n-1)/n) = algbw * (2*(n-1)/n)` + +### ReduceScatter + +The ReduceScatter operation requires only to perform the addition part of the allReduce operation : + + `o_K = i_0 + i_1 + i_2 + ... + i_{n-1}` + +With K being the rank which is getting the final result(K=offset/recvsize). + +The perfect reduceScatter time with a rank bandwidth of B would therefore be : + + `t = S*(n-1) / (B*n)` + +And the Bus Bandwidth is therefore computed as : + + `B = S/t * (n-1)/n = algbw * (n-1)/n` + +Note that here, S is the size in bytes of the total array, which for NCCL is equal to `recvcount*sizeof(datatype)*n` as the `recvcount` argument is the count per rank. + +### AllGather + +The AllGather operation requires only to perform the assignment part of the allReduce operation : + + `o_0 = o_1 = o_2 = ... = o_{n-1} = i_K` + +With K being the rank where the data originates from (K=offset*sendsize). + +The perfect allGather time with a rank bandwidth of B would therefore be : + + `t = S*(n-1) / (B*n)` + +And the Bus Bandwidth is therefore computed as : + + `B = S/t * (n-1)/n = algbw * (n-1)/n` + +Note that here, S is the size in bytes of the total array, which for NCCL is equal to `sendcount*sizeof(datatype)*n` as the `sendcount` argument is the count per rank. + +### Broadcast + +The broadcast operation representation is similar to allGather : + + `o_0 = o_1 = o_2 = ... = o_{n-1} = i_R` + +R being the root of the operation. + +However, in this case, since the i_R input is not evenly distributed on the ranks, we cannot use all N links to perform the transfer operations. +Indeed, *all* data has to get out of the root rank, hence the bottleneck is on the root rank which only has B as capacity to get data out : + + `t = S/B` + +And : + + `B = S/t` + +### Reduce + +The reduce operation performs : + + `o_R = i_0 + i_1 + i_2 + ... + i_{n-1}` + +R being the root of the operation. + +Similarly to broadcast, all data need to be sent to the root, hence : + + `t = S/B` + +And : + + `B = S/t` + +### Summary + +To obtain a bus bandwidth which should be independent of the number of ranks _n_, we apply a correction factor to the algorithm bandwidth : + +* AllReduce : 2*(_n_-1)/_n_ +* ReduceScatter : (_n_-1)/_n_ +* AllGather : (_n_-1)/_n_ +* Broadcast : 1 +* Reduce : 1 + +The bus bandwidth should reflect the speed of the hardware bottleneck : NVLink, PCI, QPI, or network. diff --git a/nccl-tests/src/Makefile b/nccl-tests/src/Makefile new file mode 100644 index 0000000..393de8e --- /dev/null +++ b/nccl-tests/src/Makefile @@ -0,0 +1,105 @@ +# +# Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved. 
+# +# See LICENSE.txt for license information +# + +CUDA_HOME ?= /usr/local/cuda +PREFIX ?= /usr/local +VERBOSE ?= 0 +DEBUG ?= 0 + +CUDA_LIB ?= $(CUDA_HOME)/lib64 +CUDA_INC ?= $(CUDA_HOME)/include +NVCC ?= $(CUDA_HOME)/bin/nvcc +CUDARTLIB ?= cudart + +CUDA_VERSION = $(strip $(shell which $(NVCC) >/dev/null && $(NVCC) --version | grep release | sed 's/.*release //' | sed 's/\,.*//')) +CUDA_MAJOR = $(shell echo $(CUDA_VERSION) | cut -d "." -f 1) + +# Better define NVCC_GENCODE in your environment to the minimal set +# of archs to reduce compile time. +ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 11; echo $$?),0) +NVCC_GENCODE ?= -gencode=arch=compute_60,code=sm_60 \ + -gencode=arch=compute_61,code=sm_61 \ + -gencode=arch=compute_70,code=sm_70 \ + -gencode=arch=compute_80,code=sm_80 \ + -gencode=arch=compute_80,code=compute_80 +else +NVCC_GENCODE ?= -gencode=arch=compute_35,code=sm_35 \ + -gencode=arch=compute_50,code=sm_50 \ + -gencode=arch=compute_60,code=sm_60 \ + -gencode=arch=compute_61,code=sm_61 \ + -gencode=arch=compute_70,code=sm_70 \ + -gencode=arch=compute_70,code=compute_70 +endif + +NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 +CXXFLAGS := -std=c++11 + +LDFLAGS := -L${CUDA_LIB} -lcudart -lrt +NVLDFLAGS := -L${CUDA_LIB} -l${CUDARTLIB} -lrt + +ifeq ($(DEBUG), 0) +NVCUFLAGS += -O3 -g +CXXFLAGS += -O3 -g +else +NVCUFLAGS += -O0 -G -g +CXXFLAGS += -O0 -g -ggdb3 +endif + +ifneq ($(VERBOSE), 0) +NVCUFLAGS += -Xcompiler -Wall,-Wextra,-Wno-unused-parameter +else +.SILENT: +endif + +.PHONY: build clean + +BUILDDIR ?= ../build +ifneq ($(NCCL_HOME), "") +NVCUFLAGS += -I$(NCCL_HOME)/include/ +NVLDFLAGS += -L$(NCCL_HOME)/lib +endif + +ifeq ($(MPI), 1) +NVCUFLAGS += -DMPI_SUPPORT -I$(MPI_HOME)/include +NVLDFLAGS += -L$(MPI_HOME)/lib -L$(MPI_HOME)/lib64 -lmpi +endif +ifeq ($(MPI_IBM),1) +NVCUFLAGS += -DMPI_SUPPORT +NVLDFLAGS += -lmpi_ibm +endif +LIBRARIES += nccl +NVLDFLAGS += $(LIBRARIES:%=-l%) + +DST_DIR := $(BUILDDIR) +SRC_FILES := $(wildcard *.cu) +OBJ_FILES := $(SRC_FILES:%.cu=${DST_DIR}/%.o) +BIN_FILES_LIST := all_reduce all_gather broadcast reduce_scatter reduce alltoall scatter gather sendrecv hypercube +BIN_FILES := $(BIN_FILES_LIST:%=${DST_DIR}/%_perf) + +build: ${BIN_FILES} + +clean: + rm -rf ${DST_DIR} + +TEST_VERIFIABLE_SRCDIR := ../verifiable +TEST_VERIFIABLE_BUILDDIR := $(BUILDDIR)/verifiable +include ../verifiable/verifiable.mk + +${DST_DIR}/%.o: %.cu common.h $(TEST_VERIFIABLE_HDRS) + @printf "Compiling %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(NVCC) -o $@ $(NVCUFLAGS) -c $< + +${DST_DIR}/timer.o: timer.cc timer.h + @printf "Compiling %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(CXX) $(CXXFLAGS) -o $@ -c timer.cc + +${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_OBJS) + @printf "Linking %-35s > %s\n" $< $@ + @mkdir -p ${DST_DIR} + $(NVCC) -o $@ $(NVCUFLAGS) $^ ${NVLDFLAGS} + diff --git a/nccl-tests/src/all_gather.cu b/nccl-tests/src/all_gather.cu new file mode 100644 index 0000000..0831207 --- /dev/null +++ b/nccl-tests/src/all_gather.cu @@ -0,0 +1,93 @@ +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "cuda_runtime.h" +#include "common.h" + +#define ALIGN 4 + +void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) { + size_t base = (count/(ALIGN*nranks))*ALIGN; + *sendcount = base; + *recvcount = base*nranks; + *sendInplaceOffset = base; + *recvInplaceOffset = 0; + *paramcount = base; +} + +testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) { + size_t sendcount = args->sendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + for (int i=0; inGpus; i++) { + CUDACHECK(cudaSetDevice(args->gpus[i])); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0)); + for (int j=0; jexpected[i] + args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + return testSuccess; +} + +void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = ((double)(nranks - 1))/((double)nranks); + *busBw = baseBw * factor; +} + +testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream)); + return testSuccess; +} + +struct testColl allGatherTest = { + "AllGather", + AllGatherGetCollByteCount, + AllGatherInitData, + AllGatherGetBw, + AllGatherRunColl +}; + +void AllGatherGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &allGatherTest; + ncclDataType_t *run_types; + const char **run_typenames; + int type_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = test_typenum; + run_types = test_types; + run_typenames = test_typenames; + } + + for (int i=0; isendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + for (int i=0; inGpus; i++) { + CUDACHECK(cudaSetDevice(args->gpus[i])); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? 
args->recvbuffs[i] : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank)); + TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks)); + CUDACHECK(cudaDeviceSynchronize()); + } + return testSuccess; +} + +void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = ((double)(2*(nranks - 1)))/((double)nranks); + *busBw = baseBw * factor; +} + +testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream)); + return testSuccess; +} + +struct testColl allReduceTest = { + "AllReduce", + AllReduceGetCollByteCount, + AllReduceInitData, + AllReduceGetBw, + AllReduceRunColl +}; + +void AllReduceGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &allReduceTest; + ncclDataType_t *run_types; + ncclRedOp_t *run_ops; + const char **run_typenames, **run_opnames; + int type_count, op_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = test_typenum; + run_types = test_types; + run_typenames = test_typenames; + } + + if ((int)op != -1) { + op_count = 1; + run_ops = &op; + run_opnames = &opName; + } else { + op_count = test_opnum; + run_ops = test_ops; + run_opnames = test_opnames; + } + + for (int i=0; isendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + for (int i=0; inGpus; i++) { + CUDACHECK(cudaSetDevice(args->gpus[i])); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0)); + for (int j=0; jexpected[i] + j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0)); + } + CUDACHECK(cudaDeviceSynchronize()); + } + // We don't support in-place alltoall + args->reportErrors = in_place ? 0 : 1; + return testSuccess; +} + +void AlltoAllGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * nranks * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = ((double)(nranks-1))/((double)(nranks)); + *busBw = baseBw * factor; +} + +testResult_t AlltoAllRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t rankOffset = count * wordSize(type); + +#if NCCL_MAJOR < 2 || NCCL_MINOR < 7 + printf("NCCL 2.7 or later is needed for alltoall. 
This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR); + return testNcclError; +#else + NCCLCHECK(ncclGroupStart()); + for (int r=0; rcollTest = &alltoAllTest; + ncclDataType_t *run_types; + const char **run_typenames; + int type_count; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = test_typenum; + run_types = test_types; + run_typenames = test_typenames; + } + + for (int i=0; isendBytes / wordSize(type); + size_t recvcount = args->expectedBytes / wordSize(type); + + for (int i=0; inGpus; i++) { + CUDACHECK(cudaSetDevice(args->gpus[i])); + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); + void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i]; + if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0)); + TESTCHECK(InitData(args->expected[i], recvcount, 0, type, ncclSum, rep, 1, 0)); + CUDACHECK(cudaDeviceSynchronize()); + } + return testSuccess; +} + +void BroadcastGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = 1; + *busBw = baseBw * factor; +} + +testResult_t BroadcastRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); +#if NCCL_MAJOR >= 2 && NCCL_MINOR >= 2 + NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, type, root, comm, stream)); +#else + if (rank == root) { + NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream)); + } else { + NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream)); + } +#endif + return testSuccess; +} + +struct testColl broadcastTest = { + "Broadcast", + BroadcastGetCollByteCount, + BroadcastInitData, + BroadcastGetBw, + BroadcastRunColl +}; + +void BroadcastGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) { + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + BroadcastGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) { + args->collTest = &broadcastTest; + ncclDataType_t *run_types; + const char **run_typenames; + int type_count; + int begin_root, end_root; + + if ((int)type != -1) { + type_count = 1; + run_types = &type; + run_typenames = &typeName; + } else { + type_count = test_typenum; + run_types = test_types; + run_typenames = test_typenames; + } + + if (root != -1) { + begin_root = end_root = root; + } else { + begin_root = 0; + end_root = args->nProcs*args->nThreads*args->nGpus-1; + } + + for (int i=0; i +#include +#include +#include +#include +#include "cuda.h" + +#include "../verifiable/verifiable.h" + +int test_ncclVersion = 0; // init'd with ncclGetVersion() + +#if NCCL_MAJOR >= 2 + ncclDataType_t test_types[ncclNumTypes] = { + ncclInt8, ncclUint8, ncclInt32, ncclUint32, ncclInt64, ncclUint64, ncclHalf, ncclFloat, ncclDouble + #if defined(__CUDA_BF16_TYPES_EXIST__) && NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) + , ncclBfloat16 + #endif + }; + const char *test_typenames[ncclNumTypes] = { + "int8", "uint8", "int32", "uint32", "int64", "uint64", "half", "float", "double" + #if defined(__CUDA_BF16_TYPES_EXIST__) 
&& NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) + , "bfloat16" + #endif + }; + int test_typenum = -1; + + const char *test_opnames[] = {"sum", "prod", "max", "min", "avg", "mulsum"}; + ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) + , ncclAvg + #endif + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) + , ncclNumOps // stand in for ncclRedOpCreatePreMulSum() created on-demand + #endif + }; + int test_opnum = -1; +#else + ncclDataType_t test_types[ncclNumTypes] = {ncclChar, ncclInt, ncclHalf, ncclFloat, ncclDouble, ncclInt64, ncclUint64}; + const char *test_typenames[ncclNumTypes] = {"char", "int", "half", "float", "double", "int64", "uint64"}; + int test_typenum = 7; + const char *test_opnames[] = {"sum", "prod", "max", "min"}; + ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin}; + int test_opnum = 4; +#endif + +// For libnccl's < 2.13 +extern "C" __attribute__((weak)) char const* ncclGetLastError(ncclComm_t comm) { + return ""; +} + +int is_main_proc = 0; +thread_local int is_main_thread = 0; + +// Command line parameter defaults +static int nThreads = 1; +static int nGpus = 1; +static size_t minBytes = 32*1024*1024; +static size_t maxBytes = 32*1024*1024; +static size_t stepBytes = 1*1024*1024; +static size_t stepFactor = 1; +static int datacheck = 1; +static int warmup_iters = 5; +static int iters = 20; +static int agg_iters = 1; +static int ncclop = ncclSum; +static int nccltype = ncclFloat; +static int ncclroot = 0; +static int parallel_init = 0; +static int blocking_coll = 0; +static int streamnull = 0; +static int timeout = 0; +static int cudaGraphLaunches = 0; +static int report_cputime = 0; +// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) +static int average = 1; + +#define NUM_BLOCKS 32 + +static double parsesize(const char *value) { + long long int units; + double size; + char size_lit; + + int count = sscanf(value, "%lf %1s", &size, &size_lit); + + switch (count) { + case 2: + switch (size_lit) { + case 'G': + case 'g': + units = 1024*1024*1024; + break; + case 'M': + case 'm': + units = 1024*1024; + break; + case 'K': + case 'k': + units = 1024; + break; + default: + return -1.0; + }; + break; + case 1: + units = 1; + break; + default: + return -1.0; + } + + return size * units; +} + +testResult_t CheckDelta(void* results, void* expected, size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int64_t *wrongEltN) { + ncclVerifiableVerify(results, expected, count, (int)type, (int)op, nranks, seed, offset, wrongEltN, cudaStreamDefault); + CUDACHECK(cudaDeviceSynchronize()); + return testSuccess; +} + +testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks) { + ncclVerifiablePrepareExpected(data, count, (int)type, (int)op, nranks, seed, offset, cudaStreamDefault); + return testSuccess; +} + +testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int rank) { + ncclVerifiablePrepareInput(data, count, (int)type, (int)op, nranks, rank, seed, offset, cudaStreamDefault); + return testSuccess; +} + +void Barrier(struct threadArgs *args) { + thread_local int epoch = 0; + static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}; + static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER}; + static int counter[2] = {0, 0}; + + 
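  // Double-buffered barrier: there are two lock/cond/counter sets, indexed by the
  // thread-local 'epoch', which flips after every call. A thread that returns and
  // immediately re-enters Barrier() therefore uses the other set and cannot disturb
  // the counter that slower threads are still waiting on here. The thread with the
  // highest index waits until all local threads have arrived, performs the MPI
  // barrier when MPI_SUPPORT is defined, then resets the counter and wakes the
  // others, which wait for that reset before leaving.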
pthread_mutex_lock(&lock[epoch]); + if(++counter[epoch] == args->nThreads) + pthread_cond_broadcast(&cond[epoch]); + + if(args->thread+1 == args->nThreads) { + while(counter[epoch] != args->nThreads) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + #ifdef MPI_SUPPORT + MPI_Barrier(MPI_COMM_WORLD); + #endif + counter[epoch] = 0; + pthread_cond_broadcast(&cond[epoch]); + } + else { + while(counter[epoch] != 0) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + } + pthread_mutex_unlock(&lock[epoch]); + epoch ^= 1; +} + +// Inter-thread/process barrier+allreduce. The quality of the return value +// for average=0 (which means broadcast from rank=0) is dubious. The returned +// value will actually be the result of process-local broadcast from the local thread=0. +template +void Allreduce(struct threadArgs* args, T* value, int average) { + thread_local int epoch = 0; + static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}; + static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER}; + static T accumulator[2]; + static int counter[2] = {0, 0}; + + pthread_mutex_lock(&lock[epoch]); + if(counter[epoch] == 0) { + if(average != 0 || args->thread == 0) accumulator[epoch] = *value; + } else { + switch(average) { + case /*r0*/ 0: if(args->thread == 0) accumulator[epoch] = *value; break; + case /*avg*/1: accumulator[epoch] += *value; break; + case /*min*/2: accumulator[epoch] = std::min(accumulator[epoch], *value); break; + case /*max*/3: accumulator[epoch] = std::max(accumulator[epoch], *value); break; + case /*sum*/4: accumulator[epoch] += *value; break; + } + } + + if(++counter[epoch] == args->nThreads) + pthread_cond_broadcast(&cond[epoch]); + + if(args->thread+1 == args->nThreads) { + while(counter[epoch] != args->nThreads) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + + #ifdef MPI_SUPPORT + if(average != 0) { + static_assert(std::is_same::value || std::is_same::value, "Allreduce only for T in {long long, double}"); + MPI_Datatype ty = std::is_same::value ? MPI_LONG_LONG : + std::is_same::value ? MPI_DOUBLE : + MPI_Datatype(); + MPI_Op op = average == 1 ? MPI_SUM : + average == 2 ? MPI_MIN : + average == 3 ? MPI_MAX : + average == 4 ? MPI_SUM : MPI_Op(); + MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD); + } + #endif + + if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads; + counter[epoch] = 0; + pthread_cond_broadcast(&cond[epoch]); + } + else { + while(counter[epoch] != 0) + pthread_cond_wait(&cond[epoch], &lock[epoch]); + } + pthread_mutex_unlock(&lock[epoch]); + + *value = accumulator[epoch]; + epoch ^= 1; +} + +testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int64_t *wrongElts) { + int nranks = args->nProcs*args->nGpus*args->nThreads; + size_t count = args->expectedBytes/wordSize(type); + + int64_t *wrongPerGpu = nullptr; + CUDACHECK(cudaHostAlloc((void**)&wrongPerGpu, args->nGpus*sizeof(int64_t), cudaHostAllocMapped)); + + for (int i=0; inGpus; i++) { + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + CUDACHECK(cudaSetDevice(args->gpus[i])); + void *data = in_place ? 
((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i]; + + TESTCHECK(CheckDelta(data, args->expected[i], count, 0, type, op, 0, nranks, wrongPerGpu+i)); + +#if 1 && DEBUG_PRINT + if (args->reportErrors && wrongPerGpu[i] != 0) { + printf("rank=%d #wrong=%d\n", rank, (int)wrongPerGpu[i]); + char *expectedHost = (char*)malloc(args->expectedBytes); + char *dataHost = (char*)malloc(args->expectedBytes); + int eltsz = wordSize(type); + cudaMemcpy(expectedHost, args->expected[i], args->expectedBytes, cudaMemcpyDeviceToHost); + cudaMemcpy(dataHost, data, args->expectedBytes, cudaMemcpyDeviceToHost); + + for(int j=0; jexpectedBytes/eltsz; j++) { + unsigned long long want, got; + want = 0; + memcpy(&want, expectedHost + j*eltsz, eltsz); + got = 0; + memcpy(&got, dataHost + j*eltsz, eltsz); + if(want != got) { + printf(" rank=%d elt[%d]: want=0x%llx got=0x%llx\n", rank, j, want, got); + } + } + free(expectedHost); + free(dataHost); + } +#endif + } + + *wrongElts = 0; + for (int i=0; i < args->nGpus; i++) *wrongElts += wrongPerGpu[i]; + cudaFreeHost(wrongPerGpu); + + if (args->reportErrors && *wrongElts) args->errors[0]++; + return testSuccess; +} + +testResult_t testStreamSynchronize(int ngpus, cudaStream_t* streams, ncclComm_t* comms) { + cudaError_t cudaErr; + int remaining = ngpus; + int* done = (int*)malloc(sizeof(int)*ngpus); + memset(done, 0, sizeof(int)*ngpus); + timer tim; + + while (remaining) { + int idle = 1; + for (int i=0; i= NCCL_VERSION(2,4,0) + if (test_ncclVersion >= NCCL_VERSION(2,4,0) && comms) { + ncclResult_t ncclAsyncErr; + NCCLCHECK(ncclCommGetAsyncError(comms[i], &ncclAsyncErr)); + if (ncclAsyncErr != ncclSuccess) { + // An asynchronous error happened. Stop the operation and destroy + // the communicator + for (int i=0; i timeout && timeout > 0) { + for (int i=0; inbytes / wordSize(type); + + // Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange + size_t totalnbytes = max(args->sendBytes, args->expectedBytes); + size_t steps = totalnbytes ? 
args->maxbytes / totalnbytes : 1; + size_t shift = totalnbytes * (iter % steps); + + if (args->nGpus > 1) NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { +#ifndef NCCL_MAJOR + CUDACHECK(cudaSetDevice(args->gpus[i])); +#endif + int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); + char* recvBuff = ((char*)args->recvbuffs[i]) + shift; + char* sendBuff = ((char*)args->sendbuffs[i]) + shift; + ncclRedOp_t op; + + if(opIndex < ncclNumOps) { + op = opIndex; + } + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) + else { + union { + int8_t i8; uint8_t u8; int32_t i32; uint32_t u32; int64_t i64; uint64_t u64; + half f16; float f32; double f64; + #if defined(__CUDA_BF16_TYPES_EXIST__) + __nv_bfloat16 bf16; + #endif + }; + switch(type) { + case ncclInt8: i8 = ncclVerifiablePremulScalar(rank); break; + case ncclUint8: u8 = ncclVerifiablePremulScalar(rank); break; + case ncclInt32: i32 = ncclVerifiablePremulScalar(rank); break; + case ncclUint32: u32 = ncclVerifiablePremulScalar(rank); break; + case ncclInt64: i64 = ncclVerifiablePremulScalar(rank); break; + case ncclUint64: u64 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat16: f16 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat32: f32 = ncclVerifiablePremulScalar(rank); break; + case ncclFloat64: f64 = ncclVerifiablePremulScalar(rank); break; + #if defined(__CUDA_BF16_TYPES_EXIST__) + case ncclBfloat16: bf16 = ncclVerifiablePremulScalar<__nv_bfloat16>(rank); break; + #endif + } + NCCLCHECK(ncclRedOpCreatePreMulSum(&op, &u64, type, ncclScalarHostImmediate, args->comms[i])); + } + #endif + + TESTCHECK(args->collTest->runColl( + (void*)(in_place ? recvBuff + args->sendInplaceOffset*rank : sendBuff), + (void*)(in_place ? recvBuff + args->recvInplaceOffset*rank : recvBuff), + count, type, op, root, args->comms[i], args->streams[i])); + + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) + if(opIndex >= ncclNumOps) { + NCCLCHECK(ncclRedOpDestroy(op, args->comms[i])); + } + #endif + } + if (args->nGpus > 1) NCCLCHECK(ncclGroupEnd()); + + if (blocking_coll) { + // Complete op before returning + TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); + } + if (blocking_coll) Barrier(args); + return testSuccess; +} + +testResult_t completeColl(struct threadArgs* args) { + if (blocking_coll) return testSuccess; + + TESTCHECK(testStreamSynchronize(args->nGpus, args->streams, args->comms)); + return testSuccess; +} + +testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) { + size_t count = args->nbytes / wordSize(type); + if (datacheck) { + // Initialize sendbuffs, recvbuffs and expected + TESTCHECK(args->collTest->initData(args, type, op, root, 99, in_place)); + } + + // Sync + TESTCHECK(startColl(args, type, op, root, in_place, 0)); + TESTCHECK(completeColl(args)); + + Barrier(args); + +#if CUDART_VERSION >= 11030 + cudaGraph_t graphs[args->nGpus]; + cudaGraphExec_t graphExec[args->nGpus]; + if (cudaGraphLaunches >= 1) { + // Begin cuda graph capture + for (int i=0; inGpus; i++) { + // Thread local mdoe is needed for: + // - Multi-thread mode: where graph capture and instantiation can happen concurrently across threads + // - P2P pre-connect: when there is no warm-up, P2P pre-connect is done during graph capture. 
+ // Since pre-connect calls cudaMalloc, we cannot use global capture mode + CUDACHECK(cudaStreamBeginCapture(args->streams[i], cudaStreamCaptureModeThreadLocal)); + } + } +#endif + + // Performance Benchmark + timer tim; + for (int iter = 0; iter < iters; iter++) { + if (agg_iters>1) NCCLCHECK(ncclGroupStart()); + for (int aiter = 0; aiter < agg_iters; aiter++) { + TESTCHECK(startColl(args, type, op, root, in_place, iter*agg_iters+aiter)); + } + if (agg_iters>1) NCCLCHECK(ncclGroupEnd()); + } + +#if CUDART_VERSION >= 11030 + if (cudaGraphLaunches >= 1) { + // End cuda graph capture + for (int i=0; inGpus; i++) { + CUDACHECK(cudaStreamEndCapture(args->streams[i], graphs+i)); + } + // Instantiate cuda graph + for (int i=0; inGpus; i++) { + CUDACHECK(cudaGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0)); + } + // Resync CPU, restart timing, launch cuda graph + Barrier(args); + tim.reset(); + for (int l=0; lnGpus; i++) { + CUDACHECK(cudaGraphLaunch(graphExec[i], args->streams[i])); + } + } + } +#endif + + double cputimeSec = tim.elapsed()/(iters*agg_iters); + TESTCHECK(completeColl(args)); + + double deltaSec = tim.elapsed(); + deltaSec = deltaSec/(iters*agg_iters); + if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches; + Allreduce(args, &deltaSec, average); + +#if CUDART_VERSION >= 11030 + if (cudaGraphLaunches >= 1) { + //destroy cuda graph + for (int i=0; inGpus; i++) { + CUDACHECK(cudaGraphExecDestroy(graphExec[i])); + CUDACHECK(cudaGraphDestroy(graphs[i])); + } + } +#endif + + double algBw, busBw; + args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus); + + Barrier(args); + + int64_t wrongElts = 0; + static __thread int rep = 0; + rep++; + for (int c = 0; c < datacheck; c++) { + // Initialize sendbuffs, recvbuffs and expected + TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place)); + +#if CUDART_VERSION >= 11030 + if (cudaGraphLaunches >= 1) { + // Begin cuda graph capture for data check + for (int i=0; inGpus; i++) { + CUDACHECK(cudaStreamBeginCapture(args->streams[i], args->nThreads > 1 ? cudaStreamCaptureModeThreadLocal : cudaStreamCaptureModeGlobal)); + } + } +#endif + + //test validation in single itertion, should ideally be included into the multi-iteration run + TESTCHECK(startColl(args, type, op, root, in_place, 0)); + +#if CUDART_VERSION >= 11030 + if (cudaGraphLaunches >= 1) { + // End cuda graph capture + for (int i=0; inGpus; i++) { + CUDACHECK(cudaStreamEndCapture(args->streams[i], graphs+i)); + } + // Instantiate cuda graph + for (int i=0; inGpus; i++) { + CUDACHECK(cudaGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0)); + } + // Launch cuda graph + for (int i=0; inGpus; i++) { + CUDACHECK(cudaGraphLaunch(graphExec[i], args->streams[i])); + } + } +#endif + + TESTCHECK(completeColl(args)); + +#if CUDART_VERSION >= 11030 + if (cudaGraphLaunches >= 1) { + //destroy cuda graph + for (int i=0; inGpus; i++) { + CUDACHECK(cudaGraphExecDestroy(graphExec[i])); + CUDACHECK(cudaGraphDestroy(graphs[i])); + } + } +#endif + + TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts)); + + //aggregate delta from all threads and procs + long long wrongElts1 = wrongElts; + //if (wrongElts) fprintf(stderr, "\nERROR: Data corruption : rank %d size %ld wrongElts %ld\n", args->proc, args->expectedBytes, wrongElts); + Allreduce(args, &wrongElts1, /*sum*/4); + wrongElts = wrongElts1; + if (wrongElts) break; + } + + double timeUsec = (report_cputime ? 
cputimeSec : deltaSec)*1.0E6; + char timeStr[100]; + if (timeUsec >= 10000.0) { + sprintf(timeStr, "%7.0f", timeUsec); + } else if (timeUsec >= 100.0) { + sprintf(timeStr, "%7.1f", timeUsec); + } else { + sprintf(timeStr, "%7.2f", timeUsec); + } + if (args->reportErrors) { + PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts); + } else { + PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); + } + + args->bw[0] += busBw; + args->bw_count[0]++; + return testSuccess; +} + +void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) { + int nranks = args->nProcs*args->nGpus*args->nThreads; + size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; + + count = size / wordSize(type); + args->collTest->getCollByteCount(&sendCount, &recvCount, ¶mCount, &sendInplaceOffset, &recvInplaceOffset, (size_t)count, (size_t)nranks); + + args->nbytes = paramCount * wordSize(type); + args->sendBytes = sendCount * wordSize(type); + args->expectedBytes = recvCount * wordSize(type); + args->sendInplaceOffset = sendInplaceOffset * wordSize(type); + args->recvInplaceOffset = recvInplaceOffset * wordSize(type); +} + +testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) { + // Sync to avoid first-call timeout + Barrier(args); + + // Warm-up for large size + setupArgs(args->maxbytes, type, args); + for (int iter = 0; iter < warmup_iters; iter++) { + TESTCHECK(startColl(args, type, op, root, 0, iter)); + } + TESTCHECK(completeColl(args)); + + // Warm-up for small size + setupArgs(args->minbytes, type, args); + for (int iter = 0; iter < warmup_iters; iter++) { + TESTCHECK(startColl(args, type, op, root, 0, iter)); + } + TESTCHECK(completeColl(args)); + + // Benchmark + for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) { + setupArgs(size, type, args); + char rootName[100]; + sprintf(rootName, "%6i", root); + PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName); + TESTCHECK(BenchTime(args, type, op, root, 0)); + TESTCHECK(BenchTime(args, type, op, root, 1)); + PRINT("\n"); + } + return testSuccess; +} + +testResult_t threadRunTests(struct threadArgs* args) { + // Set device to the first of our GPUs. If we don't do that, some operations + // will be done on the current GPU (by default : 0) and if the GPUs are in + // exclusive mode those operations will fail. + CUDACHECK(cudaSetDevice(args->gpus[0])); + TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop])); + return testSuccess; +} + +testResult_t threadInit(struct threadArgs* args) { + char hostname[1024]; + getHostName(hostname, 1024); + int nranks = args->nProcs*args->nThreads*args->nGpus; + + //set main thread again + is_main_thread = (is_main_proc && args->thread == 0) ? 
1 : 0; + + NCCLCHECK(ncclGroupStart()); + for (int i=0; inGpus; i++) { + int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i; + CUDACHECK(cudaSetDevice(args->gpus[i])); + NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank)); + } + NCCLCHECK(ncclGroupEnd()); + + TESTCHECK(threadRunTests(args)); + + for (int i=0; inGpus; i++) { + NCCLCHECK(ncclCommDestroy(args->comms[i])); + } + return testSuccess; +} + +void* threadLauncher(void* thread_) { + struct testThread* thread = (struct testThread*)thread_; + thread->ret = thread->func(&thread->args); + return NULL; +} +testResult_t threadLaunch(struct testThread* thread) { + pthread_create(&thread->thread, NULL, threadLauncher, thread); + return testSuccess; +} + +testResult_t AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, size_t nbytes) { + CUDACHECK(cudaMalloc(sendbuff, nbytes)); + CUDACHECK(cudaMalloc(recvbuff, nbytes)); + if (datacheck) CUDACHECK(cudaMalloc(expected, recvBytes)); + return testSuccess; +} + +testResult_t run(); // Main function + +int main(int argc, char* argv[]) { + // Make sure everyline is flushed so that we see the progress of the test + setlinebuf(stdout); + + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,4,0) + ncclGetVersion(&test_ncclVersion); + #else + test_ncclVersion = NCCL_VERSION_CODE; + #endif + //printf("# NCCL_VERSION_CODE=%d ncclGetVersion=%d\n", NCCL_VERSION_CODE, test_ncclVersion); + #if NCCL_VERSION_CODE >= NCCL_VERSION(2,0,0) + test_opnum = 4; + test_typenum = 9; + if (NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) && test_ncclVersion >= NCCL_VERSION(2,10,0)) { + test_opnum++; // ncclAvg + #if defined(__CUDA_BF16_TYPES_EXIST__) + test_typenum++; // bfloat16 + #endif + } + if (NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) && test_ncclVersion >= NCCL_VERSION(2,11,0)) { + test_opnum++; // PreMulSum + } + #endif + + // Parse args + double parsed; + int longindex; + static struct option longopts[] = { + {"nthreads", required_argument, 0, 't'}, + {"ngpus", required_argument, 0, 'g'}, + {"minbytes", required_argument, 0, 'b'}, + {"maxbytes", required_argument, 0, 'e'}, + {"stepbytes", required_argument, 0, 'i'}, + {"stepfactor", required_argument, 0, 'f'}, + {"iters", required_argument, 0, 'n'}, + {"agg_iters", required_argument, 0, 'm'}, + {"warmup_iters", required_argument, 0, 'w'}, + {"parallel_init", required_argument, 0, 'p'}, + {"check", required_argument, 0, 'c'}, + {"op", required_argument, 0, 'o'}, + {"datatype", required_argument, 0, 'd'}, + {"root", required_argument, 0, 'r'}, + {"blocking", required_argument, 0, 'z'}, + {"stream_null", required_argument, 0, 'y'}, + {"timeout", required_argument, 0, 'T'}, + {"cudagraph", required_argument, 0, 'G'}, + {"report_cputime", required_argument, 0, 'C'}, + {"average", required_argument, 0, 'a'}, + {"help", no_argument, 0, 'h'}, + {} + }; + + while(1) { + int c; + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:y:T:hG:C:a:", longopts, &longindex); + + if (c == -1) + break; + + switch(c) { + case 't': + nThreads = strtol(optarg, NULL, 0); + break; + case 'g': + nGpus = strtol(optarg, NULL, 0); + break; + case 'b': + parsed = parsesize(optarg); + if (parsed < 0) { + fprintf(stderr, "invalid size specified for 'minbytes'\n"); + return -1; + } + minBytes = (size_t)parsed; + break; + case 'e': + parsed = parsesize(optarg); + if (parsed < 0) { + fprintf(stderr, "invalid size specified for 'maxbytes'\n"); + return -1; + } + maxBytes = (size_t)parsed; + break; + case 'i': + 
stepBytes = strtol(optarg, NULL, 0); + break; + case 'f': + stepFactor = strtol(optarg, NULL, 0); + break; + case 'n': + iters = (int)strtol(optarg, NULL, 0); + break; + case 'm': +#if NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 2) + agg_iters = (int)strtol(optarg, NULL, 0); +#else + fprintf(stderr, "Option -m not supported before NCCL 2.2. Ignoring\n"); +#endif + break; + case 'w': + warmup_iters = (int)strtol(optarg, NULL, 0); + break; + case 'c': + datacheck = (int)strtol(optarg, NULL, 0); + break; + case 'p': + parallel_init = (int)strtol(optarg, NULL, 0); + break; + case 'o': + ncclop = ncclstringtoop(optarg); + break; + case 'd': + nccltype = ncclstringtotype(optarg); + break; + case 'r': + ncclroot = strtol(optarg, NULL, 0); + break; + case 'z': + blocking_coll = strtol(optarg, NULL, 0); + break; + case 'y': + streamnull = strtol(optarg, NULL, 0); + break; + case 'T': + timeout = strtol(optarg, NULL, 0); + break; + case 'G': +#if (NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 9)) && CUDART_VERSION >= 11030 + cudaGraphLaunches = strtol(optarg, NULL, 0); +#else + printf("Option -G (CUDA graph) not supported before NCCL 2.9 + CUDA 11.3. Ignoring\n"); +#endif + break; + case 'C': + report_cputime = strtol(optarg, NULL, 0); + break; + case 'a': + average = (int)strtol(optarg, NULL, 0); + break; + case 'h': + default: + if (c != 'h') printf("invalid option '%c'\n", c); + printf("USAGE: %s \n\t" + "[-t,--nthreads ] \n\t" + "[-g,--ngpus ] \n\t" + "[-b,--minbytes ] \n\t" + "[-e,--maxbytes ] \n\t" + "[-i,--stepbytes ] \n\t" + "[-f,--stepfactor ] \n\t" + "[-n,--iters ] \n\t" + "[-m,--agg_iters ] \n\t" + "[-w,--warmup_iters ] \n\t" + "[-p,--parallel_init <0/1>] \n\t" + "[-c,--check ] \n\t" +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) + "[-o,--op ] \n\t" +#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) + "[-o,--op ] \n\t" +#else + "[-o,--op ] \n\t" +#endif + "[-d,--datatype ] \n\t" + "[-r,--root ] \n\t" + "[-z,--blocking <0/1>] \n\t" + "[-y,--stream_null <0/1>] \n\t" + "[-T,--timeout