CompletableFuture
上一章的线程池让我们”提交任务”。但拿到结果后呢?Future.get() 是阻塞的——你必须等结果出来才能继续,无法”完成后自动回调”。这是 Java 异步编程一直以来的痛点。
JDK 8 带来了 CompletableFuture——一个可组合、可回调、可异常处理的异步编程 API。它把 Java 异步编程拉到与 JavaScript Promise、C# Task、Scala Future 同一水平。在现代微服务架构里,“并行调多个接口聚合结果”几乎是标配——CompletableFuture 是这套打法的核心工具。
一、Future 的局限
先看 Future 的问题:
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<String> future = pool.submit(() -> queryDb());
// 只能阻塞等
String result = future.get(); // 阻塞!
// 不能回调
// 不能组合:"等 A 完成后再用 A 的结果调 B"
// 不能合并:"同时跑 A 和 B,等都完成"
// 不能异常处理:"失败时返回默认值"
Future 的痛点:
- 只能阻塞
get()——不能”完成时回调”。 - 不能链式组合——A 完成后用结果跑 B,得手动
get()后再submit。 - 不能合并多个——
A和B并行,要等都完成,得自己写CountDownLatch。 - 异常处理弱——
get()抛ExecutionException,但没法”失败回退到默认值”。
CompletableFuture 解决了所有这些痛点。
二、CompletableFuture 的创建
2.1 异步执行任务
// 无返回值
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("跑在 " + Thread.currentThread().getName());
});
// 有返回值
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return "结果 from " + Thread.currentThread().getName();
});
System.out.println(f2.join()); // join 类似 get,但不抛受检异常
runAsync 接 Runnable 返回 CompletableFuture<Void>;supplyAsync 接 Supplier<T> 返回 CompletableFuture<T>。
2.2 指定线程池
默认用 ForkJoinPool.commonPool()——前面说过,不要在 commonPool 跑阻塞任务。生产代码应指定自己的线程池:
ExecutorService myPool = Executors.newFixedThreadPool(8);
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> query(), myPool);
2.3 已知结果
CompletableFuture<String> done = CompletableFuture.completedFuture("已知值");
CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("失败"));
failedFuture 是 Java 9+ 的方法。
2.4 join vs get
| 方法 | 阻塞 | 异常 |
|---|---|---|
get() | 是 | 抛 InterruptedException + ExecutionException |
join() | 是 | 抛 CompletionException(非受检) |
getNow(T) | 否 | 未完成返回默认值 |
get(long, unit) | 是 | 超时抛 TimeoutException |
join 适合在流式 API 末尾取值——不抛受检异常,省去 try-catch。
三、异步回调
3.1 thenApply:转换结果
CompletableFuture<Integer> f = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(Integer::parseInt) // String → Integer
.thenApply(x -> x * 2); // 42 → 84
System.out.println(f.join()); // 84
thenApply 接 Function<T, R>——把上一步结果转换为新值。
3.2 thenAccept:消费结果
CompletableFuture<Void> f = CompletableFuture
.supplyAsync(() -> "hello")
.thenAccept(s -> System.out.println("收到: " + s)); // 消费,无返回
thenAccept 接 Consumer<T>——消费结果,返回 CompletableFuture<Void>。
3.3 thenRun:不关心结果,只触发动作
CompletableFuture<Void> f = CompletableFuture
.supplyAsync(() -> "hello")
.thenRun(() -> System.out.println("完成了,不在乎结果"));
thenRun 接 Runnable——上一步完成后执行,不接收上一步结果。
3.4 回调的”Async”变体
每个回调方法都有 Async 变体:
.thenApply(fn) // 在上一步完成的线程跑
.thenApplyAsync(fn) // 提交到 ForkJoinPool.commonPool 跑
.thenApplyAsync(fn, pool) // 提交到指定 pool 跑
默认非 Async 在上一步完成的线程跑——可能是有意的(避免线程切换)也可能是坑(在主线程跑)。Async 重新提交到池里跑——明确线程切换。
实践建议:链短的话非 Async 即可,长链或怕阻塞主线程用 Async。
3.5 速查表
| 方法 | 入参 | 返回 | 描述 |
|---|---|---|---|
thenApply | Function<T,R> | R | 转换 |
thenAccept | Consumer<T> | void | 消费 |
thenRun | Runnable | void | 触发 |
thenApplyAsync 等 | 同上 | 同上 | 异步执行 |
四、异常处理
CompletableFuture 的异常不在 get 时抛——可以在链里处理。
4.1 exceptionally:仅在异常时
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> { throw new RuntimeException("DB 挂了"); })
.exceptionally(ex -> "默认值");
System.out.println(f.join()); // 默认值
exceptionally 接 Function<Throwable, T>——只有上一步异常时才执行,返回默认值恢复正常。
4.2 handle:异常和正常都处理
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> { if (Math.random() < 0.5) throw new RuntimeException(); return "ok"; })
.handle((result, ex) -> {
if (ex != null) return "失败: " + ex.getMessage();
return "成功: " + result;
});
handle 接 BiFunction<T, Throwable, R>——异常和正常都调用,根据情况返回。
4.3 whenComplete:观察但不修改
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> "ok")
.whenComplete((result, ex) -> {
if (ex != null) log.error("失败", ex);
else log.info("成功: {}", result);
});
whenComplete 接 BiConsumer<T, Throwable>——观察结果或异常,但不修改——返回类型和原 future 一致。常用于日志、监控、清理。
4.4 三者对比
| 方法 | 触发时机 | 能否修改结果 |
|---|---|---|
exceptionally | 仅异常 | ✓(恢复成默认值) |
handle | 异常和正常 | ✓ |
whenComplete | 异常和正常 | ✗(仅观察) |
五、多任务组合
5.1 thenCombine:合并两个 Future
CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Integer> finalPrice = price.thenCombine(discount, (p, d) -> (int)(p * d));
System.out.println(finalPrice.join()); // 80
thenCombine 把两个 Future 的结果合并——两者并行跑,都完成后调用合并函数。
5.2 thenCompose:串联(flatmap)
// 串行:先查用户,再查订单
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> findUser(1));
CompletableFuture<Order> orderFuture = userFuture.thenCompose(user ->
CompletableFuture.supplyAsync(() -> findOrder(user.orderId))
);
thenCompose 接 Function<T, CompletableFuture<R>>——上一步完成后,用结果启动另一个异步任务,并把那个 Future 的结果作为最终结果。这是 flatMap 的语义——避免 CompletableFuture<CompletableFuture<R>> 嵌套。
5.3 allOf:等所有完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 等三个都完成
// 取每个结果
String a = f1.join();
String b = f2.join();
String c = f3.join();
allOf 返回 CompletableFuture<Void>——所有都完成时完成。注意它不返回结果集合——要单独 join() 每个 future 取结果。
5.4 anyOf:任一完成
CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
Thread.sleep(100); return "fast";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000); return "slow";
});
Object first = CompletableFuture.anyOf(fast, slow).join();
System.out.println(first); // fast
anyOf 任意一个完成即完成——返回最快的结果。常用于”多副本读取,最快返回的赢”。
5.5 速查表
| 方法 | 描述 |
|---|---|
thenCombine(other, fn) | 合并两个 future 的结果 |
thenCompose(fn) | 串联——上一步结果启动新 future |
allOf(f1, f2, ...) | 等所有完成 |
anyOf(f1, f2, ...) | 任一完成 |
六、其他常用方法
6.1 orTimeout / completeOnTimeout(Java 9+)
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> { Thread.sleep(5000); return "late"; })
.orTimeout(1, TimeUnit.SECONDS); // 1 秒超时抛 TimeoutException
// f.join() 抛 TimeoutException
CompletableFuture<String> f2 = CompletableFuture
.supplyAsync(() -> { Thread.sleep(5000); return "late"; })
.completeOnTimeout("default", 1, TimeUnit.SECONDS); // 1 秒超时返回默认值
// f2.join() 返回 "default"
6.2 complete / completeExceptionally
手动完成 future:
CompletableFuture<String> f = new CompletableFuture<>();
f.complete("手动完成"); // 或 f.completeExceptionally(new RuntimeException())
常用于”把回调式 API 转成 CompletableFuture 式”。
6.3 delayedExecutor(Java 9+)
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> "delayed",
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
// 1 秒后才执行
七、实战:异步聚合多个接口
下面这个例子模拟”微服务网关”——并行调用户、订单、商品三个接口,聚合返回。同时演示 thenCompose 串联、thenCombine 合并、allOf 等所有完成、anyOf 任一完成、异常处理。
观察重点:
- thenApply 链:
"42"→42→84,每步转换。- exceptionally:异常时返回默认值,恢复正常。
- thenCompose:用户查询和订单查询串行——总耗时 ≈ 200+300=500ms(不是并行)。
- allOf:三个接口并行——总耗时 ≈ max(300, 250, 200) = 300ms,不是串行的 750ms。 > - anyOf:副本更快(100ms < 200ms),返回副本结果。
- thenCombine:price 和 discount 并行,合并成最终价。
- orTimeout:500ms 超时——任务 2000ms 没完成,500ms 时抛
TimeoutException。- 失败重试:用
handle递归——前两次失败,第三次成功。
八、本章小结
| 方法 | 用途 |
|---|---|
runAsync/supplyAsync | 创建异步任务 |
thenApply | 转换结果 |
thenAccept/thenRun | 消费/触发 |
thenCombine | 合并两个 future |
thenCompose | 串联(flatmap) |
allOf | 等所有完成 |
anyOf | 任一完成 |
exceptionally | 仅异常时恢复 |
handle | 异常/正常都处理 |
whenComplete | 观察,不修改 |
orTimeout | 超时抛异常(Java 9+) |
completeOnTimeout | 超时返回默认值(Java 9+) |
| 概念 | 核心要点 |
|---|---|
| Future 局限 | 只能阻塞 get,不能回调/组合/异常处理 |
| Async 变体 | 提交到池执行,非 Async 在上一步线程跑 |
| thenApply vs thenCompose | 前者转换值,后者串联另一个 future |
| thenCombine vs allOf | 前者合并两个结果,后者等所有但取值要单独 join |
| exceptionally vs handle | 前者仅异常,后者异常和正常都处理 |
| 默认线程池 | ForkJoinPool.commonPool(),生产要指定自定义池 |
| join vs get | join 不抛受检异常,方便链式 |
记忆口诀:
thenApply转换,thenCompose串联,thenCombine合并——三个核心组合。allOf等所有,anyOf任一完成——多任务协调。exceptionally异常恢复,handle异常和正常都处理——异常工具。- 默认 commonPool 别用——指定自己的线程池跑阻塞任务。
orTimeout防止永远等——Java 9+ 超时利器。
结语:从同步到异步
CompletableFuture 是 Java 异步编程的分水岭——它让”并行调多个接口、串行编排、异常回退、超时控制”成为可能。在微服务架构里,网关聚合多个下游接口、批量请求并发处理,都靠它。
但 CompletableFuture 的链式 API 也有局限——长链可读性差、调试栈跟踪丢失、阻塞调用还是阻塞(只是换了个线程跑)。Java 21 引入的虚拟线程给出了另一种思路——用同步代码风格获得异步性能,根本不需要 CompletableFuture 编排。
下一章我们讲 虚拟线程与结构化并发——Java 并发的未来。这是 20 年来 Java 并发最大的变革,也是这一阶段的收官。