Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
Le Xuan Tuan Anh committed May 10, 2022
1 parent 0b5d9a6 commit 56f063e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 132 deletions.
28 changes: 25 additions & 3 deletions example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,31 @@
using namespace std;
using namespace threadpool;

class RunnableExample : public threadpool::IRunnable
{
public:
RunnableExample(std::string name) { m_name = name; };
~RunnableExample(){};

virtual bool run() override
{
using namespace std::chrono_literals;
int loop = rand() % 10 + 1;

for (int i = 0; i < loop; i++)
{
{
std::cout << m_name << " running " << i << std::endl;
}
}

return true;
}

std::string m_name;
};


int main(void)
{
try
Expand All @@ -20,13 +45,10 @@ int main(void)
queue.push(std::make_shared<RunnableExample>("#t6#"));

ThreadPool pool = ThreadPool(2);
ThreadPool pool2 = ThreadPool(queue, 2);
pool.execute(std::make_shared<RunnableExample>("#t7#"));
pool.execute(std::make_shared<RunnableExample>("#t8#"));
pool.execute(std::make_shared<RunnableExample>("#t9#"));

ThreadPoolDynamic pool1 = ThreadPoolDynamic();

std::cout << "main thread sleep for 10s" << std::endl;
std::this_thread::sleep_for(20s);

Expand Down
87 changes: 10 additions & 77 deletions src/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
#include "threadpool.h"
#include <atomic>
#include <iostream>
#include <sstream>

using namespace std::chrono_literals;

int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
std::string name)
int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue)
{
std::string workerName = name;
std::shared_ptr<threadpool::IRunnable> backElm = nullptr;
{
const std::lock_guard<std::mutex> lk(pool->_tpMtCoutStream);
std::cout << workerName << " has been created" << std::endl;
}

do
{
{
std::unique_lock lk{pool->_tpMtQueue};
{
const std::lock_guard<std::mutex> lk(pool->_tpMtCoutStream);
std::cout << workerName << " waitting" << std::endl;
}

pool->_tpCV.wait(
lk,
[&]()
Expand All @@ -31,32 +16,15 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que
(pool->_tpTerminal.load(std::memory_order_relaxed) || !queue->empty());
});

if (pool->_tpTerminal.load(std::memory_order_relaxed))
{
{
const std::lock_guard<std::mutex> lk(pool->_tpMtCoutStream);
std::cout << workerName << "is terminal" << std::endl;
}
break;
}
{
const std::lock_guard<std::mutex> lk(pool->_tpMtCoutStream);
std::cout << workerName << " wait done" << std::endl;
}
if (pool->_tpTerminal.load(std::memory_order_relaxed)) break;

// get back element
backElm = queue->front();
queue->pop();

lk.unlock();
}

// do somethings
{
const std::lock_guard<std::mutex> lk(pool->_tpMtCoutStream);
std::string t = ((RunnableExample*)(backElm.get()))->m_name;
std::cout << workerName << " do " << t << std::endl;
}

if (backElm) backElm->run();
backElm = nullptr;

Expand All @@ -66,17 +34,13 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que
}

int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name)
const std::chrono::nanoseconds& aliveTime)
{
std::string workerName = name;
std::shared_ptr<threadpool::IRunnable> backElm = nullptr;
std::cout << workerName << "seasonal has been created" << std::endl;

do
{
{
std::unique_lock lk{pool->_tpMtQueue};
std::cout << workerName << "seasonal waitting" << std::endl;
if (!pool->_tpCV.wait_for(
lk, aliveTime,
[&]()
Expand All @@ -86,24 +50,18 @@ int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQu
!queue->empty());
}))
{
std::cout << workerName << "seasonal is terminal" << std::endl;
break;
}

std::cout << workerName << " seasonal wait done" << std::endl;
if (pool->_tpTerminal.load(std::memory_order_relaxed)) break;

// get back element
backElm = queue->front();
queue->pop();

lk.unlock();
}

// do somethings
{
std::string t = ((RunnableExample*)(backElm.get()))->m_name;
std::cout << workerName << "seasonal do " << t << std::endl;
}

if (backElm) backElm->run();
backElm = nullptr;

Expand All @@ -121,7 +79,7 @@ threadpool::ThreadPool::ThreadPool(PoolQueue& queue,
m_poolSize = poolSize;
m_taskQueue.swap(queue);
_tpWaitForSignalStart.store(waitForSignalStart);
createThreads(m_poolSize);
createWorker(m_poolSize);
}

threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
Expand Down Expand Up @@ -159,33 +117,8 @@ void threadpool::ThreadPool::terminate()
_tpCV.notify_all();
}

void threadpool::ThreadPool::createThreads(std::uint32_t count)
void threadpool::ThreadPool::createWorker(std::uint32_t count)
{
for (std::uint32_t i = 0; i < count; i++)
{
std::stringstream sstr;
sstr << "#w " << i << " #";
std::string name = sstr.str();
m_threads.push_back(std::make_unique<std::thread>(worker, this, &m_taskQueue, name));
}
}

threadpool::ThreadPoolDynamic::ThreadPoolDynamic(
PoolQueue& queue, std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/,
const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/)
{
m_poolSize = poolSize;
m_poolMaxSize = poolMaxSize;
m_aliveTime = aliveTime;
m_taskQueue.swap(queue);
_tpWaitForSignalStart.store(waitForSignalStart);
}

threadpool::ThreadPoolDynamic::ThreadPoolDynamic(
std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/,
const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/)
: ThreadPoolDynamic(m_taskQueue, poolSize, poolMaxSize, aliveTime, waitForSignalStart)
{
m_threads.push_back(std::make_unique<std::thread>(worker, this, &m_taskQueue));
}
62 changes: 10 additions & 52 deletions src/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#define THREAD_POOL_H__

#include "noncopyable.h"
#include <future>
#include <iostream>
#include <atomic>
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>

#define THREAD_POOl_DEFAULT_POOL_SIZE 2

Expand All @@ -29,9 +31,9 @@ class PoolQueue : public std::queue<std::shared_ptr<IRunnable>>
};

class IThreadPool;
int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, std::string name);
int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue);
int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name);
const std::chrono::nanoseconds& aliveTime);

class IThreadPool : public boost::noncopyable_::noncopyable
{
Expand All @@ -44,18 +46,16 @@ class IThreadPool : public boost::noncopyable_::noncopyable
virtual void wait() = 0;
virtual void terminate() = 0;

friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
std::string name);
friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue);
friend int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name);
const std::chrono::nanoseconds& aliveTime);

protected:
std::uint32_t m_poolSize;
PoolQueue m_taskQueue;
std::vector<std::unique_ptr<std::thread>> m_threads;

std::mutex _tpMtQueue;
std::mutex _tpMtCoutStream;
std::condition_variable _tpCV;
std::atomic_bool _tpQueueReady = false;
std::atomic_bool _tpTerminal = false;
Expand All @@ -71,57 +71,15 @@ class ThreadPool : public IThreadPool
bool waitForSignalStart = false);
virtual ~ThreadPool();

void execute(std::shared_ptr<IRunnable> runnable);
virtual void execute(std::shared_ptr<IRunnable> runnable);
void start();
void wait();
void terminate();

protected:
virtual void createThreads(std::uint32_t count);
};

class ThreadPoolDynamic : public ThreadPool
{

public:
ThreadPoolDynamic(std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
std::uint32_t poolMaxSize = std::thread::hardware_concurrency(),
const std::chrono::nanoseconds& aliveTime = 60s,
bool waitForSignalStart = false);
ThreadPoolDynamic(PoolQueue& queue, std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
std::uint32_t poolMaxSize = std::thread::hardware_concurrency(),
const std::chrono::nanoseconds& aliveTime = 60s,
bool waitForSignalStart = false);
~ThreadPoolDynamic(){};

protected:
std::uint32_t m_poolMaxSize;
std::chrono::nanoseconds m_aliveTime;
virtual void createWorker(std::uint32_t count);
};

} // namespace threadpool

class RunnableExample : public threadpool::IRunnable
{
public:
RunnableExample(std::string name) { m_name = name; };
~RunnableExample(){};

virtual bool run() override
{

using namespace std::chrono_literals;
int loop = rand() % 10 + 1;

for (int i = 0; i < loop; i++)
{
std::cout << m_name << " running " << i << std::endl;
std::this_thread::sleep_for(1s);
}

return true;
}

std::string m_name;
};
#endif

0 comments on commit 56f063e

Please sign in to comment.