The producer pushes jobs into the job queue; once the queue is not empty, a consumer wakes up and processes them.
- spawn.
- sleep.
- working.
- synchronizing.
- shutdown
Thread creation
// Spawn one worker per hardware thread, leaving one hardware thread for the main thread.
// std::thread::hardware_concurrency() returns an unsigned value and is allowed to
// return 0 when the count cannot be determined; subtracting 1 unconditionally would
// then wrap around to a huge number, and on a single-core machine it would leave the
// pool with no consumer at all. Always keep at least one worker.
const unsigned hw_threads = std::thread::hardware_concurrency();
thread_count_ = (hw_threads > 1) ? hw_threads - 1 : 1;
worker_threads.resize(thread_count_);
for (unsigned i = 0; i < thread_count_; ++i) {
    // Each worker receives this object as an opaque pointer plus its own index.
    worker_threads[i] = std::thread(Worker::func_worker_main, static_cast<void*>(this), i);
}
// Block the creating thread until invoked_worker_ equals thread_count_.
// NOTE(review): invoked_worker_ is presumably bumped by each worker as it starts, and
// worker_invoked_cond_var_ presumably tells workers that a waiter needs a notify —
// neither side is visible here; confirm against the worker code.
// NOTE(review): the loop condition reads invoked_worker_ before the mutex is taken;
// confirm it is atomic or otherwise safe to read unlocked.
while (invoked_worker_ != thread_count_) {
std::unique_lock<mutex> lk(thread_create_mutex_);
// Mark that this thread is about to wait on thread_create_cv.
worker_invoked_cond_var_ = true;
// wait() releases thread_create_mutex_ while blocked; the loop condition is
// re-evaluated after every wake-up, so a wake-up alone does not end the wait.
thread_create_cv.wait(lk);
worker_invoked_cond_var_ = false;
}
// Slot that receives each dequeued job; reused across iterations.
JobPkg curr_job;
while (true) {
// Reached whenever this thread has finished its previous job and is
// available for the next one. Per the original note, dequeue_job blocks
// while the job queue is empty.
mgr->dequeue_job(curr_job);
// The third element of the job package is the value compared against Killer.
int set = get<2>(curr_job);
if (set != Killer) {
// Ordinary job: hand it to the manager, which (per the original note)
// calls back to the requester.
mgr->process_job(curr_job);
}
else {
// Killer job: call set_exit() on the manager and leave the worker loop.
mgr->set_exit();
break;
}
Used when you are trying to purge the job queue and perform a pass-through.
/* check if all consumers are flushed */
syncing_mutex_.lock();
while (comsumer_sync_cnt != thread_count_) {
consumer_cv.notify_all();
is_syncing_wait_ = true;
syncing_cv.wait(syncing_mutex_);
}
comsumer_sync_cnt = 0;
is_syncing_wait_ = false;
syncing_mutex_.unlock();
steps to take:
- send killer bit to job queue
- purge job queue
- join worker thread.
// Build the shutdown job: its third field carries the value -777 and its last
// field is this object passed as an opaque pointer.
// NOTE(review): -777 is presumably the value of the Killer sentinel that the worker
// loop compares against — confirm, and prefer the named constant if so.
JobPkg killer_job({nullptr, {}, -777, (void*)this});
// Submit it through the normal job queue.
enqueue_job(killer_job);
create sqlite table
queue each row, write on sqlite
data buffer compaction