← 返回首页

Java CompletableFuture详解:异步编程

📂 java ⏱ 3 min 588 words

Java CompletableFuture详解:异步编程

概述

CompletableFuture是Java 8引入的异步编程工具,它提供了丰富的API来处理异步操作。相比于传统的Future,CompletableFuture更加灵活和强大。

1. 创建CompletableFuture

import java.util.concurrent.*;

public class CreateCompletableFuture {
    public static void main(String[] args) {
        // 1. 创建已完成的CompletableFuture
        CompletableFuture<String> completed = CompletableFuture.completedFuture("已完成");
        System.out.println("已完成: " + completed.join());
        
        // 2. 使用supplyAsync创建异步任务
        CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步结果";
        });
        
        // 3. 使用runAsync创建异步任务(无返回值)
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println("异步执行");
        });
        
        try {
            System.out.println("异步结果: " + async.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. 链式操作

import java.util.concurrent.*;

public class ChainingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        })
        .thenApply(s -> s + " World")  // 转换结果
        .thenApply(String::toUpperCase)  // 再次转换
        .thenApply(s -> s + "!");  // 再次转换
        
        System.out.println("链式结果: " + future.join());
        
        // thenAccept:消费结果(无返回值)
        CompletableFuture.supplyAsync(() -> "Hello")
            .thenAccept(s -> System.out.println("消费: " + s));
        
        // thenRun:执行操作(不使用结果)
        CompletableFuture.supplyAsync(() -> "Hello")
            .thenRun(() -> System.out.println("执行完成"));
    }
}

3. 组合操作

import java.util.concurrent.*;

public class CombiningExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "World";
        });
        
        // thenCombine:组合两个异步结果
        CompletableFuture<String> combined = future1.thenCombine(future2, 
            (s1, s2) -> s1 + " " + s2);
        System.out.println("组合结果: " + combined.join());
        
        // thenCompose:扁平化映射
        CompletableFuture<String> composed = future1.thenCompose(s -> 
            CompletableFuture.supplyAsync(() -> s + " Composed"));
        System.out.println("组合结果: " + composed.join());
        
        // allOf:等待所有完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
        allOf.join();
        System.out.println("所有任务完成");
        
        // anyOf:等待任意一个完成
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
        System.out.println("任意一个完成: " + anyOf.join());
    }
}

4. 异常处理

import java.util.concurrent.*;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        // exceptionally:处理异常
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("异常发生");
        }).exceptionally(ex -> {
            System.out.println("异常处理: " + ex.getMessage());
            return "默认值";
        });
        System.out.println("异常处理结果: " + future1.join());
        
        // handle:处理结果和异常
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("随机异常");
            }
            return "成功";
        }).handle((result, ex) -> {
            if (ex != null) {
                return "异常: " + ex.getMessage();
            }
            return "结果: " + result;
        });
        System.out.println("handle结果: " + future2.join());
        
        // completeExceptionally:手动完成异常
        CompletableFuture<String> future3 = new CompletableFuture<>();
        future3.completeExceptionally(new RuntimeException("手动异常"));
        
        try {
            future3.get();
        } catch (Exception e) {
            System.out.println("手动异常: " + e.getMessage());
        }
    }
}

5. 超时和取消

import java.util.concurrent.*;

public class TimeoutExample {
    public static void main(String[] args) {
        // 超时处理
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "结果";
        }).orTimeout(1, TimeUnit.SECONDS)  // 超时
          .exceptionally(ex -> "超时默认值");
        
        System.out.println("超时结果: " + future.join());
        
        // cancel:取消任务
        CompletableFuture<String> cancellable = CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("执行: " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    System.out.println("被中断");
                    return "中断结果";
                }
            }
            return "完成";
        });
        
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        boolean cancelled = cancellable.cancel(true);
        System.out.println("是否取消: " + cancelled);
    }
}

6. 实际应用示例

异步HTTP请求

import java.util.concurrent.*;

public class AsyncHttpClient {
    public static CompletableFuture<String> fetchData(String url) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟HTTP请求
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "数据来自: " + url;
        });
    }
    
    public static void main(String[] args) {
        CompletableFuture<String> future1 = fetchData("https://api1.example.com");
        CompletableFuture<String> future2 = fetchData("https://api2.example.com");
        CompletableFuture<String> future3 = fetchData("https://api3.example.com");
        
        // 并行请求
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        
        allFutures.thenRun(() -> {
            try {
                System.out.println("结果1: " + future1.get());
                System.out.println("结果2: " + future2.get());
                System.out.println("结果3: " + future3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).join();
    }
}

异步数据处理管道

import java.util.concurrent.*;

public class AsyncPipeline {
    public static CompletableFuture<String> processData(String input) {
        return CompletableFuture.supplyAsync(() -> {
            // 第一步:处理
            return input.toUpperCase();
        }).thenApplyAsync(s -> {
            // 第二步:转换
            return s + "_processed";
        }).thenApplyAsync(s -> {
            // 第三步:格式化
            return "[" + s + "]";
        });
    }
    
    public static void main(String[] args) {
        CompletableFuture<String> result = processData("hello");
        System.out.println("管道结果: " + result.join());
    }
}

7. 最佳实践

  1. 使用链式调用:thenApply、thenCompose等方法
  2. 处理异常:使用exceptionally、handle处理异常
  3. 避免阻塞:使用join()而不是get()
  4. 合理使用线程池:自定义线程池避免ForkJoinPool.commonPool()的限制
  5. 监控异步任务:使用回调和异常处理监控任务状态

总结

CompletableFuture是Java异步编程的核心工具。掌握其使用方法,可以编写高效、可维护的异步程序。在实际编程中,要合理使用链式操作、异常处理和超时控制。