diff --git a/ChangeLog b/ChangeLog index 6f0913e..d37c9b8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2019-11-03 Ross Johnson + + * NOTICE: Remove third party code acknowledgments because files have been + removed from distro. + 2018-08-19 Ross Johnson * context.h (__PTW32_PROCPTR): Added missing '__' prefix for v3. diff --git a/NOTICE b/NOTICE index d73542d..3b99d7b 100644 --- a/NOTICE +++ b/NOTICE @@ -4,26 +4,4 @@ Copyright 1999-2018, Pthreads4w contributors This product includes software developed through the colaborative effort of several individuals, each of whom is listed in the file -CONTRIBUTORS included with this software. - -The following files are not covered under the Copyrights -listed above: - - [1] tests/rwlock7.c - [1] tests/rwlock7_1.c - [1] tests/rwlock8.c - [1] tests/rwlock8_1.c - [2] tests/threestage.c - -[1] The file tests/rwlock7.c and those similarly named are derived from -code written by Dave Butenhof for his book 'Programming With POSIX(R) -Threads'. The original code was obtained by free download from his -website http://home.earthlink.net/~anneart/family/Threads/source.html - -[2] The file tests/threestage.c is taken directly from examples in the -book "Windows System Programming, Edition 4" by Johnson (John) Hart -Session 6, Chapter 10. ThreeStage.c -Several required additional header and source files from the -book examples have been included inline to simplify compilation. -The only modification to the code has been to provide default -values when run without arguments. +CONTRIBUTORS included with this software. \ No newline at end of file diff --git a/_ptw32.h b/_ptw32.h index e638027..5821f94 100644 --- a/_ptw32.h +++ b/_ptw32.h @@ -43,10 +43,10 @@ */ #define __PTW32_VERSION_MAJOR 3 #define __PTW32_VERSION_MINOR 0 -#define __PTW32_VERSION_MICRO 1 +#define __PTW32_VERSION_MICRO 2 #define __PTW32_VERION_BUILD 0 -#define __PTW32_VERSION 3,0,0,1 -#define __PTW32_VERSION_STRING "3, 0, 1, 0\0" +#define __PTW32_VERSION 3,0,2,0 +#define __PTW32_VERSION_STRING "3, 0, 2, 0\0" #if defined(__GNUC__) # pragma GCC system_header diff --git a/tests/ChangeLog b/tests/ChangeLog index c4895d0..beda9e3 100644 --- a/tests/ChangeLog +++ b/tests/ChangeLog @@ -1,3 +1,11 @@ +2018-08-19 Ross Johnson + + * threestage.c: Delete. + * rwlock7.c: Delete. + * rwlock8.c: Delete. + * rwlock7_1.c: Delete. + * rwlock8_1.c: Delete. + 2018-08-10 Ross Johnson * Makefile (clean): remove *.idb files. diff --git a/tests/common.mk b/tests/common.mk index f6cf9d1..731713d 100644 --- a/tests/common.mk +++ b/tests/common.mk @@ -40,7 +40,7 @@ ALL_KNOWN_TESTS = \ robust1 robust2 robust3 robust4 robust5 \ rwlock1 rwlock2 rwlock3 rwlock4 \ rwlock2_t rwlock3_t rwlock4_t rwlock5_t rwlock6_t rwlock6_t2 \ - rwlock5 rwlock6 rwlock7 rwlock7_1 rwlock8 rwlock8_1 \ + rwlock5 rwlock6 \ self1 self2 \ semaphore1 semaphore2 semaphore3 \ semaphore4 semaphore4t semaphore5 \ diff --git a/tests/runorder.mk b/tests/runorder.mk index 215d7e7..63a428b 100644 --- a/tests/runorder.mk +++ b/tests/runorder.mk @@ -113,7 +113,7 @@ once3.pass: once2.pass once4.pass: once3.pass priority1.pass: join1.pass priority2.pass: priority1.pass barrier3.pass -reinit1.pass: rwlock7.pass +reinit1.pass: rwlock6.pass reuse1.pass: create3.pass reuse2.pass: reuse1.pass robust1.pass: mutex8r.pass @@ -127,13 +127,11 @@ rwlock3.pass: rwlock2.pass join2.pass rwlock4.pass: rwlock3.pass rwlock5.pass: rwlock4.pass rwlock6.pass: rwlock5.pass -rwlock7.pass: rwlock6.pass -rwlock8.pass: rwlock7.pass rwlock2_t.pass: rwlock2.pass -rwlock3_t.pass: rwlock2_t.pass -rwlock4_t.pass: rwlock3_t.pass -rwlock5_t.pass: rwlock4_t.pass -rwlock6_t.pass: rwlock5_t.pass +rwlock3_t.pass: rwlock3.pass rwlock2_t.pass +rwlock4_t.pass: rwlock4.pass rwlock3_t.pass +rwlock5_t.pass: rwlock5.pass rwlock4_t.pass +rwlock6_t.pass: rwlock6.pass rwlock5_t.pass rwlock6_t2.pass: rwlock6_t.pass self1.pass: sizes.pass self2.pass: self1.pass equal1.pass create1.pass diff --git a/tests/rwlock7.c b/tests/rwlock7.c deleted file mode 100644 index 9d58f6e..0000000 --- a/tests/rwlock7.c +++ /dev/null @@ -1,199 +0,0 @@ -/* - * rwlock7.c - * - * Hammer on a bunch of rwlocks to test robustness and fairness. - * Printed stats should be roughly even for each thread. - */ - -#include "test.h" -#include - -#ifdef __GNUC__ -#include -#endif - -#define THREADS 5 -#define DATASIZE 7 -#define ITERATIONS 1000000 - -/* - * Keep statistics for each thread. - */ -typedef struct thread_tag { - int thread_num; - pthread_t thread_id; - int updates; - int reads; - int changed; - int seed; -} thread_t; - -/* - * Read-write lock and shared data - */ -typedef struct data_tag { - pthread_rwlock_t lock; - int data; - int updates; -} data_t; - -static thread_t threads[THREADS]; -static data_t data[DATASIZE]; - -/* - * Thread start routine that uses read-write locks - */ -void *thread_routine (void *arg) -{ - thread_t *self = (thread_t*)arg; - int iteration; - int element = 0; - int seed = self->seed; - int interval = 1 + rand_r (&seed) % 71; - - self->changed = 0; - - for (iteration = 0; iteration < ITERATIONS; iteration++) - { - if (iteration % (ITERATIONS / 10) == 0) - { - putchar('.'); - fflush(stdout); - } - /* - * Each "self->interval" iterations, perform an - * update operation (write lock instead of read - * lock). - */ - if ((iteration % interval) == 0) - { - assert(pthread_rwlock_wrlock (&data[element].lock) == 0); - data[element].data = self->thread_num; - data[element].updates++; - self->updates++; - interval = 1 + rand_r (&seed) % 71; - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } else { - /* - * Look at the current data element to see whether - * the current thread last updated it. Count the - * times, to report later. - */ - assert(pthread_rwlock_rdlock (&data[element].lock) == 0); - - self->reads++; - - if (data[element].data != self->thread_num) - { - self->changed++; - interval = 1 + self->changed % 71; - } - - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } - - element = (element + 1) % DATASIZE; - - } - - return NULL; -} - -int -main (int argc, char *argv[]) -{ - int count; - int data_count; - int thread_updates = 0; - int data_updates = 0; - int seed = 1; - - __PTW32_STRUCT_TIMEB currSysTime1; - __PTW32_STRUCT_TIMEB currSysTime2; - - /* - * Initialize the shared data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data[data_count].data = 0; - data[data_count].updates = 0; - - assert(pthread_rwlock_init (&data[data_count].lock, NULL) == 0); - } - - __PTW32_FTIME(&currSysTime1); - - /* - * Create THREADS threads to access shared data. - */ - for (count = 0; count < THREADS; count++) - { - threads[count].thread_num = count; - threads[count].updates = 0; - threads[count].reads = 0; - threads[count].seed = 1 + rand_r (&seed) % 71; - - assert(pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)(size_t)&threads[count]) == 0); - } - - /* - * Wait for all threads to complete, and collect - * statistics. - */ - for (count = 0; count < THREADS; count++) - { - assert(pthread_join (threads[count].thread_id, NULL) == 0); - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - if (threads[count].changed > 0) - { - printf ("Thread %d found changed elements %d times\n", - count, threads[count].changed); - } - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - thread_updates += threads[count].updates; - printf ("%02d: seed %d, updates %d, reads %d\n", - count, threads[count].seed, - threads[count].updates, threads[count].reads); - } - - putchar('\n'); - fflush(stdout); - - /* - * Collect statistics for the data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data_updates += data[data_count].updates; - printf ("data %02d: value %d, %d updates\n", - data_count, data[data_count].data, data[data_count].updates); - assert(pthread_rwlock_destroy (&data[data_count].lock) == 0); - } - - printf ("%d thread updates, %d data updates\n", - thread_updates, data_updates); - - __PTW32_FTIME(&currSysTime2); - - printf( "\nstart: %ld/%d, stop: %ld/%d, duration:%ld\n", - (long)currSysTime1.time,currSysTime1.millitm, - (long)currSysTime2.time,currSysTime2.millitm, - ((long)((currSysTime2.time*1000+currSysTime2.millitm) - - (currSysTime1.time*1000+currSysTime1.millitm)))); - - return 0; -} diff --git a/tests/rwlock7_1.c b/tests/rwlock7_1.c deleted file mode 100644 index 4e2ea49..0000000 --- a/tests/rwlock7_1.c +++ /dev/null @@ -1,222 +0,0 @@ -/* - * rwlock7_1.c - * - * Hammer on a bunch of rwlocks to test robustness and fairness. - * Printed stats should be roughly even for each thread. - * - * Use CPU affinity to compare against non-affinity rwlock7.c - */ - -#include "test.h" -#include - -#ifdef __GNUC__ -#include -#endif - -#define THREADS 5 -#define DATASIZE 7 -#define ITERATIONS 1000000 - -/* - * Keep statistics for each thread. - */ -typedef struct thread_tag { - int thread_num; - pthread_t thread_id; - cpu_set_t threadCpus; - int updates; - int reads; - int changed; - int seed; -} thread_t; - -/* - * Read-write lock and shared data - */ -typedef struct data_tag { - pthread_rwlock_t lock; - int data; - int updates; -} data_t; - -static thread_t threads[THREADS]; -static data_t data[DATASIZE]; -static cpu_set_t processCpus; -static int cpu_count; - -/* - * Thread start routine that uses read-write locks - */ -void *thread_routine (void *arg) -{ - thread_t *self = (thread_t*)arg; - int iteration; - int element = 0; - int seed = self->seed; - int interval = 1 + rand_r (&seed) % 71; - - /* - * Set each thread to a fixed (different if possible) cpu. - */ - CPU_ZERO(&self->threadCpus); - CPU_SET(self->thread_num%cpu_count, &self->threadCpus); - assert(pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &self->threadCpus) == 0); - - self->changed = 0; - - for (iteration = 0; iteration < ITERATIONS; iteration++) - { - if (iteration % (ITERATIONS / 10) == 0) - { - putchar('.'); - fflush(stdout); - } - /* - * Each "self->interval" iterations, perform an - * update operation (write lock instead of read - * lock). - */ - if ((iteration % interval) == 0) - { - assert(pthread_rwlock_wrlock (&data[element].lock) == 0); - data[element].data = self->thread_num; - data[element].updates++; - self->updates++; - interval = 1 + rand_r (&seed) % 71; - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } else { - /* - * Look at the current data element to see whether - * the current thread last updated it. Count the - * times, to report later. - */ - assert(pthread_rwlock_rdlock (&data[element].lock) == 0); - - self->reads++; - - if (data[element].data != self->thread_num) - { - self->changed++; - interval = 1 + self->changed % 71; - } - - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } - - element = (element + 1) % DATASIZE; - - } - - return NULL; -} - -int -main (int argc, char *argv[]) -{ - int count; - int data_count; - int thread_updates = 0; - int data_updates = 0; - int seed = 1; - pthread_t self = pthread_self(); - __PTW32_STRUCT_TIMEB currSysTime1; - __PTW32_STRUCT_TIMEB currSysTime2; - - if (pthread_getaffinity_np(self, sizeof(cpu_set_t), &processCpus) == ENOSYS) - { - printf("pthread_get/set_affinity_np API not supported for this platform: skipping test."); - return 0; - } - - assert(pthread_getaffinity_np(self, sizeof(cpu_set_t), &processCpus) == 0); - assert((cpu_count = CPU_COUNT(&processCpus)) > 0); - printf("CPUs: %d\n", cpu_count); - - /* - * Initialize the shared data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data[data_count].data = 0; - data[data_count].updates = 0; - - assert(pthread_rwlock_init (&data[data_count].lock, NULL) == 0); - } - - __PTW32_FTIME(&currSysTime1); - - /* - * Create THREADS threads to access shared data. - */ - for (count = 0; count < THREADS; count++) - { - threads[count].thread_num = count; - threads[count].updates = 0; - threads[count].reads = 0; - threads[count].seed = 1 + rand_r (&seed) % 71; - - assert(pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)(size_t)&threads[count]) == 0); - } - - /* - * Wait for all threads to complete, and collect - * statistics. - */ - for (count = 0; count < THREADS; count++) - { - assert(pthread_join (threads[count].thread_id, NULL) == 0); - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - if (threads[count].changed > 0) - { - printf ("Thread %d found changed elements %d times\n", - count, threads[count].changed); - } - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - thread_updates += threads[count].updates; - printf ("%02d: seed %d, updates %d, reads %d, cpu %d\n", - count, threads[count].seed, - threads[count].updates, threads[count].reads, - threads[count].thread_num%cpu_count); - } - - putchar('\n'); - fflush(stdout); - - /* - * Collect statistics for the data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data_updates += data[data_count].updates; - printf ("data %02d: value %d, %d updates\n", - data_count, data[data_count].data, data[data_count].updates); - assert(pthread_rwlock_destroy (&data[data_count].lock) == 0); - } - - printf ("%d thread updates, %d data updates\n", - thread_updates, data_updates); - - __PTW32_FTIME(&currSysTime2); - - printf( "\nstart: %ld/%d, stop: %ld/%d, duration:%ld\n", - (long)currSysTime1.time,currSysTime1.millitm, - (long)currSysTime2.time,currSysTime2.millitm, - ((long)((currSysTime2.time*1000+currSysTime2.millitm) - - (currSysTime1.time*1000+currSysTime1.millitm)))); - - return 0; -} diff --git a/tests/rwlock8.c b/tests/rwlock8.c deleted file mode 100644 index 301e1ec..0000000 --- a/tests/rwlock8.c +++ /dev/null @@ -1,205 +0,0 @@ -/* - * rwlock8.c - * - * Hammer on a bunch of rwlocks to test robustness and fairness. - * Printed stats should be roughly even for each thread. - * - * Yield during each access to exercise lock contention code paths - * more than rwlock7.c does (particularly on uni-processor systems). - */ - -#include "test.h" -#include - -#ifdef __GNUC__ -#include -#endif - -#define THREADS 5 -#define DATASIZE 7 -#define ITERATIONS 100000 - -/* - * Keep statistics for each thread. - */ -typedef struct thread_tag { - int thread_num; - pthread_t thread_id; - int updates; - int reads; - int changed; - int seed; -} thread_t; - -/* - * Read-write lock and shared data - */ -typedef struct data_tag { - pthread_rwlock_t lock; - int data; - int updates; -} data_t; - -static thread_t threads[THREADS]; -static data_t data[DATASIZE]; - -/* - * Thread start routine that uses read-write locks - */ -void *thread_routine (void *arg) -{ - thread_t *self = (thread_t*)arg; - int iteration; - int element = 0; - int seed = self->seed; - int interval = 1 + rand_r (&seed) % 71; - - self->changed = 0; - - for (iteration = 0; iteration < ITERATIONS; iteration++) - { - if (iteration % (ITERATIONS / 10) == 0) - { - putchar('.'); - fflush(stdout); - } - /* - * Each "self->interval" iterations, perform an - * update operation (write lock instead of read - * lock). - */ - if ((iteration % interval) == 0) - { - assert(pthread_rwlock_wrlock (&data[element].lock) == 0); - data[element].data = self->thread_num; - data[element].updates++; - self->updates++; - interval = 1 + rand_r (&seed) % 71; - sched_yield(); - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } else { - /* - * Look at the current data element to see whether - * the current thread last updated it. Count the - * times, to report later. - */ - assert(pthread_rwlock_rdlock (&data[element].lock) == 0); - - self->reads++; - - if (data[element].data != self->thread_num) - { - self->changed++; - interval = 1 + self->changed % 71; - } - - sched_yield(); - - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } - - element = (element + 1) % DATASIZE; - - } - - return NULL; -} - -int -main (int argc, char *argv[]) -{ - int count; - int data_count; - int thread_updates = 0; - int data_updates = 0; - int seed = 1; - - __PTW32_STRUCT_TIMEB currSysTime1; - __PTW32_STRUCT_TIMEB currSysTime2; - - /* - * Initialize the shared data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data[data_count].data = 0; - data[data_count].updates = 0; - - assert(pthread_rwlock_init (&data[data_count].lock, NULL) == 0); - } - - __PTW32_FTIME(&currSysTime1); - - /* - * Create THREADS threads to access shared data. - */ - for (count = 0; count < THREADS; count++) - { - threads[count].thread_num = count; - threads[count].updates = 0; - threads[count].reads = 0; - threads[count].seed = 1 + rand_r (&seed) % 71; - - assert(pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)(size_t)&threads[count]) == 0); - } - - /* - * Wait for all threads to complete, and collect - * statistics. - */ - for (count = 0; count < THREADS; count++) - { - assert(pthread_join (threads[count].thread_id, NULL) == 0); - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - if (threads[count].changed > 0) - { - printf ("Thread %d found changed elements %d times\n", - count, threads[count].changed); - } - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - thread_updates += threads[count].updates; - printf ("%02d: seed %d, updates %d, reads %d\n", - count, threads[count].seed, - threads[count].updates, threads[count].reads); - } - - putchar('\n'); - fflush(stdout); - - /* - * Collect statistics for the data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data_updates += data[data_count].updates; - printf ("data %02d: value %d, %d updates\n", - data_count, data[data_count].data, data[data_count].updates); - assert(pthread_rwlock_destroy (&data[data_count].lock) == 0); - } - - printf ("%d thread updates, %d data updates\n", - thread_updates, data_updates); - - __PTW32_FTIME(&currSysTime2); - - printf( "\nstart: %ld/%d, stop: %ld/%d, duration:%ld\n", - (long)currSysTime1.time,currSysTime1.millitm, - (long)currSysTime2.time,currSysTime2.millitm, - ((long)((currSysTime2.time*1000+currSysTime2.millitm) - - (currSysTime1.time*1000+currSysTime1.millitm)))); - - return 0; -} diff --git a/tests/rwlock8_1.c b/tests/rwlock8_1.c deleted file mode 100644 index a85f37f..0000000 --- a/tests/rwlock8_1.c +++ /dev/null @@ -1,228 +0,0 @@ -/* - * rwlock8.c - * - * Hammer on a bunch of rwlocks to test robustness and fairness. - * Printed stats should be roughly even for each thread. - * - * Yield during each access to exercise lock contention code paths - * more than rwlock7.c does (particularly on uni-processor systems). - * - * Use CPU affinity to compare against non-affinity rwlock8.c - */ - -#include "test.h" -#include - -#ifdef __GNUC__ -#include -#endif - -#define THREADS 5 -#define DATASIZE 7 -#define ITERATIONS 100000 - -/* - * Keep statistics for each thread. - */ -typedef struct thread_tag { - int thread_num; - pthread_t thread_id; - cpu_set_t threadCpus; - int updates; - int reads; - int changed; - int seed; -} thread_t; - -/* - * Read-write lock and shared data - */ -typedef struct data_tag { - pthread_rwlock_t lock; - int data; - int updates; -} data_t; - -static thread_t threads[THREADS]; -static data_t data[DATASIZE]; -static cpu_set_t processCpus; -static int cpu_count; - -/* - * Thread start routine that uses read-write locks - */ -void *thread_routine (void *arg) -{ - thread_t *self = (thread_t*)arg; - int iteration; - int element = 0; - int seed = self->seed; - int interval = 1 + rand_r (&seed) % 71; - - /* - * Set each thread to a fixed (different if possible) cpu. - */ - CPU_ZERO(&self->threadCpus); - CPU_SET(self->thread_num%cpu_count, &self->threadCpus); - assert(pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &self->threadCpus) == 0); - - self->changed = 0; - - for (iteration = 0; iteration < ITERATIONS; iteration++) - { - if (iteration % (ITERATIONS / 10) == 0) - { - putchar('.'); - fflush(stdout); - } - /* - * Each "self->interval" iterations, perform an - * update operation (write lock instead of read - * lock). - */ - if ((iteration % interval) == 0) - { - assert(pthread_rwlock_wrlock (&data[element].lock) == 0); - data[element].data = self->thread_num; - data[element].updates++; - self->updates++; - interval = 1 + rand_r (&seed) % 71; - sched_yield(); - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } else { - /* - * Look at the current data element to see whether - * the current thread last updated it. Count the - * times, to report later. - */ - assert(pthread_rwlock_rdlock (&data[element].lock) == 0); - - self->reads++; - - if (data[element].data != self->thread_num) - { - self->changed++; - interval = 1 + self->changed % 71; - } - - sched_yield(); - - assert(pthread_rwlock_unlock (&data[element].lock) == 0); - } - - element = (element + 1) % DATASIZE; - - } - - return NULL; -} - -int -main (int argc, char *argv[]) -{ - int count; - int data_count; - int thread_updates = 0; - int data_updates = 0; - int seed = 1; - pthread_t self = pthread_self(); - __PTW32_STRUCT_TIMEB currSysTime1; - __PTW32_STRUCT_TIMEB currSysTime2; - - if (pthread_getaffinity_np(self, sizeof(cpu_set_t), &processCpus) == ENOSYS) - { - printf("pthread_get/set_affinity_np API not supported for this platform: skipping test."); - return 0; - } - - assert(pthread_getaffinity_np(self, sizeof(cpu_set_t), &processCpus) == 0); - assert((cpu_count = CPU_COUNT(&processCpus)) > 0); - printf("CPUs: %d\n", cpu_count); - - /* - * Initialize the shared data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data[data_count].data = 0; - data[data_count].updates = 0; - - assert(pthread_rwlock_init (&data[data_count].lock, NULL) == 0); - } - - __PTW32_FTIME(&currSysTime1); - - /* - * Create THREADS threads to access shared data. - */ - for (count = 0; count < THREADS; count++) - { - threads[count].thread_num = count; - threads[count].updates = 0; - threads[count].reads = 0; - threads[count].seed = 1 + rand_r (&seed) % 71; - - assert(pthread_create (&threads[count].thread_id, - NULL, thread_routine, (void*)(size_t)&threads[count]) == 0); - } - - /* - * Wait for all threads to complete, and collect - * statistics. - */ - for (count = 0; count < THREADS; count++) - { - assert(pthread_join (threads[count].thread_id, NULL) == 0); - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - if (threads[count].changed > 0) - { - printf ("Thread %d found changed elements %d times\n", - count, threads[count].changed); - } - } - - putchar('\n'); - fflush(stdout); - - for (count = 0; count < THREADS; count++) - { - thread_updates += threads[count].updates; - printf ("%02d: seed %d, updates %d, reads %d, cpu %d\n", - count, threads[count].seed, - threads[count].updates, threads[count].reads, - threads[count].thread_num%cpu_count); - } - - putchar('\n'); - fflush(stdout); - - /* - * Collect statistics for the data. - */ - for (data_count = 0; data_count < DATASIZE; data_count++) - { - data_updates += data[data_count].updates; - printf ("data %02d: value %d, %d updates\n", - data_count, data[data_count].data, data[data_count].updates); - assert(pthread_rwlock_destroy (&data[data_count].lock) == 0); - } - - printf ("%d thread updates, %d data updates\n", - thread_updates, data_updates); - - __PTW32_FTIME(&currSysTime2); - - printf( "\nstart: %ld/%d, stop: %ld/%d, duration:%ld\n", - (long)currSysTime1.time,currSysTime1.millitm, - (long)currSysTime2.time,currSysTime2.millitm, - ((long)((currSysTime2.time*1000+currSysTime2.millitm) - - (currSysTime1.time*1000+currSysTime1.millitm)))); - - return 0; -} diff --git a/tests/threestage.c b/tests/threestage.c deleted file mode 100644 index cd607f0..0000000 --- a/tests/threestage.c +++ /dev/null @@ -1,583 +0,0 @@ -/* - This source code is taken directly from examples in the book - Windows System Programming, Edition 4 by Johnson (John) Hart - - Session 6, Chapter 10. ThreeStage.c - - Several required additional header and source files from the - book examples have been included inline to simplify building. - The only modification to the code has been to provide default - values when run without arguments. - - Three-stage Producer Consumer system - Other files required in this project, either directly or - in the form of libraries (DLLs are preferable) - QueueObj.c (inlined here) - Messages.c (inlined here) - - Usage: ThreeStage npc goal [display] - start up "npc" paired producer and consumer threads. - Display messages if "display" is non-zero - Each producer must produce a total of - "goal" messages, where each message is tagged - with the consumer that should receive it - Messages are sent to a "transmitter thread" which performs - additional processing before sending message groups to the - "receiver thread." Finally, the receiver thread sends - the messages to the consumer threads. - - Transmitter: Receive messages one at a time from producers, - create a transmission message of up to "TBLOCK_SIZE" messages - to be sent to the Receiver. (this could be a network xfer - Receiver: Take message blocks sent by the Transmitter - and send the individual messages to the designated consumer - */ - -/* Suppress warning re use of ctime() */ -#define _CRT_SECURE_NO_WARNINGS 1 - -#include "test.h" -#define sleep(i) Sleep(i*1000) -#ifndef max -#define max(a,b) ((a) > (b) ? (a) : (b)) -#endif - -#define DATA_SIZE 256 -typedef struct msg_block_tag { /* Message block */ - pthread_mutex_t mguard; /* Mutex for the message block */ - pthread_cond_t mconsumed; /* Event: Message consumed; */ - /* Produce a new one or stop */ - pthread_cond_t mready; /* Event: Message ready */ - /* - * Note: the mutex and events are not used by some programs, such - * as Program 10-3, 4, 5 (the multi-stage pipeline) as the messages - * are part of a protected queue - */ - volatile unsigned int source; /* Creating producer identity */ - volatile unsigned int destination;/* Identity of receiving thread*/ - - volatile unsigned int f_consumed; - volatile unsigned int f_ready; - volatile unsigned int f_stop; - /* Consumed & ready state flags, stop flag */ - volatile unsigned int sequence; /* Message block sequence number */ - time_t timestamp; - unsigned int checksum; /* Message contents checksum */ - unsigned int data[DATA_SIZE]; /* Message Contents */ -} msg_block_t; - -void message_fill (msg_block_t *, unsigned int, unsigned int, unsigned int); -void message_display (msg_block_t *); - -#define CV_TIMEOUT 5 /* tunable parameter for the CV model */ - - -/* - Definitions of a synchronized, general bounded queue structure. - Queues are implemented as arrays with indices to youngest - and oldest messages, with wrap around. - Each queue also contains a guard mutex and - "not empty" and "not full" condition variables. - Finally, there is a pointer to an array of messages of - arbitrary type - */ - -typedef struct queue_tag { /* General purpose queue */ - pthread_mutex_t q_guard;/* Guard the message block */ - pthread_cond_t q_ne; /* Event: Queue is not empty */ - pthread_cond_t q_nf; /* Event: Queue is not full */ - /* These two events are manual-reset for the broadcast model - * and auto-reset for the signal model */ - volatile unsigned int q_size; /* Queue max size size */ - volatile unsigned int q_first; /* Index of oldest message */ - volatile unsigned int q_last; /* Index of youngest msg */ - volatile unsigned int q_destroyed;/* Q receiver has terminated */ - void * msg_array; /* array of q_size messages */ -} queue_t; - -/* Queue management functions */ -unsigned int q_initialize (queue_t *, unsigned int, unsigned int); -unsigned int q_destroy (queue_t *); -unsigned int q_destroyed (queue_t *); -unsigned int q_empty (queue_t *); -unsigned int q_full (queue_t *); -unsigned int q_get (queue_t *, void *, unsigned int, unsigned int); -unsigned int q_put (queue_t *, void *, unsigned int, unsigned int); -unsigned int q_remove (queue_t *, void *, unsigned int); -unsigned int q_insert (queue_t *, void *, unsigned int); - -#include -#include -#include - -#define DELAY_COUNT 1000 -#define MAX_THREADS 1024 - -/* Queue lengths and blocking factors. These numbers are arbitrary and */ -/* can be adjusted for performance tuning. The current values are */ -/* not well balanced. */ - -#define TBLOCK_SIZE 5 /* Transmitter combines this many messages at at time */ -#define Q_TIMEOUT 2000 /* Transmiter and receiver timeout (ms) waiting for messages */ -//#define Q_TIMEOUT INFINITE -#define MAX_RETRY 5 /* Number of q_get retries before quitting */ -#define P2T_QLEN 10 /* Producer to Transmitter queue length */ -#define T2R_QLEN 4 /* Transmitter to Receiver queue length */ -#define R2C_QLEN 4 /* Receiver to Consumer queue length - there is one - * such queue for each consumer */ - -void * producer (void *); -void * consumer (void *); -void * transmitter (void *); -void * receiver (void *); - - -typedef struct _THARG { - volatile unsigned int thread_number; - volatile unsigned int work_goal; /* used by producers */ - volatile unsigned int work_done; /* Used by producers and consumers */ -} THARG; - - -/* Grouped messages sent by the transmitter to receiver */ -typedef struct T2R_MSG_TYPEag { - volatile unsigned int num_msgs; /* Number of messages contained */ - msg_block_t messages [TBLOCK_SIZE]; -} T2R_MSG_TYPE; - -queue_t p2tq, t2rq, *r2cq_array; - -/* ShutDown, AllProduced are global flags to shut down the system & transmitter */ -static volatile unsigned int ShutDown = 0; -static volatile unsigned int AllProduced = 0; -static unsigned int DisplayMessages = 0; - -int main (int argc, char * argv[]) -{ - unsigned int tstatus = 0, nthread, ithread, goal, thid; - pthread_t *producer_th, *consumer_th, transmitter_th, receiver_th; - THARG *producer_arg, *consumer_arg; - - if (argc < 3) { - nthread = 32; - goal = 1000; - } else { - nthread = atoi(argv[1]); - goal = atoi(argv[2]); - if (argc >= 4) - DisplayMessages = atoi(argv[3]); - } - - srand ((int)time(NULL)); /* Seed the RN generator */ - - if (nthread > MAX_THREADS) { - printf ("Maximum number of producers or consumers is %d.\n", MAX_THREADS); - return 2; - } - producer_th = (pthread_t *) malloc (nthread * sizeof(pthread_t)); - producer_arg = (THARG *) calloc (nthread, sizeof (THARG)); - consumer_th = (pthread_t *) malloc (nthread * sizeof(pthread_t)); - consumer_arg = (THARG *) calloc (nthread, sizeof (THARG)); - - if (producer_th == NULL || producer_arg == NULL - || consumer_th == NULL || consumer_arg == NULL) - perror ("Cannot allocate working memory for threads."); - - q_initialize (&p2tq, sizeof(msg_block_t), P2T_QLEN); - q_initialize (&t2rq, sizeof(T2R_MSG_TYPE), T2R_QLEN); - /* Allocate and initialize Receiver to Consumer queue for each consumer */ - r2cq_array = (queue_t *) calloc (nthread, sizeof(queue_t)); - if (r2cq_array == NULL) perror ("Cannot allocate memory for r2c queues"); - - for (ithread = 0; ithread < nthread; ithread++) { - /* Initialize r2c queue for this consumer thread */ - q_initialize (&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN); - /* Fill in the thread arg */ - consumer_arg[ithread].thread_number = ithread; - consumer_arg[ithread].work_goal = goal; - consumer_arg[ithread].work_done = 0; - - tstatus = pthread_create (&consumer_th[ithread], NULL, - consumer, (void *)&consumer_arg[ithread]); - if (tstatus != 0) - perror ("Cannot create consumer thread"); - - producer_arg[ithread].thread_number = ithread; - producer_arg[ithread].work_goal = goal; - producer_arg[ithread].work_done = 0; - tstatus = pthread_create (&producer_th[ithread], NULL, - producer, (void *)&producer_arg[ithread]); - if (tstatus != 0) - perror ("Cannot create producer thread"); - } - - tstatus = pthread_create (&transmitter_th, NULL, transmitter, &thid); - if (tstatus != 0) - perror ("Cannot create tranmitter thread"); - tstatus = pthread_create (&receiver_th, NULL, receiver, &thid); - if (tstatus != 0) - perror ("Cannot create receiver thread"); - - - printf ("BOSS: All threads are running\n"); - /* Wait for the producers to complete */ - /* The implementation allows too many threads for WaitForMultipleObjects */ - /* although you could call WFMO in a loop */ - for (ithread = 0; ithread < nthread; ithread++) { - tstatus = pthread_join (producer_th[ithread], NULL); - if (tstatus != 0) - perror ("Cannot wait for producer thread"); - printf ("BOSS: Producer %d produced %d work units\n", - ithread, producer_arg[ithread].work_done); - } - /* Producers have completed their work. */ - printf ("BOSS: All producers have completed their work.\n"); - AllProduced = 1; - - /* Wait for the consumers to complete */ - for (ithread = 0; ithread < nthread; ithread++) { - tstatus = pthread_join (consumer_th[ithread], NULL); - if (tstatus != 0) - perror ("Cannot wait for consumer thread"); - printf ("BOSS: consumer %d consumed %d work units\n", - ithread, consumer_arg[ithread].work_done); - } - printf ("BOSS: All consumers have completed their work.\n"); - - ShutDown = 1; /* Set a shutdown flag - All messages have been consumed */ - - /* Wait for the transmitter and receiver */ - - tstatus = pthread_join (transmitter_th, NULL); - if (tstatus != 0) - perror ("Failed waiting for transmitter"); - tstatus = pthread_join (receiver_th, NULL); - if (tstatus != 0) - perror ("Failed waiting for receiver"); - - q_destroy (&p2tq); - q_destroy (&t2rq); - for (ithread = 0; ithread < nthread; ithread++) - q_destroy (&r2cq_array[ithread]); - free (r2cq_array); - free (producer_th); - free (consumer_th); - free (producer_arg); - free(consumer_arg); - printf ("System has finished. Shutting down\n"); - return 0; -} - -void * producer (void * arg) -{ - THARG * parg; - unsigned int ithread, tstatus = 0; - msg_block_t msg; - - parg = (THARG *)arg; - ithread = parg->thread_number; - - while (parg->work_done < parg->work_goal && !ShutDown) { - /* Periodically produce work units until the goal is satisfied */ - /* messages receive a source and destination address which are */ - /* the same in this case but could, in general, be different. */ - sleep (rand()/100000000); - message_fill (&msg, ithread, ithread, parg->work_done); - - /* put the message in the queue - Use an infinite timeout to assure - * that the message is inserted, even if consumers are delayed */ - tstatus = q_put (&p2tq, &msg, sizeof(msg), INFINITE); - if (0 == tstatus) { - parg->work_done++; - } - } - - return 0; -} - -void * consumer (void * arg) -{ - THARG * carg; - unsigned int tstatus = 0, ithread, Retries = 0; - msg_block_t msg; - queue_t *pr2cq; - - carg = (THARG *) arg; - ithread = carg->thread_number; - - carg = (THARG *)arg; - pr2cq = &r2cq_array[ithread]; - - while (carg->work_done < carg->work_goal && Retries < MAX_RETRY && !ShutDown) { - /* Receive and display/process messages */ - /* Try to receive the requested number of messages, - * but allow for early system shutdown */ - - tstatus = q_get (pr2cq, &msg, sizeof(msg), Q_TIMEOUT); - if (0 == tstatus) { - if (DisplayMessages > 0) message_display (&msg); - carg->work_done++; - Retries = 0; - } else { - Retries++; - } - } - - return NULL; -} - -void * transmitter (void * arg) -{ - - /* Obtain multiple producer messages, combining into a single */ - /* compound message for the receiver */ - - unsigned int tstatus = 0, im, Retries = 0; - T2R_MSG_TYPE t2r_msg = {0}; - msg_block_t p2t_msg; - - while (!ShutDown && !AllProduced) { - t2r_msg.num_msgs = 0; - /* pack the messages for transmission to the receiver */ - im = 0; - while (im < TBLOCK_SIZE && !ShutDown && Retries < MAX_RETRY && !AllProduced) { - tstatus = q_get (&p2tq, &p2t_msg, sizeof(p2t_msg), Q_TIMEOUT); - if (0 == tstatus) { - memcpy (&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg)); - t2r_msg.num_msgs++; - im++; - Retries = 0; - } else { /* Timed out. */ - Retries++; - } - } - tstatus = q_put (&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE); - if (tstatus != 0) return NULL; - } - return NULL; -} - - -void * receiver (void * arg) -{ - /* Obtain compound messages from the transmitter and unblock them */ - /* and transmit to the designated consumer. */ - - unsigned int tstatus = 0, im, ic, Retries = 0; - T2R_MSG_TYPE t2r_msg; - msg_block_t r2c_msg; - - while (!ShutDown && Retries < MAX_RETRY) { - tstatus = q_get (&t2rq, &t2r_msg, sizeof(t2r_msg), Q_TIMEOUT); - if (tstatus != 0) { /* Timeout - Have the producers shut down? */ - Retries++; - continue; - } - Retries = 0; - /* Distribute the packaged messages to the proper consumer */ - im = 0; - while (im < t2r_msg.num_msgs) { - memcpy (&r2c_msg, &t2r_msg.messages[im], sizeof(r2c_msg)); - ic = r2c_msg.destination; /* Destination consumer */ - tstatus = q_put (&r2cq_array[ic], &r2c_msg, sizeof(r2c_msg), INFINITE); - if (0 == tstatus) im++; - } - } - return NULL; -} - -#if (!defined INFINITE) -#define INFINITE 0xFFFFFFFF -#endif - -/* - Finite bounded queue management functions - q_get, q_put timeouts (max_wait) are in ms - convert to sec, rounding up - */ -unsigned int q_get (queue_t *q, void * msg, unsigned int msize, unsigned int MaxWait) -{ - int tstatus = 0, got_msg = 0, time_inc = (MaxWait + 999) /1000; - struct timespec timeout; - timeout.tv_nsec = 0; - - if (q_destroyed(q)) return 1; - pthread_mutex_lock (&q->q_guard); - while (q_empty (q) && 0 == tstatus) { - if (MaxWait != INFINITE) { - timeout.tv_sec = time(NULL) + time_inc; - tstatus = pthread_cond_timedwait (&q->q_ne, &q->q_guard, &timeout); - } else { - tstatus = pthread_cond_wait (&q->q_ne, &q->q_guard); - } - } - /* remove the message, if any, from the queue */ - if (0 == tstatus && !q_empty (q)) { - q_remove (q, msg, msize); - got_msg = 1; - /* Signal that the queue is not full as we've removed a message */ - pthread_cond_broadcast (&q->q_nf); - } - pthread_mutex_unlock (&q->q_guard); - return (0 == tstatus && got_msg == 1 ? 0 : max(1, tstatus)); /* 0 indicates success */ -} - -unsigned int q_put (queue_t *q, void * msg, unsigned int msize, unsigned int MaxWait) -{ - int tstatus = 0, put_msg = 0, time_inc = (MaxWait + 999) /1000; - struct timespec timeout; - timeout.tv_nsec = 0; - - if (q_destroyed(q)) return 1; - pthread_mutex_lock (&q->q_guard); - while (q_full (q) && 0 == tstatus) { - if (MaxWait != INFINITE) { - timeout.tv_sec = time(NULL) + time_inc; - tstatus = pthread_cond_timedwait (&q->q_nf, &q->q_guard, &timeout); - } else { - tstatus = pthread_cond_wait (&q->q_nf, &q->q_guard); - } - } - /* Insert the message into the queue if there's room */ - if (0 == tstatus && !q_full (q)) { - q_insert (q, msg, msize); - put_msg = 1; - /* Signal that the queue is not empty as we've inserted a message */ - pthread_cond_broadcast (&q->q_ne); - } - pthread_mutex_unlock (&q->q_guard); - return (0 == tstatus && put_msg == 1 ? 0 : max(1, tstatus)); /* 0 indictates success */ -} - -unsigned int q_initialize (queue_t *q, unsigned int msize, unsigned int nmsgs) -{ - /* Initialize queue, including its mutex and events */ - /* Allocate storage for all messages. */ - - q->q_first = q->q_last = 0; - q->q_size = nmsgs; - q->q_destroyed = 0; - - pthread_mutex_init (&q->q_guard, NULL); - pthread_cond_init (&q->q_ne, NULL); - pthread_cond_init (&q->q_nf, NULL); - - if ((q->msg_array = calloc (nmsgs, msize)) == NULL) return 1; - return 0; /* No error */ -} - -unsigned int q_destroy (queue_t *q) -{ - if (q_destroyed(q)) return 1; - /* Free all the resources created by q_initialize */ - pthread_mutex_lock (&q->q_guard); - q->q_destroyed = 1; - free (q->msg_array); - pthread_cond_destroy (&q->q_ne); - pthread_cond_destroy (&q->q_nf); - pthread_mutex_unlock (&q->q_guard); - pthread_mutex_destroy (&q->q_guard); - - return 0; -} - -unsigned int q_destroyed (queue_t *q) -{ - return (q->q_destroyed); -} - -unsigned int q_empty (queue_t *q) -{ - return (q->q_first == q->q_last); -} - -unsigned int q_full (queue_t *q) -{ - return ((q->q_first - q->q_last) == 1 || - (q->q_last == q->q_size-1 && q->q_first == 0)); -} - - -unsigned int q_remove (queue_t *q, void * msg, unsigned int msize) -{ - char *pm; - - pm = (char *)q->msg_array; - /* Remove oldest ("first") message */ - memcpy (msg, pm + (q->q_first * msize), msize); - // Invalidate the message - q->q_first = ((q->q_first + 1) % q->q_size); - return 0; /* no error */ -} - -unsigned int q_insert (queue_t *q, void * msg, unsigned int msize) -{ - char *pm; - - pm = (char *)q->msg_array; - /* Add a new youngest ("last") message */ - if (q_full(q)) return 1; /* Error - Q is full */ - memcpy (pm + (q->q_last * msize), msg, msize); - q->q_last = ((q->q_last + 1) % q->q_size); - - return 0; -} - -unsigned int compute_checksum (void * msg, unsigned int length) -{ - /* Computer an xor checksum on the entire message of "length" - * integers */ - unsigned int i, cs = 0, *pint; - - pint = (unsigned int *) msg; - for (i = 0; i < length; i++) { - cs = (cs ^ *pint); - pint++; - } - return cs; -} - -void message_fill (msg_block_t *mblock, unsigned int src, unsigned int dest, unsigned int seqno) -{ - /* Fill the message buffer, and include checksum and timestamp */ - /* This function is called from the producer thread while it */ - /* owns the message block mutex */ - - unsigned int i; - - mblock->checksum = 0; - for (i = 0; i < DATA_SIZE; i++) { - mblock->data[i] = rand(); - } - mblock->source = src; - mblock->destination = dest; - mblock->sequence = seqno; - mblock->timestamp = time(NULL); - mblock->checksum = compute_checksum (mblock, sizeof(msg_block_t)/sizeof(unsigned int)); - /* printf ("Generated message: %d %d %d %d %x %x\n", - src, dest, seqno, mblock->timestamp, - mblock->data[0], mblock->data[DATA_SIZE-1]); */ - return; -} - -void message_display (msg_block_t *mblock) -{ - /* Display message buffer and timestamp, validate checksum */ - /* This function is called from the consumer thread while it */ - /* owns the message block mutex */ - unsigned int tcheck = 0; - - tcheck = compute_checksum (mblock, sizeof(msg_block_t)/sizeof(unsigned int)); - printf ("\nMessage number %d generated at: %s", - mblock->sequence, ctime (&(mblock->timestamp))); - printf ("Source and destination: %d %d\n", - mblock->source, mblock->destination); - printf ("First and last entries: %x %x\n", - mblock->data[0], mblock->data[DATA_SIZE-1]); - if (tcheck == 0 /*mblock->checksum was 0 when CS first computed */) - printf ("GOOD ->Checksum was validated.\n"); - else - printf ("BAD ->Checksum failed. message was corrupted\n"); - - return; - -}