结构化并发(Structured Concurrency)
结构化并发是 Java 并发编程的重要演进方向,与虚拟线程紧密配合,旨在解决传统并发编程中的线程泄漏、错误处理困难等问题。
1. 历史背景
1.1 结构化并发的起源与核心类比
术语起源
“结构化并发”(Structured Concurrency)这个术语由 Martin Sústrik (ZeroMQ 作者)在 2016 年 首次提出。随后,Nathaniel J. Smith 在 2018 年发表了著名的文章《Notes on structured concurrency, or: Go statement considered harmful》,系统性地阐述了结构化并发的理论基础。
timeline
title 结构化并发发展历程
2016 : Martin Sústrik 首创术语
: 在 250bpm.com 发表系列文章
2018 : Nathaniel J. Smith 发表核心论文
: "Go statement considered harmful"
2018 : Python Trio 库发布
: 首个完整实现结构化并发的库
2019 : Kotlin Coroutines 引入结构化并发
: CoroutineScope 设计
2022 : Java JEP 428 (JDK 19)
: StructuredTaskScope 孵化
2023 : Java JEP 453 (JDK 21)
: 结构化并发预览
2024 : Java JEP 462 (JDK 22)
: 结构化并发第二次预览
核心类比:go 语句之于并发,如同 goto 之于顺序编程
结构化并发的核心洞察来自一个类比:Nathaniel J. Smith 指出,传统并发编程中的 go 语句(以及 thread spawn、callbacks、futures、promises 等)在并发领域的危害,类似于 goto 语句在顺序编程领域的危害。
“The popular concurrency primitives – go statements, thread spawning functions, callbacks, futures, promises, … they’re all variants on goto, in theory and in practice. And not even the modern domesticated goto, but the old-testament fire-and-brimstone goto, that could leap across function boundaries.”
— Nathaniel J. Smith, 2018
需要澄清的是 :结构化并发并非"来自于"或"继承自"结构化编程,而是借鉴了结构化编程的思想 ,通过类比的方式将相似的原则应用于并发控制流。两者的关系是类比关系 ,而非继承关系 。
维度
结构化编程(1968)
结构化并发(2016)
问题根源
goto 破坏顺序控制流
go 破坏并发控制流
核心原则
代码块有清晰的入口和出口
并发任务有清晰的生命周期边界
解决方案
用 if/while/函数调用替代 goto
用 Nursery/StructuredTaskScope 替代 go
关系
—
类比,非继承
1.2 与结构化编程的关系
结构化并发借鉴了结构化编程的思想,将控制流的清晰性扩展到并发领域:
特性
结构化编程
结构化并发
控制流
顺序执行
并发执行
代码块
if, while, 函数调用
StructuredTaskScope
出口点
唯一返回点
作用域关闭时
嵌套
支持嵌套
支持嵌套作用域
2. 传统并发编程的问题
2.1 三大核心问题
flowchart TB
subgraph "传统并发编程的问题"
direction TB
subgraph "问题1:线程泄漏"
P1_CODE["executor.submit(task)"]
P1_PROBLEM["任务提交后,谁负责等待?<br/>谁负责取消?<br/>如果忘记处理会怎样?"]
P1_RESULT["线程可能永远运行<br/>资源无法回收"]
P1_CODE --> P1_PROBLEM --> P1_RESULT
style P1_RESULT fill:#ffcdd2
end
subgraph "问题2:错误处理困难"
P2_CODE["多个并发任务"]
P2_PROBLEM["任务A失败了<br/>任务B还在运行<br/>如何协调?"]
P2_RESULT["错误被忽略<br/>或处理不一致"]
P2_CODE --> P2_PROBLEM --> P2_RESULT
style P2_RESULT fill:#ffcdd2
end
subgraph "问题3:取消传播困难"
P3_CODE["父任务被取消"]
P3_PROBLEM["子任务如何感知?<br/>如何优雅终止?"]
P3_RESULT["子任务继续运行<br/>浪费资源"]
P3_CODE --> P3_PROBLEM --> P3_RESULT
style P3_RESULT fill:#ffcdd2
end
end
2.2 问题代码示例
示例 1:线程泄漏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void processUser (long userId) { ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(() -> { while (true ) { processSomething(); Thread.sleep(1000 ); } }); }
示例 2:错误处理困难
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public UserProfile fetchProfile (long userId) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Future<User> userFuture = executor.submit(() -> fetchUser(userId)); Future<List<Order>> ordersFuture = executor.submit(() -> fetchOrders(userId)); try { User user = userFuture.get(); List<Order> orders = ordersFuture.get(); return new UserProfile (user, orders); } finally { executor.shutdown(); } }
示例 3:取消传播困难
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Response handleRequest (String requestId) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Future<Response> responseFuture = executor.submit(() -> { return slowOperation(); }); try { return responseFuture.get(5 , TimeUnit.SECONDS); } catch (TimeoutException e) { responseFuture.cancel(true ); throw new BusinessException ("Timeout" ); } finally { executor.shutdown(); } }
3. 结构化并发的核心思想
结构化并发的核心原则是:并发任务的生命周期应该与代码的词法作用域绑定 。
flowchart TB
subgraph "结构化并发的三大原则"
direction TB
subgraph "原则1:生命周期绑定"
L1["子任务的生命周期"]
L2["不能超过"]
L3["父作用域的生命周期"]
L1 --> L2 --> L3
style L3 fill:#c8e6c9
end
subgraph "原则2:错误传播"
E1["子任务失败"]
E2["自动传播到"]
E3["父作用域"]
E1 --> E2 --> E3
style E3 fill:#c8e6c9
end
subgraph "原则3:取消传播"
C1["父作用域取消"]
C2["自动传播到"]
C3["所有子任务"]
C1 --> C2 --> C3
style C3 fill:#c8e6c9
end
end
核心特性 :
作用域绑定 :并发任务在作用域内启动,作用域关闭时所有任务保证终止
自动取消传播 :父作用域取消时,所有子任务自动取消
错误自动聚合 :子任务的异常自动传播到父作用域
清晰的代码结构 :并发代码的执行流清晰可见
与其他语言的对比
特性
Java StructuredTaskScope
Go goroutine
Kotlin Coroutines
Python Trio
生命周期绑定
强制(try-with-resources)
无
强制(CoroutineScope)
强制(Nursery)
错误传播
自动(ShutdownOnFailure)
手动(errgroup)
自动(supervisorScope)
自动
取消传播
自动
手动(context)
自动
自动
语法集成
API 级别
语言级别(go)
语言级别(suspend)
API 级别
4. Java 中的结构化并发演进
4.1 JEP 演进概览
JEP
JDK 版本
状态
主要变化
JEP 428
JDK 19
Incubator
首次引入 StructuredTaskScope
JEP 437
JDK 20
Second Incubator
API 改进,fork() 返回 Future
JEP 453
JDK 21
Preview
重大变化,fork() 返回 Subtask
JEP 462
JDK 22
Second Preview
API 细化,增强可观测性
JEP 480
JDK 23
Third Preview
进一步完善
4.2 JDK 19 (JEP 428) - 首次孵化
1 2 3 4 5 6 7 8 9 10 11 12 13 import jdk.incubator.concurrent.StructuredTaskScope;public Response handle (String id) throws Exception { try (var scope = new StructuredTaskScope <Object>()) { Future<String> userFuture = scope.fork(() -> fetchUser(id)); Future<Integer> orderFuture = scope.fork(() -> fetchOrder(id)); scope.join(); return new Response (userFuture.get(), orderFuture.get()); } }
特点 :
fork() 返回 Future
需要手动检查异常
基本的生命周期管理
4.3 JDK 20 (JEP 437) - 第二次孵化
API 改进,增强错误处理能力:
1 2 3 4 5 6 7 8 9 10 try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { Future<String> userFuture = scope.fork(() -> fetchUser(id)); Future<Integer> orderFuture = scope.fork(() -> fetchOrder(id)); scope.join() .throwIfFailed(); return new Response (userFuture.get(), orderFuture.get()); }
4.4 JDK 21 (JEP 453) - 第一次预览
重大变化 :fork() 返回 Subtask 而非 Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.StructuredTaskScope;public Response handle (String id) throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { Subtask<String> userTask = scope.fork(() -> fetchUser(id)); Subtask<Integer> orderTask = scope.fork(() -> fetchOrder(id)); scope.join() .throwIfFailed(); return new Response (userTask.get(), orderTask.get()); } }
Subtask vs Future :
特性
Future
Subtask
状态查询
isDone()
state() (UNAVAILABLE/SUCCESS/FAILED)
异常处理
get() 抛出 ExecutionException
exception() 直接获取
可观测性
有限
丰富(线程 dump 支持)
4.5 JDK 22 (JEP 462) - 第二次预览
API 细化,增强可观测性和灵活性:
1 2 3 4 5 6 7 8 9 10 11 try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task1 = scope.fork(() -> fetchUser(id)); var task2 = scope.fork(() -> fetchOrder(id)); scope.joinUntil(Instant.now().plusSeconds(5 )) .throwIfFailed(); return new Response (task1.get(), task2.get()); }
5. StructuredTaskScope 核心 API
5.1 类层次结构
classDiagram
class StructuredTaskScope~T~ {
+fork(Callable~T~ task) Subtask~T~
+join() StructuredTaskScope
+joinUntil(Instant deadline) StructuredTaskScope
+shutdown() void
+close() void
#handleComplete(Subtask~? extends T~ subtask)* void
}
class ShutdownOnFailure {
+throwIfFailed() ShutdownOnFailure
+throwIfFailed(Function~Throwable, X~ exceptionMapper)* X
+exception() Throwable
}
class ShutdownOnSuccess~T~ {
+result() T
+resultOrElse(Supplier~? extends T~ supplier) T
}
class Subtask~T~ {
+get() T
+state() State
+exception() Throwable
+toString() String
}
class State {
<<enumeration>>
UNAVAILABLE
SUCCESS
FAILED
}
StructuredTaskScope <|-- ShutdownOnFailure
StructuredTaskScope <|-- ShutdownOnSuccess
StructuredTaskScope --> Subtask
Subtask --> State
5.2 基础 API
StructuredTaskScope 核心方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public class StructuredTaskScope <T> implements AutoCloseable { public Subtask<T> fork (Callable<? extends T> task) { Objects.requireNonNull(task); Subtask<T> subtask = new Subtask <>(task); subtask.thread = ThreadFactory.ofVirtual().newThread(() -> { try { T result = task.call(); subtask.complete(result); } catch (Throwable e) { subtask.completeExceptionally(e); } }); subtask.thread.start(); return subtask; } public StructuredTaskScope<T> join () throws InterruptedException { synchronized (lock) { while (remaining > 0 && !shutdown) { lock.wait(); } } return this ; } public StructuredTaskScope<T> joinUntil (Instant deadline) throws InterruptedException, TimeoutException { long timeout = Duration.between(Instant.now(), deadline).toMillis(); if (timeout <= 0 ) { throw new TimeoutException (); } synchronized (lock) { while (remaining > 0 && !shutdown) { lock.wait(timeout); if (Instant.now().isAfter(deadline)) { throw new TimeoutException (); } } } return this ; } public void shutdown () { synchronized (lock) { if (shutdown) return ; shutdown = true ; for (Subtask<?> subtask : subtasks) { if (subtask.state() == Subtask.State.UNAVAILABLE) { subtask.thread.interrupt(); } } lock.notifyAll(); } } @Override public void close () { shutdown(); for (Subtask<?> subtask : subtasks) { try { subtask.thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } protected void handleComplete (Subtask<? extends T> subtask) { } }
5.3 ShutdownOnFailure 策略
语义 :“所有任务必须成功”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public final class ShutdownOnFailure extends StructuredTaskScope <Object> { private Throwable firstException; @Override protected void handleComplete (Subtask<?> subtask) { if (subtask.state() == Subtask.State.FAILED && firstException == null ) { firstException = subtask.exception(); shutdown(); } } public ShutdownOnFailure throwIfFailed () throws ExecutionException { if (firstException != null ) { throw new ExecutionException (firstException); } return this ; } public <X extends Throwable > ShutdownOnFailure throwIfFailed ( Function<Throwable, X> exceptionMapper ) throws X { if (firstException != null ) { throw exceptionMapper.apply(firstException); } return this ; } public Throwable exception () { return firstException; } }
使用示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class AggregationService { public UserProfile getUserProfile (String userId) throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var basicInfo = scope.fork(() -> fetchBasicInfo(userId)); var preferences = scope.fork(() -> fetchPreferences(userId)); var activity = scope.fork(() -> fetchActivity(userId)); var social = scope.fork(() -> fetchSocialData(userId)); scope.join() .throwIfFailed(); return new UserProfile ( basicInfo.get(), preferences.get(), activity.get(), social.get() ); } } private BasicInfo fetchBasicInfo (String userId) throws IOException { return httpClient.get("/users/" + userId); } }
5.4 ShutdownOnSuccess 策略
语义 : “只需要一个任务成功”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public final class ShutdownOnSuccess <T> extends StructuredTaskScope <T> { private T firstResult; private Subtask<? extends T > firstSuccess; @Override protected void handleComplete (Subtask<? extends T> subtask) { if (subtask.state() == Subtask.State.SUCCESS && firstResult == null ) { firstSuccess = subtask; firstResult = subtask.get(); shutdown(); } } public T result () throws ExecutionException { if (firstResult != null ) { return firstResult; } throw new ExecutionException ("No subtask completed successfully" ); } public T resultOrElse (Supplier<? extends T> supplier) { return firstResult != null ? firstResult : supplier.get(); } }
使用示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class RedundancyService { public Data fetchWithRedundancy (String key) throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnSuccess<Data>()) { scope.fork(() -> fetchFromPrimary(key)); scope.fork(() -> fetchFromSecondary(key)); scope.fork(() -> fetchFromCache(key)); scope.fork(() -> fetchFromCDN(key)); scope.join(); return scope.result(); } } private Data fetchFromPrimary (String key) throws IOException { return db.query(key); } private Data fetchFromSecondary (String key) throws IOException { return backupDb.query(key); } private Data fetchFromCache (String key) throws IOException { return redis.get(key); } }
6. 与虚拟线程的协同
结构化并发与虚拟线程是天然的搭档:
flowchart TB
subgraph "结构化并发 + 虚拟线程"
STS["StructuredTaskScope"]
VT1["虚拟线程 1"]
VT2["虚拟线程 2"]
VT3["虚拟线程 3"]
CT["载体线程池"]
STS -->|"fork()"| VT1
STS -->|"fork()"| VT2
STS -->|"fork()"| VT3
VT1 -.->|"调度"| CT
VT2 -.->|"调度"| CT
VT3 -.->|"调度"| CT
style STS fill:#c8e6c9
style VT1 fill:#e1f5ff
style VT2 fill:#e1f5ff
style VT3 fill:#e1f5ff
end
虚拟线程是结构化并发的基础设施 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class VirtualThreadIntegration { public void defaultVirtualThread () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task1 = scope.fork(() -> blockingIO1()); var task2 = scope.fork(() -> blockingIO2()); scope.join().throwIfFailed(); } } public void customThreadFactory () throws Exception { ThreadFactory platformFactory = Thread.ofPlatform().factory(); try (var scope = new StructuredTaskScope <Object>(null , platformFactory)) { var task = scope.fork(() -> cpuIntensiveTask()); scope.join(); } ThreadFactory namedVirtualFactory = Thread.ofVirtual() .name("worker-" , 0 ) .factory(); try (var scope = new StructuredTaskScope <Object>(null , namedVirtualFactory)) { var task = scope.fork(() -> ioTask()); scope.join(); } } public void nestedScopes () throws Exception { try (var outerScope = new StructuredTaskScope .ShutdownOnFailure()) { var task1 = outerScope.fork(() -> { try (var innerScope = new StructuredTaskScope .ShutdownOnFailure()) { var subTask1 = innerScope.fork(() -> subTask1()); var subTask2 = innerScope.fork(() -> subTask2()); innerScope.join().throwIfFailed(); return combineResults(subTask1.get(), subTask2.get()); } }); var task2 = outerScope.fork(() -> anotherTask()); outerScope.join().throwIfFailed(); } } private static final ScopedValue<String> CONTEXT = ScopedValue.newInstance(); public void withScopedValue () throws Exception { String context = "user-123" ; ScopedValue.where(CONTEXT, context).run(() -> { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task1 = scope.fork(() -> { String ctx = CONTEXT.get(); return processWithContext(ctx); }); var task2 = scope.fork(() -> { String ctx = CONTEXT.get(); return processWithContext(ctx); }); scope.join().throwIfFailed(); } }); } }
虚拟线程的优势 :
轻量级 : 可以创建百万级虚拟线程
阻塞友好 : Thread.sleep(), BlockingQueue.take() 等阻塞操作不会消耗平台线程
结构化友好 : 虚拟线程的生命周期管理简单,适合结构化并发
7. 实际应用场景
7.1 典型使用模式
模式 1: Fan-Out / Fan-In
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class FanOutFanInPattern { public OrderSummary processOrder (String orderId) throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var order = scope.fork(() -> orderService.getOrder(orderId)); var items = scope.fork(() -> itemService.getItems(orderId)); var customer = scope.fork(() -> customerService.getCustomer(orderId)); var shipping = scope.fork(() -> shippingService.getShipping(orderId)); var payment = scope.fork(() -> paymentService.getPayment(orderId)); scope.join().throwIfFailed(); return new OrderSummary ( order.get(), items.get(), customer.get(), shipping.get(), payment.get() ); } } }
模式 2: 竞争模式 (Race)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class RacePattern { public String fetchWithFallback (String url) throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnSuccess<String>()) { scope.fork(() -> fetchFromPrimary(url)); Thread.sleep(50 ); scope.fork(() -> fetchFromSecondary(url)); scope.fork(() -> fetchFromCache(url)); scope.join(); return scope.result(); } } }
模式 3: 批量处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class BatchProcessingPattern { public List<ProcessedItem> processBatch (List<Item> items) throws Exception { try (var scope = new CollectingScope <ProcessedItem>()) { for (Item item : items) { scope.fork(() -> processItem(item)); } scope.join(); return scope.results().toList(); } } static class CollectingScope <T> extends StructuredTaskScope <T> { private final Queue<T> results = new ConcurrentLinkedQueue <>(); @Override protected void handleComplete (Subtask<? extends T> subtask) { if (subtask.state() == Subtask.State.SUCCESS) { results.add(subtask.get()); } } public Stream<T> results () { ensureOwnerAndJoined(); return results.stream(); } } }
模式 4: 服务器处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ServerPattern { public void startServer (ServerSocket serverSocket) throws Exception { try (var scope = new StructuredTaskScope <Void>()) { try { while (true ) { Socket socket = serverSocket.accept(); scope.fork(() -> handleConnection(socket)); } } finally { scope.shutdown(); scope.join(); } } } private Void handleConnection (Socket socket) { try (socket) { processRequests(socket); } catch (IOException e) { log.error("Connection error" , e); } return null ; } }
7.2 与传统 ExecutorService 的对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class Comparison { public Response handleWithExecutorService () throws Exception { ExecutorService executor = Executors.newFixedThreadPool(10 ); try { Future<String> userFuture = executor.submit(() -> findUser()); Future<Integer> orderFuture = executor.submit(() -> fetchOrder()); String user = userFuture.get(); Integer order = orderFuture.get(); return new Response (user, order); } finally { executor.shutdown(); } } public Response handleWithStructuredConcurrency () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var userTask = scope.fork(() -> findUser()); var orderTask = scope.fork(() -> fetchOrder()); scope.join() .throwIfFailed(); return new Response (userTask.get(), orderTask.get()); } } }
详细对比 :
特性
ExecutorService
Structured Concurrency
任务生命周期
不明确,需要手动管理
明确,绑定到作用域
错误处理
需要手动检查 Future
自动传播到父任务
取消传播
需要手动实现
自动传播
资源清理
需要手动 shutdown
自动清理
可观测性
需要额外工具
内置线程 dump
代码可读性
中等
高 (类似同步代码)
适用场景
长期运行的任务池
请求-响应模式
8. 最佳实践
8.1 正确使用 try-with-resources
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public Response goodExample () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> doWork()); scope.join().throwIfFailed(); return task.get(); } }public Response badExample () throws Exception { var scope = new StructuredTaskScope .ShutdownOnFailure(); var task = scope.fork(() -> doWork()); scope.join().throwIfFailed(); return task.get(); }
8.2 正确处理异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Response goodExceptionHandling () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> doWork()); scope.join() .throwIfFailed(e -> new MyBusinessException (e)); return task.get(); } }public Response badExceptionHandling () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> doWork()); scope.join(); return task.get(); } }
8.3 响应中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public String goodInterruptHandling () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> { while (!Thread.currentThread().isInterrupted()) { String data = httpClient.get(url); if (data != null ) return data; Thread.sleep(100 ); } throw new InterruptedException ("Task was cancelled" ); }); scope.join().throwIfFailed(); return task.get(); } }public String badInterruptHandling () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> { while (true ) { String data = httpClient.get(url); if (data != null ) return data; Thread.sleep(100 ); } }); scope.join().throwIfFailed(); return task.get(); } }
8.4 避免过深的嵌套
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public Response deepNesting () throws Exception { try (var scope1 = new StructuredTaskScope .ShutdownOnFailure()) { var task1 = scope1.fork(() -> { try (var scope2 = new StructuredTaskScope .ShutdownOnFailure()) { var task2 = scope2.fork(() -> { try (var scope3 = new StructuredTaskScope .ShutdownOnFailure()) { var task3 = scope3.fork(() -> doWork()); scope3.join().throwIfFailed(); return task3.get(); } }); scope2.join().throwIfFailed(); return task2.get(); } }); scope1.join().throwIfFailed(); return task1.get(); } }public Response flatStructure () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> processNested()); scope.join().throwIfFailed(); return task.get(); } }private String processNested () throws Exception { try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> doWork()); scope.join().throwIfFailed(); return task.get(); } }
8.5 合理使用超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public Response withTimeout () throws Exception { Instant deadline = Instant.now().plusSeconds(5 ); try (var scope = new StructuredTaskScope .ShutdownOnFailure()) { var task = scope.fork(() -> doWork()); try { scope.joinUntil(deadline).throwIfFailed(); } catch (TimeoutException e) { throw new BusinessException ("Operation timed out" , e); } return task.get(); } }
9. 可观测性
结构化并发提供了强大的可观测性支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 { "threadDump" : { "threadContainers" : [ { "container" : "<root>" , "threads" : [...] }, { "container" : "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@123" , "parent" : "<root>" , "owner" : "1" , "threads" : [ { "tid" : "21" , "name" : "" , "stack" : [ "java.base/java.lang.VirtualThread.parkNanos" , "java.base/java.lang.Thread.sleep" , "com.example.Service.doWork" , "..." ] } ] } ] } }
可观测性优势 :
任务树结构 : 清晰显示父子任务关系
作用域信息 : 每个任务属于哪个作用域
栈追踪 : 显示任务在做什么
调试友好 : 类似单线程调试体验
10. 迁移建议
10.1 迁移场景
现有模式
结构化并发替代
说明
ExecutorService.submit() + Future.get()
StructuredTaskScope.fork()
生命周期自动管理
CompletableFuture.allOf()
ShutdownOnFailure
错误自动传播
CompletableFuture.anyOf()
ShutdownOnSuccess
自动取消其他任务
手动线程管理
结构化并发
消除线程泄漏
10.2 适用场景评估
场景
是否迁移
建议
新的 IO 密集型应用
强烈推荐
直接使用结构化并发
现有 ExecutorService 代码
评估
逐步迁移,优先迁移新功能
CPU 密集型任务
不推荐
继续使用 ForkJoinPool
长期运行的任务池
不推荐
继续使用 ExecutorService
需要细粒度控制的场景
评估
结构化并发可能过于严格
11. 未来展望
结构化并发代表了 Java 并发编程的未来方向:
JDK 23/24 : 可能正式定稿
生态系统 : 更多库将支持结构化并发
工具链 : IDE 和监控工具将增强支持
最佳实践 : 社区将积累更多经验
核心洞察 : 结构化并发不是要完全取代 ExecutorService,而是为请求-响应模式的并发编程提供更简单、更安全、更可维护的解决方案。理解其适用场景,才能充分发挥其价值。
12. 参考资料
相关文章