Skip to content

Commit

Permalink
Add map version to /unmap handshake messages and use them to solve ra…
Browse files Browse the repository at this point in the history
…ce condition with immediate map renewal. Added testremap.c to test suite.
  • Loading branch information
malloch committed Sep 11, 2024
1 parent bc51f44 commit 593bc65
Show file tree
Hide file tree
Showing 11 changed files with 450 additions and 63 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ test/testparams
test/testparser
test/testprops
test/testrate
test/testremap
test/testreverse
test/testselfmap
test/testsetiface
Expand Down
18 changes: 12 additions & 6 deletions src/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void mpr_local_dev_add_sig(mpr_local_dev dev, mpr_local_sig sig, mpr_dir dir)
if (dev->registered)
mpr_local_sig_add_to_net(sig, mpr_graph_get_net(dev->obj.graph));

mpr_obj_increment_version((mpr_obj)dev);
mpr_obj_incr_version((mpr_obj)dev);
dev->obj.status |= MPR_DEV_SIG_CHANGED;
}

Expand All @@ -353,7 +353,7 @@ void mpr_dev_remove_sig(mpr_dev dev, mpr_sig sig)
if (dir & MPR_DIR_OUT)
--dev->num_outputs;
if (dev->obj.is_local) {
mpr_obj_increment_version((mpr_obj)dev);
mpr_obj_incr_version((mpr_obj)dev);
dev->obj.status |= MPR_DEV_SIG_CHANGED;
}
}
Expand Down Expand Up @@ -454,8 +454,10 @@ void mpr_dev_process_incoming_maps(mpr_local_dev dev)
maps = mpr_list_get_next(maps);
if (mpr_obj_get_is_local((mpr_obj)map))
mpr_map_receive((mpr_local_map)map, dev->time);
else
else {
/* local maps are always located at the start of the list */
break;
}
}
}

Expand All @@ -477,8 +479,10 @@ static int process_outgoing_maps(mpr_local_dev dev)
list = mpr_list_get_next(list);
if (mpr_obj_get_is_local((mpr_obj)map))
mpr_map_send((mpr_local_map)map, dev->time);
else
else {
/* local maps are always located at the start of the list */
break;
}
}
dev->sending = 0;
list = mpr_graph_get_list(graph, MPR_LINK);
Expand Down Expand Up @@ -519,7 +523,7 @@ static int mpr_dev_send_maps(mpr_local_dev dev, mpr_dir dir, int msg)
mpr_list maps = mpr_dev_get_maps((mpr_dev)dev, dir);
int sent = 0;
while (maps) {
mpr_map_send_state((mpr_map)*maps, -1, msg);
mpr_map_send_state((mpr_map)*maps, -1, msg, 0);
maps = mpr_list_get_next(maps);
++sent;
}
Expand Down Expand Up @@ -985,8 +989,10 @@ int mpr_dev_set_from_msg(mpr_dev dev, mpr_msg m)
break;
}
}
if (updated)
if (updated) {
dev->obj.status |= MPR_STATUS_MODIFIED;
mpr_obj_incr_version((mpr_obj)dev);
}
return updated;
}

Expand Down
3 changes: 2 additions & 1 deletion src/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ void mpr_graph_cleanup(mpr_graph g)
mpr_map map = (mpr_map)*maps;
int status = mpr_obj_get_status((mpr_obj)map);
maps = mpr_list_get_next(maps);
if (status & MPR_STATUS_ACTIVE || !mpr_obj_get_is_local((mpr_obj)map))
if ( !mpr_obj_get_is_local((mpr_obj)map)
|| (status & (MPR_STATUS_ACTIVE | MPR_STATUS_REMOVED)) == MPR_STATUS_ACTIVE)
continue;

#ifdef DEBUG
Expand Down
16 changes: 11 additions & 5 deletions src/map.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ mpr_map mpr_map_new(int num_src, mpr_sig *src, int num_dst, mpr_sig *dst)
void mpr_map_release(mpr_map m)
{
mpr_net_use_bus(mpr_graph_get_net(m->obj.graph));
mpr_map_send_state(m, -1, MSG_UNMAP);
mpr_map_send_state(m, -1, MSG_UNMAP, 0);
}

void mpr_map_refresh(mpr_map m)
{
RETURN_UNLESS(m);
mpr_net_use_bus(mpr_graph_get_net(m->obj.graph));
mpr_map_send_state(m, -1, m->obj.is_local ? MSG_MAP_TO : MSG_MAP);
mpr_map_send_state(m, -1, m->obj.is_local ? MSG_MAP_TO : MSG_MAP, 0);
}

static void release_local_inst(mpr_local_map map, mpr_dev scope)
Expand Down Expand Up @@ -1083,13 +1083,13 @@ void mpr_map_alloc_values(mpr_local_map m, int quiet)
if (MPR_DIR_OUT == mpr_slot_get_dir((mpr_slot)m->dst)) {
/* Inform remote destination */
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link((mpr_slot)m->dst)));
mpr_map_send_state((mpr_map)m, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)m, -1, MSG_MAPPED, 0);
}
else {
/* Inform remote sources */
for (i = 0; i < m->num_src; i++) {
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link((mpr_slot)m->src[i])));
i = mpr_map_send_state((mpr_map)m, i, MSG_MAPPED);
i = mpr_map_send_state((mpr_map)m, i, MSG_MAPPED, 0);
}
}
}
Expand Down Expand Up @@ -1620,6 +1620,10 @@ int mpr_local_map_set_from_msg(mpr_local_map m, mpr_msg msg)
if (orig_loc != m->process_loc)
++updated;

if (m->obj.status & (MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED)) {
m->obj.status &= ~(MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED);
++updated;
}
return updated;
}

Expand Down Expand Up @@ -1818,7 +1822,7 @@ int mpr_map_set_from_msg(mpr_map m, mpr_msg msg)

/* If the "slot_idx" argument is >= 0, we can assume this message will be sent
* to a peer device rather than a session manager. */
int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd)
int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd, int version)
{
lo_message msg;
char buffer[256];
Expand Down Expand Up @@ -1871,6 +1875,8 @@ int mpr_map_send_state(mpr_map m, int slot_idx, net_msg_t cmd)
}

if (MSG_UNMAP == cmd || MSG_UNMAPPED == cmd) {
lo_message_add_string(msg, mpr_prop_as_str(PROP(VERSION), 0));
lo_message_add_int32(msg, version ? version : m->obj.version);
mpr_net_add_msg(mpr_graph_get_net(m->obj.graph), 0, cmd, msg);
return i-1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/map.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int mpr_map_set_from_msg(mpr_map map, mpr_msg msg);

int mpr_local_map_update_status(mpr_local_map map);

int mpr_map_send_state(mpr_map map, int slot, net_msg_t cmd);
int mpr_map_send_state(mpr_map map, int slot, net_msg_t cmd, int version);

void mpr_map_init(mpr_map map, int num_src, mpr_sig *src, mpr_sig dst, int is_local);

Expand Down
68 changes: 40 additions & 28 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ extern const char* prop_msg_strings[MPR_PROP_EXTRA+1];
#define FIND 0
#define UPDATE 1
#define ADD 2
#define REMOVED 4

/*! A structure that keeps information about network communications. */
typedef struct _mpr_net {
Expand Down Expand Up @@ -1231,7 +1230,7 @@ static int handler_dev(const char *path, const char *types, lo_arg **av, int ac,
continue;
mpr_sig_send_state(sig, MSG_SIG);
}
mpr_map_send_state(map, -1, MSG_MAP_TO);
mpr_map_send_state(map, -1, MSG_MAP_TO, 0);
}
if (locality & MPR_LOC_DST) {
int i;
Expand All @@ -1240,7 +1239,7 @@ static int handler_dev(const char *path, const char *types, lo_arg **av, int ac,
continue;
mpr_net_use_mesh(net, mpr_link_get_admin_addr(link));
mpr_sig_send_state(mpr_map_get_dst_sig(map), MSG_SIG);
i = mpr_map_send_state(map, i, MSG_MAP_TO);
i = mpr_map_send_state(map, i, MSG_MAP_TO, 0);
}
}
}
Expand Down Expand Up @@ -1636,9 +1635,6 @@ static mpr_map find_map(mpr_net net, const char *types, int ac, lo_arg **av, mpr
#endif
if (map) {
RETURN_ARG_UNLESS(!loc || (loc & mpr_map_get_locality(map)), MPR_MAP_ERROR);
RETURN_ARG_UNLESS( (flags & REMOVED)
|| !(MPR_STATUS_REMOVED & mpr_obj_get_status((mpr_obj)map)),
MPR_MAP_ERROR);
if (mpr_map_get_num_src(map) < num_src && (flags & UPDATE)) {
/* add additional sources */
for (i = 0; i < num_src; i++)
Expand Down Expand Up @@ -1738,7 +1734,7 @@ static void mpr_net_handle_map(mpr_net net, mpr_local_map map, mpr_msg props)

trace_dev(dev, "informing subscribers (MAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0);
}
return;
}
Expand All @@ -1762,7 +1758,7 @@ static void mpr_net_handle_map(mpr_net net, mpr_local_map map, mpr_msg props)
/* TODO: do we need this call? */
mpr_sig_send_state(sig, MSG_SIG);

i = mpr_map_send_state((mpr_map)map, i, MSG_MAP_TO);
i = mpr_map_send_state((mpr_map)map, i, MSG_MAP_TO, 0);
}
}

Expand Down Expand Up @@ -1847,7 +1843,7 @@ static int handler_map_to(const char *path, const char *types, lo_arg **av,
if (MPR_DIR_OUT == mpr_slot_get_dir(slot)) {
mpr_link link = mpr_slot_get_link(slot);
mpr_net_use_mesh(net, mpr_link_get_admin_addr(link));
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0);
for (i = 0; i < num_src; i++) {
mpr_sig sig = mpr_map_get_src_sig((mpr_map)map, i);
if (!mpr_obj_get_is_local((mpr_obj)sig))
Expand All @@ -1860,7 +1856,7 @@ static int handler_map_to(const char *path, const char *types, lo_arg **av,
mpr_slot slot = mpr_map_get_src_slot((mpr_map)map, i);
mpr_link link = mpr_slot_get_link(slot);
mpr_net_use_mesh(net, mpr_link_get_admin_addr(link));
i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED);
i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED, 0);
mpr_sig_send_state(mpr_map_get_dst_sig((mpr_map)map), MSG_SIG);
}
}
Expand Down Expand Up @@ -1948,14 +1944,14 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av,
if (MPR_DIR_OUT == mpr_slot_get_dir(slot)) {
/* Inform remote destination */
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot)));
mpr_map_send_state(map, -1, MSG_MAPPED);
mpr_map_send_state(map, -1, MSG_MAPPED, 0);
}
else {
/* Inform remote sources */
for (i = 0; i < num_src; i++) {
mpr_slot slot = mpr_map_get_src_slot(map, i);
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot)));
i = mpr_map_send_state(map, i, MSG_MAPPED);
i = mpr_map_send_state(map, i, MSG_MAPPED, 0);
}
}

Expand Down Expand Up @@ -1996,7 +1992,7 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av,
if (mpr_local_dev_has_subscribers(dev)) {
trace_dev(dev, "informing subscribers (MAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_OUT);
mpr_map_send_state(map, -1, MSG_MAPPED);
mpr_map_send_state(map, -1, MSG_MAPPED, 0);
}
}
}
Expand All @@ -2006,7 +2002,7 @@ static int handler_mapped(const char *path, const char *types, lo_arg **av,
if (mpr_local_dev_has_subscribers(dev)) {
trace_dev(dev, "informing subscribers (MAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_IN);
mpr_map_send_state(map, -1, MSG_MAPPED);
mpr_map_send_state(map, -1, MSG_MAPPED, 0);
}
}
}
Expand All @@ -2031,9 +2027,16 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av,
trace_net(net);

map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND);
RETURN_ARG_UNLESS(map && MPR_MAP_ERROR != (mpr_map)map, 0);
if (!map || MPR_MAP_ERROR == (mpr_map)map) {
trace("Map not found! Forwarding map message to map handler...\n");
handler_map(path, types, av, ac, msg, user);
return 0;
}
RETURN_ARG_UNLESS(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE, 0);

/* remove MPR_STATUS_REMOVED and MPR_STATUS_EXPIRED status flags if they are set */
mpr_obj_set_status((mpr_obj)map, 0, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED);

props = mpr_msg_parse_props(ac, types, av);
TRACE_RETURN_UNLESS(props, 0, " ignoring /map/modify, no properties.\n");

Expand All @@ -2052,7 +2055,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av,
mpr_slot slot = mpr_map_get_dst_slot((mpr_map)map);
if (!mpr_slot_get_sig_if_local(slot)) {
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot)));
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0);
}
else {
int i;
Expand All @@ -2061,7 +2064,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av,
if (mpr_slot_get_sig_if_local(slot))
continue;
mpr_net_use_mesh(net, mpr_link_get_admin_addr(mpr_slot_get_link(slot)));
i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED);
i = mpr_map_send_state((mpr_map)map, i, MSG_MAPPED, 0);
}
}
}
Expand All @@ -2076,7 +2079,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av,
if (mpr_local_dev_has_subscribers(dev)) {
trace_dev(dev, "informing subscribers (MAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_OUT);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0);
}
}
}
Expand All @@ -2089,7 +2092,7 @@ static int handler_map_mod(const char *path, const char *types, lo_arg **av,
if (mpr_local_dev_has_subscribers(dev)) {
trace_dev(dev, "informing subscribers (MAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_IN);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_MAPPED, 0);
}
}
}
Expand All @@ -2111,12 +2114,21 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av,
mpr_slot slot;
lo_address addr;
mpr_sig sig;
int i, num_src;
int i, num_src, version = 0;

trace_net(net);
map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND | REMOVED);
map = (mpr_local_map)find_map(net, types, ac, av, MPR_LOC_ANY, FIND);
RETURN_ARG_UNLESS(map && MPR_MAP_ERROR != (mpr_map)map, 0);

if (types[ac-2] == 's' && types[ac-1] == 'i' && strcmp(&av[ac-2]->s, "@version") == 0) {
version = av[ac-1]->i;
if (version < mpr_obj_get_version((mpr_obj)map)) {
trace("ignoring stale /unmap message (version %d < %d)\n",
version, mpr_obj_get_version((mpr_obj)map));
return 0;
}
}

num_src = mpr_map_get_num_src((mpr_map)map);

for (i = 0; i < num_src; i++) {
Expand All @@ -2129,7 +2141,7 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av,
if (dev != last_dev) {
trace_dev(dev, "informing subscribers (UNMAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_OUT);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED, 0);
}
trace_dev(dev, "informing subscribers (SIGNAL)\n");
mpr_net_use_subscribers(net, dev, MPR_SIG);
Expand All @@ -2142,7 +2154,7 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av,
if (!inform_device_subscribers(net, dev)) {
trace_dev(dev, "informing subscribers (UNMAPPED)\n")
mpr_net_use_subscribers(net, dev, MPR_MAP_IN);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAPPED, 0);

trace_dev(dev, "informing subscribers (SIGNAL)\n");
mpr_net_use_subscribers(net, dev, MPR_SIG);
Expand All @@ -2160,30 +2172,30 @@ static int handler_unmap(const char *path, const char *types, lo_arg **av,
addr = mpr_slot_get_addr(slot);
if (addr) {
mpr_net_use_mesh(net, addr);
i = mpr_map_send_state((mpr_map)map, i, MSG_UNMAP);
i = mpr_map_send_state((mpr_map)map, i, MSG_UNMAP, version);
}
}
}
/* if destination is remote this guarantees all sources are local */
/* only send /unmap to destination the second time this message is received */
else if (!(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) {
else if ((mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_REMOVED)) {
mpr_net_use_mesh(net, addr);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAP);
mpr_map_send_state((mpr_map)map, -1, MSG_UNMAP, version);
}

/* Goal here is to ensure that map cleanup (including releasing related instances at the
* destingation device) occurs _after_ any cached signal updates have propagated across the map
* so that stray updates don't re-activate destination instances after tha map is removed.*/
if ( MPR_LOC_BOTH == mpr_map_get_locality((mpr_map)map)
|| !(mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_ACTIVE)) {
|| (mpr_obj_get_status((mpr_obj)map) & MPR_STATUS_REMOVED)) {
/* can remove immediately */
trace("removing map\n");
mpr_graph_remove_map(graph, (mpr_map)map, MPR_STATUS_REMOVED);
}
else {
/* remove ACTIVE flag, add REMOVED and EXPIRED flags for eventual cleanup */
trace("deferring map removal\n");
mpr_obj_set_status((mpr_obj)map, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED, MPR_STATUS_ACTIVE);
mpr_obj_set_status((mpr_obj)map, MPR_STATUS_REMOVED | MPR_STATUS_EXPIRED, 0);
}
return 0;
}
Expand Down
Loading

0 comments on commit 593bc65

Please sign in to comment.