一、线程池简介
线程池是一种并发编程技术,通过预先创建一组线程并复用它们来执行多个任务,避免了频繁创建和销毁线程的开销。它特别适合处理大量短生命周期任务的场景(如服务器请求、并行计算)。
线程池的核心组件
1. 任务队列(Task Queue)
存储待执行的任务(通常是函数对象或可调用对象)。
2. 工作线程(Worker Threads)
一组预先创建的线程,不断从队列中取出任务并执行。
3. 同步机制互斥锁(Mutex):保护任务队列的线程安全访问。
条件变量(Condition Variable):通知线程任务到达或线程池终止。
实现步骤
1. 初始化线程池创建固定数量的线程,每个线程循环等待任务。
2. 提交任务将任务包装成函数对象,加入任务队列。
3. 任务执行工作线程从队列中取出任务并执行。
4. 终止线程池发送停止信号,等待所有线程完成当前任务后退出。
二、C++11实现线程池
源码
#include <vector> #include <queue> #include <future> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <stdexcept> class ThreadPool { public: //构造函数:根据输入的线程数(默认硬件并发数)创建工作线程。 //每个工作线程执行一个循环,不断从任务队列中取出并执行任务。 //explicit关键字防止隐式类型转换 explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); //等待条件:线程通过条件变量等待任务到来或停止信号。(CPU使用率:休眠时接近0%,仅在任务到来时唤醒) //lambda表达式作为谓词,当条件(停止信号为true 或 任务队列非空)为真时,才会解除阻塞。 this->condition.wait(lock, [this] { return (this->stop || !this->tasks.empty()); }); /* 传统忙等待:while (!(stop || !tasks.empty())) {} // 空循环消耗CPU */ if (this->stop && this->tasks.empty()) { //如果线程池需要终止且任务队列为空则直接return return; } //任务提取:从队列中取出任务并执行,使用std::move避免拷贝开销。 task = std::move(this->tasks.front()); this->tasks.pop(); } //执行任务 task(); } }); } } //任务提交(enqueue方法) template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; //任务封装:使用std::packaged_task包装用户任务,支持异步返回结果。 //智能指针管理:shared_ptr确保任务对象的生命周期延续至执行完毕。 //完美转发:通过std::forward保持参数的左值/右值特性。 auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task]() { (*task)(); }); /* push传入的对象需要事先构造好,再复制过去插入容器中; 而emplace则可以自己使用构造函数所需的参数构造出对象,并直接插入容器中。 emplace相比于push省去了复制的步骤,则使用emplace会更加节省内存。*/ } condition.notify_one(); return res; } ~ThreadPool() { //设置stop标志,唤醒所有线程,等待任务队列清空。 { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } private: std::vector<std::thread> workers; //存储工作线程对象 std::queue<std::function<void()>> tasks; //任务队列,存储待执行的任务 std::mutex queue_mutex; //保护任务队列的互斥锁 std::condition_variable condition; //线程间同步的条件变量 bool stop; //线程池是否停止标志 };
三、线程池源码解析
1. 成员变量
std::vector<std::thread> workers; // 工作线程容器 std::queue<std::function<void()>> tasks; // 任务队列 std::mutex queue_mutex; // 队列互斥锁 std::condition_variable condition; // 条件变量 bool stop; // 停机标志
设计要点:
-
采用生产者-消费者模式,任务队列作为共享资源
-
组合使用
mutex
+condition_variable
实现线程同步 -
vector
存储线程对象便于统一管理生命周期
2. 构造函数
2.1 线程初始化
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop(false) { if (threads == 0) { threads = 1; } for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { /* 工作线程逻辑 */ }); } }
设计要点:
-
explicit
防止隐式类型转换(如ThreadPool pool = 4;
) -
默认使用硬件并发线程数(通过
hardware_concurrency()
) -
最少创建1个线程避免空池
-
使用
emplace_back
直接构造线程对象
2.2 工作线程逻辑
for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); }
核心机制:
-
unique_lock
配合条件变量实现自动锁管理 -
双重状态检查(停机标志+队列非空)
-
任务提取使用移动语义避免拷贝
-
任务执行在锁作用域外进行
3. 任务提交(enqueue方法)
3.1 方法签名
template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
类型推导:
- 使用尾置返回类型声明
std::result_of
推导可调用对象的返回类型- 完美转发参数(
F&&
+Args&&...
)
3.2 任务封装
auto task = std::make_shared<std::packaged_task<return_type()>> (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
封装策略:
packaged_task
包装任务用于异步获取结果shared_ptr
管理任务对象生命周期std::bind
绑定参数(注意C++11的参数转发限制)
3.3 任务入队
tasks.emplace([task]() { (*task)(); });
优化点:
- 使用
emplace
直接构造队列元素 Lambda
捕获shared_ptr
保持任务有效性- 显式解引用执行
packaged_task
4. 析构函数
4.1 停机控制
~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (auto& worker : workers) { worker.join(); } }
停机协议:
- 设置停机标志原子操作
- 广播唤醒所有等待线程
- 等待所有工作线程退出
5. 关键技术点解析
5.1 完美转发实现
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- 保持参数的左右值特性
- 支持移动语义参数的传递
- C++11的限制:无法完美转发所有参数类型
5.2 异常传播机制
- 任务异常通过
future
对象传播 packaged_task
自动捕获异常- 用户通过
future.get()
获取异常
5.3 内存管理模型
[任务提交者] | v [packaged_task] <---- shared_ptr ---- [任务队列] | v [future]
- 三重生命周期保障:
提交者持有
future
队列持有任务包装器
工作线程执行任务
四、 性能特征分析
1. 时间复杂度
操作 | 时间复杂度 |
---|---|
任务提交(enqueue) | O(1)(加锁开销) |
任务提取 | O(1) |
线程唤醒 | 取决于系统调度 |
2. 空间复杂度
组件 | 空间占用 |
---|---|
线程栈 | 每线程MB级 |
任务队列 | 与任务数成正比 |
同步原语 | 固定大小 |
五、 扩展优化方向
1. 任务窃取(Work Stealing)
- 实现多个任务队列
- 空闲线程从其他队列窃取任务
2. 动态线程池
void adjust_workers(size_t new_size) { if (new_size > workers.size()) { // 扩容逻辑 } else { // 缩容逻辑 } }
3. 优先级队列
using Task = std::pair<int, std::function<void()>>; // 优先级+任务 std::priority_queue<Task> tasks;
4. 无锁队列
moodycamel::ConcurrentQueue<std::function<void()>> tasks;
六、 典型问题排查指南
现象 | 可能原因 | 解决方案 |
---|---|---|
任务未执行 | 线程池提前析构 | 延长线程池生命周期 |
future.get() 永久阻塞 |
任务未提交/异常未处理 | 检查任务提交路径 |
CPU利用率100% | 忙等待或锁竞争 | 优化任务粒度/使用无锁结构 |
内存持续增长 | 任务对象未正确释放 | 检查智能指针使用 |
该实现完整展现了现代C++线程池的核心设计范式,开发者可根据具体需求在此基础进行功能扩展和性能优化。理解这个代码结构是掌握更高级并发模式的基础。
七、 测试用例
使用实例(C++11兼容):
#include <iostream> int main() { ThreadPool pool(4); // 提交普通函数 auto future1 = pool.enqueue([](int a, int b) { return a + b; }, 2, 3); // 提交成员函数 struct Calculator { int multiply(int a, int b) { return a * b; } } calc; auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, std::placeholders::_1, std::placeholders::_2), 4, 5); // 异常处理示例 auto future3 = pool.enqueue([]() -> int { throw std::runtime_error("example error"); return 1; }); std::cout << "2+3=" << future1.get() << std::endl; std::cout << "4*5=" << future2.get() << std::endl; try { future3.get(); } catch(const std::exception& e) { std::cout << "Caught exception: " << e.what() << std::endl; } return 0; }
到此这篇关于C++线程池实现的文章就介绍到这了,更多相关C++线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
来源链接:https://www.jb51.net/program/335069t2f.htm
如有侵犯您的版权,请及时联系3500663466#qq.com(#换@),我们将第一时间删除本站数据。
暂无评论内容