Java

一读一写(SPSC):Memory Barrier + Volatile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Single-Producer Single-Consumer (SPSC) 无锁环形队列。
*
* <p>原理说明:
* - 仅允许一个线程调用 {@code offer()},一个线程调用 {@code poll()}。
* - 由于没有写竞争,无需 CAS;只需保证写操作对消费者可见。
* - 使用两个 volatile 索引(head/tail)建立 happens-before 关系:
* 生产者写入元素 → volatile 写 tail → 消费者 volatile 读 tail → 读取元素。
* - 这本质上利用了 Java 内存模型中的“volatile 写-读”内存屏障(StoreLoad),
* 虽然比 Disruptor 的 putOrdered 稍重,但在 SPSC 场景下完全安全且简洁。
*
* <p>性能提示:
* - 更极致的实现会用 {@code AtomicLong.lazySet()}(即 store-store barrier)
* 替代 volatile 写,避免 StoreLoad 开销。但本例为清晰起见使用 volatile。
*/
public class SPSCQueue<E> {
private final E[] buffer;
private final int capacityMask; // 必须是 2^n - 1
private volatile int tail = 0; // producer writes here
private volatile int head = 0; // consumer reads from here

@SuppressWarnings("unchecked")
public SPSCQueue(int capacity) {
int c = 1;
while (c < capacity) c <<= 1;
this.buffer = (E[]) new Object[c];
this.capacityMask = c - 1;
}

/**
* 单生产者入队。
* @return 成功返回 true,队列满则返回 false。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
int currentTail = tail;
int nextTail = (currentTail + 1) & capacityMask;
if (nextTail == head) return false; // full

buffer[currentTail] = e;
// volatile 写 tail 建立 happens-before:buffer 写入 → tail 更新
tail = nextTail;
return true;
}

/**
* 单消费者出队。
* @return 队首元素,若空则返回 null。
*/
public E poll() {
if (head == tail) return null; // empty
E e = buffer[head];
buffer[head] = null; // help GC
head = (head + 1) & capacityMask;
return e;
}
}

一写多读(SPMC):Single-Writer Principle + Volatile Publish

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Single-Producer Multiple-Consumer (SPMC) 序列化环形缓冲区。
*
* <p>原理说明:
* - 严格遵循 "single-writer principle":仅一个线程可调用 {@code publish()}。
* - 多个消费者通过读取 {@code cursor} 判断数据是否就绪。
* - {@code cursor} 是 volatile long,作为“发布门”(publication fence):
* 写者先写入 buffer[slot],再 volatile 写 cursor → 所有读者 guaranteed see the value.
* - 这正是 LMAX Disruptor RingBuffer 的核心思想:避免 CAS,靠单写者 + volatile 发布。
* - 读者无需同步,只需轮询或等待 cursor >= targetSequence。
*
* <p>注意:
* - 不提供阻塞或通知机制,仅为底层数据结构。
* - sequence 必须单调递增,由外部协调。
*/
public class SPMCSequenceRingBuffer<E> {
private final E[] buffer;
private final int mask;
private volatile long cursor = -1L; // last published sequence

@SuppressWarnings("unchecked")
public SPMCSequenceRingBuffer(int capacity) {
int c = 1;
while (c < capacity) c <<= 1;
this.buffer = (E[]) new Object[c];
this.mask = c - 1;
}

/**
* 获取下一个可用序列号(仅建议生产者调用)。
*/
public long next() {
return cursor + 1;
}

/**
* 发布指定序列的数据(仅单写者调用!)。
* @param sequence 序列号,必须等于 {@code cursor + 1}
* @param value 数据
*/
public void publish(long sequence, E value) {
if (value == null) throw new NullPointerException();
int slot = (int) (sequence & mask);
buffer[slot] = value;
// volatile write to cursor acts as a memory barrier:
// ensures all prior writes (to buffer[slot]) are visible to readers
cursor = sequence;
}

/**
* 多消费者可安全读取已发布的数据。
* @return 若 sequence 尚未发布,返回 null;否则返回数据。
*/
public E get(long sequence) {
if (sequence > cursor) return null; // not published yet
return buffer[(int) (sequence & mask)];
}
}

disruptor 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* 简化版 Disruptor RingBuffer 核心原理演示(仅支持单生产者)。
*
* <p><b>Disruptor 为何能达到超高吞吐(100M+ ops/sec)?</b>
* 其性能并非来自魔法,而是通过精心设计,将并发问题转化为<strong>顺序执行 + 可预测内存访问</strong>的问题。
* 核心依赖三大工程原则:
*
* <h3>1. 【单写者原则】(Single-Writer Principle)</h3>
* - 在任意时刻,<b>只有一个线程</b>被允许修改关键状态(如 sequence)。
* - 这彻底消除了多写者竞争,无需 CAS、锁或原子操作,避免了:
* • CAS 失败重试的开销(在高竞争下指数级恶化)
* • 内存总线争用(多个核心同时尝试写同一缓存行)
* - 多消费者通过各自维护独立的消费序列号(consumer cursor)进行读取,
* 彼此完全解耦,无任何同步需求。
* - 此原则是 Disruptor 性能基石:它把“并发写”问题降维为“并发读 + 单写”。
*
* <h3>2. 【预分配对象 + 缓存行填充】(Pre-allocation + Cache Line Padding)</h3>
* - <b>预分配</b>:RingBuffer 在构造时一次性分配所有槽位(Object[]),运行时不再创建/回收对象,
* 极大降低 GC 压力(尤其对 latency-sensitive 系统至关重要)。
* - <b>缓存行填充</b>(Cache Line Padding):防止 <i>伪共享</i>(False Sharing)——这是高性能并发中常被忽视的“隐形杀手”。
*
* <p><b>什么是伪共享(False Sharing)?</b>
* - 现代 CPU 并不以字节为单位加载内存,而是以 <b>缓存行</b>(Cache Line)为单位,通常为 <b>64 字节</b>。
* - 当多个 CPU 核心分别访问 <b>不同变量</b>,但这些变量物理上位于 <b>同一个缓存行</b> 内时,
* 只要任一核心修改了该缓存行中的任意一个变量,整个缓存行就会在其他核心的 L1/L2 缓存中标记为 <i>Invalid</i>(失效)。
* - 其他核心若再读取自己关心的变量,就必须通过缓存一致性协议(如 MESI)从主存或其他核心重新加载整行。
* - 这种因“逻辑无关但物理相邻”导致的缓存同步开销,称为 <b>伪共享</b>。
*
* <p><b>为何在 RingBuffer 中特别危险?</b>
* - 生产者高频写 {@code cursor.value}(sequence++)。
* - 消费者高频读 {@code cursor.value} 判断数据是否就绪。
* - 如果 {@code cursor.value} 与其它 volatile 字段(如另一个消费者的进度、状态标志等)落在同一缓存行,
* 则每次生产者更新都会导致所有消费者缓存失效 → 引发大量不必要的缓存同步流量 → 吞吐骤降。
*
* <p><b>如何解决?</b>
* - 让高频读写的独立状态字段 <b>独占一个完整的缓存行</b>。
* - 方法:在目标字段前后填充无用的 long 字段(每个 long 8 字节),
* 使得目标字段前后至少有 64 字节的“隔离带”。
* - 本例中:7 个前填充 + 1 个 value + 7 个后填充 = 15 × 8 = 120 字节 > 64 字节,
* 确保无论 JVM 如何布局对象,{@code value} 都不会与相邻字段共享缓存行。
* - 注意:Java 9+ 可用 {@code @sun.misc.Contended} 自动填充,但 Java 8 必须手动实现。
*
* <h3>3. 【精确内存屏障控制】(Precise Memory Barrier Control)</h3>
* - Disruptor 不依赖粗粒度的 synchronized 或 heavyweight volatile,
* 而是使用 <b>最小必要屏障</b> 实现“发布语义”(Publication Guarantee):
* • 先写入数据到 buffer[slot]
* • 再更新 sequence(作为“发布信号”)
* - 在 Java 内存模型中,{@code volatile} 写提供 <b>StoreLoad barrier</b>,
* 保证所有之前的普通写(buffer[slot] = value)对后续读取该 volatile 的线程可见。
* - 工业级 Disruptor 实际使用 {@code Unsafe.putOrderedLong()}(即 store-store barrier),
* 它比 volatile 更轻量(不包含 StoreLoad),但 Java 8 无法通过标准 API 使用 VarHandle,
* 故此处用 volatile 保证正确性与可移植性。
*
* <p><b>对比传统队列(如 ArrayBlockingQueue / ConcurrentLinkedQueue):</b>
* - ✅ 无锁(lock-free)且无 CAS(单生产者场景)
* - ✅ 无指针追逐(pointer chasing):数组连续内存 vs 链表分散节点
* - ✅ 无 GC 压力:预分配槽位,避免运行时 new/delete
* - ✅ 无伪共享:关键状态字段缓存行隔离
* - ✅ CPU 分支预测友好:循环逻辑高度可预测
*
* <p>正是这些看似微小的设计叠加,使 Disruptor 在 LMAX 交易所实测达到 <b>每秒数千万事件处理</b> 的吞吐。
*/
public class DisruptorLikeRingBuffer<E> {

/**
* 带缓存行填充的序列计数器,专为防止伪共享设计。
*
* <p>布局说明(按 64 字节缓存行对齐):
* - p1~p7: 前填充(56 字节),确保 {@code value} 不与前一个对象字段共享缓存行。
* - value: 核心字段(8 字节),volatile 保证跨线程可见性。
* - p8~p14: 后填充(56 字节),确保 {@code value} 不与后续字段(如对象头、其他实例变量)共享缓存行。
*
* <p>总占用 120 字节,远超 64 字节缓存行,提供强隔离保障。
*/
private static class PaddedSequence {
long p1, p2, p3, p4, p5, p6, p7; // 前填充:56 字节
volatile long value = -1L; // 核心序列号:8 字节
long p8, p9, p10, p11, p12, p13, p14; // 后填充:56 字节
}

private final E[] buffer; // 预分配的环形缓冲区
private final int mask; // 容量掩码(capacity - 1),用于快速取模
private final PaddedSequence cursor = new PaddedSequence(); // 生产者发布的最新序列号

/**
* 构造函数:初始化 RingBuffer。
* @param capacity 队列容量(会被向上对齐到 2 的幂,以支持位运算取模)
*/
@SuppressWarnings("unchecked")
public DisruptorLikeRingBuffer(int capacity) {
// 强制容量为 2 的幂,以便用 (sequence & mask) 替代 % 运算(更快)
int c = 1;
while (c < capacity) c <<= 1;
this.buffer = (E[]) new Object[c];
this.mask = c - 1;
}

/**
* 单生产者申请下一个可用序列号。
* <p>由于仅一个线程调用此方法,直接返回 {@code cursor + 1},无任何同步开销。
*/
public long next() {
return cursor.value + 1;
}

/**
* 发布数据到指定序列位置。
*
* <p><b>内存可见性保证:</b>
* - 步骤 1: {@code buffer[slot] = value} 是普通写(non-volatile store)
* - 步骤 2: {@code cursor.value = sequence} 是 volatile 写
* - 根据 Java 内存模型,volatile 写建立 happens-before 关系:
* 所有在 volatile 写之前的动作(包括步骤 1),对后续读取该 volatile 的线程可见。
* - 因此,任何消费者读到 {@code cursor >= sequence} 时,必定能看到 {@code buffer[slot]} 的值。
*
* <p>工业级优化:Disruptor 使用 {@code Unsafe.putOrderedLong()} 替代 volatile 写,
* 仅插入 StoreStore barrier(禁止重排序,但不强制刷新缓存),性能更高。
* 但 Java 8 无标准 API 支持,故此处用 volatile 保证正确性。
*/
public void publish(long sequence, E value) {
if (value == null) throw new NullPointerException();
int slot = (int) (sequence & mask); // 快速取模
buffer[slot] = value;
// 发布屏障:确保数据写入先于 sequence 更新对消费者可见
cursor.value = sequence;
}

/**
* 消费者读取指定序列的数据。
* <p>多消费者可并行调用,只要各自维护自己的消费进度(如 lastSeenSequence),
* 互不干扰。此方法本身无同步,依赖外部协调消费逻辑。
*
* @param sequence 要读取的序列号
* @return 若数据已发布则返回元素,否则返回 null
*/
public E get(long sequence) {
if (sequence > cursor.value) return null; // 尚未发布
return buffer[(int) (sequence & mask)];
}

/**
* 获取当前已发布的最大序列号。
* <p>消费者通常轮询此值,判断是否有新数据可处理。
*/
public long getCursor() {
return cursor.value;
}
}

多写多读(MPMC):CAS + Lock-Free Linked Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* Multi-Producer Multi-Consumer (MPMC) 无锁链表队列。
*
* <p>原理说明:
* - 支持任意数量的生产者和消费者并发操作。
* - 核心同步机制:CAS(Compare-And-Swap)原子更新指针。
* - 基于经典的 Michael & Scott 无锁队列算法:
* - 入队:尝试 CAS 更新 tail.next;失败则重试或帮助推进 tail。
* - 出队:CAS 更新 head 指向 next 节点,实现逻辑删除。
* - 使用 {@code AtomicReferenceFieldUpdater} 对 Node.next 做 CAS,
* 避免每个 Node 持有 AtomicReference(节省内存)。
* - 虽然无锁(lock-free),但高竞争下可能重试多次,吞吐低于 SPSC/SPMC。
*
* <p>设计权衡:
* - 无容量限制(unbounded),可能 OOM。
* - 不处理 ABA 问题(因引用语义,Java 中对象地址唯一,ABA 风险低)。
*/
public class MPMCLinkedListQueue<E> {
private static class Node<E> {
volatile E item;
volatile Node<E> next;

Node(E item) { this.item = item; }
}

private final AtomicReference<Node<E>> head;
private final AtomicReference<Node<E>> tail;

public MPMCLinkedListQueue() {
Node<E> dummy = new Node<>(null);
head = new AtomicReference<>(dummy);
tail = new AtomicReference<>(dummy);
}

private static final AtomicReferenceFieldUpdater<Node, Node> NEXT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

/**
* 多生产者入队。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> newNode = new Node<>(e);

while (true) {
Node<E> currentTail = tail.get();
Node<E> next = currentTail.next;

if (next != null) {
// Tail is stale; help advance it
tail.compareAndSet(currentTail, next);
} else {
// Attempt to link new node
if (NEXT_UPDATER.compareAndSet(currentTail, null, newNode)) {
// Success: now try to move tail forward (optional optimization)
tail.compareAndSet(currentTail, newNode);
return true;
}
// Lost race to another producer; retry
}
}
}

/**
* 多消费者出队。
*/
public E poll() {
while (true) {
Node<E> currentHead = head.get();
Node<E> first = currentHead.next;

if (first == null) return null; // empty

E item = first.item;
if (item != null && NEXT_UPDATER.compareAndSet(currentHead, first, null)) {
// Successfully unlinked; move head forward
head.set(first);
first.item = null; // help GC
return item;
}
// Help other consumers by advancing head, or retry
head.compareAndSet(currentHead, first);
}
}
}