How the problem came up

For work, I recently considered building a framework for running tasks in batches on top of a standard API from the Java standard library:

invokeAll

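The semantics of this API are:

  1. Execute a batch of tasks.
  2. Once every task has completed (whether it terminated normally or by throwing an exception), return a list of Futures holding the status and result of each task.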
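
To make these semantics concrete, here is a minimal sketch of the happy path. The class and method names (InvokeAllBasics, sumWithInvokeAll) are made up for illustration, and the pool is a plain fixed pool with an unbounded queue, which is an assumption and not the pool used in the rest of this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original code base.
class InvokeAllBasics {

    /** Runs n trivial tasks through invokeAll and sums their results. */
    static int sumWithInvokeAll(int n) throws Exception {
        // A fixed pool with an unbounded queue does not reject tasks while it is running.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value);
            }
            int sum = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get(); // the future is already done, so this does not block
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithInvokeAll(9));
    }
}
```

Every future returned here is already done, which is exactly the guarantee that breaks down later in this post.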

Our team's code base already had a factory method for a custom thread pool, roughly as follows:

/**
 * A flawed thread pool factory method.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param blockingQueueSize capacity of the blocking queue
 * @param tag label for the pool's threads
 * @return a flawed thread pool
 */
public static ThreadPoolExecutor arrayBlockingExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, int blockingQueueSize, final String tag) {
    log.info("create thread pool with blocking array queue, "
                    + "tag : {}, corePoolSize : {}, maximumPoolSize : {}, "
                    + "keepAliveTime : {}, unit : {}, blockingQueueSize : {}",
            tag, corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueueSize
    );
    return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            new ArrayBlockingQueue<>(blockingQueueSize),
            r -> {
                Thread thread = new Thread(r);
                thread.setName(Joiner.on("_").join(tag, "thread", Thread.NORM_PRIORITY));
                thread.setPriority(Thread.NORM_PRIORITY);
                return thread;
            },
            // The flaw: this handler only logs and leaves the rejected task untouched.
            // (The tag is not passed either, so each value lands one placeholder early
            // and the last placeholder stays unfilled in the log output.)
            (r, executor) -> log.error(
                    "{}_thread_full:queue_size={}, ActiveCount={}, CorePoolSize={}, CompletedTaskCount={}",
                    executor.getQueue().size(), executor.getActiveCount(), executor.getCorePoolSize(),
                    executor.getCompletedTaskCount()
            )
    );
}

But calling invokeAll on a pool built by this factory can block forever:

public static void main(String[] args) {
    // A deliberately tiny pool: a single thread and a queue that fills up almost immediately
    ExecutorService testPool = arrayBlockingExecutor(
            1,
            1,
            10,
            TimeUnit.SECONDS,
            1,
            "test-pool");

    // Generate a batch of tasks
    final List<Callable<Integer>> tasks = IntStream.range(1, 10).mapToObj((i) -> {
        return new Callable<Integer>() {
            /**
             * Computes a result, or throws an exception if unable to do so.
             *
             * @return computed result
             * @throws Exception if unable to compute a result
             */
            @Override
            public Integer call() throws Exception {
                // Simulate a long-running operation so that a task does not finish right after submission
                Thread.sleep(1000L);
                return i;
            }
        };
    }).collect(Collectors.toList());
    try {
        testPool.invokeAll(tasks);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // This line is never printed
    log.info("this executor can run through all tasks");
    testPool.shutdownNow();
    // This line is never printed
    log.info("good bye");
}

The console output looks like this:

# Once the pool is saturated, invokeAll never returns, so none of the later statements are ever printed
16:14:57.674 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}
16:14:57.676 [main] ERROR com.magicliang.transaction.sys.common.concurrent.ForeverBlockingExperiment - 1_thread_full:queue_size=1, ActiveCount=1, CorePoolSize=0, CompletedTaskCount={}

We never see invokeAll return. Inspecting the Java threads with a JMX tool shows:

(Figure: a stuck main thread)

Our main thread is stuck.
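
If no JMX tool is at hand, the same observation can be made from code. The following is only a sketch under assumptions: the class BlockedThreadProbe and its method are hypothetical, and it reproduces the stuck state with a bare FutureTask that nobody ever runs instead of the real pool.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class: shows what a thread parked in Future.get() looks like.
class BlockedThreadProbe {

    /** Starts a thread that waits on a FutureTask nobody runs and returns its observed state. */
    static Thread.State stateOfThreadStuckInGet() throws InterruptedException {
        FutureTask<Integer> neverRun = new FutureTask<>(() -> 42);
        Thread waiter = new Thread(() -> {
            try {
                neverRun.get(); // parks: the task is never run and never cancelled
            } catch (Exception e) {
                // interrupted at the end of the demo so that the thread can exit
            }
        }, "stuck-waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Poll until the thread has parked inside get(), for at most five seconds.
        long deadline = System.nanoTime() + 5_000_000_000L;
        Thread.State state = waiter.getState();
        while (state != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
            state = waiter.getState();
        }
        // Print the frames, similar to what a thread dump would show.
        for (StackTraceElement frame : waiter.getStackTrace()) {
            System.out.println("  at " + frame);
        }
        waiter.interrupt();
        waiter.join(1000);
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stateOfThreadStuckInGet());
    }
}
```

An untimed get() parks the calling thread, so the thread shows up as WAITING with FutureTask frames at the top of its stack.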

Why the main thread gets stuck

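The stack frames show that the thread is stuck inside the abstract executor of the Java standard library itself (AbstractExecutorService):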

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    // This line blocks forever
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

Why does this line block? Answering that takes a short look at how Java manages futures:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            // 1. Create a RunnableFuture (in practice a FutureTask); its state is still NEW.
            RunnableFuture<T> f = newTaskFor(t);
            // 2. Add the RunnableFuture to the futures list.
            futures.add(f);
            // 3. Execute the RunnableFuture; normally execute drives the task to a terminal state.
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    // If step 3 went well, this get returns normally; otherwise it blocks forever.
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

When does something go wrong inside execute?

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Doug Lea's design in short:
     * 1. If there are fewer than corePoolSize threads, add a thread first.
     * 2. Otherwise, enqueue the task.
     * 3. If the queue is full, try to add another thread.
     * 4. If that fails too, reject the task.
     *
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
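
The saturation path of execute can be observed directly by counting how often the rejection handler fires. This is a sketch under assumptions: SaturationDemo and countRejections are hypothetical names, and the counting handler stands in for our logging handler. A latch keeps the only worker busy so that the result does not depend on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: counts rejections on a 1-thread pool with a 1-slot queue.
class SaturationDemo {

    /** Submits `total` blocking tasks and returns how many were handed to the rejection handler. */
    static int countRejections(int total) throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                // Stand-in for the logging handler: only counts, does nothing with the task.
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < total; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The handler runs in the submitting thread, so the count is final here.
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countRejections(9));
    }
}
```

With 9 tasks this reports 7 rejections: one task occupies the only worker, one sits in the queue, and the remaining seven fall through to reject.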

Once the pool is saturated, execute ends up in our custom rejectedExecution method:

// This handler does nothing except write one log line
(r, executor) -> log.error(
        "{}_thread_full:queue_size={}, ActiveCount={}, CorePoolSize={}, CompletedTaskCount={}",
        executor.getQueue().size(), executor.getActiveCount(), executor.getCorePoolSize(),
        executor.getCompletedTaskCount()
)

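What happens if reject does nothing?

(Figure: a future that was never initialized)

Here state = 0 (NEW) means the task never even reaches COMPLETING, let alone a terminal state: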

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

This makes awaitDone in FutureTask wait forever:

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
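
The effect of a task that stays in NEW can be reproduced without any thread pool. The sketch below uses a hypothetical class, NeverRunFuture, and a timed get() so that the demo terminates; an untimed get() on the same object would simply never return.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a FutureTask that nobody runs never leaves the NEW state.
class NeverRunFuture {

    /** Returns true if a timed get() on a never-run FutureTask times out and the task is still not done. */
    static boolean timedGetTimesOut() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> 1); // created in state NEW
        try {
            task.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return !task.isDone(); // nobody called run() or cancel()
        }
    }

    /** Once run() is called, the task completes normally and get() returns at once. */
    static int getAfterRun() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> 1);
        task.run();
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timedGetTimesOut());
        System.out.println(getAfterRun());
    }
}
```

A rejection handler that neither runs nor cancels the task leaves it in exactly the first situation.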

So our RejectedExecutionHandler looks like the culprit. Let's see how the RejectedExecutionHandlers that ship with Java deal with this:

// Take CallerRunsPolicy as an example: what Doug Lea actually does is call the runnable's run()
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

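(Figure: once call has finished, the result can be set)
(Figure: the final state transition performed by set)

As soon as the task reaches NORMAL, the waiting thread can return normally.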
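
A minimal sketch of the same tiny pool with CallerRunsPolicy swapped in suggests that invokeAll then returns, because rejected tasks are run by the submitting thread and reach a terminal state. CallerRunsDemo and its method are hypothetical names, and the 50 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: the tiny pool from this post, but with CallerRunsPolicy.
class CallerRunsDemo {

    /** Runs tasks 1..9 through invokeAll and sums the results. */
    static int sumWithCallerRuns() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            List<Callable<Integer>> tasks = IntStream.range(1, 10)
                    .mapToObj(i -> (Callable<Integer>) () -> {
                        Thread.sleep(50L); // simulate some work
                        return i;
                    })
                    .collect(Collectors.toList());
            int sum = 0;
            // Rejected tasks run in the calling thread, so every future completes.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithCallerRuns());
    }
}
```

The price is that the submitting thread does part of the work itself, which slows down submission under load.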

Others have noticed this problem too and proposed a solution based on cancelling the rejected task.
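
One way such a cancel-based handler might look is sketched below. This is my own illustration and not the referenced solution itself: the class CancellingRejectionDemo and the constant CANCEL_REJECTED are hypothetical. It relies on the fact that invokeAll and submit wrap tasks in a FutureTask, so the rejected Runnable is usually also a Future.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: a rejection handler that cancels what it rejects.
class CancellingRejectionDemo {

    /** Cancels rejected tasks that are Futures so that nobody waits on them forever. */
    static final RejectedExecutionHandler CANCEL_REJECTED = (r, executor) -> {
        if (r instanceof Future) {
            ((Future<?>) r).cancel(false);
        }
    };

    /** Runs 9 slow tasks on a saturated pool and returns the futures from invokeAll. */
    static List<Future<Integer>> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                CANCEL_REJECTED);
        try {
            List<Callable<Integer>> tasks = IntStream.range(1, 10)
                    .mapToObj(i -> (Callable<Integer>) () -> {
                        Thread.sleep(200L); // simulate some work
                        return i;
                    })
                    .collect(Collectors.toList());
            // Returns once accepted tasks finish; rejected ones are already cancelled.
            return pool.invokeAll(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (Future<Integer> f : run()) {
            System.out.println(f.isCancelled() ? "cancelled" : "completed");
        }
    }
}
```

Callers then have to check isCancelled() before calling get() on a returned future, because get() on a cancelled future throws CancellationException.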

So are we safe as long as we avoid our custom RejectedExecutionHandler? Not quite: the JDK itself ships a similar "buggy" RejectedExecutionHandler, and the custom handler written by our predecessors on the team may well have been inspired by it:

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // The problem with this method is that it does nothing at all
    }
}

Let's try it in place of our custom RejectedExecutionHandler:

public static void main(String[] args) {
    AtomicInteger counter = new AtomicInteger(0);
    // a very small thread pool that is easily saturated
    ExecutorService testPool = new ThreadPoolExecutor(
            1,
            1,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("test-thread" + counter.getAndIncrement());
                return thread;
            },
            new ThreadPoolExecutor.DiscardPolicy()
    );

    // generate several jobs here
    final List<Callable<Integer>> tasks = IntStream.range(1, 10).mapToObj((i) -> {
        return new Callable<Integer>() {
            /**
             * Computes a result, or throws an exception if unable to do so.
             *
             * @return computed result
             * @throws Exception if unable to compute a result
             */
            @Override
            public Integer call() throws Exception {
                // sleep to simulate a long-running job
                Thread.sleep(1000L);
                return i;
            }
        };
    }).collect(Collectors.toList());
    try {
        testPool.invokeAll(tasks);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // you will never see this
    log.info("this executor can run through all tasks");
    testPool.shutdownNow();
    // you will never see this
    log.info("good bye");
}

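The result is still an indefinite block:

(Figure: still blocked forever)

So in extreme cases DiscardPolicy makes callers of invokeAll block forever, which Doug Lea probably did not anticipate at first either.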

Lessons

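  1. Not every task reaches a terminal state after being submitted to a thread pool; the pool does not guarantee that a task always runs to a terminal state.
  2. So every call that depends on Future.get() can block forever, and the more deeply hidden APIs of this kind include invokeAll (who knows how many similar APIs there are). Once a program blocks forever, the thread never exits. This is a bug that cannot be spotted from the documentation and only shows up under extreme conditions, but once it does, the thread or even the whole thread pool is "dead", and the program eventually collapses in an unrecoverable avalanche.
  3. Be careful with the DiscardPolicy RejectedExecutionHandler. It was not designed to trap thread pool users, but you do not know when the pool will fill up, nor when the handler will leave a task in limbo. Avoid this policy if you can; in my view even DiscardOldestPolicy is a better choice, although it also drops a queued task without cancelling it, so a caller waiting on that task's future can hang in the same way. And do not get clever and write your own "empty RejectedExecutionHandler": it causes the same unpredictable problems.
  4. Be careful with the untimed get(): you never know when the thread will block forever.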
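
As an illustration of the last lesson, the timed overload of invokeAll bounds the wait even when DiscardPolicy silently drops tasks. This is only a sketch: TimedInvokeAllDemo is a hypothetical name and the 500 ms budget is arbitrary.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: timed invokeAll on a saturated pool with DiscardPolicy.
class TimedInvokeAllDemo {

    /** Returns the futures from a timed invokeAll; tasks unfinished at the deadline come back cancelled. */
    static List<Future<Integer>> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        try {
            List<Callable<Integer>> tasks = IntStream.range(1, 10)
                    .mapToObj(i -> (Callable<Integer>) () -> {
                        Thread.sleep(50L); // simulate some work
                        return i;
                    })
                    .collect(Collectors.toList());
            // Waits at most 500 ms, then cancels whatever has not completed.
            return pool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (Future<Integer> f : run()) {
            System.out.println(f.isCancelled() ? "cancelled" : "completed");
        }
    }
}
```

The discarded tasks still never run, but the caller gets control back and can tell from isCancelled() which results are missing.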

An API design style problem

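Someone may ask: can't I just avoid the invokeAll API? In fact, years ago someone found a similar bug in the JDK: "ExecutorService::shutdownNow may block invokeAll indefinitely". Doug Lea's reply was that these APIs all follow the spec, and that some careful usage works around the problem: for example, if you are afraid a task will block forever, you can take it out and cancel it.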

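The trouble is that the official documentation does not say these APIs cannot be used in the straightforward way in certain scenarios, and cancelling these tasks requires the Future.cancel interface. shutdownNow() returns a List<Runnable>, while invokeAll blocks forever, so you never even get its return value.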

So another participant in that thread, Stuart Marks, raised a tricky question:

Unfortunately shutdownNow returns a List<Runnable> so it’s not clear
(to me at least) how the caller of shutdownNow can cancel those tasks.
Perhaps a note in the documentation could recommend something, though
I’m not sure what the recommendation would be.

The workaround for this is very ugly:

Stuart makes a good point. There is a mismatch between the Runnable-oriented Executor interface and the Future-oriented ExecutorService interface. With ThreadPoolExecutor, if you execute() a Runnable, that Runnable is enqueued and returned by shutdownNow. But if you submit() a Runnable, a FutureTask is enqueued and returned by shutdownNow.

So in fact users can in practice do

for (Runnable r : pool.shutdownNow()) {
if (r instanceof Future) ((Future)r).cancel(false);
}

but I don't think we guarantee that they can do this. Is there some way we can improve the docs?
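
The workaround quoted above can be tried end to end. The following is a sketch under assumptions: ShutdownNowCancelDemo is a hypothetical class, the pool is a single-thread executor with an unbounded queue, and the 100 ms pause is only a best-effort way to let invokeAll queue its remaining tasks before shutdownNow is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class: cancels the tasks returned by shutdownNow to unblock invokeAll.
class ShutdownNowCancelDemo {

    /** Returns true if the thread blocked in invokeAll terminates after shutdownNow plus cancel. */
    static boolean invokeAllUnblocks() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int value = i;
            tasks.add(() -> {
                firstStarted.countDown();
                Thread.sleep(60_000L); // long-running; interrupted by shutdownNow
                return value;
            });
        }
        Thread caller = new Thread(() -> {
            try {
                pool.invokeAll(tasks);
            } catch (InterruptedException | RejectedExecutionException e) {
                // either outcome also ends the call, which is enough for this demo
            }
        }, "invokeAll-caller");
        caller.setDaemon(true);
        caller.start();

        firstStarted.await();
        Thread.sleep(100); // best effort: let invokeAll queue the remaining tasks
        for (Runnable r : pool.shutdownNow()) {
            if (r instanceof Future) {
                ((Future<?>) r).cancel(false); // the workaround from the quoted reply
            }
        }
        caller.join(5_000);
        return !caller.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(invokeAllUnblocks());
    }
}
```

Without the cancel loop, the queued tasks drained by shutdownNow would stay in NEW and the caller would keep waiting on them.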

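Unfortunately, the Java 8 documentation still has not been updated on this point, and the bug itself has remained Unresolved since 2016. When will it be fixed? TBD.

Of course, by the time business code reaches shutdownNow(), it usually no longer cares about return values, and a few threads blocking forever hardly matters: hosting platforms at modern internet companies always come with some non-graceful shutdown strategy such as kill -9, so some return values are bound to be lost and some permanent blocks are bound to be unblocked when the process dies.

However, a great many programs still depend on the untimed get and invokeAll, and hitting this problem in the middle of a business flow has unpredictable consequences. This class of problem seems bound to trouble the programmers who come after us. The reason invokeAll may never be free of it is: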

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            // The future created here is visible only inside this method
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    // invokeAll has no chance to cancel from inside the method; it can only block forever
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                // This cancel is useless here: the blocked get() above never lets control reach it
                futures.get(i).cancel(true);
    }
}

Follow-up

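I filed a bug report with Oracle, which recorded it as JDK-8286463 : DiscardPolicy may block invokeAll forever. It appears to affect every version from Java 8 onward (in fact, judging by the Javadoc, this design flaw has probably existed ever since java.util.concurrent was introduced in Java 5). Let's see whether Doug Lea steps in to fix it.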