From a0b497e21906412348e380ee2b56bacfeae439c4 Mon Sep 17 00:00:00 2001
From: Joseph Malloch
Date: Mon, 4 Nov 2024 12:32:23 -0400
Subject: [PATCH 1/5] Removed a bunch of duplicate calls to poll/select; use
 wait() for lo_server_recv as well. Fixed buggy behaviours in which handling
 a newly received message would take precedence over dispatching an existing
 queued message with an earlier timestamp, and in which undispatched
 messages would still cause lo_server_recv to return.

---
 lo/lo_lowlevel.h        |   8 +-
 src/lo_types_internal.h |   3 +-
 src/server.c            | 431 +++++++++++++++++-----------------
 3 files changed, 187 insertions(+), 255 deletions(-)

diff --git a/lo/lo_lowlevel.h b/lo/lo_lowlevel.h
index fa38b62..812ca83 100644
--- a/lo/lo_lowlevel.h
+++ b/lo/lo_lowlevel.h
@@ -799,8 +799,9 @@ void lo_server_free(lo_server s);
  * \param timeout A timeout in milliseconds to wait for the incoming packet.
  * a value of 0 will return immediately.
  *
- * The return value is 1 if there is a message waiting or 0 if
- * there is no message. If there is a message waiting you can now
+ * The return value is 1 if there is a message waiting, 0 if
+ * there is no message, and -1 if there is an error that causes the
+ * function to return early. If there is a message waiting you can now
  * call lo_server_recv() to receive that message.
  */
 int lo_server_wait(lo_server s, int timeout);
@@ -815,7 +816,8 @@ int lo_server_wait(lo_server s, int timeout);
  * a value of 0 will return immediately.
  *
  * The return value is the number of servers with a message waiting or
- * 0 if there is no message. If there is a message waiting you can now
+ * 0 if there is no message, or -1 if there is an error that causes the
+ * function to return early. If there is a message waiting you can now
  * call lo_server_recv() to receive that message.
  */
 int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout);
diff --git a/src/lo_types_internal.h b/src/lo_types_internal.h
index bc7cd5d..d1db6e3 100644
--- a/src/lo_types_internal.h
+++ b/src/lo_types_internal.h
@@ -148,7 +148,8 @@ struct socket_context {
 #ifdef HAVE_POLL
     typedef struct pollfd lo_server_fd_type;
 #else
-    typedef struct { int fd; } lo_server_fd_type;
+    typedef struct { int fd; int revents; } lo_server_fd_type;
+    #define POLLIN 0x001
 #endif

 typedef struct _lo_server {
diff --git a/src/server.c b/src/server.c
index 130f6d5..8b69296 100644
--- a/src/server.c
+++ b/src/server.c
@@ -90,6 +90,10 @@ static int reuseport_supported = 1;
 static int lo_can_coerce_spec(const char *a, const char *b);
 static int lo_can_coerce(char a, char b);

+static int lo_servers_wait_internal(lo_server *s, int *status,
+                                    int *queued, int num_servers,
+                                    int timeout);
+static int lo_server_recv_internal(lo_server s);
 static void dispatch_method(lo_server s, const char *path,
                             lo_message msg, int sock);
 static int dispatch_data(lo_server s, void *data,
@@ -892,13 +896,24 @@ int lo_server_enable_queue(lo_server s, int enable,
     return prev;
 }

+static
 void *lo_server_recv_raw(lo_server s, size_t * size)
 {
     char *buffer = NULL;
     int ret, heap_buffer = 0;
     void *data = NULL;

-    if (s->max_msg_size > 4096) {
+#if defined(WIN32) || defined(_MSC_VER)
+    if (!initWSock()) {
+        return NULL;
+    }
+#endif
+
+    if (s->max_msg_size <= 0) {
+        return NULL;
+    }
+    else if (s->max_msg_size > 4096) {
+        // TODO: can we use a static buffer here instead?
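+        // (note on the TODO: a static buffer would avoid this per-call
+        // allocation, but it would presumably have to be per-server or
+        // lock-protected to stay safe when several servers receive
+        // concurrently)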
buffer = (char*) malloc(s->max_msg_size); heap_buffer = 1; } @@ -909,18 +924,6 @@ void *lo_server_recv_raw(lo_server s, size_t * size) if (!buffer) return NULL; - if (s->max_msg_size<=0) { - if (heap_buffer) free(buffer); - return NULL; - } - -#if defined(WIN32) || defined(_MSC_VER) - if (!initWSock()) { - if (heap_buffer) free(buffer); - return NULL; - } -#endif - s->addr_len = sizeof(s->addr); ret = (int) recvfrom(s->sockets[0].fd, buffer, s->max_msg_size, 0, @@ -1060,6 +1063,8 @@ uint32_t lo_server_buffer_contains_msg(lo_server s, int isock) return 0; } +/*! \internal Copy received data into buffer, return pointer to + * message if buffer contains a complete message. */ static void *lo_server_buffer_copy_for_dispatch(lo_server s, int isock, size_t *psize) { @@ -1101,12 +1106,6 @@ int lo_server_recv_raw_stream_socket(lo_server s, int isock, again: - // Check if there is already a message waiting in the buffer. - if ((*pdata = lo_server_buffer_copy_for_dispatch(s, isock, psize))) { - // There could be more data, so return true. - return 1; - } - buffer_bytes_left = (int) (sc->buffer_size - sc->buffer_read_offset); // If we need more than half the buffer, double the buffer size. @@ -1272,105 +1271,34 @@ int lo_server_recv_raw_stream_socket(lo_server s, int isock, static void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock) { - struct sockaddr_storage addr; - socklen_t addr_len = sizeof(addr); int i; void *data = NULL; int sock = -1; -#ifdef HAVE_SELECT -#ifndef HAVE_POLL - fd_set ps; - int nfds = 0; -#endif -#endif assert(psock != NULL); - again: - - /* check sockets in reverse order so that already-open sockets - * have priority. this allows checking for closed sockets even - * when new connections are being requested. it also allows to - * continue looping through the list of sockets after closing and - * deleting a socket, since deleting sockets doesn't affect the - * order of the array to the left of the index. */ - -#ifdef HAVE_POLL - for (i = 0; i < s->sockets_len; i++) { - s->sockets[i].events = POLLIN | POLLPRI; - s->sockets[i].revents = 0; - - if ((data = lo_server_buffer_copy_for_dispatch(s, i, size))) { - *psock = s->sockets[i].fd; - return data; - } - } - - poll(s->sockets, s->sockets_len, -1); - - for (i = s->sockets_len - 1; i >= 0 && !data; --i) { - if (s->sockets[i].revents & (POLLERR | POLLHUP)) { - if (i > 0) { - closesocket(s->sockets[i].fd); - lo_server_del_socket(s, i, s->sockets[i].fd); - continue; - } else - return NULL; - } - if (s->sockets[i].revents) { - sock = s->sockets[i].fd; - -#else +#ifndef HAVE_POLL #ifdef HAVE_SELECT #if defined(WIN32) || defined(_MSC_VER) if (!initWSock()) return NULL; #endif - - nfds = 0; - FD_ZERO(&ps); - for (i = (s->sockets_len - 1); i >= 0; --i) { - FD_SET(s->sockets[i].fd, &ps); -#ifndef WIN32 - if (s->sockets[i].fd > nfds) - nfds = s->sockets[i].fd; #endif - if ((data = lo_server_buffer_copy_for_dispatch(s, i, size))) { - *psock = s->sockets[i].fd; - return data; - } - } +#endif - if (select(nfds + 1, &ps, NULL, NULL, NULL) == SOCKET_ERROR) - return NULL; + /* check sockets in reverse order so that already-open sockets + * have priority. this allows checking for closed sockets even + * when new connections are being requested. it also allows to + * continue looping through the list of sockets after closing and + * deleting a socket, since deleting sockets doesn't affect the + * order of the array to the left of the index. 
*/ for (i = 0; i < s->sockets_len && !data; i++) { - if (FD_ISSET(s->sockets[i].fd, &ps)) { + if (s->sockets[i].revents) { sock = s->sockets[i].fd; - -#endif -#endif - if (sock == -1) return NULL; - /* zeroeth socket is listening for new connections */ - if (sock == s->sockets[0].fd) { - sock = accept(sock, (struct sockaddr *) &addr, &addr_len); - - i = lo_server_add_socket(s, sock, 0, &addr, addr_len); - - /* after adding a new socket, call select()/poll() - * again, since we are supposed to block until a - * message is received. */ - goto again; - } - - if (i < 0) { - closesocket(sock); - return NULL; - } - /* Handle incoming socket data */ if (lo_server_recv_raw_stream_socket(s, i, size, &data) && !data) @@ -1389,17 +1317,32 @@ void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock) int lo_server_wait(lo_server s, int timeout) { - return lo_servers_wait(&s, 0, 1, timeout); + return lo_servers_wait_internal(&s, 0, 0, 1, timeout); } int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) { - int i, j, k, sched_timeout; + int *queued = alloca(sizeof(int) * num_servers); + int ret = lo_servers_wait_internal(s, status, queued, num_servers, timeout); + if (status) { + int i; + for (i = 0; i < num_servers; i++) + status[i] |= queued[i]; + } + return ret; +} + +static +int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_servers, int timeout) +{ + int i, j, k, res, sched_timeout; if (!status) status = alloca(sizeof(int) * num_servers); + if (!queued) + queued = alloca(sizeof(int) * num_servers); for (i = 0; i < num_servers; i++) - status[i] = 0; + status[i] = queued[i] = 0; lo_timetag now, then; #ifdef HAVE_SELECT @@ -1423,6 +1366,7 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) if (lo_server_buffer_contains_msg(s[j], i)) { status[j] = 1; ++k; + break; } ++num_sockets; } @@ -1439,7 +1383,7 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) for (i = 0; i < s[j]->sockets_len; i++) { sockets[k].fd = s[j]->sockets[i].fd; sockets[k].events = POLLIN | POLLPRI | POLLERR | POLLHUP; - sockets[k].revents = 0; + sockets[k].revents = s[j]->sockets[i].revents = 0; ++k; } int server_timeout = lo_server_next_event_delay(s[j]) * 1000; @@ -1449,14 +1393,16 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) lo_timetag_now(&then); - poll(sockets, num_sockets, timeout > sched_timeout ? sched_timeout : timeout); + res = poll(sockets, num_sockets, timeout > sched_timeout ? sched_timeout : timeout); - // If poll() was reporting a new connection on the listening - // socket rather than a ready message, accept it and check again. - for (j = 0, k = 0; j < num_servers; j++) { - if (sockets[k].revents) { - if (sockets[k].revents & (POLLIN | POLLPRI)) { - if (s[j]->protocol == LO_TCP) { + if (res == -1) + return -1; + else if (res) { + for (j = 0, k = 0; j < num_servers; j++) { + if (s[j]->protocol == LO_TCP) { + if (sockets[k].revents & (POLLIN | POLLPRI)) { + // If poll() was reporting a new connection on the listening + // socket rather than a ready message, accept it and check again. 
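+                    // (after accepting, "goto again" re-enters the wait,
+                    // so a connection attempt alone is not reported as a
+                    // ready message)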
int sock = accept(sockets[k].fd, (struct sockaddr *) &addr[j], &addr_len); @@ -1474,41 +1420,54 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) goto again; } - else { - status[j] = 1; + sockets[k].revents = 0; + for (i = 1, ++k; i < s[j]->sockets_len; i++, k++) { + if (!sockets[k].revents) + continue; + if (sockets[k].revents & (POLLERR | POLLHUP)) { + closesocket(sockets[k].fd); + lo_server_del_socket(s[j], i, sockets[k].fd); + } + else { + s[j]->sockets[i].revents = POLLIN; + status[j] = 1; + } } } - } - k += s[j]->sockets_len; - } - - for (j = num_servers - 1, k = num_sockets - 1; j >= 0; j--, k--) { - for (i = s[j]->sockets_len - 1; i > 0; i--, k--) { - if (!sockets[k].revents) - continue; - if (sockets[k].revents & (POLLERR | POLLHUP)) { - closesocket(sockets[k].fd); - lo_server_del_socket(s[j], i, sockets[k].fd); + else { + if (sockets[k].revents) { + s[j]->sockets[0].revents = POLLIN; + status[j] = 1; + } + ++k; } - else - status[j] = 1; } } - - for (j = 0; j < num_servers; j++) { - if (lo_server_next_event_delay(s[j]) < 0.01) - status[j] = 1; - } #else #ifdef HAVE_SELECT - int res, to, nfds = 0; + int to, num_fds; #if defined(WIN32) || defined(_MSC_VER) - if (!initWSock()) - return 0; + if (!initWSock()) { + return -1; + } #endif again: + num_fds = 0; + for (j = 0, k = 0; j < num_servers; j++) { + for (i = 0; i < s[j]->sockets_len; i++) { + if (lo_server_buffer_contains_msg(s[j], i)) { + status[j] = 1; + ++k; + break; + } + } + } + + // Return immediately if one or more servers already have messages waiting. + if (k > 0) + return k; sched_timeout = timeout; for (j = 0; j < num_servers; j++) { @@ -1524,22 +1483,21 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) FD_ZERO(&ps); for (j = 0; j < num_servers; j++) { for (i = 0; i < s[j]->sockets_len; i++) { + s[j]->sockets[i].revents = 0; FD_SET(s[j]->sockets[i].fd, &ps); #ifndef WIN32 - if (s[j]->sockets[i].fd > nfds) - nfds = s[j]->sockets[i].fd; + if (s[j]->sockets[i].fd > num_fds) + num_fds = s[j]->sockets[i].fd; #endif - if (lo_server_buffer_contains_msg(s[j], i)) { - status[j] = 1; - } } } lo_timetag_now(&then); - res = select(nfds + 1, &ps, NULL, NULL, &stimeout); + + res = select(num_fds + 1, &ps, NULL, NULL, &stimeout); if (res == SOCKET_ERROR) - return 0; + return -1; else if (res) { for (j = 0; j < num_servers; j++) { if (FD_ISSET(s[j]->sockets[0].fd, &ps)) { @@ -1582,26 +1540,29 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) goto again; } else { + s[j]->sockets[0].revents = POLLIN; status[j] = 1; } } for (i = 1; i < s[j]->sockets_len; i++) { - if (FD_ISSET(s[j]->sockets[i].fd, &ps)) + if (FD_ISSET(s[j]->sockets[i].fd, &ps)) { + s[j]->sockets[i].revents = POLLIN; status[j] = 1; + } } } } +#endif +#endif for (j = 0; j < num_servers; j++) { if (lo_server_next_event_delay(s[j]) < 0.01) { - status[j] = 1; + queued[j] = 1; } } -#endif -#endif for (i = 0, j = 0; i < num_servers; i++) - j += status[i]; + j += (status[i] | queued[i]); return j; } @@ -1610,13 +1571,34 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers, int timeout) { int i, total_bytes = 0; - if (!lo_servers_wait(s, recvd, num_servers, timeout)) { - return 0; - } - for (i = 0; i < num_servers; i++) { - if (recvd[i]) { - recvd[i] = lo_server_recv(s[i]); - total_bytes += recvd[i]; + int *queued = alloca(sizeof(int) * num_servers); + + lo_timetag then; + lo_timetag_now(&then); + +again: + if (lo_servers_wait_internal(s, recvd, queued, num_servers, timeout) 
> 0) { + for (i = 0; i < num_servers; i++) { + // a new message might be queued for future dispatch, in which case + // any queued messages that are ready should take precedence + if (recvd[i] && (recvd[i] = lo_server_recv_internal(s[i])) > 0) { + // new message was received and dispatched + total_bytes += recvd[i]; + } + else if (queued[i]) { + // queued message is ready for dispatch + recvd[i] = dispatch_queued(s[i], 0); + total_bytes += recvd[i]; + } + else { + // if the received message was queued we need to keep waiting + lo_timetag now; + lo_timetag_now(&now); + double diff = lo_timetag_diff(now, then); + timeout -= (int)(diff*1000); + if (timeout > 0.01) + goto again; + } } } return total_bytes; @@ -1628,121 +1610,65 @@ int lo_server_recv_noblock(lo_server s, int timeout) return lo_servers_recv_noblock(&s, &status, 1, timeout); } +// we can make this more efficient: +// - use semaphore to ensure server only receiving on one thread +// - use one preallocated buffer per server for receiving +// - don't copy again unless queueing + int lo_server_recv(lo_server s) +{ + int ret, recvd, queued; + while ((ret = lo_servers_wait_internal(&s, &recvd, &queued, 1, 100)) == 0) {} + if (ret > 0) { + // new messages might be queued for future dispatch, in which case any queued msgs that are ready should take precedence + if (recvd && (ret = lo_server_recv_internal(s))) { + // new message was received and dispatched + return ret; + } + else if (queued) { + // queued message is ready for dispatch + return dispatch_queued(s, 0); + } + } + return 0; +} + +// This function only handles incoming data, not previously queued messages +static +int lo_server_recv_internal(lo_server s) { void *data; size_t size; - double sched_time = lo_server_next_event_delay(s); int sock = -1; int i; -#ifdef HAVE_SELECT -#ifndef HAVE_POLL - fd_set ps; - struct timeval stimeout; - int res, nfds = 0; -#endif -#endif - - again: - if (sched_time > 0.01) { - if (sched_time > 10.0) { - sched_time = 10.0; - } -#ifdef HAVE_POLL - for (i = 0; i < s->sockets_len; i++) { - s->sockets[i].events = POLLIN | POLLPRI | POLLERR | POLLHUP; - s->sockets[i].revents = 0; - - if (s->protocol == LO_TCP - && (data = lo_server_buffer_copy_for_dispatch(s, i, &size))) - { - sock = s->sockets[i].fd; - goto got_data; - } - } - - poll(s->sockets, s->sockets_len, (int) (sched_time * 1000.0)); - - for (i = 0; i < s->sockets_len; i++) { - if (!s->sockets[i].revents) - continue; - if (s->sockets[i].revents & (POLLERR | POLLHUP)) { - if (i > 0) { - closesocket(s->sockets[i].fd); - lo_server_del_socket(s, i, s->sockets[i].fd); - continue; - } else - return 0; - } - break; - } - - if (i >= s->sockets_len) { - sched_time = lo_server_next_event_delay(s); - - if (sched_time > 0.01) - goto again; - - return dispatch_queued(s, 0); - } -#else -#ifdef HAVE_SELECT -#if defined(WIN32) || defined(_MSC_VER) - if (!initWSock()) - return 0; -#endif - FD_ZERO(&ps); + if (s->protocol == LO_TCP) { for (i = 0; i < s->sockets_len; i++) { - FD_SET(s->sockets[i].fd, &ps); -#ifndef WIN32 - if (s->sockets[i].fd > nfds) - nfds = s->sockets[i].fd; -#endif - if (s->protocol == LO_TCP - && (data = lo_server_buffer_copy_for_dispatch(s, i, &size))) - { + // first check if there are additional messages in the buffer + if ((data = lo_server_buffer_copy_for_dispatch(s, i, &size))) { sock = s->sockets[i].fd; goto got_data; } + if (s->sockets[i].revents) + break; } - - stimeout.tv_sec = sched_time; - stimeout.tv_usec = (sched_time - stimeout.tv_sec) * 1.e6; - res = select(nfds + 1, &ps, 
                         NULL, NULL, &stimeout);
-        if (res == SOCKET_ERROR) {
-            return 0;
-        }
-
-        if (!res) {
-            sched_time = lo_server_next_event_delay(s);
-
-            if (sched_time > 0.01)
-                goto again;
-
-            return dispatch_queued(s, 0);
-        }
-#endif
-#endif
-    } else {
-        return dispatch_queued(s, 0);
-    }
-
-    if (s->protocol == LO_TCP) {
         data = lo_server_recv_raw_stream(s, &size, &sock);
-    } else {
+    }
+    else {
         data = lo_server_recv_raw(s, &size);
     }

     if (!data) {
         return 0;
     }
+
 got_data:
-    if (dispatch_data(s, data, size, sock) < 0) {
-        free(data);
+    i = dispatch_data(s, data, size, sock);
+    free(data);
+    if (i < 0) {
         return -1;
     }
-    free(data);
-    return (int) size;
+    return i ? (int) size : 0;
 }

 int lo_server_add_socket(lo_server s, int socket, lo_address a,
@@ -1829,8 +1755,8 @@ void lo_server_del_socket(lo_server s, int index, int socket)
     s->sockets_len--;
 }

-static int dispatch_data(lo_server s, void *data,
-                         size_t size, int sock)
+static
+int dispatch_data(lo_server s, void *data, size_t size, int sock)
 {
     int result = 0;
     char *path = (char*) data;
@@ -1861,6 +1787,7 @@ static int dispatch_data(lo_server s, void *data,
         pos += 4;
         remain -= 8;

+        // TODO: surely the bundle start/stop handlers shouldn't be called here if the contents will be queued?!
         if (s->bundle_start_handler)
             s->bundle_start_handler(ts, s->bundle_handler_user_data);
@@ -1887,15 +1814,17 @@ static int dispatch_data(lo_server s, void *data,
             lo_message_incref(msg);

             // test for immediate dispatch
-            if ((ts.sec == LO_TT_IMMEDIATE.sec
-                 && ts.frac == LO_TT_IMMEDIATE.frac)
-                || lo_timetag_diff(ts, now) <= 0.0
-                || (s->flags & LO_SERVER_ENQUEUE) == 0)
+            if ((s->flags & LO_SERVER_ENQUEUE) == 0
+                || (   ts.sec == LO_TT_IMMEDIATE.sec
+                    && ts.frac == LO_TT_IMMEDIATE.frac)
+                || lo_timetag_diff(ts, now) <= 0.0)
             {
                 dispatch_method(s, pos, msg, sock);
                 lo_message_free(msg);
             } else {
                 queue_data(s, ts, pos, msg, sock);
+                // message was queued, not dispatched: set size to 0 for return
+                size = 0;
             }
         }

From a36277151589ee9eece5513f36850adbb6da2ba5 Mon Sep 17 00:00:00 2001
From: Joseph Malloch
Date: Tue, 5 Nov 2024 22:11:03 -0400
Subject: [PATCH 2/5] Add a configure option for disabling poll since select
 is much faster in certain contexts.

---
 cmake/CMakeLists.txt |  5 ++++-
 configure.ac         | 12 +++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/cmake/CMakeLists.txt b/cmake/CMakeLists.txt
index f8399e0..bf94018 100644
--- a/cmake/CMakeLists.txt
+++ b/cmake/CMakeLists.txt
@@ -6,6 +6,7 @@ option(WITH_EXAMPLES "Enable building examples." ON)
 option(WITH_CPP_TESTS "Enable building C++ wrapper tests." ON)
 option(WITH_STATIC "Enable building static library." OFF)
 option(THREADING "Build with threading support." ON)
+option(WITH_POLL "Build with poll support." ON)
 if (WITH_STATIC)
   message(STATUS "If you are using the static library build, please keep in mind (and inform yourself of the implications) that liblo is licensed with LGPL v2.1+.")
@@ -141,7 +142,9 @@ set(EXAMPLE_SERVER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../examples/example_serve
 set(EXAMPLE_TCP_ECHO_SERVER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../examples/example_tcp_echo_server.c)
 set(NONBLOCKING_SERVER_EXAMPLE_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../examples/nonblocking_server_example.c)

-check_symbol_exists(poll poll.h HAVE_POLL)
+if (WITH_POLL)
+  check_symbol_exists(poll poll.h HAVE_POLL)
+endif()
 check_symbol_exists(select sys/select.h HAVE_SELECT)
 if(NOT HAVE_POLL AND NOT HAVE_SELECT)
   if(CMAKE_SYSTEM_NAME MATCHES "Windows")
diff --git a/configure.ac b/configure.ac
index ab7e70b..d6d82c7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -210,7 +210,6 @@ AC_CHECK_FUNC([select], [AC_DEFINE(HAVE_SELECT, [1], [Define to 1 if select() is
         AC_DEFINE(HAVE_SELECT, [1], [Define to 1 if select() is available.])
         is_windows=yes],[AC_MSG_RESULT(no)])
 ])
-AC_CHECK_FUNC([poll], [AC_DEFINE(HAVE_POLL, [1], [Define to 1 if poll() is available.])])
 AC_CHECK_FUNC([setvbuf], [AC_DEFINE(HAVE_SETVBUF, [1], [Define to 1 if setvbuf() is available.])])

 AM_CONDITIONAL(WINDOWS, test x$is_windows = xyes)
@@ -260,6 +259,17 @@ AM_CONDITIONAL([COMPILE_TOOLS],[test x$enable_tools != xno])
 AM_CONDITIONAL([COMPILE_EXAMPLES],[test x$enable_examples != xno])
 AM_CONDITIONAL([ENABLE_NETWORK_TESTS],[test x$enable_network_tests != xno])

+# Check if poll is wanted
+AC_ARG_ENABLE(poll,
+  [  --disable-poll          Disable compiling with poll support],
+  [want_poll=$enableval],
+  [want_poll=yes])
+
+# If poll support is wanted, check whether poll() is actually available.
+if test "x$want_poll" = "xyes"; then
+  AC_CHECK_FUNC([poll], [AC_DEFINE(HAVE_POLL, [1], [Define to 1 if poll() is available.])])
+fi
+
 if ! test x$enable_network_tests = xno; then
    AC_DEFINE(ENABLE_NETWORK_TESTS, [1],
              [Define this to enable network tests.])

From 86ebf8e2fb8e7c8c2785c3a19dfb1173dd57717c Mon Sep 17 00:00:00 2001
From: Joseph Malloch
Date: Mon, 11 Nov 2024 20:33:58 -0400
Subject: [PATCH 3/5] Fix cmake WITH_POLL option; add documentation on
 disabling poll to README.

---
 README.md            | 10 ++++++++++
 cmake/CMakeLists.txt |  3 +++
 2 files changed, 13 insertions(+)

diff --git a/README.md b/README.md
index 040a76d..22a98d2 100644
--- a/README.md
+++ b/README.md
@@ -88,3 +88,13 @@ sockets.

 IPv6 is currently disabled by default, but you can enable it using

     ./configure --enable-ipv6
+
+## Poll() vs Select()
+
+Some platforms may have both `poll()` and `select()` available for listening efficiently on multiple servers/sockets. In this case, liblo will default to using `poll` since it is considered to be more scalable. However, on some platforms (e.g. macOS) the liblo code path using `select()` is considerably faster, so you may wish to explicitly disable support for `poll` if your applications do not require extreme scalability and are sensitive to small differences in efficiency.
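+
+For example, an application that waits on many servers at once (the case where `poll`'s scalability matters most) might use `lo_servers_wait()` along these lines (an illustrative sketch only; error handling and method registration are omitted):
+
+    lo_server servers[32];
+    int status[32], i;
+    for (i = 0; i < 32; i++)
+        servers[i] = lo_server_new(NULL, NULL);
+    /* wait up to 100 ms for any of the servers to have a message ready */
+    if (lo_servers_wait(servers, status, 32, 100) > 0)
+        for (i = 0; i < 32; i++)
+            if (status[i])
+                lo_server_recv(servers[i]);
+
+If instead you open only one or two servers and care mainly about per-message latency, disabling `poll` may be the better trade-off.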
This can be done when compiling the library from source, either using configure: + + ./configure --disable-poll + +or if using cmake: + + cmake -DWITH_POLL=OFF \ No newline at end of file diff --git a/cmake/CMakeLists.txt b/cmake/CMakeLists.txt index bf94018..168b1f2 100644 --- a/cmake/CMakeLists.txt +++ b/cmake/CMakeLists.txt @@ -144,6 +144,9 @@ set(NONBLOCKING_SERVER_EXAMPLE_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../examples/n if (WITH_POLL) check_symbol_exists(poll poll.h HAVE_POLL) +else() + message(STATUS "Excluding poll support.") + set(HAVE_POLL OFF) endif() check_symbol_exists(select sys/select.h HAVE_SELECT) if(NOT HAVE_POLL AND NOT HAVE_SELECT) From f3f1e710dd1f4162e2d854efe2681398ccc25f08 Mon Sep 17 00:00:00 2001 From: Joseph Malloch Date: Tue, 19 Nov 2024 14:17:08 -0400 Subject: [PATCH 4/5] Fix waiting logic in lo_servers_recv_noblock. --- src/server.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/server.c b/src/server.c index 8b69296..467d188 100644 --- a/src/server.c +++ b/src/server.c @@ -1590,14 +1590,15 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers, recvd[i] = dispatch_queued(s[i], 0); total_bytes += recvd[i]; } - else { - // if the received message was queued we need to keep waiting - lo_timetag now; - lo_timetag_now(&now); - double diff = lo_timetag_diff(now, then); - timeout -= (int)(diff*1000); - if (timeout > 0.01) - goto again; + } + if (!total_bytes) { + // we need to keep waiting if no received or queued messages are ready for dispatch + lo_timetag now; + lo_timetag_now(&now); + double diff = lo_timetag_diff(now, then); + timeout -= (int)(diff*1000); + if (timeout > 2) { + goto again; } } } From 68b49adcb741ba64f4d924fc0b417564ab5e5fbc Mon Sep 17 00:00:00 2001 From: Joseph Malloch Date: Tue, 19 Nov 2024 14:18:48 -0400 Subject: [PATCH 5/5] Check sockets in reverse order; refactoring. --- src/server.c | 73 +++++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/src/server.c b/src/server.c index 467d188..ec4b219 100644 --- a/src/server.c +++ b/src/server.c @@ -90,7 +90,7 @@ static int reuseport_supported = 1; static int lo_can_coerce_spec(const char *a, const char *b); static int lo_can_coerce(char a, char b); -static int lo_servers_wait_internal(lo_server *s, int *status, +static int lo_servers_wait_internal(lo_server *s, int *recvd, int *queued, int num_servers, int timeout); static int lo_server_recv_internal(lo_server s); @@ -1293,7 +1293,7 @@ void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock) * deleting a socket, since deleting sockets doesn't affect the * order of the array to the left of the index. */ - for (i = 0; i < s->sockets_len && !data; i++) { + for (i = s->sockets_len - 1; i >= 0 && !data; i--) { if (s->sockets[i].revents) { sock = s->sockets[i].fd; if (sock == -1) @@ -1317,13 +1317,15 @@ void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock) int lo_server_wait(lo_server s, int timeout) { - return lo_servers_wait_internal(&s, 0, 0, 1, timeout); + int recvd, queued; + return lo_servers_wait_internal(&s, &recvd, &queued, 1, timeout); } int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) { + int *recvd = status ? 
status : alloca(sizeof(int) * num_servers); int *queued = alloca(sizeof(int) * num_servers); - int ret = lo_servers_wait_internal(s, status, queued, num_servers, timeout); + int ret = lo_servers_wait_internal(s, recvd, queued, num_servers, timeout); if (status) { int i; for (i = 0; i < num_servers; i++) @@ -1333,16 +1335,13 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout) } static -int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_servers, int timeout) +int lo_servers_wait_internal(lo_server *s, int *recvd, int *queued, int num_servers, int timeout) { int i, j, k, res, sched_timeout; + assert(recvd && queued); - if (!status) - status = alloca(sizeof(int) * num_servers); - if (!queued) - queued = alloca(sizeof(int) * num_servers); for (i = 0; i < num_servers; i++) - status[i] = queued[i] = 0; + recvd[i] = queued[i] = 0; lo_timetag now, then; #ifdef HAVE_SELECT @@ -1354,6 +1353,8 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser #endif #endif + lo_timetag_now(&then); + #ifdef HAVE_POLL socklen_t addr_len = sizeof(struct sockaddr_storage); struct sockaddr_storage *addr = alloca (addr_len * num_servers); @@ -1364,7 +1365,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser for (j = 0, k = 0; j < num_servers; j++) { for (i = 0; i < s[j]->sockets_len; i++) { if (lo_server_buffer_contains_msg(s[j], i)) { - status[j] = 1; + recvd[j] = 1; ++k; break; } @@ -1373,8 +1374,9 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser } // Return immediately if one or more servers already have messages waiting. - if (k > 0) - return k; + if (k > 0) { + return k; + } struct pollfd *sockets = alloca(sizeof(struct pollfd) * num_sockets); @@ -1391,9 +1393,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser sched_timeout = server_timeout; } - lo_timetag_now(&then); - - res = poll(sockets, num_sockets, timeout > sched_timeout ? sched_timeout : timeout); + res = poll(sockets, num_sockets, sched_timeout); if (res == -1) return -1; @@ -1421,23 +1421,32 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser goto again; } sockets[k].revents = 0; - for (i = 1, ++k; i < s[j]->sockets_len; i++, k++) { + + /* check sockets in reverse order so that already-open sockets + * have priority. this allows checking for closed sockets even + * when new connections are being requested. it also allows to + * continue looping through the list of sockets after closing and + * deleting a socket, since deleting sockets doesn't affect the + * order of the array to the left of the index. 
*/ + + for (i = s[j]->sockets_len - 1, k += i; i > 0; i--, k--) { if (!sockets[k].revents) continue; if (sockets[k].revents & (POLLERR | POLLHUP)) { closesocket(sockets[k].fd); lo_server_del_socket(s[j], i, sockets[k].fd); + s[j]->sockets[i].revents = 0; } else { s[j]->sockets[i].revents = POLLIN; - status[j] = 1; + recvd[j] = 1; } } } else { if (sockets[k].revents) { s[j]->sockets[0].revents = POLLIN; - status[j] = 1; + recvd[j] = 1; } ++k; } @@ -1458,7 +1467,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser for (j = 0, k = 0; j < num_servers; j++) { for (i = 0; i < s[j]->sockets_len; i++) { if (lo_server_buffer_contains_msg(s[j], i)) { - status[j] = 1; + recvd[j] = 1; ++k; break; } @@ -1467,7 +1476,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser // Return immediately if one or more servers already have messages waiting. if (k > 0) - return k; + return k; sched_timeout = timeout; for (j = 0; j < num_servers; j++) { @@ -1492,8 +1501,6 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser } } - lo_timetag_now(&then); - res = select(num_fds + 1, &ps, NULL, NULL, &stimeout); if (res == SOCKET_ERROR) @@ -1541,13 +1548,13 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser } else { s[j]->sockets[0].revents = POLLIN; - status[j] = 1; + recvd[j] = 1; } } for (i = 1; i < s[j]->sockets_len; i++) { if (FD_ISSET(s[j]->sockets[i].fd, &ps)) { s[j]->sockets[i].revents = POLLIN; - status[j] = 1; + recvd[j] = 1; } } } @@ -1562,7 +1569,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser } for (i = 0, j = 0; i < num_servers; i++) - j += (status[i] | queued[i]); + j += (recvd[i] | queued[i]); return j; } @@ -1572,6 +1579,8 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers, { int i, total_bytes = 0; int *queued = alloca(sizeof(int) * num_servers); + if (!recvd) + recvd = alloca(sizeof(int) * num_servers); lo_timetag then; lo_timetag_now(&then); @@ -1607,15 +1616,9 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers, int lo_server_recv_noblock(lo_server s, int timeout) { - int status; - return lo_servers_recv_noblock(&s, &status, 1, timeout); + return lo_servers_recv_noblock(&s, NULL, 1, timeout); } -// we can make this more efficient: -// - use semaphore to ensure server only receiving on one thread -// - use one preallocated buffer per server for receiving -// - don't copy again unless queueing - int lo_server_recv(lo_server s) { int ret, recvd, queued; @@ -1641,10 +1644,10 @@ int lo_server_recv_internal(lo_server s) void *data; size_t size; int sock = -1; - int i; + int i = 0; if (s->protocol == LO_TCP) { - for (i = 0; i < s->sockets_len; i++) { + for (i = s->sockets_len - 1; i >= 0; i--) { // first check if there are additional messages in the buffer if ((data = lo_server_buffer_copy_for_dispatch(s, i, &size))) { sock = s->sockets[i].fd;