Skip to content

Commit

Permalink
Prevent multiple threads from polling a network simultaneously.
Browse files Browse the repository at this point in the history
  • Loading branch information
malloch committed Sep 3, 2024
1 parent b60599b commit 95337ee
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
18 changes: 18 additions & 0 deletions src/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,20 @@ void mpr_graph_free_cbs(mpr_graph g)
}
}

static void mpr_graph_gc(mpr_graph g)
{
mpr_list list;

/* check if any signals need to be removed */
list = mpr_list_from_data(g->sigs);
while (list) {
mpr_obj sig = *list;
list = mpr_list_get_next(list);
if (sig->is_local && sig->status & MPR_STATUS_REMOVED)
mpr_graph_remove_sig(g, (mpr_sig)sig, MPR_STATUS_REMOVED);
}
}

void mpr_graph_free(mpr_graph g)
{
mpr_list list;
Expand All @@ -297,6 +311,8 @@ void mpr_graph_free(mpr_graph g)
/* remove callbacks now so they won't be called when removing devices */
mpr_graph_free_cbs(g);

mpr_graph_gc(g);

/* unsubscribe from and remove any autorenewing subscriptions */
while (g->subscriptions)
mpr_graph_subscribe(g, g->subscriptions->dev, 0, 0);
Expand Down Expand Up @@ -887,6 +903,8 @@ void mpr_graph_housekeeping(mpr_graph g)
mpr_time t;
mpr_time_set(&t, MPR_NOW);

mpr_graph_gc(g);

/* check if any known devices have expired */
t.sec -= TIMEOUT_SEC;
while (list) {
Expand Down
26 changes: 22 additions & 4 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ typedef struct _mpr_net {
uint8_t generic_dev_methods_added;
uint8_t registered;
uint8_t force_next_ping;
uint8_t polling;
} mpr_net_t;

static int is_alphabetical(int num, lo_arg **names)
Expand Down Expand Up @@ -961,10 +962,17 @@ static void mpr_net_housekeeping(mpr_net net, int force_ping)
return;
}

int mpr_net_poll(mpr_net net, int block_ms)
int mpr_net_poll_internal(mpr_net net, int block_ms)
{
int i, j, count = 0, left_ms, elapsed_ms, admin_elapsed_ms = 0;
double then = mpr_get_current_time();
double then;

if (++net->polling > 1) {
trace("Network polling already in process.\n");
return 0;
}

then = mpr_get_current_time();

mpr_net_housekeeping(net, 0);

Expand Down Expand Up @@ -1015,15 +1023,25 @@ int mpr_net_poll(mpr_net net, int block_ms)
for (i = 0; i < net->num_devs; i++) {
mpr_dev_update_subscribers(net->devs[i]);
}
net->polling = 0;
return count;
}

int mpr_net_poll(mpr_net net, int block_ms)
{
if (net->thread_data || net->polling) {
trace("Network polling already in process.\n");
return 0;
}
return mpr_net_poll_internal(net, block_ms);
}

#ifdef HAVE_LIBPTHREAD
static void *net_thread_func(void *data)
{
mpr_thread_data td = (mpr_thread_data)data;
while (td->is_active) {
mpr_net_poll((mpr_net)td->object, td->block_ms);
mpr_net_poll_internal((mpr_net)td->object, td->block_ms);
}
td->is_done = 1;
pthread_exit(NULL);
Expand All @@ -1036,7 +1054,7 @@ static unsigned __stdcall net_thread_func(void *data)
{
mpr_thread_data td = (mpr_thread_data)data;
while (td->is_active) {
mpr_net_poll((mpr_net)td->object, td->block_ms);
mpr_net_poll_internal((mpr_net)td->object, td->block_ms);
}
td->is_done = 1;
_endthread();
Expand Down

0 comments on commit 95337ee

Please sign in to comment.