-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadPool.c
155 lines (135 loc) · 4.59 KB
/
threadPool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <pthread.h>
#include "threadPool.h"
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#define MAX_QUEUE 32
#define MAX_THREAD 32
struct threadpool *threadpool_init(int thread_num, int queue_size){
struct threadpool *pool;
int i;
//check to be within limits
if((thread_num < 1 ) || (queue_size < 1) || (thread_num > MAX_THREAD)|| (queue_size > MAX_QUEUE))
raise_error("beyond max queue size or thread");
pool = (struct threadpool *) malloc(sizeof(struct threadpool));
check_ptr(pool);
//init pool
pool->thread_count = thread_num;
pool->workers = (struct thread **) malloc(sizeof(struct thread*) * thread_num);
check_ptr(pool->workers);
pool->queue = (struct job_queue *)malloc(sizeof(struct job_queue));
check_ptr(pool->queue);
//init job quue
pool->queue->size = queue_size;
pool->queue->isfull = 0;
pool->queue->head = 0;
pool->queue->tail= 0;
pool->queue->count = 0;
pool->queue->sockets = (int *) malloc(sizeof(int) * queue_size);
check_ptr(pool->queue->sockets);
//init syncho tools for job queue
// value has to be set to zero because there's nothing in the queue
if(sem_init(&(pool->queue->isempty),0,0) != 0) raise_error("isempty semaphore");
if(pthread_mutex_init(&(pool->queue->job_mutex), NULL) != 0) raise_error("job_mutex");
//start creating threads
for(i=0;i < thread_num;++i){
pool->workers[i] = (struct thread *) malloc(sizeof(struct thread));
pool->workers[i]->iswait = 1;
//loop all threads to checking dict
if(pthread_create(&(pool->workers[i]->tid),NULL,(void *)threadpool_work, (struct threadpool *) pool) != 0)
raise_error("thread creating");
}
return pool;
}
//adding job to queue
int threadpool_add(struct threadpool *pool, int socket){
int pos;
if((pool == NULL) || (socket < 0))
return EXIT_FAILURE;
// locking time
if(pthread_mutex_lock(&(pool->queue->job_mutex)) != 0)
return EXIT_FAILURE;
//check if full, release lock and return to a loop till queue is empty
if(pool->queue->isfull == 1){
if(pthread_mutex_unlock(&(pool->queue->job_mutex)) != 0) return EXIT_FAILURE;
return pool->queue->isfull;
}
// put socket into queue
pos = pool->queue->tail;
pool->queue->sockets[pos] = socket;
//increment end of queue and increase count in queue
pool->queue->tail = (pool->queue->tail+1)%(pool->queue->size);
(pool->queue->count)++;
//check if full
if((pool->queue->count) == (pool->queue->size))
pool->queue->isfull = 1;
//call semaphore and do V
if(sem_post(&pool->queue->isempty) != 0) return EXIT_FAILURE;
//end locking time
if(pthread_mutex_unlock(&(pool->queue->job_mutex)) != 0) return EXIT_FAILURE;
return EXIT_SUCCESS;
}
//get thread to work
void threadpool_work(struct threadpool *pool){
pthread_t id = pthread_self();
int tposition = threadpool_self(pool, id);
int socket;
int head;
int status;
while(1){
//wait for queue to fill
if(sem_wait(&pool->queue->isempty) != 0) raise_error("thread sem");
//start of lock
if(pthread_mutex_lock(&(pool->queue->job_mutex)) != 0) raise_error("thread mutex");
//setself not not waiting
pool->workers[tposition]->iswait = 0;
//gets head of buffer
head = pool->queue->head;
//increment buffer head
(pool->queue->head) = (pool->queue->head+1)%(pool->queue->size);
//get socket
socket = pool->queue->sockets[head];
// empty taken socket position
pool->queue->sockets[head] = 0;
(pool->queue->count)--;
// since server cant just be blocked, this is a way to communicate with server
if((pool->queue->count) < (pool->queue->size))
pool->queue->isfull = 0;
if(pthread_mutex_unlock(&(pool->queue->job_mutex)) != 0) raise_error("thread mutex");
// end of lock
printf("Thread: %d - servicing: %d\n",tposition, socket);
//start word checking with the socket
status = wordChecking(socket);
if(status <0) raise_error("thread wordChecking");
// finsihed its work, go back to waiting
pool->workers[tposition]->iswait = 1;
}
}
//return position of thread n threadpool
int threadpool_self(struct threadpool *pool, pthread_t id){
int pos;
for(pos=0;pos <pool->thread_count;pos++)
if(id == (pool->workers[pos]->tid))
return pos;
}
//if all iswait for all thread is 0, it will return 1 else 0
int threadpool_isbusy(struct threadpool *pool){
int size = pool->thread_count;
int i;
int yes = 1;
int no = 0;
for(i = 0; i < size; i++)
if((pool->workers[i]->iswait) == 1)
return no;
return yes;
}
// check for malloc error
void check_ptr(void* ptr){
if(!ptr)
raise_error("Failed memory allocation");
}
//less work to call error
void raise_error(const char* err){
fprintf(stderr, "%s\n",err);
exit(EXIT_FAILURE);
}