← 返回首页

Java CompletableFuture详解

📂 java ⏱ 3 min 549 words

Java CompletableFuture详解

什么是CompletableFuture

CompletableFuture是Java 8引入的异步编程工具,支持链式调用、组合多个异步操作和优雅的异常处理。

基本创建

/**
 * Shows the three basic ways to start a {@code CompletableFuture}:
 * {@code runAsync} (no result), {@code supplyAsync} (with a result), and
 * {@code supplyAsync} on a caller-supplied executor.
 */
public class CFBasic {
    /**
     * Starts the three example tasks, waits for all of them, prints their results,
     * and releases the custom thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Workers of a fixed thread pool are non-daemon threads: unless the pool is
        // shut down, the JVM keeps running after main() returns.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Asynchronous task without a return value.
            CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() ->
                System.out.println("异步任务: " + Thread.currentThread().getName()));

            // Asynchronous task that produces a value.
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "Hello Async");

            // Same as above, but executed on the custom pool instead of the default one.
            CompletableFuture<String> cf3 =
                CompletableFuture.supplyAsync(() -> "使用自定义线程池", executor);

            // Wait explicitly: the default async executor normally runs on daemon
            // threads, so unfinished tasks would be dropped when main() returns.
            cf1.join();
            System.out.println(cf2.join());
            System.out.println(cf3.join());
        } finally {
            executor.shutdown();
        }
    }
}

链式操作

/**
 * Shows how stages are chained with {@code thenApply}, {@code thenApplyAsync},
 * {@code thenAccept} and {@code thenRun}.
 */
public class CFChain {
    /**
     * Builds several small chains and waits for all of them before returning.
     *
     * @param args unused
     * @throws Exception kept for interface compatibility; the body itself only
     *                   throws unchecked exceptions from {@code join()}
     */
    public static void main(String[] args) throws Exception {
        // thenAccept consumes the value and yields nothing, so the resulting
        // future is CompletableFuture<Void>, not CompletableFuture<String>.
        CompletableFuture<Void> future = CompletableFuture
            .supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World")      // transform
            .thenApply(String::toUpperCase)     // transform again
            .thenAccept(System.out::println);   // consume

        // thenApply vs thenApplyAsync
        // thenApply: runs on the thread that completes the previous stage, or on
        // the calling thread if that stage has already completed.
        CompletableFuture<String> sync = CompletableFuture
            .supplyAsync(() -> "sync")
            .thenApply(s -> s + " result");

        // thenApplyAsync: always submitted to the default asynchronous executor
        // (not necessarily a brand-new thread).
        CompletableFuture<String> async = CompletableFuture
            .supplyAsync(() -> "async")
            .thenApplyAsync(s -> s + " result");

        // thenAccept: consume the result.
        CompletableFuture<Void> consumed = CompletableFuture
            .supplyAsync(() -> 100)
            .thenAccept(result -> System.out.println("结果: " + result));

        // thenRun: run an action that does not need the result.
        CompletableFuture<Void> finished = CompletableFuture
            .supplyAsync(() -> "data")
            .thenRun(() -> System.out.println("任务完成"));

        // Block until every chain is done; otherwise main() may return before the
        // asynchronous stages have printed anything.
        CompletableFuture.allOf(future, sync, async, consumed, finished).join();
    }
}

组合多个异步操作

/**
 * Shows how several futures are combined: {@code thenCombine}, {@code thenCompose},
 * {@code allOf} and {@code anyOf}.
 */
public class CFCombine {
    /**
     * Starts two independent lookups, combines them in four different ways, and
     * prints every result after waiting for it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CompletableFuture<String> userFuture = CompletableFuture
            .supplyAsync(() -> fetchUser(1));

        CompletableFuture<String> orderFuture = CompletableFuture
            .supplyAsync(() -> fetchOrder(1));

        // thenCombine: merge the results of two independent futures.
        CompletableFuture<String> combined = userFuture
            .thenCombine(orderFuture, (user, order) ->
                "用户: " + user + ", 订单: " + order
            );

        // thenCompose: chain a function that itself returns a future, without
        // nesting (the flatMap of CompletableFuture).
        CompletableFuture<String> composed = CompletableFuture
            .supplyAsync(() -> getUserId())
            .thenCompose(id -> fetchUserAsync(id));

        // allOf: completes once every given future has completed.
        CompletableFuture<Void> all = CompletableFuture.allOf(
            userFuture, orderFuture
        );
        CompletableFuture<Void> reported = all.thenRun(() -> {
            // join() cannot block here: both futures are already complete.
            System.out.println("用户: " + userFuture.join());
            System.out.println("订单: " + orderFuture.join());
        });

        // anyOf: completes as soon as any one of the given futures completes.
        CompletableFuture<Object> any = CompletableFuture.anyOf(
            userFuture, orderFuture
        );

        // Wait for every pipeline and surface its result; without this main() can
        // return before the callbacks run and the results are silently discarded.
        reported.join();
        System.out.println(combined.join());
        System.out.println(composed.join());
        System.out.println(any.join());
    }

    /** Returns a fake user name for the given id. */
    private static String fetchUser(int id) { return "User" + id; }

    /** Returns a fake order name for the given id. */
    private static String fetchOrder(int id) { return "Order" + id; }

    /** Returns the fixed user id used by the thenCompose example. */
    private static int getUserId() { return 1; }

    /** Asynchronously returns a fake user name for the given id. */
    private static CompletableFuture<String> fetchUserAsync(int id) {
        return CompletableFuture.supplyAsync(() -> "User" + id);
    }
}

异常处理

/**
 * Shows the three error-handling hooks of {@code CompletableFuture}:
 * {@code exceptionally}, {@code handle} and {@code whenComplete}.
 */
public class CFException {
    // JDK logger, fully qualified so the snippet needs no extra import line.
    private static final java.util.logging.Logger log =
        java.util.logging.Logger.getLogger(CFException.class.getName());

    /**
     * Runs one example per hook, waits for each, and prints the recovered values.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // exceptionally: recover from a failure with a fallback value.
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("随机异常");
                }
                return "成功";
            })
            .exceptionally(ex -> {
                // The callback may receive a CompletionException wrapper; unwrap
                // it so the original message is reported.
                System.out.println("异常: " + rootCause(ex).getMessage());
                return "默认值";
            });

        // handle: sees both the result and the exception and maps them to a new value.
        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> riskyOperation())
            .handle((result, ex) -> {
                if (ex != null) {
                    return "错误: " + rootCause(ex).getMessage();
                }
                return "结果: " + result;
            });

        // whenComplete: observe the outcome without changing it.
        CompletableFuture<String> observed = CompletableFuture
            .supplyAsync(() -> "data")
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.log(java.util.logging.Level.SEVERE, "任务失败", ex);
                } else {
                    log.log(java.util.logging.Level.INFO, "任务成功: {0}", result);
                }
            });

        // Wait for every stage so the callbacks run before main() returns.
        System.out.println(future.join());
        System.out.println(handled.join());
        observed.join();
    }

    /**
     * Strips {@code CompletionException} wrappers added by the async machinery.
     *
     * @param ex the throwable passed to a completion callback
     * @return the first cause that is not a {@code CompletionException}, or
     *         {@code ex} itself when it is not wrapped
     */
    private static Throwable rootCause(Throwable ex) {
        Throwable current = ex;
        while (current instanceof java.util.concurrent.CompletionException
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Fails roughly half of the time to give the handlers something to do.
     *
     * @return {@code "success"} when it does not fail
     * @throws RuntimeException when the random draw exceeds 0.5
     */
    private static String riskyOperation() {
        if (Math.random() > 0.5) {
            throw new RuntimeException("失败");
        }
        return "success";
    }
}

实际应用:并行查询

/**
 * Runs three independent lookups in parallel and reports the aggregated result
 * together with the total elapsed time.
 */
public class ParallelQuery {
    /**
     * Starts the user, orders and profile lookups concurrently, blocks until the
     * aggregated result has been printed, then prints the elapsed milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final long startedAt = System.currentTimeMillis();

        // Kick off all three lookups before waiting on any of them.
        final CompletableFuture<User> pendingUser =
            CompletableFuture.supplyAsync(() -> fetchUserFromDB(1));
        final CompletableFuture<List<Order>> pendingOrders =
            CompletableFuture.supplyAsync(() -> fetchOrdersFromDB(1));
        final CompletableFuture<UserProfile> pendingProfile =
            CompletableFuture.supplyAsync(() -> fetchProfileFromAPI(1));

        // Executed only after allOf completes, so these join() calls return
        // immediately with the already-available values.
        final Runnable report = () -> {
            UserProfile result = new UserProfile(
                pendingUser.join(), pendingOrders.join(), pendingProfile.join());
            System.out.println("查询完成: " + result);
        };

        // Wait for every lookup, print the report, and block until that is done.
        CompletableFuture
            .allOf(pendingUser, pendingOrders, pendingProfile)
            .thenRun(report)
            .join();

        System.out.println("总耗时: " + (System.currentTimeMillis() - startedAt) + "ms");
    }
}

超时控制(Java 9+)

/**
 * Shows the Java 9+ timeout helpers of {@code CompletableFuture}:
 * {@code orTimeout} and {@code completeOnTimeout}.
 */
public class CFTimeout {
    /**
     * Runs both timeout examples and prints their outcomes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Timeout: the work needs 5 s but only 3 s are allowed, so the default
        // value supplied by exceptionally() is printed.
        CompletableFuture<String> future = resultOrDefault(5000, 3, TimeUnit.SECONDS);
        System.out.println(future.join());

        // Fallback on timeout: completes with the fallback value if the
        // operation has not finished within 2 s.
        CompletableFuture<String> fallback = CompletableFuture
            .supplyAsync(() -> riskyOperation())
            .completeOnTimeout("降级结果", 2, TimeUnit.SECONDS);
        try {
            System.out.println(fallback.join());
        } catch (java.util.concurrent.CompletionException e) {
            // completeOnTimeout only covers slowness; a failure still propagates.
            System.err.println("riskyOperation failed: " + e.getCause());
        }
    }

    /**
     * Simulates a slow call guarded by {@code orTimeout}, falling back to a
     * default value when the future completes exceptionally.
     *
     * @param workMillis how long the simulated work sleeps, in milliseconds
     * @param timeout    maximum time to wait for the work
     * @param unit       unit of {@code timeout}
     * @return a future yielding {@code "结果"} when the work finishes in time,
     *         otherwise {@code "超时返回默认值"}
     */
    static CompletableFuture<String> resultOrDefault(long workMillis, long timeout, TimeUnit unit) {
        return CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(workMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread
                    // instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                }
                return "结果";
            })
            .orTimeout(timeout, unit)  // completes exceptionally with TimeoutException
            .exceptionally(ex -> "超时返回默认值");
    }

    /**
     * Fails roughly half of the time.
     *
     * @return {@code "success"} when it does not fail
     * @throws RuntimeException when the random draw exceeds 0.5
     */
    private static String riskyOperation() {
        if (Math.random() > 0.5) {
            throw new RuntimeException("失败");
        }
        return "success";
    }
}

自定义线程池

/**
 * Shows how to run a {@code CompletableFuture} on a hand-configured
 * {@code ThreadPoolExecutor} and release the pool afterwards.
 */
public class CFThreadPool {
    /**
     * Runs one task on a custom pool, prints its result, and shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 4 core / 8 max threads, 60 s keep-alive for extra threads, a bounded
        // queue of 100 tasks, and caller-runs as the rejection policy.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        try {
            CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "使用自定义线程池", executor);
            System.out.println(future.join());
        } finally {
            // Shut down explicitly. A JVM shutdown hook is not enough here: the
            // pool's non-daemon workers keep the JVM alive, so the hook would
            // never get a chance to run on a normal exit.
            executor.shutdown();
        }
    }
}

总结

CompletableFuture是Java异步编程的核心工具。掌握supplyAsync、thenApply、thenCompose、allOf等方法,可以轻松实现并行处理和复杂的异步流程。