Skip to content

Commit

Permalink
Check sockets in reverse order; refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
malloch committed Nov 19, 2024
1 parent f3f1e71 commit 68b49ad
Showing 1 changed file with 38 additions and 35 deletions.
73 changes: 38 additions & 35 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ static int reuseport_supported = 1;

static int lo_can_coerce_spec(const char *a, const char *b);
static int lo_can_coerce(char a, char b);
static int lo_servers_wait_internal(lo_server *s, int *status,
static int lo_servers_wait_internal(lo_server *s, int *recvd,
int *queued, int num_servers,
int timeout);
static int lo_server_recv_internal(lo_server s);
Expand Down Expand Up @@ -1293,7 +1293,7 @@ void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock)
* deleting a socket, since deleting sockets doesn't affect the
* order of the array to the left of the index. */

for (i = 0; i < s->sockets_len && !data; i++) {
for (i = s->sockets_len - 1; i >= 0 && !data; i--) {
if (s->sockets[i].revents) {
sock = s->sockets[i].fd;
if (sock == -1)
Expand All @@ -1317,13 +1317,15 @@ void *lo_server_recv_raw_stream(lo_server s, size_t * size, int *psock)

int lo_server_wait(lo_server s, int timeout)
{
return lo_servers_wait_internal(&s, 0, 0, 1, timeout);
int recvd, queued;
return lo_servers_wait_internal(&s, &recvd, &queued, 1, timeout);
}

int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout)
{
int *recvd = status ? status : alloca(sizeof(int) * num_servers);
int *queued = alloca(sizeof(int) * num_servers);
int ret = lo_servers_wait_internal(s, status, queued, num_servers, timeout);
int ret = lo_servers_wait_internal(s, recvd, queued, num_servers, timeout);
if (status) {
int i;
for (i = 0; i < num_servers; i++)
Expand All @@ -1333,16 +1335,13 @@ int lo_servers_wait(lo_server *s, int *status, int num_servers, int timeout)
}

static
int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_servers, int timeout)
int lo_servers_wait_internal(lo_server *s, int *recvd, int *queued, int num_servers, int timeout)
{
int i, j, k, res, sched_timeout;
assert(recvd && queued);

if (!status)
status = alloca(sizeof(int) * num_servers);
if (!queued)
queued = alloca(sizeof(int) * num_servers);
for (i = 0; i < num_servers; i++)
status[i] = queued[i] = 0;
recvd[i] = queued[i] = 0;

lo_timetag now, then;
#ifdef HAVE_SELECT
Expand All @@ -1354,6 +1353,8 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
#endif
#endif

lo_timetag_now(&then);

#ifdef HAVE_POLL
socklen_t addr_len = sizeof(struct sockaddr_storage);
struct sockaddr_storage *addr = alloca (addr_len * num_servers);
Expand All @@ -1364,7 +1365,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
for (j = 0, k = 0; j < num_servers; j++) {
for (i = 0; i < s[j]->sockets_len; i++) {
if (lo_server_buffer_contains_msg(s[j], i)) {
status[j] = 1;
recvd[j] = 1;
++k;
break;
}
Expand All @@ -1373,8 +1374,9 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
}

// Return immediately if one or more servers already have messages waiting.
if (k > 0)
return k;
if (k > 0) {
return k;
}

struct pollfd *sockets = alloca(sizeof(struct pollfd) * num_sockets);

Expand All @@ -1391,9 +1393,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
sched_timeout = server_timeout;
}

lo_timetag_now(&then);

res = poll(sockets, num_sockets, timeout > sched_timeout ? sched_timeout : timeout);
res = poll(sockets, num_sockets, sched_timeout);

if (res == -1)
return -1;
Expand Down Expand Up @@ -1421,23 +1421,32 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
goto again;
}
sockets[k].revents = 0;
for (i = 1, ++k; i < s[j]->sockets_len; i++, k++) {

/* check sockets in reverse order so that already-open sockets
* have priority. this allows checking for closed sockets even
* when new connections are being requested. it also allows to
* continue looping through the list of sockets after closing and
* deleting a socket, since deleting sockets doesn't affect the
* order of the array to the left of the index. */

for (i = s[j]->sockets_len - 1, k += i; i > 0; i--, k--) {
if (!sockets[k].revents)
continue;
if (sockets[k].revents & (POLLERR | POLLHUP)) {
closesocket(sockets[k].fd);
lo_server_del_socket(s[j], i, sockets[k].fd);
s[j]->sockets[i].revents = 0;
}
else {
s[j]->sockets[i].revents = POLLIN;
status[j] = 1;
recvd[j] = 1;
}
}
}
else {
if (sockets[k].revents) {
s[j]->sockets[0].revents = POLLIN;
status[j] = 1;
recvd[j] = 1;
}
++k;
}
Expand All @@ -1458,7 +1467,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
for (j = 0, k = 0; j < num_servers; j++) {
for (i = 0; i < s[j]->sockets_len; i++) {
if (lo_server_buffer_contains_msg(s[j], i)) {
status[j] = 1;
recvd[j] = 1;
++k;
break;
}
Expand All @@ -1467,7 +1476,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser

// Return immediately if one or more servers already have messages waiting.
if (k > 0)
return k;
return k;

sched_timeout = timeout;
for (j = 0; j < num_servers; j++) {
Expand All @@ -1492,8 +1501,6 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
}
}

lo_timetag_now(&then);

res = select(num_fds + 1, &ps, NULL, NULL, &stimeout);

if (res == SOCKET_ERROR)
Expand Down Expand Up @@ -1541,13 +1548,13 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
}
else {
s[j]->sockets[0].revents = POLLIN;
status[j] = 1;
recvd[j] = 1;
}
}
for (i = 1; i < s[j]->sockets_len; i++) {
if (FD_ISSET(s[j]->sockets[i].fd, &ps)) {
s[j]->sockets[i].revents = POLLIN;
status[j] = 1;
recvd[j] = 1;
}
}
}
Expand All @@ -1562,7 +1569,7 @@ int lo_servers_wait_internal(lo_server *s, int *status, int *queued, int num_ser
}

for (i = 0, j = 0; i < num_servers; i++)
j += (status[i] | queued[i]);
j += (recvd[i] | queued[i]);

return j;
}
Expand All @@ -1572,6 +1579,8 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers,
{
int i, total_bytes = 0;
int *queued = alloca(sizeof(int) * num_servers);
if (!recvd)
recvd = alloca(sizeof(int) * num_servers);

lo_timetag then;
lo_timetag_now(&then);
Expand Down Expand Up @@ -1607,15 +1616,9 @@ int lo_servers_recv_noblock(lo_server *s, int *recvd, int num_servers,

int lo_server_recv_noblock(lo_server s, int timeout)
{
int status;
return lo_servers_recv_noblock(&s, &status, 1, timeout);
return lo_servers_recv_noblock(&s, NULL, 1, timeout);
}

// we can make this more efficient:
// - use semaphore to ensure server only receiving on one thread
// - use one preallocated buffer per server for receiving
// - don't copy again unless queueing

int lo_server_recv(lo_server s)
{
int ret, recvd, queued;
Expand All @@ -1641,10 +1644,10 @@ int lo_server_recv_internal(lo_server s)
void *data;
size_t size;
int sock = -1;
int i;
int i = 0;

if (s->protocol == LO_TCP) {
for (i = 0; i < s->sockets_len; i++) {
for (i = s->sockets_len - 1; i >= 0; i--) {
// first check if there are additional messages in the buffer
if ((data = lo_server_buffer_copy_for_dispatch(s, i, &size))) {
sock = s->sockets[i].fd;
Expand Down

0 comments on commit 68b49ad

Please sign in to comment.