|
在这两年C++选手找工作时候,选择最多的项目可能就是webserver这个了,对于webserver大部分基于epoll实现的IO复用,来实现的reactor模式,大部分是牛客上的单reactor多线程模式,也有同学选择github上的主从reactor和多线程,当然不管是那种模式都是基于对事件的分发处理实现的事件驱动模型,都用到了线程池这个结构,在面试中也是问到比较多的,也有很多面试官要求手撕,关于线程池的实现也是多种多样,读过游双《linux高性能网络编程》的同学会实现基于linux自带的pthread库实现,也有同学会基于C++11加入的thread库实现,今天我们讲解基于thread库实现,现代C++已经有了好用的线程库,和值得期待的携程机制,基于C++库实现的线程池可以实现跨平台,具有更好的可移植性,在这里,我们先放出线程池代码:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
class ThreadPool {
public:
ThreadPool(int numThreads) {
for (int i = 0; i < numThreads; ++i) {
threads_.emplace_back([this]() {
while (true) {
std::unique_lock lock(mutex_);
condition_.wait(lock, [this](){return !tasks_.empty();});
auto task = std::move(tasks_.front());
tasks_.pop();
lock.unlock();
task();
}
});
}
}
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
auto task = std::make_shared<std::packaged_task<decltype(f(args...))()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<decltype(f(args...))> res = task->get_future();
std::unique_lock lock(mutex_);
tasks_.emplace([task]() {(*task)();});
lock.unlock();
condition_.notify_one();
return res;
}
~ThreadPool() {
for (int i = 0; i < threads_.size(); ++i) {
submit([]() {});
}
for (auto& thread : threads_) {
thread.join();
}
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable condition_;
};
使用方法如下:
#include <iostream>
#include <chrono>
int main() {
ThreadPool pool(4);
auto future1 = pool.submit([]() { std::cout << &#34;Task 1.&#34; << std::endl; return 1; });
auto future2 = pool.submit([]() { std::cout << &#34;Task 2.&#34; << std::endl; return 2; });
std::cout << &#34;Result 1: &#34; << future1.get() << std::endl;
std::cout << &#34;Result 2: &#34; << future2.get() << std::endl;
// Submit more tasks before the previous ones finish:
for (int i = 0; i < 8; ++i) {
pool.submit(() { std::cout << &#34;Task &#34; << i+1 << &#34;.&#34; << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
}
return 0;
}
在这个线程池的实现中,我们用到了很多C++11后的新特性,也很实用,构造函数负责创建线程,线程存储在vector中,在线程池运行的时候,每个线程都不停地从任务队列中取出任务,这里使用了条件变量‘condition_variable‘,用于判断任务队列是否为空,即是否有可执行任务,为了避免多个线程抢占一个任务,使用了&#39;unique_lock&#39;,独享锁,是一种灵活的锁,可以随时释放和上锁。这里使用了移动语义move,将任务取出然后执行。
线程池中最核心的部分就是submit方法(叫什么因人而异,add_task,enqueue),即往任务队列添加一个任务,这里使用了可变参数模板用于表示被添加进来的task任务函数的参数列表,用到了后置返回值写法,使我们的代码更容易理解,&#39;std::future&#39;期望,用于判断一个异步任务的返回结果,用decltype即可得到他的返回值类型,还是挺好理解:
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> 接下来使用std::packaged_task包装了一个可调用对象,用bind将函数与参数绑定,运用完美转发保证参数不会被隐式转换成左值,运用智能指针来管理它,接下来调用std::packaged的get_futrue()方法,获取返回结果,往任务队列中添加任务,这个过程也需要线程同步,最后唤醒一个线程处理任务,这就是submit方法的讲解
auto task = std::make_shared<std::packaged_task<decltype(f(args...))()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);最后是析构部分,在一般的线程池中我们通常会设置一个stop_标志位,来决定线程池工作是否结束,这里我们没有实现,而是更为简洁的结束线程池,在服务器运行结束调用析构函数的时候,往队列里添加当前线程池容量个空任务,这样可以把剩下未处理的有用的任务处理完在处理空任务(处理空任务不还会影响我们的数据),保证了每个有效任务都被处理,再等待线程结束,线程池工作结束。
for (int i = 0; i < threads_.size(); ++i) {
submit([]() {});
}这只是一个简单的线程池,当然你可以直接把它替换到你的webserver项目中,它足以满足我们的低级需求,你也可以试着往他里面扩展方法,比如实现动态调整线程池中线程数量,任务的优先级和超时机制,允许客户端查看线程内部状态,还有异常机制等等,可以更好地帮助我们实现更安全的线程池(学艺不精,有问题请指出,虚心求教)
接下来提供扩展了自定义队列最大值的实现,同样思路的代码,开始理解不了,多看几遍就好啦:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
class ThreadPool {
public:
ThreadPool(int numThreads, int maxTasks)
: maxThreads_(numThreads),
maxTasks_(maxTasks),
activeThreads_(0),
poolShutdown_(false)
{
for (int i = 0; i < numThreads; ++i) {
threads_.emplace_back([this]() {
while (true) {
std::unique_lock lock(mutex_);
condition_.wait(lock, [this](){
return poolShutdown_ || !tasks_.empty() || activeThreads_ < maxThreads_;
});
if (poolShutdown_) {
lock.unlock();
return;
}
++activeThreads_;
auto task = std::move(tasks_.front());
tasks_.pop();
lock.unlock();
task();
lock.lock();
--activeThreads_;
condition_.notify_one();
lock.unlock();
}
});
}
}
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
auto task = std::make_shared<std::packaged_task<decltype(f(args...))()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<decltype(f(args...))> res = task->get_future();
std::unique_lock lock(mutex_);
condition_.wait(lock, [this](){return poolShutdown_ || tasks_.size() < maxTasks_;});
if (poolShutdown_) {
throw std::runtime_error(&#34;submit on stopped ThreadPool&#34;);
}
tasks_.emplace([task]() {
(*task)();
});
lock.unlock();
condition_.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock lock(mutex_);
poolShutdown_ = true;
}
condition_.notify_all();
for (auto& thread : threads_) {
thread.join();
}
}
private:
int maxThreads_;
int maxTasks_;
int activeThreads_;
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable condition_;
bool poolShutdown_;
}; |
|