IE盒子

搜索
查看: 136|回复: 6

现代C++并行与并发笔记 附C++17线程池实现项目实战

[复制链接]

2

主题

5

帖子

8

积分

新手上路

Rank: 1

积分
8
发表于 2023-1-17 00:30:03 | 显示全部楼层 |阅读模式
C++11 之前,C++原生不支持并行与并发。但这并不意味着无法对线程进行操作, 只不过需要使用系统库的API进行操作(因为线程与操作系统是不可分开的)。随着C++11标准的完成,我们有了 std::thread ,其能给予我们可以在所有操作系统 上可移植的线程操作。为了同步线程,C++11也添加了互斥量,并且对一些RAII类 型的锁进行了封装。另外,std::condition_variable 也能够灵活的在线程间,进行 唤醒操作。另一些有趣的东西就是 std::async 和  std::future  ——我们可以将普通的函数封装 到  std::async 中,可以在后台异步的运行这些函数。包装后函数的返回值则 用  std::future 来表示,函数的结果将会在运行完成后,放入这个对象中,所以可以在函数完成前,做点别的事情。
让程序在特定时间休眠

C++11中对于线程的控制非常优雅和简单。在 this_thread 的命名空间中,包含了只 能被运行线程调用的函数。其包含了两个不同的函数,让线程睡眠一段时间,这样就不需要使用任何额外的库,或是操作系统依赖的库来执行这个任务。 本节中,我们将关注于如何将线程暂停一段时间,或是让其休眠一段时间。
#include <chrono>
#include <iostream>
#include <thread>
using namespace std;
using namespace chrono_literals;
int main() {
  cout << "sleep for 5s + 300ms";
  this_thread::sleep_for(5s + 300ms);

  cout << "sleep for 3s";
  this_thread::sleep_until(chrono::high_resolution_clock::now() + 3s);
}
这是可以运行的程序,其中sleep_for 和 sleep_until 函数都已经在 C++11 中加入,存放于std::this_thread命名空间中。其能对当前线程进行限时阻塞(并不是整个程序或整个进程)。线程被阻塞时不会消耗CPU时间,操作系统会将其标记挂起的状态,时间到了后线程会自动醒来。这种方式的好处在于,不需要知道操作系统对我们运行的程序做了什么,因为 STL会将其中的细节进行包装。 this_thread::sleep_for 函数能够接受一个  chrono::duration 值。最简单的方式就是 1s或 5s+300ms。为了使用这种非常简洁的字面时间表示方式,我们需要对命名空间进行声明 using namespace std::chrono_literals; 。this_thread::sleep_until 函数能够接受一个  chrono::time_out 参数。这就能够简单的指定对应的壁挂钟时间,来限定线程休眠的时间。 对于那种新型写法,我查了一下大概是 c++ 11 引进得 User-defined literals , 允许用户自定义后缀产生相应的对象,具体可以看这个网址:https://en.cppreference.com/w/cpp/language/user_literal
启动和停止线程

C++11中添加了 std::thread 类,并能使用简洁的方式能够对线程进行启动或停止, 线程相关的东西都包含在STL中,并不需要额外的库或是操作系统的实现来对其进行支持。 本节中,我们将实现一个程序对线程进行启动和停止。如果是第一次使用线程的话,就需要了解一些细节。
#include <chrono>
#include <iostream>
#include <thread>
using namespace std;
using namespace chrono_literals;

static void thread_with_param(int i) {
  this_thread::sleep_for(1ms * i);
  cout << "线程 " << i << "开始 \n";
  this_thread::sleep_for(1s * i);
  cout << "线程 " << i << "结束\n";
}

int main() {
  cout << thread::hardware_concurrency()
       << "个线程在当前所使用的系统中能够同时运行.\n";
  thread t1{thread_with_param, 1};
  thread t2{thread_with_param, 2};
  thread t3{thread_with_param, 3};
  t1.join();
  t2.join();
  t3.detach();
  cout << "Threads joined.\n";
}

  • 主函数中,会先了解在所使用的系统中能够同时运行多少个线程,使用 std::thread::hardware_concurrency 进行确定。这个数值通常依赖于机器上有 多少个核,或是STL实现中支持多少个核。这也就意味着,对于不同机器,这 个函数会返回不同的值.
  • 这里我们启动三个线程。 我们使用实例化线程的代码行为  thread t {f, x},这就等于在新线程中调用  f(x) 。这样,在不同的线程中就可以给于  thread_with_param 函数不同的参数。
  • 当启动线程后,我们就需要在其完成其工作后将线程进行终止,使用  join 函数来停止线程。调用  join 将会阻塞调用线程,直至对应的线程终止为止
  • 另一种方式终止的方式是分离。如果不以 join 或 detach 的方式进行终止,那 么程序只有在 thread 对象析构时才会终止。通过调用 detech ,我们将告诉3号线程,即使主线程终止了,你也可以继续运行。这个是cppreference 的意思,可是经过我的测试和查询,join 和 detach 仅仅只是回收资源的方式不一样,join 是由主线程回收,detach 则是由运行时库负责清理被调线程相关的资源,主线程结束后,所有相关的子线程都会结束。




image.png

互斥量(mutex)





image.png

线程对互斥量上锁之后,很多事情都变的非常简单,我们只需要上锁、访问、解锁 三步就能完成我们想要做的工作。不过对于有些比较健忘的开发者来说,在上锁之 后,很容易忘记对其进行解锁,或是互斥量在上锁状态下抛出一个异常,如果要对 这个异常进行处理,那么代码就会变得很难看。最优的方式就是程序能够自动来处理这种事情。 内存管理部分,我们有  unique_ptr ,  shared_ptr 和  weak_ptr 。这些辅助类可以很完美帮我们避免内存泄漏。互斥量也有类似的帮手,最简单的一个就是 std::lock_guard 。使用方式如下:
void critical_function() {

  lock_guard<mutex> l{some_mutex};

  // critical section

}
lock_guard 的构造函数能接受一个互斥量,其会立即自动调用 lock ,构造函数会直到获取互斥锁为止。当实例进行销毁时,其会对互斥量再次进行解锁。这样互斥量就很难陷入到  lock/unlock 循环错误中。 C++17 STL提供了如下的RAII辅助锁。其都能接受一个模板参数,其与互斥量的类型相同(在C++17中,编译器可以自动推断出相应的类型):



image.png

scoped_lock 能够避免死锁的发生。 std::scoped_lock的预防死锁策略很简单,假设要对 n 个 mutex(mutex1, mutex2, ..., mutexn)上锁,那么每次只尝试对一个 mutex 上锁,只要上锁失败就立即释放获得的所有锁(方便让其他线程获得锁),然后重新开始上锁,处于一个循环当中,直到对 n 个 mutex 都上锁成功。这种策略是基本上是有效的,虽然有极小的概率出现“活锁”,例如ABBA死锁中,线程1释放锁A的同一时刻时线程2又释放了锁B,然后这两个线程又同时分别获得了锁A和锁B,如此循环。
进行延迟初始化——std::call_once

假如我们将完成一个程序,我们使用多线程对同一段代码进行执行。他们执行的是相同的代码,但是我们的初始化函数只需要运行一次:
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
// 在对指定函数使用  call_once 时,需要对所有线程进行同步,这是同步标志
once_flag callflag;
// 只需要执行一次的函数
static void once_print() { cout << '!'; }

static void print(size_t x) {
  // 只调用一次的写法
  std::call_once(callflag, once_print);
  cout << x;
}
int main() {
  vector<thread> v;
  for (size_t i{0}; i < 10; ++i) {
    v.emplace_back(print, i);
  }
  for (auto &t : v) {
    t.join();
  }
  cout << '\n';
}
std::call_once 工作原理和栅栏类似:第一个线程达到  call_once 的线程会执行对应的函数,其他线程到这就阻塞。当第一个线程从准备函数中返回后,其他线程才结束阻塞。我们可以对这个过程进行安排,让一个变量决定其他线程能否运行,线程则必须对这个变量进行等待,直到这个变量准备好了,所有变量才能运行。这个变量就 是 once_flag callflag;。每一个 call_once 都需要一个 once_flag 实例作为参数,来  表明预处理函数是否运行了一次。 另一个细节是:如果  call_once 执行失败了(因为准备函数抛出了异常),那么下一个线程则会再去尝试执行。
将执行的程序推到后台——std::async

前面说过的 t1.join(),并不会给你返回值,为了获取返回值通常只能运用共享内存的方法。 C++11之后, std::async 能帮我们完成这项任务。我们将写一个简单的程序,并使用异步函数,让线程在同一时间内做很多事情。 std::async 其实很强大,让我们先来了解其一方面。
#include <algorithm>
#include <future>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <map>
#include <string>
using namespace std;
// 对字符串中的字符进行统计
static map<char, size_t> histogram(const string &s) {
  map<char, size_t> m;
  for (char c : s) {
    m[c] += 1;
  }
  return m;
}
// 返回一个排序后的副本
static string sorted(string s) {
  sort(begin(s), end(s));
  return s;
}
// 是否元音字母
static bool is_vowel(char c) {
  char vowels[]{"aeiou"};
  return end(vowels) != find(begin(vowels), end(vowels), c);
}
// 对传入的字符串中元音字母进行计数

static size_t vowels(const string &s) {
  return count_if(begin(s), end(s), is_vowel);
}
int main() {
  // 测试变量
  string input = "wakwkdawfkwekf";
  // 函数一
  auto hist(async(launch::async, histogram, input));

  // 函数二
  auto sorted_str(async(launch::async, sorted, input));

  // 函数三
  auto vowel_count(async(launch::async, vowels, input));

  // 获取函数一的返回值
  for (const auto &[c, count] : hist.get()) {
    cout << c << ": " << count << '\n';
  }

  // 获取函数 二 三 的返回值
  cout << "Sorted string: " << sorted_str.get() << '\n'
       << "Total vowels: " << vowel_count.get() << '\n';
}
通过  std::async 会返回一个  future 类型对象。这个对象表示在未来某个时间点上,对象将会获取返回值。通过对  future 对象使 用  .get() ,我们将会阻塞主函数,直到相应的值返回。 我们将三个函数使用 async(launch::async, ...) 进行包装。这样三个函数都不会由主函数来完成。此外, async 会启动新线程,并让线程并发的完成这几个函数。这样我们只需要启动一个线程的开销,就能将对应的工作放在后台进行,而后可以继续执行其他代码。 launch::async 是一个用来定义执行策略的标识。其有两种独立方式和一种组合方式:
策略选择意义
std::launch::async运行新线程,以异步执行任务
std::launch::deferred在调用线程上执行任务(惰性求值)。在对  future 调用 get 和 wait 的时候,才进行执行。如果什么都没有发生,那么执行函数就没有运行。
不使用策略参数调用  async(f, 1, 2, 3) ,  async 的实现可以自由的选择策略。这也就意味着,我们不能确定任务会执行在一个新的线程上,还是执行在当前线程上。
还有件事情我们必须要知道,假设我们写了如下的代码:
async(launch::async, f);
async(launch::async, g);
这就会让  f 和  g 函数并发执行(这个例子中,我们并不关心其返回值)。运行这段代码时,代码会阻塞在这两个调用上,这并不是我们想看到的情况。 所以,为什么会阻塞呢? async 不是非阻塞式、异步的调用吗?没错,不过这里有点特殊:当对一个 async 使用 launch::async 策略时,获取一个 future 对象,之后其析构函数将会以阻塞式等待函数结束运行。 这也就意味着,这两次调用阻塞的原因就是, future 生命周期只有一行的时间!我们可以以接收其返回值的方式,来避免这个问题,从而让 future 对象的生命周期更长。
条件变量(condition_variable)

下面是例子(生产者消费者模型):
#include <condition_variable>
#include <iostream>
#include <queue>
#include <thread>
#include <tuple>
using namespace std;
using namespace chrono_literals;
queue<size_t> q;
mutex mut;
condition_variable cv;
bool finished{false};
// 生产者
static void producer(size_t items) {
  for (size_t i{0}; i < items; ++i) {
    this_thread::sleep_for(100ms);
    {
      lock_guard<mutex> lk{mut};
      q.push(i);
    }
    cv.notify_all();
  }
  {
    lock_guard<mutex> lk{mut};
    finished = true;
  }
  cv.notify_all();
}
// 消费者
static void consumer() {
  while (!finished) {
    unique_lock<mutex> l{mut};
    cv.wait(l, [] { return !q.empty() || finished; });
    while (!q.empty()) {
      cout << "Got " << q.front() << " from queue.\n";
      q.pop();
    }
  }
}

int main() {
  thread t1{producer, 10};
  thread t2{consumer};
  t1.join();
  t2.join();
  cout << "finished!\n";
}
我们只启动了两个线程。第一个线程会生产一些商品,并放到队列中。另 一个则是从队列中取走商品。当其中一个线程需要对队列进行访问时,其否需要对公共互斥量 mut 进行上锁,这样才能对队列进行访问。这样,我们就能保证两个线程不能再同时对队列进行操作。
cv.wait(lock, predicate) 将会等到 predicate() 返回true时,结束等待。不过其不会对 lock 持续的进行解锁与上锁的操作。为了将使用 wait 阻塞的线程唤醒,我们就需要使用 condition_variable 对象,另一个线程会对同一个对象调用  notify_one() 或  notify_all() 。等待中的线程将从休眠中醒来,并检查  predicate() 条件是否成立。 这样的操作很方便我们使用,因为一般情况下需要多行代码,当消费者被 notify 起来后,可能不止你一个消费者被激活,为保证条件仍就能够合理消费,需要对临界条件再次检查,该 wait函数的用法浓缩了这几行代码。
C++17 线程池

前置知识

返回值类型推导 result_of 和 invoke_result

result_of 是在 C++11 中引入的,同时由于自身原因于C++20 中移除,替代品为 C++17 中引入的 invoke_result,result_of我觉得很难用,从下面例子可以看出来,泛型的类别我很难写,下面那个 ri 函数,我想着直接用函数的类型,但是不行,会报错,要以下面那种奇怪的写法写出来,而invoke_result就很合理,所以接下来的线程池会采用 invoke_result 来参与返回值获取的实现.
#include <iostream>
using namespace std;
int r1() { return 1; }

result_of<decltype(r1)&(void)>::type i = 1;

invoke_result<decltype(r1)>::type s = 1;

int main() { cout << i; }
packaged_task

类模板 std::packaged_task 包装任何可调用目标(函数、 lambda 表达式、 bind 表达式或其他函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。
下面是例子:
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>

// 避免对 std::pow 重载集消歧义的独有函数
int f(int x, int y) { return std::pow(x,y); }

void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b);
    });
    std::future<int> result = task.get_future();

    task(2, 9);

    std::cout << "task_lambda:\t" << result.get() << '\n';
}

void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();

    task();

    std::cout << "task_bind:\t" << result.get() << '\n';
}

void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();

    std::thread task_td(std::move(task), 2, 10);
    task_td.join();

    std::cout << "task_thread:\t" << result.get() << '\n';
}

int main()
{
    task_lambda();
    task_bind();
    task_thread();
}
线程池代码

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class ThreadPool {
public:
  ThreadPool(size_t);
  template <class F, class... Args>
  auto enqueue(F &&f, Args &&...args)
      -> std::future<typename std::invoke_result<F>::type>;
  ~ThreadPool();

private:
  // need to keep track of threads so we can join them
  std::vector<std::thread> workers;
  // the task queue
  std::queue<std::function<void()>> tasks;

  // synchronization
  std::mutex queue_mutex;
  std::condition_variable condition;
  bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
  for (size_t i = 0; i < threads; ++i)
    workers.emplace_back([this] {
      for (;;) {
        std::function<void()> task;

        {
          std::unique_lock<std::mutex> lock(this->queue_mutex);
          // 等生产者生产
          this->condition.wait(
              lock, [this] { return this->stop || !this->tasks.empty(); });
          // 若是因为内存池停止,或是虚假唤醒 直接 return
          if (this->stop && this->tasks.empty())
            return;
          // 拿任务
          task = std::move(this->tasks.front());
          this->tasks.pop();
        }

        task();
      }
    });
}

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&...args)
    -> std::future<typename std::invoke_result<F>::type> {
  // std::invoke_result 用于推导可调用对象的放回值类型
  using return_type = typename std::invoke_result<F>::type;
  // 将获取 packaged_task
  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  std::future<return_type> res = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if (stop)
      throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task]() { (*task)(); });
  }
  condition.notify_one();
  return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
  }
  condition.notify_all();
  for (std::thread &worker : workers)
    worker.join();
}
测试函数

#include <iostream>
#include <vector>
#include <chrono>

#include "ThreadPool.h"

int main()
{

    ThreadPool pool(4);
    std::vector< std::future<int> > results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue( {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                return i*i;
            })
        );
    }

    for(auto && result: results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

    return 0;
}
回复

使用道具 举报

5

主题

15

帖子

29

积分

新手上路

Rank: 1

积分
29
发表于 2023-1-17 00:30:23 | 显示全部楼层
“C++原生不支持并发和并发”。。。
回复

使用道具 举报

3

主题

15

帖子

22

积分

新手上路

Rank: 1

积分
22
发表于 2023-1-17 00:30:59 | 显示全部楼层
啊不好意思,我查的资料是C++11以前标准库是没有相关api呀,都是直接依靠系统api,然后C++11标准库才在系统api上进行封装实现的呀,能指导下嘛大佬
回复

使用道具 举报

3

主题

8

帖子

13

积分

新手上路

Rank: 1

积分
13
发表于 2023-1-17 00:31:57 | 显示全部楼层
这是你文章第一句的原文,不感觉有点怪吗?[害羞]
回复

使用道具 举报

4

主题

7

帖子

15

积分

新手上路

Rank: 1

积分
15
发表于 2023-1-17 00:32:43 | 显示全部楼层
我有加状语呀[大哭]
回复

使用道具 举报

1

主题

5

帖子

5

积分

新手上路

Rank: 1

积分
5
发表于 2023-1-17 00:33:16 | 显示全部楼层
但你的宾语是“并发和并发”。。。这种重复的表达真的能体现你独特的见解吗?[害羞]
回复

使用道具 举报

4

主题

8

帖子

16

积分

新手上路

Rank: 1

积分
16
发表于 2023-1-17 00:34:04 | 显示全部楼层
不好意思大佬,改正了[捂脸]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表