结构化并发(Structured Concurrency)

结构化并发是 Java 并发编程的重要演进方向,与虚拟线程紧密配合,旨在解决传统并发编程中的线程泄漏、错误处理困难等问题。

1. 历史背景

1.1 结构化并发的起源与核心类比

术语起源

“结构化并发”(Structured Concurrency)这个术语由 Martin Sústrik(ZeroMQ 作者)在 2016 年首次提出。随后,Nathaniel J. Smith 在 2018 年发表了著名的文章《Notes on structured concurrency, or: Go statement considered harmful》,系统性地阐述了结构化并发的理论基础。

timeline
    title 结构化并发发展历程
    2016 : Martin Sústrik 首创术语
         : 在 250bpm.com 发表系列文章
    2018 : Nathaniel J. Smith 发表核心论文
         : "Go statement considered harmful"
    2018 : Python Trio 库发布
         : 首个完整实现结构化并发的库
    2019 : Kotlin Coroutines 引入结构化并发
         : CoroutineScope 设计
    2022 : Java JEP 428 (JDK 19)
         : StructuredTaskScope 孵化
    2023 : Java JEP 453 (JDK 21)
         : 结构化并发预览
    2024 : Java JEP 462 (JDK 22)
         : 结构化并发第二次预览

核心类比:go 语句之于并发,如同 goto 之于顺序编程

结构化并发的核心洞察来自一个类比:Nathaniel J. Smith 指出,传统并发编程中的 go 语句(以及 thread spawn、callbacks、futures、promises 等)在并发领域的危害,类似于 goto 语句在顺序编程领域的危害。

“The popular concurrency primitives – go statements, thread spawning functions, callbacks, futures, promises, … they’re all variants on goto, in theory and in practice. And not even the modern domesticated goto, but the old-testament fire-and-brimstone goto, that could leap across function boundaries.”

— Nathaniel J. Smith, 2018

需要澄清的是:结构化并发并非"来自于"或"继承自"结构化编程,而是借鉴了结构化编程的思想,通过类比的方式将相似的原则应用于并发控制流。两者的关系是类比关系,而非继承关系

维度 结构化编程(1968) 结构化并发(2016)
问题根源 goto 破坏顺序控制流 go 破坏并发控制流
核心原则 代码块有清晰的入口和出口 并发任务有清晰的生命周期边界
解决方案 用 if/while/函数调用替代 goto 用 Nursery/StructuredTaskScope 替代 go
关系 类比,非继承

1.2 与结构化编程的关系

结构化并发借鉴了结构化编程的思想,将控制流的清晰性扩展到并发领域:

特性 结构化编程 结构化并发
控制流 顺序执行 并发执行
代码块 if, while, 函数调用 StructuredTaskScope
出口点 唯一返回点 作用域关闭时
嵌套 支持嵌套 支持嵌套作用域

2. 传统并发编程的问题

2.1 三大核心问题

flowchart TB
    subgraph "传统并发编程的问题"
        direction TB
        
        subgraph "问题1:线程泄漏"
            P1_CODE["executor.submit(task)"]
            P1_PROBLEM["任务提交后,谁负责等待?<br/>谁负责取消?<br/>如果忘记处理会怎样?"]
            P1_RESULT["线程可能永远运行<br/>资源无法回收"]
            
            P1_CODE --> P1_PROBLEM --> P1_RESULT
            style P1_RESULT fill:#ffcdd2
        end
        
        subgraph "问题2:错误处理困难"
            P2_CODE["多个并发任务"]
            P2_PROBLEM["任务A失败了<br/>任务B还在运行<br/>如何协调?"]
            P2_RESULT["错误被忽略<br/>或处理不一致"]
            
            P2_CODE --> P2_PROBLEM --> P2_RESULT
            style P2_RESULT fill:#ffcdd2
        end
        
        subgraph "问题3:取消传播困难"
            P3_CODE["父任务被取消"]
            P3_PROBLEM["子任务如何感知?<br/>如何优雅终止?"]
            P3_RESULT["子任务继续运行<br/>浪费资源"]
            
            P3_CODE --> P3_PROBLEM --> P3_RESULT
            style P3_RESULT fill:#ffcdd2
        end
    end

2.2 问题代码示例

示例 1:线程泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 问题:任务提交后,如果忘记处理,线程可能永远运行
public void processUser(long userId) {
ExecutorService executor = Executors.newCachedThreadPool();

executor.submit(() -> {
// 长时间运行的任务
while (true) {
processSomething();
Thread.sleep(1000);
}
});

// 忘记调用 executor.shutdown()
// 线程会永远运行,直到程序退出
}

示例 2:错误处理困难

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 问题:任务A失败后,任务B还在运行
public UserProfile fetchProfile(long userId) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

Future<User> userFuture = executor.submit(() -> fetchUser(userId));
Future<List<Order>> ordersFuture = executor.submit(() -> fetchOrders(userId));

try {
User user = userFuture.get(); // 如果这里抛异常...
List<Order> orders = ordersFuture.get(); // 这个任务还在运行!
return new UserProfile(user, orders);
} finally {
executor.shutdown();
}
}

示例 3:取消传播困难

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 问题:父任务被取消后,子任务继续运行
public Response handleRequest(String requestId) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

Future<Response> responseFuture = executor.submit(() -> {
// 子任务继续运行,即使父任务被取消
return slowOperation();
});

try {
return responseFuture.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 超时了,但子任务还在运行!
responseFuture.cancel(true); // 可能无法中断
throw new BusinessException("Timeout");
} finally {
executor.shutdown();
}
}

3. 结构化并发的核心思想

结构化并发的核心原则是:并发任务的生命周期应该与代码的词法作用域绑定

flowchart TB
    subgraph "结构化并发的三大原则"
        direction TB
        
        subgraph "原则1:生命周期绑定"
            L1["子任务的生命周期"]
            L2["不能超过"]
            L3["父作用域的生命周期"]
            
            L1 --> L2 --> L3
            style L3 fill:#c8e6c9
        end
        
        subgraph "原则2:错误传播"
            E1["子任务失败"]
            E2["自动传播到"]
            E3["父作用域"]
            
            E1 --> E2 --> E3
            style E3 fill:#c8e6c9
        end
        
        subgraph "原则3:取消传播"
            C1["父作用域取消"]
            C2["自动传播到"]
            C3["所有子任务"]
            
            C1 --> C2 --> C3
            style C3 fill:#c8e6c9
        end
    end

核心特性

  1. 作用域绑定:并发任务在作用域内启动,作用域关闭时所有任务保证终止
  2. 自动取消传播:父作用域取消时,所有子任务自动取消
  3. 错误自动聚合:子任务的异常自动传播到父作用域
  4. 清晰的代码结构:并发代码的执行流清晰可见

与其他语言的对比

特性 Java StructuredTaskScope Go goroutine Kotlin Coroutines Python Trio
生命周期绑定 强制(try-with-resources) 强制(CoroutineScope) 强制(Nursery)
错误传播 自动(ShutdownOnFailure) 手动(errgroup) 自动(supervisorScope) 自动
取消传播 自动 手动(context) 自动 自动
语法集成 API 级别 语言级别(go) 语言级别(suspend) API 级别

4. Java 中的结构化并发演进

4.1 JEP 演进概览

JEP JDK 版本 状态 主要变化
JEP 428 JDK 19 Incubator 首次引入 StructuredTaskScope
JEP 437 JDK 20 Second Incubator API 改进,fork() 返回 Future
JEP 453 JDK 21 Preview 重大变化,fork() 返回 Subtask
JEP 462 JDK 22 Second Preview API 细化,增强可观测性
JEP 480 JDK 23 Third Preview 进一步完善

4.2 JDK 19 (JEP 428) - 首次孵化

1
2
3
4
5
6
7
8
9
10
11
12
13
// JDK 19 Incubator API
import jdk.incubator.concurrent.StructuredTaskScope;

public Response handle(String id) throws Exception {
try (var scope = new StructuredTaskScope<Object>()) {
Future<String> userFuture = scope.fork(() -> fetchUser(id));
Future<Integer> orderFuture = scope.fork(() -> fetchOrder(id));

scope.join();

return new Response(userFuture.get(), orderFuture.get());
}
}

特点

  • fork() 返回 Future
  • 需要手动检查异常
  • 基本的生命周期管理

4.3 JDK 20 (JEP 437) - 第二次孵化

API 改进,增强错误处理能力:

1
2
3
4
5
6
7
8
9
10
// JDK 20 引入 ShutdownOnFailure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> userFuture = scope.fork(() -> fetchUser(id));
Future<Integer> orderFuture = scope.fork(() -> fetchOrder(id));

scope.join()
.throwIfFailed(); // 自动抛出异常

return new Response(userFuture.get(), orderFuture.get());
}

4.4 JDK 21 (JEP 453) - 第一次预览

重大变化fork() 返回 Subtask 而非 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// JDK 21 Preview API (需要 --enable-preview)
import java.util.concurrent.StructuredTaskScope;

public Response handle(String id) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<String> userTask = scope.fork(() -> fetchUser(id));
Subtask<Integer> orderTask = scope.fork(() -> fetchOrder(id));

scope.join()
.throwIfFailed();

return new Response(userTask.get(), orderTask.get());
}
}

Subtask vs Future

特性 Future Subtask
状态查询 isDone() state() (UNAVAILABLE/SUCCESS/FAILED)
异常处理 get() 抛出 ExecutionException exception() 直接获取
可观测性 有限 丰富(线程 dump 支持)

4.5 JDK 22 (JEP 462) - 第二次预览

API 细化,增强可观测性和灵活性:

1
2
3
4
5
6
7
8
9
10
11
// JDK 22 增强功能
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> fetchUser(id));
var task2 = scope.fork(() -> fetchOrder(id));

// 支持超时
scope.joinUntil(Instant.now().plusSeconds(5))
.throwIfFailed();

return new Response(task1.get(), task2.get());
}

5. StructuredTaskScope 核心 API

5.1 类层次结构

classDiagram
    class StructuredTaskScope~T~ {
        +fork(Callable~T~ task) Subtask~T~
        +join() StructuredTaskScope
        +joinUntil(Instant deadline) StructuredTaskScope
        +shutdown() void
        +close() void
        #handleComplete(Subtask~? extends T~ subtask)* void
    }
    
    class ShutdownOnFailure {
        +throwIfFailed() ShutdownOnFailure
        +throwIfFailed(Function~Throwable, X~ exceptionMapper)* X
        +exception() Throwable
    }
    
    class ShutdownOnSuccess~T~ {
        +result() T
        +resultOrElse(Supplier~? extends T~ supplier) T
    }
    
    class Subtask~T~ {
        +get() T
        +state() State
        +exception() Throwable
        +toString() String
    }
    
    class State {
        <<enumeration>>
        UNAVAILABLE
        SUCCESS
        FAILED
    }
    
    StructuredTaskScope <|-- ShutdownOnFailure
    StructuredTaskScope <|-- ShutdownOnSuccess
    StructuredTaskScope --> Subtask
    Subtask --> State

5.2 基础 API

StructuredTaskScope 核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class StructuredTaskScope<T> implements AutoCloseable {

/**
* 启动子任务,返回 Subtask
*/
public Subtask<T> fork(Callable<? extends T> task) {
// 在虚拟线程中执行任务
Objects.requireNonNull(task);

Subtask<T> subtask = new Subtask<>(task);
subtask.thread = ThreadFactory.ofVirtual().newThread(() -> {
try {
T result = task.call();
subtask.complete(result);
} catch (Throwable e) {
subtask.completeExceptionally(e);
}
});

subtask.thread.start();
return subtask;
}

/**
* 等待所有子任务完成(或作用域被关闭)
*/
public StructuredTaskScope<T> join() throws InterruptedException {
// 阻塞直到所有子任务完成
synchronized (lock) {
while (remaining > 0 && !shutdown) {
lock.wait();
}
}
return this;
}

/**
* 带超时的等待
*/
public StructuredTaskScope<T> joinUntil(Instant deadline)
throws InterruptedException, TimeoutException {

long timeout = Duration.between(Instant.now(), deadline).toMillis();
if (timeout <= 0) {
throw new TimeoutException();
}

synchronized (lock) {
while (remaining > 0 && !shutdown) {
lock.wait(timeout);
if (Instant.now().isAfter(deadline)) {
throw new TimeoutException();
}
}
}
return this;
}

/**
* 关闭作用域,取消所有未完成的子任务
*/
public void shutdown() {
synchronized (lock) {
if (shutdown) return;
shutdown = true;

// 中断所有未完成的子任务
for (Subtask<?> subtask : subtasks) {
if (subtask.state() == Subtask.State.UNAVAILABLE) {
subtask.thread.interrupt();
}
}

lock.notifyAll();
}
}

/**
* 关闭作用域,确保所有子任务已终止
*/
@Override
public void close() {
shutdown();

// 等待所有子任务终止
for (Subtask<?> subtask : subtasks) {
try {
subtask.thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/**
* 子任务完成时的回调,可被子类重写
*/
protected void handleComplete(Subtask<? extends T> subtask) {
// 默认实现:不做任何特殊处理
}
}

5.3 ShutdownOnFailure 策略

语义:“所有任务必须成功”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final class ShutdownOnFailure extends StructuredTaskScope<Object> {
private Throwable firstException;

/**
* 当子任务完成时调用
* - 如果任务失败,记录第一个异常并 shutdown
*/
@Override
protected void handleComplete(Subtask<?> subtask) {
if (subtask.state() == Subtask.State.FAILED && firstException == null) {
firstException = subtask.exception();
shutdown(); // 取消其他任务
}
}

/**
* 如果有任务失败,抛出异常
*/
public ShutdownOnFailure throwIfFailed() throws ExecutionException {
if (firstException != null) {
throw new ExecutionException(firstException);
}
return this;
}

/**
* 自定义异常转换
*/
public <X extends Throwable> ShutdownOnFailure throwIfFailed(
Function<Throwable, X> exceptionMapper
) throws X {
if (firstException != null) {
throw exceptionMapper.apply(firstException);
}
return this;
}

/**
* 获取第一个异常
*/
public Throwable exception() {
return firstException;
}
}

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 场景: 聚合多个数据源,全部成功才能继续
public class AggregationService {

public UserProfile getUserProfile(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行获取多个数据源
var basicInfo = scope.fork(() -> fetchBasicInfo(userId));
var preferences = scope.fork(() -> fetchPreferences(userId));
var activity = scope.fork(() -> fetchActivity(userId));
var social = scope.fork(() -> fetchSocialData(userId));

// 等待所有任务完成
scope.join()
.throwIfFailed();

// 所有任务都成功了,聚合结果
return new UserProfile(
basicInfo.get(),
preferences.get(),
activity.get(),
social.get()
);
}
// 作用域关闭,所有子任务都已终止
}

private BasicInfo fetchBasicInfo(String userId) throws IOException {
// 可能失败
return httpClient.get("/users/" + userId);
}
}

5.4 ShutdownOnSuccess 策略

语义: “只需要一个任务成功”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> {
private T firstResult;
private Subtask<? extends T> firstSuccess;

/**
* 当子任务成功完成时,记录结果并 shutdown
*/
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS && firstResult == null) {
firstSuccess = subtask;
firstResult = subtask.get();
shutdown(); // 取消其他任务
}
}

/**
* 获取第一个成功的结果
*/
public T result() throws ExecutionException {
if (firstResult != null) {
return firstResult;
}
throw new ExecutionException("No subtask completed successfully");
}

/**
* 如果没有成功结果,返回默认值
*/
public T resultOrElse(Supplier<? extends T> supplier) {
return firstResult != null ? firstResult : supplier.get();
}
}

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 场景: 从多个备份源获取数据,只要一个成功即可
public class RedundancyService {

public Data fetchWithRedundancy(String key) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Data>()) {
// 尝试多个数据源
scope.fork(() -> fetchFromPrimary(key));
scope.fork(() -> fetchFromSecondary(key));
scope.fork(() -> fetchFromCache(key));
scope.fork(() -> fetchFromCDN(key));

// 等待第一个成功
scope.join();
return scope.result();
}
}

private Data fetchFromPrimary(String key) throws IOException {
// 主数据库
return db.query(key);
}

private Data fetchFromSecondary(String key) throws IOException {
// 备份数据库
return backupDb.query(key);
}

private Data fetchFromCache(String key) throws IOException {
// Redis 缓存
return redis.get(key);
}
}

6. 与虚拟线程的协同

结构化并发与虚拟线程是天然的搭档:

flowchart TB
    subgraph "结构化并发 + 虚拟线程"
        STS["StructuredTaskScope"]
        VT1["虚拟线程 1"]
        VT2["虚拟线程 2"]
        VT3["虚拟线程 3"]
        CT["载体线程池"]
        
        STS -->|"fork()"| VT1
        STS -->|"fork()"| VT2
        STS -->|"fork()"| VT3
        
        VT1 -.->|"调度"| CT
        VT2 -.->|"调度"| CT
        VT3 -.->|"调度"| CT
        
        style STS fill:#c8e6c9
        style VT1 fill:#e1f5ff
        style VT2 fill:#e1f5ff
        style VT3 fill:#e1f5ff
    end

虚拟线程是结构化并发的基础设施:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class VirtualThreadIntegration {

// 1. 默认使用虚拟线程
public void defaultVirtualThread() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// fork 默认使用虚拟线程
var task1 = scope.fork(() -> blockingIO1());
var task2 = scope.fork(() -> blockingIO2());

scope.join().throwIfFailed();
}
}

// 2. 自定义线程工厂
public void customThreadFactory() throws Exception {
// 使用平台线程
ThreadFactory platformFactory = Thread.ofPlatform().factory();
try (var scope = new StructuredTaskScope<Object>(null, platformFactory)) {
var task = scope.fork(() -> cpuIntensiveTask());
scope.join();
}

// 使用自定义虚拟线程名称
ThreadFactory namedVirtualFactory = Thread.ofVirtual()
.name("worker-", 0)
.factory();
try (var scope = new StructuredTaskScope<Object>(null, namedVirtualFactory)) {
var task = scope.fork(() -> ioTask());
scope.join();
}
}

// 3. 嵌套作用域
public void nestedScopes() throws Exception {
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = outerScope.fork(() -> {
// 在子任务中创建嵌套作用域
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure()) {
var subTask1 = innerScope.fork(() -> subTask1());
var subTask2 = innerScope.fork(() -> subTask2());

innerScope.join().throwIfFailed();
return combineResults(subTask1.get(), subTask2.get());
}
});

var task2 = outerScope.fork(() -> anotherTask());

outerScope.join().throwIfFailed();
}
}

// 4. 与 ScopedValue 集成
private static final ScopedValue<String> CONTEXT = ScopedValue.newInstance();

public void withScopedValue() throws Exception {
String context = "user-123";

ScopedValue.where(CONTEXT, context).run(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 所有子任务都能访问到 CONTEXT
var task1 = scope.fork(() -> {
String ctx = CONTEXT.get(); // "user-123"
return processWithContext(ctx);
});

var task2 = scope.fork(() -> {
String ctx = CONTEXT.get(); // "user-123"
return processWithContext(ctx);
});

scope.join().throwIfFailed();
}
});
}
}

虚拟线程的优势:

  1. 轻量级: 可以创建百万级虚拟线程
  2. 阻塞友好: Thread.sleep(), BlockingQueue.take() 等阻塞操作不会消耗平台线程
  3. 结构化友好: 虚拟线程的生命周期管理简单,适合结构化并发

7. 实际应用场景

7.1 典型使用模式

模式 1: Fan-Out / Fan-In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Fan-Out: 并行执行多个任务
* Fan-In: 聚合所有结果
*/
public class FanOutFanInPattern {

public OrderSummary processOrder(String orderId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fan-Out: 并行获取订单相关数据
var order = scope.fork(() -> orderService.getOrder(orderId));
var items = scope.fork(() -> itemService.getItems(orderId));
var customer = scope.fork(() -> customerService.getCustomer(orderId));
var shipping = scope.fork(() -> shippingService.getShipping(orderId));
var payment = scope.fork(() -> paymentService.getPayment(orderId));

// Fan-In: 等待所有任务完成并聚合结果
scope.join().throwIfFailed();

return new OrderSummary(
order.get(),
items.get(),
customer.get(),
shipping.get(),
payment.get()
);
}
}
}
模式 2: 竞争模式 (Race)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 竞争模式: 多个任务竞争,第一个成功的胜出
*/
public class RacePattern {

public String fetchWithFallback(String url) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// 主数据源
scope.fork(() -> fetchFromPrimary(url));

// 备份数据源 (延迟启动)
Thread.sleep(50); // 给主数据源一点时间
scope.fork(() -> fetchFromSecondary(url));

// 缓存
scope.fork(() -> fetchFromCache(url));

scope.join();
return scope.result();
}
}
}
模式 3: 批量处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 批量处理: 并行处理一批任务,收集所有成功结果
*/
public class BatchProcessingPattern {

public List<ProcessedItem> processBatch(List<Item> items) throws Exception {
try (var scope = new CollectingScope<ProcessedItem>()) {
// 并行处理所有项目
for (Item item : items) {
scope.fork(() -> processItem(item));
}

scope.join();

// 返回所有成功处理的结果
return scope.results().toList();
}
}

static class CollectingScope<T> extends StructuredTaskScope<T> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();

@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
}

public Stream<T> results() {
ensureOwnerAndJoined();
return results.stream();
}
}
}
模式 4: 服务器处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 服务器模式: 持续接受连接,每个连接在独立作用域中处理
*/
public class ServerPattern {

public void startServer(ServerSocket serverSocket) throws Exception {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
Socket socket = serverSocket.accept();

// 每个连接在独立的子任务中处理
scope.fork(() -> handleConnection(socket));
}
} finally {
// 如果出现异常或被中断,关闭所有连接
scope.shutdown();
scope.join();
}
}
}

private Void handleConnection(Socket socket) {
try (socket) {
// 处理连接
processRequests(socket);
} catch (IOException e) {
log.error("Connection error", e);
}
return null;
}
}

7.2 与传统 ExecutorService 的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 对比: 传统 ExecutorService vs 结构化并发
*/
public class Comparison {

// ========== 传统 ExecutorService ==========
public Response handleWithExecutorService() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);

try {
Future<String> userFuture = executor.submit(() -> findUser());
Future<Integer> orderFuture = executor.submit(() -> fetchOrder());

// 问题 1: 需要手动等待所有任务
String user = userFuture.get();
Integer order = orderFuture.get();

return new Response(user, order);

// 问题 2: 如果任务失败,需要手动处理异常
// 问题 3: 如果方法被中断,任务不会自动取消
// 问题 4: 需要手动管理线程池生命周期
} finally {
executor.shutdown();
}
}

// ========== 结构化并发 ==========
public Response handleWithStructuredConcurrency() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() -> findUser());
var orderTask = scope.fork(() -> fetchOrder());

// 自动等待所有任务
scope.join()
.throwIfFailed(); // 自动处理异常

// 自动取消未完成任务
return new Response(userTask.get(), orderTask.get());

// 自动清理: try-with-resources 确保关闭
}
}
}

详细对比:

特性 ExecutorService Structured Concurrency
任务生命周期 不明确,需要手动管理 明确,绑定到作用域
错误处理 需要手动检查 Future 自动传播到父任务
取消传播 需要手动实现 自动传播
资源清理 需要手动 shutdown 自动清理
可观测性 需要额外工具 内置线程 dump
代码可读性 中等 高 (类似同步代码)
适用场景 长期运行的任务池 请求-响应模式

8. 最佳实践

8.1 正确使用 try-with-resources

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 推荐: 使用 try-with-resources
public Response goodExample() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> doWork());
scope.join().throwIfFailed();
return task.get();
}
}

// 错误: 忘记关闭作用域
public Response badExample() throws Exception {
var scope = new StructuredTaskScope.ShutdownOnFailure();
var task = scope.fork(() -> doWork());
scope.join().throwIfFailed();
return task.get();
// 忘记 close(),可能导致资源泄漏
}

8.2 正确处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 推荐: 使用 throwIfFailed
public Response goodExceptionHandling() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> doWork());
scope.join()
.throwIfFailed(e -> new MyBusinessException(e));
return task.get();
}
}

// 错误: 忽略异常
public Response badExceptionHandling() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> doWork());
scope.join();
// 忘记检查异常
return task.get();
}
}

8.3 响应中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 推荐: 子任务响应中断
public String goodInterruptHandling() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> {
// 使用可中断的方法
while (!Thread.currentThread().isInterrupted()) {
String data = httpClient.get(url);
if (data != null) return data;
Thread.sleep(100); // 可中断
}
throw new InterruptedException("Task was cancelled");
});

scope.join().throwIfFailed();
return task.get();
}
}

// 错误: 子任务不响应中断
public String badInterruptHandling() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> {
// 使用不可中断的方法
while (true) {
String data = httpClient.get(url);
if (data != null) return data;
Thread.sleep(100); // 可中断
// 忽略中断信号
}
});

scope.join().throwIfFailed();
return task.get();
}
}

8.4 避免过深的嵌套

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 避免过深的嵌套
public Response deepNesting() throws Exception {
try (var scope1 = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope1.fork(() -> {
try (var scope2 = new StructuredTaskScope.ShutdownOnFailure()) {
var task2 = scope2.fork(() -> {
try (var scope3 = new StructuredTaskScope.ShutdownOnFailure()) {
var task3 = scope3.fork(() -> doWork());
scope3.join().throwIfFailed();
return task3.get();
}
});
scope2.join().throwIfFailed();
return task2.get();
}
});
scope1.join().throwIfFailed();
return task1.get();
}
}

// 推荐: 提取方法
public Response flatStructure() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> processNested());
scope.join().throwIfFailed();
return task.get();
}
}

private String processNested() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> doWork());
scope.join().throwIfFailed();
return task.get();
}
}

8.5 合理使用超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 推荐: 使用 joinUntil
public Response withTimeout() throws Exception {
Instant deadline = Instant.now().plusSeconds(5);

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> doWork());

try {
scope.joinUntil(deadline).throwIfFailed();
} catch (TimeoutException e) {
// 超时后,作用域会自动 shutdown
throw new BusinessException("Operation timed out", e);
}

return task.get();
}
}

9. 可观测性

结构化并发提供了强大的可观测性支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 生成结构化线程 dump
// $ jcmd <pid> Thread.dump_to_file -format=json <file>

// 输出示例:
{
"threadDump": {
"threadContainers": [
{
"container": "<root>",
"threads": [...]
},
{
"container": "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@123",
"parent": "<root>",
"owner": "1",
"threads": [
{
"tid": "21",
"name": "",
"stack": [
"java.base/java.lang.VirtualThread.parkNanos",
"java.base/java.lang.Thread.sleep",
"com.example.Service.doWork",
"..."
]
}
]
}
]
}
}

可观测性优势:

  1. 任务树结构: 清晰显示父子任务关系
  2. 作用域信息: 每个任务属于哪个作用域
  3. 栈追踪: 显示任务在做什么
  4. 调试友好: 类似单线程调试体验

10. 迁移建议

10.1 迁移场景

现有模式 结构化并发替代 说明
ExecutorService.submit() + Future.get() StructuredTaskScope.fork() 生命周期自动管理
CompletableFuture.allOf() ShutdownOnFailure 错误自动传播
CompletableFuture.anyOf() ShutdownOnSuccess 自动取消其他任务
手动线程管理 结构化并发 消除线程泄漏

10.2 适用场景评估

场景 是否迁移 建议
新的 IO 密集型应用 强烈推荐 直接使用结构化并发
现有 ExecutorService 代码 评估 逐步迁移,优先迁移新功能
CPU 密集型任务 不推荐 继续使用 ForkJoinPool
长期运行的任务池 不推荐 继续使用 ExecutorService
需要细粒度控制的场景 评估 结构化并发可能过于严格

11. 未来展望

结构化并发代表了 Java 并发编程的未来方向:

  1. JDK 23/24: 可能正式定稿
  2. 生态系统: 更多库将支持结构化并发
  3. 工具链: IDE 和监控工具将增强支持
  4. 最佳实践: 社区将积累更多经验

核心洞察: 结构化并发不是要完全取代 ExecutorService,而是为请求-响应模式的并发编程提供更简单、更安全、更可维护的解决方案。理解其适用场景,才能充分发挥其价值。

12. 参考资料

相关文章