From Executors to Thread Pools
Pooling is the grouping together of resources (assets, equipment,
personnel, effort, etc.) for the purposes of maximizing advantage or
minimizing risk to the users. The term is used in finance, computing
and equipment management. (Wikipedia)
The idea of "pooling" is not limited to computing; it also appears in finance, equipment, staffing, and work management.
In computing it takes the form of unified management of IT resources, including servers, storage, and network resources. By sharing resources, users gain more from a smaller investment. Besides thread pools, other typical pooling strategies include:
Memory pooling: pre-allocate memory to speed up allocation and reduce fragmentation.
Connection pooling: pre-establish database connections to speed up acquiring a connection and lower system overhead.
Object pooling: recycle objects to avoid the expensive cost of repeated initialization and release.
Doug Lea's expectations for a thread pool were:
Improve performance.
Use resources within bounds (he stresses "bounds" repeatedly).
Provide statistics.
The executor inheritance hierarchy
"A well-designed API should be simple, consistent, and extensible."
We hand tasks over for execution, and we get the Executor; we wrap tasks in FutureTask inside the executor, and we get the sync-to-async and async-to-sync designs plus a richer API; we implement the executor with a pool of threads, and we get ThreadPoolExecutor.
Executor: defines only the "execute" contract.
ExecutorService: defines lifecycle, multiple task types (Runnable/Callable), and batch-task contracts.
These two layers are pure contract layers; their methods have no explicit relationship to one another.
AbstractExecutorService: provides only algorithm templates. This layer implements the methods on top of execute and ties the API together, but execute itself is deliberately left unimplemented.
ThreadPoolExecutor: implements execute, and builds a complete reference thread-pool implementation around it: Worker + state machine + queue.
FutureTask: handles only task wrapping, but its two parent interfaces make it the bridge connecting the execute(Runnable) world and the Future.get() world.
classDiagram
class Executor {
<<interface>>
+execute(Runnable command) void // base execution contract
}
class ExecutorService {
<<interface>>
+submit(Callable~T~ task) Future~T~ // added: result-retrieval API
+invokeAll(Collection~Callable~T~~ tasks) List~Future~T~~ // added: batch API
+shutdown() void // added: lifecycle API
+awaitTermination(long timeout, TimeUnit unit) boolean // added: await-termination API
}
class AbstractExecutorService {
<<abstract>>
# all higher-level features are built on newTaskFor (result and no-result task types) + execute
#newTaskFor(Callable~T~ callable) RunnableFuture~T~ // added: task-wrapping factory API
#newTaskFor(Runnable runnable, T value) RunnableFuture~T~ // added: task-wrapping factory API
+submit(Callable~T~ task) Future~T~ // implemented: wrap-then-submit algorithm
+invokeAll(Collection~Callable~T~~ tasks) List~Future~T~~ // implemented: batch wrap-then-submit algorithm
+invokeAny(Collection~Callable~T~~ tasks) T // implemented: first-to-complete algorithm
// key: execute() stays abstract, left to subclasses
~execute(Runnable command) void
}
class ThreadPoolExecutor {
-corePoolSize: int
-maximumPoolSize: int
-workQueue: BlockingQueue~Runnable~
-workers: HashSet~Worker~
+execute(Runnable command) void
+shutdown() void
+beforeExecute(Thread t, Runnable r) void
+afterExecute(Runnable r, Throwable t) void
}
class ForkJoinPool {
<<concrete>>
-parallelism: int
-workQueues: WorkQueue[] // array of work-stealing queues
-commonPool: ForkJoinPool // static shared pool
+execute(ForkJoinTask~?~ task) void // overloaded execute for ForkJoinTask
+submit(ForkJoinTask~T~ task) ForkJoinTask~T~ // ForkJoinTask-aware submit
+invoke(ForkJoinTask~T~ task) T // synchronous call returning the result (core method)
+shutdown() void
+awaitTermination(...) boolean
// worker threads extend ForkJoinWorkerThread
}
class ForkJoinTask {
<<abstract>>
-status: int
+fork() ForkJoinTask~T~ // asynchronous execution (enqueued on the current thread's queue)
+join() T // block waiting for the result
+invoke() T // shortcut for fork + join
+tryUnfork() boolean
+quietlyComplete()
}
class RecursiveTask {
<<abstract>>
+compute() V // user-supplied logic, returns a result
}
class RecursiveAction {
<<abstract>>
+compute() void // user-supplied logic, no result
}
class FutureTask {
-callable: Callable~V~
-outcome: Object
-state: int
-runner: Thread
+run() void
+get() V
+cancel(boolean mayInterruptIfRunning) boolean
}
class Worker {
-thread: Thread
-firstTask: Runnable
+run() void
}
class ForkJoinWorkerThread {
<<concrete>>
-pool: ForkJoinPool
-workQueue: WorkQueue
+run() void // runs the work-stealing loop
}
%% relationships
Executor <|-- ExecutorService : "adds lifecycle+results API"
ExecutorService <|-- AbstractExecutorService : "adds algorithm implementation"
AbstractExecutorService <|-- ThreadPoolExecutor : "adds thread pool strategy"
AbstractExecutorService <|-- ForkJoinPool : "adds work-stealing strategy"
AbstractExecutorService ..> FutureTask : "creates via newTaskFor()"
ThreadPoolExecutor o-- Worker : "manages pool of"
Worker ..> Runnable : "executes tasks"
ThreadPoolExecutor ..> FutureTask : "executes via workers"
ForkJoinPool o-- ForkJoinWorkerThread : "manages pool of"
ForkJoinWorkerThread ..> WorkQueue : "owns"
WorkQueue ..> ForkJoinTask : "holds tasks"
ForkJoinPool ..> ForkJoinTask : "executes via work-stealing"
ForkJoinTask <|-- RecursiveTask : "returns result"
ForkJoinTask <|-- RecursiveAction : "no result"
FutureTask ..|> RunnableFuture : "implements"
RunnableFuture --|> Runnable : "extends"
RunnableFuture --|> Future : "extends"
ForkJoinTask ..|> Future : "implements (indirectly via ForkJoinTask<V> extends Future<V>)"
ForkJoinTask ..|> Serializable : "implements"
The Executor interface
It decouples task submission from task execution (decoupling the execution mechanics). Users need not care how threads are created or scheduled; they simply hand a Runnable carrying the task logic to the Executor, and the Executor framework takes care of allocating threads and running the task.
Not every decoupling design in JUC is asynchronous; decoupling is just decoupling, so an executor is perfectly allowed to execute synchronously:

```java
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();   // runs on the caller's thread
    }
}
```

That said, the common mental model is that an executor spawns a new thread for each task.
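As a contrast to DirectExecutor above, a thread-per-task executor is equally easy to sketch. ThreadPerTaskExecutor and the helper below are illustrative names, not JDK classes:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: one new thread per submitted task.
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();   // spawn-per-task strategy
    }
}

public class ExecutorStrategies {
    // Returns the name of the thread that actually ran the task.
    static String runOn(Executor ex) throws InterruptedException {
        String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        ex.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        done.await();
        return name[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // DirectExecutor semantics: run on the caller's thread.
        String direct = runOn(Runnable::run);
        // Thread-per-task semantics: run on a freshly spawned thread.
        String spawned = runOn(new ThreadPerTaskExecutor());
        System.out.println(direct.equals(Thread.currentThread().getName()));
        System.out.println(spawned.equals(Thread.currentThread().getName()));
    }
}
```

Both are valid Executors; the interface promises nothing about which thread runs the task.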
The ExecutorService interface
It adds several capabilities:
It extends task execution with methods that produce a Future for one or a batch of asynchronous tasks; from here on, the executor can run tasks asynchronously:

```java
Future<?> submit(Runnable task)
<T> Future<T> submit(Runnable task, T result)
<T> Future<T> submit(Callable<T> task)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
```

Among these, submit accepts a result-less Runnable, a Runnable with a preset result, or a result-returning Callable; together with the low-level, result-less execute, they form the four basic single-task APIs.
ExecutorService also provides methods for managing the pool, such as stopping it.
shutdown stops accepting new tasks (subsequent submissions hit the rejection handler) while letting already-submitted tasks, including queued ones, finish.
shutdownNow does what shutdown does and additionally interrupts the worker threads, halting the processing of waiting tasks.
Memory consistency effects: future.get satisfies the memory consistency properties defined by the JLS, i.e. the happens-before relation.
To understand the happens-before relation, do not reason from how hardware works ("the flushing model is fundamentally flawed (it is just not how hardware works)"); start from the JLS specification instead.
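A minimal sketch of that guarantee: a plain, non-volatile write made inside a task is visible after a successful future.get(). The class and field names below are arbitrary demo choices:

```java
import java.util.concurrent.*;

public class HappensBeforeDemo {
    static int plainField;   // deliberately not volatile

    static int writeThenRead() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = pool.submit(() -> { plainField = 42; });
            f.get();             // happens-before edge: task actions -> get() return
            return plainField;   // guaranteed to observe 42, no extra synchronization
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeThenRead());   // 42
    }
}
```

No volatile, no lock: the edge is provided by submit/get themselves, exactly as the JLS-level contract promises.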
AbstractExecutorService
This layer strings the task-execution flow together, so the layer below only has to care about one method: execute. All the other contract interfaces are implemented here:
Most of the task APIs get their implementation at this layer, and they all ultimately funnel into execute().
The lifecycle APIs are left to the layer below.
Most method implementations at this layer follow the same shape:
Wrap the task into a FutureTask.
Run that FutureTask via execute. Because the FutureTask wraps the original run method, its exception handling intercepts exceptions one step ahead of any UncaughtExceptionHandler.
Return the FutureTask instance from the synchronous flow, blocking or not.
External callers then keep talking to these tasks to decide whether they have completed.
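The wrap-then-execute recipe can be condensed into a minimal, hypothetical re-implementation of submit on top of a bare Executor (MiniExecutorService is not a JDK class):

```java
import java.util.concurrent.*;

// Minimal sketch of AbstractExecutorService's submit algorithm:
// wrap the Callable in a FutureTask, hand it to execute(), return it as a Future.
public class MiniExecutorService {
    private final Executor executor;

    MiniExecutorService(Executor executor) { this.executor = executor; }

    <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
        return new FutureTask<>(c);          // the task-wrapping factory
    }

    public <T> Future<T> submit(Callable<T> task) {
        RunnableFuture<T> ftask = newTaskFor(task);
        executor.execute(ftask);             // only execute() is delegated downward
        return ftask;                        // the caller talks to the task via Future
    }

    public static void main(String[] args) throws Exception {
        // Even a same-thread executor gives us working Futures.
        MiniExecutorService svc = new MiniExecutorService(Runnable::run);
        Future<Integer> f = svc.submit(() -> 2 + 3);
        System.out.println(f.get());         // 5
    }
}
```

The real AbstractExecutorService does essentially this, plus null checks and the Runnable overloads.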
The layering of the various calling patterns looks roughly like this:
sequenceDiagram
participant User
participant AbstractExecutorService
participant ThreadPoolExecutor
participant WorkerThread
participant FutureTask
User->>AbstractExecutorService: invokeAll(tasks)
loop for each task
AbstractExecutorService->>AbstractExecutorService: newTaskFor(task)
AbstractExecutorService->>FutureTask: new FutureTask(task)
AbstractExecutorService->>ThreadPoolExecutor: execute(futureTask)
ThreadPoolExecutor->>WorkerThread: dispatch the task
WorkerThread->>FutureTask: futureTask.run()
FutureTask->>Task: callable.call()
Task-->>FutureTask: return the result
FutureTask->>FutureTask: set(result)
end
AbstractExecutorService->>AbstractExecutorService: for each future: future.get()
loop for each future
AbstractExecutorService->>FutureTask: future.get()
alt task already done
FutureTask-->>AbstractExecutorService: return the result immediately
else task not done
FutureTask-->>FutureTask: park and wait
FutureTask->>FutureTask: wake up once the task completes
FutureTask-->>AbstractExecutorService: return the result
end
end
AbstractExecutorService-->>User: return all Future results
ThreadPoolExecutor
It implements execute and, by building batching and asynchrony around execute, delivers the classic thread-pool implementation.
It maintains its own lifecycle on one hand and manages both threads and tasks on the other, combining the two well enough to execute tasks in parallel.
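A minimal construction sketch showing the classic knobs (all sizes below are arbitrary demo values):

```java
import java.util.concurrent.*;

public class PoolConstruction {
    static String compute() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                              // corePoolSize
                4,                              // maximumPoolSize
                60L, TimeUnit.SECONDS,          // keep-alive for non-core threads
                new ArrayBlockingQueue<>(8),    // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());   // the default rejection policy
        try {
            Future<String> f = pool.submit(
                    () -> "done on " + Thread.currentThread().getName());
            return f.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute());
    }
}
```

Every parameter here maps onto a decision branch in execute, which later sections walk through.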
ForkJoinPool
This pool is a sizeable framework in its own right, serving as yet another executor alternative for other JDK components.
The framework has some notable traits:
Its threads are daemon threads by default.
Its threads shrink automatically, so there is no idle-spinning core-thread problem.
Threads of the common pool are typically named like "ForkJoinPool.commonPool-worker-1".
This raises a question many people overlook: when should we use daemon-style threads? They are suitable for:
Short-lived, ad-hoc tasks, ideally ones with parent-child relationships.
Background monitoring tasks.
Compute tasks decoupled from IO.
Pools of this kind (including daemon threads in general) suit compute-intensive work, not IO-intensive work, because:
Daemon worker threads are killed silently when the JVM shuts down, so in-flight IO can be cut off mid-operation.
Once all non-daemon threads have finished, the fact that only daemon threads remain is itself the JVM's reason to enter shutdown.
Typical non-daemon worker pools are IO pools, plus the compute pools bound to them.
There are simple factory methods for initializing pools of this kind that are more usable than the raw constructor: Executors.newWorkStealingPool(int parallelism).
For more, see "Thread Pools in Depth: ForkJoinPool".
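A small divide-and-conquer sketch on a work-stealing pool (RangeSum and the threshold of 10 are demo choices, not JDK names):

```java
import java.util.concurrent.*;

// Divide-and-conquer sum over [lo, hi] on a work-stealing pool.
class RangeSum extends RecursiveTask<Long> {
    final long lo, hi;
    RangeSum(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= 10) {                        // small enough: compute directly
            long s = 0;
            for (long i = lo; i <= hi; i++) s += i;
            return s;
        }
        long mid = (lo + hi) / 2;
        RangeSum left = new RangeSum(lo, mid);
        RangeSum right = new RangeSum(mid + 1, hi);
        left.fork();                                // push the left half onto our deque
        return right.compute() + left.join();       // work on the right, then join the left
    }
}

public class ForkJoinDemo {
    static long sumTo(long n) {
        // newWorkStealingPool actually returns a ForkJoinPool.
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            return ((ForkJoinPool) pool).invoke(new RangeSum(1, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumTo(1000));   // 500500
    }
}
```

fork/join here is exactly the fork() and join() pair from the class diagram above; idle workers steal the forked halves.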
The Executors class provides convenient factory methods for these executors.
The layered call relationships
FutureTask as the pool's core task carrier
Centered on FutureTask, the relationships look like this:
classDiagram
class FutureTask {
-callable: Callable~V~
-outcome: Object
-state: int
-runner: Thread
+run() void
+get() V
+cancel(boolean) boolean
+isDone() boolean
}
class RunnableFuture {
<<interface>>
+run() void
}
class Runnable {
<<interface>>
+run() void
}
class Future {
<<interface>>
+get() V
+cancel(boolean) boolean
+isDone() boolean
}
class Callable {
<<interface>>
+call() V
}
FutureTask ..|> RunnableFuture : implements
RunnableFuture --|> Runnable : extends
RunnableFuture --|> Future : extends
FutureTask o-- Callable : has
FutureTask o-- Object : stores
FutureTask o-- Thread : references
We first introduce a new Future interface, then combine Runnable and Future into RunnableFuture. One of these two faces is for internal use by the pool, the other for external use by the task's consumer.
RunnableFuture then has the classic implementation FutureTask, one of its two well-known implementations (All Known Implementing Classes: FutureTask, SwingWorker); FutureTask can be considered the most faithful realization of a runnable Future. It is not itself a Callable, but it contains one. Its five top-level members:
callable: the wrapped task.
outcome: this member can hold either the result or an exception; report decides how to interpret it.
runner: used for state checks; holding it gives the task the ability to be interrupted and cancelled.
state: the first piece of explicit state we meet in the thread-pool stack.
waiters: the stack of threads blocked in get() (shown in the field listing further below).
Because a Runnable wraps the Callable, the underlying run loop is roughly:

```java
// simplified sketch of FutureTask.run()
task.run() {
    try {
        outcome = callable.call();
        state = NORMAL;
    } catch (Exception e) {
        outcome = e;
        state = EXCEPTIONAL;
    }
}
```
Underneath the Thread model
The reason for this design is that the underlying Thread model only supports a run method:

```java
public class Thread implements Runnable {
    private Runnable target;

    public Thread(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}
```
The underlying C++ source is (abridged):

```cpp
JVM_ENTRY(void, JVM_StartThread(JNIEnv* jni, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread* native_thread =
      java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (native_thread->is_being_ext_suspended()) {
    native_thread->set_terminated_before_start(true);
  }
  if (native_thread->osthread() != NULL) {
    if (os::create_thread(native_thread, java_thread)) {
      native_thread->set_state(INITIALIZED);
      os::start_thread(native_thread);
    }
  }
JVM_END

bool os::create_thread(Thread* thread, ThreadType thr_type) {
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  size_t stack_size = ...;
  pthread_attr_setstacksize(&attr, stack_size);

  pthread_t tid;
  int ret = pthread_create(&tid, &attr, thread_native_entry, thread);
  if (ret == 0) {
    thread->set_thread_id(tid);
    return true;
  }
  return false;
}

static void* thread_native_entry(Thread* thread) {
  thread->set_state(RUNNABLE);
  thread->run();
  return NULL;
}

class JavaThread : public Thread {
 private:
  oop _threadObj;
  OSThread* _osthread;
  volatile JavaThreadState _state;

 public:
  void run() {
    this->thread_main_inner();
  }

  void thread_main_inner() {
    if (has_java_lang_thread()) {
      JavaCalls::call_virtual(&result, klass, method, threadObj, CHECK);
    }
  }
};

static void call_run_method(JNIEnv* env, jobject jthread) {
  jclass threadClass = env->FindClass("java/lang/Thread");
  jmethodID runMethod = env->GetMethodID(threadClass, "run", "()V");
  env->CallVoidMethod(jthread, runMethod);
}
```
The overall call flow goes from Java into C++ and back into Java:

```text
thread.start()                            [Java]
  → Thread.start()                        [Java]
  → start0()                              [native]
  → JVM_StartThread()                     [JVM C++]
  → os::create_thread()                   [JVM C++]
  → pthread_create()                      [Linux C]
  → thread_native_entry()                 [JVM C++]
  → JavaThread::run()                     [JVM C++]
  → JNI: CallVoidMethod(threadObj, "run") [JNI]
  → Thread.run()                          [Java]
  → target.run()                          [Java]
```
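One practical consequence of this chain: calling run() directly never goes through start0()/pthread_create, so it executes on the caller's thread. A small demonstration (class and thread names are demo choices):

```java
public class StartVsRun {
    // Returns the name of the thread that actually executed the Runnable.
    static String threadNameVia(boolean useStart) throws InterruptedException {
        String[] seen = new String[1];
        Thread t = new Thread(
                () -> seen[0] = Thread.currentThread().getName(), "worker");
        if (useStart) {
            t.start();       // native path: a real OS thread calls back into run()
            t.join();
        } else {
            t.run();         // plain Java call: no new thread is created
        }
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadNameVia(true));    // "worker"
        System.out.println(threadNameVia(false));   // the caller's own thread name
    }
}
```

This is the classic start-vs-run interview trap, explained by the native call chain above.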
classDiagram
class JavaThread {
-OSThread* _osthread
-oop _threadObj
-JavaThreadState _state
-address _stack_base
-size_t _stack_size
+run()
+thread_main_inner()
+osthread() OSThread*
+threadObj() oop
}
class OSThread {
-pthread_t _thread_id
-int _thread_state
+set_thread_id(pthread_t)
+thread_id() pthread_t
}
class JavaThreadObj {
<<Java Object>>
-Runnable target
-int threadStatus
+start()
+run()
}
JavaThread o-- OSThread : "holds the OS thread"
JavaThread o-- JavaThreadObj : "linked to the Java object"
sequenceDiagram
participant User as Java code
participant JVM as JVM (JavaThread)
participant OS as Operating system
User->>JVM: new Thread(runnable)
JVM->>JVM: create the JavaThread object
JVM->>JVM: create the OSThread object
JVM->>JVM: link JavaThread and OSThread
User->>JVM: thread.start()
JVM->>JVM: check state (threadStatus)
JVM->>JVM: add to the thread group
JVM->>JVM: call start0() (native)
JVM->>OS: os::create_thread()
OS->>OS: create the pthread
OS->>OS: set the entry point to thread_native_entry
OS-->>JVM: thread created
JVM-->>User: start() returns
Note over OS: the new thread starts running
OS->>JVM: thread_native_entry(JavaThread*)
JVM->>JVM: set thread state to RUNNABLE
JVM->>JVM: thread->run()
JVM->>JVM: thread_main_inner()
JVM->>JVM: JNI: look up Thread.run()
JVM->>JVM: JavaCalls::call_virtual()
JVM->>User: invoke Thread.run()
User->>User: target.run() (if target != null)
User-->>JVM: return
JVM->>JVM: thread cleanup
JVM->>OS: release OS resources
FutureTask's run()

```java
public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}
```
FutureTask's runner
The injection and use of runner flows roughly like this:
sequenceDiagram
participant ThreadA
participant FutureTask
participant ThreadB
ThreadA->>FutureTask: submit(task)
FutureTask->>FutureTask: state = NEW, runner = null
ThreadB->>FutureTask: execute(task) // a pool thread
FutureTask->>FutureTask: CAS runner = ThreadB
FutureTask->>ThreadB: run the task logic
Note right of FutureTask: now runner = ThreadB
ThreadA->>FutureTask: cancel(true)
FutureTask->>FutureTask: check mayInterruptIfRunning = true
FutureTask->>ThreadB: runner.interrupt() // uses the runner field
ThreadB-->>FutureTask: the task is interrupted
FutureTask->>FutureTask: state = INTERRUPTED
FutureTask->>FutureTask: runner = null // clear the reference
Note right of FutureTask: task finished, runner = null
invokeAll
invokeAll is bounded: it submits every task up front through execute, so if you hand it more tasks than the pool and queue can absorb at once, it will trigger rejection, unless tasks complete faster than the for loop can call the underlying execute.
When we have the choice, batch execution should prefer invokeAll, because it carries these notable code paths:

```java
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // submit everything first, watching the deadline as we go
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }

        // then wait for each future, still within the deadline
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)   // timed out or failed: cancel whatever is unfinished
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
```
Two submit overloads for Runnable

```java
Future<?> future3 = executor.submit(() -> System.out.println("plain task"));
Object result3 = future3.get();        // null

Future<String> future4 = executor.submit(
        () -> System.out.println("task with a preset result"), "custom result");
String result4 = future4.get();        // "custom result"
```

Two adapters are involved here:

```text
User layer:      wants a high-level API like submit(Runnable, T result)
                         ↓
Adapter layer:   RunnableAdapter performs the semantic adaptation
                         ↓
Execution layer: FutureTask performs the interface adaptation
                         ↓
Base layer:      ThreadPoolExecutor only understands execute(Runnable)
```

In other words, the low-level execute ultimately wants a Runnable; FutureTask bundles a callable + result into exactly such a Runnable (adapting downward to the execute API while exposing Future's get and cancel upward), and that callable is in turn first produced from the original, result-less Runnable by RunnableAdapter (adapting upward toward the user's plain Runnable).
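The adapter is directly observable through the public Executors.callable factory; a small sketch (names are demo choices):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class AdapterDemo {
    static String adaptAndCall() throws Exception {
        AtomicBoolean ran = new AtomicBoolean(false);
        Runnable task = () -> ran.set(true);
        // Executors.callable wraps the Runnable in a RunnableAdapter:
        // call() runs the task, then returns the preset result.
        Callable<String> c = Executors.callable(task, "preset-result");
        String out = c.call();
        return ran.get() ? out : "task did not run";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(adaptAndCall());   // preset-result
    }
}
```

This is the same path submit(Runnable, T) takes internally before handing the FutureTask to execute.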
Imagine an international logistics system:
FutureTask is the customs broker, converting "goods with special requirements" (a Callable) into a "standard container" (a Runnable) so it can be transported.
RunnableAdapter is the packaging service, wrapping "ordinary goods" (a Runnable) into "insured goods" (a Callable) with extra guarantees.
Two kinds of underlying call chains:
How the pool maintains its own state
The pool's run state is not set explicitly by the user; it is maintained internally as the pool runs. Internally, a single variable holds two values: the run state (runState) and the worker count (workerCount). Concretely, the maintenance of both key parameters is packed together, as shown here:

```java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
```

ctl, an AtomicInteger, is the field that controls both the pool's run state and the number of live worker threads. It packs two pieces of information: the high 3 bits hold runState and the low 29 bits hold workerCount, and the two do not interfere. Storing two values in one variable avoids inconsistent reads when making decisions, without taking a lock just to keep the pair consistent; reading the pool's source, you will find it frequently needs to check the run state and the worker count at the same time. The pool also exposes methods for users to query the current run state and worker count. These all use bit operations, which are much cheaper than unpacking via ordinary arithmetic.

```java
public static void main(String[] args) {
    int COUNT_BITS = Integer.SIZE - 3;
    System.out.println(COUNT_BITS);                          // 29

    final int shifted = 1 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(shifted));

    int CAPACITY = shifted - 1;
    System.out.println(Integer.toBinaryString(CAPACITY));    // 29 ones
    System.out.println(Integer.toBinaryString(~CAPACITY));   // high 3 bits set

    int RUNNING = -1 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(-1));
    System.out.println(Integer.toBinaryString(RUNNING));

    int SHUTDOWN = 0 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(SHUTDOWN));

    int STOP = 1 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(STOP));

    int TIDYING = 2 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(TIDYING));

    int TERMINATED = 3 << COUNT_BITS;
    System.out.println(Integer.toBinaryString(TERMINATED));
}

private static int runStateOf(int c)    { int CAPACITY = getCapacity(); return c & ~CAPACITY; }
private static int workerCountOf(int c) { int CAPACITY = getCapacity(); return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static int getCapacity() {
    int COUNT_BITS = Integer.SIZE - 3;
    final int shifted = 1 << COUNT_BITS;
    return shifted - 1;
}
```
| Run state | Description |
| --- | --- |
| RUNNING | Accepts newly submitted tasks and processes tasks in the blocking queue. |
| SHUTDOWN | Closed: accepts no new submissions, but keeps processing tasks already saved in the blocking queue. |
| STOP | Accepts no new tasks, does not process queued tasks either, and interrupts the threads currently processing tasks. These two extra measures make it a stricter state; in theory, once every thread has been interrupted, the pool can move toward termination. |
| TIDYING | All tasks have terminated and workerCount (the number of live workers) is 0. The name means "tidied up", not "tidying in progress". |
| TERMINATED | Entered after the terminated() hook finishes. |

RUNNING is both the initial state and an intermediate state, which is why private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); appears as part of field initialization.
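The externally visible side of these states can be sketched with the lifecycle query methods (pool size below is a demo value):

```java
import java.util.concurrent.*;

public class LifecycleDemo {
    static boolean[] observeStates() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        boolean runningAtStart = !pool.isShutdown();        // RUNNING
        pool.execute(() -> { });                            // give it one task
        pool.shutdown();                                    // -> SHUTDOWN: no new tasks
        boolean shutdownSeen = pool.isShutdown();
        // queued tasks still drain; once workers exit, TIDYING -> TERMINATED
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS)
                             && pool.isTerminated();
        return new boolean[] { runningAtStart, shutdownSeen, terminated };
    }

    public static void main(String[] args) throws InterruptedException {
        for (boolean b : observeStates()) System.out.println(b);   // true true true
    }
}
```

The transient TIDYING state is not directly queryable; isTerminated() only turns true after terminated() has run.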
Trying to terminate the pool

```java
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // eligible to terminate, but workers remain
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
```
How the pool manages tasks
What each pool Worker really manages is a FutureTask, which both wraps a Callable and is a Future (the ideal task is a RunnableFuture<V>, using a member field to let the Runnable side save the Callable's return value for the Future side to use):

```java
public class FutureTask<V> implements RunnableFuture<V> {
    // ...
}
```

The pool uses an adapter that turns a Runnable into a Callable (the reverse, Callable to Runnable, would also be easy in theory, but there should be no need for it), to support passing a Runnable into submit:

```java
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
```
FutureTask implements RunnableFuture; it is essentially a task that carries a Runnable plus a state.
First, its state:

```java
private volatile int state;
```

FutureTask state transition diagram:
stateDiagram-v2
[*] --> NEW: task created
NEW --> COMPLETING: call() finished
NEW --> CANCELLED: cancel(false)
NEW --> INTERRUPTING: cancel(true)
COMPLETING --> NORMAL: result stored
COMPLETING --> EXCEPTIONAL: exception stored
INTERRUPTING --> INTERRUPTED: interruption finished
NORMAL --> [*]: terminal
EXCEPTIONAL --> [*]: terminal
CANCELLED --> [*]: terminal
INTERRUPTED --> [*]: terminal
note right of NEW: initial state, the task can still be cancelled
note right of COMPLETING: transient, the result is being stored
note right of INTERRUPTING: transient, the runner is being interrupted
```java
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
```
Notably, the intermediate state is a transient and extremely short-lived one. Moreover, being in the intermediate state does not mean the task is still executing; it means the task has already run and its final result is being stored. So we can say:
As soon as state leaves NEW, the task has finished executing.
Note that "finished" here means the call method of the supplied Callable has returned, or has thrown. The name COMPLETING is therefore a little misleading: it does not mean the task is mid-execution, but that call has already completed and the task's outcome is being recorded.
In other words, only the NEW state is cancellable.
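That claim is easy to verify from outside: cancellation succeeds only while the task is still NEW. A small sketch (the class name is a demo choice):

```java
import java.util.concurrent.*;

public class CancelStates {
    static boolean cancelAfterCompletion() throws Exception {
        FutureTask<Integer> done = new FutureTask<>(() -> 1);
        done.run();                       // NEW -> COMPLETING -> NORMAL
        return done.cancel(true);         // state != NEW: cancel is refused
    }

    static boolean cancelWhileNew() {
        FutureTask<Integer> fresh = new FutureTask<>(() -> 1);
        boolean ok = fresh.cancel(false); // still NEW: cancel wins the state CAS
        fresh.run();                      // a cancelled task silently refuses to run
        return ok && fresh.isCancelled();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(cancelAfterCompletion());   // false
        System.out.println(cancelWhileNew());          // true
    }
}
```

Both branches hinge on the same CAS from NEW that run(), set(), and cancel() all compete for.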
```java
private Callable<V> callable;        // the underlying task
private Object outcome;              // the result, or the thrown exception
private volatile Thread runner;      // the thread running call()
private volatile WaitNode waiters;   // stack of threads blocked in get()

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
```
Its state-management methods:

```java
public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}

private void finishCompletion() {
    // pop and wake every thread waiting in get()
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;   // unlink to help GC
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;             // reduce footprint
}

// LockSupport
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
```
The run method actually invoked by the worker thread:

```java
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
```
run has a re-runnable variant; this version can execute repeatedly without affecting get's result:

```java
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call();   // do not record the result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}
```
FutureTask has three families of terminal-state methods:

```java
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);   // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);   // final state
        finishCompletion();
    }
}

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case the call to interrupt throws
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {   // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
```
Once the task reaches a terminal state, get can finally produce a sensible result:

```java
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
```
The waiting logic:

```java
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {              // terminal state reached
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)          // almost done: just spin briefly
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)                  // push ourselves onto the waiter stack
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on a removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}
```
Then outcome is handed out through report:

```java
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
```
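All three outcomes report can produce are observable through get(); a small sketch (names are demo choices):

```java
import java.util.concurrent.*;

public class ReportOutcomes {
    // Returns a short tag describing what get() saw.
    static String outcomeOf(FutureTask<String> task, boolean cancelFirst) {
        if (cancelFirst) task.cancel(false);
        task.run();
        try {
            return "NORMAL:" + task.get();
        } catch (CancellationException e) {
            return "CANCELLED";
        } catch (ExecutionException e) {
            return "EXCEPTIONAL:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomeOf(new FutureTask<>(() -> "ok"), false));
        System.out.println(outcomeOf(new FutureTask<>(
                () -> { throw new IllegalStateException("boom"); }), false));
        System.out.println(outcomeOf(new FutureTask<>(() -> "never"), true));
    }
}
```

Note how the thrown exception comes back as the cause of an ExecutionException: that is outcome holding a Throwable and report rewrapping it.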
Task execution
Submission and dispatch
First check the pool's run state; if it is not RUNNING, reject outright. The pool guarantees tasks execute only while it is RUNNING.
If workerCount < corePoolSize, create and start a thread to run the newly submitted task.
If workerCount >= corePoolSize and the pool's blocking queue is not full, add the task to the queue.
If workerCount >= corePoolSize && workerCount < maximumPoolSize and the queue is full, create and start a thread to run the newly submitted task.
If workerCount >= maximumPoolSize and the queue is full, hand the task to the rejection policy; the default behavior is to throw an exception.
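The branches above can be pinned down with a deliberately tiny pool: core=1, max=2, queue capacity=1 (all demo values). With tasks that block until released, the order of decisions is deterministic:

```java
import java.util.concurrent.*;

public class SaturationDemo {
    // Submits 4 blocking tasks into a core=1/max=2/queue=1 pool and
    // reports whether the 4th was rejected.
    static boolean fourthTaskRejected() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker);   // 1st: workerCount < core -> core thread created
            pool.execute(blocker);   // 2nd: core busy -> queued
            pool.execute(blocker);   // 3rd: queue full, workers < max -> non-core thread
            try {
                pool.execute(blocker); // 4th: workers == max, queue full -> rejected
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fourthTaskRejected());   // true
    }
}
```

Each execute call exercises exactly one branch of the dispatch rules listed above.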
Task buffering
The task-buffering module is the core of the pool's ability to manage tasks. A thread pool is essentially the management of tasks and threads, and the key idea behind it is to decouple the two, never linking them directly, so that subsequent dispatch work becomes possible. The pool realizes this with a producer-consumer pattern over a blocking queue: the queue buffers tasks, and worker threads fetch tasks from it.
A blocking queue (BlockingQueue) is a queue supporting two extra operations, which are the essence of "blocking": when the queue is empty, a thread taking an element waits until the queue becomes non-empty; when the queue is full, a thread storing an element waits until space becomes available. Blocking queues are commonly used in producer-consumer scenarios: producers are the threads adding elements, consumers are the threads taking them, and the blocking queue is the container both sides share.
| Queue | Description |
| --- | --- |
| ArrayBlockingQueue | A bounded blocking queue backed by an array, ordered FIFO. Supports fair and non-fair locking. |
| DelayQueue | An unbounded queue built on PriorityBlockingQueue that supports delayed retrieval: each element specifies, at creation, how long until it may be taken, and it can only be taken once its delay expires. |
| LinkedBlockingDeque | A doubly-linked blocking deque. Elements can be added and removed at both head and tail, which under concurrency can cut lock contention by up to half. |
| LinkedBlockingQueue | An optionally bounded queue backed by a linked list, ordered FIFO. Its default capacity is Integer.MAX_VALUE, so a default-constructed instance carries a capacity risk. |
| LinkedTransferQueue | An unbounded blocking queue backed by a linked list. Compared with other queues, it adds the transfer and tryTransfer methods. |
| PriorityBlockingQueue | An unbounded queue that orders elements by priority, using natural ordering by default; a custom compareTo() can define the ordering. The order of equal-priority elements is not guaranteed. |
| SynchronousQueue | A blocking queue that stores no elements: every put must wait for a take, otherwise nothing can be added. Supports fair and non-fair modes. One use is inside thread pools: Executors.newCachedThreadPool() uses a SynchronousQueue, creating threads on demand as new tasks arrive, reusing idle threads, and reclaiming threads that have been idle for 60 seconds. |
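A minimal producer/consumer round trip over a bounded queue (capacity 2 is a demo value) shows both blocking behaviors:

```java
import java.util.concurrent.*;

public class ProducerConsumerDemo {
    static int drainSum() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);  // bounded
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++)
                    queue.put(i);    // blocks whenever the queue is full
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++)
            sum += queue.take();     // blocks whenever the queue is empty
        producer.join();
        return sum;   // 1+2+3+4+5
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainSum());   // 15
    }
}
```

This is exactly the shape of the pool's internals: workers call a (timed) take, submitters call offer.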
Decision flow for choosing a blocking queue:
flowchart TD
A[Choose a blocking queue] --> B{Bounded queue needed?}
B -->|yes| C{Fairness needed?}
B -->|no| D{Priorities needed?}
C -->|yes| E[ArrayBlockingQueue<br/>fair mode]
C -->|no| F{Array or linked list?}
F -->|array - contiguous memory| G[ArrayBlockingQueue<br/>non-fair mode]
F -->|linked - grows on demand| H[LinkedBlockingQueue<br/>explicit capacity]
D -->|yes| I[PriorityBlockingQueue]
D -->|no| J{Delays needed?}
J -->|yes| K[DelayQueue]
J -->|no| L{Direct hand-off needed?}
L -->|yes| M[SynchronousQueue]
L -->|no| N[LinkedBlockingQueue<br/>default capacity]
style E fill:#90EE90
style G fill:#90EE90
style H fill:#87CEEB
style I fill:#FFB6C1
style K fill:#DDA0DD
style M fill:#F0E68C
style N fill:#87CEEB
Usage examples for the various blocking queues:

```java
import java.util.concurrent.*;

public class BlockingQueueExamples {

    public static void arrayBlockingQueueExample() {
        BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(10, true);
        BlockingQueue<String> unfairQueue = new ArrayBlockingQueue<>(10, false);

        ExecutorService executor = new ThreadPoolExecutor(
                4, 8, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void linkedBlockingQueueExample() {
        BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(1000);
        BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();

        ExecutorService executor = new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(5000),
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void synchronousQueueExample() {
        BlockingQueue<String> unfairQueue = new SynchronousQueue<>();
        BlockingQueue<String> fairQueue = new SynchronousQueue<>(true);

        // same shape as Executors.newCachedThreadPool()
        ExecutorService executor = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void priorityBlockingQueueExample() {
        BlockingQueue<Integer> naturalQueue = new PriorityBlockingQueue<>();
        BlockingQueue<PriorityTask> customQueue = new PriorityBlockingQueue<>(
                11, (t1, t2) -> Integer.compare(t2.priority, t1.priority));

        ExecutorService executor = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>());
    }

    static class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final int priority;
        final String name;

        PriorityTask(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("running task: " + name + ", priority: " + priority);
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(other.priority, this.priority);
        }
    }

    public static void delayQueueExample() throws InterruptedException {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        delayQueue.put(new DelayedTask("task1", 3, TimeUnit.SECONDS));
        delayQueue.put(new DelayedTask("task2", 1, TimeUnit.SECONDS));
        delayQueue.put(new DelayedTask("task3", 2, TimeUnit.SECONDS));

        while (!delayQueue.isEmpty()) {
            DelayedTask task = delayQueue.take();   // blocks until the delay expires
            System.out.println("running: " + task.name);
        }
    }

    static class DelayedTask implements Delayed {
        final String name;
        final long expireTime;

        DelayedTask(String name, long delay, TimeUnit unit) {
            this.name = name;
            this.expireTime = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remaining = expireTime - System.nanoTime();
            return unit.convert(remaining, TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedTask) {
                return Long.compare(this.expireTime, ((DelayedTask) other).expireTime);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void linkedBlockingDequeExample() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(100);
        deque.addFirst("head element");
        deque.offerFirst("head element 2");
        deque.addLast("tail element");
        deque.offerLast("tail element 2");

        String ownTask = deque.pollFirst();     // take from our own end
        String stolenTask = deque.pollLast();   // steal from the other end
    }
}
```
阻塞队列性能对比:
graph LR
subgraph 吞吐量对比
A[SynchronousQueue] -->|最高| B[直接传递无锁竞争]
C[LinkedBlockingQueue] -->|高| D[读写分离锁]
E[ArrayBlockingQueue] -->|中| F[单锁]
G[PriorityBlockingQueue] -->|低| H[堆排序开销]
end
subgraph 内存占用对比
I[SynchronousQueue] -->|最低| J[不存储元素]
K[ArrayBlockingQueue] -->|固定| L[预分配数组]
M[LinkedBlockingQueue] -->|动态| N[按需分配节点]
O[PriorityBlockingQueue] -->|动态| P[堆结构]
end
任务拒绝
当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize 时,如果还有任务到来就会采取任务拒绝策略。
拒绝策略触发流程:
sequenceDiagram
participant Client as 客户端
participant TPE as ThreadPoolExecutor
participant Queue as 阻塞队列
participant Handler as RejectedExecutionHandler
Client->>TPE: execute(task)
TPE->>TPE: 检查线程数 < corePoolSize?
alt 线程数 < corePoolSize
TPE->>TPE: 创建核心线程执行
else 线程数 >= corePoolSize
TPE->>Queue: offer(task)
alt 队列未满
Queue-->>TPE: true
TPE-->>Client: 任务入队成功
else 队列已满
Queue-->>TPE: false
TPE->>TPE: 检查线程数 < maximumPoolSize?
alt 线程数 < maximumPoolSize
TPE->>TPE: 创建非核心线程执行
else 线程数 >= maximumPoolSize
TPE->>Handler: rejectedExecution(task, executor)
Note over Handler: 执行拒绝策略
end
end
end
四种内置拒绝策略详解:
```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionPolicyExamples {

    // AbortPolicy:直接抛出 RejectedExecutionException
    public static void abortPolicyExample() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < 3; i++) {
                final int taskId = i;
                executor.execute(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println("任务" + taskId + "完成");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            System.err.println("任务被拒绝: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }

    // CallerRunsPolicy:由提交任务的线程自己执行,形成反压
    public static void callerRunsPolicyExample() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("主线程: " + Thread.currentThread().getName());
        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("任务" + taskId + "在线程"
                        + Thread.currentThread().getName() + "中执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }

    // DiscardPolicy:静默丢弃新任务
    public static void discardPolicyExample() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        AtomicInteger completedCount = new AtomicInteger(0);
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.execute(() -> {
                completedCount.incrementAndGet();
                System.out.println("任务" + taskId + "完成");
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("完成任务数: " + completedCount.get() + "/10");
    }

    // DiscardOldestPolicy:丢弃队列头部最老的任务,再重新提交新任务
    public static void discardOldestPolicyExample() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("执行任务" + taskId);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            System.out.println("提交任务" + taskId);
        }
        executor.shutdown();
    }

    // 自定义拒绝策略:打印现场信息后抛出异常
    public static void customPolicyExample() {
        RejectedExecutionHandler customHandler = (runnable, executor) -> {
            System.err.println("任务被拒绝: " + runnable.toString());
            System.err.println("线程池状态 - 活跃线程: " + executor.getActiveCount()
                    + ", 队列大小: " + executor.getQueue().size());
            throw new RejectedExecutionException("自定义拒绝: 线程池已满");
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                customHandler);
        try {
            for (int i = 0; i < 3; i++) {
                executor.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            System.out.println("捕获到自定义拒绝异常: " + e.getMessage());
        }
        executor.shutdown();
    }

    // 生产级示例:拒绝时记录现场,先限时重试入队,再降级为调用者执行
    public static ThreadPoolExecutor createProductionExecutor() {
        RejectedExecutionHandler productionHandler = (runnable, executor) -> {
            String taskInfo = runnable.toString();
            int queueSize = executor.getQueue().size();
            int activeCount = executor.getActiveCount();
            int poolSize = executor.getPoolSize();
            System.err.printf("任务被拒绝 [task=%s, queue=%d, active=%d, pool=%d]%n",
                    taskInfo, queueSize, activeCount, poolSize);
            if (!executor.isShutdown()) {
                try {
                    boolean offered = executor.getQueue()
                            .offer(runnable, 100, TimeUnit.MILLISECONDS);
                    if (offered) {
                        System.out.println("重试入队成功");
                        return;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (!executor.isShutdown()) {
                runnable.run();
            }
        };
        return new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors() * 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadFactoryBuilder()
                        .setNameFormat("production-pool-%d")
                        .setUncaughtExceptionHandler((t, e) ->
                                System.err.println("线程异常: " + e.getMessage()))
                        .build(),
                productionHandler);
    }

    // 简化版 ThreadFactoryBuilder(仿 Guava)
    static class ThreadFactoryBuilder {
        private String nameFormat;
        private Thread.UncaughtExceptionHandler handler;

        ThreadFactoryBuilder setNameFormat(String format) {
            this.nameFormat = format;
            return this;
        }

        ThreadFactoryBuilder setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
            this.handler = h;
            return this;
        }

        ThreadFactory build() {
            AtomicInteger count = new AtomicInteger(0);
            return r -> {
                Thread t = new Thread(r);
                if (nameFormat != null) {
                    t.setName(String.format(nameFormat, count.getAndIncrement()));
                }
                if (handler != null) {
                    t.setUncaughtExceptionHandler(handler);
                }
                return t;
            };
        }
    }
}
```
拒绝策略对比图:
graph TB
subgraph 拒绝策略对比
A[AbortPolicy] --> A1[抛出异常]
A1 --> A2[调用方感知]
A2 --> A3[适合:需要明确处理拒绝的场景]
B[CallerRunsPolicy] --> B1[调用者执行]
B1 --> B2[提供反压]
B2 --> B3[适合:不能丢弃任务的场景]
C[DiscardPolicy] --> C1[静默丢弃]
C1 --> C2[无感知]
C2 --> C3[适合:允许丢弃且无需感知]
D[DiscardOldestPolicy] --> D1[丢弃最老任务]
D1 --> D2[新任务优先]
D2 --> D3[适合:新任务更重要的场景]
end
style A fill:#FF6B6B
style B fill:#4ECDC4
style C fill:#95A5A6
style D fill:#F39C12
任务申请
任务的执行有两种可能:一种是任务直接由新创建的线程执行;另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务的绝大多数情况。
任务的提交路径主要是 submit -> execute。submit 的主要逻辑是:
```java
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
```
execute 的主要逻辑是:
```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. 线程数小于 corePoolSize:直接创建核心线程执行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 否则尝试入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 入队后复查状态:线程池已非运行状态则回滚入队并拒绝
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 没有存活线程则补充一个,保证队列中的任务有线程消费
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列已满:尝试创建非核心线程,失败则拒绝
    else if (!addWorker(command, false))
        reject(command);
}
```
这里面会用到“尝试增加线程”的 addWorker 逻辑。
线程池如何管理线程
默认情况下,核心线程空闲(idle)也不会退出;非核心线程空闲超过 keepAliveTime 则会退出(开启 allowCoreThreadTimeOut 后,核心线程同样会超时退出)。
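上面的存活规则可以直接演示(示例中的池参数均为演示假设):

```java
import java.util.concurrent.*;

public class CoreTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 默认情况下,核心线程空闲不会退出
        pool.prestartAllCoreThreads();
        Thread.sleep(1500);
        System.out.println("默认: poolSize = " + pool.getPoolSize()); // 仍为 2

        // 打开 allowCoreThreadTimeOut 后,核心线程空闲超过 keepAliveTime 也会退出
        pool.allowCoreThreadTimeOut(true);
        Thread.sleep(1500);
        System.out.println("允许核心线程超时: poolSize = " + pool.getPoolSize()); // 变为 0
        pool.shutdown();
    }
}
```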
尝试增加线程
注意 addWorker 只是 execute 的一个子分支而已。
Worker 可以被认为是线程和锁的结合体,它的使命就是不断地把 Runnable 从缓冲队列里取出来,放到自己的 thread 里执行。创建并启动 Worker 的关键方法是 addWorker:
```java
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池已关闭时,只允许为 SHUTDOWN 状态下仍非空的队列补充无首任务的线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 超过容量或超过 core/max 上限则失败
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS 抢占线程名额,成功则跳出双层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // 运行状态变化则回到外层重新校验
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 持锁后重新检查运行状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
```
线程执行
线程的执行强依赖于 worker 本身的实现:
```java
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;          // Worker 运行所在的线程
    Runnable firstTask;           // 初始任务,可为 null
    volatile long completedTasks; // 每线程完成任务计数

    Worker(Runnable firstTask) {
        setState(-1); // 在 runWorker 之前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // state = 0 表示未锁定,state = 1 表示已锁定
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
```
Worker 生命周期状态图:
stateDiagram-v2
[*] --> Created: new Worker(firstTask)
Created --> Initialized: setState(-1)<br/>禁止中断
Initialized --> Started: thread.start()
Started --> Running: runWorker(this)
Running --> Unlocked: w.unlock()<br/>允许中断
state Running {
Unlocked --> WaitingTask: getTask()
WaitingTask --> GotTask: task != null
WaitingTask --> NoTask: task == null
GotTask --> Locked: w.lock()
Locked --> Executing: task.run()
Executing --> Unlocked: w.unlock()
}
NoTask --> Exiting: processWorkerExit()
Exiting --> [*]: 线程终止
note right of Created
Worker 构造时:
1. setState(-1) 禁止中断
2. 保存 firstTask
3. 通过 ThreadFactory 创建线程
end note
note right of WaitingTask
getTask() 返回 null 的情况:
1. 线程数 > maximumPoolSize
2. 线程池 STOP 状态
3. 线程池 SHUTDOWN 且队列空
4. 等待超时
end note
Worker 锁状态转换图:
graph TD
subgraph Worker锁状态
A[state = -1<br/>初始化状态] -->|w.unlock| B[state = 0<br/>空闲状态]
B -->|w.lock| C[state = 1<br/>执行任务中]
C -->|w.unlock| B
end
subgraph 锁状态含义
D[-1: 禁止中断<br/>Worker刚创建]
E[0: 允许中断<br/>Worker空闲等待任务]
F[1: 正在执行任务<br/>不应被中断]
end
subgraph 中断规则
G[interruptIfStarted] --> H{getState >= 0?}
H -->|是| I[可以中断]
H -->|否| J[不能中断]
end
style A fill:#FFB6C1
style B fill:#90EE90
style C fill:#87CEEB
runWorker 执行流程详解:
sequenceDiagram
participant TPE as ThreadPoolExecutor
participant W as Worker
participant T as Thread
participant Q as BlockingQueue
Note over W: 构造函数: setState(-1)
TPE->>W: addWorker(task, core)
TPE->>T: thread.start()
T->>W: run()
W->>TPE: runWorker(this)
TPE->>W: w.unlock() // setState(0)
Note over W: 现在允许中断
loop 任务循环
alt 有 firstTask
TPE->>TPE: task = firstTask
else 无 firstTask
TPE->>Q: getTask()
Q-->>TPE: task 或 null
end
alt task != null
TPE->>W: w.lock() // setState(1)
Note over W: 执行期间不应中断
TPE->>TPE: beforeExecute(thread, task)
TPE->>T: task.run()
TPE->>TPE: afterExecute(task, thrown)
TPE->>W: w.unlock() // setState(0)
TPE->>W: completedTasks++
else task == null
Note over TPE: 退出循环
end
end
TPE->>TPE: processWorkerExit(w, completedAbruptly)
Note over W: Worker 生命周期结束
在一个工作线程里,worker delegate 调用给线程池的 runWorker:
```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // state 由 -1 置 0,开始允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 线程池进入 STOP 状态时,确保当前线程带上中断标记;
            // 否则清除可能残留的中断标记
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // 上一次 poll 是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // STOP 状态,或 SHUTDOWN 且队列已空:减少线程数并返回 null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否需要限时等待:开启核心线程超时,或当前线程数超过 corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```
回收线程
Worker 通过继承 AQS 实现独占锁功能。之所以不用可重入的 ReentrantLock,而是直接基于 AQS,就是为了获得不可重入的特性,从而用锁状态反映线程当前的执行状态:
1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中。
2. 如果正在执行任务,则不应该中断线程。
3. 如果该线程现在不是独占锁的状态,也就是空闲状态,说明它没有在处理任务,这时可以对该线程进行中断。
4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程;interruptIdleWorkers 方法使用 tryLock 来判断线程池中的线程是否空闲,如果线程空闲则可以安全回收。
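“持锁即在干活、tryLock 失败即忙”的不可重入语义,可以用 AQS 写一个最小示意(非 JDK 源码,类名与方法均为演示):

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class NonReentrantLockDemo {
    // 一个最小的不可重入独占锁:state 0 = 空闲,1 = 占用
    static class NonReentrantLock extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int unused) {
            // 不判断当前线程是否已持锁,因此持锁线程再次获取也会失败
            return compareAndSetState(0, 1);
        }
        protected boolean tryRelease(int unused) {
            setState(0);
            return true;
        }
        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    public static void main(String[] args) {
        NonReentrantLock lock = new NonReentrantLock();
        lock.lock();
        // 即使是同一线程,再次 tryLock 也会失败 —— 这正是判断 Worker“正在执行任务”的依据
        System.out.println(lock.tryLock()); // false
        lock.unlock();
        System.out.println(lock.tryLock()); // true
        lock.unlock();
    }
}
```

若换成可重入锁,持锁线程的 tryLock 会成功,interruptIdleWorkers 就无法区分“空闲”与“正在执行任务”。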
```java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 任务异常退出时,worker 计数尚未扣减,这里补减
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 队列非空时至少保留一个线程消费
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        // 异常退出,或线程数低于下限:补充一个线程
        addWorker(null, false);
    }
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // tryLock 成功说明该 worker 空闲,可以安全中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
```
线程池使用中可能遇到的问题
线程池的调参有几个难点:
如果核心线程数过小,则吞吐可能不够,遇到流量毛刺可能导致 RejectedExecutionException;但值得警惕的是,如果核心线程数很大,可能导致频繁的上下文切换和过多的资源消耗(不管是 CPU 时间片还是操作系统的内核线程)。
如果队列过长,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。
那么,如何计算这些参数呢?
有一个基本的原则是:
计算密集型任务的线程数应该尽量贴近 CPU 核数。
IO 密集型任务的线程数要注意伸缩,要配合阻塞队列使用,并做好承受拒绝失败的准备。
我们常见的计算方式主要来自于《Java并发编程实战》:Nthreads = Ncpu × Ucpu × (1 + W/C),其中 Ncpu 为 CPU 核数,Ucpu 为目标 CPU 利用率,W/C 为任务等待时间与计算时间之比。
现实中可选的线程数最好取并发 qps 和 CPU 核数之间的折中。通常可以认为 1000ms/单任务 rt(ms) 就是单一线程每秒的吞吐数,目标 qps 除以单线程吞吐数可以得到所需线程数;但这个估算没有考虑 CPU 核数和上下文切换的开销,所以算出来的线程数的实际 qps 表现会低于理论 qps,可以通过估算和压测不断让理论值逼近实际值。
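把上面两种估算写成可运行的小工具(类名、方法名与示例参数均为演示假设):

```java
public class ThreadSizing {

    /** 《Java并发编程实战》公式:Nthreads = Ncpu * Ucpu * (1 + W/C) */
    static int byWaitComputeRatio(int nCpu, double targetUtilization,
                                  double waitTime, double computeTime) {
        return (int) Math.ceil(nCpu * targetUtilization * (1 + waitTime / computeTime));
    }

    /** 按目标 qps 与单任务 rt 估算:threads ≈ qps × rt(秒) */
    static int byQps(double qps, double rtMillis) {
        return (int) Math.ceil(qps * rtMillis / 1000.0);
    }

    public static void main(String[] args) {
        // 8 核机器,目标利用率 100%,等待:计算 = 9:1 的 IO 密集任务
        System.out.println(byWaitComputeRatio(8, 1.0, 9, 1)); // 80
        // 目标 1000 qps,单任务 rt 50ms
        System.out.println(byQps(1000, 50));                  // 50
    }
}
```

两个结果通常作为压测的初始值,而不是最终配置。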
线程池的可替换方案
其他可替代方案,都不如线程池的调优方案成熟(在可以使用新技术的前提下,我们是否还有调优旧方案的魄力呢? ):
| 名称 | 描述 | 优势 | 劣势 |
| --- | --- | --- | --- |
| Disruptor 框架 | 线程池内部通过一个工作队列维护任务的执行,它有一个根本性的缺陷:连续争用问题。多个线程在申请任务时,为了合理地分配任务要付出锁开销,对比快速的任务执行来说,这部分申请的损耗是巨大的。LMAX 开源的高性能线程间消息库 Disruptor 使用了一个叫作环形缓冲(Ring Buffer)的数据结构替代队列,可以避免申请任务时出现的连续争用。 | 避免连续争用,性能更佳 | 缺乏线程管理的能力,使用场景较少 |
| 协程框架 | 协程是一种用户态的轻量级线程,拥有自己的寄存器上下文和栈。调度切换时,将寄存器上下文和栈保存到其他地方,切回来的时候再恢复先前保存的寄存器上下文和栈,这种切换上下文的开销要小于线程。在瓶颈侧重 IO 的情况下,使用协程获得并发性要优于使用线程。 | 侧重 IO 时性能更佳;与多线程策略无冲突,可结合使用 | 在 Java 中缺乏成熟的应用 |
| Actor 框架 | Actor 模型通过维护多个 Actor 去处理并发任务,它放弃直接使用线程获取并发性,而是定义了一系列系统组件应该如何动作和交互的通用规则,开发者不需要直接使用线程。它在原生线程或协程之上做了更高层次的封装,开发者只需关心每个 Actor 的逻辑即可实现并发操作。由于避免了直接使用锁,很大程度上解决了传统并发编程模式下大量依赖悲观锁导致的资源竞争问题。 | 无锁策略,性能更佳;避免直接使用线程,安全性更高 | 在 Java 中缺乏成熟的应用;内部复杂,难以排查和调试 |
缺乏管控能力就不适合调优。
最终解决方案
通过监控线程池负载,制定告警策略:
线程池活跃度 = activeCount/maximumPoolSize。看看这个值是不是趋近于 1。
监控队列的capacity 和 size 的比例。
监控 RejectedExecutionException 的出现。
引入线程池动态管控能力,基于告警制定 SOP,确定是否要动态调节线程数和拒绝策略。
如果还是解决不了问题,需要考虑全局动态扩容的方案。
线程池监控指标体系:
graph TB
subgraph 核心监控指标
A[线程池监控] --> B[线程指标]
A --> C[队列指标]
A --> D[任务指标]
A --> E[异常指标]
B --> B1[poolSize<br/>当前线程数]
B --> B2[activeCount<br/>活跃线程数]
B --> B3[largestPoolSize<br/>历史最大线程数]
B --> B4[corePoolSize<br/>核心线程数]
B --> B5[maximumPoolSize<br/>最大线程数]
C --> C1[queue.size<br/>队列当前大小]
C --> C2[queue.remainingCapacity<br/>队列剩余容量]
C --> C3[队列使用率<br/>size/capacity]
D --> D1[taskCount<br/>总任务数]
D --> D2[completedTaskCount<br/>已完成任务数]
D --> D3[待执行任务数<br/>taskCount-completedTaskCount]
E --> E1[rejectedCount<br/>拒绝任务数]
E --> E2[exceptionCount<br/>异常任务数]
end
style A fill:#4ECDC4
style B fill:#87CEEB
style C fill:#90EE90
style D fill:#FFB6C1
style E fill:#FF6B6B
完整的线程池监控实现:
```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicLong rejectedCount = new AtomicLong(0);
    private final AtomicLong exceptionCount = new AtomicLong(0);
    private final AtomicLong totalExecutionTime = new AtomicLong(0);
    private final Map<Runnable, Long> taskStartTimes = new ConcurrentHashMap<>();
    private final String poolName;

    // 告警阈值,可动态调整
    private volatile double activeRatioThreshold = 0.8;
    private volatile double queueUsageThreshold = 0.8;
    private volatile long taskTimeoutMs = 30000;

    public MonitoredThreadPoolExecutor(
            String poolName,
            int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                (r, executor) -> {
                    ((MonitoredThreadPoolExecutor) executor).rejectedCount.incrementAndGet();
                    System.err.printf("[%s] 任务被拒绝: queue=%d, active=%d, pool=%d%n",
                            ((MonitoredThreadPoolExecutor) executor).poolName,
                            executor.getQueue().size(),
                            executor.getActiveCount(),
                            executor.getPoolSize());
                    throw new RejectedExecutionException("Task rejected from "
                            + ((MonitoredThreadPoolExecutor) executor).poolName);
                });
        this.poolName = poolName;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskStartTimes.put(r, System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long startTime = taskStartTimes.remove(r);
            if (startTime != null) {
                long executionTime = System.nanoTime() - startTime;
                totalExecutionTime.addAndGet(executionTime);
                long executionTimeMs = TimeUnit.NANOSECONDS.toMillis(executionTime);
                if (executionTimeMs > taskTimeoutMs) {
                    System.err.printf("[%s] 任务执行超时: %dms > %dms%n",
                            poolName, executionTimeMs, taskTimeoutMs);
                }
            }
            if (t != null) {
                exceptionCount.incrementAndGet();
                System.err.printf("[%s] 任务执行异常: %s%n", poolName, t.getMessage());
            }
        } finally {
            super.afterExecute(r, t);
        }
    }

    public double getActiveRatio() {
        return (double) getActiveCount() / getMaximumPoolSize();
    }

    public double getQueueUsageRatio() {
        BlockingQueue<Runnable> queue = getQueue();
        int size = queue.size();
        int capacity = size + queue.remainingCapacity();
        return capacity > 0 ? (double) size / capacity : 0.0;
    }

    public double getAverageExecutionTimeMs() {
        long completed = getCompletedTaskCount();
        if (completed == 0) return 0.0;
        return TimeUnit.NANOSECONDS.toMillis(totalExecutionTime.get()) / (double) completed;
    }

    public long getRejectedCount() { return rejectedCount.get(); }

    public long getExceptionCount() { return exceptionCount.get(); }

    public long getPendingTaskCount() { return getTaskCount() - getCompletedTaskCount(); }

    public ThreadPoolMetrics getMetrics() {
        return new ThreadPoolMetrics(
                poolName,
                getCorePoolSize(), getMaximumPoolSize(),
                getPoolSize(), getActiveCount(), getLargestPoolSize(),
                getTaskCount(), getCompletedTaskCount(),
                getQueue().size(), getQueue().remainingCapacity(),
                rejectedCount.get(), exceptionCount.get(),
                getAverageExecutionTimeMs());
    }

    public void checkAndAlert() {
        double activeRatio = getActiveRatio();
        if (activeRatio >= activeRatioThreshold) {
            System.err.printf("[%s] 告警: 线程池活跃度过高 %.2f%% >= %.2f%%%n",
                    poolName, activeRatio * 100, activeRatioThreshold * 100);
        }
        double queueUsage = getQueueUsageRatio();
        if (queueUsage >= queueUsageThreshold) {
            System.err.printf("[%s] 告警: 队列使用率过高 %.2f%% >= %.2f%%%n",
                    poolName, queueUsage * 100, queueUsageThreshold * 100);
        }
        if (rejectedCount.get() > 0) {
            System.err.printf("[%s] 告警: 存在被拒绝的任务 count=%d%n",
                    poolName, rejectedCount.get());
        }
    }

    public void adjustCorePoolSize(int newCorePoolSize) {
        int oldSize = getCorePoolSize();
        setCorePoolSize(newCorePoolSize);
        System.out.printf("[%s] 核心线程数调整: %d -> %d%n", poolName, oldSize, newCorePoolSize);
    }

    public void adjustMaximumPoolSize(int newMaximumPoolSize) {
        int oldSize = getMaximumPoolSize();
        setMaximumPoolSize(newMaximumPoolSize);
        System.out.printf("[%s] 最大线程数调整: %d -> %d%n", poolName, oldSize, newMaximumPoolSize);
    }

    public static class ThreadPoolMetrics {
        public final String poolName;
        public final int corePoolSize;
        public final int maximumPoolSize;
        public final int poolSize;
        public final int activeCount;
        public final int largestPoolSize;
        public final long taskCount;
        public final long completedTaskCount;
        public final int queueSize;
        public final int queueRemainingCapacity;
        public final long rejectedCount;
        public final long exceptionCount;
        public final double avgExecutionTimeMs;

        public ThreadPoolMetrics(String poolName, int corePoolSize, int maximumPoolSize,
                                 int poolSize, int activeCount, int largestPoolSize,
                                 long taskCount, long completedTaskCount,
                                 int queueSize, int queueRemainingCapacity,
                                 long rejectedCount, long exceptionCount,
                                 double avgExecutionTimeMs) {
            this.poolName = poolName;
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.poolSize = poolSize;
            this.activeCount = activeCount;
            this.largestPoolSize = largestPoolSize;
            this.taskCount = taskCount;
            this.completedTaskCount = completedTaskCount;
            this.queueSize = queueSize;
            this.queueRemainingCapacity = queueRemainingCapacity;
            this.rejectedCount = rejectedCount;
            this.exceptionCount = exceptionCount;
            this.avgExecutionTimeMs = avgExecutionTimeMs;
        }

        @Override
        public String toString() {
            return String.format(
                    "ThreadPoolMetrics{pool=%s, core=%d, max=%d, current=%d, active=%d, "
                            + "largest=%d, tasks=%d, completed=%d, queue=%d/%d, rejected=%d, "
                            + "exceptions=%d, avgTime=%.2fms}",
                    poolName, corePoolSize, maximumPoolSize, poolSize, activeCount,
                    largestPoolSize, taskCount, completedTaskCount,
                    queueSize, queueSize + queueRemainingCapacity,
                    rejectedCount, exceptionCount, avgExecutionTimeMs);
        }
    }
}
```
定时监控任务示例:
```java
import java.util.concurrent.*;

public class ThreadPoolMonitorExample {
    public static void main(String[] args) throws InterruptedException {
        MonitoredThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
                "business-pool",
                4, 8, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                r -> {
                    Thread t = new Thread(r);
                    t.setName("business-pool-" + t.getId());
                    return t;
                });

        // 每 5 秒采集一次指标并检查告警
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            MonitoredThreadPoolExecutor.ThreadPoolMetrics metrics = executor.getMetrics();
            System.out.println(metrics);
            executor.checkAndAlert();
        }, 0, 5, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++) {
            final int taskId = i;
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep((long) (Math.random() * 1000));
                        System.out.println("任务" + taskId + "完成");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                System.err.println("任务" + taskId + "被拒绝");
            }
            Thread.sleep(100);
        }

        Thread.sleep(10000);
        executor.shutdown();
        monitor.shutdown();
    }
}
```
监控告警决策流程:
flowchart TD
A[定时采集指标] --> B{活跃度 >= 80%?}
B -->|是| C[告警: 线程池繁忙]
B -->|否| D{队列使用率 >= 80%?}
D -->|是| E[告警: 队列积压]
D -->|否| F{有拒绝任务?}
F -->|是| G[告警: 任务被拒绝]
F -->|否| H{平均执行时间过长?}
H -->|是| I[告警: 任务执行慢]
H -->|否| J[正常]
C --> K{是否自动扩容?}
E --> K
G --> K
K -->|是| L[动态调整参数]
K -->|否| M[通知运维处理]
L --> N{调整core?}
N -->|是| O[setCorePoolSize]
N -->|否| P{调整max?}
P -->|是| Q[setMaximumPoolSize]
P -->|否| R[调整队列容量]
style C fill:#FF6B6B
style E fill:#FF6B6B
style G fill:#FF6B6B
style I fill:#FFB6C1
style J fill:#90EE90
style L fill:#4ECDC4
Spring Boot 集成监控示例:
```java
import org.springframework.boot.actuate.endpoint.annotation.*;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.HashMap;

@Component
@Endpoint(id = "threadpool")
public class ThreadPoolEndpoint {

    private final Map<String, MonitoredThreadPoolExecutor> executors = new HashMap<>();

    public void register(String name, MonitoredThreadPoolExecutor executor) {
        executors.put(name, executor);
    }

    @ReadOperation
    public Map<String, Object> health() {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, MonitoredThreadPoolExecutor> entry : executors.entrySet()) {
            MonitoredThreadPoolExecutor executor = entry.getValue();
            Map<String, Object> poolInfo = new HashMap<>();
            poolInfo.put("corePoolSize", executor.getCorePoolSize());
            poolInfo.put("maximumPoolSize", executor.getMaximumPoolSize());
            poolInfo.put("poolSize", executor.getPoolSize());
            poolInfo.put("activeCount", executor.getActiveCount());
            poolInfo.put("largestPoolSize", executor.getLargestPoolSize());
            poolInfo.put("taskCount", executor.getTaskCount());
            poolInfo.put("completedTaskCount", executor.getCompletedTaskCount());
            poolInfo.put("pendingTaskCount", executor.getPendingTaskCount());
            poolInfo.put("queueSize", executor.getQueue().size());
            poolInfo.put("queueRemainingCapacity", executor.getQueue().remainingCapacity());
            poolInfo.put("activeRatio",
                    String.format("%.2f%%", executor.getActiveRatio() * 100));
            poolInfo.put("queueUsageRatio",
                    String.format("%.2f%%", executor.getQueueUsageRatio() * 100));
            poolInfo.put("avgExecutionTimeMs",
                    String.format("%.2f", executor.getAverageExecutionTimeMs()));
            poolInfo.put("rejectedCount", executor.getRejectedCount());
            poolInfo.put("exceptionCount", executor.getExceptionCount());
            result.put(entry.getKey(), poolInfo);
        }
        return result;
    }

    @WriteOperation
    public String adjustPoolSize(
            @Selector String poolName,
            @Nullable Integer corePoolSize,
            @Nullable Integer maximumPoolSize) {
        MonitoredThreadPoolExecutor executor = executors.get(poolName);
        if (executor == null) {
            return "线程池不存在: " + poolName;
        }
        if (corePoolSize != null) {
            executor.adjustCorePoolSize(corePoolSize);
        }
        if (maximumPoolSize != null) {
            executor.adjustMaximumPoolSize(maximumPoolSize);
        }
        return "调整成功";
    }
}
```
这里的 activeCount 统计的是当前处于互斥持锁状态(即正在执行任务)的 worker 总数:
```java
public int getActiveCount() {
    // 遍历 workers 集合必须持有 mainLock:否则遍历期间如有线程创建或销毁,
    // 可能抛出 ConcurrentModificationException 或读到错误的数据
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        // 目的是统计"当前有多少个线程正在干活",所以统计被锁住的 Worker
        for (Worker w : workers)
            if (w.isLocked())
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}
```
这里的 isLocked 意味着这个工作线程正在执行 task 的 run 方法,此时线程可能处于 RUNNABLE、BLOCKED、WAITING、TIMED_WAITING 等状态。
线程组
线程组提供一个“集合”,可以把一群线程归于一处,可以批量 interrupt/stop/suspend。
但这个方案是很危险的,使用线程池和并发安全的 Collection 都可以管理好线程。
原本设计目的
资源管理:将相关线程组织在一起,便于批量操作
安全隔离:不同线程组可以有不同的安全策略
异常处理:提供组级别的未捕获异常处理
层次结构:支持线程组的嵌套(parent-child关系)
缺陷
API设计不一致且不完整
线程安全问题
功能缺失
安全模型过时
替代方案
Executor框架
CompletableFuture
ForkJoinPool
官方态度
Java 17+:线程组API标记为@Deprecated(forRemoval = true)
JEP 411:移除SecurityManager,线程组失去最后的存在意义
OpenJDK邮件列表:多次讨论完全移除线程组
CompletionStage
CompletionStage 定义的是一个“可能异步”的计算阶段:它可以被其他阶段触发,也可以触发其他阶段。它是 CompletableFuture 的父接口。
它有一个特点,大量非 void 方法返回值都是 CompletionStage 类型,这样既允许 builder 模式,也允许各种 transformation 模式。
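这个“返回值仍是 CompletionStage”的特点可以用一个小例子验证(`chain()` 方法名是为演示取的,非 JDK API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class StageChainDemo {
    // 每个 thenApply 都返回新的 CompletionStage,因此可以像 builder 一样链式做变换
    static String chain() {
        CompletionStage<String> stage = CompletableFuture
                .completedFuture(21)
                .thenApply(n -> n * 2)           // 变换:21 -> 42
                .thenApply(n -> "result=" + n);  // 变换:42 -> "result=42"
        return stage.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(chain()); // result=42
    }
}
```

每一步变换都拿到一个新阶段,既不阻塞调用线程,也不丢失类型信息。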
CompletableFuture
线程池的超时中断机制
```java
invokeAll(tasks, 300L, TimeUnit.MILLISECONDS);
```
会让下层的同步等待因中断而提前返回,产生类似这样的堆栈:
```text
432369 - Caused by: java.lang.InterruptedException: null
432370 -     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
432371 -     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
432372 -     at org.apache.dubbo.rpc.AsyncRpcResult.get(AsyncRpcResult.java:196)
432373 -     at org.apache.dubbo.rpc.protocol.AbstractInvoker.waitForResultIfSync(AbstractInvoker.java:266)
432374 -     at org.apache.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:186)
432375 -     at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:379)
432376 -     at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:81)
```
小技巧
如何处理任务超时问题
方法1:使用 FutureTask 的实现
```java
Future<Map<String, Object>> future = executor.submit(() -> getFeatures(context, zeusSceneId));
try {
    features.putAll(future.get(paramCollectTimeout, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    // 超时/中断/执行异常的降级处理
}

// FutureTask 带超时的 get
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
```

其中超时底层的最简单实现是:

```java
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    long nanos = unit.toNanos(timeout);
    long deadline = System.nanoTime() + nanos;
    while (state <= COMPLETING) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0L) {
            throw new TimeoutException();
        }
        LockSupport.parkNanos(this, remaining);
    }
    return report(state);
}
```
方法2:使用条件变量
方法3:使用 countDownLatch/CyclicBarrier
这个方法不需要依赖于 ReentrantLock,是通过纯 AQS 实现的,见 CountDownLatch 源码。
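用 CountDownLatch 处理任务超时的思路可以写成一个小 sketch(`runWithTimeout` 是为演示起的名字,非标准 API):任务完成时 countDown,调用方用带超时的 await 等待:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutDemo {
    // 返回 true 表示任务在超时前完成;false 表示等待超时
    static boolean runWithTimeout(Runnable task, long timeoutMs) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                latch.countDown(); // 无论成功失败都释放等待方
            }
        });
        worker.start();
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWithTimeout(() -> {}, 500));  // 快任务:true
        System.out.println(runWithTimeout(() -> {
            try { Thread.sleep(400); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }, 50));                                            // 慢任务:false
    }
}
```

注意超时返回 false 时任务线程仍在运行,如需真正取消还要配合 interrupt。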
自定义线程池实现自定义中断
Java 异步执行中的异常处理与线程生命周期
从 FutureTask、AsyncUncaughtExceptionHandler 到 UncaughtExceptionHandler
在 Java 并发和 Spring 异步执行模型中,异常处理涉及多个抽象层级:
JVM 线程模型、JDK 并发工具以及 Spring 框架本身。
这些层级各自对异常承担不同职责,但它们的行为经常被混淆,典型问题包括:
异步任务抛出的异常为何没有日志
UncaughtExceptionHandler 在线程池中为何不生效
AsyncUncaughtExceptionHandler 是否会影响线程生命周期
本文从线程是否终止 这一确定性问题出发,系统梳理三种机制的边界与协作方式。
一、线程是否终止的唯一判定标准
在 JVM 层面,线程是否终止只取决于一个条件:
是否存在未被捕获、并逃逸出 Thread.run() 的 Throwable。异常一旦逃逸出 run(),控制权就进入 JVM 的 C++ 代码的接管范围。
这一规则与使用何种框架无关。
1.1 会导致线程终止的情况
```java
new Thread(() -> {
    throw new RuntimeException("error");
}).start();
```
执行结果:
异常未被捕获
异常逃逸出 run()
JVM 调用 UncaughtExceptionHandler
线程终止
源码位置:Thread 的实现
还可以参考这个:《01.崩溃捕获设计实践方案 crash方案》
```cpp
void JavaThread::exit(bool destroy_vm) {
    if (has_pending_exception()) {
        Handle exception(this, pending_exception());
        clear_pending_exception();
        if (threadObj() != NULL) {
            Klass* klass = SystemDictionary::Thread_klass();
            JavaCalls::call_virtual(&result, klass,
                vmSymbols::uncaughtException_name(),
                vmSymbols::thread_throwable_void_signature(),
                &args, this);
        }
    }
}
```
sequenceDiagram
participant JVM as JVM(C++)
participant JavaThread as JavaThread(C++)
participant ThreadJava as java.lang.Thread
JVM->>JavaThread: 线程执行中抛出异常
JavaThread->>JavaThread: set_pending_exception(exception)
JVM->>JavaThread: 线程退出调用 exit()
JavaThread->>JavaThread: has_pending_exception()
JavaThread->>JavaThread: 直接在 exit() 中处理
JavaThread->>ThreadJava: JNI call_virtual("uncaughtException")
ThreadJava->>ThreadJava: 实际调用 java.lang.Thread.uncaughtException
ThreadJava->>ThreadJava: dispatchUncaughtException(e)
ThreadJava->>ThreadGroup: getUncaughtExceptionHandler()
ThreadGroup->>SystemErr: 默认处理
1.2 不会导致线程终止的情况
```java
new Thread(() -> {
    try {
        throw new RuntimeException("error");
    } catch (Exception e) {
        // 异常被捕获,不会逃逸出 run()
    }
}).start();
```
执行结果:异常被捕获,未逃逸出 run(),线程正常执行完毕后退出,不会触发 UncaughtExceptionHandler。
结论:
异常是否被捕获,决定了线程是否终止;
异常由谁处理,并不决定线程生死。
二、FutureTask 对异常传播路径的改变
理解线程池与 Spring 异步行为,必须先理解 FutureTask。
2.1 execute 与 submit 的根本差异
```java
executor.execute(runnable);
executor.submit(callable);
```
差异不在返回值,而在执行结构:
execute:Runnable直接在线程中执行
submit:任务先被包装为FutureTask,再执行
2.2 FutureTask 中的异常拦截点
FutureTask.run() 的核心逻辑如下(简化):
```java
public void run() {
    try {
        callable.call();
    } catch (Throwable ex) {
        setException(ex);
    }
}
```
关键点:
Throwable 被主动捕获
异常不会逃逸出 run()
JVM 不认为线程发生未捕获异常
线程不会终止
异常在这里已经脱离“线程异常”的语义。
2.3 异常如何被重新抛出:report()
异常并未消失,而是作为执行结果存储:
```java
outcome = ex;
state = EXCEPTIONAL;
```
在调用Future.get()时:
```java
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    throw new ExecutionException((Throwable) x);
}
```
report()的职责是:
将“执行结果状态”翻译为 Java 异常语义
将原始异常包装为 ExecutionException
结论:
FutureTask 将异常从“线程控制流”转移为“任务结果数据”。
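“异常变成任务结果数据”可以直接验证(`causeMessage()` 是演示用的辅助方法名):submit 的任务抛出的异常不会杀死线程,而是在 get() 时以 ExecutionException 的形式重新出现:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExceptionDemo {
    // 返回被包装异常的原始 message
    static String causeMessage() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> bad = () -> { throw new IllegalStateException("boom"); };
            Future<?> f = pool.submit(bad);
            f.get(); // 异常在这里被翻译回 Java 异常语义
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // 原始异常作为 cause 保留
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeMessage()); // boom
    }
}
```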
三、UncaughtExceptionHandler 的职责边界
3.1 触发条件
UncaughtExceptionHandler仅在以下条件满足时被调用:
异常未被捕获
异常逃逸出Thread.run()
线程即将终止
JVM 调用顺序为:
Thread 自身的 handler
ThreadGroup
DefaultUncaughtExceptionHandler
JVM 默认 stderr
3.2 能力与限制
UncaughtExceptionHandler:
无法阻止线程终止
无法恢复线程执行
仅用于日志、告警等系统级兜底
在线程池中,只有 execute()且异常未被捕获时,才可能触发该 handler。
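这一边界可以用同一个线程池对比验证(`handledCount()` 是演示用的辅助方法):submit 的异常被 FutureTask 吞掉,execute 的异常逃逸出 run() 并触发 handler:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerScopeDemo {
    // 返回 UncaughtExceptionHandler 被触发的次数
    static int handledCount() throws InterruptedException {
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(1);
        ThreadFactory tf = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((th, ex) -> {
                handled.incrementAndGet();
                latch.countDown();
            });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, tf);
        pool.submit(() -> { throw new RuntimeException("submit: 被 FutureTask 捕获"); });
        pool.execute(() -> { throw new RuntimeException("execute: 逃逸出 run()"); });
        latch.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return handled.get(); // 只有 execute 的那一次
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handledCount()); // 1
    }
}
```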
四、Spring AsyncUncaughtExceptionHandler 的作用范围
4.1 适用条件
Spring 明确限定:
仅处理 @Async 标注的 void 方法
不处理返回 Future / CompletableFuture 的方法
```java
@Async
public void asyncTask() {
    throw new RuntimeException("error");
}
```
4.2 Spring 的异常拦截方式
Spring 在异步调用边界处捕获异常:
```java
try {
    invokeMethod();
} catch (Throwable ex) {
    asyncUncaughtExceptionHandler.handle(ex, method, params);
}
```
4.3 对线程生命周期的影响
AsyncUncaughtExceptionHandler 的触发不会导致线程终止。
原因是:
异常已经被 Spring 捕获
JVM 无法感知未捕获异常
线程本身没有死亡条件
结论:
AsyncUncaughtExceptionHandler 只影响异常的业务处理路径,不影响线程生命周期。
五、三种异常路径的对比
5.1 @Async void 方法
```java
@Async
public void task() {
    throw new RuntimeException();
}
```
异常路径:
```text
方法执行 → Spring 捕获 → AsyncUncaughtExceptionHandler → 线程继续运行
```
5.2 线程池execute
```java
executor.execute(() -> {
    throw new RuntimeException();
});
```
异常路径:
```text
Runnable.run → 异常逃逸 → JVM → UncaughtExceptionHandler → 线程终止
```
5.3 线程池 submit
```java
Future<?> future = executor.submit(() -> {
    throw new RuntimeException();
});
```
异常路径:
```text
FutureTask.run → catch Throwable → 异常存入 Future → get() 时抛 ExecutionException
```
六、两种 Handler 的推荐使用方式
6.1 AsyncUncaughtExceptionHandler(业务层)
```java
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            log.error("Async void method failed: {}", method.getName(), ex);
        };
    }
}
```
适用场景:
@Async void 方法
业务补偿、告警、日志
6.2 UncaughtExceptionHandler(系统层)
```java
ThreadFactory factory = r -> {
    Thread t = new Thread(r);
    t.setUncaughtExceptionHandler((thread, ex) -> {
        log.error("Thread {} terminated", thread.getName(), ex);
    });
    return t;
};

ExecutorService executor = new ThreadPoolExecutor(
    4, 8, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(), factory
);
```
适用场景:execute() 提交、异常可能逃逸出 run() 的任务;线程意外终止时的系统级日志与告警兜底。
6.3 有返回值的异步任务
```java
CompletableFuture
    .supplyAsync(this::work, executor)
    .exceptionally(ex -> {
        log.error("Async failed", ex);
        return null;
    });
```
必须显式消费异常,否则异常不会被观察到。
结论
线程是否终止,仅由异常是否逃逸到 JVM 决定
FutureTask 和 Spring 已在更高层捕获异常,因此不会触发 JVM 机制
AsyncUncaughtExceptionHandler 不控制线程生死,仅提供业务回调
UncaughtExceptionHandler 只用于处理线程终止前的系统级事件
Spring 的异步支持
如果只是@EnableAsync,Spring 会创建一个默认的 SimpleAsyncTaskExecutor(注意不是 ThreadPoolTaskExecutor):
每个任务都会创建新线程
没有线程池复用
性能较差,不适合生产环境
在这个基础上,再实现AsyncConfigurer.getAsyncExecutor()就可以让自己的线程池替代框架的 Bean。
ThreadPoolTaskExecutor
ThreadPoolTaskExecutor 是 ExecutorConfigurationSupport 的子类,也包装了一个 ThreadPoolExecutor。
ExecutorConfigurationSupport 作为基类提供了:
生命周期管理:实现了 InitializingBean, DisposableBean
配置管理:线程工厂、拒绝策略、优雅关闭等配置
模板方法:定义了初始化和销毁的标准流程
特别是:
```java
@Override
public void afterPropertiesSet() {
    initialize();
}

public void initialize() {
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Initializing ExecutorService " +
            (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        this.setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
```
所以在这个 bean 被使用以前,内部线程池要经过一个 afterPropertiesSet 驱动进行初始化和注入这个 ThreadPoolTaskExecutor 线程池外壳。
ThreadPoolTaskExecutor 内部线程池的替换
推荐:使用初始化器装饰
initializeExecutor 初始化过程里会允许我们装饰这个线程池:
```java
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
            @Override
            public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                    decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
            }
        };
    } else {
        executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    }
    if (this.allowCoreThreadTimeOut) {
        executor.allowCoreThreadTimeOut(true);
    }
    this.threadPoolExecutor = executor;
    return executor;
}
```
java 线程池的装饰逻辑就是只覆盖一个 public void execute(Runnable command)即可。
这个方法本质上是一切线程池外部提交/执行操作的入口,所以它的执行线程是外部线程,而不是工作线程。
可以说这个壳的其他方法都只是包装一下普通线程的成员方法,但是这个 initializeExecutor 和 decorate 是这个壳特有的,是它存在的意义 。
所有外部 command 在执行前都要被 decorate 一下,而且存在 decoratedTaskMap 里,模式是装饰后->原始命令。目前这个map没有用处,未来可能在用修饰后的任务找原始 Runnable 的时候会有用。
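“只覆盖 execute 做装饰”的思路可以脱离 Spring 写成一个纯 Java 的 sketch。下面用一个假设的 ThreadLocal 上下文(`CONTEXT`,类比 MDC 或 trace 上下文)演示:提交线程捕获上下文,工作线程恢复并在结束时还原:

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DecoratingExecutorDemo {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // 与 ThreadPoolTaskExecutor 的做法一致:只覆盖 execute,在提交线程里捕获上下文
    static class ContextAwarePool extends ThreadPoolExecutor {
        ContextAwarePool() {
            super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }
        @Override
        public void execute(Runnable command) {
            String ctx = CONTEXT.get();             // 在外部提交线程捕获
            super.execute(() -> {
                String old = CONTEXT.get();
                CONTEXT.set(ctx);                   // 在工作线程恢复
                try {
                    command.run();
                } finally {
                    CONTEXT.set(old);               // 还原,避免污染复用线程
                }
            });
        }
    }

    static String propagate() throws Exception {
        ContextAwarePool pool = new ContextAwarePool();
        CONTEXT.set("trace-123");
        try {
            Future<String> f = pool.submit(() -> CONTEXT.get()); // submit 最终也走 execute
            return f.get();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(propagate()); // trace-123
    }
}
```

因为 submit 底层同样走 execute,装饰一个入口就能覆盖所有提交路径。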
其他代理
其他方法都是用类似的模式来代理的,而且不支持多态 :
```java
@Override
public void execute(Runnable task) {
    Executor executor = getThreadPoolExecutor();
    try {
        executor.execute(task);
    } catch (RejectedExecutionException ex) {
        throw new TaskRejectedException(
            "Executor [" + executor + "] did not accept task: " + task, ex);
    }
}
```
如果我们要替换线程池实现
我们只能用反射来替换:
```java
@Bean("bizCommonTaskExecutor")
@Override
public ThreadPoolTaskExecutor getAsyncExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() {
        @Override
        protected ThreadPoolExecutor initializeExecutor(
                ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
            EagleEyeContextAwareThreadPoolExecutor executor =
                    new EagleEyeContextAwareThreadPoolExecutor(
                            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                            queue, threadFactory, rejectedExecutionHandler);
            try {
                java.lang.reflect.Field field =
                        ThreadPoolTaskExecutor.class.getDeclaredField("threadPoolExecutor");
                field.setAccessible(true);
                field.set(this, executor);
            } catch (Exception e) {
                LoggerUtils.error(LOGGER, "Failed to set threadPoolExecutor via reflection", e);
            }
            return executor;
        }
    };
    taskExecutor.setThreadNamePrefix("bd-common-async-");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
    return taskExecutor;
}
```
我们不推荐使用这种做法,除非我们真的有增强 execute 以外的诉求 。
对线程池实行 trace 传递
如果使用统一包装器
```java
taskExecutor.setTaskDecorator(runnable -> {
    final Object rpcContext = EagleEye.currentRpcContext();
    final Thread submitThread = Thread.currentThread();
    return () -> {
        // 同线程直接执行,无需上下文搬运
        if (submitThread == Thread.currentThread()) {
            runnable.run();
            return;
        }
        boolean needClean = false;
        Object oldContext = EagleEye.currentRpcContext();
        try {
            if (oldContext == null) {
                needClean = true;
                EagleEye.setRpcContext(rpcContext);
            }
            runnable.run();
        } finally {
            if (needClean) {
                EagleEye.clearRpcContext();
            }
        }
    };
});
```
其中装饰器被调用的地方是就是上面的使用初始化器装饰 。
本质上 executor 共有三个入口:
```java
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value) {
        boolean needClean = false;
        final Object rpcContext = EagleEye.currentRpcContext();
        String gsid = GsidUtil.getGsid();

        @Override
        public void run() {
            if (EagleEye.currentRpcContext() == null) {
                needClean = true;
                EagleEye.setRpcContext(rpcContext);
            }
            GsidUtil.setGsid(gsid);
            try {
                super.run();
            } finally {
                if (needClean) {
                    EagleEye.clearRpcContext();
                }
                GsidUtil.clear();
            }
        }
    };
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable) {
        boolean needClean = false;
        final Object rpcContext = EagleEye.currentRpcContext();
        String gsid = GsidUtil.getGsid();

        @Override
        public void run() {
            if (EagleEye.currentRpcContext() == null) {
                needClean = true;
                EagleEye.setRpcContext(rpcContext);
            }
            GsidUtil.setGsid(gsid);
            try {
                super.run();
            } finally {
                if (needClean) {
                    EagleEye.clearRpcContext();
                }
                GsidUtil.clear();
            }
        }
    };
}

@Override
public void execute(Runnable command) {
    final Object rpcContext = EagleEye.currentRpcContext();
    String gsid = GsidUtil.getGsid();
    if (command instanceof FutureTask) {
        // 已被 newTaskFor 包装过,避免二次包装
        super.execute(command);
    } else {
        super.execute(() -> {
            EagleEye.setRpcContext(rpcContext);
            GsidUtil.setGsid(gsid);
            try {
                command.run();
            } finally {
                EagleEye.clearRpcContext();
                GsidUtil.clear();
            }
        });
    }
}
```
前两个 newTaskFor 是其他 submit 方法到 execute 之前的底层方法。因为前两个入口最终都会调到execute(Runnable command),所以它的内部要避开if (command instanceof FutureTask) {的场景。这个设计因此显得比较累赘。
实际上 decorator 的实现就是最佳的,最终只要实现一个 execute 的包装提交就行了。
这个实现里有一个地方要注意:执行完当前的 runnable 需要 clear,否则可能会出现以前有的遗留 traceId 污染的问题。
参考资料:
《一行一行源码分析清楚AbstractQueuedSynchronizer》
《Java线程池实现原理及其在美团业务中的实践》
《Keep Coding》
ForkJoinPool详解:分治并行的执行引擎
前言:分治并行的诞生
ForkJoinPool 不是为了通用"并行"而设计,而是专门为分治并行 (Divide-and-Conquer Parallelism)这一特定模式量身定制。分治算法(如快速排序、归并排序、树遍历)具有独特的执行模式:
任务天然形成树状结构
父任务派生子任务后需要等待结果
子任务之间通常无依赖关系
计算密集,无I/O阻塞
理解分治算法的执行特性,是理解 ForkJoinPool 设计的关键。传统线程池在处理这类任务时遇到根本性挑战,ForkJoinPool 正是为解决这些挑战而诞生。
1. 核心数据结构:ForkJoinPool的基石
1.1 ForkJoinPool:去中心化的调度器
数据结构定义
```java
public class ForkJoinPool extends AbstractExecutorService {
    volatile long ctl;                         // 核心控制字段:打包了线程计数与状态
    volatile WorkQueue[] workQueues;           // 所有工作队列(含外部提交队列)
    final ForkJoinWorkerThreadFactory factory; // 专用线程工厂
    final UncaughtExceptionHandler ueh;        // 未捕获异常处理器
    final int config;                          // 并行度 + 模式位

    static ForkJoinPool commonPool();          // 公共池入口
}
```
与ThreadPoolExecutor的本质区别
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final BlockingQueue<Runnable> workQueue; // 单一共享队列
    private final HashSet<Worker> workers;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
}

public class ForkJoinPool extends AbstractExecutorService {
    volatile WorkQueue[] workQueues;  // 每线程私有队列数组
    volatile long ctl;
}
```
关键差异
队列模型 :TPE使用单一共享队列,FJP使用每个线程私有队列
线程管理 :TPE有明确的core/max参数,FJP只有目标并行度
调度策略 :TPE基于生产者-消费者模型,FJP基于工作窃取模型
1.2 ForkJoinWorkerThreadFactory:专用线程工厂
接口定义与对比
```java
// 标准线程工厂
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

// FJP 专用线程工厂
public static interface ForkJoinWorkerThreadFactory {
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
```
关键区别
完全不同的接口:两者没有继承关系,签名完全不同
上下文差异:标准工厂只接收Runnable,即任务,负责包装出线程;FJP工厂接收 ForkJoinPool 即线程池。
线程定制能力:线程工厂通常可以设置:
线程名称(便于调试)
线程优先级(Thread.setPriority())
守护状态(Thread.setDaemon())
上下文类加载器(Thread.setContextClassLoader())
异常处理器(Thread.setUncaughtExceptionHandler())
为什么要在工厂里完成定制:传统工厂拿到 Runnable(任务)后把它包装成线程,而且往往紧接着就 thread.start();一旦 start(),上述大部分属性就无法再修改了。
FJP 的扩展方法
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomForkJoinWorkerThreadFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {

    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String poolName;
    private final int threadPriority;
    private final ClassLoader contextClassLoader;

    public CustomForkJoinWorkerThreadFactory(
            String poolName, int threadPriority, ClassLoader contextClassLoader) {
        this.poolName = poolName;
        this.threadPriority = threadPriority;
        this.contextClassLoader = contextClassLoader;
    }

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread thread =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        thread.setName(String.format("%s-worker-%d", poolName, threadNumber.getAndIncrement()));
        thread.setPriority(threadPriority);
        thread.setContextClassLoader(contextClassLoader);
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.printf("[%s] 线程 %s 发生未捕获异常: %s%n",
                    poolName, t.getName(), e.getMessage());
            e.printStackTrace();
        });
        thread.setDaemon(true);
        System.out.printf("创建工作线程: %s, 优先级: %d, 类加载器: %s%n",
                thread.getName(), thread.getPriority(),
                contextClassLoader.getClass().getSimpleName());
        return thread;
    }
}
```
设计意义
FJP的工作线程需要知道池的存在,才能参与工作窃取算法。标准ThreadFactory无法提供这种上下文,因此需要专用接口。
1.3 ForkJoinWorkerThread:协作式执行者
数据结构
```java
public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;        // 所属线程池
    final WorkQueue workQueue;      // 私有工作队列

    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
}
```
与ThreadPoolExecutor.Worker的对比
```java
// ThreadPoolExecutor.Worker:组合一个 Thread
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;
}

// ForkJoinWorkerThread:继承 Thread
public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;
    final WorkQueue workQueue;
}
```
Worker:可以自锁定的 Runnable。初始化时把自己装进一个 Thread,run() 在线程里循环从阻塞队列获取任务。
ForkJoinWorkerThread:具有自主窃取能力的协作线程。run() 在本地队列空闲时主动扫描并窃取其他工作队列的任务,是一种群体智能式的执行器。
设计决策
Worker使用组合 :TPE的工作线程只需执行任务,无需特殊行为
FJP使用继承 :需要重写run()实现工作窃取,且需要池上下文。继承意味着可以直接使用线程的生命周期方法。
线程安全特性:为什么ForkJoinPool不会有线程泄露问题?
ForkJoinPool 在线程管理上具有两个关键特性,使其天然避免了线程泄露问题:
1. 所有工作线程都是 Daemon 线程
```java
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    wt.setDaemon(true);  // 注册时即设为守护线程
    // ...
}
```
这意味着:
即使忘记调用 shutdown(),ForkJoinPool 的线程也不会阻止 JVM 退出
当所有非 daemon 线程结束时,JVM 会自动终止,daemon 线程随之销毁
与 ThreadPoolExecutor 的关键区别:TPE 默认创建非 daemon 线程,忘记关闭会导致 JVM 无法退出
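daemon 这一点可以直接验证(`workerIsDaemon()` 是演示用的方法名;用 CompletableFuture 承接结果,避免主线程“帮助执行”干扰判定):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class DaemonCheckDemo {
    // 在 FJP 工作线程里检查自己的 daemon 标志
    static boolean workerIsDaemon() {
        CompletableFuture<Boolean> daemon = new CompletableFuture<>();
        ForkJoinPool pool = new ForkJoinPool(1);
        // execute(Runnable) 一定在工作线程上运行
        pool.execute(() -> daemon.complete(Thread.currentThread().isDaemon()));
        boolean result = daemon.join();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(workerIsDaemon()); // true
    }
}
```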
2. 空闲线程会被自动回收
根据 Javadoc 文档:
“Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).”
ForkJoinPool 的线程回收机制:
空闲线程不会立即销毁,而是先 park() 等待
空闲时间超过阈值后,线程主动退出 run() 循环
下次需要时重新创建,避免永久空闲线程占用资源
结论 :由于 daemon 线程 + 自动回收机制的双重保障,ForkJoinPool 在实践中几乎不会出现线程泄露问题。但这不意味着可以忽略资源管理 ——对于自定义创建的 ForkJoinPool,仍建议使用 try-with-resources(JDK 19+)或 try-finally 显式关闭,以确保及时释放内部资源。
1.4 WorkQueue:双端队列的实现
数据结构
```java
static final class WorkQueue {
    volatile int base;                // 窃取端(FIFO)
    volatile int top;                 // 所有者端(LIFO)
    ForkJoinTask<?>[] array;          // 环形任务数组
    final ForkJoinPool pool;
    final ForkJoinWorkerThread owner;
}
```
核心操作
```java
// owner 调用:在 top 端入队
final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a = array;
    int s = top, cap = a.length;
    a[s & (cap - 1)] = task;
    top = s + 1;
}

// owner 调用:在 top 端出队(LIFO)
final ForkJoinTask<?> pop() {
    ForkJoinTask<?>[] a = array;
    int s = top, cap = a.length;
    if (s != base) {
        ForkJoinTask<?> t = a[--s & (cap - 1)];
        a[s & (cap - 1)] = null;
        top = s;
        return t;
    }
    return null;
}

// 窃取者调用:在 base 端出队(FIFO),用 CAS 保证并发安全
final ForkJoinTask<?> poll() {
    ForkJoinTask<?>[] a = array;
    int b = base, cap = a.length;
    if (b != top) {
        int i = b & (cap - 1);
        ForkJoinTask<?> t = a[i];
        if (t != null && base == b &&
            UNSAFE.compareAndSwapInt(this, BASE, b, b + 1)) {
            a[i] = null;
            return t;
        }
    }
    return null;
}
```
LIFO + FIFO 的设计智慧:
本地LIFO:最近派生的任务最先执行,保持时间局部性(temporal locality):
时间局部性:最近访问的数据很可能再次被访问
在分治算法中,最近派生的任务通常与父任务共享数据
保持这些数据在CPU缓存中,避免缓存失效惩罚
窃取FIFO:最早派生的任务最先被窃取,保证窃取到"大块"任务
"大块任务"解释:这不是编程隐喻,而是指计算量大的任务。在分治算法中:
最早派生的任务通常是父任务分解的第一层子任务,包含大量工作
最近派生的任务通常是深层子任务,计算量较小
例如快速排序中,根任务分解为左右子任务,这两个是"大块任务";而叶子节点任务只处理几个元素,是"小块任务"
本地LIFO执行小任务(保持缓存),窃取FIFO获取大任务(避免任务碎片化)
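两端语义可以用标准库的 Deque 做一个示意(真实的 WorkQueue 是无锁环形数组加 CAS,这里只演示方向):owner 在 top 端 LIFO,窃取者在 base 端 FIFO:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DequeDemo {
    // 返回 [owner 取到的任务, 窃取者取到的任务]
    static String[] order() {
        Deque<String> q = new ArrayDeque<>();
        q.push("big-task");    // 最早派生(base 端方向)
        q.push("mid-task");
        q.push("small-task");  // 最近派生(top 端)
        String owner = q.pop();       // owner LIFO:拿到最近派生的小任务
        String thief = q.pollLast();  // thief FIFO:偷走最早派生的大任务
        return new String[] { owner, thief };
    }

    public static void main(String[] args) {
        String[] r = order();
        System.out.println(r[0]); // small-task
        System.out.println(r[1]); // big-task
    }
}
```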
1.5 ForkJoinTask:任务抽象
核心结构
```java
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    volatile int status;

    static final int DONE_MASK   = 0xf0000000;
    static final int NORMAL      = 0xf0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int EXCEPTIONAL = 0x80000000;
    static final int SIGNAL      = 0x00010000;
    static final int SMASK       = 0x0000ffff;

    public final ForkJoinTask<V> fork();
    public final V join();
    public abstract V compute();   // 注:compute() 实际定义在 RecursiveTask/RecursiveAction 子类中
    protected abstract V getRawResult();
    protected abstract void setRawResult(V value);
    public boolean tryUnfork();
}
```
标准子类
```java
// 无返回值的分治任务
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    protected abstract void compute();
    public final Void getRawResult() { return null; }
    protected final void setRawResult(Void mustBeNull) { }
}

// 有返回值的分治任务
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    V result;
    protected abstract V compute();
    public final V getRawResult() { return result; }
    protected final void setRawResult(V value) { result = value; }
}
```
与FutureTask的关键区别
```java
// FutureTask.get():抛出受检异常,调用方必须显式处理
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

// ForkJoinTask.join():以非受检异常的形式重抛原始异常,简化递归代码
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}
```
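join 不声明受检异常这一点可以直接观察(`joinExceptionType()` 是演示用的方法名):任务里抛出的 IllegalStateException 会被 join 以原始非受检类型重抛,而不是包成 ExecutionException:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class JoinVsGetDemo {
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // 返回 join 实际抛出的异常类型名
    static String joinExceptionType() {
        ForkJoinPool pool = new ForkJoinPool(1);
        ForkJoinTask<Integer> t = pool.submit(new FailingTask());
        try {
            t.join(); // 无需 try/catch 声明受检异常
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinExceptionType()); // IllegalStateException
    }
}
```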
2. 关键API:与ThreadPoolExecutor的差异
2.1 ForkJoinPool的核心API
外部客户端API
```java
// 外部提交 API(简化示意)
@Override
public void execute(Runnable task) {
    ForkJoinTask<?> job = new TaskAdaptor(task);  // 适配为 ForkJoinTask
    externalPush(job);
}

void execute(ForkJoinTask<?> task);                 // 安排异步执行
<T> T invoke(ForkJoinTask<T> task);                 // 提交并等待结果
<T> ForkJoinTask<T> submit(ForkJoinTask<T> task);   // 提交并返回 Future
```
对于 TPE,execute(Runnable) 是调度层的核心,但不是最底层;最底层是工作线程的循环 runWorker()。
对于外部提交,externalPush()或externalSubmit()是入口;但内部fork()调用的是WorkQueue.push(),两者是并行的入口路径,不是汇聚关系。
内部计算API
```java
// 任务内部计算 API
void ForkJoinTask.fork();   // 异步提交到当前工作队列
V ForkJoinTask.join();      // 等待并获取结果
V ForkJoinTask.invoke();    // 相当于直接执行并等待
```
监控与管理API
```java
// 监控与管理 API
long getStealCount();                                  // 窃取次数统计
boolean awaitQuiescence(long timeout, TimeUnit unit);  // 等待池静止
void shutdown();
```
2.2 API使用边界:为什么不能混用?
Javadoc 明确划分了 API 使用边界:
| | Call from non-fork/join clients | Call from within fork/join computations |
|---|---|---|
| Arrange async execution | execute(ForkJoinTask) | ForkJoinTask.fork() |
| Await and obtain result | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
| Arrange exec and obtain Future | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
"Arrange"的含义:
此处的"Arrange"是英语动词,意为"安排、组织",描述API的意图:
Arrange async execution = “安排异步执行” → 任务提交后立即返回
Await and obtain result = “等待并获取结果” → 阻塞直到任务完成
Arrange exec and obtain Future = “安排执行并获取 Future” → 异步执行但保留结果句柄,通过 Future 获取结果。
Javadoc 原文强调:“These methods are designed to be used primarily by clients not already engaged in fork/join computations in the current pool… tasks that are already executing in a pool should normally instead use the within-computation forms…”
混用API的代价 :
性能下降 :绕过工作窃取优化
死锁风险 :阻塞式等待导致资源浪费
缓存失效 :失去任务局部性
3. 线程调度机制:动态适应的艺术
3.1 并行度:唯一的核心参数
配置参数
```java
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode);
```
asyncMode:
- false(默认):LIFO(后进先出) - 适合递归分解任务
- true:FIFO(先进先出) - 适合生产者-消费者模式
公共池的系统属性
```text
java.util.concurrent.ForkJoinPool.common.parallelism
  - 目标并行度,控制活跃线程数
  - 默认 = Runtime.getRuntime().availableProcessors()

java.util.concurrent.ForkJoinPool.common.threadFactory
  - 自定义工作线程创建逻辑
  - 默认使用 DefaultForkJoinWorkerThreadFactory

java.util.concurrent.ForkJoinPool.common.exceptionHandler
  - 处理未捕获异常
  - 默认使用系统默认异常处理器

java.util.concurrent.ForkJoinPool.common.maximumSpares
  - 额外线程上限,用于补偿阻塞
  - 默认 = 256
```
各参数的实际影响:
parallelism :直接影响CPU利用率,设置过大导致上下文切换开销
threadFactory :可控制线程优先级、名称、守护状态
exceptionHandler :确保异常不会静默失败
maximumSpares :处理阻塞时的补偿机制,过小导致性能下降
3.2 无core/max参数的设计哲学
ThreadPoolExecutor的线程管理
```text
if (当前线程数 < corePoolSize)          创建核心线程
else if (队列未满)                      入队
else if (当前线程数 < maximumPoolSize)  创建非核心线程
else                                    拒绝策略
```
ForkJoinPool的线程管理
```text
if (活跃线程数 < parallelism)                       创建新线程
else if (有线程阻塞且 spare 线程数 < maximumSpares)  创建 spare 线程
else                                                复用现有线程
```
parallelism vs corePoolSize, maximumSpares vs maximumPoolSize:
本质不同:
TPE:corePoolSize是静态下限(保持的最小线程数),maximumPoolSize是静态上限(允许的最大线程数)
FJP:parallelism是动态目标(期望的活跃线程数 ),maximumSpares是补偿上限(允许的额外线程数):只要有任务,机制可以通过一个有限度的补偿拼命维持 active thread 的 count;ThreadPoolExecutor维持的是线程数量的边界范围(corePoolSize ≤ 线程数 ≤ maximumPoolSize),而不是特别关注线程的活跃状态。
TPE的线程边界是硬性的,FJP的线程边界是软性的
TPE创建线程是为了处理更多任务,FJP创建spare线程是为了补偿阻塞
关键区别:
线程数硬性上限:FJP的线程数永远不会超过 parallelism + maximumSpares
当达到上限后:
有线程阻塞时,无法创建spare线程
新任务只能入队到现有线程的队列
如果所有队列都满了,会触发拒绝策略
与TPE的区别:
TPE:队列满 + 线程=max 时触发拒绝
FJP:所有队列满 + 线程=parallelism+maximumSpares 时触发拒绝
线程回收机制对比
Javadoc 明确说明:“Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).”
FJP线程回收细节
空闲线程不会立即销毁,而是park()
如果空闲时间超过阈值,标记为可回收
通过ctl字段的位操作,逐步减少活跃线程计数
下次需要时重新创建,避免频繁创建/销毁开销
3.3 工作线程生命周期
工作线程经历四个阶段:
初始化 :首次提交任务时创建,初始活跃线程数不超过parallelism,但总线程数可能达到 parallelism + maximumSpares
与TPE的关键区别:TPE的核心线程即使空闲也不会回收(除非设置allowCoreThreadTimeOut),而FJP的所有线程在空闲时都会被回收
FJP的设计哲学:按需创建,及时释放,适合间歇性负载
活跃期 :执行任务 + 窃取任务,检测阻塞时触发spare线程
通过ManagedBlocker接口检测阻塞
当线程阻塞时,可能创建spare线程补偿
空闲期 :无任务时park(),保持空闲状态
线程通过awaitWork()方法(内部实现)进入等待,使用LockSupport.park()挂起
有新任务时被唤醒
回收期 :空闲超过阈值,逐步减少线程
通过ctl字段的位操作管理线程状态,空闲超时后线程主动退出run()循环
线程退出run()方法,被垃圾回收
下次需要时重新创建,避免永久空闲线程
根据 ForkJoinPool 源码注释和 Javadoc 文档:
“The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others.”
这意味着 ForkJoinPool 会动态调整线程数以维持目标并行度,同时避免频繁的线程创建/销毁循环。与传统线程池不同,ForkJoinPool 会回收所有空闲线程以减少资源消耗。
4. 实战案例:并行文件搜索系统
4.1 业务需求
搜索指定目录下的所有文件
查找包含特定关键词的文件
统计匹配行数
处理大型目录(10万+文件)
要求高效利用多核CPU
4.2 数据结构定义
```java
class FileSearchTask extends RecursiveTask<List<SearchResult>> {
    private static final int THRESHOLD = 100;
    private final Path directory;
    private final String keyword;

    FileSearchTask(Path directory, String keyword) {
        this.directory = directory;
        this.keyword = keyword;
    }

    @Override
    protected List<SearchResult> compute() {
        try {
            List<Path> paths = Files.list(directory).collect(Collectors.toList());
            if (paths.size() <= THRESHOLD) {
                return searchDirectly(paths);   // 小任务直接计算
            }
            // 大任务:按 THRESHOLD 分批,派生子任务
            List<FileSearchTask> subtasks = new ArrayList<>();
            List<Path> currentBatch = new ArrayList<>();
            for (Path path : paths) {
                currentBatch.add(path);
                if (currentBatch.size() >= THRESHOLD) {
                    subtasks.add(new FileSearchTask(createTempDir(currentBatch), keyword));
                    currentBatch = new ArrayList<>();
                }
            }
            if (!currentBatch.isEmpty()) {
                subtasks.add(new FileSearchTask(createTempDir(currentBatch), keyword));
            }
            if (subtasks.size() > 1) {
                // 保留最后一个子任务自己执行,其余 fork 出去
                FileSearchTask lastTask = subtasks.remove(subtasks.size() - 1);
                for (FileSearchTask task : subtasks) {
                    task.fork();
                }
                List<SearchResult> results = lastTask.compute();
                for (FileSearchTask task : subtasks) {
                    results.addAll(task.join());
                }
                return results;
            } else {
                return subtasks.get(0).compute();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private List<SearchResult> searchDirectly(List<Path> paths) {
        List<SearchResult> results = new ArrayList<>();
        for (Path path : paths) {
            if (Files.isDirectory(path)) {
                try {
                    results.addAll(new FileSearchTask(path, keyword).compute());
                } catch (Exception e) {
                    // 忽略不可读目录
                }
            } else {
                try (BufferedReader reader = Files.newBufferedReader(path)) {
                    String line;
                    int lineNumber = 1;
                    while ((line = reader.readLine()) != null) {
                        if (line.contains(keyword)) {
                            results.add(new SearchResult(path, lineNumber, line));
                        }
                        lineNumber++;
                    }
                } catch (IOException e) {
                    // 忽略不可读文件
                }
            }
        }
        return results;
    }

    private Path createTempDir(List<Path> paths) throws IOException {
        // 示意实现:真实场景应直接按 List<Path> 分片,而不是建临时目录
        return Files.createTempDirectory("search_");
    }
}

class SearchResult {
    private final Path filePath;
    private final int lineNumber;
    private final String lineContent;

    SearchResult(Path filePath, int lineNumber, String lineContent) {
        this.filePath = filePath;
        this.lineNumber = lineNumber;
        this.lineContent = lineContent;
    }

    @Override
    public String toString() {
        return String.format("%s:%d: %s", filePath, lineNumber, lineContent);
    }
}
```
4.3 线程池初始化与使用
```java
public class ParallelFileSearch {

    private static ForkJoinPool createSearchPool() {
        int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 16);
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                (thread, throwable) -> {
                    System.err.printf("Thread %s threw exception: %s%n",
                            thread.getName(), throwable.getMessage());
                },
                false);  // asyncMode=false:本地队列 LIFO
    }

    public static List<SearchResult> search(Path rootDir, String keyword) {
        try (ForkJoinPool pool = createSearchPool()) {   // JDK 19+ 支持 AutoCloseable
            FileSearchTask rootTask = new FileSearchTask(rootDir, keyword);
            return pool.invoke(rootTask);                // 外部入口用 invoke
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: java ParallelFileSearch <directory> <keyword>");
            return;
        }
        Path rootDir = Paths.get(args[0]);
        String keyword = args[1];

        long startTime = System.currentTimeMillis();
        List<SearchResult> results = search(rootDir, keyword);
        long endTime = System.currentTimeMillis();

        System.out.printf("Found %d matches in %d ms%n", results.size(), endTime - startTime);
        results.stream().limit(10).forEach(System.out::println);
    }
}
```
所以正确的框架是:
定义一个 RecursiveTask 或者 RecursiveAction,而不是直接使用 ForkJoinTask,然后让 ForkJoinPool 来 invoke 根 task。
每个任务内部:
先检查任务大小,只有大任务才分解生成子任务,小任务直接计算。
从子任务列表中移除最后一个任务,保留给自己直接执行。
对其余子任务调用fork()(异步提交)。
对保留的任务调用compute()(同步执行)。
对其他任务进行 join 收集计算结果,合并进上一个计算结果里。
返回全部计算结果。
所以在任务外部使用 invoke,内部使用 fork/compute/join。外部不应该用fork()/join(),看上面的 arrange。
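这个框架可以浓缩成一个最小可运行的求和例子(`SumTask` 是演示用的类名):外部 invoke,内部 fork/compute/join:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000;
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {            // 小任务直接计算
            long s = 0;
            for (int i = lo; i < hi; i++) s += data[i];
            return s;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                           // 一半异步提交
        long r = right.compute();              // 另一半自己同步执行
        return r + left.join();                // 合并结果
    }

    // 外部入口:用 invoke 提交根任务
    static long parallelSum(long[] data) {
        return new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] a = new long[10_000];
        Arrays.fill(a, 1L);
        System.out.println(parallelSum(a)); // 10000
    }
}
```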
4.4 关键设计决策解析
1. 为什么使用 RecursiveTask 而不是 RecursiveAction?
需要返回搜索结果(List)
RecursiveTask 提供类型安全的返回值
符合分治模式:子任务结果合并为父任务结果
2. 为什么设置THRESHOLD=100?
任务分解粒度需要平衡:阈值过小,任务数暴增,调度开销占比过高;阈值过大,并行度不足。
100是经验值,可根据文件大小调整
通过基准测试确定最优值
3. 为什么fork其他任务,直接执行最后一个任务?
```java
if (subtasks.size() > 1) {
    FileSearchTask lastTask = subtasks.remove(subtasks.size() - 1);
    for (FileSearchTask task : subtasks) {
        task.fork();                                   // 其余任务异步提交
    }
    List<SearchResult> results = lastTask.compute();   // 最后一个自己执行
    for (FileSearchTask task : subtasks) {
        results.addAll(task.join());
    }
}
```
设计理由:
避免调度开销 :直接compute()比fork()后再join()少一次入队出队操作
缓存局部性 :直接执行的任务与父任务共享数据,保持CPU缓存热度
栈深度控制 :避免过深的递归导致StackOverflowError
注意 :这里的"最后一个任务"不是指"最大的任务",而是任意选择一个任务直接执行以减少调度开销
4. 为什么创建自定义池而不是使用公共池?
```java
try (ForkJoinPool pool = createSearchPool()) {
    return pool.invoke(rootTask);
}
```
设计理由 :
资源隔离 :避免影响其他使用公共池的组件
参数定制 :限制并行度(16),避免I/O瓶颈
异常处理 :自定义异常处理器,确保错误可见
生命周期管理 :try-with-resources确保池关闭
5. 为什么asyncMode=false?
```java
new ForkJoinPool(parallelism, ..., false);
```
设计理由:
asyncMode=false 时,worker 线程按 LIFO 顺序处理本地队列,刚 fork 的子任务会被优先执行,契合递归分治"先分解、后合并"的结构,缓存局部性更好。
asyncMode=true 则按 FIFO 顺序处理本地任务,适用于只 fork 不 join 的事件风格任务,不适合这里的分治搜索。
5. 常见错误模式与避坑指南
5.1 API混用错误
```java
// 反例:在任务内部混用池级 API
protected void compute() {
    FileSearchTask left = new FileSearchTask(leftDir, keyword);
    FileSearchTask right = new FileSearchTask(rightDir, keyword);

    pool.submit(left);                                    // 错误:应使用 left.fork()
    List<SearchResult> rightResults = pool.invoke(right); // 错误:应使用 right.compute()
    List<SearchResult> results = left.get();              // 错误:应使用 left.join()
    results.addAll(rightResults);
}
```
正确做法:
```java
protected List<SearchResult> compute() {
    FileSearchTask left = new FileSearchTask(leftDir, keyword);
    FileSearchTask right = new FileSearchTask(rightDir, keyword);

    if (leftDirSize < rightDirSize) {
        left.fork();
        List<SearchResult> results = right.compute();
        results.addAll(left.join());
        return results;
    } else {
        right.fork();
        List<SearchResult> results = left.compute();
        results.addAll(right.join());
        return results;
    }
}
```
5.2 I/O阻塞错误
```java
// 反例:在 ForkJoinTask 中执行阻塞 I/O
protected List<SearchResult> compute() {
    try (FileInputStream fis = new FileInputStream(file)) {
        byte[] data = new byte[fis.available()];
        fis.read(data);
        // ...
    } catch (IOException e) {
        // 吞掉异常
    }
}
```
正确做法:
```java
// I/O 与计算分离:读文件交给普通线程池,计算交给 ForkJoinPool
class FileReadTask extends RecursiveTask<byte[]> {
    protected byte[] compute() {
        return Files.readAllBytes(path);
    }
}

class FileProcessTask extends RecursiveTask<List<SearchResult>> {
    private final byte[] fileData;

    protected List<SearchResult> compute() {
        List<SearchResult> results = new ArrayList<>();
        String content = new String(fileData);
        // ...
        return results;
    }
}

byte[] fileData = fileReadPool.submit(new FileReadTask(path)).get();
List<SearchResult> results = computePool.invoke(new FileProcessTask(fileData));
```
5.3 资源泄漏错误
```java
// 反例:临时创建的池从不关闭
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(task);
```
正确做法(JDK 19+):
```java
try (ForkJoinPool pool = new ForkJoinPool()) {
    return pool.invoke(task);
}
```
正确做法(JDK 8-18):
```java
ForkJoinPool pool = new ForkJoinPool();
try {
    return pool.invoke(task);
} finally {
    pool.shutdown();
}
```
要点:即使线程不会泄露,线程池仍然需要关闭,否则线程池实例本身会泄露。
ForkJoinPool会自动回收空闲worker线程到0(通过keepAliveTime机制,默认60秒无任务后线程自动退出),因此线程本身不会泄露 。但这并不意味着可以不关闭线程池——线程池实例本身持有的内部资源不会自动释放 。临时创建的ForkJoinPool若不显式调用shutdown()/shutdownNow(),将导致以下资源泄露:
内存资源泄露:
64位ctl状态字段(原子控制变量)
WorkQueue数组(每个worker线程的任务队列)
任务引用(队列中未处理的任务对象)
线程工厂实例(ForkJoinWorkerThreadFactory引用)
异常处理器(UncaughtExceptionHandler引用)
内部锁和条件变量(同步器对象)
系统资源泄露:
文件描述符(如果ThreadFactory创建了带资源的线程)
本地内存(JVM内部线程状态结构)
类加载器引用(可能导致ClassLoader泄露)
监控资源泄露:
JMX MBean注册(每个pool注册的监控指标)
线程统计信息(活跃/已完成任务计数器)
commonPool()由JVM在shutdown hook中自动管理,手动关闭会破坏全局并发设施。生产环境应通过静态复用或池缓存机制管理ForkJoinPool实例,避免高频创建/销毁。每次临时创建后必须用try-finally确保关闭,否则在长时间运行的应用中将累积严重内存压力。
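静态复用的一种写法如下(示意代码,`SearchPools` 类名与并行度 8 均为假设):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// 进程级复用一个自定义 ForkJoinPool,避免高频创建/销毁
public final class SearchPools {
    // 类加载时创建一次,整个应用共享;并行度 8 为假设值
    private static final ForkJoinPool SEARCH_POOL = new ForkJoinPool(8);

    private SearchPools() {}

    public static <T> T invoke(ForkJoinTask<T> task) {
        return SEARCH_POOL.invoke(task);
    }

    // 应用停机时统一关闭(可挂接到 shutdown hook 或容器生命周期)
    public static void shutdown() {
        SEARCH_POOL.shutdown();
    }
}
```

这样每次任务提交都复用同一个池,只在应用退出时关闭一次,避免了临时池累积的内存压力。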
6. 父子任务死锁:传统线程池的致命缺陷与ForkJoinPool的解决方案
6.1 问题场景:线程池饥饿死锁
在使用传统ThreadPoolExecutor时,一个经典的陷阱是父子任务使用同一线程池导致的死锁 。
死锁复现代码
```java
public class ThreadPoolDeadlockDemo {

    private static final ExecutorService pool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        Future<Integer> f1 = pool.submit(() -> parentTask("Parent-1"));
        Future<Integer> f2 = pool.submit(() -> parentTask("Parent-2"));
        System.out.println("Result: " + f1.get() + ", " + f2.get());
    }

    static Integer parentTask(String name) throws Exception {
        System.out.println(name + " started on " + Thread.currentThread().getName());
        Future<Integer> childFuture = pool.submit(() -> childTask(name));
        return childFuture.get(); // 阻塞等待子任务,但子任务永远拿不到线程
    }

    static Integer childTask(String parentName) {
        System.out.println("Child of " + parentName + " on " + Thread.currentThread().getName());
        return 42;
    }
}
```
死锁分析
```
时间线:
T1: Parent-1 提交,获得 Thread-1,开始执行
T2: Parent-2 提交,获得 Thread-2,开始执行
T3: Parent-1 提交 Child-1 到队列,调用 childFuture.get() 阻塞
T4: Parent-2 提交 Child-2 到队列,调用 childFuture.get() 阻塞
T5: 死锁形成
    - Thread-1 被 Parent-1 占用,等待 Child-1
    - Thread-2 被 Parent-2 占用,等待 Child-2
    - Child-1 和 Child-2 在队列中,等待空闲线程
    - 没有空闲线程,因为都被父任务占用
```
这就是经典的线程池饥饿死锁 (Thread Pool Starvation Deadlock):
必要条件1 :父任务持有线程资源
必要条件2 :父任务阻塞等待子任务
必要条件3 :子任务需要线程资源才能执行
必要条件4 :线程池容量有限
6.2 传统解决方案的局限性
方案1:增大线程池容量
```java
ExecutorService pool = Executors.newFixedThreadPool(100);
```
问题 :
无法预知任务的嵌套深度
递归分治算法的任务数量呈指数增长
过大的线程池浪费资源,过小仍可能死锁
方案2:使用无界线程池
```java
ExecutorService pool = Executors.newCachedThreadPool();
```
问题 :
失去对并发度的控制
可能创建过多线程,导致OOM或上下文切换开销
不适合生产环境
方案3:父子任务使用不同线程池
```java
ExecutorService parentPool = Executors.newFixedThreadPool(4);
ExecutorService childPool = Executors.newFixedThreadPool(8);
```
问题 :
需要预知任务层级结构
多层嵌套需要多个线程池
资源利用率低,管理复杂
6.3 ForkJoinPool的根本性解决:协作式等待
ForkJoinPool通过协作式等待 (Cooperative Waiting)从根本上解决了这个问题。
核心机制:join()不是真正的阻塞
```java
// ForkJoinTask 源码(节选)
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    if ((s = status) < 0)
        return s;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        wt = (ForkJoinWorkerThread) t;
        w = wt.workQueue;
        // 若任务仍在本地队列顶部,直接弹出并执行
        if (w.tryUnpush(this) && (s = doExec()) < 0)
            return s;
        // 否则进入协作等待
        return wt.pool.awaitJoin(w, this, 0L);
    }
    return externalAwaitDone();
}
```
协作等待的核心:awaitJoin()
```java
// awaitJoin 核心逻辑(简化示意)
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null) {
        while ((s = task.status) >= 0) {
            if (!tryHelpStealer(w, task))   // 先尝试帮助窃取者执行任务
                if (!tryCompensate(w))      // 再尝试补偿线程
                    LockSupport.park(this); // 最后才真正阻塞
        }
    }
    return s;
}
```
帮助窃取者机制(Help Stealer)
当线程A等待任务T完成时,如果T被线程B窃取:
A不会阻塞等待
A会扫描B的工作队列
A帮助执行B队列中的任务(可能是T的子任务)
通过帮助B,间接加速T的完成
```
场景:Thread-1 执行 Parent,fork 了 Child,然后 join(Child)
      Child 被 Thread-2 窃取

传统线程池:
Thread-1: [阻塞等待 Child 完成]  <-- 线程资源浪费
Thread-2: [执行 Child]

ForkJoinPool:
Thread-1: [发现 Child 被 Thread-2 窃取]
          [扫描 Thread-2 的队列]
          [帮助执行 Thread-2 队列中的其他任务]
          [间接加速 Child 完成]
Thread-2: [执行 Child]
```
6.4 为什么父子任务可以不相互阻塞?
这个问题的答案涉及ForkJoinTask接口设计的精妙之处:
接口层面的异步化解耦
ForkJoinTask的fork()和join()方法在接口设计上实现了计算与等待的解耦 :
```java
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread) t).workQueue.push(this); // 压入本地队列,立即返回
    else
        ForkJoinPool.common.externalPush(this);          // 外部线程提交到公共池
    return this;
}

public final V join() {
    if (doJoin() != NORMAL)
        throw new RuntimeException(); // 简化示意,实际为 reportException
    return getRawResult();
}
```
关键设计点 :
fork()是纯异步的 :只负责将任务放入队列,立即返回
join()是协作式的 :不是被动等待,而是主动寻找可执行的工作
compute()是可分解的 :子类通过重写compute()定义分解逻辑
分治模式的天然适配
```java
protected Integer compute() {
    if (任务足够小) {
        return 直接计算();
    }
    SubTask left = new SubTask(左半部分);
    SubTask right = new SubTask(右半部分);

    left.fork();                       // 左半异步提交
    int rightResult = right.compute(); // 右半在当前线程直接执行
    int leftResult = left.join();      // 协作式等待左半
    return leftResult + rightResult;
}
```
为什么不会死锁 :
right.compute()直接在当前线程执行,不占用额外线程
left.join()时,如果left还在本地队列顶部,直接弹出执行(tryUnpush)
如果left被窃取,当前线程不会阻塞,而是帮助执行其他任务
即使所有线程都在"等待",它们实际上都在执行任务
6.5 线程补偿机制:有限度的弹性
线程可以无限补偿吗?
答案是否定的 。ForkJoinPool的线程补偿受到maximumSpares参数的严格限制。
补偿机制的工作原理
```java
// tryCompensate 逻辑(简化示意)
private boolean tryCompensate(WorkQueue w) {
    int sp = (int) (ctl & ~SMASK);
    int pc = config & SMASK;             // 目标并行度
    int tc = (short) (ctl >>> TC_SHIFT); // 总线程数
    int ac = (int) (ctl >> AC_SHIFT);    // 活跃线程数

    if (tc >= pc + MAX_SPARES)           // 达到硬性上限,拒绝补偿
        return false;

    if (/* 其他补偿条件 */) {
        createWorker();                  // 创建补偿线程
        return true;
    }
    return false;
}
```
线程数的硬性上限
ForkJoinPool的线程数永远不会超过:parallelism + maximumSpares
```
-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=256
```
当达到上限后会发生什么?
无法创建新的补偿线程
tryCompensate()返回false
线程进入真正的阻塞等待 (LockSupport.park)
如果所有线程都阻塞,可能导致吞吐量下降
为什么要限制补偿?
防止线程爆炸 :无限补偿可能导致创建过多线程
资源保护 :每个线程都消耗栈内存(默认1MB)
上下文切换开销 :过多线程导致调度开销增加
设计哲学 :ForkJoinPool假设任务是CPU密集型的,不应频繁阻塞
最佳实践
避免在ForkJoinTask中执行阻塞I/O
如果必须阻塞,使用ManagedBlocker接口
合理设置parallelism和maximumSpares
监控getPoolSize()和getActiveThreadCount()
```java
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override
    public boolean block() throws InterruptedException {
        result = blockingOperation();
        return true;
    }

    @Override
    public boolean isReleasable() {
        return result != null;
    }
});
```
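上面提到的监控项可以用 ForkJoinPool 自带的统计方法直接读取,下面是一个最小的快照示例(`PoolMonitor` 类名为假设,所调用的方法均为 ForkJoinPool 的真实 API):

```java
import java.util.concurrent.ForkJoinPool;

// 输出 ForkJoinPool 的关键运行指标,可周期性采集上报
public class PoolMonitor {

    public static String snapshot(ForkJoinPool pool) {
        return String.format("parallelism=%d poolSize=%d active=%d queued=%d steals=%d",
            pool.getParallelism(),        // 目标并行度
            pool.getPoolSize(),           // 当前 worker 线程数(含补偿线程)
            pool.getActiveThreadCount(),  // 活跃(未阻塞)线程数的估计值
            pool.getQueuedTaskCount(),    // 队列中等待的任务数估计值
            pool.getStealCount());        // 累计窃取次数估计值
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(snapshot(pool));
        pool.shutdown();
    }
}
```

对比 `getPoolSize()` 与 `getParallelism()` 即可观察补偿线程是否在增长。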
6.6 对比验证
ForkJoinPool版本(不会死锁)
```java
public class ForkJoinNoDeadlockDemo {

    private static final ForkJoinPool pool = new ForkJoinPool(2);

    public static void main(String[] args) {
        Integer result = pool.invoke(new ParentTask("Root"));
        System.out.println("Result: " + result);
    }

    static class ParentTask extends RecursiveTask<Integer> {
        private final String name;

        ParentTask(String name) { this.name = name; }

        @Override
        protected Integer compute() {
            System.out.println(name + " on " + Thread.currentThread().getName());
            ChildTask child1 = new ChildTask(name + "-Child1");
            ChildTask child2 = new ChildTask(name + "-Child2");
            child1.fork();
            child2.fork();
            return child1.join() + child2.join();
        }
    }

    static class ChildTask extends RecursiveTask<Integer> {
        private final String name;

        ChildTask(String name) { this.name = name; }

        @Override
        protected Integer compute() {
            System.out.println(name + " on " + Thread.currentThread().getName());
            return 21;
        }
    }
}
```
执行结果
```
Root on ForkJoinPool-1-worker-1
Root-Child1 on ForkJoinPool-1-worker-1   // 父线程帮助执行子任务
Root-Child2 on ForkJoinPool-1-worker-2
Result: 42
```
注意:即使只有2个线程,父任务也能完成,因为父线程在join()时直接执行了Child1。
6.7 设计启示
| 特性 | ThreadPoolExecutor | ForkJoinPool |
| --- | --- | --- |
| 等待语义 | 阻塞等待(浪费线程) | 协作等待(帮助执行) |
| 父子任务 | 可能死锁 | 天然支持 |
| 线程利用率 | 等待时为0 | 等待时仍在工作 |
| 线程补偿 | 无 | 有限度补偿(maximumSpares) |
| 适用场景 | 独立任务 | 分治/递归任务 |
核心洞察 :ForkJoinPool的join()不是"等待",而是"参与"。线程不会因为等待子任务而闲置,而是主动寻找可执行的工作。这种设计从根本上消除了父子任务死锁的可能性,同时通过有限度的线程补偿机制,在保证系统稳定性的前提下提供了额外的弹性。
7. 结论:设计的本质
ForkJoinPool不是"另一个线程池",而是为分治并行量身定制的执行引擎 。其核心设计决策源于对问题域的深刻理解:
任务结构驱动执行模型 :分治算法的"父-子"任务结构要求特殊的调度策略,工作窃取正是为这种结构优化。
局部性优先于公平性 :LIFO本地执行牺牲任务执行的公平性,换取时间局部性的显著提升。
协作优于竞争 :线程从"竞争共享资源"转变为"协作完成任务",资源利用率显著提高。
动态适应优于静态配置 :与ThreadPoolExecutor的"核心-最大"静态模型不同,ForkJoinPool通过目标并行度+spare线程机制,动态适应工作负载。
资源效率优于固定开销 :与TPE保持核心线程不同,FJP回收所有空闲线程,适合间歇性负载。
正如Doug Lea在论文结论中所述:
“This paper has demonstrated that it is possible to support portable, efficient, scalable parallel processing in pure Java, with a programming model and framework that can be convenient for programmers.”
(本论文证明了,纯粹的Java语言也能够实现可移植、高效可伸缩的并行处理,并且能够为程序员提供便捷的编程模型和框架。)
理解ForkJoinPool的核心数据结构和设计哲学,才能在正确场景发挥其价值。它不是万能的,但在分治并行领域,它代表了并发计算的理论最优解。
异步编程的进化
本章节一部分来自于qwen,一部分来自于以下文章:
《CompletableFuture原理与实践-外卖商家端API的异步化》
演进本质
graph LR
A[Java 5 Future] -->|阻塞痛点| B[Guava ListenableFuture]
B -->|回调地狱| C[CompletableFuture]
C -->|流处理需求| D[RxJava]
D -->|Spring整合| E[Reactor]
A -->|范式转变| F[命令式->声明式]
B -->|抽象提升| G[事件驱动->数据流]
C -->|能力增强| H[组合->背压]
D & E -->|统一理念| I[异步即数据流]
控制流:阻塞等待 → 回调响应 → 声明式组合 → 响应式流
错误处理:分散try-catch → 回调onFailure → 链式exceptionally → 流式onError
组合能力:无 → 有限transform → 丰富组合操作符 → 完整流处理
背压支持:无 → 无 → 有限 → 完整内建支持
| 模式 | 核心特征 | 编程范式 | 适用场景 |
| --- | --- | --- | --- |
| Java 5 Future | 阻塞等待 | 命令式 | 简单异步任务,兼容性要求高 |
| Guava ListenableFuture | 回调驱动 | 事件驱动 | 中等复杂度,需要非阻塞回调 |
| CompletableFuture | 链式组合 | 声明式 | 复杂异步流程,需要组合和错误处理 |
| RxJava | 响应式流 | 函数式响应式 | 事件流处理,背压支持,复杂数据转换 |
| Reactor | 响应式流 | 函数式响应式 | Spring生态,高性能流处理,背压内建 |
传统调用时序
Java 5 Future
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#FFF5E1', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
participant Client as Client
participant Executor as ExecutorService
participant Future as Future<T>
participant Task as Callable<T>
Note over Client,Task: Java 5 Future (阻塞式)
Client->>Executor: submit(Callable)
Executor->>Future: 创建Future
Executor-->>Client: 返回Future
Note right of Client: 非阻塞返回
Executor->>Task: 执行任务
Task-->>Executor: 返回结果
Client->>Future: get() / get(timeout)
Note right of Client: 阻塞等待
Future-->>Client: 返回结果或抛出异常
Note over Client: 阻塞式: submit/get
```java
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "Hello Future";
});

try {
    String result = future.get(2, TimeUnit.SECONDS); // 阻塞等待,最多 2 秒
    System.out.println(result);
} catch (TimeoutException e) {
    System.err.println("超时了!");
} finally {
    executor.shutdown();
}
```
Guava ListenableFuture
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#D5E8D4', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
participant Client as Client
participant Executor as ListeningExecutorService
participant Future as ListenableFuture<T>
participant Callback as FutureCallback<T>
participant Task as Callable<T>
Note over Client,Task: Guava ListenableFuture (回调式)
Client->>Executor: submit(Callable)
Executor->>Future: 创建ListenableFuture
Executor-->>Client: 返回ListenableFuture
Note right of Client: 非阻塞返回
Client->>Future: addCallback(FutureCallback)
Note right of Client: 注册回调,非阻塞
Executor->>Task: 执行任务
Task-->>Executor: 返回结果
Future->>Callback: onSuccess(result) 或 onFailure(ex)
Note over Client: 回调式: submit/addCallback
```java
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
    Executors.newSingleThreadExecutor());

ListenableFuture<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "Hello ListenableFuture";
});

Futures.addCallback(future, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println("成功: " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.err.println("失败: " + t.getMessage());
    }
}, executor);

executor.shutdown();
```
这个方案很容易产生回调地狱:addCallback + onSuccess 这类 API 无法编排、组合,不适合把多个并发调用的结果组合在一起:
```java
ListenableFuture<User> userFuture = userService.getUser(userId);
Futures.addCallback(userFuture, new FutureCallback<User>() {
    @Override
    public void onSuccess(User user) {
        ListenableFuture<List<Order>> ordersFuture = orderService.getOrders(user.getId());
        Futures.addCallback(ordersFuture, new FutureCallback<List<Order>>() {
            @Override
            public void onSuccess(List<Order> orders) {
                ListenableFuture<List<Product>> productsFuture =
                    productService.getProducts(orders.stream()
                        .map(Order::getProductId)
                        .collect(Collectors.toList()));
                Futures.addCallback(productsFuture, new FutureCallback<List<Product>>() {
                    @Override
                    public void onSuccess(List<Product> products) {
                        OrderDetails details = new OrderDetails(user, orders, products);
                        log.info("订单详情: {}", details);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        log.error("Products retrieval failed", t);
                        auditService.logFailure("products", t);
                    }
                }, executor);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Orders retrieval failed", t);
                auditService.logFailure("orders", t);
            }
        }, executor);
    }

    @Override
    public void onFailure(Throwable t) {
        log.error("User retrieval failed", t);
        auditService.logFailure("user", t);
    }
}, executor);
```
CompletableFuture
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#DAE8FC', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
participant Client as Client
participant CF1 as CompletableFuture<T>
participant CF2 as CompletableFuture<U>
participant Executor as Executor
Note over Client,Executor: CompletableFuture (链式组合)
Client->>CF1: supplyAsync(Supplier)
CF1-->>Client: 返回CompletableFuture
Note right of Client: 非阻塞返回
Client->>CF1: thenApply(Function)
CF1->>CF2: 创建新的CompletableFuture
CF2-->>Client: 返回新的CompletableFuture
Note right of Client: 链式组合,非阻塞
Executor->>CF1: 执行任务
CF1->>CF2: 传递结果
CF2-->>Client: 完成时通知
Note over Client: 链式: supplyAsync/thenApply/thenCompose
CompletableFuture 和 ListenableFuture 的设计哲学差异:
```java
// ListenableFuture:回调注册
future.addCallback(new FutureCallback<T>() {
    void onSuccess(T result);
    void onFailure(Throwable t);
});

// CompletableFuture:链式组合
future.thenCompose(result -> nextOperation(result))
      .thenApply(transformed -> process(transformed))
      .exceptionally(ex -> handle(ex));
```
执行树
核心 API
classDiagram
class CompletableFuture~T~ {
<<核心类>>
+T result
+Throwable exception
+Object stack
%% 创建方法
+supplyAsync(Supplier~T~) CompletableFuture~T~
+runAsync(Runnable) CompletableFuture~Void~
+completedFuture(T) CompletableFuture~T~
%% 转换方法
+thenApply(Function) CompletableFuture~U~
+thenApplyAsync(Function) CompletableFuture~U~
+thenCompose(Function) CompletableFuture~U~
%% 消费方法
+thenAccept(Consumer) CompletableFuture~Void~
+thenRun(Runnable) CompletableFuture~Void~
%% 组合方法
+thenCombine(CompletableFuture, BiFunction) CompletableFuture~V~
+allOf(CompletableFuture...) CompletableFuture~Void~
+anyOf(CompletableFuture...) CompletableFuture~Object~
%% 异常处理
+exceptionally(Function) CompletableFuture~T~
+handle(BiFunction) CompletableFuture~U~
+whenComplete(BiConsumer) CompletableFuture~T~
%% 完成方法
+complete(T) boolean
+completeExceptionally(Throwable) boolean
%% 获取结果
+get() T
+join() T
+getNow(T) T
}
class CompletionStage~T~ {
<<接口>>
+thenApply(Function) CompletionStage~U~
+thenCompose(Function) CompletionStage~U~
+thenCombine(CompletionStage, BiFunction) CompletionStage~V~
+exceptionally(Function) CompletionStage~T~
}
class Future~T~ {
<<接口>>
+get() T
+get(long, TimeUnit) T
+cancel(boolean) boolean
+isDone() boolean
+isCancelled() boolean
}
CompletableFuture ..|> CompletionStage : implements
CompletableFuture ..|> Future : implements
使用原则
原则一:异步回调要传线程池
```java
// 反例:不传线程池,默认落在 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> process(data))
    .thenAccept(result -> save(result));

// 正例:按任务类型指定线程池
ExecutorService ioExecutor = Executors.newFixedThreadPool(10);
ExecutorService cpuExecutor = Executors.newWorkStealingPool();

CompletableFuture.supplyAsync(() -> fetchData(), ioExecutor)
    .thenApplyAsync(data -> process(data), cpuExecutor)
    .thenAcceptAsync(result -> save(result), ioExecutor);
```
原则二:CompletableFuture 中不要吞异常
```java
// 反例:异常被吞,下游毫无感知
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("出错了");
}).thenApply(result -> result + "processed");

// 正例:exceptionally 兜底
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("出错了");
}).exceptionally(ex -> {
    log.error("任务执行失败", ex);
    return "默认值";
}).thenApply(result -> result + "processed");

// 正例:whenComplete 记录异常
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("出错了");
}).whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("任务执行失败", ex);
    }
});
```
原则三:自定义线程池时,注意饱和策略
```java
// 反例:无界队列,任务堆积有 OOM 风险
ExecutorService executor = new ThreadPoolExecutor(
    10, 10, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>());

// 正例:有界队列 + 明确的饱和策略
ExecutorService executor = new ThreadPoolExecutor(
    10, 20, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadPoolExecutor.CallerRunsPolicy());
```
原则四:正确进行异常处理
```java
// exceptionally:只处理异常分支
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> process(data))
    .exceptionally(ex -> handleFetchError(ex));

// 链尾统一兜底
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> process(data))
    .thenApply(result -> transform(result))
    .exceptionally(ex -> {
        log.error("处理失败", ex);
        return defaultValue;
    });

// handle:同时处理正常结果与异常
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> process(data))
    .handle((result, ex) -> {
        if (ex != null) {
            log.error("处理失败", ex);
            return defaultValue;
        }
        return result;
    });
```
原则五:合理设置超时
```java
// 反例:join() 无限等待
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return slowService.call();
});
String result = future.join();

// 正例:orTimeout(JDK 9+)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return slowService.call();
}).orTimeout(5, TimeUnit.SECONDS);

// 正例:get(timeout) + 超时后取消
try {
    String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true);
}

// 正例:completeOnTimeout(JDK 9+),超时返回默认值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return slowService.call();
}).completeOnTimeout("默认值", 5, TimeUnit.SECONDS);
```
原则六:避免在回调中阻塞
```java
// 反例:在回调中同步阻塞调用
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> {
        return anotherService.syncCall(data); // 阻塞了执行回调的线程
    });

// 正例:用 thenCompose 接入另一个异步调用
CompletableFuture.supplyAsync(() -> fetchData())
    .thenCompose(data -> {
        return CompletableFuture.supplyAsync(
            () -> anotherService.syncCall(data), ioExecutor);
    });
```
完成保证原则
完成保证原则(Completion Guarantee Principle)是 CompletableFuture 编程中的一个核心设计模式,其核心思想是:
在调用 join() 或 get() 之前,确保目标 Future 已经完成,从而将阻塞操作转化为非阻塞的结果获取。
为什么需要完成保证原则?
```java
// 阻塞式:调用线程在 join() 上等待 1 秒
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "result";
});
String result = future.join();

// 完成保证:allOf 完成后再 join(),此时 join() 不会阻塞
CompletableFuture<Void> allDone = CompletableFuture.allOf(future);
allDone.thenApply(v -> {
    return future.join(); // future 已完成,立即返回
});
```
完成保证原则的核心模式
```java
List<CompletableFuture<String>> futures = services.stream()
    .map(service -> CompletableFuture.supplyAsync(() -> service.call(), ioExecutor))
    .collect(Collectors.toList());

// allOf 保证所有 future 完成后,再在聚合阶段 join()
CompletableFuture<List<String>> resultFuture = CompletableFuture
    .allOf(futures.toArray(new CompletableFuture[0]))
    .thenApplyAsync(v -> {
        return futures.stream()
            .map(CompletableFuture::join) // 此时 join() 均立即返回
            .collect(Collectors.toList());
    }, cpuExecutor);
```
完整示例
```java
import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;

public class CompletionGuaranteeExample {

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r, "io-pool-" + System.nanoTime());
            t.setDaemon(true);
            return t;
        });
        ExecutorService cpuPool = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            r -> {
                Thread t = new Thread(r, "cpu-pool-" + System.nanoTime());
                t.setDaemon(true);
                return t;
            });

        try {
            System.out.println("[" + Thread.currentThread().getName() + "] 开始执行");

            List<CompletableFuture<String>> serviceFutures = IntStream.range(0, 30)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("[" + Thread.currentThread().getName() + "] 调用服务 " + i);
                    simulateIoOperation(i);
                    return "result-" + i;
                }, ioPool))
                .collect(Collectors.toList());

            CompletableFuture<List<String>> resultFuture = CompletableFuture
                .allOf(serviceFutures.toArray(new CompletableFuture[0]))
                .thenApplyAsync(v -> {
                    System.out.println("\n[正确] 原则验证: 所有服务调用已完成,开始聚合结果");
                    System.out.println("[" + Thread.currentThread().getName() + "] 聚合阶段启动(CPU密集型)");
                    List<String> results = serviceFutures.stream()
                        .map(future -> {
                            long start = System.nanoTime();
                            String result = future.join();
                            long duration = System.nanoTime() - start;
                            System.out.printf("[%s] join() 耗时: %d ns, 结果: %s\n",
                                Thread.currentThread().getName(), duration, result);
                            return result;
                        })
                        .collect(Collectors.toList());
                    return results;
                }, cpuPool)
                .exceptionally(ex -> {
                    System.err.println("[" + Thread.currentThread().getName()
                        + "] [错误] 全局异常: " + ex.getMessage());
                    return Arrays.asList("GLOBAL_FALLBACK");
                });

            List<String> results = resultFuture.join();
            System.out.println("\n最终聚合结果: " + results.size() + " 个结果");
            results.forEach(r -> System.out.println("  - " + r));
        } finally {
            ioPool.shutdown();
            cpuPool.shutdown();
        }
    }

    private static void simulateIoOperation(int id) {
        try {
            int delay = 50 + (id * 10) % 200;
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("IO操作被中断", e);
        }
    }
}
```
对传统的线程池的效率改进
CompletableFuture 相对于一般线程池的改进,主要来自对复杂结果编排的 API 优化,本身并不提供性能优化。
如果要实现性能优化,可以:
基于 Netty/NIO 实现了真正的异步 RPC:
发起调用后立即返回,不阻塞线程;
结果由 Netty 的 IO 线程(或专用回调线程)在数据到达时触发;
一个 IO 线程可同时管理成千上万个连接(C10K+)。
CompletableFuture 被用作"胶水层":
将 NIO 回调封装为 CompletableFuture(如 toCompletableFuture 工具方法);
用 thenCompose / allOf 等组合多个异步 RPC;
业务逻辑不再关心回调注册,只关注数据流依赖。
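这种"胶水层"的核心写法,是创建一个 CompletableFuture,在回调的成功/失败分支里分别 complete / completeExceptionally。下面是一个自包含的示意(`AsyncClient`、`Callback` 均为假设的接口,并非某个真实 RPC 框架的 API):

```java
import java.util.concurrent.CompletableFuture;

// 将回调风格的异步客户端适配为 CompletableFuture
public class CallbackAdapter {

    public interface Callback<T> {
        void onSuccess(T result);
        void onFailure(Throwable t);
    }

    public interface AsyncClient {
        void call(String request, Callback<String> callback);
    }

    // 核心模式:创建 CF,在回调中 complete / completeExceptionally
    public static CompletableFuture<String> toCompletableFuture(
            AsyncClient client, String request) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        client.call(request, new Callback<String>() {
            @Override public void onSuccess(String result) { cf.complete(result); }
            @Override public void onFailure(Throwable t) { cf.completeExceptionally(t); }
        });
        return cf;
    }

    public static void main(String[] args) {
        // 一个立即在调用线程回调的假客户端,仅用于演示
        AsyncClient fake = (req, cb) -> cb.onSuccess("echo:" + req);
        toCompletableFuture(fake, "hello")
            .thenApply(String::toUpperCase)
            .thenAccept(System.out::println); // 输出 ECHO:HELLO
    }
}
```

适配之后,多个这样的 CF 就可以用 thenCompose / allOf 自由编排,回调注册的细节被完全隐藏。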
graph TD
A["Client Request"] --> B["Inbound IO Thread<br>Netty EventLoop"]
B --> C["Business Worker Thread<br>from biz-pool"]
C --> d1["Create CF1 = new CompletableFuture()"]
C --> d2["Create CF2 = new CompletableFuture()"]
C --> d3["Create CF3 = new CompletableFuture()"]
d1 --> e1["Register Observer1:<br>onSuccess → CF1.complete(...)<br>onFailure → CF1.completeEx(...)"]
d2 --> e2["Register Observer2:<br>onSuccess → CF2.complete(...)<br>onFailure → CF2.completeEx(...)"]
d3 --> e3["Register Observer3:<br>onSuccess → CF3.complete(...)<br>onFailure → CF3.completeEx(...)"]
e1 --> f1["Call mtthrift.async(orderService, Observer1)"]
e2 --> f2["Call mtthrift.async(productService, Observer2)"]
e3 --> f3["Call mtthrift.async(deliveryService, Observer3)"]
f1 --> g["Outbound IO Thread<br>Netty Client EventLoop"]
f2 --> g
f3 --> g
g --> h1["(orderService)"]
g --> h2["(productService)"]
g --> h3["(deliveryService)"]
h1 -->|Response| i1["Outbound IO Thread invokes<br>Observer1.onSuccess(result)"]
h2 -->|Response| i2["Outbound IO Thread invokes<br>Observer2.onSuccess(result)"]
h3 -->|Error| i3["Outbound IO Thread invokes<br>Observer3.onFailure(ex)"]
i1 --> j1["CF1.complete(result)"]
i2 --> j2["CF2.complete(result)"]
i3 --> j3["CF3.completeExceptionally(ex)"]
j1 --> k1["CF1.thenApplyAsync(enrichOrder, cpu-pool)"]
j2 --> k2["CF2.thenApplyAsync(enrichProduct, cpu-pool)"]
j3 --> k3["CF3.exceptionally(handleFallback)"]
k1 --> l["CF4 = CF1.thenCombine(CF2, merge)"]
k2 --> l
k1 --> m["CompletableFuture.allOf(CF1..CF30)"]
k2 --> m
k3 --> m
m --> n["m.thenApplyAsync(aggregateAll, cpu-pool)"]
n --> o["Final Result"]
o --> p["Write Response via Inbound IO Thread"]
p --> q["Client"]
classDef io fill:#d5e8d4,stroke:#82b366;
classDef worker fill:#dae8fc,stroke:#6c8ebf;
classDef outbound fill:#e1d5e7,stroke:#9673a6;
classDef cf fill:#fff2cc,stroke:#d6b656;
classDef service fill:#f8cecc,stroke:#b85450;
classDef observer fill:#e6e6fa,stroke:#999;
class B,p io
class C,d1,d2,d3,e1,e2,e3,f1,f2,f3,k1,k2,k3,l,m,n worker
class g,i1,i2,i3 outbound
class j1,j2,j3,o cf
class h1,h2,h3 service
class e1,e2,e3 observer
RxJava
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#E1D5E7', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
participant Client as Client
participant Observable as Observable
participant Operator1 as map
participant Operator2 as filter
participant Subscriber as Subscriber
Note over Client,Subscriber: RxJava (响应式流)
Client->>Observable: create(emitter)
Observable-->>Operator1: 注册操作符
Operator1->>Observable: map(transform)
Observable-->>Operator2: 注册操作符
Operator2->>Observable: filter(predicate)
Observable-->>Subscriber: subscribe(Subscriber)
Note right of Client: 非阻塞订阅
par 数据流处理
Observable->>Operator1: onNext(item)
Operator1->>Operator2: onNext(mapped)
Operator2->>Subscriber: onNext(filtered)
end
Observable->>Subscriber: onComplete()
Note over Client: 响应式: map/filter/subscribe
```java
Observable<String> observable1 = Observable.fromCallable(() -> {
    Thread.sleep(1000);
    return "Hello";
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.fromCallable(() -> {
    Thread.sleep(500);
    return "RxJava";
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, (s1, s2) -> s1 + " " + s2)
    .map(String::toUpperCase)
    .timeout(2, TimeUnit.SECONDS)
    .onErrorReturn(ex -> "FALLBACK: " + ex.getMessage())
    .subscribe(
        result -> System.out.println("结果: " + result),
        error -> System.err.println("错误: " + error.getMessage()));
```
Reactor
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#9DC3E6', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
participant Client as Client
participant Flux as Flux
participant Operator1 as map
participant Operator2 as flatMap
participant Subscriber as Subscriber
Note over Client,Subscriber: Reactor (响应式流)
Client->>Flux: create(sink)
Flux-->>Operator1: 注册操作符
Operator1->>Flux: map(transform)
Flux-->>Operator2: 注册操作符
Operator2->>Flux: flatMap(asyncOp)
Flux-->>Subscriber: subscribe(Subscriber)
Note right of Client: 非阻塞订阅
par 数据流处理
Flux->>Operator1: onNext(item)
Operator1->>Operator2: onNext(mapped)
Operator2->>Subscriber: onNext(result)
end
Flux->>Subscriber: onComplete()
Note over Client: 响应式: map/flatMap/subscribe
```java
// 注:单值异步结果在 Reactor 中用 Mono 表达(Flux 没有 fromCallable)
Mono<String> mono1 = Mono.fromCallable(() -> {
    Thread.sleep(1000);
    return "Hello";
}).subscribeOn(Schedulers.boundedElastic());

Mono<String> mono2 = Mono.fromCallable(() -> {
    Thread.sleep(500);
    return "Reactor";
}).subscribeOn(Schedulers.boundedElastic());

Mono.zip(mono1, mono2, (s1, s2) -> s1 + " " + s2)
    .map(String::toUpperCase)
    .timeout(Duration.ofSeconds(2))
    .onErrorResume(ex -> Mono.just("FALLBACK: " + ex.getMessage()))
    .subscribe(
        result -> System.out.println("结果: " + result),
        error -> System.err.println("错误: " + error.getMessage()),
        () -> System.out.println("完成"));
```
虚拟线程:Java 并发模型的未来
虚拟线程(Virtual Threads)是 JDK 21 正式引入的革命性特性(JEP 444),代表了 Java 并发模型的未来方向。理解虚拟线程与传统线程池的差异,对架构决策至关重要。
为什么需要虚拟线程?
传统线程模型的困境
graph TD
subgraph "传统平台线程模型"
A[Java Thread] -->|1:1 映射| B[OS Thread]
B --> C[内核调度]
C --> D[上下文切换开销大]
D --> E[线程数受限于内存]
E --> F[线程池成为必需品]
end
subgraph "问题"
F --> G[线程池大小难以调优]
F --> H[阻塞操作浪费线程]
F --> I[高并发场景受限]
end
传统 Java 线程(平台线程)的问题:
| 问题 | 描述 | 影响 |
| --- | --- | --- |
| 内存开销大 | 每个线程默认栈大小 1MB | 10000 线程 ≈ 10GB 内存 |
| 创建成本高 | 需要 OS 内核参与 | 创建/销毁耗时约 1ms |
| 上下文切换昂贵 | 内核态切换 | 约 1-10μs 每次切换 |
| 数量受限 | 受 OS 和内存限制 | 通常数千到数万 |
虚拟线程的解决方案
graph TD
subgraph "虚拟线程模型"
A1[Virtual Thread 1] --> B1[Carrier Thread 1]
A2[Virtual Thread 2] --> B1
A3[Virtual Thread 3] --> B1
A4[Virtual Thread 4] --> B2[Carrier Thread 2]
A5[Virtual Thread ...] --> B2
A6[Virtual Thread N] --> B2
B1 --> C1[OS Thread 1]
B2 --> C2[OS Thread 2]
end
subgraph "优势"
D[M:N 调度模型]
E[JVM 管理调度]
F[阻塞时自动让出]
G[百万级并发]
end
虚拟线程 vs 平台线程
核心差异对比
| 特性 | 平台线程 (Platform Thread) | 虚拟线程 (Virtual Thread) |
| --- | --- | --- |
| 映射关系 | 1:1 映射到 OS 线程 | M:N 映射(多对少) |
| 调度者 | OS 内核 | JVM(用户态调度) |
| 栈大小 | 固定(默认 1MB) | 动态增长(初始 KB 级) |
| 创建成本 | 高(~1ms) | 极低(~1μs) |
| 数量上限 | 数千到数万 | 数百万 |
| 阻塞行为 | 阻塞 OS 线程 | 自动挂起,让出载体线程 |
| 适用场景 | CPU 密集型 | IO 密集型 |
代码对比
```java
// 平台线程池:200 个线程处理 10000 个任务
ExecutorService platformPool = Executors.newFixedThreadPool(200);
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
    final int id = i;
    futures.add(platformPool.submit(() -> {
        Thread.sleep(100);
        return "Result-" + id;
    }));
}

// 虚拟线程:每个任务一个虚拟线程
try (ExecutorService virtualPool = Executors.newVirtualThreadPerTaskExecutor()) {
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < 10000; i++) {
        final int id = i;
        futures.add(virtualPool.submit(() -> {
            Thread.sleep(100);
            return "Result-" + id;
        }));
    }
    for (Future<String> future : futures) {
        System.out.println(future.get());
    }
}
```
虚拟线程的工作原理
挂载与卸载机制
sequenceDiagram
participant VT as Virtual Thread
participant CT as Carrier Thread
participant OS as OS Thread
participant IO as IO Operation
Note over VT,OS: 虚拟线程执行流程
VT->>CT: 挂载 (mount)
CT->>OS: 执行用户代码
VT->>IO: 发起阻塞IO
Note over VT: 检测到阻塞操作
VT->>CT: 卸载 (unmount)
Note over CT: 载体线程空闲,可执行其他虚拟线程
IO-->>VT: IO完成
VT->>CT: 重新挂载 (remount)
CT->>OS: 继续执行
关键概念
```java
public class VirtualThreadConcepts {

    public static void main(String[] args) throws Exception {
        // 方式一:Thread.ofVirtual() 构建器
        Thread vt1 = Thread.ofVirtual()
            .name("virtual-thread-1")
            .start(() -> {
                System.out.println("Running on: " + Thread.currentThread());
                System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
            });

        // 方式二:Thread.startVirtualThread() 快捷方法
        Thread vt2 = Thread.startVirtualThread(() -> {
            System.out.println("Quick virtual thread");
        });

        // 方式三:每任务一线程的 ExecutorService
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> {
                System.out.println("Virtual thread from executor");
            });
        }

        vt1.join();
        vt2.join();

        Thread.ofVirtual().start(() -> {
            System.out.println("Virtual thread running");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).join();
    }
}
```
虚拟线程的使用原则
原则一:不要池化虚拟线程
```java
// 反模式:把虚拟线程装进固定大小的池,平白加上了并发上限
ExecutorService wrongPool = Executors.newFixedThreadPool(
        100, Thread.ofVirtual().factory());

// 正确做法:每个任务直接创建一个虚拟线程
ExecutorService correctPool = Executors.newVirtualThreadPerTaskExecutor();
```
原因:虚拟线程的创建成本极低(约 1μs),池化反而增加了不必要的复杂性和限制。
原则二:避免在虚拟线程中执行 CPU 密集型任务
```java
// 反模式:在虚拟线程上跑纯计算任务,没有 IO 等待可让出
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        long result = 0;
        for (long i = 0; i < 10_000_000_000L; i++) {
            result += i;
        }
        return result;
    });
}

// 正确做法:CPU 密集型任务交给"核心数大小"的平台线程池
ExecutorService cpuPool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
cpuPool.submit(() -> {
    return heavyComputation(); // heavyComputation() 为示意方法
});
```
原因:虚拟线程的优势在于 IO 等待期间让出载体线程。CPU 密集型任务没有等待,无法发挥虚拟线程优势。
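作为对照,下面给出 CPU 密集型任务的一个参考写法示意(可在任意较新 JDK 上运行;`parallelSum` 与分片方式均为本文演示用的假设实现,并非某个库的固定 API):按核心数建平台线程池,把纯计算切分后并行执行。

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CpuBoundDemo {
    // 并行计算 1..n 的和:按核心数分片,每片一个任务
    static long parallelSum(long n) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        try {
            long chunk = n / cores;
            List<Future<Long>> parts = new ArrayList<>();
            for (int i = 0; i < cores; i++) {
                long from = i * chunk + 1;
                long to = (i == cores - 1) ? n : (i + 1) * chunk; // 最后一片吃掉余数
                parts.add(pool.submit(() -> {
                    long s = 0;
                    for (long x = from; x <= to; x++) s += x; // 纯计算,无 IO 等待
                    return s;
                }));
            }
            long total = 0;
            for (Future<Long> f : parts) total += f.get();
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // 1..n 求和,结果应等于 n*(n+1)/2
        System.out.println(parallelSum(1_000_000));
    }
}
```

这里线程数与核心数对齐:再多的线程也不会让 CPU 算得更快,只会增加上下文切换。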
原则三:注意 synchronized 和 native 方法的 Pinning 问题
```java
public class PinningExample {
    private final Object lock = new Object();

    // 反模式:synchronized 块内阻塞,虚拟线程被"钉"在载体线程上
    public void badMethod() {
        synchronized (lock) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final ReentrantLock reentrantLock = new ReentrantLock();

    // 正确做法:改用 ReentrantLock,阻塞时虚拟线程仍可正常卸载
    public void goodMethod() {
        reentrantLock.lock();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            reentrantLock.unlock();
        }
    }
}
```
Pinning(钉住):当虚拟线程在 synchronized 块内阻塞、或执行 native 方法时,无法从载体线程卸载,载体线程会被一并阻塞。在 JDK 21 上可以用 JVM 参数 `-Djdk.tracePinnedThreads=full` 在运行时打印发生 Pinning 的调用栈以便排查;JDK 24 的 JEP 491 已消除 synchronized 导致的 Pinning,但在 JDK 21 上仍应改用 ReentrantLock。
原则四:正确使用 ThreadLocal
```java
public class ThreadLocalWithVirtualThreads {
    // 用 ThreadLocal 缓存昂贵对象,在平台线程池里是常见优化:
    // 线程被复用,每个线程只初始化一次
    private static final ThreadLocal<ExpensiveObject> threadLocal =
            ThreadLocal.withInitial(ExpensiveObject::new);

    public void process() {
        // 在"每任务一线程"的虚拟线程模型下,每个任务都是新线程,
        // 这里的 get() 每次都会触发一次昂贵的初始化,缓存形同虚设,
        // 还可能因线程数巨大而放大内存占用
        ExpensiveObject obj = threadLocal.get();
        // ... 使用 obj
    }
}
```
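下面用一段可运行的小示例量化这个问题(示意代码;为了能在任意 JDK 上运行,这里用"每任务新建平台线程"模拟虚拟线程的每任务一线程模型):初始化次数等于线程数,说明 ThreadLocal 缓存完全没有被复用。

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalInitDemo {
    static final AtomicInteger INIT_COUNT = new AtomicInteger();
    static final ThreadLocal<Object> CACHE =
            ThreadLocal.withInitial(() -> {
                INIT_COUNT.incrementAndGet(); // 每个新线程首次 get 都触发一次"昂贵"初始化
                return new Object();
            });

    // 每个任务新建一个线程执行,等价于虚拟线程的每任务一线程模型
    static int runTasks(int taskCount) throws InterruptedException {
        INIT_COUNT.set(0);
        for (int i = 0; i < taskCount; i++) {
            Thread t = new Thread(() -> CACHE.get());
            t.start();
            t.join();
        }
        return INIT_COUNT.get(); // 初始化次数 == 线程数
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(8)); // 打印 8:每个线程各初始化一次
    }
}
```

若换成固定大小的平台线程池提交同样 8 个任务,初始化次数只会等于池内线程数,这正是两种模型的差别。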
虚拟线程与传统线程池的选择
决策流程图
flowchart TD
A[新任务] --> B{任务类型?}
B -->|IO密集型| C{JDK版本?}
C -->|JDK 21+| D[使用虚拟线程]
C -->|JDK 21以下| E[使用传统IO线程池]
B -->|CPU密集型| F[使用平台线程池]
F --> G[线程数 = CPU核心数]
B -->|混合型| H{主要瓶颈?}
H -->|IO等待| D
H -->|CPU计算| F
D --> I[newVirtualThreadPerTaskExecutor]
E --> J[newCachedThreadPool 或 自定义线程池]
style D fill:#90EE90
style F fill:#87CEEB
style E fill:#FFE4B5
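上面的决策流程可以收敛成一个简单的选择函数(示意代码,假设 JDK 21+;`TaskProfile` 枚举为本文演示用的假设类型,实际项目中的判断维度会更细):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorChooser {
    enum TaskProfile { IO_BOUND, CPU_BOUND }

    // 按任务画像选择执行器:IO 密集用虚拟线程,CPU 密集用核心数大小的平台线程池
    static ExecutorService choose(TaskProfile profile) {
        return switch (profile) {
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            case CPU_BOUND -> Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors());
        };
    }

    public static void main(String[] args) {
        // ExecutorService 在 JDK 21 中实现了 AutoCloseable,可用 try-with-resources 管理
        try (ExecutorService io = choose(TaskProfile.IO_BOUND);
             ExecutorService cpu = choose(TaskProfile.CPU_BOUND)) {
            System.out.println("io = " + io.getClass().getSimpleName());
            System.out.println("cpu = " + cpu.getClass().getSimpleName());
        }
    }
}
```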
场景对比
| 场景 | 推荐方案 | 原因 |
| --- | --- | --- |
| Web 服务器处理 HTTP 请求 | 虚拟线程 | 大量 IO 等待(数据库、外部 API) |
| 批量数据处理/ETL | 虚拟线程 | 文件 IO、网络传输 |
| 图像/视频处理 | 平台线程池 | CPU 密集型计算 |
| 科学计算/机器学习 | 平台线程池 + ForkJoinPool | CPU 密集型,需要并行计算 |
| 实时交易系统 | 平台线程池 | 低延迟要求,避免 JVM 调度开销 |
| 微服务间调用 | 虚拟线程 | 大量网络 IO 等待 |
迁移示例
```java
public class MigrationExample {
    // 迁移前:手动调参的平台线程池
    private final ExecutorService legacyPool = new ThreadPoolExecutor(
            10, 100, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public CompletableFuture<String> fetchDataLegacy(String url) {
        return CompletableFuture.supplyAsync(() -> {
            return httpClient.get(url);
        }, legacyPool);
    }

    // 迁移后:虚拟线程执行器,无需调参
    private final ExecutorService virtualPool =
            Executors.newVirtualThreadPerTaskExecutor();

    public CompletableFuture<String> fetchDataVirtual(String url) {
        return CompletableFuture.supplyAsync(() -> {
            return httpClient.get(url);
        }, virtualPool);
    }

    // 混合型任务:IO 阶段用虚拟线程,CPU 阶段用平台线程池
    private final ExecutorService ioPool =
            Executors.newVirtualThreadPerTaskExecutor();
    private final ExecutorService cpuPool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public CompletableFuture<ProcessedData> fetchAndProcess(String url) {
        return CompletableFuture
                .supplyAsync(() -> httpClient.get(url), ioPool)
                .thenApplyAsync(data -> heavyProcess(data), cpuPool);
    }
}
```
虚拟线程的性能特征
吞吐量对比
```java
public class PerformanceComparison {
    private static final int TASK_COUNT = 100_000;
    private static final int IO_DELAY_MS = 50;

    public static void main(String[] args) throws Exception {
        long platformTime = benchmarkPlatformThreads();
        System.out.println("Platform threads: " + platformTime + " ms");

        long virtualTime = benchmarkVirtualThreads();
        System.out.println("Virtual threads: " + virtualTime + " ms");
    }

    static long benchmarkPlatformThreads() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(200);
        long start = System.currentTimeMillis();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < TASK_COUNT; i++) {
            futures.add(pool.submit(() -> {
                Thread.sleep(IO_DELAY_MS);
                return null;
            }));
        }
        for (Future<?> f : futures) f.get();
        pool.shutdown();
        return System.currentTimeMillis() - start;
    }

    static long benchmarkVirtualThreads() throws Exception {
        long start = System.currentTimeMillis();
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < TASK_COUNT; i++) {
                futures.add(pool.submit(() -> {
                    Thread.sleep(IO_DELAY_MS);
                    return null;
                }));
            }
            for (Future<?> f : futures) f.get();
        }
        return System.currentTimeMillis() - start;
    }
}
```
内存占用对比
| 线程类型 | 10,000 线程内存 | 100,000 线程内存 | 1,000,000 线程内存 |
| --- | --- | --- | --- |
| 平台线程 | ~10 GB | ~100 GB(不可行) | 不可能 |
| 虚拟线程 | ~20 MB | ~200 MB | ~2 GB |
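表中的数量级可以由栈内存粗略估算出来(示意代码;按"平台线程默认栈约 1 MB、虚拟线程初始栈约 2 KB"的常见经验值计算,实际取决于 `-Xss` 设置与调用深度):

```java
public class ThreadMemoryEstimate {
    // 按每线程栈大小(KB)估算总内存占用(MB)
    static long estimateMB(long threadCount, long perThreadKB) {
        return threadCount * perThreadKB / 1024; // KB -> MB
    }

    public static void main(String[] args) {
        // 10,000 个平台线程 × 1024 KB ≈ 10,000 MB ≈ 10 GB,与表中同一量级
        System.out.println(estimateMB(10_000, 1024) + " MB");
        // 1,000,000 个虚拟线程 × 2 KB ≈ 2 GB 量级
        System.out.println(estimateMB(1_000_000, 2) + " MB");
    }
}
```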
总结:线程池技术的演进
timeline
title Java 并发模型演进
section JDK 1.0-1.4
1996 : Thread 类
: 手动管理线程
section JDK 5
2004 : ExecutorService
: ThreadPoolExecutor
: 线程池标准化
section JDK 7
2011 : ForkJoinPool
: 工作窃取算法
: 分治并行
section JDK 8
2014 : CompletableFuture
: 异步编程
: 链式组合
section JDK 21
2023 : Virtual Threads
: 百万级并发
: 简化异步编程
技术选型总结
| 技术 | 适用场景 | 核心优势 | 注意事项 |
| --- | --- | --- | --- |
| ThreadPoolExecutor | 通用任务执行 | 成熟稳定,参数可控 | 需要调优线程池参数 |
| ForkJoinPool | 分治/递归任务 | 工作窃取,高效并行 | 任务需可分解 |
| CompletableFuture | 异步编排 | 链式组合,声明式 | 注意线程池选择 |
| Virtual Threads | IO 密集型高并发 | 轻量级,简化编程 | JDK 21+,避免 Pinning |
核心洞察 :虚拟线程不是要取代线程池,而是为 IO 密集型场景提供更简单、更高效的解决方案。在 CPU 密集型场景,传统线程池仍然是最佳选择。理解每种技术的适用场景,才能做出正确的架构决策。