C++ 高性能编程之无锁队列的实现原理
无锁队列不用互斥锁,靠"原子操作"和"内存序"让多线程安全地入队、出队。实现方式可以从两个维度看:并发模型(谁在生产、谁在消费)和同步机制(用哪种原子原语)。下面按这两条线捋一遍,最后给一个 SPSC + 原子 load/store 的简单例子。
一、按并发模型分
并发模型决定了"谁在改头、谁在改尾",从而决定你需要多强的同步机制。
SPSC(单生产者单消费者)
一个线程只 push,一个线程只 pop。生产者独占"尾"、消费者独占"头",两边各改各的,不会同时抢同一指针。
所以用原子 load/store + 正确的内存序就够了,不需要 CAS。实现简单,很多场景(比如流水线里一阶段生产、下一阶段消费)够用。
MPSC(多生产者单消费者)
多个线程 push,一个线程 pop。尾指针会被多个生产者争抢,必须用 CAS(或类似原子"比较并交换")让多线程安全地把自己挂到队尾;头指针只有消费者改,用原子 store 即可。
MPMC(多生产者多消费者)
多个线程同时 push、同时 pop。头尾都可能被多线程改,通常头尾都要用 CAS 争抢,并且要处理 ABA、节点回收等问题,实现和内存序都更复杂。
二、按同步机制分
无锁队列用到的同步机制主要有两类:原子 load/store 配内存序,以及 CAS。
原子 load/store + acquire-release
适合只有一个写者的指针或下标。例如 SPSC 里:生产者只写尾、消费者只写头,所以各自用原子 store 更新、对方用原子 load 读取即可。
关键是用对"内存序",可以记两条规则:
- Store 用 release:当你先写好了普通数据(或别的共享状态),再用一次原子 store 把"指针/下标"更新出去时,这次 store 用 release。表示"我在这之前的全部写入,都要对之后读到这个 store 结果的线程可见"。也就是用这次 store 来"发布"前面的写入。
- Load 用 acquire:当你用原子 load 读到一个"别人可能刚更新"的指针或下标,并要基于它去访问别的内存时,这次 load 用 acquire。表示"我要看到之前别人用 release 发布出去的全部写入;且我这次 load 之后的读写,不会被重排到这次 load 之前"。也就是用这次 load 来"接收"对方的发布。
配对关系:线程 A 的 store(release) 和线程 B 的 load(acquire) 若针对同一原子变量,且 B 读到的值就是 A 写进去的那次,则称 B 的 load "与 A 的 store 同步"。此时 A 在 store(release) 之前的全部写入,对 B 在 load(acquire) 之后都可见。无锁队列里:生产者对尾指针 store(release),消费者对头/next load(acquire),就能保证消费者在看到新节点之后,一定能看到生产者写进节点里的数据。
CAS(Compare-And-Swap)
当多个线程要改同一个位置(比如多个生产者抢尾指针、或多个消费者抢头指针)时,单纯 load/store 不够,要用 CAS。
MPSC 里尾指针用 CAS;MPMC 里头尾通常都要 CAS,并且往往要配合"指针 + 计数"或 DCAS 等避免 ABA、安全回收节点。
CAS 的工作原理:CAS 是一条原子指令,一般有三个参数——目标内存地址、期望值(expected)、新值(desired)。执行时,CPU 会原子地做两件事:先看当前目标地址里的值是否等于期望值;若相等则把新值写进去并返回"成功",若不相等则什么都不改并返回"失败"。也就是说:只有"当前值还是我上次读到的那样"时我才改,否则说明被别的线程改过了,这次更新作废。所以多线程争抢时,每个线程的典型写法是:先 load 当前值,根据业务算出新值,再 CAS(地址, 当前值, 新值);若 CAS 返回失败就重新 load、再算、再 CAS,直到成功(即"compare-and-swap 循环")。这样同一时刻只有一个线程的 CAS 能成功,其它线程重试即可,无需加锁。
三、SPSC 简单 demo(链表 + 原子 load/store)
下面用单向链表实现一个无界 SPSC 队列,对应的是:并发模型 = SPSC,同步机制 = 原子 load/store + acquire-release。
- 维护两个原子指针:
_head(消费者从这儿取)、_tail(生产者往这儿挂)。 - 保留一个"哑元节点",
_head指向它;有数据时,数据在_head->next到_tail这条链上。 - Push:先让当前
_tail->next指向新节点(release),再把_tail指到新节点(release)。顺序不能反。 - Pop:看
_head->next是否为空;不空则把_head移到_head->next,返回旧头节点里的值(acquire 读指针)。
#include <atomic>
#include <memory>
template <typename T>
class LockFreeQueue {
public:
LockFreeQueue() {
Node* dummy = new Node();
_head.store(dummy);
_tail.store(dummy);
}
~LockFreeQueue() {
T tmp;
while (Pop(tmp)) {}
delete _head.load();
}
void Push(T value) {
Node* node = new Node(std::move(value));
Node* prev = _tail.load(std::memory_order_acquire);
prev->_next.store(node, std::memory_order_release);
_tail.store(node, std::memory_order_release);
}
bool Pop(T& out) {
Node* head = _head.load(std::memory_order_acquire);
Node* next = head->_next.load(std::memory_order_acquire);
if (next == nullptr) return false;
out = std::move(next->_value);
_head.store(next, std::memory_order_release);
delete head;
return true;
}
private:
struct Node {
T _value;
std::atomic<Node*> _next{nullptr};
Node() = default;
explicit Node(T v) : _value(std::move(v)) {}
};
std::atomic<Node*> _head;
std::atomic<Node*> _tail;
};
要点:Push 先挂链再改尾;_head->next 的 load 用 acquire,保证可见性;哑元节点让空队列时 Pop 直接返回 false。完整可运行 demo 见 https://github.com/os-artificer/ebooks/tree/main/cpp/src/lock_free_queue_demo.cpp(文件头部注释含编译与运行指令)。
四、小结
| 维度 | 说明 |
|---|---|
| 并发模型 | SPSC 最简单(各改各的);MPSC 尾要争抢;MPMC 头尾都要争抢。 |
| 同步机制 | 单写者用原子 load/store + acquire-release;多写者用 CAS。 |
| 组合 | SPSC + 原子操作即可(上文 demo);MPSC/MPMC 需在争抢处上 CAS 及 ABA、回收等处理。 |
先按"并发模型 + 同步机制"把分类搞清楚,再去看有界环形队列或 MPMC 实现会容易很多。