在异步编程中,回调函数是一种非常常见的设计模式,尤其是在消息队列或网络库中。它允许我们在接收到消息(或事件)时,调用预先注册的用户自定义处理逻辑,并将消息或事件参数传递给回调函数。通过这种机制,系统能够灵活地处理不同的消息类型,而无需硬编码每种消息的处理逻辑。

今天,我们详细解析如何通过 C++ 实现一个带有回调机制的消息队列,并探讨回调函数的注册、消息生产与消费,以及多线程环境下如何有效处理并发。


一、回调函数的概念

回调函数是一种函数指针或函数对象,它在特定的事件发生时被调用。在消息队列系统中,回调函数可以用来处理接收到的消息。例如,当一个消息被消费者线程取出队列时,系统会调用之前注册的回调函数,并将该消息传递给它进行处理。


二、示例描述

我们将通过一个简单的消息队列框架来演示回调机制:

  • 消息生产者会将消息(字符串)放入消息队列。
  • 消费者线程会从队列中取出消息,并调用预先注册的回调函数来处理这些消息。
  • 回调函数可以是普通函数、类的成员函数或其他可调用对象(如 std::function 或 Lambda 函数)。

三、代码结构设计

3.1 主要模块

  • 消息队列(AA 类):负责存储消息,并通过条件变量和互斥锁确保线程安全。
  • 回调注册与调用:通过 std::function 包装回调函数,支持普通函数、成员函数等。
  • 生产者与消费者模型:多个消费者线程会并发地从队列中取消息并处理。

四、代码实现

以下代码展示了如何实现一个带回调机制的消息队列系统:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <iostream>
#include <string>
#include <thread>                      // 线程类头文件。
#include <mutex>                       // 互斥锁类的头文件。
#include <deque>                       // deque容器的头文件。
#include <queue>                       // queue容器的头文件。
#include <condition_variable>          // 条件变量的头文件。
#include <functional>                  // 函数对象及回调机制
using namespace std;

// 普通函数,用于处理消息
void show(const string& message) {
    cout << "处理数据:" << message << endl;
}

// 类BB,包含一个成员函数用于处理消息
struct BB {
    void show(const string& message) {
        cout << "处理表白数据:" << message << endl;
    }
};

// 消息队列类,负责生产与消费消息
class AA {
    mutex m_mutex;                                    // 互斥锁,确保多线程访问时的安全性。
    condition_variable m_cond;                        // 条件变量,消费者等待消息到来。
    queue<string, deque<string>> m_q;                 // 消息队列,存储生产的数据。
    function<void(const string&)> m_callback;         // 回调函数,用于处理消息。

public:
    // 注册回调函数,回调函数可以是普通函数、成员函数或Lambda等
    template<typename Fn, typename ...Args>
    void callback(Fn&& fn, Args&&...args) {
        // 绑定回调函数,使用 std::bind 实现可调用对象的绑定
        m_callback = bind(forward<Fn>(fn), forward<Args>(args)..., std::placeholders::_1);
    }

    // 生产数据,将消息放入队列
    void incache(int num) {
        lock_guard<mutex> lock(m_mutex);   // 申请锁
        for (int i = 0; i < num; ++i) {
            static int bh = 1;             // 消息编号
            string message = to_string(bh++) + "号超女"; // 生成消息
            m_q.push(message);             // 将消息放入队列
        }
        m_cond.notify_all();  // 通知所有等待的消费者
    }

    // 消费者线程,从队列中取消息并调用回调函数处理
    void outcache() {
        while (true) {
            unique_lock<mutex> lock(m_mutex);  // 加锁
            m_cond.wait(lock, [this] { return !m_q.empty(); });  // 等待有消息到来

            string message = m_q.front();  // 获取队列中的消息
            m_q.pop();  // 将消息出队
            lock.unlock();  // 解锁,允许生产者继续添加消息

            cout << "线程:" << this_thread::get_id() << ",消息:" << message << endl;

            if (m_callback) m_callback(message);  // 调用回调函数处理消息
        }
    }
};

五、代码解析

5.1 普通函数与类成员函数

  • show(const string&) 是一个普通的处理函数,用于打印消息。
  • BB 包含了一个成员函数 show,用于处理消息。

5.2 消息队列类 AA

AA 类中包含了以下主要部分:

  • 回调函数注册:通过 callback() 函数注册回调函数,使用 std::bind 绑定任意可调用对象(普通函数、成员函数等),并存储在 std::function 对象 m_callback 中。
  • 生产数据incache() 函数用于将指定数量的消息生成并放入队列,并通过条件变量唤醒等待的消费者线程。
  • 消费数据outcache() 函数是消费者线程的任务函数,它从队列中取出消息,并调用预先注册的回调函数处理消息。

5.3 生产者与消费者

main() 函数展示了如何创建多个消费者线程,并注册不同的回调函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int main() {
    AA aa;

    // 注册普通函数为回调函数
    // aa.callback(show); 

    // 注册类成员函数为回调函数
    BB bb;
    aa.callback(&BB::show, &bb); 

    // 创建多个消费者线程
    thread t1(&AA::outcache, &aa);  
    thread t2(&AA::outcache, &aa);
    thread t3(&AA::outcache, &aa);

    this_thread::sleep_for(chrono::seconds(2)); // 模拟主线程的其他操作
    aa.incache(2);  // 生产2条消息

    this_thread::sleep_for(chrono::seconds(3));
    aa.incache(5);  // 生产5条消息

    // 等待消费者线程完成
    t1.join();
    t2.join();
    t3.join();

    return 0;
}

六、关键技术点

6.1 std::functionstd::bind

  • std::function:是一个通用的可调用对象的包装器,能够存储任意的函数、Lambda 表达式或成员函数,并将其统一处理。
  • std::bind:用于将函数与参数绑定,并生成一个新的可调用对象。结合占位符 std::placeholders::_1 等,可以灵活地将参数传递给回调函数。

6.2 多线程同步与互斥

  • 互斥锁(std::mutex:确保多线程访问共享资源时的安全性。
  • 条件变量(std::condition_variable:用于在消费者线程等待数据时阻塞,直到有新数据被放入队列。

七、总结

本文实现了一个带有回调机制的消息队列框架,通过 std::functionstd::bind,我们可以灵活地注册不同的回调函数(包括普通函数、成员函数等),并在消息到达时自动调用回调函数处理消息。在多线程环境中,通过互斥锁和条件变量,我们确保了消息生产与消费过程的同步和线程安全。