IE盒子

搜索
查看: 106|回复: 2

C++ 多线程(七):信号量 Semaphore 及 C++ 11 实现

[复制链接]

1

主题

2

帖子

3

积分

新手上路

Rank: 1

积分
3
发表于 2022-11-29 15:35:19 | 显示全部楼层 |阅读模式
理解 Semaphore,从一个好的翻译开始

Semaphore,对多线程有过了解的人都听说过,一般我们解释为“信号量”。可是,这个单词对我们来说还是比较陌生,它和另一个单词 Singal(信号)什么关系呢?想要真正理解这个概念,必须得从它的翻译开始。事实上,Semaphore 最好的翻译应该为“信号计数量”,承认了这一点,想必你也清楚了:它和 Signal 不是一回事!



剑桥词典翻译

信号:简单来说就是消息,是由用户、系统或者进程发送给目标进程的信息,用来通知目标进程某个状态的改变或系统异常,对应的是异步的场景(我之前的文章有详细介绍过)。
信号量:首先是一个变量,其次是计数器。它是多线程环境下使用的一种设施,信号量在创建时需要设置一个初始值,表示同时可以有几个任务(线程)可以访问某一块共享资源。

  • 一个任务要想访问共享资源,前提是信号量大于0,当该任务成功获得资源后,将信号量的值减 1;
  • 若当前信号量的值小于 0,表明无法获得信号量,该任务必须被挂起,等待信号量恢复为正值的那一刻;
  • 当任务执行完之后,必须释放信号量,对应操作就是信号量的值加 1。
另外,对信号量的操作(加、减)都是原子的。互斥锁(Mutex)就是信号量初始值为 1 时的特殊情形,即同时只能有一个任务可以访问共享资源区。


Semaphore 再理解

我们来设想这样一个场景(上图):假如北京的国家大剧院有一场免费的音乐会演出,可是现在正值疫情期间,剧院规定:剧院观众总人数要限制,但是允许大家中途退场,把票给其他人,其他人可以中途进场。于是,第一批先到的人从剧院门口票箱中取到了票,然后进场欣赏演出。后到的人就因为剧院满了,在门口等待。过了一段时间,有人嫌节目太无聊了,提前退场了,退场时他把门票放回去了。这样,其他人拿着这个人的票进场了。随后,又有人退场了,但是他忘记把票放回去了。这也没关系,大不了剧院内可容纳的总人数少了一个罢了。
上面的例子中,音乐会现场就是一块共享资源区,观众就是任务(线程),而票箱中的门票数就是信号量。信号量用作并发量限制,由于总的门票数是固定的,所以不会出现音乐厅被挤爆的情况。
上述的例子中,我们允许退场的观众把票带走,这是为什么呢?因为剧院工作人员可以随时在票箱里补充些门票呀(线程生产者)。说到这,你们是不是有点似曾相识呀?对啰,就是线程池,但还是有些不同,你们自己品味吧。
C语言中的 Semaphore

信号量类型为 sem_t,类型及相关操作定义在头文件 semaphore.h 中,
int sem_init(sem_t *sem, int pshared, unsigned int value);  // 创建信号量
int sem_post(sem_t *sem);  // 信号量的值加 1
int sem_wait(sem_t *sem);  // 信号量的值减 1
int sem_destroy(sem_t *sem);  // 信号量销毁
下面展示了一个例子:
你总共有三种类型的下载任务(类型 id 为 1、2、3),每次从键盘读取一种类型的任务进行下载,但是 CPU 最多可以同时执行 2 个下载任务(创建两个线程)。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define MAXNUM (2)
sem_t semDownload;
pthread_t a_thread, b_thread, c_thread;
int g_phreadNum = 1;

void func1(void *arg)
{
    // 等待信号量的值 > 0
    sem_wait(&semDownload);
    printf("============== Downloading taskType 1 ============== \n");
    sleep(5);
    printf("============== Finished taskType 1 ============== \n");
    g_phreadNum--;
    // 等待线程结束
    pthread_join(a_thread, NULL);
}

void func2(void *arg)
{
    sem_wait(&semDownload);
    printf("============== Downloading taskType 2 ============== \n");
    sleep(3);
    printf("============== Finished taskType 2 ============== \n");
    g_phreadNum--;
    pthread_join(b_thread, NULL);
}

void func3(void *arg)
{
    sem_wait(&semDownload);
    printf("============== Downloading taskType 3 ============== \n");
    sleep(1);
    printf("============== Finished taskType 3 ============== \n");
    g_phreadNum--;
    pthread_join(c_thread, NULL);
}

int main()
{
    // 初始化信号量
    sem_init(&semDownload, 0, 0);
    int taskTypeId;
    while (scanf("%d", &taskTypeId) != EOF)
    {
        // 输入 0, 测试程序是否能正常退出
        if (taskTypeId == 0 && g_phreadNum <= 1)
        {
            break;
        } else if (taskTypeId == 0)
        {
            printf("Can not quit, current running thread num is %d\n", g_phreadNum - 1);
        }
        printf("your choose Downloading taskType %d\n", taskTypeId);
        // 线程数超过 2 个则不下载
        if (g_phreadNum > MAXNUM)
        {
            printf("!!! You've reached the max number of threads !!!\n");
            continue;
        }
        // 用户选择下载 Task
        switch (taskTypeId)
        {
        case 1:
            // 创建线程 1
            pthread_create(&a_thread, NULL, func1, NULL);
            // 信号量 + 1,进而触发 func1 的任务
            sem_post(&semDownload);
            // 总线程数 + 1
            g_phreadNum++;
            break;
        case 2:
            pthread_create(&b_thread, NULL, func2, NULL);
            sem_post(&semDownload);
            g_phreadNum++;
            break;
        case 3:
            pthread_create(&c_thread, NULL, func3, NULL);
            sem_post(&semDownload);
            g_phreadNum++;
            break;
        default:
            printf("!!! error taskTypeId %d !!!\n", taskTypeId);
            break;
        }
    }
    // 销毁信号量
    sem_destroy(&semDownload);
    return 0;
}
上述例子中,采用了 pthread_join() 的方式,即子线程合入主线程,主线程阻塞等待子线程结束,然后回收子线程资源。而线程加入还有另外一种方式:pthread_detach(),即主线程与子线程分离,主线程不用关注子线程什么时候结束,子线程结束后,资源自动回收。
程序运行结果如下:


还要注意一点:pthread.h 非 linux 系统的默认库, gcc 编译参数需要手动添加选项:-lpthread、-pthread.
C++11 如何实现信号量

信号量直到 C++20 才被支持,那 C++11 如何实现 semaphore 呢?答案是通过互斥锁和条件变量实现。



C++20 支持 semaphore

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::string FormatTimeNow(const char* format) {
    auto now = std::chrono::system_clock::now();
    std::time_t now_c = std::chrono::system_clock::to_time_t(now);
    std::tm* now_tm = std::localtime(&now_c);

    char buf[20];
    std::strftime(buf, sizeof(buf), format, now_tm);
    return std::string(buf);
}

class Semaphore {
   public:
    explicit Semaphore(int count = 0) : count_(count) {}

    void Signal() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++count_;
        cv_.notify_one();
    }

    void Wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [=] { return count_ > 0; });
        --count_;
    }

   private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
};

Semaphore g_semaphore(4);
std::mutex g_io_mtx;

void DoWork() {
    g_semaphore.Wait();

    std::thread::id thread_id = std::this_thread::get_id();
    std::string now = FormatTimeNow("%H:%M:%S");
    {
        std::lock_guard<std::mutex> lock(g_io_mtx);
        std::cout << "Thread " << thread_id << ": wait succeeded"
                  << " (" << now << ")" << std::endl;
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    g_semaphore.Signal();
}

int main() {
    int threadNum = 4;
    std::vector<std::thread> v;
    v.reserve(threadNum);
    for (std::size_t i = 0; i < threadNum; ++i) {
        v.emplace_back(&DoWork);
    }
    for (std::thread& t : v) {
        t.join();
    }
    return 0;
}
输出结果:



允许 4 个线程同时运行

如果定义:
Semaphore g_semaphore(1);
输出结果:



每个时刻只允许一个线程运行
回复

使用道具 举报

0

主题

8

帖子

10

积分

新手上路

Rank: 1

积分
10
发表于 2025-3-20 13:18:01 | 显示全部楼层
LZ敢整点更有创意的不?兄弟们等着围观捏~
回复

使用道具 举报

6

主题

12

帖子

27

积分

新手上路

Rank: 1

积分
27
发表于 2025-3-20 13:18:01 | 显示全部楼层
这么强,支持楼主,佩服
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表