在多线程编程中,协调线程间的执行顺序和资源共享是一个重要问题。条件变量condition_variable)是 C++11 提供的一种线程同步机制,允许线程在某些条件满足时被唤醒或阻塞。它通常用于实现生产者-消费者模型或其他需要线程等待特定事件的场景。


一、条件变量的基础概念

C++11 提供了两个条件变量类:

  • condition_variable:效率更高的条件变量,适合与普通 mutex 一起使用。
  • condition_variable_any:通用条件变量,可以与任意 mutex 搭配,包括用户自定义的锁类型。

条件变量的工作原理是,当某个条件不满足时,线程会被阻塞,直到满足条件被唤醒。这在生产者-消费者模型中应用广泛。

常见成员函数

  • condition_variable():默认构造函数,初始化条件变量。
  • notify_one():唤醒一个被当前条件变量阻塞的线程。
  • notify_all():唤醒所有被条件变量阻塞的线程。
  • wait(unique_lock<mutex> lock):阻塞当前线程,直到条件变量被通知。
  • wait(unique_lock<mutex> lock, Pred pred):阻塞线程,直到条件变量通知且谓词 pred 返回 true
  • wait_for(unique_lock<mutex> lock, 时间长度):阻塞线程,等待指定的时间长度。
  • wait_until(unique_lock<mutex> lock, 时间点):阻塞线程,直到指定的时间点。

记不住吧 @_@,我也记不住,且用且查即可。


二、unique_lock

为了配合条件变量,C++11 引入了 unique_lock 类。它是一个 RAII 风格的锁管理类,与 lock_guard 类似,能简化锁的使用。不同之处在于,unique_lock 允许手动控制加锁和解锁,以便配合条件变量的 wait() 函数。

unique_lock 常见成员函数

  • lock():手动加锁。
  • unlock():手动解锁。
  • release():释放控制,但不解锁。

三、使用条件变量实现生产者-消费者模型

下面的示例展示了如何使用条件变量、互斥锁和 unique_lock 来实现生产者-消费者模型。

示例 1:简单的生产者-消费者模型

在这个例子中,生产者线程不断生成数据并将其放入队列,而消费者线程则从队列中取出数据进行处理。条件变量用于通知消费者,当队列中有新数据可用时唤醒被阻塞的消费者线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <string>
#include <thread> // 线程类头文件
#include <mutex> // 互斥锁类的头文件
#include <deque> // 双端队列容器头文件
#include <queue> // 队列容器头文件
#include <condition_variable> // 条件变量头文件

using namespace std;

class AA {
    mutex m_mutex; // 互斥锁
    condition_variable m_cond; // 条件变量
    queue<string, deque<string>> m_q; // 队列,使用 deque 作为底层容器

public:
    // 生产数据,参数 num 指定生产的数量
    void incache(int num) {
        lock_guard<mutex> lock(m_mutex); // 自动加锁
        for (int ii = 0; ii < num; ++ii) {
            static int bh = 1; // 数据编号
            string message = to_string(bh++) + "号超女"; // 生成数据
            m_q.push(message); // 将数据放入队列
        }
        m_cond.notify_one(); // 唤醒一个等待的消费者线程
    }

    // 消费者任务函数
    void outcache() {
        while (true) {
            string message;
            {
                unique_lock<mutex> lock(m_mutex); // 手动控制加锁
                // 使用循环判断队列是否为空,防止虚假唤醒
                while (m_q.empty()) {
                    m_cond.wait(lock); // 等待生产者通知
                }
                message = m_q.front(); // 获取队列中的数据
                m_q.pop(); // 移除已消费的数据
            }
            // 模拟处理数据
            this_thread::sleep_for(chrono::milliseconds(1)); // 处理1毫秒
            cout << "线程:" << this_thread::get_id() << ",处理数据:" << message << endl;
        }
    }
};

int main() {
    AA aa;

    // 创建三个消费者线程
    thread t1(&AA::outcache, &aa);
    thread t2(&AA::outcache, &aa);
    thread t3(&AA::outcache, &aa);

    this_thread::sleep_for(chrono::seconds(2)); // 休眠2秒,模拟生产者等待
    aa.incache(3); // 生产3个数据

    this_thread::sleep_for(chrono::seconds(3)); // 再次休眠3秒
    aa.incache(5); // 生产5个数据

    t1.join(); // 等待线程完成
    t2.join();
    t3.join();
}

代码解析

  • 生产者线程:通过 incache() 函数向队列中添加数据。每次生产数据后,调用 notify_one() 唤醒一个等待的消费者线程。
  • 消费者线程:通过 outcache() 函数从队列中获取数据。消费者在没有数据时使用 condition_variablewait() 阻塞,直到生产者生产数据并发出通知。

示例 2:唤醒所有消费者线程

在有多个消费者线程时,生产者可以使用 notify_all() 函数来唤醒所有被阻塞的消费者线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <deque>
#include <queue>
#include <condition_variable>

using namespace std;

class AA {
    mutex m_mutex; // 互斥锁
    condition_variable m_cond; // 条件变量
    queue<string, deque<string>> m_q; // 队列

public:
    // 生产数据
    void incache(int num) {
        lock_guard<mutex> lock(m_mutex); // 加锁
        for (int ii = 0; ii < num; ++ii) {
            static int bh = 1; // 数据编号
            string message = to_string(bh++) + "号超女";
            m_q.push(message);
        }
        m_cond.notify_all(); // 唤醒所有被阻塞的线程
    }

    // 消费者任务函数
    void outcache() {
        while (true) {
            unique_lock<mutex> lock(m_mutex); // 手动加锁
            // 使用 lambda 表达式替代循环等待,避免虚假唤醒
            m_cond.wait(lock, [this] { return !m_q.empty(); });

            // 获取并移除数据
            string message = m_q.front();
            m_q.pop();
            cout << "线程:" << this_thread::get_id() << ",处理数据:" << message << endl;
            lock.unlock(); // 手动解锁

            // 模拟处理数据
            this_thread::sleep_for(chrono::milliseconds(1));
        }
    }
};

int main() {
    AA aa;

    thread t1(&AA::outcache, &aa);
    thread t2(&AA::outcache, &aa);
    thread t3(&AA::outcache, &aa);

    this_thread::sleep_for(chrono::seconds(2)); // 休眠2秒
    aa.incache(2); // 生产2个数据

    this_thread::sleep_for(chrono::seconds(3)); // 再次休眠3秒
    aa.incache(5); // 生产5个数据

    t1.join();
    t2.join();
    t3.join();
}

代码解析

在这个例子中,生产者通过 notify_all() 唤醒所有被阻塞的消费者线程。消费者线程在被唤醒后,通过条件变量等待数据。m_cond.wait(lock, [this] { return !m_q.empty(); }) 确保只有在队列中有数据时,线程才会被唤醒并继续执行。


总结

C++11 的条件变量为线程同步提供了非常灵活和高效的方式。在多线程编程中,条件变量可以很好地协调生产者和消费者之间的关系,防止数据竞争和死锁。通过 condition_variableunique_locklock_guard 的组合,开发者可以更方便地编写高效、线程安全的代码。