|
文章目录
1、线程池原理
2、 任务队列
3、 线程池
3.1、任务提交
3.2、工作线程
4、完整代码
5、测试
6、其他搜集
7、类似的完整代码实现
c++11中添加了线程库,从此标准库有了支持并发的方案。c++11标准中的线程使用极其简单,但是c++对于多线程的支持还是比较低级,高级一点的用法都需要自己实现。
1、线程池原理
任务量小时,可以一个任务对应一个线程,线程资源的创建与销毁的时间消耗、上下文切换资源占用都可以忽略。但是在高并发、高吞吐、低延迟的场景下,就必须降低线程资源管理的时间消耗,就需要使用线程池了。
本文通过c++11的新特性实现一个线程池,通过管理一个任务队列、线程队列,将主线程中用户提交任务到一任务队列中,空闲的线程将从任务队列中获取任务并执行,用户可以异步的获取提交任务的执行结果。
基本框架如下图所示。这里的任务调度执行策略为:任务队列无上限,同时执行任务的数量固定,有空闲线程时获取队列中的队头任务。
线程池的线程是复用的,线程池中的线程创建后不销毁,直到线程池关闭退出。每个线程创建后,循环执行从任务队列获取任务并执行。
线程池与任务队列之间需要匹配,是一个典型的消费者-生产者模型,需要解决资源访问冲突,并且保证任务为空时,线程应该等待(阻塞),即需要实现线程安全、有同步机制的任务队列。
2、 任务队列
文章 C++ 带超时的线程安全队列ThreadSafeQueue 介绍不同需求下的线程安全队列实现。
这里需要的线程安全队列简单,利用std::mutex限制并发访问即可。 对于出队操作,这里直接阻塞,因为上层线程池中将使用同步消息机制在队列不为空时才执行出队操作。
直接给出任务队列的完整代码
template<typename T>
class QueueSafe{
private:
std::queue<T> _queue;
std::mutex _mtx;
public:
QueueSafe() = default;
QueueSafe(QueueSafe&&) = default;
QueueSafe& operator=(QueueSafe&&) = default;
QueueSafe(const QueueSafe&) = delete;
QueueSafe& operator=(const QueueSafe&) = delete;
~QueueSafe() {
clean();
}
void enqueue(const T& t){
std::unique_lock<std::mutex> lck(_mtx);
_queue.push(t);
}
void enqueue(T&& t){
std::unique_lock<std::mutex> lck(_mtx);
_queue.emplace(t);
}
// 队列为空时,也能返回结果
bool dequeue(T& t){
std::unique_lock<std::mutex> lck(_mtx);
if(_queue.empty())
return false;
t = std::move(_queue.front());
_queue.pop();
return true;
}
bool empty(){
std::unique_lock<std::mutex> lck(_mtx);
return _queue.empty();
}
int size(){
std::unique_lock<std::mutex> lck(_mtx);
return _queue.size();
}
// 用于退出
void clean(){
std::unique_lock<std::mutex> lck(_mtx);
while(!_queue.empty())
_queue.pop();
}
};
3、 线程池
3.1、任务提交
线程池最重要的方法就是负责向任务队列添加任务。提交函数应该做到以下两点:
- 接受任意类型、任意个数的参数的任何函数(普通函数,lambda,成员函数,仿函数…)
- 立即返回“结果”,避免阻塞主线程。这里的“结果”是能够延迟获取的包含任务执行结束的结果。
完整的提交任务函数代码如下:
template <typename F, typename... Args>
#if __cplusplus > 201103L
auto submit(F&& f, Args&& ...args) // c++14可行,c++11 需要添加尾返回类型推导 // ①
#else
auto submit(F&& f, Args&& ...args) -> std::future<decltype(f(args...))> // ①
#endif
{
// 获取根据函数、输入参数推断返回结果 ②
//using Ret = typename std::result_of<F(Args...)>::type;
using Ret = decltype(f(args...));
// 输入的函数指针、变长参数绑定到一个std::function<Ret()> func,仅有返回结果。 ③
std::function<Ret()> func = std::bind( std::forward<F>(f), std::forward<Args>(args)... );
// 使用std::packaged_task打包成一个shared_ptr指针对象,用于异步执行、返回结果
auto task_ptr = std::make_shared< std::packaged_task<Ret()> >( func );
// 进一步将上述结果绑定成一个std::function<void()>函数, 任务队列中元素类型 ④
std::function<void()> wrapper_func = [task_ptr](){
(*task_ptr)();
};
// 外部线程执行
_queue.enqueue(wrapper_func); // 放入任务队列
_cv.notify_one(); // 唤醒一个线程, 异步执行当前任务
// 返回当前任务的std::future指针, 可以使用std::future的get()函数等待任务执行结果 ⑤
return task_ptr->get_future();
}
【学习地址】:FFmpeg/WebRTC/RTMP/NDK/Android音视频流媒体高级开发 【文章福利】:免费领取更多音视频学习资料包、大厂面试题、技术视频和学习路线图,资料包括(C/C++,Linux,FFmpeg webRTC rtmp hls rtsp ffplay srs 等等)有需要的可以点击1079654574加群领取哦~

这里面用到多个c++11新特性:
1.submit是一个模板函数,template<typename F, typename... Args>中的typename... Args是C++11引入的可变模版参数(variadic templates),很容易理解。
函数头部分auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>,这里函数类型的定义用到了叫做“尾返回类型推导”的技巧。在c++14前,关键字auto不能用于函数参数的类型推导,也不能推导函数的返回类型。
传统的c++必须这么实现
template<typename R, typename T, typename U>
R add(T x, U y) {
return x+y;
}通常我们并不知道add()会执行什么,返回结果类型是什么。因此c++11的关键字decltype能够进行返回类型的推导。但是不能写出类似 decltype(x+y) add(T x, U y) { } 的代码。因为在执行到decltype(x+y)时,x和y尚未定义。解决方案是使用尾返回类型推导,利用auto关键字将返回类型后置:
template<typename T, typename U>
auto add2(T x, U y) -> decltype(x+y) {
return x + y;
}在看函数头类型std::future<T>,它提供了一个访问异步操作结果的途径。可以使用std::future的wait()方法来设置屏障,阻塞线程,实现线程同步。并最终使用std::future的get()方法来获得执行结果。对于std::future,可以在这篇文献中找到更详细的讲解:现代 C++ 教程:高速上手 C++ 11/14/17/20-- 期物std::future
总的来说,最后将会获得返回类型为 实例化为f(args…)的std::future<> 的submit函数。
2.这里使用了std::function进行包装从而产生了一个特殊函数,这个特殊函数使用std::bind将函数f和参数args绑定起来。
简单来说,std::function可以对多个相似的函数进行包装(即通用的描述方法)。std::function可以hold住任何可以通过“()”来调用的对象,包括:
普通函数
成员函数
lambda
std::bind
而std::bind可以将调用函数时的部分参数先制定好,留下一部分在真正调用时确定。(当然,你也可以直接指定全部参数,在调用时不再指定。) 例如:
int func_1(double v){
return (int)v;
}
int func_2(double v1, double v2) {
return (int)(v1+v2);
}
auto x_func_1 = std::bind(func_1, std::placeholders::_1);
auto x_func_2 = std::bind(func_2, 1, std::placeholders::_1);
int x1 = x_func_1 (1.2); // func_1(1.2), 1
int x2 = x_func_1 (2.3); // func_2(1, 2.3), 2 这里我们会注意到,std::bind中出现了一个std::forward()的特殊方法。std::forward()又被称作完美转发。简单来说,std::forward()将会完整保留参数的引用类型进行转发。
这里的F&& f和Args&&... args中的&&并非是右值引用意思,而是一种特殊现象,这个现象被称作万能引用(universal reference)。万能引用可以简单理解为,当T是模板参数时,T&&的作用主要是保持值类别进行转发。
然而,一个绑定到universial reference上的对象可能具有lvaluesness或者rvalueness,正是因为有这种二义性,所以产生了std::forward。有关于万能引用、完美转发以及背后所隐藏的引用折叠,参考现代C++之万能引用、完美转发、引用折叠。
总的来说,②会产生一个以 函数f(arg…)返回类型Ret 为返回类型、不含参数的特殊函数包装func。
3.使用std::make_shared<>()方法,声明了一个std::packaged_task<Ret()>类型的智能指针,并将前面std::function方法声明的特殊函数包装func传入作为std::packaged_task的实例化参数。智能指针将更方便我们对该std::packaged_task对象进行管理, 并且所有权进行转移到线程池中,否则临时对象将在submit函数生命周期结束后销毁。
前面提到,std::packaged_task可以用来封装任何可以调用的目标,从而用于实现异步的调用。
4.这里我们再次利用std::function,将task_ptr指向的std::packaged_task对象取出并包装为void函数。这样我们的代码将更加美观优雅。 我们也可以像其他示例一样,使用lambda匿名函数将任务进入队列操作简化为一步:
_queue.enqueue( [task_ptr](){
(*task_ptr)();
});
5.条件变量会通知一个处于wait状态的线程,该线程将会从任务队列中取得任务并执行。
简要介绍条件变量 std::condition_variable ,是为了解决死锁而生,当互斥操作不够用而引入的。比如,线程可能需要等待某个条件为真才能继续执行,而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。所以,condition_variable实例被创建出现主要就是用于唤醒等待线程从而避免死锁。std::condition_variable的notify_one()用于唤醒一个线程;notify_all()则是通知所有线程。
3.2、工作线程
线程池中,每个线程创建后就开始循环从任务队列中取得任务并执行,可以使用匿名函数实现。 为简化结构,在线程池中创建私有成员类ThreadWoker作为内置线程工作类,执行真正的工作。
class ThreadWorker{
private:
int _id;
ThreadPool *_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : _pool(pool), _id(id)
{ }
void operator()()
{
printf(&#34;threadpool [%d]: enter.\n&#34;,_id);
std::function<void()> func;
bool hastask;
while(!_pool->_shutdown)
{
{
std::unique_lock<std::mutex> lck(_pool->_mutex);
printf(&#34;threadpool [%d]: waiting task\n&#34;,_id);
_pool->_cv.wait(lck, [&]{
// 关闭线程池,在任务队列为空时,也能退出等待
if(_pool->_shutdown)
return true;
return !_pool->_queue.empty();
});
//func = _pool->_queue.dequeue();
hastask = _pool->_queue.dequeue(func);
}
if(hastask){
// 执行任务
printf(&#34;threadpool [%d]: excuting work... \n&#34;,_id);
func();
printf(&#34;threadpool [%d]: work done. \n&#34;,_id);
}
}
printf(&#34;threadpool [%d]: exit.\n&#34;,_id);
}
};
重点关注重载()操作的void operator()(),可以理解是一个仿函数,这里面进行了任务的取出与执行。
我们使用了一个while循环,在线程池处于工作时循环从任务队列中提取任务。并利用条件变量,在任务队列为空时阻塞当前线程,等待上文中的提交函数添加任务后发出的通知。在任务队列不为空时,我们将任务队列中的任务取出,并放在事先声明的基础函数类func中。成功取出后便立即执行该任务。
4、完整代码
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
template<typename T>
class QueueSafe{
private:
std::queue<T> _queue;
std::mutex _mtx;
public:
QueueSafe() = default;
QueueSafe(QueueSafe&&) = default;
QueueSafe& operator=(QueueSafe&&) = default;
QueueSafe(const QueueSafe&) = delete;
QueueSafe& operator=(const QueueSafe&) = delete;
~QueueSafe()
{
clean();
}
void enqueue(const T& t)
{
std::unique_lock<std::mutex> lck(_mtx);
_queue.push(t);
}
void enqueue(T&& t)
{
std::unique_lock<std::mutex> lck(_mtx);
_queue.emplace(t);
}
// 队列为空时,也能返回结果
bool dequeue(T& t)
{
std::unique_lock<std::mutex> lck(_mtx);
if(_queue.empty())
return false;
t = std::move(_queue.front());
_queue.pop();
return true;
}
bool empty()
{
std::unique_lock<std::mutex> lck(_mtx);
return _queue.empty();
}
int size()
{
std::unique_lock<std::mutex> lck(_mtx);
return _queue.size();
}
// 用于退出
void clean()
{
std::unique_lock<std::mutex> lck(_mtx);
while(!_queue.empty())
_queue.pop();
}
};
class ThreadPool{
private:
class ThreadWorker{
private:
int _id;
ThreadPool *_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : _pool(pool), _id(id)
{
}
void operator()()
{
printf(&#34;threadpool [%d]: enter.\n&#34;,_id);
std::function<void()> func;
bool hastask;
while(!_pool->_shutdown)
{
{
std::unique_lock<std::mutex> lck(_pool->_mutex);
printf(&#34;threadpool [%d]: waiting task\n&#34;,_id);
_pool->_cv.wait(lck, [&]{
// 在任务队列为空时,也能退出等待
if(_pool->_shutdown)
return true;
return !_pool->_queue.empty();
});
//func = _pool->_queue.dequeue();
hastask = _pool->_queue.dequeue(func);
}
if(hastask){
// 执行任务
printf(&#34;threadpool [%d]: excuting work... \n&#34;,_id);
func();
printf(&#34;threadpool [%d]: work done. \n&#34;,_id);
}
}
printf(&#34;threadpool [%d]: exit.\n&#34;,_id);
}
};
std::atomic_bool _shutdown;
QueueSafe<std::function<void()>> _queue;
std::vector<std::thread> _threads;
std::mutex _mutex;
std::condition_variable _cv;
ThreadPool(const int n_threads = 4):
_threads( std::vector<std::thread>(n_threads) ), _shutdown(false)
{
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool & operator=(const ThreadPool &) = delete;
ThreadPool & operator=(ThreadPool &&) = delete;
void init()
{
for (int i = 0; i < _threads.size(); ++i)
{
匿名函数实现线程实际工作
// _threads.at(i) = std::thread([this, i]{
// printf(&#34;threadpool [%d]: enter.\n&#34;, i);
//
// std::function<void()> func;
// bool hastask;
//
// while(!_shutdown)
// {
// {
// std::unique_lock<std::mutex> lck(_mutex);
// printf(&#34;threadpool [%d]: waiting task\n&#34;,i );
//
// _cv.wait(lck, [&]{
//
// // 在任务队列为空时,也能退出等待
// if(_shutdown)
// return true;
//
// return !_queue.empty();
// });
//
// //func = _queue.dequeue();
// hastask = _queue.dequeue(func);
// }
//
// if(hastask){
// // 执行任务
// printf(&#34;threadpool [%d]: excuting work... \n&#34;, i);
// func();
// printf(&#34;threadpool [%d]: work done. \n&#34;, i);
// }
// }
//
// printf(&#34;threadpool [%d]: exit.\n&#34;, i);
// });
_threads.at(i) = std::thread(ThreadWorker(this, i)); // 分配线程池任务
}
}
void shutdown()
{
_shutdown = true;
_cv.notify_all();
for(auto& td : _threads){
if(td.joinable()) td.join();
}
}
template <typename F, typename... Args>
#if __cplusplus > 201103L
auto submit(F&& f, Args&& ...args) // c++14可行,c++11 需要添加尾返回类型推导
#else
auto submit(F&& f, Args&& ...args) -> std::future<decltype(f(args...))>
#endif
{
//using Ret = typename std::result_of<F(Args...)>::type;
using Ret = decltype(f(args...));
std::function<Ret()> func = std::bind( std::forward<F>(f), std::forward<Args>(args)... );
auto task_ptr = std::make_shared< std::packaged_task<Ret()> >( func );
std::function<void()> wrapper_func = [task_ptr](){
(*task_ptr)();
};
// 外部线程执行
_queue.enqueue(wrapper_func);
_cv.notify_one(); // 唤醒一个线程, 异步执行当前任务
// 返回工作的future指针
return task_ptr->get_future();
}
};
可能的运行结果,部分省略:
threadpool [0]: enter.
threadpool [0]: waiting task
threadpool [1]: enter.
threadpool [1]: waiting task
threadpool [2]: enter.
threadpool [2]: waiting task
threadpool [3]: enter.
threadpool [3]: waiting task
===============================================
submit task 0.
submit task 1.
threadpool [0]: excuting work...
threadpool [1]: excuting work...
submit task 2.
submit task 3.
threadpool [2]: excuting work...
threadpool [3]: excuting work...
submit task 4.
submit task 5.
...
...
...
submit task 18.
submit task 19.
===============================================
task 1 done!
task 3 done!
task 2 done!
threadpool [2]: work done.
threadpool [2]: waiting task
threadpool [3]: work done.
threadpool [2]: excuting work...
threadpool [3]: waiting task
threadpool [1]: work done.
threadpool [1]: waiting task
threadpool [3]: excuting work...
threadpool [1]: excuting work...
task 0 done!
threadpool [0]: work done.
threadpool [0]: waiting task
threadpool [0]: excuting work...
task 5 done!
threadpool [3]: work done.
...
...
...
task 17 done!
threadpool [0]: work done.
threadpool [0]: waiting task
task 15 done!
threadpool [3]: work done.
threadpool [3]: waiting task
task 16 done!
threadpool [2]: work done.
threadpool [2]: waiting task
task 19 done!
threadpool [1]: work done.
threadpool [1]: waiting task
result 0
...
...
...
result 19
===============================================
threadpool [0]: exit.
threadpool [1]: exit.
threadpool [3]: exit.
threadpool [2]: exit.
5、其他搜集
发现一篇博客【图解 | 你管这破玩意叫线程池?】,虽是Java栈但思路整理清晰有层次,用图文的的方式介绍线程池的实现和优化演进。
6、类似的完整代码实现
这个使用原生的queue和mutex进行线程安全处理,场景使用简单。
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don&#39;t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error(&#34;enqueue on stopped ThreadPool&#34;);
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
原文链接:https://blog.csdn.net/wanggao_1990/article/details/116231784 |
|