Skip to content

Commit

Permalink
enable servers to use pmi to get rank and count
Browse files Browse the repository at this point in the history
  • Loading branch information
adammoody committed Jul 20, 2021
1 parent a2d33f0 commit ad57997
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 192 deletions.
120 changes: 86 additions & 34 deletions common/src/unifyfs_keyval.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
const char* const key_unifyfsd_socket = "unifyfsd.socket";
const char* const key_unifyfsd_margo_shm = "unifyfsd.margo-shm";
const char* const key_unifyfsd_margo_svr = "unifyfsd.margo-svr";
const char* const key_unifyfsd_pmi_rank = "unifyfsd.pmi-rank";

// key-value store state
static int kv_initialized; // = 0
Expand All @@ -58,6 +57,9 @@ static size_t kv_max_vallen; // = 0
# define UNIFYFS_MAX_KV_VALLEN 4096
#endif

/* PMI information */
int glb_pmi_rank = -1;
int glb_pmi_size; /* = 0 */

//--------------------- PMI2 K-V Store ---------------------
#if defined(USE_PMI2)
Expand Down Expand Up @@ -127,12 +129,17 @@ static void unifyfs_pmi2_errstr(int rc)
}

// initialize PMI2
static int unifyfs_pmi2_init(void)
int unifyfs_pmi2_init(void)
{
int nprocs, rank, rc, val, len, found;
int pmi_world_rank = -1;
int pmi_world_nprocs = -1;

/* return success if we're already initialized */
if (pmi2_initialized) {
return (int)UNIFYFS_SUCCESS;
}

kv_max_keylen = PMI2_MAX_KEYLEN;
kv_max_vallen = PMI2_MAX_VALLEN;

Expand Down Expand Up @@ -191,6 +198,9 @@ static int unifyfs_pmi2_init(void)
kv_myrank = pmi_world_rank;
kv_nranks = pmi_world_nprocs;

glb_pmi_rank = kv_myrank;
glb_pmi_size = kv_nranks;

LOGDBG("PMI2 Job Id: %s, Rank: %d of %d, hasNameServer=%d", pmi_jobid,
kv_myrank, kv_nranks, pmi2_has_nameserv);

Expand Down Expand Up @@ -239,6 +249,26 @@ static int unifyfs_pmi2_lookup(const char* key,
LOGERR("PMI2_KVS_Get(%s) failed: %s", key, pmi2_errstr);
return (int)UNIFYFS_ERROR_PMI;
}

// HACK: replace '!' with ';' for SLURM PMI2
// This assumes the value does not actually use "!"
//
// At least one version of SLURM PMI2 seems to use ";"
// characters to separate key/value pairs, so the following:
//
// PMI2_KVS_Put("unifyfs.margo-svr", "ofi+tcp;ofi_rxm://ip:port")
//
// leads to an error like:
//
// slurmstepd: error: mpi/pmi2: no value for key ;ofi_rxm://ip:port; in req
char* p = pmi2_val;
while (*p != '\0') {
if (*p == '!') {
*p = ';';
}
p++;
}

*oval = strdup(pmi2_val);
return (int)UNIFYFS_SUCCESS;
}
Expand All @@ -257,6 +287,26 @@ static int unifyfs_pmi2_publish(const char* key,

strncpy(pmi2_key, key, sizeof(pmi2_key));
strncpy(pmi2_val, val, sizeof(pmi2_val));

// HACK: replace ';' with '!' for SLURM PMI2
// This assumes the value does not actually use "!"
//
// At least one version of SLURM PMI2 seems to use ";"
// characters to separate key/value pairs, so the following:
//
// PMI2_KVS_Put("unifyfs.margo-svr", "ofi+tcp;ofi_rxm://ip:port")
//
// leads to an error like:
//
// slurmstepd: error: mpi/pmi2: no value for key ;ofi_rxm://ip:port; in req
char* p = pmi2_val;
while (*p != '\0') {
if (*p == ';') {
*p = '!';
}
p++;
}

rc = PMI2_KVS_Put(pmi2_key, pmi2_val);
if (rc != PMI2_SUCCESS) {
unifyfs_pmi2_errstr(rc);
Expand Down Expand Up @@ -294,14 +344,19 @@ static pmix_proc_t pmix_myproc;
#endif

// initialize PMIx
static int unifyfs_pmix_init(void)
int unifyfs_pmix_init(void)
{
int rc;
size_t pmix_univ_nprocs;
pmix_value_t value;
pmix_value_t* valp = &value;
pmix_proc_t proc;

/* return success if we're already initialized */
if (pmix_initialized) {
return (int)UNIFYFS_SUCCESS;
}

/* init PMIx */
PMIX_PROC_CONSTRUCT(&pmix_myproc);
rc = PMIx_Init(&pmix_myproc, NULL, 0);
Expand All @@ -328,6 +383,9 @@ static int unifyfs_pmix_init(void)
kv_myrank = pmix_myproc.rank;
kv_nranks = (int)pmix_univ_nprocs;

glb_pmi_rank = kv_myrank;
glb_pmi_size = kv_nranks;

LOGDBG("PMIX Job Id: %s, Rank: %d of %d", pmix_myproc.nspace,
kv_myrank, kv_nranks);

Expand Down Expand Up @@ -490,7 +548,6 @@ static int unifyfs_fskv_init(unifyfs_cfg_t* cfg)
int rc, err;
struct stat s;


if (NULL == cfg) {
LOGERR("NULL config");
return EINVAL;
Expand Down Expand Up @@ -709,6 +766,27 @@ static int unifyfs_fskv_lookup_local(const char* key,
return (int)UNIFYFS_SUCCESS;
}

// publish a key-value pair
static int unifyfs_fskv_publish_local(const char* key,
const char* val)
{
FILE* kvf;
char kvfile[UNIFYFS_MAX_FILENAME];

scnprintf(kvfile, sizeof(kvfile), "%s/%s",
localfs_kvdir, key);
kvf = fopen(kvfile, "w");
if (NULL == kvf) {
LOGERR("failed to create kvstore entry %s", kvfile);
return (int)UNIFYFS_ERROR_KEYVAL;
}
fprintf(kvf, "%s\n", val);
fclose(kvf);

return (int)UNIFYFS_SUCCESS;
}

#if (!defined(USE_PMI2)) && (!defined(USE_PMIX))
static int unifyfs_fskv_lookup_remote(int rank,
const char* key,
char** oval)
Expand Down Expand Up @@ -742,26 +820,6 @@ static int unifyfs_fskv_lookup_remote(int rank,
return (int)UNIFYFS_SUCCESS;
}

// publish a key-value pair
static int unifyfs_fskv_publish_local(const char* key,
const char* val)
{
FILE* kvf;
char kvfile[UNIFYFS_MAX_FILENAME];

scnprintf(kvfile, sizeof(kvfile), "%s/%s",
localfs_kvdir, key);
kvf = fopen(kvfile, "w");
if (NULL == kvf) {
LOGERR("failed to create kvstore entry %s", kvfile);
return (int)UNIFYFS_ERROR_KEYVAL;
}
fprintf(kvf, "%s\n", val);
fclose(kvf);

return (int)UNIFYFS_SUCCESS;
}

static int unifyfs_fskv_publish_remote(const char* key,
const char* val)
{
Expand All @@ -785,7 +843,6 @@ static int unifyfs_fskv_publish_remote(const char* key,
return (int)UNIFYFS_SUCCESS;
}

#if (!defined(USE_PMI2)) && (!defined(USE_PMIX))
static int unifyfs_fskv_fence(void)
{
if (!have_sharedfs_kvstore) {
Expand Down Expand Up @@ -834,6 +891,7 @@ int unifyfs_keyval_init(unifyfs_cfg_t* cfg,
kv_nranks = *nranks;
}
#endif

// NOTE: do this after getting rank/n_ranks info
rc = unifyfs_fskv_init(cfg);
if (rc != (int)UNIFYFS_SUCCESS) {
Expand All @@ -849,6 +907,7 @@ int unifyfs_keyval_init(unifyfs_cfg_t* cfg,
if (NULL != nranks) {
*nranks = kv_nranks;
}

return (int)UNIFYFS_SUCCESS;
}

Expand Down Expand Up @@ -953,11 +1012,8 @@ int unifyfs_keyval_lookup_remote(int rank,
#elif defined(USE_PMI2)
rc = unifyfs_pmi2_lookup(rank_key, oval);
#else
rc = (int)UNIFYFS_FAILURE;
rc = unifyfs_fskv_lookup_remote(rank, key, oval);
#endif
if (rc != (int)UNIFYFS_SUCCESS) {
rc = unifyfs_fskv_lookup_remote(rank, key, oval);
}
if (rc != (int)UNIFYFS_SUCCESS) {
LOGERR("remote keyval lookup for '%s' failed", key);
}
Expand Down Expand Up @@ -1037,12 +1093,8 @@ int unifyfs_keyval_publish_remote(const char* key,
#elif defined(USE_PMI2)
rc = unifyfs_pmi2_publish(rank_key, val);
#else
rc = (int)UNIFYFS_FAILURE;
rc = unifyfs_fskv_publish_remote(key, val);
#endif
if (rc != (int)UNIFYFS_SUCCESS) {
rc = unifyfs_fskv_publish_remote(key, val);
}

if (rc != (int)UNIFYFS_SUCCESS) {
LOGERR("remote keyval publish for '%s' failed", key);
} else {
Expand Down
7 changes: 6 additions & 1 deletion common/src/unifyfs_keyval.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
extern "C" {
#endif

extern int glb_pmi_rank;
extern int glb_pmi_size;

int unifyfs_pmix_init(void);
int unifyfs_pmi2_init(void);

// keys we use
extern const char* const key_unifyfsd_socket; // server domain socket path
extern const char* const key_unifyfsd_margo_shm; // client-server margo address
extern const char* const key_unifyfsd_margo_svr; // server-server margo address
extern const char* const key_unifyfsd_pmi_rank; // server-server pmi rank

// initialize key-value store
int unifyfs_keyval_init(unifyfs_cfg_t* cfg,
Expand Down
10 changes: 10 additions & 0 deletions server/src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ else # ! USE_MDHIM

endif # USE_MDHIM

if USE_PMIX
OPT_C_FLAGS += -DUSE_PMIX
OPT_LIBS += -lpmix
endif

if USE_PMI2
OPT_C_FLAGS += -DUSE_PMI2
OPT_LIBS += -lpmi2
endif

unifyfsd_CFLAGS = $(AM_CFLAGS) $(UNIFYFS_COMMON_FLAGS) $(OPT_C_FLAGS)
unifyfsd_LDFLAGS = $(OPT_LD_FLAGS)
unifyfsd_LDADD = $(UNIFYFS_COMMON_LIBS) $(OPT_LIBS)
Expand Down
Loading

0 comments on commit ad57997

Please sign in to comment.