Skip to content

第29课:CompletableFuture

🎯 学习目标

  • 理解 CompletableFuture 的异步编排能力(vs Future)
  • 掌握创建(supplyAsync/runAsync)、转换(thenApply/thenAccept/thenCompose)
  • 掌握组合(thenCombine/allOf/anyOf)
  • 掌握异常处理(exceptionally/handle/whenComplete)
  • 能用 CompletableFuture 编排多步异步任务

📖 一、概念讲解:为什么需要 CompletableFuture

1. Future 的局限

Future(来自线程池 submit)能异步拿结果,但:

  • get() 阻塞等待,不能回调。
  • 不能链式:A 完成后自动执行 B,需手动轮询 isDone 或阻塞 get。
  • 不能组合:A、B 都完成才执行 C,需手写等待逻辑。

2. CompletableFuture 解决什么

CompletableFuture 是可组合的异步编程

  • 完成时自动触发回调(不阻塞)。
  • 链式:thenApply/thenAccept 串联多步。
  • 组合:allOf(等全部)、anyOf(任一完成)、thenCombine(两者合并)。
  • 异常处理:exceptionally/handle。
java
CompletableFuture.supplyAsync(() -> fetchUser(id))      // 异步取用户
    .thenApply(user -> user.getName())                  // 转换
    .thenAccept(name -> System.out.println(name))       // 消费
    .exceptionally(e -> { log(e); return null; });       // 异常处理

📖 二、创建

java
// 有返回值的异步任务
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    sleep(100); return "结果";
});
// 无返回值
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> { /* 副作用 */ });

// 已完成的(直接给值,用于测试或包装)
CompletableFuture<String> done = CompletableFuture.completedFuture("即时结果");

默认线程池:ForkJoinPool.commonPool()。生产建议指定自定义线程池(避免污染公共池):

java
CompletableFuture.supplyAsync(supplier, executor);

📖 三、转换与消费

java
CompletableFuture.supplyAsync(() -> "1")
    .thenApply(s -> Integer.parseInt(s))     // 转换类型 String→Integer
    .thenApply(i -> i * 2)                   // 再转换
    .thenAccept(i -> System.out.println(i)) // 消费(无返回值)
    .thenRun(() -> System.out.println("done")); // 之后执行
  • thenApply:上一步结果 → 新值(类似 map)。
  • thenAccept:上一步结果 → 消费(无返回)。
  • thenRun:不关心上一步结果,执行动作。
  • thenCompose:上一步结果 → 另一个 CompletableFuture(扁平化,类似 flatMap)。

thenApply vs thenCompose

java
// thenApply:返回普通值(会自动包装成 CompletableFuture)
.thenApply(user -> user.getName())   // CompletableFuture<String>

// thenCompose:返回 CompletableFuture,避免 CompletableFuture<CompletableFuture<T>>
.thenCompose(user -> CompletableFuture.supplyAsync(() -> user.getName()))

嵌套异步用 thenCompose 避免 CompletableFuture&lt;CompletableFuture&lt;T&gt;&gt;


📖 四、组合

java
// 两个独立任务合并
CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "张三");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 25);
user.thenCombine(age, (u, a) -> u + "(" + a + "岁)")
    .thenAccept(System.out::println);   // 张三(25岁)

// 等多个全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();   // 阻塞直到全部完成

// 任一完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object first = any.join();   // 第一个完成的返回值
  • thenCombine:两个 future 的结果合并。
  • allOf:等全部完成(返回 Void,需单独取各 future 结果)。
  • anyOf:任一完成即返回(返回最先完成的结果)。

📖 五、异常处理

java
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("失败"); })
    .exceptionally(e -> {           // 异常时返回默认值
        System.out.println("异常: " + e.getMessage());
        return "默认值";
    })
    .thenAccept(System.out::println);   // 默认值
  • exceptionally:仅处理异常,正常时不触发,返回恢复值。
  • handle:正常和异常都触发,能同时拿到结果和异常(异常时 result=null)。
  • whenComplete:正常和异常都触发,但不能改变结果(用于日志/清理,不影响后续)。

📖 六、阻塞获取与 Async 变体

join vs get

java
String r = future.join();   // 阻塞,异常包装 CompletionException(RuntimeException)
String r = future.get();   // 阻塞,抛 InterruptedException/ExecutionException(受检)

join 更适合流式链(不抛受检异常)。但生产应尽量非阻塞(用回调)。

Async 后缀变体

java
thenApply(fn)         // 回调在与上一步相同的线程执行
thenApplyAsync(fn)    // 回调在 ForkJoinPool 异步执行
thenApplyAsync(fn, pool) // 指定线程池

默认同线程执行(少切换,快);Async 变体换线程(适合回调耗时或避免阻塞上游)。


⚠️ 七、常见陷阱

陷阱1:阻塞调用破坏异步

java
future.get();   // 阻塞,违背异步初衷

尽量用 thenAccept 等回调,而非 get/join。确需等待用 join(最后一步)。

陷阱2:用公共 ForkJoinPool

默认 commonPool 被所有 parallel stream 和 CompletableFuture 共享,任务阻塞会拖累全局。生产指定自定义线程池。

陷阱3:异常未处理导致任务"消失"

CompletableFuture 异常若没 exceptionally/handle,异常被吞,get/join 时才暴露。建议链尾加异常处理。

陷阱4:thenApply 误用导致嵌套

返回 CompletableFuture 时用 thenCompose,用 thenApply 会得到 CompletableFuture&lt;CompletableFuture&lt;T&gt;&gt;

陷阱5:allOf 不返回结果

allOf 返回 Void,要分别从各 future 取结果(allOf.join() 后 f1.join())。


🆚 八、对比

特性FutureCompletableFuture
异步取值get 阻塞回调不阻塞
链式不支持thenApply/thenCompose
组合手动allOf/anyOf/thenCombine
异常处理get 抛异常exceptionally/handle
取消cancelcancel(completeExceptionally)

对 C 程序员:CompletableFuture 类似 Promise(JS)/ future(Rust)——异步结果 + 回调链 + 组合。是 Java 异步编程的主力(响应式 WebFlux 用 Reactor,更复杂)。


💡 九、最佳实践

  1. 回调优于阻塞:用 thenAccept 而非 get/join。
  2. 指定线程池:supplyAsync(fn, executor),不用公共池。
  3. 返回 CompletableFuture 用 thenCompose:避免嵌套。
  4. 链尾加异常处理:exceptionally 或 handle。
  5. 多任务编排用 allOf/anyOf:少手写 CountDownLatch。
  6. Async 变体按需:回调耗时或怕阻塞上游时用 *Async。

📝 练习预告

完成 练习/Ex29_CompletableFuture.java 中的 6 道题:

  1. 创建与取值(supplyAsync + join)
  2. 转换链(thenApply/thenAccept)
  3. thenCompose 扁平化
  4. 组合(thenCombine/allOf/anyOf)
  5. 异常处理(exceptionally/handle)
  6. 综合:并行查询 + 合并(模拟用户+订单聚合)

完成后对比 答案/Sol29.java,查看逐行讲解与多解法。


📖 十、线程池隔离

生产中不要依赖默认 commonPool。

原因:

text
parallelStream 也使用 commonPool。
其他库也可能使用 commonPool。
阻塞任务会拖慢整个进程的异步执行。
无法按业务监控和限流。

推荐:

java
ExecutorService ioPool = new ThreadPoolExecutor(
    8,
    32,
    60,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    r -> new Thread(r, "user-api-async-" + UUID.randomUUID()),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

CompletableFuture<User> future =
    CompletableFuture.supplyAsync(() -> queryUser(id), ioPool);

不同类型任务应隔离:

text
CPU 计算池。
数据库查询池。
外部 HTTP 调用池。
低优先级后台任务池。

📖 十一、超时控制

JDK 9+ 提供超时方法:

java
future.orTimeout(3, TimeUnit.SECONDS);
future.completeOnTimeout(defaultValue, 3, TimeUnit.SECONDS);

区别:

text
orTimeout:超时后以 TimeoutException 异常完成。
completeOnTimeout:超时后返回默认值。

示例:

java
CompletableFuture<User> userFuture = CompletableFuture
    .supplyAsync(() -> queryUser(id), ioPool)
    .orTimeout(2, TimeUnit.SECONDS)
    .exceptionally(e -> User.guest());

注意:

text
CompletableFuture 超时不一定真正中断底层任务。
底层 HTTP、数据库调用仍要设置自己的超时。

🧪 十二、allOf 收集结果

allOf 返回 CompletableFuture&lt;Void&gt;,需要自己收集结果。

java
List<CompletableFuture<Order>> futures = orderIds.stream()
    .map(id -> CompletableFuture.supplyAsync(() -> queryOrder(id), ioPool))
    .toList();

CompletableFuture<List<Order>> all = CompletableFuture
    .allOf(futures.toArray(new CompletableFuture[0]))
    .thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .toList());

为什么 join 放在 allOf 后面?

text
allOf 已保证全部完成。
此时 join 不会逐个长时间阻塞。
代码比手写 CountDownLatch 更清晰。

如果任一任务失败,allOf 也会异常完成,需要在链路上处理异常。


📖 十三、异常传播规则

CompletableFuture 链中异常会向后传播,直到被处理。

java
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("fail");
}).thenApply(x -> x + "!")
  .exceptionally(e -> "fallback");

这里 thenApply 不会执行,异常会进入 exceptionally

三个常用方法:

text
exceptionally:只在异常时触发,返回兜底值。
handle:正常或异常都触发,可以改变结果。
whenComplete:正常或异常都触发,通常用于日志,不改变结果。

实践:

text
局部可恢复异常:在局部 exceptionally 处理。
全链路日志:链尾 whenComplete。
最终兜底:链尾 handle 或 exceptionally。

📖 十四、取消与中断

CompletableFuture.cancel(true) 会让 Future 进入取消状态,但不保证底层任务一定停止。

原因:

text
CompletableFuture 不直接拥有正在执行任务的线程。
底层任务是否停止取决于线程池和任务是否响应中断。

任务中应主动检查:

java
if (Thread.currentThread().isInterrupted()) {
    throw new CancellationException();
}

阻塞 IO 还需要底层客户端支持超时或取消。


🛠 十五、CompletableFuture 排查清单

常见问题:

text
默认 commonPool 被阻塞任务占满。
链中调用 get/join 导致线程阻塞。
thenApply 返回 CompletableFuture 导致嵌套。
allOf 后忘记收集结果。
异常没有处理,直到 join 才暴露。
超时只设置在 CompletableFuture,底层调用没有超时。
Async 回调没有指定线程池。
任务取消后底层操作仍在执行。

排查建议:

text
给线程池命名。
打印异步链关键节点耗时。
区分 thenApply 和 thenCompose。
链尾保留异常日志。
对外部调用设置真实超时。
监控线程池队列和活跃线程数。

✅ 十六、掌握标准

学完本课后,应能做到:

text
能解释 Future 的局限。
能使用 supplyAsync/runAsync 创建异步任务。
能用 thenApply、thenCompose、thenAccept 构建链路。
能用 thenCombine、allOf、anyOf 组合任务。
能正确收集 allOf 的多个结果。
能处理 exceptionally、handle、whenComplete。
能为异步任务指定业务线程池。
能设置超时并理解底层任务不一定停止。
能避免在异步链中无意义阻塞。

CompletableFuture 的价值在于异步编排。写得好的异步链路应该能看出任务依赖、线程池边界、异常兜底和超时策略。


🎓 下一步

  • 第30课:反射机制 — Class、Constructor、Method、动态代理(进入高级篇)