Skip to content

Commit

Permalink
added python script to run cylon ucx/ucc without mpirun
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyingshan committed Aug 29, 2022
1 parent 7494fed commit 6cefdcc
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 18 deletions.
3 changes: 0 additions & 3 deletions cpp/src/cylon/net/ucx/ucx_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,6 @@ Status UCXUCCCommunicator::Make(const std::shared_ptr<CommConfig> &config,
// init ucc context
ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_OOB;

ctx_params.oob.allgather = [](void *sbuf, void *rbuf, size_t msglen,
void *coll_info, void **req) { return UCC_OK; };

if (ucc_oob_ctx->Type() == OOBType::OOB_REDIS) {
ctx_params.oob.allgather = team_params.oob.allgather =
UCCRedisOOBContext::oob_allgather;
Expand Down
28 changes: 17 additions & 11 deletions cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

namespace cylon {
namespace net {
UCXRedisOOBContext::UCXRedisOOBContext(std::shared_ptr<sw::redis::Redis> rds,
int ws)
: redis(rds), world_size(ws) {}
UCXRedisOOBContext::UCXRedisOOBContext(int ws, std::string rds)
: redis(std::make_shared<sw::redis::Redis>(rds)), world_size(ws) {}

Status UCXRedisOOBContext::InitOOB() { return Status::OK(); };

Expand All @@ -18,6 +17,7 @@ Status UCXRedisOOBContext::getWorldSizeAndRank(int &world_size, int &rank) {

Status UCXRedisOOBContext::OOBAllgather(uint8_t *src, uint8_t *dst,
size_t srcSize, size_t dstSize) {
CYLON_UNUSED(dstSize);
const auto ucc_worker_addr_mp_str = "ucp_worker_addr_mp";
redis->hset(ucc_worker_addr_mp_str, std::to_string(rank),
std::string((char *)src, (char *)src + srcSize));
Expand Down Expand Up @@ -78,16 +78,14 @@ Status UCXMPIOOBContext::Finalize() {
void UCCRedisOOBContext::InitOOB(int rank) { this->rank = rank; }

std::shared_ptr<UCXOOBContext> UCCRedisOOBContext::makeUCXOOBContext() {
return std::make_shared<UCXRedisOOBContext>(redis, world_size);
return std::make_shared<UCXRedisOOBContext>(world_size, redis_addr);
}

void *UCCRedisOOBContext::getCollInfo() { return this; }

ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf,
size_t msglen, void *coll_info,
void **req) {
auto oob_allgather_func = [](void *sbuf, void *rbuf, size_t msglen,
void *coll_info, void **req) { return UCC_OK; };
int world_size = ((UCCRedisOOBContext *)coll_info)->world_size;
int rank = ((UCCRedisOOBContext *)coll_info)->rank;
int num_comm = ((UCCRedisOOBContext *)coll_info)->num_oob_allgather;
Expand All @@ -104,7 +102,7 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf,

for (int i = 0; i < world_size; i++) {
if (i == rank) {
memcpy(rbuf + i * msglen, s.data(), msglen);
memcpy((uint8_t*)rbuf + i * msglen, s.data(), msglen);
} else {
auto helperName =
"ucc_helper" + std::to_string(num_comm) + ":" + std::to_string(i);
Expand All @@ -117,16 +115,22 @@ ucc_status_t UCCRedisOOBContext::oob_allgather(void *sbuf, void *rbuf,
std::to_string(i));
} while (!val);

memcpy(rbuf + i * msglen, val.value().data(), msglen);
memcpy((uint8_t*)rbuf + i * msglen, val.value().data(), msglen);
}
}

return UCC_OK;
}

UCCRedisOOBContext::UCCRedisOOBContext(int ws,
std::shared_ptr<sw::redis::Redis> &rds)
: world_size(ws), redis(rds) {}
std::string rds)
: world_size(ws), redis(std::make_shared<sw::redis::Redis>(rds)), redis_addr(rds) {}

UCCRedisOOBContext::UCCRedisOOBContext() {
redis_addr = "tcp://" + std::string(getenv("CYLON_UCX_OOB_REDIS_ADDR"));
world_size = std::atoi(getenv("CYLON_UCX_OOB_WORLD_SIZE"));
redis = std::make_shared<sw::redis::Redis>(redis_addr);
}

ucc_status_t UCCRedisOOBContext::oob_allgather_test(void *req) {
CYLON_UNUSED(req);
Expand All @@ -150,7 +154,9 @@ void UCCRedisOOBContext::setRank(int rk) { rank = rk; }

int UCCRedisOOBContext::getRank() { return rank; }

void UCCMPIOOBContext::InitOOB(int rank){};
void UCCMPIOOBContext::InitOOB(int rank){
CYLON_UNUSED(rank);
};

std::shared_ptr<UCXOOBContext> UCCMPIOOBContext::makeUCXOOBContext() {
return std::make_shared<UCXMPIOOBContext>();
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/cylon/net/ucx/ucx_ucc_oob_contexts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UCXOOBContext {

class UCXRedisOOBContext : public UCXOOBContext {
public:
UCXRedisOOBContext(std::shared_ptr<sw::redis::Redis> redis, int world_size);
UCXRedisOOBContext(int world_size, std::string redis_addr);
Status InitOOB() override;

Status getWorldSizeAndRank(int &world_size, int &rank) override;
Expand Down Expand Up @@ -73,7 +73,13 @@ class UCCRedisOOBContext : public UCCOOBContext {

OOBType Type() override;

UCCRedisOOBContext(int world_size, std::shared_ptr<sw::redis::Redis> &redis);
UCCRedisOOBContext(int world_size, std::string redis_addr);

/***
* This constructor is used with python script `run_ucc_with_redis.py`
* Extracts environment variables set by the script and initializes metadata
*/
UCCRedisOOBContext();

static ucc_status_t oob_allgather(void *sbuf, void *rbuf, size_t msglen,
void *coll_info, void **req);
Expand All @@ -90,6 +96,7 @@ class UCCRedisOOBContext : public UCCOOBContext {
int rank = -1;
std::shared_ptr<sw::redis::Redis> redis;
int num_oob_allgather = 0;
std::string redis_addr;
};

class UCCMPIOOBContext : public UCCOOBContext {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/examples/ucc_operators_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ int main(int argc, char **argv) {
if(argc > 1 && std::string(argv[1]) == "mpi") {
oob_ctx = std::make_shared<cylon::net::UCCMPIOOBContext>();
} else {
auto redis = std::make_shared<sw::redis::Redis>("tcp://127.0.0.1:6379");
oob_ctx = std::make_shared<cylon::net::UCCRedisOOBContext>(4, redis);
// auto redis = std::make_shared<sw::redis::Redis>();
oob_ctx = std::make_shared<cylon::net::UCCRedisOOBContext>(
4, "tcp://127.0.0.1:6379");
}

std::shared_ptr<cylon::CylonContext> ctx;
Expand Down
26 changes: 26 additions & 0 deletions python/pycylon/run_ucc_with_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
import argparse
import subprocess
import redis

def main(world_size: int, redis_addr: str, executable_name: str):
host, port = redis_addr.split(':')
r = redis.Redis(host, int(port), db=0)
r.flushall()
d = dict(os.environ)
d["CYLON_UCX_OOB_WORLD_SIZE"] = str(world_size)
d["CYLON_UCX_OOB_REDIS_ADDR"] = redis_addr
children = []
for _ in range(world_size):
children.append(subprocess.Popen(executable_name, env=d))

for child in children:
child.wait()

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--world_size', "-n", type=int, help="world size")
parser.add_argument("--redis_addr", "-r", type=str, help="redis address, default to 127.0.0.1:6379", default="127.0.0.1:6379")
parser.add_argument("--execute", "-e", type=str, help="name of executable")
args = parser.parse_args()
main(args.world_size, args.redis_addr, args.execute)

0 comments on commit 6cefdcc

Please sign in to comment.