本文一部分来自于qwen,一部分来自于以下文章:

  1. 《CompletableFuture原理与实践-外卖商家端API的异步化》

Java

演进本质

graph LR
    A[Java 5 Future] -->|阻塞痛点| B[Guava ListenableFuture]
    B -->|回调地狱| C[CompletableFuture]
    C -->|流处理需求| D[RxJava]
    D -->|Spring整合| E[Reactor]
    
    A -->|范式转变| F[命令式->声明式]
    B -->|抽象提升| G[事件驱动->数据流]
    C -->|能力增强| H[组合->背压]
    D & E -->|统一理念| I[异步即数据流]
  • 控制流:阻塞等待 → 回调响应 → 声明式组合 → 响应式流
  • 错误处理:分散try-catch → 回调onFailure → 链式exceptionally → 流式onError
  • 组合能力:无 → 有限transform → 丰富组合操作符 → 完整流处理
  • 背压支持:无 → 无 → 有限 → 完整内建支持
模式 核心特征 编程范式 适用场景
Java 5 Future 阻塞等待 命令式 简单异步任务,兼容性要求高
Guava ListenableFuture 回调驱动 事件驱动 中等复杂度,需要非阻塞回调
CompletableFuture 链式组合 声明式 复杂异步流程,需要组合和错误处理
RxJava 响应式流 函数式响应式 事件流处理,背压支持,复杂数据转换
Reactor 响应式流 函数式响应式 Spring生态,高性能流处理,背压内建

传统调用时序

Java 5 Future

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#FFF5E1', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
    participant Client as Client
    participant Executor as ThreadPoolExecutor
    participant FutureTask as FutureTask
    participant Worker as WorkerThread

    Note over Client,Worker: Java 5 Future (阻塞式)
    Client->>Executor: submit(Callable)
    Executor->>FutureTask: 创建FutureTask
    Executor->>Worker: 提交任务
    Client->>FutureTask: future.get()
    Note right of Client: 阻塞等待结果
    Worker->>FutureTask: 执行call()
    FutureTask-->>Client: 返回结果
    Note over Client: 阻塞调用: future.get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 核心特点:阻塞等待结果
ExecutorService executor = Executors.newFixedThreadPool(2);

Future<String> future = executor.submit(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Hello Future";
});

try {
// ⚠️ 阻塞当前线程等待结果
String result = future.get(2, TimeUnit.SECONDS);
System.out.println(result);
} catch (TimeoutException e) {
System.err.println("超时了!");
} finally {
executor.shutdown();
}

Guava ListenableFuture

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#D5E8D4', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
    participant Client as Client
    participant Executor as ListeningExecutorService
    participant ListenableFuture as ListenableFuture
    participant Worker as WorkerThread
    participant Callback as FutureCallback

    Note over Client,Callback: Guava ListenableFuture (回调式)
    Client->>Executor: submit(Callable)
    Executor->>ListenableFuture: 创建ListenableFuture
    Executor->>Worker: 提交任务
    Client->>ListenableFuture: addCallback(Callback, executor)
    Note right of Client: 非阻塞注册回调
    Worker->>ListenableFuture: 执行call()
    alt 执行成功
        ListenableFuture->>Callback: onSuccess(result)
        Callback-->>Client: 处理结果
    else 执行失败
        ListenableFuture->>Callback: onFailure(throwable)
        Callback-->>Client: 处理异常
    end
    Note over Client: 回调模式: onSuccess/onFailure
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 核心特点:回调驱动,避免阻塞
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(2)
);

ListenableFuture<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello ListenableFuture";
});

// ✅ 非阻塞:注册回调处理结果
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("成功: " + result);
}
@Override
public void onFailure(Throwable t) {
System.err.println("失败: " + t.getMessage());
}
}, executor);

// ⚠️ 需要手动关闭executor
executor.shutdown();

这个方案是很容易产生回调地狱的,因为总是会有 addCallback + onSuccess 这种不可编排、组合 api 不适合把大型并发结果组合在一起的缺陷:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 🚨 典型的 ListenableFuture 回调地狱(3层依赖)
ListenableFuture<User> userFuture = userService.getUser(userId);

// 第一层回调
Futures.addCallback(userFuture, new FutureCallback<User>() {
@Override
public void onSuccess(User user) {
// 第二层回调:需要 user 结果
ListenableFuture<List<Order>> ordersFuture = orderService.getOrders(user.getId());

Futures.addCallback(ordersFuture, new FutureCallback<List<Order>>() {
@Override
public void onSuccess(List<Order> orders) {
// 第三层回调:需要 orders 结果
ListenableFuture<Report> reportFuture = reportService.generateReport(orders);

Futures.addCallback(reportFuture, new FutureCallback<Report>() {
@Override
public void onSuccess(Report report) {
// 📌 业务逻辑被分散在4层缩进中
sendNotification(report);
logReport(report);
}
@Override
public void onFailure(Throwable t) {
// ❌ 错误处理重复且分散
log.error("Report generation failed", t);
auditService.logFailure("report", t);
}
}, executor);
}
@Override
public void onFailure(Throwable t) {
log.error("Order retrieval failed", t);
auditService.logFailure("orders", t);
}
}, executor);
}
@Override
public void onFailure(Throwable t) {
log.error("User retrieval failed", t);
auditService.logFailure("user", t);
}
}, executor);

CompletableFuture

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#DAE8FC', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
    participant Client as Client
    participant CF as CompletableFuture
    participant Executor as Executor
    participant Stage1 as Stage1
    participant Stage2 as Stage2
    participant Stage3 as Stage3

    Note over Client,Stage3: Java 8+ CompletableFuture (声明式)
    Client->>CF: supplyAsync(() -> getA(), executor)
    CF-->>Stage1: 注册Stage1
    Stage1->>CF: thenCompose(a -> getBAsync(a))
    CF-->>Stage2: 注册Stage2
    Stage2->>CF: thenCombine(getCAsync(), (b,c) -> merge(b,c))
    CF-->>Stage3: 注册Stage3
    Stage3->>CF: exceptionally(ex -> handle(ex))
    CF-->>Client: 返回CompletableFuture

    Note right of Client: 非阻塞链式调用
    par 异步执行
        Executor->>Stage1: 执行Stage1
        Executor->>Stage2: 执行Stage2(依赖Stage1结果)
        Executor->>Stage3: 执行Stage3(合并结果)
    end

    CF-->>Client: complete(result)
    Client->>CF: thenAccept(result -> process(result))
    Note over Client: 链式调用: thenCompose/thenCombine

ListenableFuture 的改进

1
2
3
4
5
6
7
8
9
10
11
12
// ListenableFuture 设计哲学:
// "当异步操作完成时,通知我"
future.addCallback(new FutureCallback<T>() {
void onSuccess(T result); // 回调处理成功
void onFailure(Throwable t); // 回调处理失败
});

// CompletableFuture 设计哲学:
// "将这个异步操作与下一个操作组合起来"
future.thenCompose(result -> nextOperation(result))
.thenApply(transformed -> process(transformed))
.exceptionally(ex -> handle(ex));

执行树

使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行:

CompletableFuture执行树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletableFutureThreadContext {

public static void main(String[] args) {
// 🚀 创建专用线程池(生产环境必须)
//
// 📌 IO密集型线程池选择原理:
// - IO密集型任务(网络调用、文件读写)大部分时间在等待IO,不占用CPU
// - 线程数可以 > CPU核心数,充分利用等待时间
// - 最佳线程数公式:CPU核心数 * (1 + 平均等待时间/平均CPU时间)
// - 示例:8核CPU,等待时间:CPU时间=2:1 → 8 * (1+2) = 24个线程
// - 为什么不用ForkJoinPool?IO阻塞会浪费工作窃取线程,固定线程池更适合阻塞操作
ExecutorService ioExecutor = Executors.newFixedThreadPool(20, new NamedThreadFactory("io-pool"));

// 📌 CPU密集型线程池选择原理:
// - CPU密集型任务持续占用CPU,线程数应 ≈ CPU核心数
// - ForkJoinPool使用工作窃取算法,负载均衡最佳
// - asyncMode=false(默认):LIFO调度,适合递归分治任务(CPU计算)
// - asyncMode=true:FIFO调度,适合生产者-消费者模式
// - 聚合结果属于CPU计算,使用默认LIFO模式缓存命中率更高
ExecutorService cpuExecutor = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(), // ✅ CPU密集型使用CPU核心数
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
false // LIFO模式,适合计算密集型任务
);

try {
// 📌 关键原则1:清楚知道每一行代码在哪个线程执行
System.out.println("[" + Thread.currentThread().getName() + "] 🏃‍♂️ 主线程开始执行");

// ✅ 1. 30个服务并行调用
List<CompletableFuture<ServiceResponse>> serviceFutures = IntStream.range(0, 30)
.mapToObj(i -> {
// 📌 关键原则2:supplyAsync明确指定执行线程
// - 这里使用ioExecutor,所有service.call(i)都在IO线程池执行
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] ⚡ 执行服务调用: " + i);
return callExternalService(i); // IO密集型操作
}, ioExecutor)
// 📌 关键原则3:异步方法的行为
// - exceptionally() 是同步方法(无Async后缀)
// - 如果future已完成,由当前线程执行
// - 如果future未完成,由完成future的线程执行
// ✅ 这里由于future可能未完成,异常处理在ioExecutor线程执行
.exceptionally(ex -> {
System.err.println("[" + Thread.currentThread().getName() + "] 🛡️ 服务[" + i + "]异常处理");
return createServiceFallback(i, ex);
})
// 📌 关键原则4:异步方法的线程池选择
// - completeOnTimeout() 是异步方法(内部使用Async)
// - 未指定Executor,使用ForkJoinPool.commonPool()
// - CommonPool大小 = max(1, Runtime.getRuntime().availableProcessors() - 1)
// - IO密集应用中,这可能成为瓶颈!
// ✅ 生产环境最佳实践:应指定专用超时线程池
.completeOnTimeout(createTimeoutFallback(i), 2, TimeUnit.SECONDS);
})
.collect(Collectors.toList());

// ✅ 2. 等待所有完成 + 聚合结果
CompletableFuture<List<ServiceResponse>> resultFuture =
// 📌 allOf() 的设计哲学:
// - 返回 CompletableFuture<Void>,Void表示"完成信号"而非"结果值"
// - 只关心"所有future是否完成",不关心具体结果类型
// - 支持任意类型future组合(String/Integer/Boolean等混合)
// - 与thenApply()配合,将"完成状态"转换为"业务结果"
//
// 📌 toArray(new CompletableFuture[0]) 最佳实践:
// - 让JVM根据集合大小动态创建正确类型的新数组
// • 这里serviceFutures有30个元素,JVM会创建new CompletableFuture[30]
// - 比new T[collection.size()]更高效(避免不必要的数组分配)
// • 如果写new CompletableFuture[serviceFutures.size()],会先创建一个30大小的数组
// • 然后List.toArray()内部还会再创建一个30大小的数组,造成一次额外分配
// - 兼容性最好,避免类型擦除问题
// • 泛型List<T>的toArray()方法在运行时丢失T类型信息
// • 传入空数组让JVM推断类型,最安全
CompletableFuture.allOf(serviceFutures.toArray(new CompletableFuture[0]))
.orTimeout(5, TimeUnit.SECONDS)
// 📌 关键原则5:NIO异步RPC的陷阱
// - 如果serviceFutures来自基于NIO的异步RPC(如Netty)
// - 结果由IO线程设置,回调由IO线程触发
// - thenApply() 同步回调会在IO线程执行!
// - IO线程池通常很小(如Netty的bossGroup=1, workerGroup=CPU核心数)
// - 回调中不能有阻塞/耗时操作,否则阻塞整个IO线程池
// ✅ 解决方案:必须使用thenApplyAsync指定非IO线程池
.thenApplyAsync(v -> {
System.out.println("[" + Thread.currentThread().getName() + "] 🧮 聚合结果(CPU密集型)");
// 📌 这里明确使用cpuExecutor,避免在IO线程执行CPU计算
return serviceFutures.stream()
.map(CompletableFuture::join) // join()不阻塞,因为allOf已等待完成
.collect(Collectors.toList());
}, cpuExecutor) // ✅ 明确指定CPU线程池
.exceptionally(ex -> {
System.err.println("[" + Thread.currentThread().getName() + "] 🌐 全局异常处理");
return Arrays.asList(createGlobalFallback(ex));
});

// 📌 关键原则6:末端阻塞的线程
// - join()是阻塞调用,由调用线程(这里是主线程)执行
System.out.println("[" + Thread.currentThread().getName() + "] ⏳ 等待最终结果...");
List<ServiceResponse> results = resultFuture.join();
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 最终结果: " +
results.stream().filter(r -> !r.isFallback).count() + "个成功");

} finally {
shutdownExecutor(ioExecutor);
shutdownExecutor(cpuExecutor);
}
}

// 其他辅助方法保持不变...
private static ServiceResponse callExternalService(int id) {
try {
Thread.sleep(100 + (id % 10) * 20);
if (id == 15 || id == 25) {
throw new RuntimeException("服务[" + id + "] 模拟故障");
}
return new ServiceResponse("Result-" + id, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("服务调用被中断", e);
}
}

private static ServiceResponse createServiceFallback(int id, Throwable ex) {
return new ServiceResponse("Fallback-" + id, true);
}

private static ServiceResponse createTimeoutFallback(int id) {
return new ServiceResponse("Timeout-" + id, true);
}

private static ServiceResponse createGlobalFallback(Throwable ex) {
return new ServiceResponse("Global-Fallback: " + ex.getMessage(), true);
}

private static void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

static class NamedThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNum = new AtomicInteger(1);

NamedThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-worker-" + threadNum.getAndIncrement());
t.setDaemon(false);
t.setUncaughtExceptionHandler((thread, ex) ->
System.err.println("❌ 线程 " + thread.getName() + " 未捕获异常: " + ex.getMessage()));
return t;
}
}

static class ServiceResponse {
final String content;
final boolean isFallback;

ServiceResponse(String content, boolean isFallback) {
this.content = content;
this.isFallback = isFallback;
}
}
}

这个例子里 thenApplyAsync 是一个很好的提示,通常我们都会使用 thenApply-习惯成自然,但是在 io client 里使用很容易让 apply 在 outbound io 线程里使用,这时候一着不慎就会阻塞 outbound io 线程。所以有意识地使用 thenApplyAsync 是一个好习惯

基于纵横分类的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* CompletableFuture API 分类示例 - 最简版本
*
* 本示例严格遵循 CompletionStage 的架构设计思想,只使用基本数据类型,
* 清晰展示横向分类(产出型/消耗型/无产出型/组合型)和纵向分类。
*/
public class SimpleCompletableFutureExamples {

// 简单的线程池
private static final ExecutorService executor = Executors.newFixedThreadPool(3, r -> {
Thread t = new Thread(r, "worker-thread");
t.setDaemon(true);
return t;
});

/**
* 📚 CompletionStage 架构体系核心思想
*
* 横向分类(功能维度):
* 1. 产出型/函数型(apply):产生新结果,影响后续阶段
* 2. 消耗型/消费型(accept):只执行副作用,不影响结果
* 3. 无消费无产出型(run):不依赖结果,只关心阶段完成
* 4. 组合型(compose):扁平化嵌套的 CompletionStage
*
* 纵向分类(执行维度):
* 1. 依赖阶段数:单阶段/双阶段/任一阶段
* 2. 执行方式:默认/默认异步/自定义异步
* 3. 完成状态:正常完成/任意完成/异常完成
*/

/**
* 1. 产出型/函数型(apply)- 产生新结果
*/
public static void applyExamples() {
System.out.println("=== 产出型(apply)示例 ===");

// 单阶段依赖:thenApply
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成初始值");
return 42;
}, executor);

CompletableFuture<String> future2 = future1.thenApply(number -> {
System.out.println("[" + Thread.currentThread().getName() + "] 产出型: 转换 " + number + " -> 字符串");
return "Value: " + number;
});

// 双阶段依赖:thenCombine
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成第二个值");
return 100;
}, executor);

CompletableFuture<String> combined = future1.thenCombine(future3, (num1, num2) -> {
System.out.println("[" + Thread.currentThread().getName() + "] 产出型: 合并 " + num1 + " 和 " + num2);
return "Sum: " + (num1 + num2);
});

System.out.println("结果1: " + future2.join());
System.out.println("结果2: " + combined.join());
}

/**
* 2. 消耗型/消费型(accept)- 只执行副作用
*/
public static void acceptExamples() {
System.out.println("\n=== 消耗型(accept)示例 ===");

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成数据");
return "Hello World";
}, executor);

// 单阶段依赖:thenAccept
CompletableFuture<Void> logFuture = future.thenAccept(result -> {
System.out.println("[" + Thread.currentThread().getName() + "] 消耗型: 记录日志 - " + result);
});

// 双阶段依赖:thenAcceptBoth
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成第二个数据");
return "Java";
}, executor);

CompletableFuture<Void> consumeBoth = future.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println("[" + Thread.currentThread().getName() + "] 消耗型: 同时消费两个结果 - " + result1 + ", " + result2);
});

logFuture.join();
consumeBoth.join();
}

/**
* 3. 无消费无产出型(run)- 只关心阶段完成
*/
public static void runExamples() {
System.out.println("\n=== 无消费无产出型(run)示例 ===");

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成数据");
return "Data";
}, executor);

// 单阶段依赖:thenRun
CompletableFuture<Void> notifyFuture = future.thenRun(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 无消费无产出: 发送通知(不依赖结果)");
});

// 双阶段依赖:runAfterBoth
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 生产者: 生成第二个数据");
return "More Data";
}, executor);

CompletableFuture<Void> cleanupFuture = future.runAfterBoth(future2, () -> {
System.out.println("[" + Thread.currentThread().getName() + "] 无消费无产出: 两个任务完成,清理资源");
});

notifyFuture.join();
cleanupFuture.join();
}

/**
* 4. 组合型(compose)- 扁平化嵌套
*/
public static void composeExamples() {
System.out.println("\n=== 组合型(compose)示例 ===");

// thenCompose 扁平化嵌套
CompletableFuture<Integer> stage1 = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 阶段1: 生成初始值");
return 10;
}, executor);

CompletableFuture<Integer> stage2 = stage1.thenCompose(value -> {
System.out.println("[" + Thread.currentThread().getName() + "] 组合型: 阶段1完成,值=" + value);
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 阶段2: 基于阶段1的值计算");
return value * 2;
}, executor);
});

CompletableFuture<Integer> stage3 = stage2.thenCompose(value -> {
System.out.println("[" + Thread.currentThread().getName() + "] 组合型: 阶段2完成,值=" + value);
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 阶段3: 最终计算");
return value + 5;
}, executor);
});

System.out.println("最终结果: " + stage3.join());
}

/**
* 5. 异常处理 - 完成状态分类
*/
public static void exceptionExamples() {
System.out.println("\n=== 异常处理示例 ===");

// exceptionally - 仅异常完成时处理
CompletableFuture<String> errorFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 模拟失败的操作");
throw new RuntimeException("操作失败");
}, executor);

CompletableFuture<String> fallbackFuture = errorFuture.exceptionally(ex -> {
System.out.println("[" + Thread.currentThread().getName() + "] 异常处理: exceptionally - " + ex.getMessage());
return "降级结果";
});

// handle - 无论正常或异常都处理
CompletableFuture<String> handleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 可能成功或失败的操作");
return Math.random() > 0.5 ? "成功结果" : null; // 模拟可能失败
}, executor).handle((result, ex) -> {
if (ex != null) {
System.out.println("[" + Thread.currentThread().getName() + "] 异常处理: handle - 捕获异常");
return "处理后的异常结果";
} else {
System.out.println("[" + Thread.currentThread().getName() + "] 异常处理: handle - 正常结果");
return "处理后的成功结果: " + result;
}
});

// whenComplete - 无论正常或异常都执行(不改变结果)
CompletableFuture<String> loggingFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 需要记录日志的操作");
return "需要日志的结果";
}, executor).whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("[" + Thread.currentThread().getName() + "] 异常处理: whenComplete - 记录异常");
} else {
System.out.println("[" + Thread.currentThread().getName() + "] 异常处理: whenComplete - 记录结果: " + result);
}
}).exceptionally(ex -> "日志记录后的降级结果");

System.out.println("exceptionally 结果: " + fallbackFuture.join());
System.out.println("handle 结果: " + handleFuture.join());
System.out.println("whenComplete 结果: " + loggingFuture.join());
}

/**
* 6. 最简生产者-消费者模型
*/
public static void producerConsumerExample() {
System.out.println("\n=== 最简生产者-消费者模型 ===");

// 生产者:生成数据
CompletableFuture<String> producer = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 🌱 生产者: 生成数据");
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "生产的数据";
}, executor);

// 消费者1:转换数据
CompletableFuture<String> consumer1 = producer.thenApply(data -> {
System.out.println("[" + Thread.currentThread().getName() + "] 👥 消费者1: 转换数据 - " + data);
return data.toUpperCase();
});

// 消费者2:消费数据
CompletableFuture<Void> consumer2 = producer.thenAccept(data -> {
System.out.println("[" + Thread.currentThread().getName() + "] 👥 消费者2: 消费数据 - " + data);
});

// 消费者3:组合处理
CompletableFuture<String> consumer3 = producer.thenCompose(data -> {
System.out.println("[" + Thread.currentThread().getName() + "] 👥 消费者3: 开始组合处理");
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 👥 消费者3: 异步处理 " + data);
return data + "-processed";
}, executor);
});

// 等待所有消费者完成
CompletableFuture.allOf(consumer1, consumer2, consumer3)
.thenRun(() -> System.out.println("✅ 所有消费者完成"))
.join();

System.out.println("消费者1最终结果: " + consumer1.join());
System.out.println("消费者3最终结果: " + consumer3.join());
}

public static void main(String[] args) {
try {
// 按架构分类执行最简示例
applyExamples();
acceptExamples();
runExamples();
composeExamples();
exceptionExamples();
producerConsumerExample();

System.out.println("\n✅ 所有 CompletableFuture API 示例执行完成");
System.out.println("✨ 架构验证: 横向分类 + 纵向分类");

} finally {
executor.shutdown();
}
}
}

exceptionally

graph LR
    A[原始 CompletableFuture] -->|正常完成| B[“成功值”]
    A -->|异常完成| C[RuntimeException]
    B --> D[套壳 CompletableFuture]
    C -->|fn.apply| E[“降级值”]
    E --> D
    D -->|总是正常完成| F[最终结果]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import java.util.concurrent.*;
import java.util.function.Function;

public class ExceptionallyPrinciples {

private static final ExecutorService bizPool = Executors.newFixedThreadPool(2, r -> {
Thread t = new Thread(r, "biz-thread");
t.setDaemon(true);
return t;
});

public static void main(String[] args) {
// 创建一个模拟失败的服务调用
CompletableFuture<String> serviceCall = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 🌐 调用外部服务 (模拟失败)");
throw new RuntimeException("网络超时: 服务不可用");
}, bizPool);

// 五大原则实战链 - 展示异常传播跳过中间阶段
String result = serviceCall
// 🚨 中间阶段1: thenApply (依赖成功结果,会被跳过)
.thenApply(data -> {
System.out.println("❌ 这行永远不会执行! (thenApply被跳过)");
return data.toUpperCase();
})

// 🚨 中间阶段2: thenAccept (依赖成功结果,会被跳过)
.thenAccept(data -> {
System.out.println("❌ 这行永远不会执行! (thenAccept被跳过)");
})

// 🚨 中间阶段3: another thenApply (同样被跳过)
.thenApply(data -> {
System.out.println("❌ 这行永远不会执行! (第二个thenApply被跳过)");
return data + "_PROCESSED";
})

// 📌 原则1: 就近逃生原则
// ✅ 关键演示: 异常传播会跳过所有依赖成功结果的中间阶段
// 直到遇到第一个exceptionally才被捕获
// ✅ 就近处理优先级最高,阻止异常继续向后传播
.exceptionally(ex -> {
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 原则1: 就近逃生 - 捕获网络异常: " + ex.getMessage());
// 仅处理特定异常类型,其他异常继续传播
if (ex.getCause().getMessage().contains("网络")) {
return "NEAR_FALLBACK"; // 就近处理,异常被"吸收"
}
// 📌 重要原则: 非目标异常必须透传!
// 不处理的异常必须重新抛出,让后续阶段有机会处理
throw ex;
})

// 📌 原则2: 状态重置原则
// ✅ exceptionally返回新CompletableFuture,总是正常完成
// 后续阶段无法区分是原始成功还是降级成功
.thenApply(value -> {
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 原则2: 状态重置 - 当前值: '" + value + "' (状态已正常化)");
return value + "_ENHANCED";
})

// 🚨 模拟另一个异常点 (测试后续exceptionally)
.thenApply(value -> {
if (value.contains("FAIL")) {
throw new RuntimeException("处理阶段异常");
}
return value;
})

// 📌 原则3: 执行线程陷阱原则
// ✅ exceptionally是同步方法,由完成线程执行
// 在NIO架构中,这可能是Netty IO线程,必须避免阻塞操作
.exceptionally(ex -> {
System.out.println("[" + Thread.currentThread().getName() + "] ⚠️ 原则3: 线程陷阱 - 在" + Thread.currentThread().getName() + "执行降级");
// 模拟危险:在IO线程执行耗时操作(生产环境应避免)
try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "TRAP_FALLBACK";
})

// 📌 原则4: 异常信息保留原则
// ✅ 原始异常栈在多次exceptionally后会丢失
// 必须显式记录原始异常,避免调试地狱
.exceptionally(ex -> {
System.err.println("[" + Thread.currentThread().getName() + "] ✅ 原则4: 信息保留 - 原始异常: " +
(ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage()));
// 实际项目中应记录完整堆栈: log.error("降级原因", ex);
return "PRESERVED:" + (ex.getCause() != null ? ex.getCause().getClass().getSimpleName() : ex.getClass().getSimpleName());
})

// 📌 原则5: 层级化异常处理原则
// ✅ 按异常类型分层处理,而非位置
// 优先业务异常,最后系统异常兜底
// 📌 关键原则: 非目标异常必须透传!
// 只处理自己关心的异常类型,其他异常重新抛出
// 保证异常处理链的完整性
.exceptionally(ex -> handleBusinessException(ex)) // 第一层:业务异常
.exceptionally(ex -> handleSystemException(ex)) // 第二层:系统异常
.exceptionally(ex -> ultimateFallback(ex)) // 第三层:终极兜底
.join(); // 末端阻塞获取结果

System.out.println("\n🎯 最终结果: '" + result + "'");
bizPool.shutdown();
}

// 原则5辅助方法:层级化异常处理
private static String handleBusinessException(Throwable ex) {
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 原则5: 业务异常处理 - 检查: " + ex.getMessage());
if (ex.getMessage().contains("业务") || ex.getCause().getMessage().contains("业务")) {
System.out.println("[" + Thread.currentThread().getName() + "] → 业务异常匹配,返回降级值");
return "BUSINESS_FALLBACK";
}
System.out.println("[" + Thread.currentThread().getName() + "] → 非业务异常,透传给下一层");
// 📌 关键原则: 非目标异常必须透传!
throw new CompletionException(ex); // 透传非目标异常
}

private static String handleSystemException(Throwable ex) {
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 原则5: 系统异常处理 - 检查: " + ex.getMessage());
String errorMsg = ex.getMessage() + (ex.getCause() != null ? ex.getCause().getMessage() : "");
if (errorMsg.contains("网络") || errorMsg.contains("超时") || errorMsg.contains("IO")) {
System.out.println("[" + Thread.currentThread().getName() + "] → 系统异常匹配,返回降级值");
return "SYSTEM_FALLBACK";
}
System.out.println("[" + Thread.currentThread().getName() + "] → 非系统异常,透传给兜底层");
// 📌 关键原则: 非目标异常必须透传!
throw new CompletionException(ex); // 透传未知异常
}

private static String ultimateFallback(Throwable ex) {
System.out.println("[" + Thread.currentThread().getName() + "] ✅ 原则5: 终极兜底 - 处理所有未捕获异常: " + ex.getClass().getSimpleName());
return "MAX_SAFETY:" + ex.getClass().getSimpleName();
}
}

allOf().thenApply(Async) 中的 join() 永不阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* 完成保证原则演示:allOf().thenApply(Async) 中的 join() 永不阻塞
*
* 核心原理:
* CompletableFuture.allOf() 仅在所有传入的 CompletableFuture 都完成后,
* 才会触发后续的 thenApply()/thenApplyAsync() 回调。
*
* 因此在回调内部调用 future.join() 时:
* 1. 该 future 100% 已完成(正常或异常)
* 2. join() 直接返回结果值,不会阻塞线程
* 3. 无上下文切换开销,性能等同于读取内存变量
*
* 这是 Java 异步编程中"黄金三角"模式的基础:
* allOf() + thenApplyAsync() + join() = 高效无阻塞聚合
*/
public class CompletionGuaranteePrinciple {

public static void main(String[] args) {
// 创建IO密集型线程池(模拟外部服务调用)
ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
Thread t = new Thread(r, "io-thread");
t.setDaemon(true);
return t;
});

// 创建CPU密集型线程池(用于结果聚合)
ExecutorService cpuPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
r -> {
Thread t = new Thread(r, "cpu-thread");
t.setDaemon(true);
return t;
}
);

try {
System.out.println("🚀 启动完成保证原则演示\n");

// 1. 创建10个异步服务调用(模拟IO操作)
List<CompletableFuture<String>> serviceFutures = IntStream.range(0, 10)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "] 💡 服务调用 #" + i + " 开始执行");
simulateIoOperation(i); // 模拟IO等待
if (i == 5) {
throw new RuntimeException("服务 #5 模拟故障");
}
return "Result-" + i;
}, ioPool))
.collect(Collectors.toList());

// 2. 应用完成保证原则:allOf() + thenApplyAsync() + join()
CompletableFuture<List<String>> resultFuture = CompletableFuture
.allOf(serviceFutures.toArray(new CompletableFuture[0])) // 等待所有完成
.thenApplyAsync(v -> { // 所有future完成后才执行此回调
System.out.println("\n✅ 原则验证: 所有服务调用已完成,开始聚合结果");
System.out.println("[" + Thread.currentThread().getName() + "] 🧮 聚合阶段启动(CPU密集型)");

// 关键点:此处join() 100% 不阻塞!
List<String> results = serviceFutures.stream()
.map(future -> {
long start = System.nanoTime();

// ⚡ 完成保证原则核心:
// 由于 allOf() 确保所有 future 已完成,
// future.join() 直接返回内部存储的 result 字段
// 无锁竞争、无上下文切换、无阻塞
String result = future.join();

long duration = System.nanoTime() - start;
System.out.printf(
"[%s] ⚡ join() 耗时: %d ns, 结果: %s\n",
Thread.currentThread().getName(),
duration,
result
);

// ⚠️ 重要验证:join() 耗时应接近 0(通常 < 1000 ns)
// 如果耗时 > 10000 ns,说明存在阻塞(违反完成保证原则)
return result;
})
.collect(Collectors.toList());

return results;
}, cpuPool) // 显式指定CPU线程池,避免在IO线程执行计算
.exceptionally(ex -> {
System.err.println("[" + Thread.currentThread().getName() + "] ❌ 全局异常: " + ex.getMessage());
return Arrays.asList("GLOBAL_FALLBACK");
});

// 3. 获取最终结果(末端阻塞,由主线程执行)
List<String> results = resultFuture.join();
System.out.println("\n🎯 最终聚合结果: " + results.size() + " 个结果");
results.forEach(r -> System.out.println(" - " + r));

} finally {
ioPool.shutdown();
cpuPool.shutdown();
}
}

/**
* 模拟IO操作(可变延迟)
*/
private static void simulateIoOperation(int id) {
try {
// 不同服务不同延迟,模拟真实场景
int delay = 50 + (id * 10) % 200;
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("IO操作被中断", e);
}
}
}

对传统的线程池的效率改进

CompletableFuture 相对于一般线程池的改进主要来自于对于复杂结果编排的 API 优化,本身并不提供性能优化

如果要实现性能优化,可以

  1. 基于 Netty/NIO 实现了真正的异步 RPC:
  • 发起调用后立即返回,不阻塞线程;
  • 结果由 Netty 的 IO 线程(或专用回调线程)在数据到达时触发;
  • 一个 IO 线程可同时管理成千上万个连接(C10K+)。
  1. CompletableFuture 被用作“胶水层”:
  • 将 NIO 回调封装为 CompletableFuture(如 toCompletableFuture 工具方法);
  • 用 thenCompose / allOf 等组合多个异步 RPC;
  • 业务逻辑不再关心回调注册,只关注数据流依赖。
graph TD
    A["Client Request"] --> B["Inbound IO Thread<br>Netty EventLoop"]
    B --> C["Business Worker Thread<br>from biz-pool"]

    C --> d1["Create CF1 = new CompletableFuture()"]
    C --> d2["Create CF2 = new CompletableFuture()"]
    C --> d3["Create CF3 = new CompletableFuture()"]

    d1 --> e1["Register Observer1:<br>onSuccess → CF1.complete(...)<br>onFailure → CF1.completeEx(...)"]
    d2 --> e2["Register Observer2:<br>onSuccess → CF2.complete(...)<br>onFailure → CF2.completeEx(...)"]
    d3 --> e3["Register Observer3:<br>onSuccess → CF3.complete(...)<br>onFailure → CF3.completeEx(...)"]

    e1 --> f1["Call mtthrift.async(orderService, Observer1)"]
    e2 --> f2["Call mtthrift.async(productService, Observer2)"]
    e3 --> f3["Call mtthrift.async(deliveryService, Observer3)"]

    f1 --> g["Outbound IO Thread<br>Netty Client EventLoop"]
    f2 --> g
    f3 --> g

    g --> h1["(orderService)"]
    g --> h2["(productService)"]
    g --> h3["(deliveryService)"]

    h1 -->|Response| i1["Outbound IO Thread invokes<br>Observer1.onSuccess(result)"]
    h2 -->|Response| i2["Outbound IO Thread invokes<br>Observer2.onSuccess(result)"]
    h3 -->|Error| i3["Outbound IO Thread invokes<br>Observer3.onFailure(ex)"]

    i1 --> j1["CF1.complete(result)"]
    i2 --> j2["CF2.complete(result)"]
    i3 --> j3["CF3.completeExceptionally(ex)"]

    j1 --> k1["CF1.thenApplyAsync(enrichOrder, cpu-pool)"]
    j2 --> k2["CF2.thenApplyAsync(enrichProduct, cpu-pool)"]
    j3 --> k3["CF3.exceptionally(handleFallback)"]

    k1 --> l["CF4 = CF1.thenCombine(CF2, merge)"]
    k2 --> l
    k1 --> m["CompletableFuture.allOf(CF1..CF30)"]
    k2 --> m
    k3 --> m

    m --> n["m.thenApplyAsync(aggregateAll, cpu-pool)"]
    n --> o["Final Result"]
    o --> p["Write Response via Inbound IO Thread"]
    p --> q["Client"]

    classDef io fill:#d5e8d4,stroke:#82b366;
    classDef worker fill:#dae8fc,stroke:#6c8ebf;
    classDef outbound fill:#e1d5e7,stroke:#9673a6;
    classDef cf fill:#fff2cc,stroke:#d6b656;
    classDef service fill:#f8cecc,stroke:#b85450;
    classDef observer fill:#e6e6fa,stroke:#999;

    class B,p io
    class C,d1,d2,d3,e1,e2,e3,f1,f2,f3,k1,k2,k3,l,m,n worker
    class g,i1,i2,i3 outbound
    class j1,j2,j3,o cf
    class h1,h2,h3 service
    class e1,e2,e3 observer

RxJava

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#E1D5E7', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
    participant Client as Client
    participant Observable as Observable
    participant Operator1 as map
    participant Operator2 as filter
    participant Subscriber as Subscriber

    Note over Client,Subscriber: RxJava (响应式流)
    Client->>Observable: create(emitter)
    Observable-->>Operator1: 注册操作符
    Operator1->>Observable: map(transform)
    Observable-->>Operator2: 注册操作符
    Operator2->>Observable: filter(predicate)
    Observable-->>Subscriber: subscribe(Subscriber)
    Note right of Client: 非阻塞订阅

    par 数据流处理
        Observable->>Operator1: onNext(item)
        Operator1->>Operator2: onNext(mapped)
        Operator2->>Subscriber: onNext(filtered)
    end

    Observable->>Subscriber: onComplete()
    Note over Client: 响应式: map/filter/subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 核心特点:数据流处理,背压支持
Observable<String> observable1 = Observable.fromCallable(() -> {
Thread.sleep(1000);
return "Hello";
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.fromCallable(() -> {
Thread.sleep(500);
return "RxJava";
}).subscribeOn(Schedulers.io());

// ✅ 非阻塞:流式处理
Observable.zip(observable1, observable2, (s1, s2) -> s1 + " " + s2)
.map(String::toUpperCase)
.timeout(2, TimeUnit.SECONDS)
.onErrorReturn(ex -> "FALLBACK: " + ex.getMessage())
.subscribe(
result -> System.out.println("结果: " + result),
error -> System.err.println("错误: " + error.getMessage())
);

// ⚠️ RxJava通常不需要手动关闭调度器

Reactor

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#9DC3E6', 'edgeLabelBackground':'#FFF', 'fontFamily': 'monospace'}}}%%
sequenceDiagram
    participant Client as Client
    participant Flux as Flux
    participant Operator1 as map
    participant Operator2 as flatMap
    participant Subscriber as Subscriber

    Note over Client,Subscriber: Reactor (响应式流)
    Client->>Flux: create(sink)
    Flux-->>Operator1: 注册操作符
    Operator1->>Flux: map(transform)
    Flux-->>Operator2: 注册操作符
    Operator2->>Flux: flatMap(asyncOp)
    Flux-->>Subscriber: subscribe(Subscriber)
    Note right of Client: 非阻塞订阅

    par 数据流处理
        Flux->>Operator1: onNext(item)
        Operator1->>Operator2: onNext(mapped)
        Operator2->>Subscriber: onNext(result)
    end

    Flux->>Subscriber: onComplete()
    Note over Client: 响应式: map/flatMap/subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 核心特点:响应式流,背压内建,Spring集成
Flux<String> flux1 = Flux.fromCallable(() -> {
Thread.sleep(1000);
return "Hello";
}).subscribeOn(Schedulers.boundedElastic());

Flux<String> flux2 = Flux.fromCallable(() -> {
Thread.sleep(500);
return "Reactor";
}).subscribeOn(Schedulers.boundedElastic());

// ✅ 非阻塞:声明式流处理
Flux.zip(flux1, flux2, (s1, s2) -> s1 + " " + s2)
.map(String::toUpperCase)
.timeout(Duration.ofSeconds(2))
.onErrorResume(ex -> Mono.just("FALLBACK: " + ex.getMessage()))
.subscribe(
result -> System.out.println("结果: " + result),
error -> System.err.println("错误: " + error.getMessage()),
() -> System.out.println("完成")
);

// ⚠️ Reactor通常由Spring管理生命周期