无锁循环队列

无锁循环队列(Lock-Free Circular Queue)是一种线程安全的环形缓冲区数据结构,允许多个线程并发访问而无需使用锁(如互斥锁)。它通过原子操作(如 CAS,Compare-And-Swap)确保线程安全,适用于高并发场景,如生产者-消费者模型。

1. 无锁循环队列的特点

  • 循环队列
    • 使用固定大小的数组,首尾相连形成环形结构。
    • 通过模运算(% size)实现循环索引。
  • 无锁
    • 使用原子操作(如 std::atomic)避免锁竞争,减少上下文切换和锁开销。
    • 适合多线程环境,降低阻塞,提高吞吐量。
  • 线程安全
    • 允许多个生产者(push)和消费者(pop)并发操作。
    • 需处理并发时的竞争和一致性问题。
  • 限制
    • 通常固定大小(非动态扩展)。
    • 实现复杂,需谨慎处理边界条件和 ABA 问题。

2. 实现原理

无锁循环队列通常基于以下关键点:

  • 原子变量:使用 std::atomic 管理头(head)和尾(tail)索引,确保线程安全的更新。
  • CAS 操作:通过比较并交换(Compare-And-Swap)实现无锁的入队(push)和出队(pop)。
  • 循环索引:通过模运算循环使用数组,避免越界。
  • 单生产者/单消费者(SPSC)或多生产者/多消费者(MPMC)
    • SPSC 实现较简单,竞争较少。
    • MPMC 需要更复杂的同步机制。

3. 实现代码(C++ 单生产者/单消费者无锁循环队列)

以下是一个简单的单生产者/单消费者(SPSC)无锁循环队列实现,适合一个线程入队、一个线程出队的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <atomic>
#include <cstddef>

template<typename T, size_t Size>
class LockFreeQueue {
private:
T buffer[Size]; // 固定大小的环形缓冲区
std::atomic<size_t> head{0}; // 出队索引(消费者更新)
std::atomic<size_t> tail{0}; // 入队索引(生产者更新)
static constexpr size_t MASK = Size - 1; // 假设 Size 是 2 的幂

public:
// 入队操作(单生产者)
bool push(const T& item) {
size_t current_tail = tail.load(std::memory_order_relaxed);
size_t next_tail = (current_tail + 1) & MASK; // 循环索引

// 检查队列是否满
if (next_tail == head.load(std::memory_order_acquire)) {
return false; // 队列满
}

// 写入数据
buffer[current_tail] = item;
// 更新 tail(原子操作,确保内存可见性)
tail.store(next_tail, std::memory_order_release);
return true;
}

// 出队操作(单消费者)
bool pop(T& item) {
size_t current_head = head.load(std::memory_order_relaxed);

// 检查队列是否空
if (current_head == tail.load(std::memory_order_acquire)) {
return false; // 队列空
}

// 读取数据
item = buffer[current_head];
// 更新 head(原子操作)
head.store((current_head + 1) & MASK, std::memory_order_release);
return true;
}

// 检查队列是否为空
bool empty() const {
return head.load(std::memory_order_acquire) == tail.load(std::memory_order_acquire);
}

// 检查队列是否已满
bool full() const {
return ((tail.load(std::memory_order_acquire) + 1) & MASK) == head.load(std::memory_order_acquire);
}
};

代码说明

  • 模板参数
    • T:存储的数据类型。
    • Size:队列大小,必须是 2 的幂(便于模运算优化)。
  • 原子操作
    • headtail 使用 std::atomic 确保线程安全。
    • 使用 memory_order_acquirememory_order_release 保证内存序正确。
  • 入队(push)
    • 检查队列是否满(next_tail == head)。
    • 写入数据后更新 tail
  • 出队(pop)
    • 检查队列是否空(head == tail)。
    • 读取数据后更新 head
  • 循环索引
    • 使用 & MASK 代替模运算(% Size),因为 Size 是 2 的幂,优化性能。
  • 内存序
    • acquire 确保读取最新值,防止重排。
    • release 确保写入对其他线程可见。

4. 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>

int main() {
LockFreeQueue<int, 8> queue; // 容量为 8 的队列

// 生产者:入队
for (int i = 0; i < 7; ++i) {
if (queue.push(i)) {
std::cout << "Pushed: " << i << "\n";
} else {
std::cout << "Queue full\n";
}
}

// 消费者:出队
int value;
while (queue.pop(value)) {
std::cout << "Popped: " << value << "\n";
}

return 0;
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Pushed: 0
Pushed: 1
Pushed: 2
Pushed: 3
Pushed: 4
Pushed: 5
Pushed: 6
Popped: 0
Popped: 1
Popped: 2
Popped: 3
Popped: 4
Popped: 5
Popped: 6

5. 多生产者/多消费者(MPMC)实现

MPMC 队列更复杂,需要使用 CAS 操作解决并发竞争。以下是关键改动:

  • 入队(push)
    • 使用 compare_exchange_strong 原子更新 tail
      1
      2
      3
      4
      5
      6
      size_t current_tail = tail.load(std::memory_order_relaxed);
      while (!tail.compare_exchange_strong(current_tail, (current_tail + 1) & MASK,
      std::memory_order_release,
      std::memory_order_relaxed)) {
      current_tail = tail.load(std::memory_order_relaxed); // 重试
      }
  • 出队(pop)
    • 类似地,使用 CAS 更新 head
  • ABA 问题
    • 使用版本计数或双字 CAS(如 std::atomic<std::pair<size_t, size_t>>)避免 ABA 问题。
  • 复杂 Facade Pattern
    • 引入引用计数或其他标记确保数据一致性。

6. 注意事项

  • 适用场景
    • SPSC 适合高性能单线程生产/消费场景,如消息队列。
    • MPMC 适合多线程复杂场景,如任务调度。
  • 局限性
    • 固定大小,需预估最大容量。
    • MPMC 实现复杂,可能需要额外的同步机制。
    • 原子操作可能因重试导致性能波动。
  • 优化
    • 使用 2 的幂大小优化模运算。
    • 批量操作减少原子操作频率。
    • 考虑内存对齐和缓存行优化。

7. 总结

  • 无锁循环队列:通过原子操作实现线程安全的环形缓冲区,适合高并发场景。
  • 实现要点:原子变量、CAS 操作、循环索引、内存序管理。
  • SPSC vs MPMC:SPSC 简单高效,MPMC 更通用但实现复杂。
  • 应用:消息队列、任务调度、数据流处理。

如需更详细的 MPMC 实现或性能测试数据,请提供进一步要求!