From 593bc65becef492d6d5d77fd604b68a477e1f884 Mon Sep 17 00:00:00 2001 From: Joseph Malloch Date: Wed, 11 Sep 2024 14:00:31 -0300 Subject: [PATCH] Add map version to /unmap handshake messages and use them to solve race condition with immediate map renewal. Added testremap.c to test suite. --- .gitignore | 1 + src/device.c | 18 ++- src/graph.c | 3 +- src/map.c | 16 ++- src/map.h | 2 +- src/network.c | 68 ++++++---- src/object.c | 35 +++-- src/object.h | 2 +- src/signal.c | 14 +- test/Makefile.am | 8 ++ test/testremap.c | 346 +++++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 450 insertions(+), 63 deletions(-) create mode 100644 test/testremap.c diff --git a/.gitignore b/.gitignore index 03b12833..9beebe1e 100644 --- a/.gitignore +++ b/.gitignore @@ -83,6 +83,7 @@ test/testparams test/testparser test/testprops test/testrate +test/testremap test/testreverse test/testselfmap test/testsetiface diff --git a/src/device.c b/src/device.c index 85912946..072e0fae 100644 --- a/src/device.c +++ b/src/device.c @@ -341,7 +341,7 @@ void mpr_local_dev_add_sig(mpr_local_dev dev, mpr_local_sig sig, mpr_dir dir) if (dev->registered) mpr_local_sig_add_to_net(sig, mpr_graph_get_net(dev->obj.graph)); - mpr_obj_increment_version((mpr_obj)dev); + mpr_obj_incr_version((mpr_obj)dev); dev->obj.status |= MPR_DEV_SIG_CHANGED; } @@ -353,7 +353,7 @@ void mpr_dev_remove_sig(mpr_dev dev, mpr_sig sig) if (dir & MPR_DIR_OUT) --dev->num_outputs; if (dev->obj.is_local) { - mpr_obj_increment_version((mpr_obj)dev); + mpr_obj_incr_version((mpr_obj)dev); dev->obj.status |= MPR_DEV_SIG_CHANGED; } } @@ -454,8 +454,10 @@ void mpr_dev_process_incoming_maps(mpr_local_dev dev) maps = mpr_list_get_next(maps); if (mpr_obj_get_is_local((mpr_obj)map)) mpr_map_receive((mpr_local_map)map, dev->time); - else + else { + /* local maps are always located at the start of the list */ break; + } } } @@ -477,8 +479,10 @@ static int process_outgoing_maps(mpr_local_dev dev) list = mpr_list_get_next(list); if (mpr_obj_get_is_local((mpr_obj)map)) mpr_map_send((mpr_local_map)map, dev->time); - else + else { + /* local maps are always located at the start of the list */ break; + } } dev->sending = 0; list = mpr_graph_get_list(graph, MPR_LINK); @@ -519,7 +523,7 @@ static int mpr_dev_send_maps(mpr_local_dev dev, mpr_dir dir, int msg) mpr_list maps = mpr_dev_get_maps((mpr_dev)dev, dir); int sent = 0; while (maps) { - mpr_map_send_state((mpr_map)*maps, -1, msg); + mpr_map_send_state((mpr_map)*maps, -1, msg, 0); maps = mpr_list_get_next(maps); ++sent; } @@ -985,8 +989,10 @@ int mpr_dev_set_from_msg(mpr_dev dev, mpr_msg m) break; } } - if (updated) + if (updated) { dev->obj.status |= MPR_STATUS_MODIFIED; + mpr_obj_incr_version((mpr_obj)dev); + } return updated; } diff --git a/src/graph.c b/src/graph.c index 587b19e3..5f5dddda 100644 --- a/src/graph.c +++ b/src/graph.c @@ -223,7 +223,8 @@ void mpr_graph_cleanup(mpr_graph g) mpr_map map = (mpr_map)*maps; int status = mpr_obj_get_status((mpr_obj)map); maps = mpr_list_get_next(maps); - if (status & MPR_STATUS_ACTIVE || !mpr_obj_get_is_local((mpr_obj)map)) + if ( !mpr_obj_get_is_local((mpr_obj)map) + || (status & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) == MPR_STATUS_ACTIVE) continue; #ifdef DEBUG diff --git a/src/map.c b/src/map.c index 8803ad1a..65f1f0e3 100644 --- a/src/map.c +++ b/src/map.c @@ -343,14 +343,14 @@ mpr_map mpr_map_new(int num_src, mpr_sig *src, int num_dst, mpr_sig *dst) void mpr_map_release(mpr_map m) { mpr_net_use_bus(mpr_graph_get_net(m->obj.graph)); - mpr_map_send_state(m, -1, MSG_UNMAP); + mpr_map_send_state(m, -1, MSG_UNMAP, 0); } void mpr_map_refresh(mpr_map m) { RETURN_UNLESS(m); mpr_net_use_bus(mpr_graph_get_net(m->obj.graph)); - mpr_map_send_state(m, -1, m->obj.is_local ? MSG_MAP_TO : MSG_MAP); + mpr_map_send_state(m, -1, m->obj.is_local ? MSG_MAP_TO : MSG_MAP, 0); } static void release_local_inst(mpr_local_map map, mpr_dev scope) @@ -1083,13 +1083,13 @@ void mpr_map_alloc_values(mpr_local_map m, int quiet) if (MPR_DIR_OUT == mpr_slot_get_dir((mpr_slot)m->dst)) { /* Inform remote destination */ mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link((mpr_slot)m->dst))); - mpr_map_send_state((mpr_map)m, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)m, -1, MSG_MAPPED, 0); } else { /* Inform remote sources */ for (i = 0; i < m->num_src; i++) { mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link((mpr_slot)m->src[i]))); - i = mpr_map_send_state((mpr_map)m, i, MSG_MAPPED); + i = mpr_map_send_state((mpr_map)m, i, MSG_MAPPED, 0); } } } @@ -1620,6 +1620,10 @@ int mpr_local_map_set_from_msg(mpr_local_map m, mpr_msg msg) if (orig_loc != m->process_loc) ++updated; + if (m->obj.status & (MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED)) { + m->obj.status &= ~(MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED); + ++updated; + } return updated; } @@ -1818,7 +1822,7 @@ int mpr_map_set_from_msg(mpr_map m, mpr_msg msg) /* If the "slot_idx" argument is >= 0, we can assume this message will be sent * to a peer device rather than a session manager. */ -int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd) +int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd, int version) { lo_message msg; char buffer[256]; @@ -1871,6 +1875,8 @@ int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd) } if (MSG_UNMAP == cmd || MSG_UNMAPPED == cmd) { + lo_message_add_string(msg, mpr_prop_as_str(PROP(VERSION), 0)); + lo_message_add_int32(msg, version ? version : m->obj.version); mpr_net_add_msg(mpr_graph_get_net(m->obj.graph), 0, cmd, msg); return i-1; } diff --git a/src/map.h b/src/map.h index 440d20bc..bb2a7d18 100644 --- a/src/map.h +++ b/src/map.h @@ -39,7 +39,7 @@ int mpr_map_set_from_msg(mpr_map map, mpr_msg msg); int mpr_local_map_update_status(mpr_local_map map); -int mpr_map_send_state(mpr_map map, int slot, net_msg_t cmd); +int mpr_map_send_state(mpr_map map, int slot, net_msg_t cmd, int version); void mpr_map_init(mpr_map map, int num_src, mpr_sig *src, mpr_sig dst, int is_local); diff --git a/src/network.c b/src/network.c index a7d4f5dc..0f0adcd9 100644 --- a/src/network.c +++ b/src/network.c @@ -59,7 +59,6 @@ extern const char* prop_msg_strings[MPR_PROP_EXTRA+1]; #define FIND 0 #define UPDATE 1 #define ADD 2 -#define REMOVED 4 /*! A structure that keeps information about network communications. */ typedef struct _mpr_net { @@ -1231,7 +1230,7 @@ static int handler_dev(const char *path, const char *types, lo_arg **av, int ac, continue; mpr_sig_send_state(sig, MSG_SIG); } - mpr_map_send_state(map, -1, MSG_MAP_TO); + mpr_map_send_state(map, -1, MSG_MAP_TO, 0); } if (locality & MPR_LOC_DST) { int i; @@ -1240,7 +1239,7 @@ static int handler_dev(const char *path, const char *types, lo_arg **av, int ac, continue; mpr_net_use_mesh(net, mpr_link_get_admin_addr(link)); mpr_sig_send_state(mpr_map_get_dst_sig(map), MSG_SIG); - i = mpr_map_send_state(map, i, MSG_MAP_TO); + i = mpr_map_send_state(map, i, MSG_MAP_TO, 0); } } } @@ -1636,9 +1635,6 @@ static mpr_map find_map(mpr_net net, const char *types, int ac, lo_arg **av, mpr #endif if (map) { RETURN_ARG_UNLESS(!loc || (loc & mpr_map_get_locality(map)), MPR_MAP_ERROR); - RETURN_ARG_UNLESS( (flags & REMOVED) - || !(MPR_STATUS_REMOVED & mpr_obj_get_status((mpr_obj)map)), - MPR_MAP_ERROR); if (mpr_map_get_num_src(map) < num_src && (flags & UPDATE)) { /* add additional sources */ for (i = 0; i < num_src; i++) @@ -1738,7 +1734,7 @@ static void mpr_net_handle_map(mpr_net net, mpr_local_map map, mpr_msg props) trace_dev(dev, "informing subscribers (MAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP); - mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0); } return; } @@ -1762,7 +1758,7 @@ static void mpr_net_handle_map(mpr_net net, mpr_local_map map, mpr_msg props) /* TODO: do we need this call? */ mpr_sig_send_state(sig, MSG_SIG); - i = mpr_map_send_state((mpr_map)map, i, MSG_MAP_TO); + i = mpr_map_send_state((mpr_map)map, i, MSG_MAP_TO, 0); } } @@ -1847,7 +1843,7 @@ static int handler_map_to(const char *path, const char *types, lo_arg **av, if (MPR_DIR_OUT == mpr_slot_get_dir(slot)) { mpr_link link = mpr_slot_get_link(slot); mpr_net_use_mesh(net, mpr_link_get_admin_addr(link)); - mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0); for (i = 0; i < num_src; i++) { mpr_sig sig = mpr_map_get_src_sig((mpr_map)map, i); if (!mpr_obj_get_is_local((mpr_obj)sig)) @@ -1860,7 +1856,7 @@ static int handler_map_to(const char *path, const char *types, lo_arg **av, mpr_slot slot = mpr_map_get_src_slot((mpr_map)map, i); mpr_link link = mpr_slot_get_link(slot); mpr_net_use_mesh(net, mpr_link_get_admin_addr(link)); - i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED); + i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED, 0); mpr_sig_send_state(mpr_map_get_dst_sig((mpr_map)map), MSG_SIG); } } @@ -1948,14 +1944,14 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av, if (MPR_DIR_OUT == mpr_slot_get_dir(slot)) { /* Inform remote destination */ mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot))); - mpr_map_send_state(map, -1, MSG_MAPPED); + mpr_map_send_state(map, -1, MSG_MAPPED, 0); } else { /* Inform remote sources */ for (i = 0; i < num_src; i++) { mpr_slot slot = mpr_map_get_src_slot(map, i); mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot))); - i = mpr_map_send_state(map, i, MSG_MAPPED); + i = mpr_map_send_state(map, i, MSG_MAPPED, 0); } } @@ -1996,7 +1992,7 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av, if (mpr_local_dev_has_subscribers(dev)) { trace_dev(dev, "informing subscribers (MAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_OUT); - mpr_map_send_state(map, -1, MSG_MAPPED); + mpr_map_send_state(map, -1, MSG_MAPPED, 0); } } } @@ -2006,7 +2002,7 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av, if (mpr_local_dev_has_subscribers(dev)) { trace_dev(dev, "informing subscribers (MAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_IN); - mpr_map_send_state(map, -1, MSG_MAPPED); + mpr_map_send_state(map, -1, MSG_MAPPED, 0); } } } @@ -2031,9 +2027,16 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av, trace_net(net); map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND); - RETURN_ARG_UNLESS(map && MPR_MAP_ERROR != (mpr_map)map, 0); + if (!map || MPR_MAP_ERROR == (mpr_map)map) { + trace("Map not found! Forwarding map message to map handler...\n"); + handler_map(path, types, av, ac, msg, user); + return 0; + } RETURN_ARG_UNLESS(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE, 0); + /* remove MPR_STATUS_REMOVED and MPR_STATUS_EXPIRED status flags if they are set */ + mpr_obj_set_status((mpr_obj)map, 0, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED); + props = mpr_msg_parse_props(ac, types, av); TRACE_RETURN_UNLESS(props, 0, " ignoring /map/modify, no properties.\n"); @@ -2052,7 +2055,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av, mpr_slot slot = mpr_map_get_dst_slot((mpr_map)map); if (!mpr_slot_get_sig_if_local(slot)) { mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot))); - mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0); } else { int i; @@ -2061,7 +2064,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av, if (mpr_slot_get_sig_if_local(slot)) continue; mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot))); - i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED); + i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED, 0); } } } @@ -2076,7 +2079,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av, if (mpr_local_dev_has_subscribers(dev)) { trace_dev(dev, "informing subscribers (MAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_OUT); - mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0); } } } @@ -2089,7 +2092,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av, if (mpr_local_dev_has_subscribers(dev)) { trace_dev(dev, "informing subscribers (MAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_IN); - mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0); } } } @@ -2111,12 +2114,21 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av, mpr_slot slot; lo_address addr; mpr_sig sig; - int i, num_src; + int i, num_src, version = 0; trace_net(net); - map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND | REMOVED); + map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND); RETURN_ARG_UNLESS(map && MPR_MAP_ERROR != (mpr_map)map, 0); + if (types[ac-2] == 's' && types[ac-1] == 'i' && strcmp(&av[ac-2]->s, "@version") == 0) { + version = av[ac-1]->i; + if (version < mpr_obj_get_version((mpr_obj)map)) { + trace("ignoring stale /unmap message (version %d < %d)\n", + version, mpr_obj_get_version((mpr_obj)map)); + return 0; + } + } + num_src = mpr_map_get_num_src((mpr_map)map); for (i = 0; i < num_src; i++) { @@ -2129,7 +2141,7 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av, if (dev != last_dev) { trace_dev(dev, "informing subscribers (UNMAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_OUT); - mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED, 0); } trace_dev(dev, "informing subscribers (SIGNAL)\n"); mpr_net_use_subscribers(net, dev, MPR_SIG); @@ -2142,7 +2154,7 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av, if (!inform_device_subscribers(net, dev)) { trace_dev(dev, "informing subscribers (UNMAPPED)\n") mpr_net_use_subscribers(net, dev, MPR_MAP_IN); - mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED); + mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED, 0); trace_dev(dev, "informing subscribers (SIGNAL)\n"); mpr_net_use_subscribers(net, dev, MPR_SIG); @@ -2160,22 +2172,22 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av, addr = mpr_slot_get_addr(slot); if (addr) { mpr_net_use_mesh(net, addr); - i = mpr_map_send_state((mpr_map)map, i, MSG_UNMAP); + i = mpr_map_send_state((mpr_map)map, i, MSG_UNMAP, version); } } } /* if destination is remote this guarantees all sources are local */ /* only send /unmap to destination the second time this message is received */ - else if (!(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) { + else if ((mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_REMOVED)) { mpr_net_use_mesh(net, addr); - mpr_map_send_state((mpr_map)map, -1, MSG_UNMAP); + mpr_map_send_state((mpr_map)map, -1, MSG_UNMAP, version); } /* Goal here is to ensure that map cleanup (including releasing related instances at the * destingation device) occurs _after_ any cached signal updates have propagated across the map * so that stray updates don't re-activate destination instances after tha map is removed.*/ if ( MPR_LOC_BOTH == mpr_map_get_locality((mpr_map)map) - || !(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) { + || (mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_REMOVED)) { /* can remove immediately */ trace("removing map\n"); mpr_graph_remove_map(graph, (mpr_map)map, MPR_STATUS_REMOVED); @@ -2183,7 +2195,7 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av, else { /* remove ACTIVE flag, add REMOVED and EXPIRED flags for eventual cleanup */ trace("deferring map removal\n"); - mpr_obj_set_status((mpr_obj)map, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED, MPR_STATUS_ACTIVE); + mpr_obj_set_status((mpr_obj)map, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED, 0); } return 0; } diff --git a/src/object.c b/src/object.c index 5a3bcc63..d5debe0c 100644 --- a/src/object.c +++ b/src/object.c @@ -35,7 +35,7 @@ mpr_type mpr_obj_get_type(mpr_obj o) return o ? o->type : 0; } -void mpr_obj_increment_version(mpr_obj o) +void mpr_obj_incr_version(mpr_obj o) { RETURN_UNLESS(o); if (o->is_local) { @@ -242,8 +242,8 @@ mpr_prop mpr_obj_set_prop(mpr_obj o, mpr_prop p, const char *s, int len, if (!publish) flags |= LOCAL_ACCESS; updated = mpr_tbl_add_record(tbl, p, s, len, type, val, flags); - if (updated) - mpr_obj_increment_version(o); + if (updated && o->is_local) + mpr_obj_incr_version(o); return updated ? p : MPR_PROP_UNKNOWN; } @@ -256,14 +256,14 @@ int mpr_obj_remove_prop(mpr_obj o, mpr_prop p, const char *s) local = o->props.staged ? 0 : 1; if (MPR_PROP_UNKNOWN == p) p = mpr_prop_from_str(s); - if (MPR_PROP_DATA == p || local) + if ((MPR_PROP_DATA == p) || local) updated = mpr_tbl_remove_record(o->props.synced, p, s, MOD_LOCAL); else if (MPR_PROP_EXTRA == p) updated = mpr_tbl_add_record(o->props.staged, p | PROP_REMOVE, s, 0, 0, 0, MOD_REMOTE); else trace("Cannot remove static property [%d] '%s'\n", p, s ? s : mpr_prop_as_str(p, 1)); - if (updated) - mpr_obj_increment_version(o); + if (updated && o->is_local) + mpr_obj_incr_version(o); return updated ? 1 : 0; } @@ -272,6 +272,7 @@ void mpr_obj_push(mpr_obj o) mpr_net n; RETURN_UNLESS(o); n = mpr_graph_get_net(o->graph); + ++o->version; if (MPR_DEV == o->type) { mpr_dev d = (mpr_dev)o; @@ -303,17 +304,23 @@ void mpr_obj_push(mpr_obj o) int status = o->status; mpr_map m = (mpr_map)o; mpr_net_use_bus(n); - if (status & MPR_STATUS_ACTIVE) - mpr_map_send_state(m, -1, MSG_MAP_MOD); + if ((status & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) == MPR_STATUS_ACTIVE) { + mpr_map_send_state(m, -1, MSG_MAP_MOD, 0); + } + // TODO: combine these conditionals else if (o->is_local) { status = mpr_local_map_update_status((mpr_local_map)m); - if (status & MPR_SLOT_DEV_KNOWN) - mpr_map_send_state(m, -1, MSG_MAP); - else - printf("didn't send /map message\n"); + if (status & MPR_SLOT_DEV_KNOWN) { + mpr_map_send_state(m, -1, MSG_MAP, 0); + } + else { + trace("didn't send /map message\n"); + --o->version; + } + } + else { + mpr_map_send_state(m, -1, MSG_MAP, 0); } - else - mpr_map_send_state(m, -1, MSG_MAP); } else { trace("mpr_obj_push(): unknown object type %d\n", o->type); diff --git a/src/object.h b/src/object.h index 68386d19..8c8150fd 100644 --- a/src/object.h +++ b/src/object.h @@ -33,7 +33,7 @@ void mpr_obj_init(mpr_obj obj, mpr_graph graph, mpr_type type); void mpr_obj_free(mpr_obj obj); -void mpr_obj_increment_version(mpr_obj obj); +void mpr_obj_incr_version(mpr_obj obj); MPR_INLINE static mpr_id mpr_obj_get_id(mpr_obj obj) { return obj->id; } diff --git a/src/signal.c b/src/signal.c index de1932c2..2e8ca5c7 100644 --- a/src/signal.c +++ b/src/signal.c @@ -193,7 +193,7 @@ static void process_maps(mpr_local_sig sig, int id_map_idx) for (i = 0; i < sig->num_maps_in; i++) { mpr_local_slot dst_slot = sig->slots_in[i]; map = (mpr_local_map)mpr_slot_get_map((mpr_slot)dst_slot); - if (!(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) + if ((mpr_obj_get_status((mpr_obj)map) & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) != MPR_STATUS_ACTIVE) continue; mpr_id_map tmp = mpr_local_map_get_id_map(map); @@ -224,7 +224,7 @@ static void process_maps(mpr_local_sig sig, int id_map_idx) for (i = 0; i < sig->num_maps_out; i++) { mpr_local_slot src_slot = sig->slots_out[i], dst_slot; map = (mpr_local_map)mpr_slot_get_map((mpr_slot)src_slot); - if (!(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) + if ((mpr_obj_get_status((mpr_obj)map) & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) != MPR_STATUS_ACTIVE) continue; /* reset associated output memory */ @@ -264,7 +264,7 @@ static void process_maps(mpr_local_sig sig, int id_map_idx) src_slot = sig->slots_out[i]; map = (mpr_local_map)mpr_slot_get_map((mpr_slot)src_slot); - if (!(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) + if ((mpr_obj_get_status((mpr_obj)map) & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) != MPR_STATUS_ACTIVE) continue; /* TODO: should we continue for out-of-scope local destination updates? */ @@ -410,7 +410,7 @@ int mpr_sig_osc_handler(const char *path, const char *types, lo_arg **argv, int } TRACE_RETURN_UNLESS(slot, 0, "error in mpr_sig_osc_handler: slot %d not found.\n", slot_id); slot_sig = mpr_slot_get_sig((mpr_slot)slot); - TRACE_RETURN_UNLESS(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE, 0, + TRACE_RETURN_UNLESS((mpr_obj_get_status((mpr_obj)map) & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) == MPR_STATUS_ACTIVE, 0, "error in mpr_sig_osc_handler: map not yet ready.\n"); if ((expr = mpr_local_map_get_expr(map)) && MPR_LOC_BOTH != mpr_map_get_locality((mpr_map)map)) { vals = check_types(types, val_len, slot_sig->type, slot_sig->len); @@ -1317,7 +1317,7 @@ int mpr_sig_reserve_inst(mpr_sig sig, int num, mpr_id *ids, void **data) else mpr_value_realloc(lsig->value, lsig->len, lsig->type, 1, lsig->num_inst, 0); - mpr_obj_increment_version((mpr_obj)lsig); + mpr_obj_incr_version((mpr_obj)lsig); if (old_num > 0 && (lsig->num_inst / 8) == (old_num / 8)) return count; @@ -1558,7 +1558,7 @@ void mpr_sig_remove_inst(mpr_sig sig, mpr_id id) if (lsig->inst[i]->idx > remove_idx) --lsig->inst[i]->idx; } - mpr_obj_increment_version((mpr_obj)sig); + mpr_obj_incr_version((mpr_obj)sig); } const void *mpr_sig_get_value(mpr_sig sig, mpr_id id, mpr_time *time) @@ -1837,7 +1837,7 @@ int mpr_sig_set_from_msg(mpr_sig sig, mpr_msg msg) } } if (updated) - mpr_obj_increment_version((mpr_obj)sig); + mpr_obj_incr_version((mpr_obj)sig); return updated; } diff --git a/test/Makefile.am b/test/Makefile.am index 6bf4e3bd..ad3e307b 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -34,6 +34,7 @@ if WINDOWS_DLL testparser \ testprops \ testrate \ + testremap \ testreverse \ testselfmap \ testsetremote \ @@ -67,6 +68,7 @@ if WINDOWS_DLL testmapinput \ testconvergent \ testunmap \ + testremap \ testmapfail \ testmaplocation \ testmapprotocol \ @@ -109,6 +111,7 @@ else testparser \ testprops \ testrate \ + testremap \ testreverse \ testselfmap \ testsetremote \ @@ -143,6 +146,7 @@ else testmapinput \ testconvergent \ testunmap \ + testremap \ testmapfail \ testmaplocation \ testmapprotocol \ @@ -271,6 +275,10 @@ testrate_CFLAGS = $(TEST_CFLAGS) testrate_SOURCES = testrate.c testrate_LDADD = $(TEST_LDADD) +testremap_CFLAGS = $(TEST_CFLAGS) +testremap_SOURCES = testremap.c +testremap_LDADD = $(TEST_LDADD) + testreverse_CFLAGS = $(TEST_CFLAGS) testreverse_SOURCES = testreverse.c testreverse_LDADD = $(TEST_LDADD) diff --git a/test/testremap.c b/test/testremap.c new file mode 100644 index 00000000..3144c8eb --- /dev/null +++ b/test/testremap.c @@ -0,0 +1,346 @@ +#include "../src/graph.h" +#include "../src/link.h" +#include +#include +#include +#include +#ifdef WIN32 +#include +#else +#include +#endif +#include +#include + +#define NUM_SRC 1 + +int verbose = 1; +int terminate = 0; +int shared_graph = 0; +int num_inst = 10; +int done = 0; + +mpr_dev srcs[NUM_SRC]; +mpr_dev dst = 0; + +mpr_sig sendsigs[NUM_SRC]; +mpr_sig recvsig = 0; + +mpr_id map_id = 0; + +int sent = 0; +int received = 0; + +static void eprintf(const char *format, ...) +{ + va_list args; + if (!verbose) + return; + va_start(args, format); + vprintf(format, args); + va_end(args); +} + +int setup_srcs(mpr_graph g, const char *iface) +{ + int i, mn = 0, mx = 1; + mpr_list l; + + for (i = 0; i < NUM_SRC; i++) { + srcs[i] = mpr_dev_new("testremap-send", g); + if (!srcs[i]) + goto error; + + if (iface) + mpr_graph_set_interface(mpr_obj_get_graph((mpr_obj)srcs[i]), iface); + eprintf("source created using interface %s.\n", + mpr_graph_get_interface(mpr_obj_get_graph((mpr_obj)srcs[i]))); + + sendsigs[i] = mpr_sig_new(srcs[i], MPR_DIR_OUT, "outsig", 1, MPR_INT32, NULL, + &mn, &mx, &num_inst, NULL, 0); + if (!sendsigs[i]) + goto error; + + eprintf("Output signal 'outsig' registered with %d instances.\n", num_inst); + l = mpr_dev_get_sigs(srcs[i], MPR_DIR_OUT); + eprintf("Number of outputs: %d\n", mpr_list_get_size(l)); + mpr_list_free(l); + } + + return 0; + + error: + return 1; +} + +void cleanup_src(void) +{ + int i; + for (i = 0; i < NUM_SRC; i++) { + if (srcs[i]) { + eprintf("Freeing source.. "); + fflush(stdout); + mpr_dev_free(srcs[i]); + eprintf("ok\n"); + } + } +} + +void handler(mpr_sig sig, mpr_sig_evt evt, mpr_id id, int len, mpr_type type, + const void *val, mpr_time t) +{ + if (evt == MPR_STATUS_REL_UPSTRM) { + eprintf("%s.%llu got release\n", + mpr_obj_get_prop_as_str((mpr_obj)sig, MPR_PROP_NAME, NULL), id); + mpr_sig_release_inst(sig, id); + } + else if (val) { + eprintf("%s.%llu got %f\n", + mpr_obj_get_prop_as_str((mpr_obj)sig, MPR_PROP_NAME, NULL), id, *(float*)val); + received++; + } +} + +int setup_dst(mpr_graph g, const char *iface) +{ + float mn = 0, mx = 1; + mpr_list l; + + dst = mpr_dev_new("testremap-recv", g); + if (!dst) + goto error; + if (iface) + mpr_graph_set_interface(mpr_obj_get_graph((mpr_obj)dst), iface); + eprintf("destination created using interface %s.\n", + mpr_graph_get_interface(mpr_obj_get_graph((mpr_obj)dst))); + + recvsig = mpr_sig_new(dst, MPR_DIR_IN, "insig", 1, MPR_FLT, NULL, + &mn, &mx, &num_inst, handler, MPR_SIG_ALL); + if (!recvsig) + goto error; + + eprintf("Input signal 'insig' registered with %d instances.\n", num_inst); + l = mpr_dev_get_sigs(dst, MPR_DIR_IN); + eprintf("Number of inputs: %d\n", mpr_list_get_size(l)); + mpr_list_free(l); + + return 0; + + error: + return 1; +} + +void cleanup_dst(void) +{ + if (dst) { + eprintf("Freeing destination.. "); + fflush(stdout); + mpr_dev_free(dst); + eprintf("ok\n"); + } +} + +int setup_maps(void) +{ + mpr_map map = mpr_map_new(NUM_SRC, sendsigs, 1, &recvsig); + mpr_obj_push((mpr_obj)map); + map_id = mpr_obj_get_id((mpr_obj)map); + + /* Wait until mapping has been established */ + while (!done && !mpr_map_get_is_ready(map)) { + int i; + for (i = 0; i < NUM_SRC; i++) + mpr_dev_poll(srcs[i], 10); + mpr_dev_poll(dst, 10); + } + + return 0; +} + +int wait_ready(void) +{ + eprintf("waiting for %d devices\n", NUM_SRC + 1); + int ready = 0; + while (!done && !ready) { + int i; + ready = 1; + for (i = 0; i < NUM_SRC; i++) { + mpr_dev_poll(srcs[i], 25); + ready &= mpr_dev_get_is_ready(srcs[i]); + } + mpr_dev_poll(dst, 25); + ready &= mpr_dev_get_is_ready(dst); + } + return done; +} + +int loop() +{ + int i = 0, j, result = 0; + eprintf("Polling device..\n"); + while ((!terminate || i < 99) && !done) { + int inst = random() % num_inst; + for (j = 0; j < NUM_SRC; j++) { + eprintf("Updating srcs[%d].%d to %d\n", j, inst, sent); + mpr_sig_set_value(sendsigs[j], inst, 1, MPR_INT32, &sent); + if (!shared_graph) + mpr_dev_poll(srcs[j], 0); + } + mpr_dev_poll(dst, 100); + sent++; + + if (!verbose) { + printf("\r Sent: %4i, Received: %4i ", sent, received); + fflush(stdout); + } + ++i; + + if (i % 50 == 0) { + mpr_map map = (mpr_map)mpr_graph_get_obj(mpr_obj_get_graph((mpr_obj)dst), map_id, MPR_MAP); + if (map) { + eprintf("Removing map.\n"); + mpr_map_release(map); + map = 0; + } + else { + eprintf("Error retrieving map.\n"); + result = 1; + break; + } + + eprintf("Recreating map with same signals.\n"); + map = mpr_map_new(NUM_SRC, sendsigs, 1, &recvsig); + mpr_obj_push((mpr_obj)map); + map_id = mpr_obj_get_id((mpr_obj)map); + } + } + return result; +} + +void ctrlc(int signal) +{ + done = 1; +} + +int main(int argc, char **argv) +{ + int i, j, result = 0; + char *iface = 0; + mpr_graph g; + mpr_map map; + + /* process flags for -v verbose, -h help */ + for (i = 1; i < argc; i++) { + if (argv[i] && argv[i][0] == '-') { + int len = strlen(argv[i]); + for (j = 1; j < len; j++) { + switch (argv[i][j]) { + case 'h': + printf("testremap.c: possible arguments " + "-q quiet (suppress output), " + "-t terminate automatically, " + "-s shared (use one mpr_graph only), " + "-h help, " + "--iface network interface\n"); + return 1; + break; + case 'q': + verbose = 0; + break; + case 't': + terminate = 1; + break; + case 's': + shared_graph = 1; + break; + case '-': + if (strcmp(argv[i], "--iface")==0 && argc>i+1) { + i++; + iface = argv[i]; + j = len; + } + break; + default: + break; + } + } + } + } + + signal(SIGINT, ctrlc); + + g = shared_graph ? mpr_graph_new(0) : 0; + + if (setup_dst(g, iface)) { + eprintf("Error initializing destination.\n"); + result = 1; + goto done; + } + + if (setup_srcs(g, iface)) { + eprintf("Done initializing source.\n"); + result = 1; + goto done; + } + + if (wait_ready()) { + eprintf("Device registration aborted.\n"); + result = 1; + goto done; + } + + if (setup_maps()) { + eprintf("Error initializing map.\n"); + result = 1; + goto done; + } + + /* update some instances */ + if ((result = loop())) + goto done; + + /* remove the map */ + map = (mpr_map)mpr_graph_get_obj(mpr_obj_get_graph((mpr_obj)dst), map_id, MPR_MAP); + if (map) { + eprintf("Removing map.\n"); + mpr_map_release(map); + } + else { + eprintf("Error retrieving map.\n"); + result = 1; + goto done; + } + + for (i = 0; i < 10; i++) { + if (!shared_graph) { + for (j = 0; j < NUM_SRC; j++) { + mpr_dev_poll(srcs[j], 0); + } + } + mpr_dev_poll(dst, 100); + } + + /* check whether destination instances were released */ + num_inst = mpr_sig_get_num_inst(recvsig, MPR_STATUS_ACTIVE); + if (num_inst) { + eprintf("Destination has %d active instance%s (should be 0).\n", num_inst, + num_inst > 1 ? "s" : ""); + result = 1; + goto done; + } + + if (sent != received) { + eprintf("Mismatch between sent and received messages.\n"); + eprintf("Updated value %d time%s, but received %d update%s.\n", + sent, sent == 1 ? "" : "s", received, received == 1 ? "" : "s"); + result = 1; + } + + done: + cleanup_dst(); + cleanup_src(); + if (g) mpr_graph_free(g); + printf("...................Test %s\x1B[0m.\n", + result ? "\x1B[31mFAILED" : "\x1B[32mPASSED"); + return result; +}