A lightweight, parallel, straightforward csv to sqlite converter

jessejcw/csv2sqlite-parallel

csv2sqlite parallel

A simple parallel row processor built on C++11 threads. Unfinished.

The producer pushes jobs into a job queue. Once the queue is not empty, a consumer wakes up and processes the next job.
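The producer/consumer hand-off described above can be sketched with a mutex and a condition variable. This is a minimal illustration, not the project's actual queue: `JobQueue`, `Row`, and `produce_and_consume` are hypothetical names, and a job is assumed to be a single CSV row held in a string.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for the project's job type: one CSV row.
using Row = std::string;

// Minimal blocking queue: dequeue() sleeps while the queue is empty.
class JobQueue {
public:
    void enqueue(Row row) {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            jobs_.push(std::move(row));
        }
        not_empty_.notify_one();  // wake one sleeping consumer
    }

    Row dequeue() {
        std::unique_lock<std::mutex> lk(mutex_);
        // The predicate guards against spurious wakeups.
        not_empty_.wait(lk, [this] { return !jobs_.empty(); });
        assert(!jobs_.empty());  // guaranteed by the predicate above
        Row row = std::move(jobs_.front());
        jobs_.pop();
        return row;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::queue<Row> jobs_;
};

// Pushes n rows from the calling thread (the producer) and returns how
// many rows a single consumer thread received.
std::size_t produce_and_consume(std::size_t n) {
    JobQueue queue;
    std::size_t consumed = 0;
    std::thread consumer([&queue, &consumed, n] {
        for (std::size_t i = 0; i < n; ++i) {
            queue.dequeue();
            ++consumed;
        }
    });
    for (std::size_t i = 0; i < n; ++i) {
        queue.enqueue("row " + std::to_string(i));
    }
    consumer.join();  // join makes the consumer's writes visible here
    return consumed;
}
```

Calling `produce_and_consume(100)` should hand all 100 rows to the consumer, whichever thread happens to run first.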

Worker thread life cycle:

  1. spawn.
  2. sleep.
  3. working.
  4. synchronizing.
  5. shutdown.

1. spawn

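Thread creation:

   // number of threads the CPU supports, minus one for the main thread;
   // hardware_concurrency() may return 0, so keep at least one worker
   unsigned hw_threads = std::thread::hardware_concurrency();
   thread_count_ = hw_threads > 1 ? hw_threads - 1 : 1;
   worker_threads.resize(thread_count_);
   for (unsigned i = 0; i < thread_count_; ++i) {
        worker_threads[i] = std::thread(Worker::func_worker_main, (void *) this, i);
   }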
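As a self-contained sketch of the spawn step: `std::thread::hardware_concurrency()` is allowed to return 0 when the count cannot be determined, so subtracting 1 from it directly can wrap around. The helper names `worker_count` and `spawn_and_join` below are hypothetical and only illustrate the guard plus a spawn/join round trip.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: leave one hardware thread for the main thread, but
// always keep at least one worker, since the reported count may be 0 or 1.
unsigned worker_count(unsigned hardware_threads) {
    return hardware_threads > 1 ? hardware_threads - 1 : 1;
}

// Spawns `count` workers, lets each record that it started, joins them
// all, and returns how many actually ran.
unsigned spawn_and_join(unsigned count) {
    std::atomic<unsigned> started{0};
    std::vector<std::thread> workers;
    workers.reserve(count);
    for (unsigned i = 0; i < count; ++i) {
        workers.emplace_back([&started] { ++started; });
    }
    for (std::thread& worker : workers) {
        worker.join();
    }
    return started.load();
}
```

A caller would typically pass `worker_count(std::thread::hardware_concurrency())` to `spawn_and_join`.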

2. sleep

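    // wait until invoked_worker_ reaches thread_count_;
    // the loop re-checks the counter after every wakeup
    while (invoked_worker_ != thread_count_) {
        std::unique_lock<mutex> lk(thread_create_mutex_);
        worker_invoked_cond_var_ = true;
        thread_create_cv.wait(lk);
        worker_invoked_cond_var_ = false;
    }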
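The same wait can also be written with the predicate overload of `std::condition_variable::wait`, which re-checks the condition under the lock after every wakeup. The sketch below is an assumption about how such a gate might look, not the project's code: `StartGate`, `report_in`, `wait_for_all`, and `start_workers` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical startup gate: the creating thread sleeps until every
// expected worker has reported in.
class StartGate {
public:
    explicit StartGate(unsigned expected) : expected_(expected) {}

    // Called once by each worker as soon as it is running.
    void report_in() {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            ++reported_;
        }
        cv_.notify_one();
    }

    // Sleeps until all workers have reported in; returns the final count.
    unsigned wait_for_all() {
        std::unique_lock<std::mutex> lk(mutex_);
        // Counter is read under the lock, so no wakeup can be missed.
        cv_.wait(lk, [this] { return reported_ == expected_; });
        return reported_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    unsigned reported_ = 0;
    unsigned expected_;
};

// Starts `count` workers and returns how many had reported in by the
// time the gate opened.
unsigned start_workers(unsigned count) {
    StartGate gate(count);
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < count; ++i) {
        workers.emplace_back([&gate] { gate.report_in(); });
    }
    unsigned reported = gate.wait_for_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
    return reported;
}
```

Because the counter is only changed and checked while holding the mutex, a notification that arrives before the waiter goes to sleep is not lost.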

3. working

    JobPkg curr_job;
    while (true) {
        // reached when the current thread has finished its
        // previous job and is available for the next one

        // blocks while the job queue is empty
        mgr->dequeue_job(curr_job);

        int set = get<2>(curr_job);
        if (set != Killer) {
            // call back to the requester
            mgr->process_job(curr_job);
        }
        else {
            // killer job received: flag the exit and leave the loop
            mgr->set_exit();
            break;
        }
    }
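To make the tag check in the loop above concrete, here is a single-threaded sketch of the same dispatch. The layout of `Job` (callback, row, tag) is an assumption made for illustration and is not the project's `JobPkg`. `kRegular`, `drain`, and `demo_drain` are hypothetical names, and `kKiller` reuses the value -777 that appears in the shutdown step.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <utility>

constexpr int kRegular = 0;   // hypothetical tag for an ordinary job
constexpr int kKiller = -777; // sentinel tag that stops the loop

// Hypothetical job layout: (callback, row, tag).
using Job = std::tuple<std::function<void(const std::string&)>, std::string, int>;

// Single-threaded version of the worker loop: runs callbacks until the
// queue is empty or a killer job is seen. Returns the jobs processed.
int drain(std::queue<Job>& jobs) {
    int processed = 0;
    while (!jobs.empty()) {
        Job job = std::move(jobs.front());
        jobs.pop();
        if (std::get<2>(job) == kKiller) {
            break;  // anything queued behind the killer is left untouched
        }
        std::get<0>(job)(std::get<1>(job));  // call back to the requester
        ++processed;
    }
    return processed;
}

// Queues two rows, a killer job, and one more row, then drains the queue.
int demo_drain() {
    std::string seen;
    auto append = [&seen](const std::string& row) { seen += row; };
    std::queue<Job> jobs;
    jobs.push(Job{append, "a", kRegular});
    jobs.push(Job{append, "b", kRegular});
    jobs.push(Job{nullptr, "", kKiller});
    jobs.push(Job{append, "c", kRegular});  // queued after the killer
    int processed = drain(jobs);
    assert(seen == "ab");  // "c" never ran
    return processed;
}
```

In the threaded version the `jobs.empty()` check is replaced by a blocking dequeue, so the killer job is the only way out of the loop.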

4. synchronizing

Used when you need to purge the job queue and perform a pass through.

    /* check if all consumers are flushed */
    syncing_mutex_.lock();
    while (comsumer_sync_cnt != thread_count_) {
        consumer_cv.notify_all();
        is_syncing_wait_ = true;
        syncing_cv.wait(syncing_mutex_);
    }
    comsumer_sync_cnt = 0;
    is_syncing_wait_ = false;
    syncing_mutex_.unlock();
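One way to get a similar "everything is flushed" guarantee is to count outstanding jobs instead of counting synced consumers. This is a different and simpler technique than the snippet above, shown only as a sketch. `FlushablePool`, `wait_idle`, and `flushed_sum` are hypothetical names, and jobs are assumed to be plain `std::function<void()>` callables.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool that can be flushed: wait_idle() returns only after
// every job enqueued so far has finished running.
class FlushablePool {
public:
    explicit FlushablePool(unsigned workers) {
        for (unsigned i = 0; i < workers; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }

    ~FlushablePool() {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            stopping_ = true;
        }
        job_cv_.notify_all();
        for (std::thread& t : threads_) t.join();
    }

    void enqueue(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            jobs_.push(std::move(job));
            ++pending_;  // counts queued and currently running jobs
        }
        job_cv_.notify_one();
    }

    // Blocks until the queue is drained and no job is still running.
    void wait_idle() {
        std::unique_lock<std::mutex> lk(mutex_);
        idle_cv_.wait(lk, [this] { return pending_ == 0; });
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lk(mutex_);
                job_cv_.wait(lk, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock
            {
                std::lock_guard<std::mutex> lk(mutex_);
                --pending_;
            }
            idle_cv_.notify_all();
        }
    }

    std::mutex mutex_;
    std::condition_variable job_cv_;
    std::condition_variable idle_cv_;
    std::queue<std::function<void()>> jobs_;
    std::size_t pending_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};

// Adds 1..n on three workers and returns the sum seen after the flush.
int flushed_sum(int n) {
    std::atomic<int> sum{0};
    FlushablePool pool(3);
    for (int i = 1; i <= n; ++i) {
        pool.enqueue([&sum, i] { sum += i; });
    }
    pool.wait_idle();
    return sum.load();
}
```

The counter is decremented only after a job has finished, so `wait_idle()` cannot return while a worker is still in the middle of a job.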

5. shutdown

Steps to take:

  1. send the killer job to the job queue.
  2. purge the job queue.
  3. join the worker threads.

    JobPkg killer_job({nullptr, {}, -777, (void*)this});
    enqueue_job(killer_job);
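The three shutdown steps can be sketched end to end as follows. This is an illustration under stated assumptions, not the project's shutdown path: jobs are plain non-negative `int`s, `IntQueue` and `run_and_shutdown` are hypothetical names, and one killer value is enqueued per worker so that every worker sees its own sentinel (the original enqueues a single killer job and may propagate the exit differently).

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

constexpr int kKiller = -777;  // same sentinel value as the killer job above

// Hypothetical blocking FIFO queue of int jobs.
class IntQueue {
public:
    void push(int job) {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            jobs_.push(job);
        }
        cv_.notify_one();
    }

    int pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !jobs_.empty(); });
        int job = jobs_.front();
        jobs_.pop();
        return job;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<int> jobs_;
};

// Runs `jobs` regular jobs on `workers` threads, then shuts down.
// Returns the number of regular jobs processed.
int run_and_shutdown(unsigned workers, int jobs) {
    IntQueue queue;
    std::atomic<int> processed{0};
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < workers; ++i) {
        threads.emplace_back([&queue, &processed] {
            for (;;) {
                int job = queue.pop();
                if (job == kKiller) break;  // sentinel: leave the loop
                ++processed;
            }
        });
    }
    for (int i = 0; i < jobs; ++i) queue.push(i);

    // 1. send the killer: one per worker (an assumption of this sketch).
    for (unsigned i = 0; i < workers; ++i) queue.push(kKiller);
    // 2. the queue is FIFO, so every regular job is dequeued before any killer.
    // 3. join the worker threads.
    for (std::thread& t : threads) t.join();
    return processed.load();
}
```

Because the killers are queued behind the regular jobs, the purge happens naturally: by the time every worker has exited, all earlier jobs have been processed.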

TODO

create sqlite table

queue each row, write to sqlite

data buffer compaction
