Skip to content

Commit

Permalink
Do not block validate if there are outstanding watches
Browse files Browse the repository at this point in the history
This can cause deadlock and even though the existing tests
pass, we lose one of the threads. It is a complexity for a
feature that for as far as we know is not used.
  • Loading branch information
carlgsmith authored and blairsteven committed Dec 14, 2023
1 parent 17aed4c commit 6270466
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 54 deletions.
21 changes: 0 additions & 21 deletions apteryx.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ static rpc_instance rpc = NULL; /* RPC Service */
static bool bound = false; /* Do we have a listen socket open */
static bool have_callbacks = false; /* Have we ever registered any callbacks */

static pthread_mutex_t pending_watches_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t no_pending_watches = PTHREAD_COND_INITIALIZER;
static int pending_watch_count = 0;

/* Callback */
typedef struct _cb_t
{
Expand Down Expand Up @@ -281,9 +277,6 @@ handle_watch (rpc_message msg)
return true;
}

pthread_mutex_lock (&pending_watches_lock);
++pending_watch_count;
pthread_mutex_unlock (&pending_watches_lock);
if (cb.flags == 0)
{
path = rpc_msg_decode_string (msg);
Expand Down Expand Up @@ -330,10 +323,6 @@ handle_watch (rpc_message msg)
((void*(*)(const GNode*)) cb.fn) (root);
}
}
pthread_mutex_lock (&pending_watches_lock);
if (--pending_watch_count == 0)
pthread_cond_signal(&no_pending_watches);
pthread_mutex_unlock (&pending_watches_lock);
rpc_msg_reset (msg);
return true;
}
Expand All @@ -356,16 +345,6 @@ handle_validate (rpc_message msg)

DEBUG ("VALIDATE CB \"%s\" = \"%s\" (0x%"PRIx64")\n", path, value, ref);

/* We want to wait for all pending watches to be processed */
pthread_mutex_lock (&pending_watches_lock);
if (pending_watch_count)
{
pthread_cond_wait (&no_pending_watches, &pending_watches_lock);
pthread_mutex_unlock (&pending_watches_lock);
}
else
pthread_mutex_unlock (&pending_watches_lock);

/* Process callback */
result = (uint32_t) (size_t) call_callback (ref, path, value);
DEBUG (" = %d\n", result);
Expand Down
62 changes: 29 additions & 33 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2308,58 +2308,54 @@ test_validate_wildcard_internal()
CU_ASSERT (assert_apteryx_empty ());
}

static int already_set = 0;
static int failed = 0;

static int
test_validate_thread_client (void *data)
{
const char *path = TEST_PATH"/entity/zones/private/state";

if(!apteryx_set_string (path, NULL, (char*)data))
failed = errno;
const char *value = (char*)data;
bool res = apteryx_set_string (path, NULL, value);
if (g_strcmp0 (value, "down") == 0)
{
CU_ASSERT (res == false && errno == -EPERM);
}
else
{
CU_ASSERT (res);
}
return 0;
}

int
test_validate_conflicting_callback(const char *path, const char *value)
{
return !already_set ? 0 : -EPERM;
}

static bool
test_validate_test_watch_callback (const char *path, const char *value)
{
/* Block long enough to serialise the 2nd validate, avoiding RPC timeout */
usleep (RPC_TIMEOUT_US - 10000);
already_set++;
return true;
if (g_strcmp0 (value, "down") == 0)
return -EPERM;
return 0;
}

void
test_validate_conflicting ()
{
pthread_t client1, client2;
pthread_t clients[32];
const char *path = TEST_PATH"/entity/zones/private/state";

failed = 0;
already_set = 0;

_path = _value = NULL;
char *value;

CU_ASSERT (apteryx_validate (path, test_validate_conflicting_callback));
CU_ASSERT (apteryx_watch (path, test_validate_test_watch_callback));
usleep (TEST_SLEEP_TIMEOUT);
pthread_create (&client1, NULL, (void *) &test_validate_thread_client, "up");
pthread_create (&client2, NULL, (void *) &test_validate_thread_client, "down");
pthread_join (client1, NULL);
pthread_join (client2, NULL);
CU_ASSERT (failed == -EPERM || failed == -ETIMEDOUT);
usleep (TEST_SLEEP_TIMEOUT);
for (int i=0; i<32; i++)
{
pthread_create (&clients[i], NULL, (void *) &test_validate_thread_client, (i % 2 == 0) ? "up" : "down");
}
for (int i=0; i<32; i++)
{
pthread_join (clients[i], NULL);
}
CU_ASSERT ((value = apteryx_get (path)) != NULL);
CU_ASSERT (value && strcmp (value, "up") == 0);
free ((void *) value);

CU_ASSERT (apteryx_unvalidate (path, test_validate_conflicting_callback));
CU_ASSERT (apteryx_unwatch (path, test_validate_test_watch_callback));
apteryx_set_string (path, NULL, NULL);
apteryx_set (path, NULL);
CU_ASSERT (assert_apteryx_empty ());
}

Expand Down Expand Up @@ -2391,8 +2387,7 @@ test_validate_tree ()
static bool
test_set_from_watch_cb (const char *path, const char *value)
{
CU_ASSERT (apteryx_set_string (TEST_PATH"/entity/zones/public", "name", "public") == false);
CU_ASSERT (errno == -ETIMEDOUT);
CU_ASSERT (apteryx_set_string (TEST_PATH"/entity/zones/public", "name", "public"));
return true;
}

Expand All @@ -2406,6 +2401,7 @@ test_validate_from_watch_callback ()
CU_ASSERT (apteryx_unvalidate (TEST_PATH"/entity/zones/public/*", test_validate_callback));
CU_ASSERT (apteryx_unwatch (TEST_PATH"/entity/zones/private/*", test_set_from_watch_cb));
CU_ASSERT (apteryx_set_string (TEST_PATH"/entity/zones/private", "link", NULL));
CU_ASSERT (apteryx_prune (TEST_PATH"/entity/zones"));
CU_ASSERT (assert_apteryx_empty ());
}

Expand Down

0 comments on commit 6270466

Please sign in to comment.