|
理解 Semaphore,从一个好的翻译开始
Semaphore,对多线程有过了解的人都听说过,一般我们解释为“信号量”。可是,这个单词对我们来说还是比较陌生,它和另一个单词 Singal(信号)什么关系呢?想要真正理解这个概念,必须得从它的翻译开始。事实上,Semaphore 最好的翻译应该为“信号计数量”,承认了这一点,想必你也清楚了:它和 Signal 不是一回事!

剑桥词典翻译
信号:简单来说就是消息,是由用户、系统或者进程发送给目标进程的信息,用来通知目标进程某个状态的改变或系统异常,对应的是异步的场景(我之前的文章有详细介绍过)。
信号量:首先是一个变量,其次是计数器。它是多线程环境下使用的一种设施,信号量在创建时需要设置一个初始值,表示同时可以有几个任务(线程)可以访问某一块共享资源。
- 一个任务要想访问共享资源,前提是信号量大于0,当该任务成功获得资源后,将信号量的值减 1;
- 若当前信号量的值小于 0,表明无法获得信号量,该任务必须被挂起,等待信号量恢复为正值的那一刻;
- 当任务执行完之后,必须释放信号量,对应操作就是信号量的值加 1。
另外,对信号量的操作(加、减)都是原子的。互斥锁(Mutex)就是信号量初始值为 1 时的特殊情形,即同时只能有一个任务可以访问共享资源区。

Semaphore 再理解
我们来设想这样一个场景(上图):假如北京的国家大剧院有一场免费的音乐会演出,可是现在正值疫情期间,剧院规定:剧院观众总人数要限制,但是允许大家中途退场,把票给其他人,其他人可以中途进场。于是,第一批先到的人从剧院门口票箱中取到了票,然后进场欣赏演出。后到的人就因为剧院满了,在门口等待。过了一段时间,有人嫌节目太无聊了,提前退场了,退场时他把门票放回去了。这样,其他人拿着这个人的票进场了。随后,又有人退场了,但是他忘记把票放回去了。这也没关系,大不了剧院内可容纳的总人数少了一个罢了。
上面的例子中,音乐会现场就是一块共享资源区,观众就是任务(线程),而票箱中的门票数就是信号量。信号量用作并发量限制,由于总的门票数是固定的,所以不会出现音乐厅被挤爆的情况。
上述的例子中,我们允许退场的观众把票带走,这是为什么呢?因为剧院工作人员可以随时在票箱里补充些门票呀(线程生产者)。说到这,你们是不是有点似曾相识呀?对啰,就是线程池,但还是有些不同,你们自己品味吧。
C语言中的 Semaphore
信号量类型为 sem_t,类型及相关操作定义在头文件 semaphore.h 中,
int sem_init(sem_t *sem, int pshared, unsigned int value); // 创建信号量
int sem_post(sem_t *sem); // 信号量的值加 1
int sem_wait(sem_t *sem); // 信号量的值减 1
int sem_destroy(sem_t *sem); // 信号量销毁
下面展示了一个例子:
你总共有三种类型的下载任务(类型 id 为 1、2、3),每次从键盘读取一种类型的任务进行下载,但是 CPU 最多可以同时执行 2 个下载任务(创建两个线程)。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define MAXNUM (2)
sem_t semDownload;
pthread_t a_thread, b_thread, c_thread;
int g_phreadNum = 1;
void func1(void *arg)
{
// 等待信号量的值 > 0
sem_wait(&semDownload);
printf(&#34;============== Downloading taskType 1 ============== \n&#34;);
sleep(5);
printf(&#34;============== Finished taskType 1 ============== \n&#34;);
g_phreadNum--;
// 等待线程结束
pthread_join(a_thread, NULL);
}
void func2(void *arg)
{
sem_wait(&semDownload);
printf(&#34;============== Downloading taskType 2 ============== \n&#34;);
sleep(3);
printf(&#34;============== Finished taskType 2 ============== \n&#34;);
g_phreadNum--;
pthread_join(b_thread, NULL);
}
void func3(void *arg)
{
sem_wait(&semDownload);
printf(&#34;============== Downloading taskType 3 ============== \n&#34;);
sleep(1);
printf(&#34;============== Finished taskType 3 ============== \n&#34;);
g_phreadNum--;
pthread_join(c_thread, NULL);
}
int main()
{
// 初始化信号量
sem_init(&semDownload, 0, 0);
int taskTypeId;
while (scanf(&#34;%d&#34;, &taskTypeId) != EOF)
{
// 输入 0, 测试程序是否能正常退出
if (taskTypeId == 0 && g_phreadNum <= 1)
{
break;
} else if (taskTypeId == 0)
{
printf(&#34;Can not quit, current running thread num is %d\n&#34;, g_phreadNum - 1);
}
printf(&#34;your choose Downloading taskType %d\n&#34;, taskTypeId);
// 线程数超过 2 个则不下载
if (g_phreadNum > MAXNUM)
{
printf(&#34;!!! You&#39;ve reached the max number of threads !!!\n&#34;);
continue;
}
// 用户选择下载 Task
switch (taskTypeId)
{
case 1:
// 创建线程 1
pthread_create(&a_thread, NULL, func1, NULL);
// 信号量 + 1,进而触发 func1 的任务
sem_post(&semDownload);
// 总线程数 + 1
g_phreadNum++;
break;
case 2:
pthread_create(&b_thread, NULL, func2, NULL);
sem_post(&semDownload);
g_phreadNum++;
break;
case 3:
pthread_create(&c_thread, NULL, func3, NULL);
sem_post(&semDownload);
g_phreadNum++;
break;
default:
printf(&#34;!!! error taskTypeId %d !!!\n&#34;, taskTypeId);
break;
}
}
// 销毁信号量
sem_destroy(&semDownload);
return 0;
}
上述例子中,采用了 pthread_join() 的方式,即子线程合入主线程,主线程阻塞等待子线程结束,然后回收子线程资源。而线程加入还有另外一种方式:pthread_detach(),即主线程与子线程分离,主线程不用关注子线程什么时候结束,子线程结束后,资源自动回收。
程序运行结果如下:

还要注意一点:pthread.h 非 linux 系统的默认库, gcc 编译参数需要手动添加选项:-lpthread、-pthread.
C++11 如何实现信号量
信号量直到 C++20 才被支持,那 C++11 如何实现 semaphore 呢?答案是通过互斥锁和条件变量实现。

C++20 支持 semaphore
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
std::string FormatTimeNow(const char* format) {
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::tm* now_tm = std::localtime(&now_c);
char buf[20];
std::strftime(buf, sizeof(buf), format, now_tm);
return std::string(buf);
}
class Semaphore {
public:
explicit Semaphore(int count = 0) : count_(count) {}
void Signal() {
std::unique_lock<std::mutex> lock(mutex_);
++count_;
cv_.notify_one();
}
void Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [=] { return count_ > 0; });
--count_;
}
private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;
};
Semaphore g_semaphore(4);
std::mutex g_io_mtx;
void DoWork() {
g_semaphore.Wait();
std::thread::id thread_id = std::this_thread::get_id();
std::string now = FormatTimeNow(&#34;%H:%M:%S&#34;);
{
std::lock_guard<std::mutex> lock(g_io_mtx);
std::cout << &#34;Thread &#34; << thread_id << &#34;: wait succeeded&#34;
<< &#34; (&#34; << now << &#34;)&#34; << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
g_semaphore.Signal();
}
int main() {
int threadNum = 4;
std::vector<std::thread> v;
v.reserve(threadNum);
for (std::size_t i = 0; i < threadNum; ++i) {
v.emplace_back(&DoWork);
}
for (std::thread& t : v) {
t.join();
}
return 0;
}
输出结果:

允许 4 个线程同时运行
如果定义:
Semaphore g_semaphore(1);
输出结果:

每个时刻只允许一个线程运行 |
|