C++线程池(同步/异步)


同步线程池

温馨提示:需要的前置知识:
C++多线程 https://xingzhu.top/archives/duo-xian-cheng-xian-cheng-chi
C++11新特性(分类里面有)

threadpool. h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <thread>  
#include <map>
#include <queue>
#include <functional>
#include <atomic>
#include <future>
#include <condition_variable>
#include <vector>
#include <mutex>
using namespace std;

class ThreadPool
{
public:
ThreadPool(int minn = 3, int maxn = thread::hardware_concurrency());
~ThreadPool();

void addTask(function<void(void)> f);
private:
void worker();
void manager();

private:
// 工作的线程
// 这里定义为指针,是因为后续创建线程直接 new 出来即可,因为线程之间是不允许拷贝构造的
// 如果写 thread m_manager; 后续不能直接使用 = 赋值,除非使用移动构造转移给 m_manager
thread *m_manager;

// 工作的线程
map<thread::id, thread> m_workers;

// 存储已经退出了的线程 id,主要用于销毁线程
vector<thread::id> m_ids;

// 任务队列
queue<function<void(void)>> m_tasks;

// 最小最大线程数
int m_minThreads, m_maxThreads;

// 空闲线程数,当前线程数
atomic<int> m_idlethreads, m_curthreads;

// 一次性退出的线程数
atomic<int> m_exitNum;

// 线程池是否关闭
atomic<bool> m_stop;

// 互斥量
mutex m_idMutex;
mutex m_queue;

// 条件变量
condition_variable m_notEmpty;
};

threadpool. c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#include "threadpool.h"  
#include <iostream>

// 初始化默认构造函数
ThreadPool::ThreadPool(int minn, int maxn) : m_minThreads(minn), m_maxThreads(maxn)
, m_exitNum(0), m_stop(false)
{
m_idlethreads = m_curthreads = minn;

// 初始化工作的线程
m_manager = new thread(&ThreadPool::manager, this);

// 初始化创建工作的线程,以最小线程数进行创建
for(int i = 0; i < m_curthreads; i++)
{
thread t(&ThreadPool::worker, this);
m_workers.insert(make_pair(t.get_id(), std::move(t)));
}
}
ThreadPool::~ThreadPool()
{
m_stop.store(true);

m_notEmpty.notify_all();

cout << "线程池销毁中....." << endl;

// 只能使用引用,因为线程不允许拷贝
for(auto &it : m_workers)
{
// 引用取值后就不需要解引用了
thread &t = it.second;
if(t.joinable())
{
cout << "----------- 线程 " << t.get_id() << "最终被销毁" << endl;
t.join();
}
}

if(m_manager->joinable())
{
m_manager->join();
}
// 销毁堆内存
delete m_manager;
}

void ThreadPool::addTask(function<void(void)> f)
{
{
unique_lock<mutex> locker(m_queue);
m_tasks.emplace(f);
}
m_notEmpty.notify_one();
}


// 实现工作的线程函数
void ThreadPool::worker()
{
// 当线程池没有关闭的时候
while (!m_stop.load())
{
function<void(void)> task = nullptr;

{
// 这个大括号是局部作用域,表示锁的范围,只需要锁住取任务,任务的执行不需要加锁
unique_lock<mutex> locker(m_queue);
// 任务队列为空
// 注意这里一定要为 while,不能为 if,不然会存在 bug
// 假如唤醒了多个阻塞在这里的线程,如果为 if,全部都会抢互斥锁,然后直接向下执行了,不会判断是否还有任务没被抢
// 但是可能此时任务队列中只有一个任务,就会造成非法访问
// 如果为 while, 抢到锁了,也需要先判断 while 条件是否满足,才退出循环向下执行,这样就不存在上述的问题了
while(!m_stop.load() && m_tasks.empty())
{
m_notEmpty.wait(locker);

if(m_exitNum.load() > 0)
{
m_curthreads--, m_exitNum--;
cout << "--------------线程退出了----------" << endl;

// 共享资源加锁
unique_lock<mutex> locker1(m_idMutex);
m_ids.emplace_back(this_thread::get_id());
return;
}
}

// 取出任务
cout << "取出了一个任务..." << endl;
task = m_tasks.front();
m_tasks.pop();
}

// 空闲的线程变化
m_idlethreads--;
task();
m_idlethreads++;
}
}

void ThreadPool::manager()
{
while (!m_stop.load())
{
// 3 秒进行检测一次
this_thread::sleep_for(chrono::seconds(3));

int idel = m_idlethreads.load();
int cur = m_curthreads.load();

// 销毁线程(自定义规则)
// 这里定义为: 空闲线程数 > 当前线程数 / 2 && 当前线程数 - 2 >= 最小线程数
// 减去 2 是因为我定义的是一次性销毁 2 个线程数
if(idel > cur / 2 && cur -2 >= m_minThreads)
{
// 销毁线程不是管理者线程销毁,而是唤醒工作的线程,让其自己退出,管理者线程可以用于回收退出的线程资源
// 一次性销毁 2 个
m_exitNum.store(2);
m_notEmpty.notify_all(); // 虽然是唤醒了所有的线程,但是只会有 2 个线程销毁

// 这个 m_ids 容器是共享资源,因此需要加锁
unique_lock<mutex> locker(m_idMutex);
for (const auto& id : m_ids)
{
auto it = m_workers.find(id);
if (it != m_workers.end())
{
cout << "------------ 线程 " << (*it).first << "被销毁了...." << endl;

// 这里看似会阻塞程序运行,实则不会,因为这些线程都在 worker 函数中退出了,这个只是回收 pcb 等资源
(*it).second.join();
m_workers.erase(it);
}
}
m_ids.clear();
}

// 添加线程
// 自定义: 空闲的线程数为 0 && 当前线程数 + 2 <= 最大线程数
if(idel == 0 && cur + 2 <= m_maxThreads)
{
thread t(&ThreadPool::worker, this);
cout << "++++++ 添加了一个线程, id: " << t.get_id() << endl;
m_workers.insert(make_pair(t.get_id(), std::move(t)));
m_curthreads++, m_idlethreads++;
}
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void calc(int x, int y)
{
int res = x + y;
cout << "res = " << res << endl;
this_thread::sleep_for(chrono::seconds(2));
}

int main()
{
ThreadPool pool(4);
for (int i = 0; i < 10; ++i) {
auto func = bind(calc, i, i * 2);
pool.addTask(func);
}
getchar();
return 0;
}

异步线程池

基于上面同步线程池的修改,修改 addTask 函数

  • 同步线程池是需要阻塞等待结果,等待执行,然后得到执行结果
  • 异步线程池实现了不需要阻塞等待,发起任务后,做其他事,发起的任务执行完毕后,返回执行结果给主线程

threadpool. h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#pragma once
#include <future>
using namespace std;

// 线程池类
class ThreadPool
{
public:
ThreadPool(int min, int max = thread::hardware_concurrency());
~ThreadPool();

// F 可调用对象类型,Args 参数
template<typename F, typename... Args>
auto addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>;
}

threadpool. c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename F, typename... Args>  
auto ThreadPool::addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>
{
using returnType = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<returnType()>>(
bind(std::forward<F>(f), std::forward<Args>(args)...)
);

// 得到 future 对象,因为里面存储了可调用对象的返回值
future<returnType> res = task->get_future();

{
// 加到这个任务到任务队列
unique_lock<mutex> lock(m_queue);
m_tasks.emplace([task]() {
(*task)();
});
}
m_notEmpty.notify_one();
return res;
}

解释

  • 使用泛型编程,应用于多种情况,typename... Args 这个表示可以接收任意数量类型的参数
  • 返回值类型推导使用萃取器,result_of 模板函数来实现,result_of<F(Args...)>::type 表示带有 Args...F 函数的类型,前面加有 typename 是告诉编辑器这是一个类型,不是函数
  • addTask 里面的类型使用 && ,这个表示是一个未定义类型,可以接收左值和右值,这么定义是为了适用于多种情况,可以传右值和左值,如果不这么定义,也就是去掉 &&,会进行值拷贝,开销过大,如果只有一个 & 对于传右值情况都会转换为左值,但是使用这个形式,再结合 forward 完美转发,就能实现传递左值就是左值,传递右值就是右值
  • forward 功能就是上述解释的,实现:forward<F>(f) F 是要完美转发的变量类型,f 是实际要完美转发的变量
  • package_task 是实现得到线程返回值的,<> 里面填写返回值类型 returnYype 和参数类型 () ,这里参数为空,是因为可能存在参数,也可能不存在,这里不易写,那么全部搞成无参,而参数提前和可调用对象绑定起来,这里就用到了 bind 绑定器
  • std::bind(可调用对象地址, 绑定的参数/占位符); 这是 bind 的实现体,由于为了实现传递左值,就按左值接收,传递右值按右值接收,这里使用完美转发 forward
  • 上面的 task 使用指针,是因为防止拷贝,因为最终是要得到 future 对象,如果传值,拷贝一份,执行的就是拷贝的副本在执行,就得不到正确的 future 对象了,lambda 传引用也不行,这个构造函数执行完毕后,当前 task 生命周期结束,再执行这个任务函数,就非法操作了
  • 为了方便管理指针,这里使用了 shared_ptr 共享智能指针实现,因为会自动释放内存,安全性高,使用方法是 <> 里面写指针类型,在定义指针即可
  • 由于加任务,这个任务队列是共享资源,因此需要使用互斥锁

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int calc(int x, int y)
{
int res = x + y;
this_thread::sleep_for(chrono::seconds(2));
return res;
}

int main()
{
ThreadPool pool(4);
vector<future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(pool.addTask(calc, i, i * 2));
}
// 等待并打印结果
for (auto&& res : results) {
cout << "线程函数返回值: " << res.get() << endl;
}
return 0;
}

说明:参考学习: https://subingwen.cn/