-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.h
161 lines (152 loc) · 4.12 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include "assert.h"
#include <iostream>
namespace ThreadPool
{
#define MAX_THREAD_NUM 8
//线程池,可以提交变参函数或lamada表达式的匿名函数执行,可以获取执行返回值
//֧支持类成员函数,支持类静态成员函数或全局函数,Operator函数等
class ThreadPool
{
typedef std::function<void()> Task;
private:
std::vector<std::thread> m_pool; // 线程池
std::queue<Task> m_tasks; // 任务队列
std::mutex m_lock; // ͬ同步锁
std::condition_variable m_cv; // 条件阻塞
std::atomic<bool> m_isStoped; // 是否关闭提交
std::atomic<int> m_idleThreadNum; //空闲线程数量
public:
ThreadPool(int size = MAX_THREAD_NUM)
{
m_isStoped=false;
size = size > MAX_THREAD_NUM ? MAX_THREAD_NUM : size;
m_idleThreadNum = size;
for (int i = 0; i < size; i++)
{
//初始化线程数量
m_pool.emplace_back(&ThreadPool::scheduler, this);
}
}
~ThreadPool()
{
Close();
while (!m_tasks.empty())
{
m_tasks.pop();
}
m_cv.notify_all(); // 唤醒所有线程执行
for (std::thread &thread : m_pool)
{
if (thread.joinable())
{
thread.join(); // 等待任务结束,前提是线程可以执行完
}
}
m_pool.clear();
}
// 打开线程池,重启任务提交
void ReOpen()
{
if (m_isStoped)
m_isStoped.store(false);
m_cv.notify_all();
}
// 关闭线程池,停止提交新任务
void Close()
{
if (!m_isStoped)
m_isStoped.store(true);
}
// 判断线程池是否被关闭
bool IsClosed() const
{
return m_isStoped.load();
}
// 获取当前任务队列中的任务数
int GetTaskSize()
{
return m_tasks.size();
}
// 获取当前空闲线程数
int IdleCount()
{
return m_idleThreadNum;
}
// 提交任务并执行
// 调用方式为:std::future<returnType> var = threadpool.Submit(...)
// var.get() 会等待任务执行完,并获取返回值
// 其中...可以直接用函数名+函数参数代替,例如threadpool.Submit(f, 0, 1)
// 如果要调用类成员函数,最好用如下方式:
// threadpool.Submit(std::bind(&Class::Func, &classInstance)) 或
// threadpool.Submit(std::mem_fn(&Class::Func), &classInstance)
template <class F, class... Args>
auto Submit(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
{
using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数f的返回值类型
std::shared_ptr<std::packaged_task<RetType()>> task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RetType> future = task->get_future();
// 封装任务并添加到队列
addTask([task]() {
(*task)();
});
return future;
}
private:
// 消费者
Task getTask()
{
std::unique_lock<std::mutex> lock(m_lock); // unique_lock ��� lock_guard �ĺô��ǣ�������ʱ unlock() �� lock()
while (m_tasks.empty() && !m_isStoped)
{
m_cv.wait(lock);
} // wait ֱ直到有 task
if (m_isStoped)
{
return Task();
}
if(m_tasks.empty())
{
return NULL;
}
Task task = std::move(m_tasks.front()); // ȡһ�� task
m_tasks.pop();
m_cv.notify_one();
return task;
}
// 生产者
void addTask(Task task)
{
std::lock_guard<std::mutex> lock{m_lock}; //对当前快的语句加 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候unlock()
m_tasks.push(task);
m_cv.notify_one(); // 唤醒一个线程执行
std::cout<<"线程数"<<m_idleThreadNum<<std::endl;
}
// 工作线程主循环函数
void scheduler()
{
// std::cout << "bbb " << m_isStoped << " " << m_isStoped.load() << std::endl;
while (!m_isStoped.load())
{
// 获取一个待执行的task
Task task(getTask());
if (task)
{
m_idleThreadNum--;
task();
m_idleThreadNum++;
}
}
}
};
} // namespace ThreadPool
#endif