在异步编程中,回调函数是一种非常常见的设计模式,尤其是在消息队列或网络库中。它允许我们在接收到消息(或事件)时,调用预先注册的用户自定义处理逻辑,并将消息或事件参数传递给回调函数。通过这种机制,系统能够灵活地处理不同的消息类型,而无需硬编码每种消息的处理逻辑。
今天,我们详细解析如何通过 C++ 实现一个带有回调机制的消息队列,并探讨回调函数的注册、消息生产与消费,以及多线程环境下如何有效处理并发。
一、回调函数的概念#
回调函数是一种函数指针或函数对象,它在特定的事件发生时被调用。在消息队列系统中,回调函数可以用来处理接收到的消息。例如,当一个消息被消费者线程取出队列时,系统会调用之前注册的回调函数,并将该消息传递给它进行处理。
二、示例描述#
我们将通过一个简单的消息队列框架来演示回调机制:
- 消息生产者会将消息(字符串)放入消息队列。
- 消费者线程会从队列中取出消息,并调用预先注册的回调函数来处理这些消息。
- 回调函数可以是普通函数、类的成员函数或其他可调用对象(如
std::function
或 Lambda 函数)。
三、代码结构设计#
3.1 主要模块#
- 消息队列(
AA
类):负责存储消息,并通过条件变量和互斥锁确保线程安全。
- 回调注册与调用:通过
std::function
包装回调函数,支持普通函数、成员函数等。
- 生产者与消费者模型:多个消费者线程会并发地从队列中取消息并处理。
四、代码实现#
以下代码展示了如何实现一个带回调机制的消息队列系统:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#include <iostream>
#include <string>
#include <thread> // 线程类头文件。
#include <mutex> // 互斥锁类的头文件。
#include <deque> // deque容器的头文件。
#include <queue> // queue容器的头文件。
#include <condition_variable> // 条件变量的头文件。
#include <functional> // 函数对象及回调机制
using namespace std;
// 普通函数,用于处理消息
void show(const string& message) {
cout << "处理数据:" << message << endl;
}
// 类BB,包含一个成员函数用于处理消息
struct BB {
void show(const string& message) {
cout << "处理表白数据:" << message << endl;
}
};
// 消息队列类,负责生产与消费消息
class AA {
mutex m_mutex; // 互斥锁,确保多线程访问时的安全性。
condition_variable m_cond; // 条件变量,消费者等待消息到来。
queue<string, deque<string>> m_q; // 消息队列,存储生产的数据。
function<void(const string&)> m_callback; // 回调函数,用于处理消息。
public:
// 注册回调函数,回调函数可以是普通函数、成员函数或Lambda等
template<typename Fn, typename ...Args>
void callback(Fn&& fn, Args&&...args) {
// 绑定回调函数,使用 std::bind 实现可调用对象的绑定
m_callback = bind(forward<Fn>(fn), forward<Args>(args)..., std::placeholders::_1);
}
// 生产数据,将消息放入队列
void incache(int num) {
lock_guard<mutex> lock(m_mutex); // 申请锁
for (int i = 0; i < num; ++i) {
static int bh = 1; // 消息编号
string message = to_string(bh++) + "号超女"; // 生成消息
m_q.push(message); // 将消息放入队列
}
m_cond.notify_all(); // 通知所有等待的消费者
}
// 消费者线程,从队列中取消息并调用回调函数处理
void outcache() {
while (true) {
unique_lock<mutex> lock(m_mutex); // 加锁
m_cond.wait(lock, [this] { return !m_q.empty(); }); // 等待有消息到来
string message = m_q.front(); // 获取队列中的消息
m_q.pop(); // 将消息出队
lock.unlock(); // 解锁,允许生产者继续添加消息
cout << "线程:" << this_thread::get_id() << ",消息:" << message << endl;
if (m_callback) m_callback(message); // 调用回调函数处理消息
}
}
};
|
五、代码解析#
5.1 普通函数与类成员函数#
show(const string&)
是一个普通的处理函数,用于打印消息。
- 类
BB
包含了一个成员函数 show
,用于处理消息。
5.2 消息队列类 AA
#
AA
类中包含了以下主要部分:
- 回调函数注册:通过
callback()
函数注册回调函数,使用 std::bind
绑定任意可调用对象(普通函数、成员函数等),并存储在 std::function
对象 m_callback
中。
- 生产数据:
incache()
函数用于将指定数量的消息生成并放入队列,并通过条件变量唤醒等待的消费者线程。
- 消费数据:
outcache()
函数是消费者线程的任务函数,它从队列中取出消息,并调用预先注册的回调函数处理消息。
5.3 生产者与消费者#
main()
函数展示了如何创建多个消费者线程,并注册不同的回调函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
int main() {
AA aa;
// 注册普通函数为回调函数
// aa.callback(show);
// 注册类成员函数为回调函数
BB bb;
aa.callback(&BB::show, &bb);
// 创建多个消费者线程
thread t1(&AA::outcache, &aa);
thread t2(&AA::outcache, &aa);
thread t3(&AA::outcache, &aa);
this_thread::sleep_for(chrono::seconds(2)); // 模拟主线程的其他操作
aa.incache(2); // 生产2条消息
this_thread::sleep_for(chrono::seconds(3));
aa.incache(5); // 生产5条消息
// 等待消费者线程完成
t1.join();
t2.join();
t3.join();
return 0;
}
|
六、关键技术点#
6.1 std::function
与 std::bind
#
std::function
:是一个通用的可调用对象的包装器,能够存储任意的函数、Lambda 表达式或成员函数,并将其统一处理。
std::bind
:用于将函数与参数绑定,并生成一个新的可调用对象。结合占位符 std::placeholders::_1
等,可以灵活地将参数传递给回调函数。
6.2 多线程同步与互斥#
- 互斥锁(
std::mutex
):确保多线程访问共享资源时的安全性。
- 条件变量(
std::condition_variable
):用于在消费者线程等待数据时阻塞,直到有新数据被放入队列。
七、总结#
本文实现了一个带有回调机制的消息队列框架,通过 std::function
和 std::bind
,我们可以灵活地注册不同的回调函数(包括普通函数、成员函数等),并在消息到达时自动调用回调函数处理消息。在多线程环境中,通过互斥锁和条件变量,我们确保了消息生产与消费过程的同步和线程安全。