Skip to content

Commit

Permalink
rdma: add user metadata (#854)
Browse files Browse the repository at this point in the history
Signed-off-by: Ric Li <[email protected]>
  • Loading branch information
ricmli authored May 13, 2024
1 parent bc8c04f commit 8806506
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 68 deletions.
3 changes: 3 additions & 0 deletions app/sample/rdma/rdma_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ int main(int argc, char** argv) {

/* print buffer string */
printf("Received buffer %d: %s\n", buffer_consumed, (char*)buffer->addr);
if (buffer->user_meta && buffer->user_meta_size) {
printf("User meta: %d\n", *(int*)buffer->user_meta);
}
usleep(10000); /* simulate consuming */

ret = mtl_rdma_rx_put_buffer(rx, buffer);
Expand Down
9 changes: 8 additions & 1 deletion app/sample/rdma/rdma_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ int main(int argc, char** argv) {
}
int ret = 0;
void* buffers[3] = {};
int meta[3] = {};
int meta_idx = 0;
mtl_rdma_handle mrh = NULL;
mtl_rdma_tx_handle tx = NULL;
struct mtl_rdma_init_params p = {
Expand Down Expand Up @@ -90,8 +92,13 @@ int main(int argc, char** argv) {
continue;
}

snprintf((char*)buffer->addr, buffer->capacity, "Hello, RDMA! %d", i);
meta[meta_idx] = buffer_acked;
snprintf((char*)buffer->addr, buffer->capacity, "Hello, RDMA! id %d acked %d", i,
meta[meta_idx]);
buffer->size = strlen((char*)buffer->addr) + 1;
buffer->user_meta = &meta[meta_idx];
buffer->user_meta_size = sizeof(int);
meta_idx = (meta_idx + 1) % 3;
usleep(20000); /* simulate producing */

ret = mtl_rdma_tx_put_buffer(tx, buffer);
Expand Down
24 changes: 17 additions & 7 deletions rdma/mt_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#define MT_RDMA_MSG_MAGIC (0x494D544C) /* ASCII representation of "IMTL" */

#define MT_RDMA_USER_META_MAX (1024 - sizeof(struct mt_rdma_message))
#define MT_RDMA_MSG_MAX_SIZE (1024)

#define MT_RDMA_USER_META_MAX_SIZE (MT_RDMA_MSG_MAX_SIZE - sizeof(struct mt_rdma_message))

#define MT_SAFE_FREE(obj, free_fn) \
do { \
Expand Down Expand Up @@ -66,6 +68,7 @@ enum mtl_rdma_log_level mt_rdma_get_log_level(void);

enum mt_rdma_message_type {
MT_RDMA_MSG_NONE = 0,
MT_RDMA_MSG_BUFFER_META,
MT_RDMA_MSG_BUFFER_READY,
MT_RDMA_MSG_BUFFER_DONE,
MT_RDMA_MSG_BYE,
Expand All @@ -76,15 +79,20 @@ struct mt_rdma_message {
uint32_t magic;
enum mt_rdma_message_type type;
union {
struct {
uint16_t buf_idx;
uint32_t seq_num;
size_t meta_size;
} buf_meta;
struct {
uint16_t buf_idx;
uint32_t seq_num;
} buf_ready;
struct {
uint16_t buf_idx;
uint32_t seq_num;
uint64_t rx_buf_addr;
uint32_t rx_buf_key;
uint32_t seq_num;
} buf_done;
};
};
Expand All @@ -93,6 +101,7 @@ enum mt_rdma_buffer_status {
MT_RDMA_BUFFER_STATUS_FREE, /* done */
MT_RDMA_BUFFER_STATUS_IN_PRODUCTION,
MT_RDMA_BUFFER_STATUS_IN_TRANSMISSION,
MT_RDMA_BUFFER_STATUS_WAIT_META,
MT_RDMA_BUFFER_STATUS_READY,
MT_RDMA_BUFFER_STATUS_IN_CONSUMPTION,
MT_RDMA_BUFFER_STATUS_MAX,
Expand All @@ -118,12 +127,13 @@ struct mt_rdma_tx_ctx {
struct rdma_cm_id* id;
struct ibv_pd* pd;
struct ibv_qp* qp;
struct ibv_mr* message_mr;
struct ibv_mr* send_msgs_mr;
struct ibv_mr* recv_msgs_mr;
struct rdma_cm_id* listen_id;

uint16_t buffer_producer_idx;
uint32_t buffer_seq_num;
void* message_region; /* 1024 bytes * buf_cnt */
void* send_msgs; /* 1024 bytes * buf_cnt, space reserved for metadata */
struct mt_rdma_message* recv_msgs;
struct mt_rdma_tx_buffer* tx_buffers;
uint16_t buffer_cnt;
pthread_t connect_thread;
Expand Down Expand Up @@ -155,9 +165,9 @@ struct mt_rdma_rx_ctx {
struct rdma_cm_id* id;
struct ibv_pd* pd;
struct ibv_qp* qp;
struct ibv_mr* message_mr;
struct ibv_mr* recv_msgs_mr;

void* message_region; /* 1024 bytes * buf_cnt */
void* recv_msgs; /* 1024 bytes * buf_cnt * 2, space reserved for metadata */
struct mt_rdma_rx_buffer* rx_buffers;
uint16_t buffer_cnt;
pthread_t connect_thread;
Expand Down
164 changes: 124 additions & 40 deletions rdma/mt_rdma_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@

#include "mt_rdma.h"

/*
 * Claim one free slot from the RX receive-message pool.
 *
 * The pool at ctx->recv_msgs is walked as (buffer_cnt * 2) slots of
 * MT_RDMA_MSG_MAX_SIZE bytes each, every slot starting with a
 * struct mt_rdma_message header. A slot counts as free while its header type
 * is MT_RDMA_MSG_NONE. The first free slot found is tagged with
 * MT_RDMA_MSG_MAX so that later scans skip it until its type is set back to
 * MT_RDMA_MSG_NONE.
 *
 * Returns the claimed slot, or NULL (after logging an error) when every slot
 * is already in use. Callers must check for NULL before posting the slot.
 */
static struct mt_rdma_message* rdma_rx_get_recv_msg(struct mt_rdma_rx_ctx* ctx) {
  /* recv_msgs is a void*; step through it as bytes, since arithmetic on void*
   * is a compiler extension rather than standard C */
  char* pool = (char*)ctx->recv_msgs;
  int slot_cnt = ctx->buffer_cnt * 2;

  for (int i = 0; i < slot_cnt; i++) {
    struct mt_rdma_message* msg =
        (struct mt_rdma_message*)(pool + i * MT_RDMA_MSG_MAX_SIZE);
    if (msg->type == MT_RDMA_MSG_NONE) {
      msg->type = MT_RDMA_MSG_MAX; /* mark the slot as taken */
      return msg;
    }
  }

  err("%s(%s), no free recv msg\n", __func__, ctx->ops_name);
  return NULL;
}

static int rdma_rx_send_buffer_done(struct mt_rdma_rx_ctx* ctx, uint16_t idx) {
struct mt_rdma_rx_buffer* rx_buffer = &ctx->rx_buffers[idx];
struct mt_rdma_message msg = {
Expand All @@ -22,19 +35,12 @@ static int rdma_rx_send_buffer_done(struct mt_rdma_rx_ctx* ctx, uint16_t idx) {
err("%s(%s), rdma_post_send failed: %s\n", __func__, ctx->ops_name, strerror(errno));
return -EIO;
}
/* post recv for next ready msg */
void* r_msg = ctx->message_region + idx * 1024;
ret = rdma_post_recv(ctx->id, r_msg, r_msg, 1024, ctx->message_mr);
if (ret) {
err("%s(%s), rdma_post_recv failed: %s\n", __func__, ctx->ops_name, strerror(errno));
return -EIO;
}
rx_buffer->status = MT_RDMA_BUFFER_STATUS_FREE;
return 0;
}

static int rdma_rx_uinit_mrs(struct mt_rdma_rx_ctx* ctx) {
MT_SAFE_FREE(ctx->message_mr, ibv_dereg_mr);
MT_SAFE_FREE(ctx->recv_msgs_mr, ibv_dereg_mr);
for (int i = 0; i < ctx->buffer_cnt; i++) {
struct mt_rdma_rx_buffer* rx_buffer = &ctx->rx_buffers[i];
MT_SAFE_FREE(rx_buffer->mr, ibv_dereg_mr);
Expand All @@ -56,21 +62,22 @@ static int rdma_rx_init_mrs(struct mt_rdma_rx_ctx* ctx) {
rx_buffer->mr = mr;
}

struct ibv_mr* mr = ibv_reg_mr(ctx->pd, ctx->message_region, ctx->buffer_cnt * 1024,
IBV_ACCESS_LOCAL_WRITE);
struct ibv_mr* mr =
ibv_reg_mr(ctx->pd, ctx->recv_msgs, ctx->buffer_cnt * 2 * MT_RDMA_MSG_MAX_SIZE,
IBV_ACCESS_LOCAL_WRITE);
if (!mr) {
err("%s(%s), ibv_reg_mr message failed\n", __func__, ctx->ops_name);
rdma_rx_uinit_mrs(ctx);
return -ENOMEM;
}
ctx->message_mr = mr;
ctx->recv_msgs_mr = mr;

return 0;
}

static int rdma_rx_free_buffers(struct mt_rdma_rx_ctx* ctx) {
rdma_rx_uinit_mrs(ctx);
MT_SAFE_FREE(ctx->message_region, free);
MT_SAFE_FREE(ctx->recv_msgs, free);
MT_SAFE_FREE(ctx->rx_buffers, free);
return 0;
}
Expand All @@ -93,9 +100,9 @@ static int rdma_rx_alloc_buffers(struct mt_rdma_rx_ctx* ctx) {
rx_buffer->buffer.capacity = ops->buffer_capacity;
}

/* alloc message region */
ctx->message_region = (char*)calloc(ctx->buffer_cnt, 1024);
if (!ctx->message_region) {
/* alloc receive message region including metadata, send messages are inlined */
ctx->recv_msgs = (char*)calloc(ctx->buffer_cnt * 2, MT_RDMA_MSG_MAX_SIZE);
if (!ctx->recv_msgs) {
err("%s(%s), message calloc failed\n", __func__, ctx->ops_name);
rdma_rx_free_buffers(ctx);
return -ENOMEM;
Expand Down Expand Up @@ -159,29 +166,85 @@ static void* rdma_rx_cq_poll_thread(void* arg) {
if (wc.opcode == IBV_WC_RECV) {
struct mt_rdma_message* msg = (struct mt_rdma_message*)wc.wr_id;
if (msg->magic == MT_RDMA_MSG_MAGIC) {
if (msg->type == MT_RDMA_MSG_BUFFER_READY) {
uint16_t idx = msg->buf_ready.buf_idx;
struct mt_rdma_rx_buffer* rx_buffer = &ctx->rx_buffers[idx];
rx_buffer->status = MT_RDMA_BUFFER_STATUS_READY;
ctx->stat_buffer_received++;
/* what about other meta? */
if (ops->notify_buffer_ready) {
ret = ops->notify_buffer_ready(ops->priv, &rx_buffer->buffer);
if (ret) {
err("%s(%s), notify_buffer_ready failed\n", __func__, ctx->ops_name);
/* todo: error handle */
uint16_t idx;
struct mt_rdma_rx_buffer* rx_buffer;
switch (msg->type) {
case MT_RDMA_MSG_BUFFER_META:
idx = msg->buf_meta.buf_idx;
dbg("%s(%s), buffer %u meta received\n", __func__, ctx->ops_name, idx);
rx_buffer = &ctx->rx_buffers[idx];
rx_buffer->buffer.user_meta =
(void*)(msg + 1); /* this msg buffer in use by meta */
rx_buffer->buffer.user_meta_size = msg->buf_meta.meta_size;
if (rx_buffer->status == MT_RDMA_BUFFER_STATUS_WAIT_META) {
rx_buffer->status = MT_RDMA_BUFFER_STATUS_READY;
ctx->stat_buffer_received++;
if (ops->notify_buffer_ready) {
ret = ops->notify_buffer_ready(ops->priv, &rx_buffer->buffer);
if (ret) {
err("%s(%s), notify_buffer_ready failed\n", __func__,
ctx->ops_name);
/* todo: error handle */
}
}
} else if (rx_buffer->status == MT_RDMA_BUFFER_STATUS_FREE) {
rx_buffer->status = MT_RDMA_BUFFER_STATUS_IN_TRANSMISSION;
} else {
err("%s(%s), buffer %u unexpected status %d\n", __func__, ctx->ops_name,
idx, rx_buffer->status);
goto out;
}
}
} else if (msg->type == MT_RDMA_MSG_BYE) {
info("%s(%s), received bye message\n", __func__, ctx->ops_name);
/* todo: handle tx bye, notice that cq poll thread may stop before receiving
* bye message */
break;
case MT_RDMA_MSG_BUFFER_READY:
idx = msg->buf_ready.buf_idx;
dbg("%s(%s), buffer %u ready received\n", __func__, ctx->ops_name, idx);
rx_buffer = &ctx->rx_buffers[idx];
if (rx_buffer->status == MT_RDMA_BUFFER_STATUS_IN_TRANSMISSION) {
rx_buffer->status = MT_RDMA_BUFFER_STATUS_READY;
ctx->stat_buffer_received++;
if (ops->notify_buffer_ready) {
ret = ops->notify_buffer_ready(ops->priv, &rx_buffer->buffer);
if (ret) {
err("%s(%s), notify_buffer_ready failed\n", __func__,
ctx->ops_name);
/* todo: error handle */
}
}
} else if (rx_buffer->status == MT_RDMA_BUFFER_STATUS_FREE) {
rx_buffer->status = MT_RDMA_BUFFER_STATUS_WAIT_META;
} else {
err("%s(%s), buffer %u unexpected status %d\n", __func__, ctx->ops_name,
idx, rx_buffer->status);
goto out;
}
msg->type = MT_RDMA_MSG_NONE; /* recycle receive msg */
break;
case MT_RDMA_MSG_BYE:
info("%s(%s), received bye message\n", __func__, ctx->ops_name);
/* todo: handle tx bye, notice that cq poll thread may stop before
* receiving bye message */
goto out;
default:
err("%s(%s), unknown message type: %d\n", __func__, ctx->ops_name,
msg->type);
goto out;
}
} else {
err("%s(%s), received invalid message\n", __func__, ctx->ops_name);
goto out;
}
void* r_msg = rdma_rx_get_recv_msg(ctx);
ret = rdma_post_recv(ctx->id, r_msg, r_msg, MT_RDMA_MSG_MAX_SIZE,
ctx->recv_msgs_mr);
if (ret) {
err("%s(%s), rdma_post_recv failed: %s\n", __func__, ctx->ops_name,
strerror(errno));
goto out;
}
} else if (wc.opcode == IBV_WC_SEND) {
if (wc.wr_id == MT_RDMA_MSG_BYE) {
info("%s(%s), sent bye message, shutdown cq thread\n", __func__,
ctx->ops_name);
dbg("%s(%s), sent bye message, shutdown cq thread\n", __func__,
ctx->ops_name);
goto out;
}
}
Expand Down Expand Up @@ -247,11 +310,9 @@ static void* rdma_rx_connect_thread(void* arg) {
struct ibv_qp_init_attr init_qp_attr = {
.cap.max_send_wr = ctx->buffer_cnt * 2,
.cap.max_recv_wr = ctx->buffer_cnt * 2,
.cap.max_send_sge = 2, /* gather message and meta */
.cap.max_recv_sge = 2, /* scatter message and meta */
.cap.max_inline_data =
64, /* todo: include metadata size, if that size is larger than 64, we
should consider not using inline for msg */
.cap.max_send_sge = 1,
.cap.max_recv_sge = 1,
.cap.max_inline_data = sizeof(struct mt_rdma_message),
.sq_sig_all = 1,
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
Expand Down Expand Up @@ -280,8 +341,22 @@ static void* rdma_rx_connect_thread(void* arg) {
}
break;
case RDMA_CM_EVENT_ESTABLISHED:
for (uint16_t i = 0; i < ctx->buffer_cnt; i++) /* start receiving */
rdma_rx_send_buffer_done(ctx, i);
for (uint16_t i = 0; i < ctx->buffer_cnt; i++) { /* start receiving */
/* post recv for meta/ready msg */
void* msg = rdma_rx_get_recv_msg(ctx);
ret = rdma_post_recv(ctx->id, msg, msg, MT_RDMA_MSG_MAX_SIZE,
ctx->recv_msgs_mr);
if (ret) {
err("%s(%s), rdma_post_recv failed: %s\n", __func__, ctx->ops_name,
strerror(errno));
goto connect_err;
}
ret = rdma_rx_send_buffer_done(ctx, i);
if (ret) {
err("%s(%s), rdma_rx_send_buffer_done failed\n", __func__, ctx->ops_name);
goto connect_err;
}
}
ctx->connected = true;

ctx->cq_poll_stop = false;
Expand Down Expand Up @@ -343,10 +418,19 @@ int mtl_rdma_rx_put_buffer(mtl_rdma_rx_handle handle, struct mtl_rdma_buffer* bu
for (int i = 0; i < ctx->buffer_cnt; i++) {
struct mt_rdma_rx_buffer* rx_buffer = &ctx->rx_buffers[i];
if (&rx_buffer->buffer == buffer) {
if (rx_buffer->status != MT_RDMA_BUFFER_STATUS_IN_CONSUMPTION) {
err("%s(%s), buffer %p not in consumption\n", __func__, ctx->ops_name, buffer);
return -EIO;
}
/* recycle meta in use receive msg */
struct mt_rdma_message* meta_msg =
(struct mt_rdma_message*)rx_buffer->buffer.user_meta - 1;
meta_msg->type = MT_RDMA_MSG_NONE;
return rdma_rx_send_buffer_done(ctx, rx_buffer->idx);
}
}

err("%s(%s), buffer %p not found\n", __func__, ctx->ops_name, buffer);
return -EIO;
}

Expand Down
Loading

0 comments on commit 8806506

Please sign in to comment.