在多线程编程中,协调线程间的执行顺序和资源共享是一个重要问题。条件变量(condition_variable
)是 C++11 提供的一种线程同步机制,允许线程在某些条件满足时被唤醒或阻塞。它通常用于实现生产者-消费者模型或其他需要线程等待特定事件的场景。
一、条件变量的基础概念#
C++11 提供了两个条件变量类:
condition_variable
:效率更高的条件变量,适合与普通 mutex
一起使用。
condition_variable_any
:通用条件变量,可以与任意 mutex
搭配,包括用户自定义的锁类型。
条件变量的工作原理是,当某个条件不满足时,线程会被阻塞,直到满足条件被唤醒。这在生产者-消费者模型中应用广泛。
常见成员函数#
condition_variable()
:默认构造函数,初始化条件变量。
notify_one()
:唤醒一个被当前条件变量阻塞的线程。
notify_all()
:唤醒所有被条件变量阻塞的线程。
wait(unique_lock<mutex> lock)
:阻塞当前线程,直到条件变量被通知。
wait(unique_lock<mutex> lock, Pred pred)
:阻塞线程,直到条件变量通知且谓词 pred
返回 true
。
wait_for(unique_lock<mutex> lock, 时间长度)
:阻塞线程,等待指定的时间长度。
wait_until(unique_lock<mutex> lock, 时间点)
:阻塞线程,直到指定的时间点。
记不住吧 @_@,我也记不住,且用且查即可。
二、unique_lock
类#
为了配合条件变量,C++11 引入了 unique_lock
类。它是一个 RAII 风格的锁管理类,与 lock_guard
类似,能简化锁的使用。不同之处在于,unique_lock
允许手动控制加锁和解锁,以便配合条件变量的 wait()
函数。
unique_lock
常见成员函数#
lock()
:手动加锁。
unlock()
:手动解锁。
release()
:释放控制,但不解锁。
三、使用条件变量实现生产者-消费者模型#
下面的示例展示了如何使用条件变量、互斥锁和 unique_lock
来实现生产者-消费者模型。
示例 1:简单的生产者-消费者模型#
在这个例子中,生产者线程不断生成数据并将其放入队列,而消费者线程则从队列中取出数据进行处理。条件变量用于通知消费者,当队列中有新数据可用时唤醒被阻塞的消费者线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
#include <iostream>
#include <string>
#include <thread> // 线程类头文件
#include <mutex> // 互斥锁类的头文件
#include <deque> // 双端队列容器头文件
#include <queue> // 队列容器头文件
#include <condition_variable> // 条件变量头文件
using namespace std;
class AA {
mutex m_mutex; // 互斥锁
condition_variable m_cond; // 条件变量
queue<string, deque<string>> m_q; // 队列,使用 deque 作为底层容器
public:
// 生产数据,参数 num 指定生产的数量
void incache(int num) {
lock_guard<mutex> lock(m_mutex); // 自动加锁
for (int ii = 0; ii < num; ++ii) {
static int bh = 1; // 数据编号
string message = to_string(bh++) + "号超女"; // 生成数据
m_q.push(message); // 将数据放入队列
}
m_cond.notify_one(); // 唤醒一个等待的消费者线程
}
// 消费者任务函数
void outcache() {
while (true) {
string message;
{
unique_lock<mutex> lock(m_mutex); // 手动控制加锁
// 使用循环判断队列是否为空,防止虚假唤醒
while (m_q.empty()) {
m_cond.wait(lock); // 等待生产者通知
}
message = m_q.front(); // 获取队列中的数据
m_q.pop(); // 移除已消费的数据
}
// 模拟处理数据
this_thread::sleep_for(chrono::milliseconds(1)); // 处理1毫秒
cout << "线程:" << this_thread::get_id() << ",处理数据:" << message << endl;
}
}
};
int main() {
AA aa;
// 创建三个消费者线程
thread t1(&AA::outcache, &aa);
thread t2(&AA::outcache, &aa);
thread t3(&AA::outcache, &aa);
this_thread::sleep_for(chrono::seconds(2)); // 休眠2秒,模拟生产者等待
aa.incache(3); // 生产3个数据
this_thread::sleep_for(chrono::seconds(3)); // 再次休眠3秒
aa.incache(5); // 生产5个数据
t1.join(); // 等待线程完成
t2.join();
t3.join();
}
|
代码解析#
- 生产者线程:通过
incache()
函数向队列中添加数据。每次生产数据后,调用 notify_one()
唤醒一个等待的消费者线程。
- 消费者线程:通过
outcache()
函数从队列中获取数据。消费者在没有数据时使用 condition_variable
的 wait()
阻塞,直到生产者生产数据并发出通知。
示例 2:唤醒所有消费者线程#
在有多个消费者线程时,生产者可以使用 notify_all()
函数来唤醒所有被阻塞的消费者线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <deque>
#include <queue>
#include <condition_variable>
using namespace std;
class AA {
mutex m_mutex; // 互斥锁
condition_variable m_cond; // 条件变量
queue<string, deque<string>> m_q; // 队列
public:
// 生产数据
void incache(int num) {
lock_guard<mutex> lock(m_mutex); // 加锁
for (int ii = 0; ii < num; ++ii) {
static int bh = 1; // 数据编号
string message = to_string(bh++) + "号超女";
m_q.push(message);
}
m_cond.notify_all(); // 唤醒所有被阻塞的线程
}
// 消费者任务函数
void outcache() {
while (true) {
unique_lock<mutex> lock(m_mutex); // 手动加锁
// 使用 lambda 表达式替代循环等待,避免虚假唤醒
m_cond.wait(lock, [this] { return !m_q.empty(); });
// 获取并移除数据
string message = m_q.front();
m_q.pop();
cout << "线程:" << this_thread::get_id() << ",处理数据:" << message << endl;
lock.unlock(); // 手动解锁
// 模拟处理数据
this_thread::sleep_for(chrono::milliseconds(1));
}
}
};
int main() {
AA aa;
thread t1(&AA::outcache, &aa);
thread t2(&AA::outcache, &aa);
thread t3(&AA::outcache, &aa);
this_thread::sleep_for(chrono::seconds(2)); // 休眠2秒
aa.incache(2); // 生产2个数据
this_thread::sleep_for(chrono::seconds(3)); // 再次休眠3秒
aa.incache(5); // 生产5个数据
t1.join();
t2.join();
t3.join();
}
|
代码解析#
在这个例子中,生产者通过 notify_all()
唤醒所有被阻塞的消费者线程。消费者线程在被唤醒后,通过条件变量等待数据。m_cond.wait(lock, [this] { return !m_q.empty(); })
确保只有在队列中有数据时,线程才会被唤醒并继续执行。
C++11 的条件变量为线程同步提供了非常灵活和高效的方式。在多线程编程中,条件变量可以很好地协调生产者和消费者之间的关系,防止数据竞争和死锁。通过 condition_variable
、unique_lock
和 lock_guard
的组合,开发者可以更方便地编写高效、线程安全的代码。