Skip to content

Commit

Permalink
add interface for dynamic ThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Le Xuan Tuan Anh committed May 9, 2022
1 parent 04a2c67 commit 0b5d9a6
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 69 deletions.
21 changes: 6 additions & 15 deletions example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ int main(void)
{
try
{
std::chrono::seconds;
auto t = 40s;


PoolQueue queue;
queue.push(std::make_shared<RunnableExample>("#t1#"));
queue.push(std::make_shared<RunnableExample>("#t2#"));
Expand All @@ -23,17 +19,13 @@ int main(void)
queue.push(std::make_shared<RunnableExample>("#t5#"));
queue.push(std::make_shared<RunnableExample>("#t6#"));

PoolQueue queue1 = queue;

ThreadPool pool("1", 4, queue);
ThreadPool pool1("2", 4, queue1);

std::cout << "main thread sleep for 3s" << std::endl;
std::this_thread::sleep_for(3s);

std::cout << "main thread sleep for 40s" << std::endl;
std::this_thread::sleep_for(40s);
ThreadPool pool = ThreadPool(2);
ThreadPool pool2 = ThreadPool(queue, 2);
pool.execute(std::make_shared<RunnableExample>("#t7#"));
pool.execute(std::make_shared<RunnableExample>("#t8#"));
pool.execute(std::make_shared<RunnableExample>("#t9#"));

ThreadPoolDynamic pool1 = ThreadPoolDynamic();

std::cout << "main thread sleep for 10s" << std::endl;
std::this_thread::sleep_for(20s);
Expand All @@ -43,7 +35,6 @@ int main(void)

// okey, but before I hug my mother last
pool.wait();
pool1.wait();
}
catch (const exception& e)
{
Expand Down
65 changes: 38 additions & 27 deletions src/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <iostream>
#include <sstream>

using namespace std::chrono_literals;

int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
std::string name)
{
Expand Down Expand Up @@ -63,9 +65,8 @@ int threadpool::worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* que
return 1;
}

template <class Rep, class Period>
int seasonalWorker(threadpool::PoolQueue* queue,
const std::chrono::duration<Rep, Period>& aliveTime, std::string name)
int threadpool::seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name)
{
std::string workerName = name;
std::shared_ptr<threadpool::IRunnable> backElm = nullptr;
Expand All @@ -74,15 +75,16 @@ int seasonalWorker(threadpool::PoolQueue* queue,
do
{
{
std::unique_lock lk{_tpMtQueue};
std::unique_lock lk{pool->_tpMtQueue};
std::cout << workerName << "seasonal waitting" << std::endl;
if (!_tpCV.wait_for(lk, aliveTime,
[&]()
{
return !_tpWaitForSignalStart.load(std::memory_order_relaxed) &&
(_tpTerminal.load(std::memory_order_relaxed) ||
!queue->empty());
}))
if (!pool->_tpCV.wait_for(
lk, aliveTime,
[&]()
{
return !pool->_tpWaitForSignalStart.load(std::memory_order_relaxed) &&
(pool->_tpTerminal.load(std::memory_order_relaxed) ||
!queue->empty());
}))
{
std::cout << workerName << "seasonal is terminal" << std::endl;
break;
Expand Down Expand Up @@ -112,14 +114,8 @@ int seasonalWorker(threadpool::PoolQueue* queue,

threadpool::PoolQueue::PoolQueue() {}

threadpool::ThreadPool::ThreadPool() { m_poolSize = 2; }

threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize, bool waitForSignalStart /*= false*/)
: ThreadPool(poolSize, m_taskQueue, waitForSignalStart)
{
}

threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize, PoolQueue& queue,
threadpool::ThreadPool::ThreadPool(PoolQueue& queue,
std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
bool waitForSignalStart /*= false*/)
{
m_poolSize = poolSize;
Expand All @@ -128,6 +124,12 @@ threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize, PoolQueue& queue,
createThreads(m_poolSize);
}

threadpool::ThreadPool::ThreadPool(std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
bool waitForSignalStart /*= false*/)
: ThreadPool(m_taskQueue, poolSize, waitForSignalStart)
{
}

threadpool::ThreadPool::~ThreadPool() { terminate(); }

void threadpool::ThreadPool::execute(std::shared_ptr<IRunnable> runnable)
Expand Down Expand Up @@ -168,13 +170,22 @@ void threadpool::ThreadPool::createThreads(std::uint32_t count)
}
}

void threadpool::ThreadPoolDynamic::createThreads(std::uint32_t count)
threadpool::ThreadPoolDynamic::ThreadPoolDynamic(
PoolQueue& queue, std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/,
const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/)
{
m_poolSize = poolSize;
m_poolMaxSize = poolMaxSize;
m_aliveTime = aliveTime;
m_taskQueue.swap(queue);
_tpWaitForSignalStart.store(waitForSignalStart);
}

threadpool::ThreadPoolDynamic::ThreadPoolDynamic(
std::uint32_t poolSize /*= THREAD_POOl_DEFAULT_POOL_SIZE*/,
std::uint32_t poolMaxSize /*= std::thread::hardware_concurrency()*/,
const std::chrono::nanoseconds& aliveTime /*= 60s*/, bool waitForSignalStart /*= false*/)
: ThreadPoolDynamic(m_taskQueue, poolSize, poolMaxSize, aliveTime, waitForSignalStart)
{
for (std::uint32_t i = 0; i < count; i++)
{
std::stringstream sstr;
sstr << "#wddd" << i << "#";
std::string name = sstr.str();
m_threads.push_back(std::make_unique<std::thread>(worker, this, &m_taskQueue, name));
}
}
49 changes: 22 additions & 27 deletions src/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <iostream>
#include <queue>

#define THREAD_POOl_DEFAULT_POOL_SIZE 2

namespace threadpool
{

Expand All @@ -28,6 +30,8 @@ class PoolQueue : public std::queue<std::shared_ptr<IRunnable>>

class IThreadPool;
int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, std::string name);
int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name);

class IThreadPool : public boost::noncopyable_::noncopyable
{
Expand All @@ -40,7 +44,10 @@ class IThreadPool : public boost::noncopyable_::noncopyable
virtual void wait() = 0;
virtual void terminate() = 0;

friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue, std::string name);
friend int worker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
std::string name);
friend int seasonalWorker(threadpool::IThreadPool* pool, threadpool::PoolQueue* queue,
const std::chrono::nanoseconds& aliveTime, std::string name);

protected:
std::uint32_t m_poolSize;
Expand All @@ -57,10 +64,11 @@ class IThreadPool : public boost::noncopyable_::noncopyable

class ThreadPool : public IThreadPool
{

public:
ThreadPool(std::uint32_t poolSize, bool waitForSignalStart = false);
ThreadPool(std::uint32_t poolSize, PoolQueue& queue, bool waitForSignalStart = false);
ThreadPool(std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
bool waitForSignalStart = false);
ThreadPool(PoolQueue& queue, std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
bool waitForSignalStart = false);
virtual ~ThreadPool();

void execute(std::shared_ptr<IRunnable> runnable);
Expand All @@ -69,42 +77,28 @@ class ThreadPool : public IThreadPool
void terminate();

protected:
ThreadPool();
virtual void createThreads(std::uint32_t count);
};

class ThreadPoolDynamic : public ThreadPool
{

public:
template <class Rep, class Period>
ThreadPoolDynamic(std::uint32_t poolSize, std::uint32_t poolMaxSize,
const std::chrono::duration<Rep, Period>& aliveTime, PoolQueue& queue,
ThreadPoolDynamic(std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
std::uint32_t poolMaxSize = std::thread::hardware_concurrency(),
const std::chrono::nanoseconds& aliveTime = 60s,
bool waitForSignalStart = false);
ThreadPoolDynamic(PoolQueue& queue, std::uint32_t poolSize = THREAD_POOl_DEFAULT_POOL_SIZE,
std::uint32_t poolMaxSize = std::thread::hardware_concurrency(),
const std::chrono::nanoseconds& aliveTime = 60s,
bool waitForSignalStart = false);
~ThreadPoolDynamic(){};

protected:
virtual void createThreads(std::uint32_t count) override;

std::uint32_t m_poolMaxSize;
std::chrono::nanoseconds m_aliveTime;
};

template <class Rep, class Period>
threadpool::ThreadPoolDynamic::ThreadPoolDynamic(
std::uint32_t poolSize, std::uint32_t poolMaxSize,
const std::chrono::duration<Rep, Period>& aliveTime, PoolQueue& queue,
bool waitForSignalStart /*= false*/)
//: ThreadPool(poolSize, queue, waitForSignalStart)
{
extern std::atomic_bool _tpWaitForSignalStart;
m_poolSize = poolSize;
m_taskQueue.swap(queue);
_tpWaitForSignalStart.store(waitForSignalStart);
createThreads(m_poolSize);
m_aliveTime = aliveTime;
m_poolMaxSize = poolMaxSize;
}

} // namespace threadpool

class RunnableExample : public threadpool::IRunnable
Expand All @@ -117,8 +111,9 @@ class RunnableExample : public threadpool::IRunnable
{

using namespace std::chrono_literals;
int loop = rand() % 10 + 1;

for (int i = 0; i < 5; i++)
for (int i = 0; i < loop; i++)
{
std::cout << m_name << " running " << i << std::endl;
std::this_thread::sleep_for(1s);
Expand Down

0 comments on commit 0b5d9a6

Please sign in to comment.