Skip to content

Commit f74278a

Browse files
committed
Fix rsync and thread pool bugs.
+Mistakenly (of course, it was a bug) the returned value from rsync execution was being confused with the returned value from execvp call. The main problem was when rsync returned a code 12 (Error in rsync protocol data stream); in that case, the caller confused that error with ENOMEM (also with value 12), which led to terminate execution. +The thread pool wait function wasn't considering pending taks at the queue; also the poll function was holding and releasing the mutex more than it was needed, and the thread attributes are now globally initialized (thanks @ydahhrk for the code review). +Increment the number of threads at the internal pool to 10.
1 parent c648259 commit f74278a

File tree

3 files changed

+62
-38
lines changed

3 files changed

+62
-38
lines changed

src/internal_pool.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* related to the validation or server thread pool tasks) can be pushed here.
1111
*/
1212

13-
#define INTERNAL_POOL_MAX 5
13+
#define INTERNAL_POOL_MAX 10
1414

1515
struct thread_pool *pool;
1616

src/rsync/rsync.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ handle_child_thread(char **args, int fds[2][2])
258258
strerror(error));
259259

260260
/* https://stackoverflow.com/a/14493459/1735458 */
261-
exit(error);
261+
exit(-error);
262262
}
263263

264264
static int
@@ -445,8 +445,9 @@ do_rsync(struct rpki_uri *uri, bool is_ta, bool log_operation)
445445
if (WIFEXITED(child_status)) {
446446
/* Happy path (but also sad path sometimes). */
447447
error = WEXITSTATUS(child_status);
448-
pr_val_debug("Child terminated with error code %d.", error);
449-
if (error == ENOMEM)
448+
pr_val_debug("Child terminated with error code %d.",
449+
error);
450+
if (error == -ENOMEM)
450451
pr_enomem();
451452

452453
if (!error)

src/thread/thread_pool.c

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ task_queue_pull(struct task_queue *queue)
8888

8989
tmp = TAILQ_LAST(queue, task_queue);
9090
TAILQ_REMOVE(queue, tmp, next);
91+
pr_op_debug("Pulling a task from the pool");
9192

9293
return tmp;
9394
}
@@ -97,6 +98,7 @@ static void
9798
task_queue_push(struct task_queue *queue, struct task *task)
9899
{
99100
TAILQ_INSERT_HEAD(queue, task, next);
101+
pr_op_debug("Pushing a task to the pool");
100102
}
101103

102104
/*
@@ -115,13 +117,12 @@ tasks_poll(void *arg)
115117
/* The thread has started, send the signal */
116118
thread_pool_lock(pool);
117119
pthread_cond_signal(&(pool->waiting_cond));
118-
thread_pool_unlock(pool);
119120

120121
while (true) {
121-
thread_pool_lock(pool);
122-
123-
while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop)
122+
while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
123+
pr_op_debug("Thread waiting for work...");
124124
pthread_cond_wait(&(pool->working_cond), &(pool->lock));
125+
}
125126

126127
if (pool->stop)
127128
break;
@@ -144,8 +145,6 @@ tasks_poll(void *arg)
144145
if (!pool->stop && pool->working_count == 0 &&
145146
TAILQ_EMPTY(&(pool->queue)))
146147
pthread_cond_signal(&(pool->waiting_cond));
147-
148-
thread_pool_unlock(pool);
149148
}
150149

151150
/* The thread will cease to exist */
@@ -173,52 +172,61 @@ thread_pool_thread_wait_start(struct thread_pool *pool)
173172
clock_gettime(CLOCK_REALTIME, &tmout);
174173
tmout.tv_sec += 2;
175174

175+
thread_pool_lock(pool);
176176
error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
177177
&tmout);
178-
if (error)
178+
if (error) {
179+
thread_pool_unlock(pool);
179180
return pr_op_errno(error, "Waiting thread to start");
181+
}
182+
thread_pool_unlock(pool);
180183

181184
return 0;
182185
}
183186

184187
static int
185-
tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point)
188+
thread_pool_attr_create(pthread_attr_t *attr)
186189
{
187-
pthread_attr_t attr;
188-
pthread_t thread_id;
189190
int error;
190191

191-
memset(&thread_id, 0, sizeof(pthread_t));
192-
193-
error = pthread_attr_init(&attr);
192+
error = pthread_attr_init(attr);
194193
if (error)
195194
return pr_op_errno(error, "Calling pthread_attr_init()");
196195

197196
/* Use 2MB (default in most 64 bits systems) */
198-
error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
199-
if (error)
197+
error = pthread_attr_setstacksize(attr, 1024 * 1024 * 2);
198+
if (error) {
199+
pthread_attr_destroy(attr);
200200
return pr_op_errno(error,
201201
"Calling pthread_attr_setstacksize()");
202+
}
202203

203-
error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
204-
if (error)
204+
error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
205+
if (error) {
206+
pthread_attr_destroy(attr);
205207
return pr_op_errno(error,
206208
"Calling pthread_attr_setdetachstate()");
209+
}
207210

208-
thread_pool_lock(pool);
209-
error = pthread_create(&thread_id, &attr, entry_point, pool);
210-
pthread_attr_destroy(&attr);
211-
if (error) {
212-
thread_pool_unlock(pool);
211+
return 0;
212+
}
213+
214+
static int
215+
tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr,
216+
thread_pool_task_cb entry_point)
217+
{
218+
pthread_t thread_id;
219+
int error;
220+
221+
memset(&thread_id, 0, sizeof(pthread_t));
222+
223+
error = pthread_create(&thread_id, attr, entry_point, pool);
224+
if (error)
213225
return pr_op_errno(error, "Spawning pool thread");
214-
}
215226

216227
error = thread_pool_thread_wait_start(pool);
217-
if (error) {
218-
thread_pool_unlock(pool);
228+
if (error)
219229
return error;
220-
}
221-
thread_pool_unlock(pool);
222230

223231
return 0;
224232
}
@@ -227,6 +235,7 @@ int
227235
thread_pool_create(unsigned int threads, struct thread_pool **pool)
228236
{
229237
struct thread_pool *tmp;
238+
pthread_attr_t attr;
230239
unsigned int i;
231240
int error;
232241

@@ -260,19 +269,28 @@ thread_pool_create(unsigned int threads, struct thread_pool **pool)
260269
TAILQ_INIT(&(tmp->queue));
261270
tmp->stop = false;
262271
tmp->working_count = 0;
263-
tmp->thread_count = threads;
272+
tmp->thread_count = 0;
273+
274+
error = thread_pool_attr_create(&attr);
275+
if (error)
276+
goto free_waiting_cond;
264277

265278
for (i = 0; i < threads; i++) {
266-
error = tpool_thread_spawn(tmp, tasks_poll);
279+
error = tpool_thread_spawn(tmp, &attr, tasks_poll);
267280
if (error) {
281+
pthread_attr_destroy(&attr);
268282
thread_pool_destroy(tmp);
269283
return error;
270284
}
285+
tmp->thread_count++;
271286
pr_op_debug("Pool thread #%u spawned", i);
272287
}
288+
pthread_attr_destroy(&attr);
273289

274290
*pool = tmp;
275291
return 0;
292+
free_waiting_cond:
293+
pthread_cond_destroy(&(tmp->waiting_cond));
276294
free_working_cond:
277295
pthread_cond_destroy(&(tmp->working_cond));
278296
free_mutex:
@@ -326,15 +344,14 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
326344

327345
thread_pool_lock(pool);
328346
task_queue_push(&(pool->queue), task);
329-
thread_pool_unlock(pool);
330-
331347
/* There's work to do! */
332-
pthread_cond_broadcast(&(pool->working_cond));
348+
pthread_cond_signal(&(pool->working_cond));
349+
thread_pool_unlock(pool);
333350

334351
return 0;
335352
}
336353

337-
/* Are there available threads to work? */
354+
/* There are available threads to work? */
338355
bool
339356
thread_pool_avail_threads(struct thread_pool *pool)
340357
{
@@ -354,7 +371,13 @@ thread_pool_wait(struct thread_pool *pool)
354371
thread_pool_lock(pool);
355372
while (true) {
356373
pr_op_debug("Waiting all tasks from the pool to end");
357-
if ((!pool->stop && pool->working_count != 0) ||
374+
pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
375+
pr_op_debug("- Working count: %u", pool->working_count);
376+
pr_op_debug("- Thread count: %u", pool->thread_count);
377+
pr_op_debug("- Empty queue: %s",
378+
TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
379+
if ((!pool->stop &&
380+
(pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) ||
358381
(pool->stop && pool->thread_count != 0))
359382
pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
360383
else

0 commit comments

Comments
 (0)