写在前面的话

并发编程最早的实践都在操作系统里。

理论和实践之间是有鸿沟的,要弥合这种鸿沟,通常需要我们去学习别人的实践。比如并发的标准设计思想来自于操作系统里的管程,我们应当学习管程,进而了解标准的并发模型。

juc.png
juc.xmind

Java 线程

线程是比进程更轻量级的调度执行单位,线程的引入,可以把一个进程的资源分配和执行调度分开,各个线程既可以共享进程资源(内存地址、文件 I/O),又可以独立调度(线程是 CPU 调度的最基本单位)。主流的操作系统都提供了线程的实现,但 Java 的线程实现却在关键方法上大量使用 Native(这也就意味着,不能使用平台无关的实现),其中主要包括三种方法:

  1. 内核线程实现。
  2. 用户线程实现(Green Thread)。
  3. 用户线程加轻量级线程实现。

线程实现

内核线程实现

内核线程(Kernel Thread,KLT)就是直接由操作系统内核(Kernel)支持的线程,这种线程由内核来完成线程切换,内核通过操纵调度器(Scheduler)对线程进行调度,并负责将线程的任务映射到各个处理器上。每个内核线程可以被看做是内核的一个分身,这样操作系统就有能力处理多件事情。支持多线程的内核就叫多线程内核。
程序一般不会直接去使用内核线程,而是去使用内核线程的一种高级接口—轻量级进程(Light Weight Process,LWP),轻量级进程就是我们通常意义上所讲的线程。由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程。这种轻量级进程与内核线程1:1的关系称为一对一的线程模型。
由于内核线程的支持,每个轻量级进程都成为一个独立的调度单元。即使有一个轻量级进程在系统调用中阻塞了,也不会影响整个进程继续工作。但轻量级进程具有它的局限性:首先,各种线程的创建、析构和同步,都需要进行系统调用,也就是用户态(User Mode)trapped 到内核态(Kernel Mode),来回切换。其次,每个 LWP 都需要一个内核线程的支持,因此轻量级进程还要消耗一定的内核资源(如内核线程的栈空间),因此,一个系统能够支持的轻量级进程的数量是有限的。

用户线程实现

从广义上来讲,一个线程只要不是内核线程,就可以被认为是用户线程。因此从这个定义上来说讲,轻量级进程也属于用户线程,但轻量级进程的实现始终是建立在内核之上的,许多操作都要进行系统调用。
而下一的用户线程值得是完全建立在用户空间的线程库上,系统内核不能感知到线程存在的实现。用户线程的建立、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。如果程序实现得当,线程不需要切换到内核态,因此程序可以是非常快速而且是低消耗的,也因此可以支持规模更大的线程数量,部分高性能的数据库中的多线程就是由用户线程实现的(node?数据库中间件?redis?)。这种进程与用户线程之间 1:N 的关系称为一对多的线程模型。
使用用户线程的优势是不需要系统内核支援,劣势也在于没有系统内核的支援,所有的线程操作都需要用户程序自己处理。线程的创建、qiehu切换和调度都是需要考虑的问题,而且由于操作系统只把处理器资源分配到进程,那诸如“阻塞如何处理”、“多处理器系统中如何将线程映射到其他处理器上”这类问题解决起来会异常困难,甚至是不可能完成的。所以现在越来越少使用用户线程来实现线程了。据我所知,只有早期的 Java (1.2以前), Ruby(1.9以前)使用绿色线程。 很多程序员将它称为基于分时(time-sharing)的调度方式,不无道理,它要自己写自己的scheduler,是一个非常麻烦的事情,等于把批发回来的资源再分配了一遍。

混合实现

将内核线程和用户线程一起使用的方式。用户线程依然完全建立在用户空间内,而LWP则是用户线程和内核沟通的桥梁,可以让用户线程通过它背后的内核线程 leverage kernel scheduler, processor mapping和 system call。这种设计大大降低了一个线程阻塞另一个线程以至于进程全被阻塞的风险,但还是存在这样的风险。所以还是一对一的线程模型好,虽然操作可能有昂贵的地方,但是也很省心实用99。

Java的线程实现

Java在当前的规范里面取消了绿色线程,也就是线程使用的透明性需要结合操作系统来看待。目前对于 Hot-Spot 虚拟机而言,Windows版和 Linux 版是使用一对一的模型实现的。在其他平台上,JVM 还有些可选参数来专门选择线程模型。

以Linux为例。Linux历史上,最开始使用的线程是LinuxThreads,但LinuxThreads有些方面受限于内核的特性,从而违背了SUSV3 Pthreads标准。即它要根据内核的特性来实现线程,有些地方没有遵循统一的标准。后来IBM开发了NGPT(Next Generation POSIX Threads),性能明显优于LinuxThreads,人们曾把它当作LinuxThreads的继任者。但最后,又有一个项目NPTL(Native POSIX Threads Library)出来后,性能更优于NGPT。2002年NGPT项目停止开发,我们现在用的Linux线程就是NPTL。

线程的实现曾有3种模型:

1.多对一(M:1)的用户级线程模型

2.一对一(1:1)的内核级线程模型

3.多对多(M:N)的两级线程模型

上面的x对y(x:y)即x个用户线程对应y个内核调度实体(Kernel Scheduling Entity,这个是内核分配CPU的对象单位)。

LinuxThreads和NPTL都是采用一对一的线程模型,NGPT采用的是多对多的线程模型!!!

Java的线程调度

不要试图依赖线程优先级。因为线程优先级并不是操作系统本地的优先级(windows 系统优先级 < JVM 优先级 < Solaris 系统优先级),而且优先级还会发生变动(Windows上的 Priority Boosting)。

在 OS 级别可能实现由抢占式调度和协作式调度,抢占式调度更强大而协作式调度更简单,Java 只有协作式调度(Thread.yield()方法)。

Java的线程状态转换

Java 线程状态

java-thread-state.png

六种被写进线程里的枚举状态,可以在 jstack 等 JMX 工具里得到解答:

  • New 新建后尚未开始启动的状态。
  • Runnable 等于系统线程中 Running 和 Ready(操作系统也有不同的状态)。线程可能正在运行,也可能准备可以运行(在阻塞或者等待状态被唤醒,等待 CPU 分配时间片)。
  • Waiting 无限期等待。除非被显式唤醒(notify,notifyAll,signal,signalAll,interrupt),不然无限期地等待下去。可以导致 Waiting 的方法有:
    • 没有设置 Timeout 参数的 Object.wait()。
    • 没有设置 Timeout 参数的 Thread.join()。
    • LockSupport.park()方法(所以这个方法就是无限黑洞)。
  • Timed Waiting 处于这种状态的线程没有 CPU 时间,可以被自动唤醒(也可以被 interrrupt())。由以下方法可以看出,Sleep 和 Wait 除了附着的对象不同,都要让出 CPU 时间片:
    • Thread.sleep()。
    • 设置了 Timeout 参数的 Object.wait() 方法。
    • 设置了TimeOut参数的 Thread.join() 方法。
    • LockSupport.parkNanos() 方法。
    • LockSupport.parkUntil() 方法。
  • Blocked: 等待排他锁(synchronized, reentrantlock,获取阻塞队列的操作权)的时候。wait 在后置的 wait_set 里面,synchronized 在 entry_set 里面。
  • Terminated: 结束执行的线程的状态,比如走完了栈帧里程的 main 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}

线程状态列举

NEW

没有启动过的线程。

RUNNABLE

  1. 正在执行的线程。
  2. 可以被执行但没有拿到处理器资源。

BLOCKED

blocked 其实是 blocked waiting。
1 等待 monitor,进入 synchronized method/block
2 或者等 wait()/await()以后再次进入 synchronized method/block(注意这一点,解除 wait 以后以后不是直接 runnable,而是进入 blocked,但这一步非常短暂,几乎不可能用程序观察到)。

WAITING

在调用这三种不计时方法以后,线程进入 waiting 态:

  • Object.wait
  • Thread.join
  • LockSupport.park 我们经常在文档里看到的 thread lies dormant 就是被这个方法处理过的结果

waiting 意味着一个线程在等待另一个线程做出某种 action。wait 在等其他对象 notify 和 notifyAll,join 在等其他线程终结。

如:
java.util.concurrent.LinkedBlockingQueue.take -> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await -> java.util.concurrent.locks.LockSupport.park

Reentrantlock 的 lock 接口的栈帧则是:

1
2
3
4
5
6
7
sun.misc.Unsafe.park 行: 不可用 [本地方法]
java.util.concurrent.locks.LockSupport.park 行: 175
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt 行: 836
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued 行: 870
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire 行: 1199
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock 行: 209
java.util.concurrent.locks.ReentrantLock.lock 行: 285

jstack 总会告诉我们 waiting 的位置,比如等待某个 Condition 的 await 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
ReentrantLock lock = new ReentrantLock();
final Thread t1 = new Thread(() -> {
System.out.println("t1 before lock");
lock.lock();
try {
// 此时 t1 是 Runnable
queue.put(1); // 此时刺激主线程开始读 t2
System.out.println("t1 begin to sleep");
Thread.sleep(1000000L);
} catch (Exception ex) {
}
System.out.println("t1 prepare to release lock");
lock.unlock();
System.out.println("t1 release lock");
});

final Thread t2 = new Thread(() -> {
try {
Thread.sleep(1000L);
} catch (Exception ex) {
}
System.out.println("t2 before lock");
// 此时 t2 可能被 t1 阻塞,进入 waiting 状态
lock.lock();
System.out.println("t2 prepare to release lock");
lock.unlock();
System.out.println("t2 release lock");
});
t1.setName("t1");
t2.setName("t2");
t1.start();
t2.start();
// 此时主线程在等待一个信号来刺激自己往下走
queue.take();
// 往下走的目的就是校验 t2 的状态
while (t2.isAlive()) {
System.out.println(t2.getState());
}
}

对这个程序进行 thread dump,可以看出 ReentrantLock 就是依赖于 park 导致的 waiting:

parking即waiting.png
sleeping即timed-waiting.png

如果使用 synchronized,则会显示 object monitor:

object-monitor.png

所以 waiting 可能是在条件变量上等待,也可能是在 synchronizer 本身上等待,不可一概而论。

按照 jvisualvm 的分类方法,线程还可以分为:

  • 等待 wait
  • 驻留 Park
  • 监视 Monitor
  • 运行中 Running
  • 睡眠 Sleeping

TIMED_WAITING

调用了计时方法,等待时间结束后才或者被其他唤醒方法唤醒结束等待。

Thread.sleep
Object.wait
Thread.join
LockSupport#parkNanos
LockSupport.parkUntil

如:

java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take -> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos -> java.util.concurrent.locks.LockSupport.parkNanos -> sun.misc.Unsafe.park

除了 sleep 以外,jstack 总会告诉我们 time_waiting 的位置,比如等待某个 Condition 的 await 操作。

TERMINATED

终结的线程,执行已经结束了。

中断退出也是一种结束。

几种线程状态的对比

  1. blocked:线程想要获取锁进入临界区之前,会求锁,求不到锁会进入 wait_set,然后放弃 cpu。高并发时 blocked 会增多。
  2. 工作线程池开始伸缩,扩容的时候:jvm.thread.waiting.count 的数量会变少,这意味着等待从 blockingQueue 里面 take 任务而阻塞的工作线程在变少。
  3. 工作吞吐变多,而调用下游的工作线程在阻塞的时候,jvm.thread.time_waiting.count 会变多,因为 rpc 框架自带超时,而这些超时是会让工作线程进行计时等待的。
  4. 流量变大的时候,2 和 3 可能同时发生。

特别的切换方法

LockSupport.park

condition 的 await 底层调用的是 LockSupport.park。这个方法的参数是一个用作 monitor 的对象,会被设置到 Object 的特定 Offset 上。

park 只能带来 waiting。所以 sync 和 conditionObject 其实都让 thread waiting ,只不过代表 thread 的 node 处在的队列不一样而已。

wait

这个方法是对 object 用的。
从 wait 中醒来会有伪唤醒的 case,所以醒来的时候一定要先检查唤醒条件是否已经得到满足。原理见《为什么条件锁会产生虚假唤醒现象(spurious wakeup)?》

join

JMM

Volatile

volatile的可见性影响.png

JUC

juc-class-diagram.png

总体设计原则

  • Doug Lea 特别热衷于使用顺序状态来表达初始、中间态和终态,往往使用 <= 中间态当作初始态,>= 中间态当作完成态(包括 normal 和 exceptional)。
  • 有一些变量内存不安全,强依赖于 happen-before relation 的巧妙实现,也需要参考Volatile
  • Doug Lea 不喜欢写大括号。
  • 对于所有的计时等待而言,0 意味着无限等待。
  • 链表的特性:
    • 在初始时,链表总是先初始化 head,cas 成功,然后用 head 赋值给 tail,使其最初相等,但读值的时候顺序是反过来的-利用了 volatile 的内存屏障的特性。
    • 只要有一个 cas 操作成功,包裹住剩下的 volatile 写都不需要再做 cas。所以通常对 AQS 自身的全局状态的 cas 是和链表自身状态的 link/unlink 操作是分开的。
    • 所有要被 cas 的链表元素/aqs state,在局部代码运行前,要被先用局部变量存储起来,写在 block 的最开始。这样做可以提高复用性,减少多次对 volatile 变量的求值,避免对缓存机制的扰乱,也保证了变量的线程封闭性。在全局也有一些 global state。
  • 有副作用的方法,副作用包括修改全局变量、park、unpark、修改中断位,返回值通常是主要操作的成败。有时候用 int 代表多种返回值。

Unsafe 的应用

JUC 强依赖于 Unsafe,它提供了硬件级别的 CAS 原子操作。在 X86 上,这个 CAS 操作依赖于 cmpxchg 指令,会锁定总线。所以仍然会产生一些硬件锁。

通常 Unsafe 的使用模式是compareAndSwapXXX,一个典型的函数是:

1
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);

  • obj是我们要操作的目标对象
  • offset表示了目标对象中,对应的属性的内存偏移量
  • expect是进行比较的原值
  • update是拟写入的新值

获取 field

1
2
Class<?> k = FutureTask.class;
Field stateField = k.getDeclaredField("state");

进而获取 field 的偏移:

1
public native long objectFieldOffset(Field field);

如:

1
2
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

然后就可以做类似的 cas 操作了:

1
2
3
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

很多 AtomicXXX 原子类,底层都依赖于 Unsafe 的 CAS offset、old value、new value操作。

increaseAndGet 与 getAndIncrease

所有的 increaseAndGet 都是由 getAndIncrease 支撑起来的:

1
2
3
4
5
6
7
8
9
// OpenJDK 8
public final int getAndAddInt(Objet obj, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
// 整个“比较+更新”操作封装在 compareAndSwapInt() 中,在 JNI 里是借助于一个 CPU 指令(cmpxchg)完成的,属于原子操作,可以保证多个线程都能够看到同一个变量的修改值。
} while (!compareAndSwapInt(obj, v, v + delta))
return v;
}

CAS 有三大问题:

  • ABA问题。CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。
    • JDK从1.5开始提供了AtomicStampedReference类来解决ABA问题,具体操作封装在compareAndSet()中。compareAndSet()首先检查当前引用和当前标志与预期引用和预期标志是否相等,如果都相等,则以原子方式将引用值和标志的值设置为给定的更新值。
  • 循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。
  • 只能保证一个共享变量的原子操作。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
    • Java从1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。

自旋锁

阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。

而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。

自旋锁与非自旋锁

自旋锁本身是有缺点的,它不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是10次,可以使用-XX:PreBlockSpin来更改)没有成功获得锁,就应当挂起线程。

自旋锁在JDK1.4.2中引入,使用-XX:+UseSpinning来开启。JDK
6中变为默认开启,并且引入了自适应的自旋锁(适应性自旋锁)。

自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。

在自旋锁中另有三种常见的锁形式:TicketLock、CLHlock和MCSlock。这是很多 Lock Free 数据结构的基础。但 CLH 改成双端队列和引入 Park 以后,也可以实现 blocking lock。

四种锁升级的思路

具体的解释先统一存储在《线程安全与锁优化》

函数式接口

区别 Runnable 和 Callable

Runnable 本身是不抛出异常的,但 Callable 本身耗时比较长,而且还会抛出异常(这个设计会最终导致我们进行函数式编程的时候,有时候我们需要在 Runnable 内部处理异常,有时候我们要在 Callable 外处理异常):

1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

synchronized 的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws InterruptedException {
Object sync1 = new Object();
Thread t1 = new Thread(() -> {
// 制造一个内外部死锁,让 t1 内部锁死在这里
synchronized (sync1) {
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("I am going to die,current state:" + Thread.currentThread().getState() );
}
});
t1.setName("TTTTTT1");
synchronized (sync1) {
t1.start();
Thread.sleep(2000L);
System.out.println("t1 state1:" + t1.getState());
// waiting for jmx client to attach on it
Thread.sleep(30000L);
}
System.out.println("unsyncrhonized");
System.out.println("t1 state2:" + t1.getState());
Thread.sleep(500000L);
}

jvisualvm观察到monitored态
jmc得到blocked态

JVisualVM 会专门把 block 当做 monitorered 态。

LockSupport

这个类型是为了提供阻塞元语,这样可以创造以锁为代表的 synchronization classes。

使用一个 park,如果有 permit,则立刻返回,否则阻塞;使用一个 unpark 会让 permit available。permit 最多有一个,这点和 semaphore 不一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws InterruptedException {
Object sync1 = new Object();
Thread t1 = new Thread(() -> {
// 在这一步以后,线程进入 waiting 的 state
// park 和 unpark 只能内外部调用,不能在一个线程内对称调用,而且 park 的参数是 sync,unpark 的参数是线程
LockSupport.park(sync1);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// RUNNABLE
System.out.println("I am going to die,current state:" + Thread.currentThread().getState() );
});
t1.setName("TTTTTT1");
t1.start();
Thread.sleep(2000L);
// t1 state1:WAITING
System.out.println("t1 state1:" + t1.getState());
// waiting for jmx client to attach on it
Thread.sleep(30000L);
// 这个unpark并不会立刻让 t1 进入runnable
LockSupport.unpark(t1);
System.out.println("unpark");
// t1 state1:WAITING/TIMED_WAITING/RUNNABLE/TERMINATED
System.out.println("t1 state2:" + t1.getState());
Thread.sleep(500000L);
}

JVisualVM的驻留状态
JMC看waiting状态

注意,只有 jvisualvm会有“驻留”这个状态,jmc和jconsole都是直接进入标准的 waiting 状态。目前使用基于aqs 的lock 和 await 语义都会导致waiting,这会和 Object waiting 产生混淆。JVisualVM 似乎能够把 object waiting 和 park 驻留导致的 waiting 专门区别开来。

内存一致性效应(Memory consistency effects)

哪一种 action happen-before 哪一种action?

通常是 action prior to some release opertion happen-before action following acquire operation。

比如 Semaphore 的 memory consistency effect:Actions in a thread prior to calling a “release” method such as release() happen-before actions following a successful “acquire” method such as acquire() in another thread.

主流锁

Java的主流锁

悲观锁与乐观锁

乐观锁与悲观锁

  • 悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
  • 乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。

    AQS(AbstractQueuedSynchronizer)

JUC 有个 locks 包,所有的锁和基于锁的并发基础设施都在这个包里隐藏。这些数据结构被称为同步器,而同步器本身是为了并发安全而存在的(相应地应该也存在原子化的解决方案、隔离的解决方案,我们改天再探讨)。AQS 是为了实现同步器而设计的框架,作为 basis of a synchronizer(同步器的依据),它提供了 queuing and blocking mechanic。

AQS虽然被定义为抽象类,但事实上它并不包含任何抽象方法。这是因为AQS是被设计来支持多种用途的,如果定义抽象方法,则子类在继承时必须要覆写所有的抽象方法,这显然是不合理的。所以AQS将一些需要子类覆写的方法都设计成protect方法,将其默认实现为抛出UnsupportedOperationException异常。如果子类使用到这些方法,但是没有覆写,则会抛出异常;如果子类没有使用到这些方法,则不需要做任何操作。

全部使用 protected 方法也是抽象类的设计方法之一。

AQS 并不是最初的基类,它能够被 Thread Own 这个特性,来自于 AbstractOwnableSynchronizer。它对 Own 的表达方式在于保存一个线程句柄-exclusiveOwnerThread。和锁(Monitor)的markword里保留一个对象头的设计思路是很相似的。

AQS 提供两种模式:

  • 独占 exclusive(这是缺省模式):当以独占模式获取时,尝试通过其他线程获取不能成功。When acquired in exclusive mode, attempted acquires by other threads cannot succeed.
  • 共享 share:共享模式通常会成功,但实际上不一定。Shared mode acquires by multiple threads may (but need not) succeed.这和其他提供共享锁机制的软件实现(如 MySQL)还是不一样的。当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取-也就是说阻塞是决定后的结果when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well.

重要在不同模式下等待的线程共享相同的FIFO队列(互斥和共享模式都需要addWaiter)。实现子类只支持这些模式之一,但是两者都可以在ReadWriteLock中发挥作用(所以同一个 FIFO 队列是实现读写锁混合的基石) 。仅支持独占或仅共享模式的子类不需要定义支持未使用模式的方法。

在四大 try 方法中,tryAcquire(),tryRelease(),tryReleaseShared() 都返回 boolean,只有 tryAcquireShared() 返回数字。

参考:

  1. 《一行一行源码分析清楚AbstractQueuedSynchronizer》
  2. 《Java多线程》
  3. 《从ReentrantLock的实现看AQS的原理及应用》
  4. 《不可不说的Java“锁”事》
  5. 《官方文档的中文翻译》

AQS 的设计原则

  • Node 代表线程。atomic int state 代表锁状态(初始值为0),也就是并发理论里经常提到的临界资源本身,FIFO queue (分为 sync queue 和 wait queue )是它的同步逻辑的基石(bias)。
  • 依赖状态、队列和 CAS 操作来完成对同步机制的实现。
    • 如果要涉及到出入队,操作的顺序大致上是:cas node wait status、sync queue 节点之间的enq 和 deque、管理 aqs 的状态(tryAcquir/tryRelease)。
    • AQS 自身的状态代表着锁的状态,这些状态都是 transient volatile 的,重点关注锁是被acquired 还是 released锁操作的实质就是对状态的维护。这其中又大量使用 CAS 操作,CAS操作是最轻量的并发处理动作。cas 会把操作分隔出两个部分:cas 以前做拥有权限的 if 比对,cas以后做其他成员-如在 AQS 里,主要是 exclusiveOwnerThread 的设值操作,这样就形成了原子性。
      • CAS操作保证了同一个时刻,只有一个线程能修改成功,从而保证了线程安全,CAS操作基本是由Unsafe工具类的compareAndSwapXXX来实现的;CAS采用的是乐观锁的思想,因此常常伴随着自旋,如果发现当前无法成功地执行CAS,则不断重试,直到成功为止,自旋的的表现形式通常是一个死循环for(;;)。if 条件写得非常复杂,不易于拆解,而且有些 if 的约束是隐藏在内部的 cas 和自旋里的。有非常多的带有 side-effect 的 action,最终返回一个 boolean,可以被连续地 && 在一个大括号里。大括号里把某个值设置为 true,意味着把最后一个 action 的返回值带出去。
        • 加锁主要比对 state
        • 解锁主要比对 owner thread
        • 通过监控 API 可以查看:是否持有锁、是否有 contend、waitQueue的内容、syncQueue的内容。
  • Node 组成的链表代表了所有与锁相关的线程,我们有 sync queue 和 wait queue。
    • 在 sync queue 里,头结点代表了拥有锁的线程,而链表的其他部分意味着阻塞队列(所以头指针可以是一个蓝色的node,而阻塞队列是一系列绿色的链表节点)
      • 。队列遵循 FIFO 的原则(入队顺序为加锁顺序,从尾部入队),因此能够实现公平锁。
        • 原始的acquire要求在队列里的 node,必须 predecessor 为 head 才能tryAcquire,这就实现了先进先出。
        • 公平锁意味着在tryAcquire实现的时候,要看看有没有 queued predecessor(表现为hasQueuedPredeccesor()(而不是predecessor())方法返回 false,即这个节点)才能够 cas state,进而获取锁)。
        • 能够实现非公平锁是能够实现公平锁的基础。非公平锁在 tryAcquireLock 的 casState 之前,没有多余的 hasQueuedPredeccesor() 检查,所以公平锁是非公平锁画蛇添足的结果,非公平锁在 acquire 前面还套了一个很强的短路操作,在入队以前就猛抢,即很多人常说的插队。
  • Node 的 waitStatus 代表着线程-锁的等待状态(而不是锁的获取状态) ,如线程是否取消争抢线程的锁。AQS 内部大量使用 compareAndSetWaitStatus 来变更每个 node 和它的 predecessor 的状态
    • 后节点是否在 sync queue 里 acquireQueued 里阻塞,要看前驱节点 ws。是否在 wait queue 里,要看本节点 ws。
    • 通常是由后节点来更新前节点的 ws,自身的 ws 也会在某些情况下被自己的线程更新:
      • 每一个 node 在 sync queue 和 wait queue 之间互换是要求在 0 和 CONDITION 之间互换的,在由本线程对本节点使用的。
      • 每一个 node 被取消(主要是 acquireQueued 里发现了 failed)的时候会变成 CANCELLED,不管是自身求锁的时候发现被取消,还是被后继节点取消,或者被 signal 的流程取消。
      • 每一个 node 会因为 next 的入队变成 SIGNAL。
  • Node 本身也有 next 和 prev,通常对 prev 的重新赋值可以直接用 =,但对 next 的赋值需要做 cas。有时候 set next 还需要考虑 next 自己的 ws。
  • 线程自身还有 state,如 new、runnable 等。
  • 所有的“同步属性”,都是某个类内部的非公开内部帮助类(如 ReentrantLock 内部的 abstract static class Sync)。
  • aqs 不实现任何 synchronization interface,其他同步器或者具体锁真正需要做的是使用acquireInterruptibly等方法。就锁的获取操作而言,子类必须重写tryAcquire方法。
  • 各种 Sync 的标准格式应该是Subclasses should be defined as non-public internal helper classes(非公共内部助手类) that are used to implement the synchronization properties of their enclosing class(封闭类)
  • 序列化模式:此类的序列化仅存储底层原子整数维持状态,因此反序列化对象具有空线程队列(实现方法是,让链表的各种节点都被设置成transient 的)。 需要可序列化的典型子类将定义一个readObject方法,可以将其恢复为readObject时的已知初始状态。所有的状态都是 atomic 的,这些状态可以被序列化(有一些特定的成员变量被刻意设计成 transient的,如ownerThread),但 queue 通常不能直接被序列化,需要序列化器实现readObject方法才行。

简要记忆

  1. 互斥:改双入尾,放双出头
  2. 共享:初始设态,非0入尾,扣减出队,解放总是全解放。

AQS的五层结构

AQS的五层结构

上图中有颜色的为Method,无颜色的为Attribution。

总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。

当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

CLH queue

CLH 是一种 lock queue,normally used for spinlocks,but here used for blocking synchronizers。

这个 ADT 的 basic tatic 是holding some of the control information about a thread in the predecessor of its node。即本节点是否需要被 SIGNAL(parked - signaled- unparked),是由前一个节点决定的。

A “status” field in each node keeps track of whether a thread should block.

一个 node 是不是需要 block 需要由 status field 决定。实际上一个 field 的 status 为 SIGNAL 必然导致它的 successor parked(形成 blocked)。

A node is signalled when its predecessor releases.

release 里会附带一个 unparked successor 操作,而第一个 acquire 的入队会让出队自动进入一个 for-loop,不断 tryAcquire。

The status field does NOT control whether threads are granted locks etc though.

线程求锁就是它成为队头,队头的 thread 本身为 null。真正 hold thread 的地方只剩下 exclusiveOwnerThread。

入队是作尾,出队是作头。被唤醒不一定得到锁-如果不是公平锁的话。

The “prev” links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor.

prev 让这个链表变成双向的,主要是为了让 cancelled 的node 的 next 找到新的 predecessor。

next 的用意是为了让 a predecessor signals the next node to wake up by traversing next link to determine which thread it is. next 是可能有争议的,CLH 的算法的用意是在一个节点的 successor 看起来是 null 的时候,对 tail 进行回溯检查-见 unparkSuccessor 里面寻找 null 的方法,这是基于 tail 是 atomically updated 的假定。

CLH 的 head 最初就是 dummy 的,node 被设置为 head 的时候也会变成 dummy 的。

Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.

condition 有一个单独的 condition queue,和 main queue 使用同一批节点,但使用 additional link。

CLH变体队列

互斥框架的标准伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 所以 tryAcquire 可以用非阻塞实现阻塞,tryAcquire 是一切 aqs 操作的灵魂
// 1. 试获取(改 state 和ownerThread)2. 入队
Acquire:
// 自旋带来阻塞,没有 sleep,这里就没有引入 clh 的 block
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;// 可以 block 也可以不 block,要看入队以后第二次 tryAcquire 的结果,以及 predecessor 的 waitStatus
}

Release:
// 1.试释放(改 state 和ownerThread)2. 尝试唤醒 successor,不需要出队,因为作为头部就已经算是出队了
if (tryRelease(arg))
// 尝试唤醒,这里的伪代码实际上漏掉了实现中存在的 unpark
unblock the first queued thread;

// 锁的调用模式:
somelock.lock()

// 有的 lock 会有 sync.lock() 的设计(如 ReentrantLock 的 UnfairSync),通过这一层再 delegate 到 acquire,lock 里面会有一个短路的 tryAcquire 操作。这足以证明 tryAcquire 不一定和 CLH 机制有关
sync.acquire(1)

Node 的实现

Node的结构

构造器,一个是需要 waitStatus,一个需要 nextWaiter,但都需要 Thread。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

/**
* Marker to indicate a node is waiting in shared mode
*/
static final Node SHARED = new Node();

/**
* Marker to indicate a node is waiting in exclusive mode
*/
static final Node EXCLUSIVE = null;

Node() { // Used to establish initial head or SHARED marker
}
// SHARED 和 EXCLUSIVE 其实是用来指向 nextWaiter 的,这里隐含一个假设,非互斥获锁的前提下不需要使用条件变量,缺省情况下 EXCLUSIVE 才是 null
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

加锁

求锁的过程里,得到锁的线程会被记录在 aqs 的成员里,而得不到锁的线程会把 aqs 作为 blocker 记在自己的对象里。

CLH 主框架
1
2
3
4
5
6
7
8
9
/*
* 走完第一步走第二步,第一步求锁成功改状态和线程-这是被包裹在一个原子操作里的,如果不成功走到第二步,让 acquire 这个操作入队,用 node 来管理线程。
* 这一步就实现了通过 cas 把前条件和后操作分隔开来,acquireQueued 本身是阻塞的,出来才会需要做 selfInterrupt 的重置
* Node.EXCLUSIVE 是一个 null,而 Share 是一个平凡的 empty Node 单例,所以先实现出来的 condition queue(而不是 CLH queue)是 exclusive 的
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
// 这个 selfInterrupt 是唤醒以后使用的。入队是在 addWaiter 做的,自旋-唤醒-检查能否 tryAcquire 出队 - 继续 park 是在 acquireQueued 里做的
selfInterrupt();
}

acquire 最复杂,aqs 自己实现了,最好不要覆写,所以一般设计 sync 都是从外部和 tryAcquire 入手(重点:tryAcquire 是非 CLH queue 的部分,只是对 state 和 ownerThread 的原子操作,易于覆写。而 CLH 队列的出队入队是 AQS 自身的标准套路,不需要覆写,覆写会造成破坏)。在 AQS 里,tryAcquire 默认的实现是空。

阻塞主要靠的是 tryAcquire(1) 以后的入队操作。

tryAcquire

tryAcquire 通常是交给具体的lock实现的:cas 本身的 state,修改 ownerThread 是在这一层做的。甲骨文推荐的一个特别简单的实现是:

1
2
3
4
5
6
7
8
9
10
11
12
protected boolean tryAcquire(int arg) {
assert 1 == arg;

/*
* 这两步无锁放在一起,也算原子了,因为只有一个线程的 cas能走进来,不考虑线程中断的问题
*/
if (compareAndSetState(0, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

这个实现过于简单,以至于忽略了一个简单的问题,那就是,为什么这里没有累加呢?因为累加就要加上一个判定 volatile currentOwnerThread 是否等于 currentThread 的问题。所以问题又变成为什么没有对 currentOwnerThread 的检查呢?所以 ReentrantLock 的实现里是会针对互斥 owner 提出判断分支的。

addWaiter

这是一个 aqs 自身的成员方法,会生成一个节点,加入到自身的队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Node addWaiter(Node mode) {
// 每一个node 天然出现的时候,自己的 next 都是 mode 参数本身,tail 的next 是 mode
Node node = new Node(Thread.currentThread(), mode);
// 最快入队的方法是在把新节点的 pred 设置成队尾,所以 FIFO 的 I 指的是对 tail 的追加
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果存在队尾-此时这不是一个空链表,否则必定走入一个 enq
if (pred != null) {
// 先设置 node 的 prev,在并发入队时,会有多个 node 的 prev 设置为 pred
node.prev = pred;
// 这一步的实现,是把当前 aqs 的 tail 节点设置为新的 node,之前的 tail 的 prev 本身是不变的
if (compareAndSetTail(pred, node)) {
// 然后把老 tail 的 next 设置为本 node,只有成功通过 cas 的 node 能够得到 pred 的next,其他节点还维持错误的 prev
pred.next = node;
// 然后就可以从快路线返回了,此处返回的是 node 本身
return node;
}
}
// 进入这里的 node 的 prev 还是错的,在这一步返回的 node 是 alreadyLinked 了的,注意,这个方法返回的不是node本身,而是它的前驱
enq(node);
// 然后就算入队成功,可以返回了
return node;
}

在外面虽然有一个 enqued 的操作,但实质上对 tail 的修改-也就是入队,是被收敛到 enq(node) 这个方法调用里的,这一个 addWaiter 方法里完成了全部的入队操作。

enq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 无限循环,直到能够返回一个 t 为止,enq 的存在是为了实现一个多线程 for 循环
for (;;) {
// 因为 tail 为空 head 必为空,所以此处用局部变量来节省性能,使用链表的基本法则之一是,在一轮操作里会被修改的变量,要被提前存起来
Node t = tail;
// 在 for 循环里,只会进入这一步一次
if (t == null) { // Must initialize
// 设计一个空的队头,然后把本 aqs 的 head 设置为 tail。注意,这个链表的第二个节点可能是抢不到锁的,但执行 enq 的时候,实际上前一个节点已经把锁释放了,所以此处可能遇到一个无头队列,如果无头,则初始化 tail 和 head
if (compareAndSetHead(new Node()))
// 这里隐藏一个设定,就是 head 和 tail 只要有一个为空,就全部为空。上一个 cas已经卡住其他操作,这里的操作就不用 cas,直接 set 即可
tail = head;
} else {
// 这一段代码就是外部代码的一部分,只不过移到一个 for loop 里来,这一段是无 sleep 和无 park 的自旋 + 一个初始化链表的操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
// 如果 cas 失败,则下一轮的 node.prev 会被覆写
}
}
}
acquireQueued

这个方法提供了一个内部的自旋和 park 和从 unpark 中醒来的全部流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 我们可以先姑且认为,这个方法是一个极度特殊版本的 acquire。
* 是在互斥且不可中断模式下(与之相对地是 doAcquireShared,本方法也可以被叫做 doAcquireExclusive)
* share 和 interruptibly 是两个维度
* 每一个节点进入queue的顺序是这样的:
* 1. 先看看自己是不是 head 后的第一个node(此处不分公平锁还是不公平锁)或者尝试获取锁(此处可能是为了预防并发,这意味着 pred 可能是被污染的),如果是的话,带着 interrupted 返回
* 2. 检查 pred 的状态 ws:
* 2.1 ws 此时是 SIGNAL,意味着上次已经设值过了,此节点需要做的就是 park,等待进入下一循环,如果下次循环还是求不到锁,前节点的 SIGNAL不变,总是能够不断地被 park,unpark。在此直接返回
* 2.2 如果 ws 此时是 CANCELLED,寻找一个新的 pred(做 pred 收窄),但不去碰新 pred 的状态
* 2.3 尝试把它设置为 SIGNAL
* 2.4 2.2 和 2.3 会直接导致 false 退出,然后进入下一轮的求锁循环,下一轮循环开始时,pred 的节点终究会是 SIGNAL,然后就可以 park 了
* 所以 CLH queue 本质上是1 入队,2 获锁,3 检查 pred,park等唤醒 唤醒检查 interrputed 或者设置 pred 但不park,4 再进入 for 循环尝试2获锁(还没有 release),一旦获取锁就带着 interrupted 离开自旋的一种结构
* 所有的 bloking queue 的轮询等待,都在这里通过 for loop park 来实现
* @param node the node
* @param arg the acquire argument 这个参数主要是递归调用 tryAcquire 时用的
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
// 一开始会预设失败
boolean failed = true;
try {
// 一开始会预设未被中断
boolean interrupted = false;
// 阻塞由两部分组成,一部分是这里的 for 循环,一部分是下面的检查 park
// 每次进入第一部分都先尝试获锁
for (;;) {
// node 在循环里是不会变的,每个循环里都只取 node 的前置
// 注意,阻塞队列不包含 head 节点,head 一般指的是占有锁的线程,head 后面的才称为阻塞队列。这个方法在需要 npe 检查的时候会代替成员指针拿出来用
final Node p = node.predecessor();
// 如果它的前驱为 head-就是获取锁的当前线程(前驱为 head,意味着本 node 前面已经没有节点了,也就意味着这是出队方法),则尝试获取锁-acquireQueued 外部是由 tryAcquire 开头的,自己也是由 tryAcquire 结尾
// 历史上第一个 head 其实是一个空 node(根据懒加载原理),如果本 node 后入队发现自己是阻塞队列的第一个节点,则还要再尝试 tryAcquire 一下
if (p == head && tryAcquire(arg)) {
// 如果获锁成功,则把本节点设置为头,头就是这样被替代的,头就意味着出队的线程。
setHead(node);
// 此时本 node(也就是 head)不一定是 tail,因为在这个 for 循环里,可能有其它线程 link 过本 node 了
// 但 p 本身是“前一个 head”,新老交替的结果就是把 p 和 node 做一个 unlink
// 这两步把 head 做了一个出队
, p.next = null; // help GC
failed = false;
// interrupted 为true 或者 false 都可能退出这个队列,这一步是一定会跑到的,而 interrupted 可能被上一轮循环的尾部给修改了
return interrupted;
}
// 如果在上面一步没有返回,则没有产生收窄出队的效应。在这一步就会产生判定是不是要 park 以及实际 park 操作了
// 第一个条件检查状态机,看看是不是 shouldPark
if (shouldParkAfterFailedAcquire(p, node)
// 第二个条件用 park 来实现阻塞,这个 park 就是 waiting queue 的实质了
&& parkAndCheckInterrupt())
// 理论上被中断唤醒以后会进入这里,重新设计中断位(因为底层清了,这里有些多此一举),否则也可能 interrupted 为 false 退出这里
// 这一步是可能被跑到,但大部分时候跑不到的
interrupted = true;
}
} finally {
if (failed)
// 注意,这里是在 for循环之外的一个兜底措施,上面的 for循环本身只有求到锁以后的 return 退出方法,这里是为了预防异常,在求锁过程中遇到异常,要直接清掉这个节点,这意味着这个 node 在 acquire 操作下无法进入正常的状态
cancelAcquire(node);
}
}

这个方法的复杂之处在于,理解 pred/next 的复杂性,和 waitStatus 对 AQS 工作流程的间接影响。

acquireQueued的实现

setHead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
// 在这里只从 node 视角进行操作,node.prev.next 的 unlink 操作留给外部的 node.prev 自己做
// 获取锁以后,算是出了 waiting-set 了,本 node 只是给 aqs 管理队列用,所以解除了对 thread 的引用,防止 thread 不能被回收
// 这个 node hold thread 和 status 都不算让 thread hold lock,node as head 算是让 thread 加了锁
node.thread = null;
// head 不应该有 prev 的,因为这不是循环链表
node.prev = null;
}

这个方法的关注点是:node 是dummy 的,要清空自身状态。

shouldParkAfterFailedAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 这个状态机很有意思,这意味着我们的每一个 node 的实际状态是应该由前一个node(即 pred)来决定的
int ws = pred.waitStatus;
// 前一个节点的 SIGNAL 状态,意味着后一个线程的 unpark。
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 被 signal 以后反而会触发 park。这里的 signal 指的是 asking a release to signal it,通过 park 来等 unpark 来进入下一个循环的入口
// 通常我们要进入队列就是要 park 的。
return true;
// 这里的大于零此时专指 CANCELLED,以后 cancelled 类的状态都必须大于零(反过来也一样)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 链表自动做一个小的收窄,所有的 cancelled 线程要排出本链表,这里是自动把本 node 的prev跳了一下,为什么不会有并发问题安全问题还是不太容易看明白
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 缩进玩
pred.next = node;
} else {
/*
* 这里引入了一个对状态机的隐式推导,可读性不太好。作者断言,此处要么是 0,要么是 PROPAGATE。CONDITION 也就是 -2 不会进入这个方法,因为 CONDITION 在另一个队伍里
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 此时我们需要一个 signal,但还不需要 park
*/
// 否则尝试将 pred 的status 置为 SIGNAL,这样下一轮循环的时候,就可以进入 park,然后等 unpark了
// 也就意味着,这个 CLH 里的队列的每个 node 都天然需要自己的前驱是 SIGNAL 才正常。最初入队的 thread addWaiter 的时候会初始化一个 dummy head,然后进入这里,把这个 head 设置为 Node.SIGNAL。然后下一轮循环进到这里来,会从上面的 return true 那里出去。等于每一个节点是由它的后继节点的 acquiredQueued() 的第一次 for loop 设置为 SIGNAL 的。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 本次检查先不park,不 park 则外部可能就直接进入下一轮循环,尝试抢锁,失败再看要不要 park
return false;
}

这个配置告诉我们几个已知事实:每个节点的 waitStatus 状态,在主流程里是由后面的排队的 next 的入队来触发变化的。

shouldParkAfterFailedAcquire

parkAndCheckInterrupt
1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 入队和每次 for 循环被唤醒抢不到锁,然后又需要 park,就会进入本方法 park 一次
LockSupport.park(this);
return Thread.interrupted();
}

这里需要关注的是this 作为 blocker 参数。

cancelAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* Cancels an ongoing attempt to acquire.
* 1. 设置本节点内部状态:status 和 thread
* 2. 把本节点的前后节点做好 unsplice,意味着对出队做好准备
* 3. 把本节点从 aqs 的视图里去掉,这里使用了一个二分法:区分是队尾和不是队尾
* 4. 在这个方法里面我们有一个很重要的认知迭代:signal、cancel 和各式各样的 acquire 是存在 race condition 的,对于 ws、queue的头和尾都是需要 cas的。
* 5. 尽量先找 pred,动力从 pred 来
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 清空本 node
node.thread = null;

// Skip cancelled predecessors。因为 enq 效应存在,node.prev 不会为 null
Node pred = node.prev;
// 在这里也在做一个 prev 的覆盖,只管本废弃节点的 prev 即可,这里可能会存在一个并发问题,如果 pred 的 prev 本身同时也在修改,则 node.prev 会跟着这个 prev 修改变动。这种“越过”操作的本质是废弃不当的 prev,至于 pred.prev 是不是正常,那另当别论
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// 在这里只做一个逻辑二分,只解决是不是队尾的问题,不区分队中和队头,如果是队尾则不用管 node next
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// 尾节点不需要 next,直接清空,因为中间节点实际上都是 cancelled 的节点
compareAndSetNext(pred, predNext, null);
} else {
// 如果要处理 next,也有两种思路:把 pred 的next link 和 本 node 的 next 连起来,或者直接unpark node 的 next,总之本 node 的 next 得到了很好的处理
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;

// pred 的节点自身是 SIGNAL 和 cas 成 SIGNAL 是等效的,这是 Doug Lea 的习惯
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// prev 是不怕有并发安全问题的,但 next 一定要使用 compareAndSetNext,也就是说 prev 寻址出错不要紧,但 next 寻址出错要紧,这是容易被忽略的
compareAndSetNext(pred, predNext, next);
} else {
// 不属于上面的情况,即 pread 是头或者 pred 的状态不为 signal(即使在 cas 设置过后)或者 pred 的线程为空,则对 succesor 进行 unpark 操作,所以 succesor unpark 不是常态
unparkSuccessor(node);
}
// 清除完 prev,最后再清除 next,即本方法主要是对 next 负责,不对 pred 负责
node.next = node; // help GC
}
}

队尾和不是队尾的流程太复杂了,要仔细看。

cancelledAcquire-当前节点是尾节点
cancellAcquire-当前节点是head的后继节点
cancellAcquire-当前节点是中间节点

公平锁与非公平锁

非公平锁流程1
非公平锁流程2

ReentrantLock 的 sync 的 lock 方法是抽象的,对公平锁和非公平锁来讲都是一个壳方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 抽象实现
public void lock() {
sync.lock();
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0) {
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有前置的排队节点,这是 cas 前条件。这也是公平的
// 这个嵌套条件判断没有和外面的条件判断放在一起,算是一个可读性更好的表达。在这里我们要注意,很多场合会同时出现 pred 和 prev,把这两个词分开就不会出现 prev.prev 的读写问题了
if (!hasQueuedPredecessors() &&
// 如果没有线程在等待,那就用CAS尝试一下(注意这里有一个并发处理问题),成功了就获取到锁了
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
// 因为刚刚还没人的,我判断过了
compareAndSetState(0, acquires)) {
// 这是 cas 后操作
setExclusiveOwnerThread(current);
return true;
}
}
// 会进入这个else if分支,说明是重入了,需要操作:state=state+1
// 这里不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 这里告诉我们,子类也可能传入错误的 acquire,这里要做防御性编程。这里允许等于0,这就要求 release 的时候要注意 release 操作的对称性
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果到这里,说明前面的 if 和 else if 都没有返回 true,说明没有获取到锁,直接返回 false
return false;
}

注意 FairSync 和 NoneFairSync 的区别一部分在 tryAcquire,acquire 的标准实现在 AQS 里是不需要实现的。

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 这一段代码是 ReentrantLock 里的,解锁也是一个抽象方法,跨公平锁和非公平锁
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not hold this lock
*/
public void unlock() {
sync.release(1);
}

解锁也依赖于壳方法。

release

release 也分双重,需要复写的只有 tryRelease,管理状态用这个方法不管公平不公平,统一 tryRelease + unparkSuccessor head

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 这里的 tryRelease 的返回结果是是否完全释放的意思
if (tryRelease(arg)) {
Node h = head;
// 这里的 != 0 就是 SIGNAl 的意思
// 只要/只有完全释放了 state 才唤醒 h 的继任者
// ws > 0,h 取消了;ws < 0,要么是 PROPAGATE,要么是 SIGNAL,从语义上来讲后继节点就是一个阻塞态。换言之,如果head是初始节点,则不需要 unpark 后继
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease

tryRelease 的返回值是值得关注的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// free 的意思是完全释放锁,不管 true/false,state 总是会被扣减掉的
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

只有完全释放锁,才算 free,进入只执行一次的 post-free 流程。

unparkSuccessor

这个方法在 release 或者某个 node cancelled 的时候会被调用这个方法的难度在于,如何决定 successor。那些 timeout 的就不要了有时候 unparkSuccessor 的对象是 head,有时候是任意节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
*
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 把本 node 设置为非状态机的初始态
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部开始遍历,直到要 unpark 的节点是尾部的第一个 waitStatus 为负数的 node
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到即唤醒它
if (s != null)
// node 里持有 thread 的用意就是让外围的 lockSupport 来引用和 unpark
LockSupport.unpark(s.thread);
}

独占与共享

独占模式与共享模式

条件

从 Doug Lea 的原始设计思路来讲,一个 Condition 是要引出一组多线程共用的 wait-set。它和锁的抽象实现了arbitrary lock 和 condition implementaion 的正交组合。

Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods. 在原始的 Java 锁机制里面,synchronized 被叫作 synchronized methods,而 condition 被叫作 monitor methods。

条件有很多种叫法,可以叫作 condition queue,也可以叫作 condition variable。一个线程的使用语序总是先 wait,然后再由外部 notify,又内外协同来实现对执行上下文的切换。

这种状态必须被锁保护,否则就会触发 IllegateMonitorStateException。这会导致 ConditionObject 的实现总是一个 lock implementation 的 inner class,相互持有引用。A Condition instance is intrinsically bound to a lock。

有一个 key property 就是,Condition 会 atomically releases the associated lock and suspends the current thread,也就是说 Thread 会从一种 waiting 切换到另一种 waiting。但是锁没了。恰如 Java 语言规范力图描述的那样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 这个程序的意义告诉我们:如果两个条件变量是互斥的-即不同时为真,则可以实现环形的 wait 和 notify(即相互阻塞),而不会产生死锁
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

理论上来讲,condition 能够提供某些原生 notify 不支持的行为,如:

  1. guaranteed ordering for notifications
  2. not requiring a lock to be held when performing notifications

往下我们会看到,这些可选的 notification feature 的实现,严重依赖于类似 Reentrantlock 里标准的 isHeldExclusive 的实现。

Condition 对象原生的 monitor 相关方法仍然是可以被使用的,只不过最好不要拿来用,因为它会搞混你对条件队列的理解。

因为 spurious wakeup 的存在,所以 condition 的 waiting 一定要在一个 loop 里面执行。当然,一个狭义的实现也可以不允许 spurious wakeup 的存在,这就会要求不遵循 platform 的默认模式。

条件等待有三种形式:

  1. interruptible waiting 会抛出中断异常
  2. non-interruptible waiting 不会抛出中断异常
  3. timed waiting

不管采用哪种形式,从 await 中返回过来,这个线程要 guaranteed to hold this lock。

另外,在 Java 的设计里面,凡是抛出 InterruptedException,必定清空 interrupted state。

条件对象

条件对象仍然管理 Node,而且也是用头和尾的形式,不过这次的链表是个单链表。一般的 Lock 是用 Sync 来管理 lock,sync 作为 AQS 自己维护 node,用 ConditionObject 来管理condition,ConditionObject 自己管理 Node。照理来讲一个 lock 可以产生多个 condition object 线程可以通过 await 操作,进入多个队列。

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
}

条件队列

我们可以看到,Node 仍然是AQS 的 node,但它们的 ws 是 Node.CONDITION。

总结

大部分的换状态,放锁,部分的中断回再换状态回 sync queue,重新获锁,检查中断的流程都是在 await 中完成的;而 signal 中完成的只是主动的 transferForSignal,完成主动的 换状态回 sync queue 操作,和部分的唤醒。

await
  1. 检查是否需要抛出中断异常。
  2. 在调用 await 相关方法的时候,线程先进入 wait queue。此时线程也不在 sync queue里了,因为获取锁才可以 await,获取锁的时候它已经是 sync queue 的 head 了。
  3. 调用 fullyRelease 方法,释放锁并把锁释放前的状态取出来。
  4. 在 wait queue 的自旋里检查 isOnSyncQueue。如果没有在同步队列里,自己 park,park 的 blocker 是 ConditionObject。每次从 park 中唤醒有两种可能:被 signal 内置的 unpark 唤醒,或者被中断唤醒。线程检查自己是不是被中断唤醒的,如果是被中断唤醒的,则自己 transfer 到 sync queue 里,设置 interrupt mode 然后退出;否则,它是被 signal 唤醒的,此时已经在 sync queue里了。自旋到下一阶段也会退出。
  5. 进入 wait queue 会把 ws 置为 condition,而进入 sync queue 则置为 0(等待 next 置为 SIGNAL),然后做好前后节点的 link。
  6. 调用 acquireQueued,引出在 sync queue 里的 park (park 的 blocker 是 AQS 自己)和自旋。
  7. 从 acquireQueued 返回真则本线程又是从中断中返回的,要确认上一步的 interrupt mode 是不是指示我们抛出异常,如果是则 interrupt mode 不变,否则设置 interrupt mode 为重新设置中断位。
  8. 看看本 node 的 next waiter 是不是 null,是的话 unlinkCancelledWaiters。
  9. 根据 interrupt mode 来让线程选择一种方式来设置自己的中断位。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock1 = new ReentrantLock();
Condition condition = lock1.newCondition();

Thread t1 = new Thread(()-> {
lock1.lock();
try {
condition.await();
System.out.println("i am awaken");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock1.unlock();
}
});
t1.setName("TTTTT1");
t1.start();
t1.interrupt();
}

这个方法值得一看的地方是,如果一个方法是长时间的阻塞的,它是不是对外声明自己要响应中断,如果要响应中断,则中断是无时无刻在发生的,它要怎么在多个地方响应中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void await()
throws InterruptedException {
// 如果开始 await 之前线程已经被中断了,则直接显式地抛出异常。这里有一个 juc 的习惯,只要抛出异常必定清空中断位,反之亦然
if (Thread.interrupted())
throw new InterruptedException();
// 先入 wait queue 队伍
Node node = addConditionWaiter();
// 然后彻底释放锁,注意,此时其实线程也不在 sync queue里了,代表它的是头节点,所以这里只要清理 aqs 的state就行了,为了预防万一,我们先留存 savedState
// 这个设计是先冗余入队,然后再释放锁。这个方法表面上看起来没有做 isHeldExclusive 的检查,实际上在 tryRelease 的内部就会有,因为 tryRelease 是可选实现的,甚至 isHeldExclusive 也是可选实现的,所以条件在锁内被持有的设计规范是可以被破坏掉的
int savedState = fullyRelease(node);
int interruptMode = 0;
// 这里的 syncQueue 就是 CLH queue 里面非头的部分了,在这里就是自己 park,然后自旋
while (!isOnSyncQueue(node)) {
// 注意,这里的 park 使用的 blocker 是 ConditionObject
LockSupport.park(this);
// 一旦从 unpark 中醒来,要做几件事,首先检查中断,如果中断,则由 await 来做 transfer queue的操作,走 park 内部的 transfer 流程(把 node 的节点置回0,然后 enq node,算是完成了 queue 之间的 transfer),如果走完就会从这里面 break,但如果不发生中断,而产生了 signal,signal 内部也会完成 enq,让 isOnSyncQueue 检测到自动退出而不是 break 退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
// 如果因中断退出,而不是 signal 退出,直接 break,不再校验是否在 SyncQueue
break;
}
// enq 完就进入 CLH 的 park了,savedState 保证归还的锁的累积数量仍然不变。当然,此处也不一定会触发 park,如果此时锁被完全 release 的话,可以直接求锁成功
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 这里要理解一个两次中断的事实:acquireQueued 中返回true,也意味着发生了中断,但这种中断必然发生在 signal 之后,如果前面发生过一次 wait 的中断,则此处就不重设中断位,否则此处要设置模式
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 抛出异常或者自置中断位,也就是说,如果发生过基于 park 的中断,内部没有做过中断重设,此处需要做,这和 acquire 内部还要调用一次 selfInterrupt 是异曲同工的。park 和 interrupt 的关系就是这么复杂
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter

这个方法的主要逻辑就是让本线程产生一个代表节点,然后入队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addConditionWaiter() {
Node t = lastWaiter;
// 在需要检查状态时,链表的 cancelled 节点总要被清掉
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullyRelease

这个方法依赖于 release 的实现,也会返回 savedState。但如果 release 失败太复杂了,作者在这里选择了直接抛出异常,令人意外:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
unlinkCancelledWaiters
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
isOnSyncQueue

检查这个 node 是否已经在 sync queue 里了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
isOnSyncQueue

如果中断,则做 queue transfer,如果不中断则返回 0,这里使用了内部类能够引用外部方法的闭包性质,可以用 enq 方法自动找到尾部。

这个方法的动词使用了现在进行时:

1
2
3
4
5
6
private int checkInterruptWhileWaiting(Node node) {
// 不发生中断返回 0 最好,如果发生中断则要自己做 enq,而且根据 transfer 的结果告知外部是该抛出异常,还是自己重新设置中断位
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait

这个方法是 await 内部的 transfer 方法之一(另一个在 signal 里):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 这个方法是给中断 CancelledWait 用的
final boolean transferAfterCancelledWait(Node node) {
// 如果改变 ws成功,则应该抛出中断异常
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* 上面失败就意味着发生了一个 race condition,如果是 signal 赢了,则实际上不应该抛出异常
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
reportInterruptAfterWait

使用一个常量作为 flag,就完成了从中断-flag-重新还原中断的全流程:

1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal
  1. 检查 isHeldExclusively。
  2. 选一个非空的 firstWaiter,进行 signal。因为 ConditionObject 本身是跨多线程共享的,所以这会随机选取第一个 node 进行信号唤醒,把它从 CONDITION 置为 0,如果失败则返回 false。与之相对的,signalAll 是用 firstWaiter 开始,按顺序唤醒整个链表。AQS 的随机性在于不知道链表的顺序是怎样的,但对于 FIFO 的出队是不随机的
  3. enq node。
  4. 把前驱 cas 设置成 SIGNAL 失败或者前驱节点已经取消,尝试直接 unpark 这个 node。然后就让 await 方法的 isOnSyncQueue 走剩下的流程。
  5. 返回操作为 true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 注意看这个方法,只要 isHeldExclusively 不正常,则这个方法会抛出 IllegalMonitorStateException
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 检查是不是在锁的控制范围内
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 只取非空 first 作为 doSignal 对象
if (first != null)
doSignal(first);
}
doSignal

只有 signal 一个非零和非 cancelled 的 node 成功才会从循环中停止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
        /**
*
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 首先用 frist.next 来顶掉 firstwaiter,如果 first.next 为空,则清空本队列
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// unlink first 到 next waiter 的联系
first.nextWaiter = null;
// 如果 transferForSignal 成功,循环中止,否则必定是 node 被 cancelled 了,这时候要把 firstWaiter 赋值回 first,看看是不是还能找到 non-null 继续循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
```

###### transferForSignal

这个方法的主要目的是为了完成往 sync queue 的转移只要求 cas node 的状态为 0 和对它进行 enq 只有在极端情况下才会做立即 unpark:AQS 有个设计,在 cancel 一个 node 以后,或者 一个 node 不正常了以后,会立刻 unpark 它的 successor 如果这个 unpark 没有被执行,unlock 的时候还会带有一个 unpark 来刺激 lock。

```java
/**
*
*/
final boolean transferForSignal(Node node) {

/*
* If cannot change waitStatus, the node has been cancelled.
* 假定,condition queue中的节点一定是 CONDITION,不会再变
* 这是本方法第一次试图 cas 改变一个 node,其实此时如果失败,意味着本节点是 cancelled 的,应该返回 false
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱节点已取消,本 node 应该直接 unpark;或者前驱节点不能设置为 SIGNAL-此处不需要等到下轮循环再设置了,要把本 node 做一个 unpark,交给 await中的循环处理。
// 什么情况下 compareAndSetWaitStatus 会失败呢?p 是前驱节点的意思,p 被人动过,这也就意味着此处的 cas已经无意义了
// 这是第二个地方用 cas 检查 node 的前驱,如果失败,通过 unpark 让 acquiredQueued 来试图收窄链表
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 注意,node.thread 已经在await 的循环里 park了,此处的 enq 帮那个循环省略了从中断中 enq 的动作,它会在 await 方法里进入 acquireQueued,再尝试求锁解锁,这种直接唤醒重新入队的方法,被作者称作 resync。
LockSupport.unpark(node.thread);
// 但如果不 unpark,直接返回,则对于 node.thread 的 unpark 需要等到 unlock 底层的 release
return true;
}

关于 signal + unlock 带来的获锁,可以参考这个实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    @Test
void testConditionProcedure() throws InterruptedException, IOException {

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
AtomicBoolean controlFlag = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
lock.lock();
System.out.println("内线程先求锁");
try {
while (!controlFlag.get()) {
System.out.println("check input failed, begin to await,准备释放锁");
// 在这一行里,内线程 fullyRelease 了 lock
condition.await();
// 在这一行里,内线程重新拿回了 lock
System.out.println("signaled, begin to check input,重新得回锁");
}
} catch (Exception ignored) {
System.out.println("isInterrupted: " + Thread.currentThread().isInterrupted());
} finally {
lock.unlock();
}
});
t1.setName("test-thread1");
t1.start();
// 造成一种内线程先拿到锁,但释放后进入 wait queue 的情况
Thread.sleep(1000L);
lock.lock();
try {
// 在 idea 里暂时无法使用 prompt input,所以现阶段就用倒数计时来触发锁定
// Scanner command = new Scanner(System.in);
// System.out.println("Enter command: ");
// boolean running = true;
//
// while(running){
// switch(command.nextLine()){
// case "signal":
// System.out.println("Machine started!");
// running = false;
// break;
// }
// }
// command.close();

controlFlag.set(true);
// 这里会让内线程准备求锁
System.out.println("让内线程准备求锁");
// 要在锁里面执行 signal,其内部针对极端情况会触发 unpark,但那个unpark 只能触发 resync 入队用,如果本线程没有释放锁,则内线程求不到锁,会进入第二次 park
condition.signal();

} finally {
// 在这行彻底执行完的一瞬间,内线程求锁完,才能从 await 中退出,这会导致 unpark,await 内部至少会有一次 unpark 醒来,到时候无人争抢锁的话,会直接重新回到获锁(acquired)状态
System.out.println("释放本线程的锁,下一瞬间内线程就会求到锁");
lock.unlock();
}

Thread.sleep(15000L);
}

AQS 与中断

Java 自带的内置长时间停顿方法有以下:

  • Object:wait()、wait(long)、wait(long, int)
  • Thread:join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)
  • 实现了 InterruptibleChannel 接口的类中的一些 I/O 阻塞操作,如 DatagramChannel 中的 connect 方法和 receive 方法等
  • Selector 中的 select 方法

这几类方法在抛出 InterruptedException 以后,会重置中断状态(对线程进行中断检查会得到 false)。

但,LockSupport.park 会响应中断,但不会重置中断状态(对线程进行中断检查会得到 true)。

在阻塞方法里 catch InterruptedException 和非阻塞流程轮询检查线程的中断状态是响应中断的方法。

AQS 在设计的时候,会设计普通 lock,Interruptibly() 的版本。在条件上,会设计 await 和 awaitUninterruptibly()。

1
2
3
4
5
6
7
public void lock() {
sync.lock();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

其中,普通 lock 是不会响应中断的阻塞操作的。

在下面的实验里,我们从主线程,不断地中断另一个线程,另一个线程没有抛出异常,只有求到锁才从中断中返回,而且重置了中断位-这是 park 隐晦地把响应中断的职责外放给使用者的一种设计。如果我们我们真的关心线程在跑到外部的时候是否还是从中断路径中出现,我们可以使用 lockInterruptibly + 检查异常,或者使用普通 lock + 检查中断位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
    @Test
void testLockProcedure() throws InterruptedException {
Lock lock = new ReentrantLock();

CountDownLatch countDownLatch1 = new CountDownLatch(1);

Thread t1 = new Thread(() -> {
try {
countDownLatch1.await();
} catch (Exception ignored) {
System.out.println("isInterrupted: " + Thread.currentThread().isInterrupted());
}

// 用 while 而不是 sleep,这样就不怕 interrupt 被阻塞 api 抛出异常来响应了
int j = 0;
for (int i = 0; i < 100000; i++) {
j += i;
}
System.out.println(j);
// 在lock 内部可能检测不到这个中断,需要外部频繁中断为好
Thread.currentThread().interrupt();
lock.lock();
try {
System.out.println("3");
} finally {
System.out.println("4");
lock.unlock();
}
});
t1.setName("test-thread1");
t1.start();
lock.lock();
try {
System.out.println("1");
} finally {
System.out.println("2");
countDownLatch1.countDown();
try {
// 休眠一段时间,让t2开始求锁,然后再解锁
Thread.sleep(1000L);
} catch (Exception e) {

}
// 中断一千次
for (int i = 0; i < 1000; i++) {
t1.interrupt();
}
Thread.sleep(10000L);
lock.unlock();
// t2.interrupt();

/**
* final boolean acquireQueued(final Node node, int arg) {
* boolean failed = true;
* try {
* boolean interrupted = false;
* for (;;) {
* final Node p = node.predecessor();
* if (p == head && tryAcquire(arg)) {
* setHead(node);
* p.next = null; // help GC
* failed = false;
* // 不管是否中断,都从这里 return
* return interrupted;
* }
* if (shouldParkAfterFailedAcquire(p, node) &&
* parkAndCheckInterrupt())
* // 如果中断则跑到这,不然得到锁就保持 interrupted = false,然后从上面 return
* interrupted = true;
* }
* } finally {
* if (failed)
* cancelAcquire(node);
* }
* }
*
* public final void acquire(int arg) {
* if (!tryAcquire(arg) &&
* acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
* // 如果 acquireQueued 方法返回true,断点到这一行
* // 如果 while interrupt 循环足够多,是可以从这一步跑出去的
* selfInterrupt();
*
* // 否则断点到这一行即不做 selfInterrupt
* }
*
* private final boolean parkAndCheckInterrupt() {
* LockSupport.park(this);
* // 这是一个清理线程状态的 testMethod()
* return Thread.interrupted();
* }
*
*
*/
}
t1.join();
Thread.sleep(10000L);

}

从总体来讲,lock 作为一个外部操作是不用抛出中断异常的方式来退出的,从lock 中正常退出往下走是用户可以接受的结果。中断位被隐藏得比较隐晦。

await 操作如果遇到异常,到底退出还是不退出呢?如果像park 一样退出,则用户必须写类似 aqs 之类的代码才能检查中断位,而且有时候中断的真实目的并不是让一个线程苏醒,而是真的把后续的流程阻断掉(如果不中断掉,因为自旋的存在,线程只会再一次进入 park 的状态),从这个视角来看,抛出异常学 sleep 和 wait更合理。

在 AQS 内部,把这两种中断模式归类为:

  • REINTERRUPT:Mode meaning to reinterrupt on exit from wait
  • THROW_IE:Mode meaning to throw InterruptedException on exit from wait

CountDownLatch

countDownLatch

CountDownLatch 是一个 one-shot phenomenon,它的 state 是不能被复用的。

这个类的功能底层依赖于自身的 sync 的两个实现:

  • tryAcquireShared
  • tryReleaseShared

其他都是框架编织的结果。这两个方法告诉我们,try 方法不需要考虑 interruptibly/uninterruptibly。

在大型项目里,测试异步用例的时候经常使用如下模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void testScheduleTimeoutShouldNotRunBeforeDelay() throws InterruptedException {
final Timer timer = new HashedWheelTimer();
final CountDownLatch barrier = new CountDownLatch(1);
final Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
fail("This should not have run");
barrier.countDown();
}
}, 10, TimeUnit.SECONDS);
assertFalse(barrier.await(3, TimeUnit.SECONDS));
assertFalse(timeout.isExpired(), "timer should not expire");
timer.stop();
}

初始化

自身构造器:

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

同步构造器只依赖于 setState:

1
2
3
4
Sync(int count) {
// 这样就 state == count 了
setState(count);
}

sync 在共享模式和互斥模式下有很重大的区别,就是 state 在互斥模式下维护的是同一个线程求锁的次数,在共享模式下维护的是共有多少个线程持有这把锁。

await

在 CountDownLatch 里:

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

这个方法的隐式逻辑是:sync 内部的 state 为0则此方法可以返回,阻塞发生在改为0之前。普通的 acquire 是修改 state 成功返回,阻塞发生在获取修改 state 的权限之前,这里产生了比较大的语义差别。

acquireSharedInterruptibly

在 AQS 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

// 这里小于0意味着 state 还是保持在 sync 的 state 非0的状态,才可以进入 doAcquireSharedInterruptibly 阻塞;否则就是已经被扣减到头了,就直接返回了,这会导致上层的 await 直接返回,这就是很多的事后 await 会直接返回的原理。这种
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

CountDownLatch入队1

在互斥类的 acquire 里面,只有 state 不为0(已被其他线程获取锁)会导致入队。在共享类的 acquire 里,只要 state 不为 0,也入队,反复自旋,直到 state 为 0 才导致出队,让 await 降为0。

CountDownLatch入队2
CountDownLatch入队3

tryAcquireShared

在自定义 sync 里:

1
2
3
4
protected int tryAcquireShared(int acquires) {
// 在 CountDownLatch 的sync 里 acquires 是无用的,但在其他 sync 里就可以有用
return (getState() == 0) ? 1 : -1;
}

只有当 state == 0 的时候,这个方法才会返回 1,否则返回-1。这和普通的 tryAcquire 返回 boolean 有很大差别。

acquireSharedInterruptibly(arg) -> tryAcquireShared(arg) 只做判断 -> doAcquireSharedInterruptibly(arg) 产生阻塞,AQS 的框架隐藏在doAcquireSharedInterruptibly里:

doAcquireSharedInterruptibly

这个方法很像 acquireQueued,出队的条件是 state 变为 0,而不是得到了修改 state 的机会

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 共享模式也入队,这里让每个入队的节点都带有同一个 nextWaiter
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 入队也要自旋
for (;;) {
final Node p = node.predecessor();
// 只有快要出队的节点可以这样做
if (p == head) {
// tryAcquireShared > 0 意味着此时 aqs 没动过,或者被还原了,此时就可以考虑出队,最终让上层的 await 返回了
int r = tryAcquireShared(arg);
if (r >= 0) {
// 只要共享状态降为0即可以出队
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
// 然后返回,让更上层返回
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 这个方法是 interruptibly 的版本,所以遇到中断应该抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

这个方法 propagate 是很难理解的,doShare 是它试图 propagate 的东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// shared mode 的模式在此处生效产生了一个和普通的 setHead 不同的效应,它会产生一个 doReleaseShared
if (s == null || s.isShared())
doReleaseShared();
}
}

doReleaseShared

这个方法特别难,普通 tryRelease 是改动 aqs 的自身状态,但 doReleaseShared 依赖于 tryReleaseShared 的返回结果,只专心处理从 head 开始的 ws 问题,然后对 head 的后继进行 unpark。这个节点会让所有卡在 countDownLatch 的计时条件上的线程都越过门槛本身。因为 tryReleaseShared 已经把 state 扣减为0,此处做的主要是 unpark + 改 ws,doAcquireSharedInterruptibly 那里就会自己直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// 1. h == null: 说明阻塞队列为空
// 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
// 或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
// 所以这两种情况不需要进行唤醒后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// head 如果能够被从 SIGNAL 设为 0,则 unpark head 的下一个节点,否则循环 recheck
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
unparkSuccessor(h);
}
// head 如果已经是 0,则把它设置为 PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

countDown

CountDownLatch释放

countDown() -> releaseShared(1) -> tryRleaseShare(1) -> doReleaseShare();

1
2
3
4
public long countDown() {
// 每次 countDown 都等于一次共享释放1
sync.releaseShared(1);
}

releaseShared

这里在完全退出的分支里,再次主动调用了 doReleaseShared

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
// 如果已经 state 为0,则直接返回;如果扣减不到0,也直接返回;如果扣减到0了,则执行 doReleaseShared
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared

这个方法的返回值很有意思:true 意味着允许一个 acquire(不管是共享式还是互斥式的)得到一个 permit-是否要进入一个最终退出动作,这个动作只执行一次。否则,返回 false可能含有2个含义,已被扣减完,应该进入退出后状态;或者意味着先扣减得到中间态的 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 在 try release 系列里,这是唯一一个有自旋的
for (;;) {
int c = getState();
if (c == 0)
return false;
// 这里直接做了减1,而不是减 releases
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

CyclicBarrier

cyclic 既有周期的意思,也有循环的意思。

cyclicbarrier-1
cyclicbarrier-2

CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS
的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。

这个类是一个很简单的交替地使用一个内置了 Lock 和 Condition 来维护一个 barrier waiting count 的实现。

和CountDownLatch 的区别

  1. CyclicBarrier 使用互斥而不是共享机制来实现多线程协同,共享状态是由 party 生成的 count。CyclicBarrier 依托 Condition。
  2. 而 CountDownLatch 通过共享而不是互斥来实现多线程协同,共享状态是 AQS 的 state。而 CountDownLatch 有自己的 sync。
  3. CyclicBarrier 只需要每个内线程做完自己的事自动await,不需要 signal,到齐了自动就通过;CountDownLatch 让内线程countDown。CyclicBarrier 外线程等待这个到齐结果 join 所有内线程; CountDownLatch 对 latch 进行 await

作者的官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);

try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}

// wait until done
for (Thread thread : threads)
thread.join();
}

在 Spring 里拿他来做测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void pubSubLostConnectionTest() throws Exception {
final CyclicBarrier latch = new CyclicBarrier(2);
channel.subscribe(message -> {
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (Exception e) {
}
});
this.channel.send(new GenericMessage<>("foo"));
latch.await(10, TimeUnit.SECONDS);
latch.reset();
BlockingQueueConsumer consumer = (BlockingQueueConsumer) TestUtils.getPropertyValue(this.channel,
"container.consumers", Set.class).iterator().next();
connectionFactory.destroy();
waitForNewConsumer(this.channel, consumer);
this.channel.send(new GenericMessage<>("bar"));
latch.await(10, TimeUnit.SECONDS);
this.channel.destroy();
this.pubSubWithEP.destroy();
this.withEP.destroy();
this.pollableWithEP.destroy();
assertThat(TestUtils.getPropertyValue(connectionFactory, "connectionListener.delegates", Collection.class)
.size()).isEqualTo(0);
}

有些地方还会有工具方法:

1
2
3
4
5
6
7
8
private void waitAtBarrier(String barrierName, Map<String, CyclicBarrier> barriers) {
try {
barriers.get(barrierName).await(10, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new AssertionError("Test didn't complete: ", e);
}
}

成员拆解

首先,非 generation 的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();

// 触动开关
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

/** The number of parties */
// 这个数字不可扣减
private final int parties;

/* The command to run when tripped */
private final Runnable barrierCommand;

/**
* 这个数字可以扣减
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;

其次,generation 的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}

/** The current generation */
private Generation generation = new Generation();

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}

await

这个方法既会返回,也会抛出异常。

它的返回值是:the arrival index of the current thread, where index getParties() - 1 indicates the first to arrive and zero indicates the last to arrive。也就是说,如果有5个线程在等,await == 4 意味着第一个返回,await == 0 意味着最后一个返回。然后可以这样做:

1
2
3
4
// 这里的 await 是支持 happen-before 语义的,在 await 返回的那一刻即返回
if (barrier.await() == 0) {
// log the completion of this iteration
}

它有一个核心的方法,这个方法把所有的非计时转化为计时- ConditionObject 内部不是这样实现的:

1
2
3
4
5
6
7
8
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
// 针对有签名但不处理的异常,我们包装为 error
throw new Error(toe); // cannot happen
}
}

所以我们有了计时的等待方法总能得到非计时的实现。

dowait

有4种方法可退出:

- last thread arrives
- 中断发生在任意一个等待线程,抛出 InterruptedException(自己被中断)或者 BrokenBarrierException (其他线程被中断)
- 超时发生,抛出 BrokenBarrierException
- 触发了 reset,抛出 BrokenBarrierException
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 这个内部框架在维护状态的时候,都是使用标准的 lock-check-await 的模式
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 把当前的 generation 取出来
final Generation g = generation;

// 这个方法的前置检查抛出 generation 异常,作为栅栏破坏的响应,而不处理栅栏
if (g.broken)
throw new BrokenBarrierException();

// 一个线程检查出中断,要把其他线程破坏。所以被中断的线程是中断异常,被破坏的线程是破坏异常。中断线程可以处理栅栏。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 在进行等待以前对 count 做共享减法,这个共享写的顺序能够体现在 index 里
int index = --count;

// index 等于 0 是一个特定的事件,是最后一个线程才会触发的分支,这引出了一种写法,最后返回的分支写在等待的开头
if (index == 0) { // tripped
// 维护一个 ranAction 状态
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 由最后一个线程驱动下一个 generation,理论上下一轮的 await 可以在这个时刻运行了。返回线程可以处理栅栏。
nextGeneration();
// 这里的 return 是唯一的“无害动作”,如果这里插入任何维护状态的代码,下面的 breakBarrier 再破坏状态,可能会导致下一代的 wait 动作被集体破坏
return 0;
} finally {
// 如果 ranAction 状态不正常,则还是要破坏栅栏
if (!ranAction)
breakBarrier();
}
}

// 非 last 动作,则只有3种方式返回
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 区分计时和非计时的等待,然后在自旋里工作
if (!timed)
trip.await();
else if (nanos > 0L)
// an estimate of the nanosTimeout value minus the time spent waiting upon return from this method. A positive value may be used as the argument to a subsequent call to this method to finish waiting out the desired time. A value less than or equal to zero indicates that no time remains.
// 这个 nanos 可能成为负数,为我们超时异常提供了依据
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果此时本代仍然是同一代,则尝试破坏栅栏(这是为了让同一代里 breakBarrier 式退出 exactly once)
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 如果不需要我们破坏动作,根据标准协议,收到 InterruptedException 我们也要中断线程,作为响应。
// 我们要理解一个巨大的差别:在当代的中断我们是要忠实地履行方法签名的行为,抛出异常,不在当代则静默地中断自己
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

// await 的正常返回带来的后置检查,这里的 g是当前代数
if (g.broken)
throw new BrokenBarrierException();

// 如果代数换了,就返回当前的次序,不管超时了(无意义)。只要�发生过nextGeneration,发生代切换,此处才得到许可可以出去,这又潜在要求 nextGeneration 执行替换一定发生在 signallAll 之前。但 nextGeneration 的实现却是先 signallAll 再替换 nextGeneration 的
if (g != generation)
// 在这里编译器居然能保证 return 总是不漏,怎么做到的呢?
return index;

// 如果排队中出现了超时,此时要处理一下
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

这个方法的等待是由 await 类操作决定的,它的唤醒操作必然来自 signal 类操作,而这两类操作被隐藏在正常退出和异常退出里。

异常退出 breakBarrier

1
2
3
4
5
6
7
private void breakBarrier() {
// 把当前的代破坏
generation.broken = true;
// 把 count 复原为 parties
count = parties;
trip.signalAll();
}

异常退出是设置完状态再 signalAll。

正常退出 nextGeneration

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

异常退出是 signalAll 完再设置完状态。

重置 reset

等于先破坏栅栏再重置栅栏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 老线程要通过闭包里闭合的局部变量理解 break
breakBarrier(); // break the current generation
// 新线程使用隔离的 generation
nextGeneration(); // start a new generation
// reset 意味着 count 再次等于 parties
} finally {
lock.unlock();
}
}

统计方法

isBroken

这个方法实现了准确读:

1
2
3
4
5
6
7
8
9
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

getNumberWaiting

这个方法也实现了准确读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Returns the number of parties currently waiting at the barrier.
* This method is primarily useful for debugging and assertions.
*
* @return the number of parties currently blocked in {@link #await}
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

Semaphore

Semaphore 使用数字维护一个共享状态池,使用共享加解锁的思路来修改 state。

创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用
acquire 的时候,执行 state = state - 1,release 的时候执行 state = state +
1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

需要仔细设计的怎么阻塞与唤醒。

官方示例

作者认为这是一个 permit pool。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}

在这个示例里对称很重要,在 op 以前就要 acquire permit,在 op 成功以后才 release permit。它的 state 恰好不是以互斥的方式 accumulated 的,是以共享的方式 accumulated 的。

mutex = binary semaphore = nonreentrant exclusive lock

这种锁是不计较 owner 的,在死锁恢复场景下尤其有用。比如有个线程 a 拿走了一个 permit 没有还,其他线程可以通过把自己的 permit 归还,来暂时制造流动性。

照理来讲,没有 IllegateMonitorStateException 的约束,多 release 也是有可能的,不一定需要严格按照 acquire 时线程得到的 permit进行归还。所以,CountDownLatch 和 CyclicBarrier 的state 是固定大小的,Semaphore不是。

构造器

因为信号量也是不易察觉的“类锁”的方案,所以它也有公平和非公平的实现。

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

不公平抢锁可以减少调度,被称作 barging;公平抢锁可以减少饥饿。

获取资源:四大 acquire 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

有可中断和不可中断两大类 API,也有无参数和固定参数两大 API,不过 API 命名的习惯没有得到遵循,默认的 Semaphore 的方法是响应中断的。

acquireShared

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

这个方法很像 CountDownLatch 的 await,但少了检查中断的一部分。其中 doAcquireShared 和 CountDownLatch 一样都使用的 AQS 的原生实现。

tryAcquireShared

公平
1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

共享池模式使用 -1 和 1 来控制是否 doAcquireShare,所以这里返回 -1 意味着要入队,返回负数的 remaining 也是需要入队的。如果是正数则可以尝试 cas 一下。

非公平
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

// 在父类里
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

公平和非公平的差别只在 hasQueuedPredecessors 的调用结果里。

释放资源

不管用什么方式获取,释放总是用同一个方法。

1
2
3
4
5
6
7
8
9
10
11
public void release() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

releaseShared 和 doReleaseShared 仍然共用 AQS 的缺省实现,只有 tryReleaseShared 是使用 sync 实现。

tryReleaseShared

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 溢出,当然,我们一般也不会用这么大的数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

获取当前的 state,按照 releases 来做加法。

线程池

Pooling is the grouping together of resources (assets, equipment,
personnel, effort, etc.) for the purposes of maximizing advantage or
minimizing risk to the users. The term is used in finance, computing
and equipment management.——wikipedia

“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。

在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。 连接池(Connection
Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。 实例池(Object
Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

Doug Lea 对线程池的期待有:

  1. 改善性能。
  2. 有界地利用资源(多次强调 bounds)。
  3. 提供统计。

线程池继承体系

ThreadPoolExecutorUML类图.png

Executor 接口

将任务提交和任务执行进行解耦(decoupling the execution mechanic)。用户无需关注如何创建线程,如何调度线程(scheduling)来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。

JUC 里所有的解耦设计都不一定是异步的,它只是解耦,所以执行器本身也是可以同步执行的:

1
2
3
4
5
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}

一般而言可以认为,executor 会 spawns a new thread for each task.

ExecutorService 接口

增加了一些能力:

扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法():

1
2
3
4
5
public Future<?> submit(Runnable task)
public <T> Future<T> submit(Runnable task, T result)
public <T> Future<T> submit(Callable<T> task)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)

提供了管控线程池的方法,比如停止线程池的运行。

shutdown 拒绝接收任务,触发 rejection policy。
shutdownNow 除了 shutdown 的功能以外,还会强制触发线程中断。

Memory consistency effects:future.get 满足 JSL 定义的 Memory consistency properties,也就是 happens before relation。

理解 happens before relation 一定不要按照硬件的工作方式来理解(Flushing model is fundamentally flawed (it is just not how hardware works)),最好从 JLS 的规范出发:

AbstractExecutorService

将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

ThreadPoolExecutor

将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor运行流程.png

ForkJoinPool

这个线程池本身就是一个复杂框架,为 Jdk 其他组件提供 yet another executor alternative。

这个框架有个特点:

  1. 产生的线程默认是守护线程。
  2. 产生的线程会自动收缩-不存在空转的 core thread 问题。
  3. 公共线程池的名字一般叫“ForkJoinPool.commonPool-worker-1”。

这里就要讨论到一个很多人忽略的问题:我们如何决定何时使用守护类线程。这类线程可以用来执行一些:

  1. 临时执行的任务,这些任务之间如果存在父子关系更好。
  2. 后台监控类任务。
  3. 某些与 io 解耦的计算任务。

也就是说,这类线程池(包括守护线程本身)决不能用来执行工作逻辑,不然:

  1. 工作线程池会在 JVM 关闭时被无声无息地杀死。
  2. 当其他非守护线程都结束后,这些守护线程的存在反而是 JVM 进入关闭态的理由。
  3. 典型的工作线程池就是 IO 线程池,和与他们绑定的计算线程池。

初始化这类线程池有一些简单的工厂方法,比原始构造器更加可用: Executors.newWorkStealingPool(int parallelism)

The Executors

provides convenient factory methods for these Executors.

线程池如何维护自身状态

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public static void main(String[] args) {

int COUNT_BITS = Integer.SIZE - 3;
// 29
System.out.println(COUNT_BITS);
final int shifted = 1 << COUNT_BITS;
// 1 与 29 个 0,所以左移意味着补 0,左移一位意味着补 1 个 0,左移 29 位意味着补 29 个 0,最后得到 30 位数:100000000000000000000000000000
System.out.println(Integer.toBinaryString(shifted));
int CAPACITY = shifted - 1;
// 29个 1:11111111111111111111111111111
System.out.println(Integer.toBinaryString(CAPACITY));
// 高 3 位为 1,低 29 位为 0:11100000000000000000000000000000,和 CAPACITY 低 29 位为 1,高 3 位为 0 恰好相反
System.out.println(Integer.toBinaryString(~CAPACITY));
System.out.println(Integer.toBinaryString(~CAPACITY));
int RUNNING = -1 << COUNT_BITS;
// -1 意味着 32 个 1:11111111111111111111111111111111
System.out.println(Integer.toBinaryString(-1));
// 3 个 1 和 29 个 0:11100000000000000000000000000000
System.out.println(Integer.toBinaryString(RUNNING));

// 重点:从后方补 0 的算法,可以把 state 的基准位从低位移到高位,这样大数的一部分就可以拿来表示有限状态了

// 0 不管位移多少位都是 0
int SHUTDOWN = 0 << COUNT_BITS;
System.out.println(Integer.toBinaryString(SHUTDOWN));
int STOP = 1 << COUNT_BITS;
// 1 和 29 个 0:100000000000000000000000000000
System.out.println(Integer.toBinaryString(STOP));
int TIDYING = 2 << COUNT_BITS;
// 10 和 29 个 0:1000000000000000000000000000000
System.out.println(Integer.toBinaryString(TIDYING));
int TERMINATED = 3 << COUNT_BITS;
// 11 和 29 个 0:1100000000000000000000000000000
System.out.println(Integer.toBinaryString(TERMINATED));
}

private static int runStateOf(int c) {
int CAPACITY = getCapacity();
// 把 CAPACITY 的补码按位与,这样高 32 位就可以被取出来
return c & ~CAPACITY;
}

private static int workerCountOf(int c) {
int CAPACITY = getCapacity();
// 低 32 位本身就是 workCount,这个 workCount 可以容纳很大的数
return c & CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}

private static int getCapacity() {
int COUNT_BITS = Integer.SIZE - 3;
final int shifted = 1 << COUNT_BITS;
int CAPACITY = shifted - 1;
return CAPACITY;
}
运行状态 状态描述
RUNNING 能接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP 不能接受新任务,【也不处理队列中的任务,会中断正在处理任务的线程。】增加了两条措施,是一个更严厉的状态,理论上只要线程被中断完,线程池就可以走向关闭
TIDYING 所有的任务都已终止了,workerCount (有效线程数) 为0,这个状态的意思不是整理中,而是整理完了。
TERMINATED 在terminated() 方法执行完后进入该状态。

线程池生命周期.png

其中 running 既是初始态,也是中间态,所以才有private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));作为初始化块的一部分。

尝试关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
// 注意这里有个自旋
for (;;) {
int c = ctl.get();
// 尝试把把本线程池的状态改成 TIDYING -> TERMINATED,所以正在 running、正在 shutdown 但队列未空、已经高于 TIDYING 都直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 只要 wc>0,就关闭并只关闭一个空闲线程(看起来这里是假设本方法通常是由线程退出来触发的,所以此处能够关掉一个就直接退出)
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

// 如果一个 worker 都没有了,就真的关闭本线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这个地方做了一个多余操作,把 TIDYING 做一个 ctlOf 转化 // 先置为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 通常是一个空钩子方法,这两个状态之间就差了一个钩子设计
terminated();
} finally {
// 再设置为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 已关闭才做 signalAll()
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
// 这里失败,下次再从外部进入 TIDYING -> TERMINATED 的循环
}
}

线程池如何管理任务

每个线程池的 Worker 管理的实质上是 FutureTask,它既是Callable(确切地说,wrap Callable),也是Future(一个最完美的任务是一个RunnableFuture<V>,用成员变量来帮助 Runnable来保存一个Callable的返回值,以供Future使用):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 一个可以取消的计算。
// 基本上只能完成一次,除非执行 runAndReset,执行完成不能再 cancel
// 只有计算执行完成 get 才可以获取结果,之前必然阻塞
//
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
* <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V> {
}

线程池使用一个把 Runnable 转变为 Callable 的适配器(Callable 转 Runnable 理论上也是容易做到的,但应该没有必要转换),来兼容把 Runnable 传进 submit 的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 把 runnable 和一个勉强的 result 包装成一个 callable,分三步
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
// 注意,这个 call() 是不抛出异常的,所以对 Java 而言,子类的签名里可以不继续抛出父类声明的异常,
public T call() {
task.run();
// 组合逻辑在这一层
return result;
}
}

FutureTask 实现了 RunnableFuture,它本质上是一个携带 Runnable 和 state 的任务。

首先看它的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

值得一提的是,任务的中间状态是一个瞬态,它非常的短暂。而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,所以可以这么说:
只要state不处于 NEW 状态,就说明任务已经执行完毕。
注意,这里的执行完毕是指传入的Callable对象的call方法执行完毕或者抛出了异常。所以这里的COMPLETING的名字显得有点迷惑性,它并不意味着任务正在执行中,而意味着call方法已经执行完毕,正在设置任务执行的结果。

换言之,只有 NEW 状态才是 cancellable 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Doug Lea 本身比较喜欢使用普通整数来制造状态机
// COMPLETING 和 INTERRUPTING 是 set state 和取消任务的中间态


/** The underlying callable; nulled out after running */
private Callable<V> callable;

// 异常和输出使用同一个 outcome,所以 outcome 不能是泛型,必须是 object
// 它是非 volatile 的,需要巧妙利用 state 读写
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// ensure visibility of callable
this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
// 把 volatile 变量写在写语句的最后,写在读语句的最前面,类似 monitorEnter 和 monitorExit 的语义,可以保证可见性
// ensure visibility of callable
this.state = NEW;
}

它的状态管理方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean isCancelled() {
return state >= CANCELLED;
}
// 只要不是 NEW 就是完成了
public boolean isDone() {
return state != NEW;
}


// 移除并通知所有等待线程,
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
// 漂亮的声明和比对写法
for (WaitNode q; (q = waiters) != null;) {
// 在 for 循环里用 cas 把 waiter 置空
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 在内层循环里把当前线程和 futureTask 的关系移除,并且
for (;;) {
// 在这个内存循环里面,要做的就是一个个遍历链表的 next,unpark 掉它们,并且 help gc
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();
// 此处就是上面的 nulled 的意思了,任务进入终态以后 callable 也可以被回收
callable = null; // to reduce footprint
}

// 通过使 permit 变成 available 的方式,使这个线程从 blocked 状态变成非 blocked 状态,或者下次调用 park 的时候非阻塞。
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

实际被工作线程调度的 run 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void run() {
// 如果不等于 new 或者 cas 把线程绑定到本 future task 上,就直接退出,这其实是一种幂等
// runner 的获取是从上下文里获得的
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 只有状态和 callable 完备才能把值设进来
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 如果 run 出异常,就进入 setException 终态方法
setException(ex);
}
if (ran)
// 否则,set result,走入另一种终态
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 执行完要把 runner 置空,这样上面那个 cas 对其他线程而言就会失败
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
// 可能有其他线程在 interrupting,在这里实现一套等待到 interrupted 的自旋 yield
handlePossibleCancellationInterrupt(s);
}
}

run 有一个重跑版本,这个版本会重复执行,但不会影响 get 的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Executes the computation without setting its result, and then
* resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
*
* @return {@code true} if successfully run and reset
*/
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

在 FutureTask 里有三类终态方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
protected void set(V v) {
// 在两个 CAS 操作之间夹逼一个 outcome
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

// 几乎等同于 set,但夹逼的是把 Throwable 设进 outcome 里面
protected void setException(Throwable t) {
// 注意这里只能把 callable 内部的异常设置进 outcome 里面,如果本服务发生了 interrupt,则这里必然失败
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}


// 从这个方法可以看出,中断也是 cancel 的一种
public boolean cancel(boolean mayInterruptIfRunning) {
// 在一个布尔表达式里面表达顺序结构
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// 只能从 new 迁移到 INTERRUPTING 或者 CANCELLED,只要 cas 不成功,就返回 false。
return false;
try { // in case call to interrupt throws exception
// 如果取消带有中断标志
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 强制中断这个线程
t.interrupt();
} finally { // final state
// 不使用 cas,把本应用的状态设为已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 所有的终态操作都有的一个操作
finishCompletion();
}
return true;
}

如果程序进入终态,则 get 终于可以得到合理的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public V get() throws InterruptedException, ExecutionException {
int s = state;
// 比对状态
if (s <= COMPLETING)
// 进入计时的 awaitDone 流程,这里的计时结果是带有状态的,0L 意味着无限计时
s = awaitDone(false, 0L);
// 进入 report 流程
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

其中等待流程见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 这里通过自旋来实现计时等待
for (;;) {
// 如果本线程被中断,则释放所有的 get 线程,然后抛出一个中断异常,这里引入了一个经典的设计模式,在 waiting 状态内发生 interrupt 的地方,响应中断的方式是清空中断位(而不是简单地 swap),并抛出中断异常
if (Thread.interrupted()) {
// 如果中断了(不正常退出),清空 waiter
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
// 进入终态,返回 s
if (s > COMPLETING) {
if (q != null)
// 清空等待栈的线程,waitnode 可以功成身退了,但只清空当前的 q 的 thread,并不做完整的 removeWaiter
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
// no-op
Thread.yield();
else if (q == null)
// 这里生成了一个 waitnode,注意,这里的调用栈是等待线程 get -> awaitDone(),所以此处会捕获get 线程进 waitnode 里,在本循环里产生了第一个 q 的节点。
q = new WaitNode();
// 一般第一轮循环q 总是为 null 的,只有第二轮进入这个地方的,才会进入这个分支,而且这里可能会失败,如果失败也,这时候就会把
else if (!queued)
// 如果 q 不为空,且没有入队,则首先把当前的 waiters 放到当前的 q.next 里,然后把 q 放到本类型的 waiters 里(用新 q 代替老 waiter)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 前面两轮循环都走过了(前面两轮必须使 q 不为空,queued变成 true,才进入接下来的循环),接下来就进入 park 或者 parkNanos,看看会不会再被唤醒了
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果超时了(bu),清空 waiter
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 如果没有超时,本线程先驻留一下,驻留完进入下一个循环
LockSupport.parkNanos(this, nanos);
}
else
// 否则,无限驻留,直到下一个循环。下一个循环必须由 finishCompletion 里的 LockSupport.unpark(t); 触发
LockSupport.park(this);
}
}

/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

// 解掉链表,help gc
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

然后就把outcome 通过 report 传出来:

1
2
3
4
5
6
7
8
9
10
// 这里使用 object 转 v,必然带来 warning
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

任务执行

提交任务调度
  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

任务调度流程.png

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是(阻塞的本质即为此):在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列的工作原理.png

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁
DelayQueue 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半
LinkedBlockingQueue 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建的该队列有容量危险
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
PriorityBlockingQueue 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
SynchronousQueue 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收
任务申请

任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

任务的执行主要有 submit->execute,submit 的主要逻辑是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

execute 的主要逻辑是:

Worker执行任务.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 // 这个方法体现了线程池的任务调度策略的顶层设计:先 core 后 queue 后非 core 的设计思路。不过,这里面的 queue 的使用方案需要考虑线程池的状态。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

这需要用到尝试增加线程

线程池如何管理线程

核心线程的 idle 不影响核心线程的创建;非核心线程的 idle time 会导致它们退出。

尝试增加线程

注意 addWorker 只是 execute 的一个子分支而已。

申请线程执行流程图.png

Worker 可以被认为是线程和锁的结合体,它的使命就是不断地把 runnable 从缓冲队列里拿出来,放在自己的 thread 里执行,其中关键的方法是 addWorker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
    /**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// retry 是外部自旋的标签。大自旋保证 rs 是稳定的,小自旋保证 wc 是稳定的,在双自旋里面保证 wc 的修改成功
retry:
for (;;) {
int c = ctl.get();
// 获取运行时状态
int rs = runStateOf(c);

// 如果线程池关闭了,或者不是worker 的 firstTask 为空,但 workQueue 不空
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层自旋
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 其实 worker 里并没有 core 与否的属性,core 主要看比对哪个 PoolSize
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果这次一个原子性地增加 WorkerCount 成功,则退出大自旋;否则还是在大自旋里做 cas 增加 workerCount
if (compareAndIncrementWorkerCount(c))
break retry;
// 否则失败有两种可能:rc 变了,或者 wc 变了。看看当前 runState 是否还是大自旋的 runState
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 如果不是则返回大自旋
continue retry;
// 如果是则 runState 不变,只是 wc 变了,在小自旋里重新获取 wc 即可
// else CAS failed due to workerCount change; retry inner loop
}
}

// 在上层的 ctl 的修改是通过自旋来做的,不加锁,但下层就必须加锁了。这个设计实际上让 ctl 的修改和 worker 的修改解耦,实现了某种“最终一致”

// worker 的创建和添加是两个状态
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 外部传进来的 firstTask 可能为空,这里照样传进去
w = new Worker(firstTask);
// 在 Worker 构造器的内部携带的线程工厂创建的 thread 也可能为空
final Thread t = w.thread;
if (t != null) {
// 凡是修改线程池的 bookkeeping 操作,包含状态之外(比如 worker)的成员复杂流程修改的时候,都需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// Tests if this thread is alive. A thread is alive if it has been started and has not yet died.
// 这个方法本身是为了启动新线程,如果线程工厂不是启动新线程而是像线程池一样复用线程的话,线程就是 alive 的了(注意这个状态和线程的 status 还不一样),这时候线程池 addWorker 会失败
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 更新簿记值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 此时才开始线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
// 增加线程失败,会导致线程池终结
tryTerminate();
} finally {
mainLock.unlock();
}
}

/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
// 减 worker count 的操作必须自旋到成功,这种小成员的自旋修改不需要 sleep!
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

private boolean compareAndIncrementWorkerCount(int expect) {
// 因为 wc 在 32 位整数的低位,所以直接对 expect + 1 即可。
return ctl.compareAndSet(expect, expect + 1);
}

线程执行

执行任务流程.png

线程的执行强依赖于 worker 本身的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

// worker 本身并不严重依赖自己的状态,所以不像线程池一样拥有一个 runState,但它持有一个 state,能够表达自身的锁状态。所以它自身拥有 -1、0、1 三种状态
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// inhibit == prohibit,就是禁止中断的意思,中断前也要求锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这个方法是调用的线程池的 factory,
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
// 这个方法是线程池里的方法,这样交互委托可以实现上下文的 merge,以当前的线程去读外部的上下文
runWorker(this);
}

// Lock methods
// 0 代表常态无锁
// 1 代表常态加锁
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
// 启动的时候使用的-1 是一种“启动时才能使用的锁”,这里也包含进来了
return getState() != 0;
}

// aqs 最关键的加锁方法,锁的标记位可以自定义
protected boolean tryAcquire(int unused) {
// 这里体现了经典的设计模式,先 cas 把标记位加上去,然后绑定线程。这里要求线程安全的写只有锁的 cas,线程的归属却不是线程安全的
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
// 恰好和上一个方法反过来
setExclusiveOwnerThread(null);
// 强制解锁,无 cas
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
// 锁只支持互斥锁定模式,不支持共享锁定模式
public boolean isLocked() { return isHeldExclusively(); }

// 提供一种中断 worker(包括内部线程)的工作模式
void interruptIfStarted() {
Thread t;
// -1 和 1 不允许中断
// 在一个括号里实现了漂亮的取数操作
if (getState() >= 0 && (t = thread) != null &&
// 线程没有被中断的时候可以被中断
!t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

在一个工作线程里,worker delegate 调用给线程池的 runWorker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
// 这里为什么不使用 worker 里面的线程呢?
Thread wt = Thread.currentThread();
// 做一个置换/置空操作
Runnable task = w.firstTask;
w.firstTask = null;
// 在对象初始化的时候触发了加锁,在线程启动的时候触发了解锁。线程池的 shutdown 方法本身会 interrupt worker,这里不允许在锁周期里面 interrupt worker
w.unlock(); // allow interrupts
// 突然完成默认为真
boolean completedAbruptly = true;
try {
// getTask 里封装了复杂的取任务流程,这里在一个表达式里面实现了漂亮的取任务操作
// 本线程只有在 getTask 取不到的时候才退出
while (task != null || (task = getTask()) != null) {
// 只在 run 一个 task 的时候锁定自己一次,不可重入
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池本身已经进入停止及以后状态,则直接求 工作线程的中断状态。否则,做一轮线程的中断,再求线程池状态(中断居然会影响线程池的状态,很奇怪?),再求工作线程的中断状态。这里有一个比较炫技的地方,wt 和 currentThread 都是当前线程,但偏偏不使用 wt 里的线程
// 这里的思想是:不能由命令触发中断,必须由状态触发中断
if ((runStateAtLeast(ctl.get(), STOP) ||
// 或者
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 遇到这些情况,就要中断 wt,在这里。所以内部线程是由 getTask 内部的流程中断的,然后才去执行下面的 run,看看下面的 run 会不会响应
wt.interrupt();
// 线程的中断也不会影响接下来的 task.run()
try {
// 通常这个方法是空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// runnable.run()
task.run();
} catch (RuntimeException x) {
// 有这样的写法就意味着要在 finally 留存 thrown
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// thrown 是给 afterExecute 准备的
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 只有在 getTask 取不到的时候退出,这个值才是false,其他时候都算是“突然退出”
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

// 阻塞式获取任务。
// 遇到异常情况给上游的返回值是 null:
// 1. 有超过maximumPoolSize 的线程数,这时候返回 null 会导致它退出。
// 2. 线程池 stopped 了(由 shutdownNow 来触发,比 shutdown 更严厉),这时候线程池也会用 null 的方式指示线程有序退出
// 3. 线程池 shutdown,且队列为空(其实光是本条件就可以返回 null,只是如果线程池还在工作中,队列应该让 getTask 的线程阻塞等待)
// 4. 线程超时。真正的超时实际上有两种:线程数超过 core 且超时,连 core 都允许超时且超时
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 在自旋里面
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 第一类情况返回 null
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling? 是否要强制减少线程数?是的话就要引入计时了
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超时返回 null 的场景,但注意这里要能减掉一个线程才能返回 null。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 减线程数目(不一定成功,如 wc == 0 也可能进入这个语句块)
if (compareAndDecrementWorkerCount(c))
return null;
// 不能减线程则 cas 失败,进入大循环里继续
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

回收线程

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反映线程现在的执行状态。

1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

线程池回收线程的过程.png
线程销毁流程.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    // 处理一些关闭和簿记工作:
// 1. 只能被从 worker 线程里调用,也就是说只能在 runWorker 方法里被调用
// 2. 先尝试把 workerCount 减一
// 3. 把 worker 从工作集里移除
// 4. 尝试终结线程池
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 只要能够成功减一就行了
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不管怎样退出,都把 worker 的完成任务数加总到线程池的总数里
completedTaskCount += w.completedTasks;
// 移除本 worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 每个 worker 终结都尝试终结线程池
tryTerminate();

int c = ctl.get();
// 如果线程池没有真的被真的关闭,可以加减线程池里的线程
if (runStateLessThan(c, STOP)) {
// 如果线程池正常关闭
if (!completedAbruptly) {
// allowCoreThreadTimeOut 通常为 false,所以线程池的最小值应该是 corePoolSize,否则核心线程数可以归零
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
// 如果缓冲队列不空,则最小线程数需要维持在 1
min = 1;
if (workerCountOf(c) >= min)
// 如果当前工作线程数大于等于 min,则直接退出
return; // replacement not needed
}
// 反之则认为工作线程数小于 min,需要增加非核心线程(增加非核心线程实际上也是在增加核心线程),这里的设计思想是任何一个线程退出都应该增加一个线程,所以就当作非核心线程增加了
addWorker(null, false);
}
}

// 这个方法在线程退出时只关闭一个【空闲线程】,但在线程池关闭等场景下,会关闭所有的空闲线程,这样线程池最终就关闭了-因为每个worker 退出的时候最少都会关闭一个空闲线程,全局的线程最终得以全部关闭。但线程池的核心参数如 keepAliveTime、corePoolSize、maximumPoolSize 有变化的时候,都会触发全部空闲线程关闭
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 能够被关闭的线程是一个能够拿到内部锁的线程
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断,这个线程内部的工作线程能不能响应看 runnable 内部的实现了
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

线程池使用中可能遇到的问题

线程池的调参有几个难点:

  1. 如果核心线程数过小,则吞吐可能不够,遇到流量矛刺可能导致 RejectExecutionException;但值得警惕的是,如果核心线程数很大,可能导致频繁的上下文切换和过多的资源消耗(不管是 cpu 时间片还是操作系统的内核线程)。
  2. 如果队列过长,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。

那么,如何计算这些参数呢?
有一个基本的原则是:

  1. 计算密集型的线程数本身应该尽量贴进 cpu 核数。
  2. io 密集型的线程数要注意伸缩,要配合阻塞队列使用,要有承受拒绝失败的的准备。

我们常见的计算方式主要来自于《Java并发编程实战》:

线程池计算公式.png

现实中可选的线程数计算公式最好是取一个并发 qps 数和 cpu 数的折中。通常可以认为 单任务的 rt/1ms 可以得到单一线程的吞吐数,qps 除以吞吐数可以得到 qps 相应的线程数,但这个方案没有考虑cpu 核数和上下文切换的问题。所以这样算出来的线程数的实际 qps 表现应该低于理论 qps,但可以通过估算和压测不断让理论值逼近实际值。

线程池的可替换方案

其他可替代方案,都不如线程池的调优方案成熟(在可以使用新技术的前提下,我们是否还有调优旧方案的魄力呢?):

名称 描述 优势 劣势
Disruptor框架 线程池内部是通过一个工作队列去维护任务的执行的,它有一个根本性的缺陷:连续争用问题。也就是多个线程在申请任务时,为了合理地分配任务要付出锁资源,对比快速的任务执行来说,这部分申请的损耗是巨大的。高性能进程间消息库LMAX使用了一个叫作环形缓冲的数据结构,用这种这个特殊的数据结构替代队列,将会避免申请任务时出现的连续争用状况。 避免连续争用,性能更佳 缺乏线程管理的能力,使用场景较少
协程框架 协程是一种用户态的轻量级线程,其拥有自己的寄存器上下文和栈,当调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。这种切换上下文的方式要小于线程的开销。在瓶颈侧重IO的情况,使用协程获得并发性要优于使用线程。 侧重IO情况时,性能更佳。与多线程策略无冲突,可结合使用 在Java中缺乏成熟的应用
Actor框架 Actor模型通过维护多个Actor去处理并发的任务,它放弃了直接使用线程去获取并发性,而是自己定义了一系列系统组件应该如何动作和交互的通用规则,不需要开发者直接使用线程。通过在原生的线程或协程的级别上做了更高层次的封装,只需要开发者关心每个Actor的逻辑即可实现并发操作。由于避免了直接使用锁,很大程度解决了传统并发编程模式下大量依赖悲观锁导致的资源竞争情况。 无锁策略,性能更佳,避免直接使用线程,安全性更高 在Java中缺乏成熟的应用,内部复杂,难以排查和调试

缺乏管控能力就不适合调优。

最终解决方案

通过监控线程池负载,制定告警策略:

  1. 线程池活跃度 = activeCount/maximumPoolSize。看看这个值是不是趋近于 1。
  2. 监控队列的capacity 和 size 的比例。
  3. 监控 RejectExecutionException 的出现。

加引入线程池动态管控能力,基于告警制定 sop,确定是否要动态调节线程数和拒绝策略。

如果还是解决不了问题,需要考虑全局动态扩容的方案。

线程池的常见扩展

QueueResizingEsThreadPoolExecutor

这是 es 用的线程池。

线程组

线程组提供一个“集合”,开源把一群线程归于一处,可以批量 interrupt/stop/suspend。
但这个方案是很危险的,使用线程池和并发安全的 Collection 都可以管理好线程。

CompletionStage

这是定义“可能是”异步计算的一个阶段,可能被其他阶段触发,也可以触发其他阶段。它是 CompletableFuture 的父接口。

它有一个特点,大量非 void 方法返回值都是 CompletionStage 类型,这样既允许 builder 模式,也允许各种 transformation 模式。

小技巧

如何处理任务超时问题

方法1:使用 FutureTask 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Future<Map<String, Object>> future = executor.submit(() -> getFeatures(context, zeusSceneId));
try {
// 穷人版超时:最简单的超时不是使用 circuit breaker,而是使用 FutureTask 的缺省超时实现,这个方案取不到值的时候底层会返回 TimeoutException,只要捕获这个超时就可以走入 fallback 逻辑
features.putAll(future.get(paramCollectTimeout, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// fallback logic
}

// 其中 FutureTask 的实现是:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 等待结束后抛出异常而不是空指针,否则调用 report 方法
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

方法2:使用条件变量

1
2
3
4
// 发出调用
condition.await
// 非阻塞地调用 isDone 方法,抛出异常或取值
// 这是方法1 的泛化实现

方法3:使用 countDownLatch/CyclicBarrier

1
2
3
// 发出调用
无锁的 await
// 非阻塞地调用 isDone 方法,抛出异常或取值,但要注意其他线程对 done 状态的维护

这个方法不需要依赖于 ReentrantLock,是通过纯 AQS 实现的,见 CountDownLatch 源码。

自定义线程池实现自定义中断

1
待补充

参考资料:

  1. 《一行一行源码分析清楚AbstractQueuedSynchronizer》
  2. 《Java线程池实现原理及其在美团业务中的实践》
  3. 《Keep Coding》