From 56f063eee5945cd368c65a25296133f2d6ee62ef Mon Sep 17 00:00:00 2001 From: Le Xuan Tuan Anh Date: Tue, 10 May 2022 11:49:54 +0700 Subject: [PATCH] clean code --- example.cpp | 28 +++++++++++++-- src/threadpool.cpp | 87 ++++++---------------------------------------- src/threadpool.h | 62 ++++++--------------------------- 3 files changed, 45 insertions(+), 132 deletions(-) diff --git a/example.cpp b/example.cpp index 900ba05..d477d45 100644 --- a/example.cpp +++ b/example.cpp @@ -7,6 +7,31 @@ using namespace std; using namespace threadpool; +class RunnableExample : public threadpool::IRunnable +{ +public: + RunnableExample(std::string name) { m_name = name; }; + ~RunnableExample(){}; + + virtual bool run() override + { + using namespace std::chrono_literals; + int loop = rand() % 10 + 1; + + for (int i = 0; i < loop; i++) + { + { + std::cout << m_name << " running " << i << std::endl; + } + } + + return true; + } + + std::string m_name; +}; + + int main(void) { try @@ -20,13 +45,10 @@ int main(void) queue.push(std::make_shared("#t6#")); ThreadPool pool = ThreadPool(2); - ThreadPool pool2 = ThreadPool(queue, 2); pool.execute(std::make_shared("#t7#")); pool.execute(std::make_shared("#t8#")); pool.execute(std::make_shared("#t9#")); - ThreadPoolDynamic pool1 = ThreadPoolDynamic(); - std::cout << "main thread sleep for 10s" << std::endl; std::this_thread::sleep_for(20s); diff --git a/src/threadpool.cpp b/src/threadpool.cpp index 5aa3371..fcba5d2 100644 --- a/src/threadpool.cpp +++ b/src/threadpool.cpp @@ -1,28 +1,13 @@ #include "threadpool.h" -#include -#include -#include -using namespace std::chrono_literals; - -int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, - std::string name) +int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue) { - std::string workerName = name; std::shared_ptr backElm = nullptr; - { - const std::lock_guard lk(pool->_tpMtCoutStream); - std::cout << workerName << " has been created" << std::endl; - } - do { { std::unique_lock lk{pool->_tpMtQueue}; - { - const std::lock_guard lk(pool->_tpMtCoutStream); - std::cout << workerName << " waitting" << std::endl; - } + pool->_tpCV.wait( lk, [&]() @@ -31,18 +16,8 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que (pool->_tpTerminal.load(std::memory_order_relaxed) || !queue->empty()); }); - if (pool->_tpTerminal.load(std::memory_order_relaxed)) - { - { - const std::lock_guard lk(pool->_tpMtCoutStream); - std::cout << workerName << "is terminal" << std::endl; - } - break; - } - { - const std::lock_guard lk(pool->_tpMtCoutStream); - std::cout << workerName << " wait done" << std::endl; - } + if (pool->_tpTerminal.load(std::memory_order_relaxed)) break; + // get back element backElm = queue->front(); queue->pop(); @@ -50,13 +25,6 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que lk.unlock(); } - // do somethings - { - const std::lock_guard lk(pool->_tpMtCoutStream); - std::string t = ((RunnableExample*)(backElm.get()))->m_name; - std::cout << workerName << " do " << t << std::endl; - } - if (backElm) backElm->run(); backElm = nullptr; @@ -66,17 +34,13 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que } int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, - const std::chrono::nanoseconds& aliveTime, std::string name) + const std::chrono::nanoseconds& aliveTime) { - std::string workerName = name; std::shared_ptr backElm = nullptr; - std::cout << workerName << "seasonal has been created" << std::endl; - do { { std::unique_lock lk{pool->_tpMtQueue}; - std::cout << workerName << "seasonal waitting" << std::endl; if (!pool->_tpCV.wait_for( lk, aliveTime, [&]() @@ -86,11 +50,11 @@ int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQu !queue->empty()); })) { - std::cout << workerName << "seasonal is terminal" << std::endl; break; } - std::cout << workerName << " seasonal wait done" << std::endl; + if (pool->_tpTerminal.load(std::memory_order_relaxed)) break; + // get back element backElm = queue->front(); queue->pop(); @@ -98,12 +62,6 @@ int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQu lk.unlock(); } - // do somethings - { - std::string t = ((RunnableExample*)(backElm.get()))->m_name; - std::cout << workerName << "seasonal do " << t << std::endl; - } - if (backElm) backElm->run(); backElm = nullptr; @@ -121,7 +79,7 @@ threadpool::ThreadPool::ThreadPool(PoolQueue& queue, m_poolSize = poolSize; m_taskQueue.swap(queue); _tpWaitForSignalStart.store(waitForSignalStart); - createThreads(m_poolSize); + createWorker(m_poolSize); } threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/, @@ -159,33 +117,8 @@ void threadpool::ThreadPool::terminate() _tpCV.notify_all(); } -void threadpool::ThreadPool::createThreads(std::uint32_t count) +void threadpool::ThreadPool::createWorker(std::uint32_t count) { for (std::uint32_t i = 0; i < count; i++) - { - std::stringstream sstr; - sstr << "#w " << i << " #"; - std::string name = sstr.str(); - m_threads.push_back(std::make_unique(worker, this, &m_taskQueue, name)); - } -} - -threadpool::ThreadPoolDynamic::ThreadPoolDynamic( - PoolQueue& queue, std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/, - std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/, - const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/) -{ - m_poolSize = poolSize; - m_poolMaxSize = poolMaxSize; - m_aliveTime = aliveTime; - m_taskQueue.swap(queue); - _tpWaitForSignalStart.store(waitForSignalStart); -} - -threadpool::ThreadPoolDynamic::ThreadPoolDynamic( - std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/, - std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/, - const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/) - : ThreadPoolDynamic(m_taskQueue, poolSize, poolMaxSize, aliveTime, waitForSignalStart) -{ + m_threads.push_back(std::make_unique(worker, this, &m_taskQueue)); } diff --git a/src/threadpool.h b/src/threadpool.h index 9ff4c63..4f7d734 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -2,9 +2,11 @@ #define THREAD_POOL_H__ #include "noncopyable.h" -#include -#include +#include +#include +#include #include +#include #define THREAD_POOl_DEFAULT_POOL_SIZE 2 @@ -29,9 +31,9 @@ class PoolQueue : public std::queue> }; class IThreadPool; -int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, std::string name); +int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue); int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, - const std::chrono::nanoseconds& aliveTime, std::string name); + const std::chrono::nanoseconds& aliveTime); class IThreadPool : public boost::noncopyable_::noncopyable { @@ -44,10 +46,9 @@ class IThreadPool : public boost::noncopyable_::noncopyable virtual void wait() = 0; virtual void terminate() = 0; - friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, - std::string name); + friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue); friend int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, - const std::chrono::nanoseconds& aliveTime, std::string name); + const std::chrono::nanoseconds& aliveTime); protected: std::uint32_t m_poolSize; @@ -55,7 +56,6 @@ class IThreadPool : public boost::noncopyable_::noncopyable std::vector> m_threads; std::mutex _tpMtQueue; - std::mutex _tpMtCoutStream; std::condition_variable _tpCV; std::atomic_bool _tpQueueReady = false; std::atomic_bool _tpTerminal = false; @@ -71,57 +71,15 @@ class ThreadPool : public IThreadPool bool waitForSignalStart = false); virtual ~ThreadPool(); - void execute(std::shared_ptr runnable); + virtual void execute(std::shared_ptr runnable); void start(); void wait(); void terminate(); protected: - virtual void createThreads(std::uint32_t count); -}; - -class ThreadPoolDynamic : public ThreadPool -{ - -public: - ThreadPoolDynamic(std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE, - std::uint32_t poolMaxSize = std::thread::hardware_concurrency(), - const std::chrono::nanoseconds& aliveTime = 60s, - bool waitForSignalStart = false); - ThreadPoolDynamic(PoolQueue& queue, std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE, - std::uint32_t poolMaxSize = std::thread::hardware_concurrency(), - const std::chrono::nanoseconds& aliveTime = 60s, - bool waitForSignalStart = false); - ~ThreadPoolDynamic(){}; - -protected: - std::uint32_t m_poolMaxSize; - std::chrono::nanoseconds m_aliveTime; + virtual void createWorker(std::uint32_t count); }; } // namespace threadpool -class RunnableExample : public threadpool::IRunnable -{ -public: - RunnableExample(std::string name) { m_name = name; }; - ~RunnableExample(){}; - - virtual bool run() override - { - - using namespace std::chrono_literals; - int loop = rand() % 10 + 1; - - for (int i = 0; i < loop; i++) - { - std::cout << m_name << " running " << i << std::endl; - std::this_thread::sleep_for(1s); - } - - return true; - } - - std::string m_name; -}; #endif