Java CompletableFuture详解:异步编程
Java CompletableFuture详解:异步编程
概述
CompletableFuture是Java 8引入的异步编程工具,它提供了丰富的API来处理异步操作。相比于传统的Future,CompletableFuture更加灵活和强大。
1. 创建CompletableFuture
import java.util.concurrent.*;
public class CreateCompletableFuture {
public static void main(String[] args) {
// 1. 创建已完成的CompletableFuture
CompletableFuture<String> completed = CompletableFuture.completedFuture("已完成");
System.out.println("已完成: " + completed.join());
// 2. 使用supplyAsync创建异步任务
CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "异步结果";
});
// 3. 使用runAsync创建异步任务(无返回值)
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println("异步执行");
});
try {
System.out.println("异步结果: " + async.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. 链式操作
import java.util.concurrent.*;
public class ChainingExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
})
.thenApply(s -> s + " World") // 转换结果
.thenApply(String::toUpperCase) // 再次转换
.thenApply(s -> s + "!"); // 再次转换
System.out.println("链式结果: " + future.join());
// thenAccept:消费结果(无返回值)
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println("消费: " + s));
// thenRun:执行操作(不使用结果)
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("执行完成"));
}
}
3. 组合操作
import java.util.concurrent.*;
public class CombiningExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "World";
});
// thenCombine:组合两个异步结果
CompletableFuture<String> combined = future1.thenCombine(future2,
(s1, s2) -> s1 + " " + s2);
System.out.println("组合结果: " + combined.join());
// thenCompose:扁平化映射
CompletableFuture<String> composed = future1.thenCompose(s ->
CompletableFuture.supplyAsync(() -> s + " Composed"));
System.out.println("组合结果: " + composed.join());
// allOf:等待所有完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
allOf.join();
System.out.println("所有任务完成");
// anyOf:等待任意一个完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
System.out.println("任意一个完成: " + anyOf.join());
}
}
4. 异常处理
import java.util.concurrent.*;
public class ExceptionHandlingExample {
public static void main(String[] args) {
// exceptionally:处理异常
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("异常发生");
}).exceptionally(ex -> {
System.out.println("异常处理: " + ex.getMessage());
return "默认值";
});
System.out.println("异常处理结果: " + future1.join());
// handle:处理结果和异常
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
}).handle((result, ex) -> {
if (ex != null) {
return "异常: " + ex.getMessage();
}
return "结果: " + result;
});
System.out.println("handle结果: " + future2.join());
// completeExceptionally:手动完成异常
CompletableFuture<String> future3 = new CompletableFuture<>();
future3.completeExceptionally(new RuntimeException("手动异常"));
try {
future3.get();
} catch (Exception e) {
System.out.println("手动异常: " + e.getMessage());
}
}
}
5. 超时和取消
import java.util.concurrent.*;
public class TimeoutExample {
public static void main(String[] args) {
// 超时处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果";
}).orTimeout(1, TimeUnit.SECONDS) // 超时
.exceptionally(ex -> "超时默认值");
System.out.println("超时结果: " + future.join());
// cancel:取消任务
CompletableFuture<String> cancellable = CompletableFuture.supplyAsync(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("执行: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("被中断");
return "中断结果";
}
}
return "完成";
});
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean cancelled = cancellable.cancel(true);
System.out.println("是否取消: " + cancelled);
}
}
6. 实际应用示例
异步HTTP请求
import java.util.concurrent.*;
public class AsyncHttpClient {
public static CompletableFuture<String> fetchData(String url) {
return CompletableFuture.supplyAsync(() -> {
// 模拟HTTP请求
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "数据来自: " + url;
});
}
public static void main(String[] args) {
CompletableFuture<String> future1 = fetchData("https://api1.example.com");
CompletableFuture<String> future2 = fetchData("https://api2.example.com");
CompletableFuture<String> future3 = fetchData("https://api3.example.com");
// 并行请求
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> {
try {
System.out.println("结果1: " + future1.get());
System.out.println("结果2: " + future2.get());
System.out.println("结果3: " + future3.get());
} catch (Exception e) {
e.printStackTrace();
}
}).join();
}
}
异步数据处理管道
import java.util.concurrent.*;
public class AsyncPipeline {
public static CompletableFuture<String> processData(String input) {
return CompletableFuture.supplyAsync(() -> {
// 第一步:处理
return input.toUpperCase();
}).thenApplyAsync(s -> {
// 第二步:转换
return s + "_processed";
}).thenApplyAsync(s -> {
// 第三步:格式化
return "[" + s + "]";
});
}
public static void main(String[] args) {
CompletableFuture<String> result = processData("hello");
System.out.println("管道结果: " + result.join());
}
}
7. 最佳实践
- 使用链式调用:thenApply、thenCompose等方法
- 处理异常:使用exceptionally、handle处理异常
- 避免阻塞:使用join()而不是get()
- 合理使用线程池:自定义线程池避免ForkJoinPool.commonPool()的限制
- 监控异步任务:使用回调和异常处理监控任务状态
总结
CompletableFuture是Java异步编程的核心工具。掌握其使用方法,可以编写高效、可维护的异步程序。在实际编程中,要合理使用链式操作、异常处理和超时控制。