Appearance
第29课:CompletableFuture
🎯 学习目标
- 理解 CompletableFuture 的异步编排能力(vs Future)
- 掌握创建(supplyAsync/runAsync)、转换(thenApply/thenAccept/thenCompose)
- 掌握组合(thenCombine/allOf/anyOf)
- 掌握异常处理(exceptionally/handle/whenComplete)
- 能用 CompletableFuture 编排多步异步任务
📖 一、概念讲解:为什么需要 CompletableFuture
1. Future 的局限
Future(来自线程池 submit)能异步拿结果,但:
get()阻塞等待,不能回调。- 不能链式:A 完成后自动执行 B,需手动轮询 isDone 或阻塞 get。
- 不能组合:A、B 都完成才执行 C,需手写等待逻辑。
2. CompletableFuture 解决什么
CompletableFuture 是可组合的异步编程:
- 完成时自动触发回调(不阻塞)。
- 链式:thenApply/thenAccept 串联多步。
- 组合:allOf(等全部)、anyOf(任一完成)、thenCombine(两者合并)。
- 异常处理:exceptionally/handle。
java
CompletableFuture.supplyAsync(() -> fetchUser(id)) // 异步取用户
.thenApply(user -> user.getName()) // 转换
.thenAccept(name -> System.out.println(name)) // 消费
.exceptionally(e -> { log(e); return null; }); // 异常处理📖 二、创建
java
// 有返回值的异步任务
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(100); return "结果";
});
// 无返回值
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> { /* 副作用 */ });
// 已完成的(直接给值,用于测试或包装)
CompletableFuture<String> done = CompletableFuture.completedFuture("即时结果");默认线程池:ForkJoinPool.commonPool()。生产建议指定自定义线程池(避免污染公共池):
java
CompletableFuture.supplyAsync(supplier, executor);📖 三、转换与消费
java
CompletableFuture.supplyAsync(() -> "1")
.thenApply(s -> Integer.parseInt(s)) // 转换类型 String→Integer
.thenApply(i -> i * 2) // 再转换
.thenAccept(i -> System.out.println(i)) // 消费(无返回值)
.thenRun(() -> System.out.println("done")); // 之后执行- thenApply:上一步结果 → 新值(类似 map)。
- thenAccept:上一步结果 → 消费(无返回)。
- thenRun:不关心上一步结果,执行动作。
- thenCompose:上一步结果 → 另一个 CompletableFuture(扁平化,类似 flatMap)。
thenApply vs thenCompose
java
// thenApply:返回普通值(会自动包装成 CompletableFuture)
.thenApply(user -> user.getName()) // CompletableFuture<String>
// thenCompose:返回 CompletableFuture,避免 CompletableFuture<CompletableFuture<T>>
.thenCompose(user -> CompletableFuture.supplyAsync(() -> user.getName()))嵌套异步用 thenCompose 避免 CompletableFuture<CompletableFuture<T>>。
📖 四、组合
java
// 两个独立任务合并
CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "张三");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 25);
user.thenCombine(age, (u, a) -> u + "(" + a + "岁)")
.thenAccept(System.out::println); // 张三(25岁)
// 等多个全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 阻塞直到全部完成
// 任一完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object first = any.join(); // 第一个完成的返回值- thenCombine:两个 future 的结果合并。
- allOf:等全部完成(返回 Void,需单独取各 future 结果)。
- anyOf:任一完成即返回(返回最先完成的结果)。
📖 五、异常处理
java
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("失败"); })
.exceptionally(e -> { // 异常时返回默认值
System.out.println("异常: " + e.getMessage());
return "默认值";
})
.thenAccept(System.out::println); // 默认值- exceptionally:仅处理异常,正常时不触发,返回恢复值。
- handle:正常和异常都触发,能同时拿到结果和异常(异常时 result=null)。
- whenComplete:正常和异常都触发,但不能改变结果(用于日志/清理,不影响后续)。
📖 六、阻塞获取与 Async 变体
join vs get
java
String r = future.join(); // 阻塞,异常包装 CompletionException(RuntimeException)
String r = future.get(); // 阻塞,抛 InterruptedException/ExecutionException(受检)join 更适合流式链(不抛受检异常)。但生产应尽量非阻塞(用回调)。
Async 后缀变体
java
thenApply(fn) // 回调在与上一步相同的线程执行
thenApplyAsync(fn) // 回调在 ForkJoinPool 异步执行
thenApplyAsync(fn, pool) // 指定线程池默认同线程执行(少切换,快);Async 变体换线程(适合回调耗时或避免阻塞上游)。
⚠️ 七、常见陷阱
陷阱1:阻塞调用破坏异步
java
future.get(); // 阻塞,违背异步初衷尽量用 thenAccept 等回调,而非 get/join。确需等待用 join(最后一步)。
陷阱2:用公共 ForkJoinPool
默认 commonPool 被所有 parallel stream 和 CompletableFuture 共享,任务阻塞会拖累全局。生产指定自定义线程池。
陷阱3:异常未处理导致任务"消失"
CompletableFuture 异常若没 exceptionally/handle,异常被吞,get/join 时才暴露。建议链尾加异常处理。
陷阱4:thenApply 误用导致嵌套
返回 CompletableFuture 时用 thenCompose,用 thenApply 会得到 CompletableFuture<CompletableFuture<T>>。
陷阱5:allOf 不返回结果
allOf 返回 Void,要分别从各 future 取结果(allOf.join() 后 f1.join())。
🆚 八、对比
| 特性 | Future | CompletableFuture |
|---|---|---|
| 异步取值 | get 阻塞 | 回调不阻塞 |
| 链式 | 不支持 | thenApply/thenCompose |
| 组合 | 手动 | allOf/anyOf/thenCombine |
| 异常处理 | get 抛异常 | exceptionally/handle |
| 取消 | cancel | cancel(completeExceptionally) |
对 C 程序员:CompletableFuture 类似 Promise(JS)/ future(Rust)——异步结果 + 回调链 + 组合。是 Java 异步编程的主力(响应式 WebFlux 用 Reactor,更复杂)。
💡 九、最佳实践
- 回调优于阻塞:用 thenAccept 而非 get/join。
- 指定线程池:supplyAsync(fn, executor),不用公共池。
- 返回 CompletableFuture 用 thenCompose:避免嵌套。
- 链尾加异常处理:exceptionally 或 handle。
- 多任务编排用 allOf/anyOf:少手写 CountDownLatch。
- Async 变体按需:回调耗时或怕阻塞上游时用 *Async。
📝 练习预告
完成 练习/Ex29_CompletableFuture.java 中的 6 道题:
- 创建与取值(supplyAsync + join)
- 转换链(thenApply/thenAccept)
- thenCompose 扁平化
- 组合(thenCombine/allOf/anyOf)
- 异常处理(exceptionally/handle)
- 综合:并行查询 + 合并(模拟用户+订单聚合)
完成后对比 答案/Sol29.java,查看逐行讲解与多解法。
📖 十、线程池隔离
生产中不要依赖默认 commonPool。
原因:
text
parallelStream 也使用 commonPool。
其他库也可能使用 commonPool。
阻塞任务会拖慢整个进程的异步执行。
无法按业务监控和限流。推荐:
java
ExecutorService ioPool = new ThreadPoolExecutor(
8,
32,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
r -> new Thread(r, "user-api-async-" + UUID.randomUUID()),
new ThreadPoolExecutor.CallerRunsPolicy()
);
CompletableFuture<User> future =
CompletableFuture.supplyAsync(() -> queryUser(id), ioPool);不同类型任务应隔离:
text
CPU 计算池。
数据库查询池。
外部 HTTP 调用池。
低优先级后台任务池。📖 十一、超时控制
JDK 9+ 提供超时方法:
java
future.orTimeout(3, TimeUnit.SECONDS);
future.completeOnTimeout(defaultValue, 3, TimeUnit.SECONDS);区别:
text
orTimeout:超时后以 TimeoutException 异常完成。
completeOnTimeout:超时后返回默认值。示例:
java
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> queryUser(id), ioPool)
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(e -> User.guest());注意:
text
CompletableFuture 超时不一定真正中断底层任务。
底层 HTTP、数据库调用仍要设置自己的超时。🧪 十二、allOf 收集结果
allOf 返回 CompletableFuture<Void>,需要自己收集结果。
java
List<CompletableFuture<Order>> futures = orderIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> queryOrder(id), ioPool))
.toList();
CompletableFuture<List<Order>> all = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());为什么 join 放在 allOf 后面?
text
allOf 已保证全部完成。
此时 join 不会逐个长时间阻塞。
代码比手写 CountDownLatch 更清晰。如果任一任务失败,allOf 也会异常完成,需要在链路上处理异常。
📖 十三、异常传播规则
CompletableFuture 链中异常会向后传播,直到被处理。
java
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("fail");
}).thenApply(x -> x + "!")
.exceptionally(e -> "fallback");这里 thenApply 不会执行,异常会进入 exceptionally。
三个常用方法:
text
exceptionally:只在异常时触发,返回兜底值。
handle:正常或异常都触发,可以改变结果。
whenComplete:正常或异常都触发,通常用于日志,不改变结果。实践:
text
局部可恢复异常:在局部 exceptionally 处理。
全链路日志:链尾 whenComplete。
最终兜底:链尾 handle 或 exceptionally。📖 十四、取消与中断
CompletableFuture.cancel(true) 会让 Future 进入取消状态,但不保证底层任务一定停止。
原因:
text
CompletableFuture 不直接拥有正在执行任务的线程。
底层任务是否停止取决于线程池和任务是否响应中断。任务中应主动检查:
java
if (Thread.currentThread().isInterrupted()) {
throw new CancellationException();
}阻塞 IO 还需要底层客户端支持超时或取消。
🛠 十五、CompletableFuture 排查清单
常见问题:
text
默认 commonPool 被阻塞任务占满。
链中调用 get/join 导致线程阻塞。
thenApply 返回 CompletableFuture 导致嵌套。
allOf 后忘记收集结果。
异常没有处理,直到 join 才暴露。
超时只设置在 CompletableFuture,底层调用没有超时。
Async 回调没有指定线程池。
任务取消后底层操作仍在执行。排查建议:
text
给线程池命名。
打印异步链关键节点耗时。
区分 thenApply 和 thenCompose。
链尾保留异常日志。
对外部调用设置真实超时。
监控线程池队列和活跃线程数。✅ 十六、掌握标准
学完本课后,应能做到:
text
能解释 Future 的局限。
能使用 supplyAsync/runAsync 创建异步任务。
能用 thenApply、thenCompose、thenAccept 构建链路。
能用 thenCombine、allOf、anyOf 组合任务。
能正确收集 allOf 的多个结果。
能处理 exceptionally、handle、whenComplete。
能为异步任务指定业务线程池。
能设置超时并理解底层任务不一定停止。
能避免在异步链中无意义阻塞。CompletableFuture 的价值在于异步编排。写得好的异步链路应该能看出任务依赖、线程池边界、异常兜底和超时策略。
🎓 下一步
- 第30课:反射机制 — Class、Constructor、Method、动态代理(进入高级篇)