cppcpp线程池C++线程池(同步/异步)
xingzhu
同步线程池
温馨提示:需要的前置知识:
C++多线程 https://xingzhu.top/archives/duo-xian-cheng-xian-cheng-chi
C++11新特性(分类里面有)
threadpool. h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include <thread> #include <map> #include <queue> #include <functional> #include <atomic> #include <future> #include <condition_variable> #include <vector> #include <mutex> using namespace std; class ThreadPool { public: ThreadPool(int minn = 3, int maxn = thread::hardware_concurrency()); ~ThreadPool(); void addTask(function<void(void)> f); private: void worker(); void manager(); private: thread *m_manager; map<thread::id, thread> m_workers; vector<thread::id> m_ids; queue<function<void(void)>> m_tasks; int m_minThreads, m_maxThreads; atomic<int> m_idlethreads, m_curthreads; atomic<int> m_exitNum; atomic<bool> m_stop; mutex m_idMutex; mutex m_queue; condition_variable m_notEmpty; };
|
threadpool. c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
| #include "threadpool.h" #include <iostream>
ThreadPool::ThreadPool(int minn, int maxn) : m_minThreads(minn), m_maxThreads(maxn) , m_exitNum(0), m_stop(false) { m_idlethreads = m_curthreads = minn; m_manager = new thread(&ThreadPool::manager, this); for(int i = 0; i < m_curthreads; i++) { thread t(&ThreadPool::worker, this); m_workers.insert(make_pair(t.get_id(), std::move(t))); } } ThreadPool::~ThreadPool() { m_stop.store(true); m_notEmpty.notify_all(); cout << "线程池销毁中....." << endl; for(auto &it : m_workers) { thread &t = it.second; if(t.joinable()) { cout << "----------- 线程 " << t.get_id() << "最终被销毁" << endl; t.join(); } } if(m_manager->joinable()) { m_manager->join(); } delete m_manager; } void ThreadPool::addTask(function<void(void)> f) { { unique_lock<mutex> locker(m_queue); m_tasks.emplace(f); } m_notEmpty.notify_one(); }
void ThreadPool::worker() { while (!m_stop.load()) { function<void(void)> task = nullptr; { unique_lock<mutex> locker(m_queue); while(!m_stop.load() && m_tasks.empty()) { m_notEmpty.wait(locker); if(m_exitNum.load() > 0) { m_curthreads--, m_exitNum--; cout << "--------------线程退出了----------" << endl; unique_lock<mutex> locker1(m_idMutex); m_ids.emplace_back(this_thread::get_id()); return; } } cout << "取出了一个任务..." << endl; task = m_tasks.front(); m_tasks.pop(); } m_idlethreads--; task(); m_idlethreads++; } } void ThreadPool::manager() { while (!m_stop.load()) { this_thread::sleep_for(chrono::seconds(3)); int idel = m_idlethreads.load(); int cur = m_curthreads.load(); if(idel > cur / 2 && cur -2 >= m_minThreads) { m_exitNum.store(2); m_notEmpty.notify_all(); unique_lock<mutex> locker(m_idMutex); for (const auto& id : m_ids) { auto it = m_workers.find(id); if (it != m_workers.end()) { cout << "------------ 线程 " << (*it).first << "被销毁了...." << endl; (*it).second.join(); m_workers.erase(it); } } m_ids.clear(); } if(idel == 0 && cur + 2 <= m_maxThreads) { thread t(&ThreadPool::worker, this); cout << "++++++ 添加了一个线程, id: " << t.get_id() << endl; m_workers.insert(make_pair(t.get_id(), std::move(t))); m_curthreads++, m_idlethreads++; } } }
|
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| void calc(int x, int y) { int res = x + y; cout << "res = " << res << endl; this_thread::sleep_for(chrono::seconds(2)); }
int main() { ThreadPool pool(4); for (int i = 0; i < 10; ++i) { auto func = bind(calc, i, i * 2); pool.addTask(func); } getchar(); return 0; }
|
异步线程池
基于上面同步线程池的修改,修改 addTask
函数
- 同步线程池是需要阻塞等待结果,等待执行,然后得到执行结果
- 异步线程池实现了不需要阻塞等待,发起任务后,做其他事,发起的任务执行完毕后,返回执行结果给主线程
threadpool. h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #pragma once #include <future> using namespace std;
class ThreadPool { public: ThreadPool(int min, int max = thread::hardware_concurrency()); ~ThreadPool();
template<typename F, typename... Args> auto addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>; }
|
threadpool. c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| template<typename F, typename... Args> auto ThreadPool::addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { using returnType = typename result_of<F(Args...)>::type; auto task = make_shared<packaged_task<returnType()>>( bind(std::forward<F>(f), std::forward<Args>(args)...) ); future<returnType> res = task->get_future(); { unique_lock<mutex> lock(m_queue); m_tasks.emplace([task]() { (*task)(); }); } m_notEmpty.notify_one(); return res; }
|
解释
- 使用泛型编程,应用于多种情况,
typename... Args
这个表示可以接收任意数量类型的参数
- 返回值类型推导使用萃取器,
result_of
模板函数来实现,result_of<F(Args...)>::type
表示带有 Args...
的 F
函数的类型,前面加有 typename
是告诉编辑器这是一个类型,不是函数
addTask
里面的类型使用 &&
,这个表示是一个未定义类型,可以接收左值和右值,这么定义是为了适用于多种情况,可以传右值和左值,如果不这么定义,也就是去掉 &&
,会进行值拷贝,开销过大,如果只有一个 &
对于传右值情况都会转换为左值,但是使用这个形式,再结合 forward
完美转发,就能实现传递左值就是左值,传递右值就是右值
forward
功能就是上述解释的,实现:forward<F>(f)
F
是要完美转发的变量类型,f
是实际要完美转发的变量
package_task
是实现得到线程返回值的,<>
里面填写返回值类型 returnYype
和参数类型 ()
,这里参数为空,是因为可能存在参数,也可能不存在,这里不易写,那么全部搞成无参,而参数提前和可调用对象绑定起来,这里就用到了 bind
绑定器
std::bind(可调用对象地址, 绑定的参数/占位符);
这是 bind
的实现体,由于为了实现传递左值,就按左值接收,传递右值按右值接收,这里使用完美转发 forward
- 上面的
task
使用指针,是因为防止拷贝,因为最终是要得到 future
对象,如果传值,拷贝一份,执行的就是拷贝的副本在执行,就得不到正确的 future
对象了,lambda
传引用也不行,这个构造函数执行完毕后,当前 task
生命周期结束,再执行这个任务函数,就非法操作了
- 为了方便管理指针,这里使用了
shared_ptr
共享智能指针实现,因为会自动释放内存,安全性高,使用方法是 <>
里面写指针类型,在定义指针即可
- 由于加任务,这个任务队列是共享资源,因此需要使用互斥锁
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| int calc(int x, int y) { int res = x + y; this_thread::sleep_for(chrono::seconds(2)); return res; }
int main() { ThreadPool pool(4); vector<future<int>> results; for (int i = 0; i < 10; ++i) { results.emplace_back(pool.addTask(calc, i, i * 2)); } for (auto&& res : results) { cout << "线程函数返回值: " << res.get() << endl; } return 0; }
|
说明:参考学习: https://subingwen.cn/


xingzhu
keep trying!keep doing!believe in yourself!
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 星竹!