Java CompletableFuture详解
Java CompletableFuture详解
什么是CompletableFuture
CompletableFuture是Java 8引入的异步编程工具,支持链式调用、组合多个异步操作和优雅的异常处理。
基本创建
public class CFBasic {
public static void main(String[] args) {
// 异步执行无返回值
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务: " + Thread.currentThread().getName());
});
// 异步执行有返回值
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
return "Hello Async";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
return "使用自定义线程池";
}, executor);
}
}
链式操作
public class CFChain {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 转换
.thenApply(String::toUpperCase) // 再转换
.thenAccept(System.out::println); // 消费
// thenApply vs thenApplyAsync
CompletableFuture<String> sync = CompletableFuture
.supplyAsync(() -> "sync")
.thenApply(s -> s + " result"); // 在上一任务的线程执行
CompletableFuture<String> async = CompletableFuture
.supplyAsync(() -> "async")
.thenApplyAsync(s -> s + " result"); // 在新线程执行
// thenAccept:消费结果
CompletableFuture.supplyAsync(() -> 100)
.thenAccept(result -> System.out.println("结果: " + result));
// thenRun:不关心结果
CompletableFuture.supplyAsync(() -> "data")
.thenRun(() -> System.out.println("任务完成"));
}
}
组合多个异步操作
public class CFCombine {
public static void main(String[] args) {
CompletableFuture<String> userFuture = CompletableFuture
.supplyAsync(() -> fetchUser(1));
CompletableFuture<String> orderFuture = CompletableFuture
.supplyAsync(() -> fetchOrder(1));
// thenCombine:组合两个结果
CompletableFuture<String> combined = userFuture
.thenCombine(orderFuture, (user, order) ->
"用户: " + user + ", 订单: " + order
);
// thenCompose:扁平化(类似flatMap)
CompletableFuture<String> composed = CompletableFuture
.supplyAsync(() -> getUserId())
.thenCompose(id -> fetchUserAsync(id));
// allOf:等待所有完成
CompletableFuture<Void> all = CompletableFuture.allOf(
userFuture, orderFuture
);
all.thenRun(() -> {
System.out.println("用户: " + userFuture.join());
System.out.println("订单: " + orderFuture.join());
});
// anyOf:任一完成即可
CompletableFuture<Object> any = CompletableFuture.anyOf(
userFuture, orderFuture
);
}
private static String fetchUser(int id) { return "User" + id; }
private static String fetchOrder(int id) { return "Order" + id; }
private static int getUserId() { return 1; }
private static CompletableFuture<String> fetchUserAsync(int id) {
return CompletableFuture.supplyAsync(() -> "User" + id);
}
}
异常处理
public class CFException {
public static void main(String[] args) {
// exceptionally:处理异常
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
})
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "默认值";
});
// handle:处理结果和异常
CompletableFuture<String> handled = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((result, ex) -> {
if (ex != null) {
return "错误: " + ex.getMessage();
}
return "结果: " + result;
});
// whenComplete:不改变结果
CompletableFuture.supplyAsync(() -> "data")
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("任务失败", ex);
} else {
log.info("任务成功: {}", result);
}
});
}
private static String riskyOperation() {
if (Math.random() > 0.5) {
throw new RuntimeException("失败");
}
return "success";
}
}
实际应用:并行查询
public class ParallelQuery {
public static void main(String[] args) {
long start = System.currentTimeMillis();
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> fetchUserFromDB(1));
CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> fetchOrdersFromDB(1));
CompletableFuture<UserProfile> profileFuture = CompletableFuture
.supplyAsync(() -> fetchProfileFromAPI(1));
// 等待所有结果
CompletableFuture<Void> allDone = CompletableFuture.allOf(
userFuture, ordersFuture, profileFuture
);
allDone.thenRun(() -> {
User user = userFuture.join();
List<Order> orders = ordersFuture.join();
UserProfile profile = profileFuture.join();
UserProfile result = new UserProfile(user, orders, profile);
System.out.println("查询完成: " + result);
}).join();
long cost = System.currentTimeMillis() - start;
System.out.println("总耗时: " + cost + "ms");
}
}
超时控制(Java 9+)
public class CFTimeout {
public static void main(String[] args) {
// 超时设置
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) {}
return "结果";
})
.orTimeout(3, TimeUnit.SECONDS) // 超时抛出TimeoutException
.exceptionally(ex -> "超时返回默认值");
// 降级超时
CompletableFuture<String> fallback = CompletableFuture
.supplyAsync(() -> riskyOperation())
.completeOnTimeout("降级结果", 2, TimeUnit.SECONDS);
}
}
自定义线程池
public class CFThreadPool {
public static void main(String[] args) {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "使用自定义线程池", executor);
// 记得关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));
}
}
总结
CompletableFuture是Java异步编程的核心工具。掌握supplyAsync、thenApply、thenCompose、allOf等方法,可以轻松实现并行处理和复杂的异步流程。