C++ 高性能编程之无锁队列的实现原理

无锁队列不用互斥锁,靠"原子操作"和"内存序"让多线程安全地入队、出队。实现方式可以从两个维度看:并发模型(谁在生产、谁在消费)和同步机制(用哪种原子原语)。下面按这两条线捋一遍,最后给一个 SPSC + 原子 load/store 的简单例子。


一、按并发模型分

并发模型决定了"谁在改头、谁在改尾",从而决定你需要多强的同步机制。

SPSC(单生产者单消费者)

一个线程只 push,一个线程只 pop。生产者独占"尾"、消费者独占"头",两边各改各的,不会同时抢同一指针。

所以用原子 load/store + 正确的内存序就够了,不需要 CAS。实现简单,很多场景(比如流水线里一阶段生产、下一阶段消费)够用。

MPSC(多生产者单消费者)

多个线程 push,一个线程 pop。尾指针会被多个生产者争抢,必须用 CAS(或类似原子"比较并交换")让多线程安全地把自己挂到队尾;头指针只有消费者改,用原子 store 即可。

MPMC(多生产者多消费者)

多个线程同时 push、同时 pop。头尾都可能被多线程改,通常头尾都要用 CAS 争抢,并且要处理 ABA、节点回收等问题,实现和内存序都更复杂。


二、按同步机制分

无锁队列用到的同步机制主要有两类:原子 load/store 配内存序,以及 CAS

原子 load/store + acquire-release

适合只有一个写者的指针或下标。例如 SPSC 里:生产者只写尾、消费者只写头,所以各自用原子 store 更新、对方用原子 load 读取即可。

关键是用对"内存序",可以记两条规则:

配对关系:线程 A 的 store(release) 和线程 B 的 load(acquire) 若针对同一原子变量,且 B 读到的值就是 A 写进去的那次,则称 B 的 load "与 A 的 store 同步"。此时 A 在 store(release) 之前的全部写入,对 B 在 load(acquire) 之后都可见。无锁队列里:生产者对尾指针 store(release),消费者对头/next load(acquire),就能保证消费者在看到新节点之后,一定能看到生产者写进节点里的数据。

CAS(Compare-And-Swap)

多个线程要改同一个位置(比如多个生产者抢尾指针、或多个消费者抢头指针)时,单纯 load/store 不够,要用 CAS。

MPSC 里尾指针用 CAS;MPMC 里头尾通常都要 CAS,并且往往要配合"指针 + 计数"或 DCAS 等避免 ABA、安全回收节点。

CAS 的工作原理:CAS 是一条原子指令,一般有三个参数——目标内存地址、期望值(expected)、新值(desired)。执行时,CPU 会原子地做两件事:先看当前目标地址里的值是否等于期望值;若相等则把新值写进去并返回"成功",若不相等则什么都不改并返回"失败"。也就是说:只有"当前值还是我上次读到的那样"时我才改,否则说明被别的线程改过了,这次更新作废。所以多线程争抢时,每个线程的典型写法是:先 load 当前值,根据业务算出新值,再 CAS(地址, 当前值, 新值);若 CAS 返回失败就重新 load、再算、再 CAS,直到成功(即"compare-and-swap 循环")。这样同一时刻只有一个线程的 CAS 能成功,其它线程重试即可,无需加锁。


三、SPSC 简单 demo(链表 + 原子 load/store)

下面用单向链表实现一个无界 SPSC 队列,对应的是:并发模型 = SPSC同步机制 = 原子 load/store + acquire-release

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
public:
    LockFreeQueue() {
        Node* dummy = new Node();
        _head.store(dummy);
        _tail.store(dummy);
    }

    ~LockFreeQueue() {
        T tmp;
        while (Pop(tmp)) {}
        delete _head.load();
    }

    void Push(T value) {
        Node* node = new Node(std::move(value));
        Node* prev = _tail.load(std::memory_order_acquire);
        prev->_next.store(node, std::memory_order_release);
        _tail.store(node, std::memory_order_release);
    }

    bool Pop(T& out) {
        Node* head = _head.load(std::memory_order_acquire);
        Node* next = head->_next.load(std::memory_order_acquire);
        if (next == nullptr) return false;
        out = std::move(next->_value);
        _head.store(next, std::memory_order_release);
        delete head;
        return true;
    }

private:
    struct Node {
        T _value;
        std::atomic<Node*> _next{nullptr};
        Node() = default;
        explicit Node(T v) : _value(std::move(v)) {}
    };

    std::atomic<Node*> _head;
    std::atomic<Node*> _tail;
};

要点:Push 先挂链再改尾;_head->next 的 load 用 acquire,保证可见性;哑元节点让空队列时 Pop 直接返回 false。完整可运行 demo 见 https://github.com/os-artificer/ebooks/tree/main/cpp/src/lock_free_queue_demo.cpp(文件头部注释含编译与运行指令)。


四、小结

维度 说明
并发模型 SPSC 最简单(各改各的);MPSC 尾要争抢;MPMC 头尾都要争抢。
同步机制 单写者用原子 load/store + acquire-release;多写者用 CAS。
组合 SPSC + 原子操作即可(上文 demo);MPSC/MPMC 需在争抢处上 CAS 及 ABA、回收等处理。

先按"并发模型 + 同步机制"把分类搞清楚,再去看有界环形队列或 MPMC 实现会容易很多。