Skip to content

Commit

Permalink
Use single-threaded network-poll/graph housekeeping functions to hand…
Browse files Browse the repository at this point in the history
…le removal of signals and devices.
  • Loading branch information
malloch committed Sep 5, 2024
1 parent 95337ee commit 7a426fd
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 73 deletions.
25 changes: 15 additions & 10 deletions src/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void mpr_dev_init(mpr_dev dev, int is_local, const char *name, mpr_id id)
mpr_list qry;

dev->obj.is_local = is_local;
dev->obj.status = MPR_STATUS_STAGED;
dev->obj.status = 0;
if (name) {
assert(!dev->name);
dev->name = strdup(name);
Expand Down Expand Up @@ -190,7 +190,6 @@ mpr_dev mpr_dev_new(const char *name_prefix, mpr_graph graph)

dev->ordinal_allocator.val = 1;
dev->ordinal_allocator.count_time = mpr_get_current_time();
mpr_net_add_dev(mpr_graph_get_net(g), dev);
dev->id_maps.active = (mpr_id_map*) malloc(sizeof(mpr_id_map));
dev->id_maps.active[0] = 0;
dev->num_sig_groups = 1;
Expand All @@ -215,16 +214,17 @@ void mpr_dev_free(mpr_dev dev)
own_graph = ldev->own_graph;
net = mpr_graph_get_net(graph);

/* free any queued graph messages without sending */
mpr_net_free_msgs(net);

/* remove OSC handlers associated with this device */
mpr_net_remove_dev(net, ldev);

/* remove local graph handlers here so they are not called when child objects are freed */
/* CHANGE: if graph is not owned then its callbacks _should_ be called when device is removed. */
if (own_graph)
if (own_graph) {
/* free any queued graph messages without sending */
mpr_net_free_msgs(net);

mpr_graph_free_cbs(graph);
}

/* remove OSC handlers associated with this device */
mpr_net_remove_dev(net, ldev);

/* remove subscribers */
while (ldev->subscribers) {
Expand Down Expand Up @@ -279,7 +279,7 @@ void mpr_dev_free(mpr_dev dev)
free(id_map);
}

mpr_graph_remove_dev(graph, dev, MPR_STATUS_REMOVED);
dev->obj.status |= MPR_STATUS_REMOVED;
if (own_graph)
mpr_graph_free(graph);
}
Expand All @@ -293,6 +293,7 @@ void mpr_dev_free_mem(mpr_dev dev)
static void on_registered(mpr_local_dev dev)
{
char *name;
mpr_net net = mpr_graph_get_net(dev->obj.graph);
mpr_list qry;

/* Add unique device id to locally-activated signal instances. */
Expand All @@ -301,6 +302,7 @@ static void on_registered(mpr_local_dev dev)
mpr_local_sig sig = (mpr_local_sig)*sigs;
sigs = mpr_list_get_next(sigs);
mpr_local_sig_set_dev_id(sig, dev->obj.id);
mpr_local_sig_add_to_net(sig, net);
}
qry = mpr_graph_new_query(dev->obj.graph, 0, MPR_SIG, (void*)cmp_qry_sigs,
"hi", dev->obj.id, MPR_DIR_ANY);
Expand Down Expand Up @@ -336,6 +338,9 @@ void mpr_local_dev_add_sig(mpr_local_dev dev, mpr_local_sig sig, mpr_dir dir)
else
++dev->num_outputs;

if (dev->registered)
mpr_local_sig_add_to_net(sig, mpr_graph_get_net(dev->obj.graph));

mpr_obj_increment_version((mpr_obj)dev);
dev->obj.status |= MPR_DEV_SIG_CHANGED;
}
Expand Down
46 changes: 27 additions & 19 deletions src/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,32 +341,16 @@ void mpr_graph_free(mpr_graph g)
list = mpr_list_from_data(g->devs);
while (list) {
mpr_obj dev = *list;
int no_local_dev_maps = 1;
mpr_list sigs;
list = mpr_list_get_next(list);
if (dev->is_local)
continue;

sigs = mpr_dev_get_sigs((mpr_dev)dev, MPR_DIR_ANY);
while (sigs) {
int no_local_sig_maps = 1;
mpr_obj sig = *sigs;
mpr_list maps = mpr_sig_get_maps((mpr_sig)sig, MPR_DIR_ANY);
while (maps) {
mpr_obj map = *maps;
if (map->is_local) {
no_local_dev_maps = no_local_sig_maps = 0;
mpr_list_free(maps);
break;
}
maps = mpr_list_get_next(maps);
}
sigs = mpr_list_get_next(sigs);
if (no_local_sig_maps)
mpr_graph_remove_sig(g, (mpr_sig)sig, MPR_STATUS_REMOVED);
}
if (no_local_dev_maps)
mpr_graph_remove_dev(g, (mpr_dev)dev, MPR_STATUS_REMOVED);
mpr_graph_remove_dev(g, (mpr_dev)dev, MPR_STATUS_REMOVED);
}

FUNC_IF(mpr_expr_free_eval_buffer, g->expr_eval_buff);
Expand Down Expand Up @@ -443,17 +427,23 @@ int mpr_graph_add_cb(mpr_graph g, mpr_graph_handler *h, int types, const void *u
void mpr_graph_call_cbs(mpr_graph g, mpr_obj o, mpr_type t, mpr_graph_evt e)
{
fptr_list cb = g->callbacks, temp;
int handled = 0;

/* add event to object and graph status */
mpr_obj_set_status(o, e, 0);
g->obj.status |= e;

while (cb) {
temp = cb->next;
if (cb->types & t)
if (cb->types & t) {
((mpr_graph_handler*)cb->f)(g, o, e, cb->ctx);
handled = 1;
}
cb = temp;
}

if (handled)
o->status &= ~(MPR_STATUS_NEW | MPR_STATUS_MODIFIED);
}

void *mpr_graph_remove_cb(mpr_graph g, mpr_graph_handler *h, const void *user)
Expand Down Expand Up @@ -538,6 +528,10 @@ mpr_dev mpr_graph_add_dev(mpr_graph g, const char *name, mpr_msg msg, int force)

if (!mpr_dev_get_is_subscribed(dev) && g->autosub)
mpr_graph_subscribe(g, dev, g->autosub, -1);
if (!msg) {
/* this is a local device, wait for graph_housekeeping to call any callbacks */
return dev;
}
}

if (dev) {
Expand All @@ -557,7 +551,7 @@ mpr_dev mpr_graph_add_dev(mpr_graph g, const char *name, mpr_msg msg, int force)
return dev;
}

/* Internal function called by /logout protocol handler */
/* Internal function called by mpr_dev_free() and the /logout protocol handler */
void mpr_graph_remove_dev(mpr_graph g, mpr_dev d, mpr_graph_evt e)
{
mpr_list list;
Expand Down Expand Up @@ -919,7 +913,21 @@ void mpr_graph_housekeeping(mpr_graph g)
mpr_graph_subscribe(g, (mpr_dev)dev, 0, 0);
mpr_graph_remove_dev(g, (mpr_dev)dev, MPR_STATUS_EXPIRED);
}
continue;
}
}
else {
if (dev->status & MPR_STATUS_REMOVED) {
trace_graph(g, "Cleaning up removed device.\n");
mpr_graph_remove_dev(g, (mpr_dev)dev, MPR_STATUS_REMOVED);
}
else if (!(dev->status & (MPR_STATUS_STAGED | MPR_STATUS_ACTIVE))) {
mpr_net_add_dev(g->net, (mpr_local_dev)dev);
dev->status |= MPR_STATUS_STAGED;
mpr_graph_call_cbs(g, dev, MPR_DEV, MPR_STATUS_NEW);
}
else if (dev->status & MPR_STATUS_MODIFIED)
mpr_graph_call_cbs(g, dev, MPR_DEV, MPR_STATUS_MODIFIED);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/mpr_signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ typedef int mpr_sig_group;
#include "id_map.h"
#include "mpr_time.h"
#include "mpr_type.h"
#include "network.h"
#include "slot.h"
#include "value.h"

Expand All @@ -25,6 +26,8 @@ int mpr_sig_osc_handler(const char *path, const char *types, lo_arg **argv, int
void mpr_sig_init(mpr_sig sig, mpr_dev dev, int is_local, mpr_dir dir, const char *name, int len,
mpr_type type, const char *unit, const void *min, const void *max, int *num_inst);

void mpr_local_sig_add_to_net(mpr_local_sig sig, mpr_net net);

void mpr_sig_call_handler(mpr_local_sig sig, int evt, mpr_id inst, unsigned int inst_idx, float diff);

int mpr_sig_set_from_msg(mpr_sig sig, mpr_msg msg);
Expand Down
42 changes: 13 additions & 29 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,10 @@ void mpr_net_add_dev(mpr_net net, mpr_local_dev dev)
++net->num_devs;

if ((net->num_servers - 2) < (net->num_devs * 2)) {
net->num_servers = net->num_devs * 2 + 2;
net->servers = realloc(net->servers, net->num_servers * sizeof(lo_server));
net->server_status = realloc(net->server_status, net->num_servers * sizeof(int));
int num_servers = net->num_devs * 2 + 2;
net->servers = realloc(net->servers, num_servers * sizeof(lo_server));
net->server_status = realloc(net->server_status, num_servers * sizeof(int));
net->num_servers = num_servers;
}
net->servers[net->num_devs * 2] = net->servers[net->num_devs * 2 + 1] = 0;
}
Expand Down Expand Up @@ -518,7 +519,7 @@ void mpr_net_add_dev(mpr_net net, mpr_local_dev dev)

void mpr_net_remove_dev(mpr_net net, mpr_local_dev dev)
{
int i, j;
int i;
char path[256];

for (i = 0; i < net->num_devs; i++) {
Expand Down Expand Up @@ -551,27 +552,6 @@ void mpr_net_remove_dev(mpr_net net, mpr_local_dev dev)
lo_server_del_method(net->servers[SERVER_BUS], path, dev_handlers_specific[i].types);
lo_server_del_method(net->servers[SERVER_MESH], path, dev_handlers_specific[i].types);
}
if (net->num_devs == 0) {
/* Also remove generic device handlers. */
for (i = 0; i < NUM_DEV_HANDLERS_GENERIC; i++) {
/* make sure method isn't also used by graph */
int found = 0;
for (j = 0; j < NUM_GRAPH_HANDLERS; j++) {
if (dev_handlers_generic[i].str_idx == graph_handlers[j].str_idx) {
found = 1;
break;
}
}
if (found)
continue;
lo_server_del_method(net->servers[SERVER_BUS],
net_msg_strings[dev_handlers_generic[i].str_idx],
dev_handlers_generic[i].types);
lo_server_del_method(net->servers[SERVER_MESH],
net_msg_strings[dev_handlers_generic[i].str_idx],
dev_handlers_generic[i].types);
}
}
}

int mpr_net_get_num_devs(mpr_net net)
Expand Down Expand Up @@ -1299,6 +1279,14 @@ static int handler_logout(const char *path, const char *types, lo_arg **av,
RETURN_ARG_UNLESS(ac && MPR_STR == types[0], 0);
trace_net(net);
remote = mpr_graph_get_dev_by_name(gph, &av[0]->s);
if (remote) {
if (mpr_obj_get_is_local((mpr_obj)remote))
return 0;
else {
mpr_graph_unsubscribe(gph, remote);
mpr_graph_remove_dev(gph, remote, MPR_STATUS_REMOVED);
}
}
if (net->num_devs) {
int i = 0, ordinal;
char *prefix_str, *ordinal_str;
Expand All @@ -1314,10 +1302,6 @@ static int handler_logout(const char *path, const char *types, lo_arg **av,
mpr_local_dev_handler_logout(net->devs[i], remote, prefix_str, ordinal);
}
}
if (remote) {
mpr_graph_unsubscribe(gph, remote);
mpr_graph_remove_dev(gph, remote, MPR_STATUS_REMOVED);
}
return 0;
}

Expand Down
9 changes: 5 additions & 4 deletions src/signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include "device.h"
#include "graph.h"
#include "mpr_signal.h"
#include "network.h"
#include "object.h"
#include "path.h"
#include "table.h"
Expand Down Expand Up @@ -621,7 +620,6 @@ mpr_sig mpr_sig_new(mpr_dev dev, mpr_dir dir, const char *name, int len,
{
mpr_graph g;
mpr_local_sig lsig;
mpr_net net;

/* For now we only allow adding signals to devices. */
RETURN_ARG_UNLESS(dev && mpr_obj_get_is_local((mpr_obj)dev), 0);
Expand All @@ -635,7 +633,6 @@ mpr_sig mpr_sig_new(mpr_dev dev, mpr_dir dir, const char *name, int len,
return (mpr_sig)lsig;

g = mpr_obj_get_graph((mpr_obj)dev);
net = mpr_graph_get_net(g);

lsig = (mpr_local_sig)mpr_graph_add_obj(g, MPR_SIG, 1);
lsig->obj.id = mpr_dev_generate_unique_id(dev);
Expand All @@ -644,11 +641,15 @@ mpr_sig mpr_sig_new(mpr_dev dev, mpr_dir dir, const char *name, int len,
lsig->event_flags = events;
mpr_sig_init((mpr_sig)lsig, dev, 1, dir, name, len, type, unit, min, max, num_inst);

mpr_net_add_dev_server_method(net, (mpr_local_dev)dev, lsig->path, mpr_sig_osc_handler, lsig);
mpr_local_dev_add_sig((mpr_local_dev)dev, lsig, dir);
return (mpr_sig)lsig;
}

void mpr_local_sig_add_to_net(mpr_local_sig sig, mpr_net net)
{
mpr_net_add_dev_server_method(net, sig->dev, sig->path, mpr_sig_osc_handler, sig);
}

void mpr_sig_init(mpr_sig sig, mpr_dev dev, int is_local, mpr_dir dir, const char *name, int len,
mpr_type type, const char *unit, const void *min, const void *max, int *num_inst)
{
Expand Down
9 changes: 5 additions & 4 deletions test/testinstance_coordination.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <mapper/mapper.h>
#include "../src/util/mpr_debug.h"
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
Expand Down Expand Up @@ -218,7 +219,7 @@ void loop(void)
if (value) {
int k, len = mpr_obj_get_prop_as_int32((mpr_obj)recvsig1, MPR_PROP_LEN, NULL);
if (verbose) {
printf("%s.%llu got ",
printf("%s.%"PR_MPR_ID" got ",
mpr_obj_get_prop_as_str((mpr_obj)recvsig1, MPR_PROP_NAME, NULL), id);
if (len > 1)
printf("[");
Expand All @@ -240,7 +241,7 @@ void loop(void)
}
}
if (status & MPR_STATUS_REL_UPSTRM) {
eprintf("%s.%llu got release\n",
eprintf("%s.%"PR_MPR_ID" got release\n",
mpr_obj_get_prop_as_str((mpr_obj)recvsig1, MPR_PROP_NAME, NULL), id);
mpr_sig_release_inst(recvsig1, id);
}
Expand All @@ -251,7 +252,7 @@ void loop(void)
if (value) {
int k, len = mpr_obj_get_prop_as_int32((mpr_obj)recvsig2, MPR_PROP_LEN, NULL);
if (verbose) {
printf("%s.%llu got ",
printf("%s.%"PR_MPR_ID" got ",
mpr_obj_get_prop_as_str((mpr_obj)recvsig2, MPR_PROP_NAME, NULL), id);
if (len > 1)
printf("[");
Expand All @@ -273,7 +274,7 @@ void loop(void)
}
}
if (status & MPR_STATUS_REL_UPSTRM) {
eprintf("%s.%llu got release\n",
eprintf("%s.%"PR_MPR_ID" got release\n",
mpr_obj_get_prop_as_str((mpr_obj)recvsig2, MPR_PROP_NAME, NULL), id);
mpr_sig_release_inst(recvsig2, id);
}
Expand Down
Loading

0 comments on commit 7a426fd

Please sign in to comment.