Skip to content

Commit

Permalink
Adding Com Channel fast data path capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
EldarShalev committed Feb 7, 2024
1 parent 2ccd99a commit ea90798
Show file tree
Hide file tree
Showing 7 changed files with 692 additions and 67 deletions.
37 changes: 35 additions & 2 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -888,11 +888,44 @@ int Client<IoType, SwitchCycleDuration, PongModeCare>::initBeforeLoop() {
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel) {
// Waiting for connection
struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)data->doca_cc_ctx;
while (ctx_client->state != CC_CONNECTED) {
doca_error_t result;
while (data->doca_cc_ctx->state != CC_CONNECTED) {
doca_pe_progress(s_user_params.pe);
}
log_dbg("[fd=%d] Client connected successfully", ifd);
if (s_user_params.doca_cc_fifo) {
struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem);
struct cc_local_mem_bufs *local_consumer_mem = &(data->doca_cc_ctx->ctx_fifo.consumer_mem);
// Buf is needed for registering with memrange
local_producer_mem->mem = m_pMsgRequest->getBuf();
result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
if (result != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
return result;
}
log_dbg("[fd=%d] Init producer memory succeeded", ifd);
while (data->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
doca_pe_progress(s_user_params.pe);
}
if (!g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) {
struct doca_ctx *ctx;
enum doca_ctx_states state;
ctx = doca_cc_consumer_as_ctx(data->doca_cc_ctx->ctx_fifo.consumer);
doca_ctx_get_state(ctx, &state);
while (state != DOCA_CTX_STATE_RUNNING) {
doca_pe_progress(s_user_params.pe_underload);
doca_ctx_get_state(ctx, &state);
}
}
result = cc_init_doca_consumer_task(local_consumer_mem, &data->doca_cc_ctx->ctx_fifo);
if (result != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to init doca consumer task with error = %s", doca_error_get_name(result));
}
result = cc_init_doca_producer_task(local_producer_mem, &data->doca_cc_ctx->ctx_fifo);
if (result != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to init doca producer task with error = %s", doca_error_get_name(result));
}
}
}
// Avoid Client binding in Com Channel mode
if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
Expand Down
90 changes: 58 additions & 32 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,62 +114,88 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel) {
doca_error_t doca_error;
doca_error_t doca_error = DOCA_SUCCESS;
int result;
struct doca_cc_producer_send_task *producer_task;
struct doca_cc_send_task *task;
struct doca_buf *doca_buf;
struct doca_task *task_obj;
struct timespec ts = {
.tv_sec = 0,
.tv_nsec = NANOS_10_X_1000,
};
struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
do {
if (s_user_params.mode == MODE_SERVER) {
struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
nbytes, &task);
} else { // MODE_CLIENT
struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
nbytes, &task);
}
if (doca_error == DOCA_ERROR_NO_MEMORY) {
// Queue is full of tasks, need to free tasks with completion callback
doca_pe_progress(s_user_params.pe);
}
} while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);

if (doca_error != DOCA_SUCCESS) {
if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
errno = EAGAIN;
ret = -1;
} else {
log_err("Doca task_alloc_init failed");
ret = RET_SOCKET_SHUTDOWN;
if (s_user_params.doca_cc_fifo) {
doca_error = doca_buf_set_data(ctx->ctx_fifo.doca_buf_producer, ctx->ctx_fifo.producer_mem.mem, nbytes);
if (doca_error != DOCA_SUCCESS) {
log_err("failed setting doca data data with error = %s", doca_error_get_name(doca_error));
}
} else { // task_alloc_init succeeded
task_obj = doca_cc_send_task_as_task(task);
task_obj = doca_cc_producer_send_task_as_task(ctx->ctx_fifo.producer_task);
do {
doca_error = doca_task_submit(task_obj);
// need to submit until no AGAIN
if (doca_error == DOCA_ERROR_AGAIN) {
nanosleep(&ts, &ts);
}
} while (doca_error == DOCA_ERROR_AGAIN);
} else { // Not doca fast path
do {
if (s_user_params.mode == MODE_SERVER) {
struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
nbytes, &task);
} else { // MODE_CLIENT
struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
nbytes, &task);
}
if (doca_error == DOCA_ERROR_NO_MEMORY) {
// Queue is full of tasks, need to free tasks with completion callback
doca_pe_progress(s_user_params.pe);
}
} while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);

} while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
if (doca_error != DOCA_SUCCESS) {
if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked
if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
errno = EAGAIN;
ret = -1;
} else {
log_err("Doca doca_task_submit failed");
log_err("Doca task_alloc_init failed");
ret = RET_SOCKET_SHUTDOWN;
}
} else {
ret = nbytes;
task_obj = doca_cc_send_task_as_task(task);
do {
doca_error = doca_task_submit(task_obj);
if (doca_error == DOCA_ERROR_AGAIN) {
// Queue is full of tasks, need to free tasks with completion callback
doca_pe_progress(s_user_params.pe);
}
} while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);
}
}

if (doca_error != DOCA_SUCCESS) {
if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked
errno = EAGAIN;
ret = -1;
doca_task_free(task_obj);
} else {
log_err("Doca doca_task_submit failed");
ret = RET_SOCKET_SHUTDOWN;
}
} else {
ret = nbytes;
}

// Additional call for better performance- release pressure on send queue
doca_pe_progress(s_user_params.pe);
if (!s_user_params.doca_cc_fifo || doca_error == DOCA_ERROR_NO_MEMORY) {
doca_pe_progress(s_user_params.pe);
} else { // fast path and task submitted successfully
do {
result = doca_pe_progress(s_user_params.pe);
nanosleep(&ts, &ts);
} while (result == 0);
}
} else
#endif /* USING_DOCA_COMM_CHANNEL_API */
{
Expand Down
5 changes: 4 additions & 1 deletion src/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ enum {
#if defined(USING_DOCA_COMM_CHANNEL_API)
OPT_DOCA,
OPT_PCI,
OPT_PCI_REP
OPT_PCI_REP,
OPT_DOCA_FAST_PATH
#endif /* USING_DOCA_COMM_CHANNEL_API */

};
Expand Down Expand Up @@ -820,9 +821,11 @@ struct user_params_t {
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
bool doca_comm_channel = false; /* Flag to indicate using Com Channel*/
bool doca_cc_fifo = false; /* Flag to indicate using fast path*/
char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */
char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
struct doca_pe *pe = nullptr; /* Progress engine for doca, one per thread*/
struct doca_pe *pe_underload = nullptr; /* Progress engine for doca, one per thread, underload mode */
#endif /* USING_DOCA_COMM_CHANNEL_API */

user_params_t() {
Expand Down
Loading

0 comments on commit ea90798

Please sign in to comment.