diff --git a/configure.ac b/configure.ac index d8a87111..fa407af3 100644 --- a/configure.ac +++ b/configure.ac @@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM TARGETDIR="unknown" case "$host" in - + i?86-*-*) TARGET=X86; TARGETDIR=x86;; ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;; powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;; @@ -26,14 +26,14 @@ case "$host" in aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;; s390*-*-*) TARGET=S390; TARGETDIR=s390;; esac - + AC_SUBST(AM_RUNTESTFLAGS) AC_SUBST(AM_LTLDFLAGS) - + if test $TARGETDIR = unknown; then AC_MSG_ERROR(["it has not been ported to $host."]) fi - + AM_CONDITIONAL(X86, test x$TARGET = xX86) AM_CONDITIONAL(IA64, test x$TARGET = xIA64) AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC) @@ -100,6 +100,34 @@ AC_MSG_CHECKING( [for vma extra api]) AC_MSG_RESULT([${have_vma_api}]) +########################################################################## +# check DOCA communication channel API +# +AC_ARG_ENABLE( + [doca-communication-channel-api], + AC_HELP_STRING([--enable-doca-communication-channel-api], + [SOCKPERF: enable DOCA communication channel extra api support (default=no)]), + [have_doca_comm_channel_api=$enableval], + [have_doca_comm_channel_api=no]) +AS_IF([test "x${have_doca_comm_channel_api}" == "xyes"], + if test "$have_doca_comm_channel_api" = "yes" + then + have_doca_comm_channel_api=/opt/mellanox/doca + fi + + CPPFLAGS="$CPPFLAGS -I$have_doca_comm_channel_api/include -I$have_doca_comm_channel_api/lib -I$have_doca_comm_channel_api/samples" + LIBS="$LIBS -ldoca_comm_channel -ldoca_common" + + [AC_CHECK_HEADERS([$have_doca_comm_channel_api/include/doca_comm_channel.h $have_doca_comm_channel_api/include/doca_dev.h $have_doca_comm_channel_api/samples/common.h], + [AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[[Enable using DOCA communication channel extra API]]) + ], + [AC_MSG_ERROR([doca_comm_channel.h file not found])] + [have_doca_comm_channel_api=no])]) +AC_MSG_CHECKING( + [for doca communication channel extra api]) +AC_MSG_RESULT([${have_doca_comm_channel_api}]) + + ########################################################################## # check XLIO extra API # @@ -141,7 +169,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes") ########################## -# Enable tests +# Enable tests # SP_ARG_ENABLE_BOOL( [test], @@ -151,7 +179,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes") ########################## -# Enable tools +# Enable tools # SP_ARG_ENABLE_BOOL( [tool], @@ -233,6 +261,7 @@ AC_MSG_RESULT([ test: ${have_test} tool: ${have_tool} vma_api: ${have_vma_api} + doca_api: ${have_doca_comm_channel_api} xlio_api: ${have_xlio_api} debug: ${have_debug} ]) diff --git a/src/client.cpp b/src/client.cpp index ed9bc181..78985a38 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -878,7 +878,11 @@ int Clientactive_fd_list))) continue; const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info; - if (p_client_bind_addr->ss_family != AF_UNSPEC) { +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (p_client_bind_addr->ss_family != AF_UNSPEC && !s_user_params.doca_comm_channel) { +#else + if (p_client_bind_addr->ss_family != AF_UNSPEC) { +#endif //USING_DOCA_COMM_CHANNEL_API socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len; std::string hostport = sockaddr_to_hostport(p_client_bind_addr); #ifdef __linux__ diff --git a/src/common.cpp b/src/common.cpp index fbb12409..599604bd 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr) if (addr->sa_family == AF_INET6) { return "[" + std::string(hbuf) + "]:" + std::string(pbuf); } else if (addr->sa_family == AF_UNIX) { - return std::string(addr->sa_data); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + return std::string(pbuf) + " [DOCA]"; +#endif + return std::string(pbuf) + " [UNIX]"; } else { return std::string(hbuf) + ":" + std::string(pbuf); } diff --git a/src/common.h b/src/common.h index 1d00e85d..fcfe9187 100644 --- a/src/common.h +++ b/src/common.h @@ -28,6 +28,7 @@ #ifndef COMMON_H_ #define COMMON_H_ +#define POLL_TIMEOUT_MS -1 #include @@ -112,6 +113,43 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes); } else #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[fd]->ep) { + doca_error_t doca_error; + + if (s_user_params.is_blocked) { // the condifiton remains outside due to perforamnce + doca_error_t arm_ret; + int doca_cc = g_fds_array[fd]->com_channel_fd_send; + struct pollfd fds = { .fd = doca_cc, .events = POLLIN | POLLOUT, }; + + if (strlen(s_user_params.feedfile_name)) { // Feedfile + if ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE, + g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) { + arm_ret = doca_comm_channel_ep_event_handle_arm_send(g_fds_array[fd]->ep); + if (arm_ret != DOCA_SUCCESS) { + log_err("Error: arming event handle failed in %s", __func__); + return arm_ret; + } + } + } else { // Not Feefile + while ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE, + g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) { + arm_ret = doca_comm_channel_ep_event_handle_arm_send(g_fds_array[fd]->ep); + if (arm_ret != DOCA_SUCCESS) { + printf("Error: arming event handle failed in %s", __func__); + return arm_ret; + } + poll(&fds, 1, POLL_TIMEOUT_MS); + } + } + } else { + while ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE, + g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) { + } + } + ret = nbytes; + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ { ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen); } diff --git a/src/defs.h b/src/defs.h index 0ec450db..107f01fa 100644 --- a/src/defs.h +++ b/src/defs.h @@ -28,7 +28,6 @@ #ifndef DEFS_H_ #define DEFS_H_ - #define __STDC_FORMAT_MACROS #ifdef __windows__ @@ -142,6 +141,15 @@ typedef unsigned short int sa_family_t; #endif // USING_XLIO_EXTRA_API #endif // !WIN32 && !__FreeBSD__ +#ifdef USING_DOCA_COMM_CHANNEL_API +#include "doca_comm_channel.h" +#include "doca_dev.h" +//#include +#define MSG_SIZE 2048 +#define PCI_ADDR_LEN 8 +#define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */ +#endif /* USING_DOCA_COMM_CHANNEL_API */ + #define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE) extern int MAX_PAYLOAD_SIZE; #define MAX_STREAM_SIZE (50 * 1024 * 1024) @@ -277,8 +285,14 @@ enum { OPT_HISTOGRAM, // 46 OPT_LOAD_XLIO, // 47 #if defined(DEFINED_TLS) - OPT_TLS + OPT_TLS, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + OPT_DOCA, + OPT_PCI, + OPT_PCI_REP +#endif /* USING_DOCA_COMM_CHANNEL_API */ + }; static const char *const round_trip_str[] = { "latency", "rtt" }; @@ -561,6 +575,12 @@ struct fds_data { #if defined(DEFINED_TLS) void *tls_handle = nullptr; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + struct doca_comm_channel_ep_t *ep = nullptr; + struct doca_comm_channel_addr_t *peer_addr = nullptr; + int com_channel_fd_recv = 0; /* Com Channel recv Doca FD*/ + int com_channel_fd_send = 0; /* Com Channel send Doca FD*/ +#endif /* USING_DOCA_COMM_CHANNEL_API */ fds_data() { @@ -769,6 +789,11 @@ struct user_params_t { #if defined(DEFINED_TLS) bool tls = false; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + bool doca_comm_channel = false; + char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */ + char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */ +#endif /* USING_DOCA_COMM_CHANNEL_API */ user_params_t() { memset(&client_bind_info, 0, sizeof(client_bind_info)); diff --git a/src/input_handlers.h b/src/input_handlers.h index c1f9b3cd..b660f04c 100644 --- a/src/input_handlers.h +++ b/src/input_handlers.h @@ -28,6 +28,7 @@ #ifndef INPUT_HANDLERS_H_ #define INPUT_HANDLERS_H_ +#define POLL_TIMEOUT -1 #include "message_parser.h" @@ -71,6 +72,50 @@ class RecvFromInputHandler : public MessageParser { ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size); } else #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[fd]->ep) { + size_t msg_len = m_recv_data.cur_size; + doca_error_t doca_error; + if (s_user_params.is_blocked) { + doca_error_t arm_ret; + int doca_cc = g_fds_array[fd]->com_channel_fd_recv; + struct pollfd fds = { .fd = doca_cc, .events = POLLIN | POLLOUT, }; + + if (strlen(s_user_params.feedfile_name)) { // Feedfile + doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE, + &(g_fds_array[fd]->peer_addr)); + if (doca_error == DOCA_ERROR_AGAIN) { + arm_ret = doca_comm_channel_ep_event_handle_arm_recv(g_fds_array[fd]->ep); + if (arm_ret != DOCA_SUCCESS) { + log_err("Error: arming event handle failed in %s", __func__); + return arm_ret; + } + msg_len = 0; + } + } else { // Not Feedfile + while ((doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE, + &(g_fds_array[fd]->peer_addr))) == DOCA_ERROR_AGAIN) { + arm_ret = doca_comm_channel_ep_event_handle_arm_recv(g_fds_array[fd]->ep); + if (arm_ret != DOCA_SUCCESS) { + log_err("Error: arming event handle failed in %s", __func__); + return arm_ret; + } + poll(&fds, 1, POLL_TIMEOUT); + } + } + } else { + while ((doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE, + &(g_fds_array[fd]->peer_addr))) == DOCA_ERROR_AGAIN) { + msg_len = MSG_SIZE; + } + } + + m_actual_buf = buf; + m_actual_buf_size = msg_len; + return msg_len; + + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ { ret = recvfrom(fd, buf, m_recv_data.cur_size, flags, (struct sockaddr *)recvfrom_addr, &size); diff --git a/src/iohandlers.cpp b/src/iohandlers.cpp index 0b496543..89836d67 100644 --- a/src/iohandlers.cpp +++ b/src/iohandlers.cpp @@ -38,6 +38,11 @@ static void print_addresses(const fds_data *data, int &list_count) NI_NUMERICHOST | NI_NUMERICSERV); switch (data->server_addr.ss_family) { case AF_UNIX: +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf); + else +#endif printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path, PRINT_PROTOCOL(data->sock_type)); break; default: diff --git a/src/server.cpp b/src/server.cpp index bdadc86a..4dd5a2ca 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -95,26 +95,30 @@ int ServerBase::initBeforeLoop() { p_bind_addr = &bind_addr; } - std::string hostport = sockaddr_to_hostport(p_bind_addr); - log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); - if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { - log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, - hostport.c_str()); - rc = SOCKPERF_ERR_SOCKET; - break; - } - /* - * since when using VMA there is no qp until the bind, and vma cannot - * check that rate-limit is supported this is done here and not - * with the rest of the setsockopt - */ - if (s_user_params.rate_limit > 0 && - sock_set_rate_limit(ifd, s_user_params.rate_limit)) { - log_err("[fd=%d] failed setting rate limit, %s\n", ifd, - hostport.c_str()); - rc = SOCKPERF_ERR_SOCKET; - break; - } + std::string hostport = sockaddr_to_hostport(p_bind_addr); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#else + log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); + if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#endif //USING_DOCA_COMM_CHANNEL_API + log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, + hostport.c_str()); + rc = SOCKPERF_ERR_SOCKET; + break; + } + /* + * since when using VMA there is no qp until the bind, and vma cannot + * check that rate-limit is supported this is done here and not + * with the rest of the setsockopt + */ + if (s_user_params.rate_limit > 0 && + sock_set_rate_limit(ifd, s_user_params.rate_limit)) { + log_err("[fd=%d] failed setting rate limit, %s\n", ifd, + hostport.c_str()); + rc = SOCKPERF_ERR_SOCKET; + break; + } if ((g_fds_array[ifd]->sock_type == SOCK_STREAM) && (listen(ifd, 10) < 0)) { log_err("Failed listen() for connection\n"); diff --git a/src/sockperf.cpp b/src/sockperf.cpp index 293d2174..e5e3a2b7 100644 --- a/src/sockperf.cpp +++ b/src/sockperf.cpp @@ -139,6 +139,7 @@ static int proc_mode_ping_pong(int, int, const char **); static int proc_mode_throughput(int, int, const char **); static int proc_mode_playback(int, int, const char **); static int proc_mode_server(int, int, const char **); +int bringup_for_doca(std::unique_ptr &tmp); static const struct app_modes { int (*func)(int, int, const char **); /* proc function */ @@ -296,6 +297,14 @@ static const AOPT_DESC common_opt_desc[] = { { OPT_TLS, AOPT_OPTARG, aopt_set_literal(0), aopt_set_string("tls"), "Use TLSv1.2 (default " TLS_CHIPER_DEFAULT ")." }, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0), + aopt_set_string("doca-comm-channel"), "Use Doca communication channel" }, + { OPT_PCI, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"}, + { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-representor"), "Comm Channel DOCA device representor PCI address"}, +#endif /* USING_DOCA_COMM_CHANNEL_API */ { 'd', AOPT_NOARG, aopt_set_literal('d'), aopt_set_string("debug"), "Print extra debug information." }, { 0, AOPT_NOARG, aopt_set_literal(0), aopt_set_string(NULL), NULL } @@ -2203,6 +2212,33 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) { } } #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!rc && aopt_check(common_obj, OPT_DOCA)) { + if (aopt_check(common_obj, 'tls')) { + log_msg("--doca-comm-channel conflicts with --tls option"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } + if (!aopt_check(common_obj, OPT_PCI)) { + log_msg("doca-comm-channel must have pci address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI); + if (optarg) + strcpy(s_user_params.cc_dev_pci_addr, optarg); + } + if (s_user_params.mode == MODE_SERVER) { + if (!aopt_check(common_obj, OPT_PCI_REP)) { + log_msg("doca-comm-channel server must have pci representor address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI_REP); + if (optarg) + strcpy(s_user_params.cc_dev_rep_pci_addr, optarg); + } + } + s_user_params.doca_comm_channel = true; + } +#endif /* USING_DOCA_COMM_CHANNEL_API */ } // resolve address: -i, -p and --tcp options must be processed before @@ -2371,6 +2407,19 @@ void cleanup() { FREE(g_fds_array[ifd]->memberships_addr); } if (s_user_params.addr.ss_family == AF_UNIX) { +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[ifd]->ep) { + doca_error_t res = DOCA_SUCCESS; + res = doca_comm_channel_ep_disconnect(g_fds_array[ifd]->ep, g_fds_array[ifd]->peer_addr); + if (res != DOCA_SUCCESS) + log_dbg("Failed to disconnect doca communication channel"); + res = doca_comm_channel_ep_destroy(g_fds_array[ifd]->ep); + if (res != DOCA_SUCCESS) + log_dbg("Failed to destroy doca communication Channel endpoint"); + g_fds_array[ifd]->ep = NULL; + g_fds_array[ifd]->peer_addr = NULL; + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ os_unlink_unix_path(s_user_params.client_bind_info.addr_un.sun_path); #ifndef __windows__ // AF_UNIX with DGRAM isn't supported in __windows__ if (s_user_params.mode == MODE_CLIENT && s_user_params.sock_type == SOCK_DGRAM) { // unlink binded client @@ -2380,7 +2429,7 @@ void cleanup() { } #endif // __windows__ if (s_user_params.mode == MODE_SERVER) - os_unlink_unix_path(g_fds_array[ifd]->server_addr.addr_un.sun_path); + unlink(g_fds_array[ifd]->server_addr.addr_un.sun_path); } delete g_fds_array[ifd]; } @@ -2390,15 +2439,15 @@ void cleanup() { if (s_user_params.select_timeout) { FREE(s_user_params.select_timeout); } -#if defined(USING_VMA_EXTRA_API) || defined(USING_XLIO_EXTRA_API) - if ((g_vma_api || g_xlio_api) && s_user_params.is_zcopyread) { +#ifdef USING_VMA_EXTRA_API + if (g_vma_api && s_user_params.is_zcopyread) { zeroCopyMap::iterator it; while ((it = g_zeroCopyData.begin()) != g_zeroCopyData.end()) { delete it->second; g_zeroCopyData.erase(it); } } -#endif // USING_VMA_EXTRA_API || USING_XLIO_EXTRA_API +#endif // USING_VMA_EXTRA_API if (g_fds_array) { FREE(g_fds_array); @@ -3216,7 +3265,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { #endif std::unique_ptr tmp{ new fds_data }; - + bool skip_socket = false; int res = resolve_sockaddr(addr.c_str(), port.c_str(), sock_type, false, reinterpret_cast(&tmp->server_addr), tmp->server_addr_len); if (res != 0) { @@ -3292,23 +3341,70 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { g_fds_array[curr_fd]->memberships_size++; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.ss_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - log_err("socket(AF_INET4/6, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all + // over the way and avoid + // this cast } - fd_socket_map[port_desc_tmp] = curr_fd; - if (tmp->is_multicast) { - tmp->memberships_addr = reinterpret_cast(MALLOC( - IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + + // TODO: In the following malloc we have a one time memory allocation of + // 128KB that are not reclaimed + // This O(1) leak was introduced in revision 133 + tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - tmp->memberships_addr = NULL; + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; } - tmp->memberships_size = 0; - s_fd_num++; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + log_dbg("starting feedfile for doca"); + int curr_fd = bringup_for_doca(tmp); + s_fd_num++; + skip_socket = true; + fd_socket_map[port_desc_tmp] = curr_fd; + if (new_socket_flag) { + if (s_fd_num == 1) { /*it is the first fd*/ + s_fd_min = curr_fd; + s_fd_max = curr_fd; + } else { + g_fds_array[last_fd]->next_fd = curr_fd; + s_fd_min = _min(s_fd_min, curr_fd); + s_fd_max = _max(s_fd_max, curr_fd); + } + last_fd = curr_fd; + g_fds_array[curr_fd] = tmp.release(); + } + } +#endif //USING_DOCA_COMM_CHANNEL_API + + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.ss_family, tmp->sock_type, 0)) < + 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6, SOCK_x)"); + rc = SOCKPERF_ERR_SOCKET; + } + fd_socket_map[port_desc_tmp] = curr_fd; + log_msg("FD is %d", curr_fd); + if (tmp->is_multicast) { + tmp->memberships_addr = reinterpret_cast(MALLOC( + IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + } else { + tmp->memberships_addr = NULL; + } + tmp->memberships_size = 0; + + s_fd_num++; + } } - if (curr_fd >= 0) { + if (!skip_socket && curr_fd >= 0) { if ((curr_fd >= MAX_FDS_NUM) || (prepare_socket(curr_fd, tmp.get()) == (int) INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid this cast @@ -3316,26 +3412,10 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { close(curr_fd); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all - // over the way and avoid - // this cast - } - // TODO: In the following malloc we have a one time memory allocation of - // 128KB that are not reclaimed - // This O(1) leak was introduced in revision 133 - tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); if (!tmp->recv.buf) { log_err("Failed to allocate memory with malloc()"); rc = SOCKPERF_ERR_NO_MEMORY; } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - if (new_socket_flag) { if (s_fd_num == 1) { /*it is the first fd*/ s_fd_min = curr_fd; @@ -3450,6 +3530,266 @@ static bool fds_array_is_valid() { return ((fd == s_fd_max) && ((i + 1) == s_fd_num) && (g_fds_array[fd]->next_fd == s_fd_min)); } +#if defined(USING_DOCA_COMM_CHANNEL_API) +typedef doca_error_t (*jobs_check)(struct doca_devinfo *); +doca_error_t +open_doca_device_with_pci(const struct doca_pci_bdf *value, jobs_check func, struct doca_dev **retval) +{ + struct doca_devinfo **dev_list; + uint32_t nb_devs; + struct doca_pci_bdf buf = {}; + doca_error_t res; + size_t i; + + /* Set default return value */ + *retval = NULL; + + res = doca_devinfo_list_create(&dev_list, &nb_devs); + if (res != DOCA_SUCCESS) { + log_err("Failed to load doca devices list. Doca_error value: %d", res); + return res; + } + + /* Search */ + for (i = 0; i < nb_devs; i++) { + res = doca_devinfo_get_pci_addr(dev_list[i], &buf); + if (res == DOCA_SUCCESS && buf.raw == value->raw) { + /* If any special capabilities are needed */ + if (func != NULL && func(dev_list[i]) != DOCA_SUCCESS) + continue; + + /* if device can be opened */ + res = doca_dev_open(dev_list[i], retval); + if (res == DOCA_SUCCESS) { + doca_devinfo_list_destroy(dev_list); + return res; + } + } + } + + log_err("Matching device not found."); + res = DOCA_ERROR_NOT_FOUND; + + doca_devinfo_list_destroy(dev_list); + return res; +} + +doca_error_t +open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_dev_rep_filter filter, struct doca_pci_bdf *pci_bdf, + struct doca_dev_rep **retval) +{ + uint32_t nb_rdevs = 0; + struct doca_devinfo_rep **rep_dev_list = NULL; + struct doca_pci_bdf queried_pci_bdf; + doca_error_t result; + size_t i; + + *retval = NULL; + + /* Search */ + result = doca_devinfo_rep_list_create(local, filter, &rep_dev_list, &nb_rdevs); + if (result != DOCA_SUCCESS) { + log_err( + "Failed to create devinfo representors list. Representor devices are available only on DPU, do not run on Host."); + return DOCA_ERROR_INVALID_VALUE; + } + + for (i = 0; i < nb_rdevs; i++) { + result = doca_devinfo_rep_get_pci_addr(rep_dev_list[i], &queried_pci_bdf); + if (result == DOCA_SUCCESS && queried_pci_bdf.raw == pci_bdf->raw && + doca_dev_rep_open(rep_dev_list[i], retval) == DOCA_SUCCESS) { + doca_devinfo_rep_list_destroy(rep_dev_list); + return DOCA_SUCCESS; + } + } + + log_err("Matching device not found."); + doca_devinfo_rep_list_destroy(rep_dev_list); + return DOCA_ERROR_NOT_FOUND; +} + + +doca_error_t +parse_pci_addr(char const *pci_addr, struct doca_pci_bdf *out_bdf) +{ + unsigned int bus_bitmask = 0xFFFFFF00; + unsigned int dev_bitmask = 0xFFFFFFE0; + unsigned int func_bitmask = 0xFFFFFFF8; + uint32_t tmpu; + char tmps[4]; + + if (pci_addr == NULL || strlen(pci_addr) != 7 || pci_addr[2] != ':' || pci_addr[5] != '.') + return DOCA_ERROR_INVALID_VALUE; + + tmps[0] = pci_addr[0]; + tmps[1] = pci_addr[1]; + tmps[2] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & bus_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + out_bdf->bus = tmpu; + + tmps[0] = pci_addr[3]; + tmps[1] = pci_addr[4]; + tmps[2] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & dev_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + out_bdf->device = tmpu; + + tmps[0] = pci_addr[6]; + tmps[1] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & func_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + out_bdf->function = tmpu; + + return DOCA_SUCCESS; +} + + +int bringup_for_doca(std::unique_ptr &tmp) +{ + log_dbg("creating ep with name %s", s_user_params.addr.addr_un.sun_path); + struct doca_pci_bdf dev_pcie = {0}; + struct doca_comm_channel_ep_t *ep; + doca_error_t doca_error = DOCA_SUCCESS; + struct doca_comm_channel_addr_t *peer_addr = NULL; + struct doca_dev *cc_dev = NULL; + struct doca_dev_rep *cc_dev_rep = NULL; + int epoll_fd = epoll_create(MAX_FDS_NUM); + struct epoll_event ev = { 0, { 0 } }; + + /* Convert the PCI addresses into the matching struct */ + doca_error = parse_pci_addr(s_user_params.cc_dev_pci_addr, &dev_pcie); + if (doca_error != DOCA_SUCCESS) { + errno = EPERM; + log_dbg("doca error %s", doca_get_error_string(doca_error)); + exit_with_err("Failed to parse the device PCI address", SOCKPERF_ERR_FATAL); + } + + /* Create Comm Channel endpoint */ + doca_error = doca_comm_channel_ep_create(&ep); + if (doca_error != DOCA_SUCCESS) { + errno = EPERM; + log_dbg("doca error %s", doca_get_error_string(doca_error)); + exit_with_err("Failed to create Comm Channel endpoint", SOCKPERF_ERR_FATAL); + } + + /* Open DOCA device according to the given PCI address */ + doca_error = open_doca_device_with_pci(&dev_pcie, NULL, &cc_dev); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device based on PCI address %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + if (s_user_params.mode == MODE_SERVER) { + /* Convert the PCI addresses into the matching struct */ + struct doca_pci_bdf dev_rep_pcie = {0}; + doca_error = parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to parse the device representor PCI address %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + /* Open DOCA device representor according to the given PCI address */ + doca_error = open_doca_device_rep_with_pci(cc_dev, DOCA_DEV_REP_FILTER_NET, &dev_rep_pcie, &cc_dev_rep); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device representor based on PCI address %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + } + /* Set all endpoint properties */ + doca_error = doca_comm_channel_ep_set_device(ep, cc_dev); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to set device property %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + doca_error = doca_comm_channel_ep_set_max_msg_size(ep, MSG_SIZE); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to set max_msg_size property %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + doca_error = doca_comm_channel_ep_set_send_queue_size(ep, CC_MAX_QUEUE_SIZE); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to set snd_queue_size property %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + doca_error = doca_comm_channel_ep_set_recv_queue_size(ep, CC_MAX_QUEUE_SIZE); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to set rcv_queue_size property %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + if (s_user_params.mode == MODE_SERVER) { + doca_error = doca_comm_channel_ep_set_device_rep(ep, cc_dev_rep); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to set DOCA device representor property %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + // Sockperf allows for several listen sockets- Phase 2 to support several listen sockets. + log_msg("listening to doca device, name is %s", s_user_params.addr.addr_un.sun_path); + doca_error = doca_comm_channel_ep_listen(ep, s_user_params.addr.addr_un.sun_path); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Comm Channel server couldn't start listening: %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + } else { // MODE = CLIENT + /* Connect to server node */ + log_msg("connecting to doca device, name is %s", s_user_params.addr.addr_un.sun_path); + doca_error = doca_comm_channel_ep_connect(ep, s_user_params.addr.addr_un.sun_path, &peer_addr); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Couldn't establish a connection with the server: %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + /* Make sure peer address is valid */ + while ((doca_error = doca_comm_channel_peer_addr_update_info(peer_addr)) == DOCA_ERROR_CONNECTION_INPROGRESS) { + usleep(1); + } + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to validate the connection with the DPU: %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + } + + doca_event_channel_t recv_fd, send_fd; + doca_error = doca_comm_channel_ep_get_event_channel(ep, &send_fd ,&recv_fd); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to get DOCA event channel %s", doca_get_error_string(doca_error)); + goto destroy_cc; + } + + tmp->com_channel_fd_recv = recv_fd; + tmp->com_channel_fd_send = send_fd; + tmp->peer_addr = peer_addr; + tmp->ep = ep; + // Sock type is not needed for comm channel flow + tmp->sock_type = -1; + + + ev.events = EPOLLIN | EPOLLPRI; + ev.data.fd = recv_fd; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ev.data.fd, &ev); + // ev.data.fd = send_fd; + // epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ev.data.fd, &ev); + log_dbg("epoll fd is %d, Doca recv fd is %d, Doca send fd is %d",epoll_fd ,recv_fd, send_fd); + +destroy_cc: + if (doca_error != DOCA_SUCCESS) { + /* Destroy Comm Channel DOCA device representor */ + if (s_user_params.mode == MODE_SERVER) + doca_dev_rep_close(cc_dev_rep); + /* Destroy Comm Channel DOCA device */ + doca_dev_close(cc_dev); + exit_with_err("Com channel bringup failed", SOCKPERF_ERR_FATAL); + } + + return epoll_fd; +} +#endif //USING_DOCA_COMM_CHANNEL_API + //------------------------------------------------------------------------------ int bringup(const int *p_daemonize) { int rc = SOCKPERF_ERR_NONE; @@ -3540,7 +3880,6 @@ int bringup(const int *p_daemonize) { (int)INVALID_SOCKET; // TODO: use SOCKET all over the way and avoid this cast std::unique_ptr tmp{ new fds_data }; - if (s_user_params.addr.ss_family == AF_UNIX) { log_dbg("UNIX domain socket was provided %s\n", s_user_params.addr.addr_un.sun_path); s_user_params.tcp_nodelay = false; @@ -3561,37 +3900,46 @@ int bringup(const int *p_daemonize) { rc = SOCKPERF_ERR_NO_MEMORY; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.ss_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + s_fd_num = 1; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; + } + tmp->recv.buf = + (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - if ((curr_fd >= MAX_FDS_NUM) || - (prepare_socket(curr_fd, tmp.get()) == - (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid - // this cast - log_err("Invalid socket"); - close(curr_fd); + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; + } + bool skip_socket = false; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + int epoll_fd = bringup_for_doca(tmp); + skip_socket = true; + s_fd_min = s_fd_max = epoll_fd; + g_fds_array[s_fd_min] = tmp.release(); + g_fds_array[s_fd_min]->next_fd = s_fd_min; + } +#endif /* USING_DOCA_COMM_CHANNEL_API */ + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.ss_family, tmp->sock_type, 0)) < 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - s_fd_num = 1; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; - } - tmp->recv.buf = - (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); - if (!tmp->recv.buf) { - log_err("Failed to allocate memory with malloc()"); - rc = SOCKPERF_ERR_NO_MEMORY; + if ((curr_fd >= MAX_FDS_NUM) || + (prepare_socket(curr_fd, tmp.get()) == + (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid + // this cast + log_err("Invalid socket"); + close(curr_fd); + rc = SOCKPERF_ERR_SOCKET; } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - s_fd_min = s_fd_max = curr_fd; g_fds_array[s_fd_min] = tmp.release(); g_fds_array[s_fd_min]->next_fd = s_fd_min; @@ -3610,12 +3958,12 @@ int bringup(const int *p_daemonize) { } } } - +#ifndef USING_DOCA_COMM_CHANNEL_API if (!rc && !fds_array_is_valid()) { log_err("Sanity check failed for sockets list"); rc = SOCKPERF_ERR_FATAL; } - +#endif /* USING_DOCA_COMM_CHANNEL_API */ if (!rc && (s_user_params.threads_num > s_fd_num || s_user_params.threads_num == 0)) { log_msg("Number of threads should be less than sockets count"); rc = SOCKPERF_ERR_BAD_ARGUMENT;