Java响应式编程入门
Java响应式编程入门
什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。核心思想是:当数据发生变化时,依赖这些数据的计算会自动更新。
核心概念
/**
 * Traditional imperative style: walks {@code users} with an explicit loop and
 * collects the upper-cased name of every active user.
 *
 * @return the upper-cased names of the active users, in iteration order
 */
public List<String> traditional() {
    List<String> activeNames = new ArrayList<>();
    for (User candidate : users) {
        // Guard clause: inactive users contribute nothing to the result.
        if (!candidate.isActive()) {
            continue;
        }
        String name = candidate.getName();
        activeNames.add(name.toUpperCase());
    }
    return activeNames;
}
/**
 * Reactive style: a declarative pipeline over {@code users} that keeps the
 * active users and maps each one to its upper-cased name.
 *
 * @return a {@code Flux} emitting the upper-cased name of each active user
 */
public Flux<String> reactive() {
    Flux<User> activeUsers = Flux.fromIterable(users)
            .filter(candidate -> candidate.isActive());
    return activeUsers.map(active -> {
        String name = active.getName();
        return name.toUpperCase();
    });
}
Reactor框架
依赖配置
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
Mono和Flux
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorDemo {
public static void main(String[] args) {
// Mono:0或1个元素
Mono<String> mono = Mono.just("Hello");
Mono<Void> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("错误"));
// Flux:0或N个元素
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> range = Flux.range(1, 10);
Flux<String> fromArray = Flux.fromArray(new String[]{"X", "Y", "Z"});
}
}
操作符
/**
 * Walks through commonly used {@code Flux} operators. Pipelines that are
 * subscribed print their elements to standard output; the {@code reduce} and
 * {@code collectList} results are only assembled, not subscribed.
 */
public class ReactorOperators {

    public static void main(String[] args) {
        // map: transform every element -> prints 2, 4, 6, 8, 10
        Flux<Integer> doubled = Flux.range(1, 5).map(n -> n * 2);
        doubled.subscribe(value -> System.out.println(value));

        // filter: keep only matching elements -> prints 2, 4, 6, 8, 10
        Flux<Integer> evens = Flux.range(1, 10).filter(n -> n % 2 == 0);
        evens.subscribe(value -> System.out.println(value));

        // flatMap: expand each word into its characters and flatten the result
        Flux<String> characters = Flux.just("hello", "world")
                .flatMap(word -> Flux.fromArray(word.split("")));
        characters.subscribe(ch -> System.out.println(ch));

        // reduce: fold all elements into a single value
        Mono<Integer> sum = Flux.range(1, 100)
                .reduce(0, (acc, next) -> acc + next);

        // collectList: gather all elements into one List
        Flux<Integer> firstFive = Flux.range(1, 5);
        Mono<List<Integer>> list = firstFive.collectList();

        // distinct: drop duplicate elements
        Flux<Integer> unique = Flux.just(1, 2, 2, 3, 3, 4).distinct();
        unique.subscribe(value -> System.out.println(value));

        // take: keep only the first N elements
        Flux<Integer> head = Flux.range(1, 100).take(5);
        head.subscribe(value -> System.out.println(value));

        // skip: discard the first N elements
        Flux<Integer> rest = Flux.range(1, 10).skip(5);
        rest.subscribe(value -> System.out.println(value));
    }
}
错误处理
public class ReactorErrorHandling {
public static void main(String[] args) {
// onErrorReturn:异常时返回默认值
Flux.error(new RuntimeException("错误"))
.onErrorReturn("默认值")
.subscribe(System.out::println);
// onErrorResume:异常时切换到备用流
Flux.error(new RuntimeException("错误"))
.onResume(e -> Flux.just("恢复", "数据"))
.subscribe(System.out::println);
// retry:重试
Flux.error(new RuntimeException("错误"))
.retry(3)
.subscribe(
System.out::println,
e -> System.out.println("最终失败: " + e.getMessage())
);
// doOnError:记录异常
Flux.error(new RuntimeException("错误"))
.doOnError(e -> System.err.println("记录异常: " + e.getMessage()))
.subscribe();
}
}
实际应用:WebFlux
/**
 * Reactive REST endpoints for users, mapped under {@code /api/users}.
 * Every handler delegates directly to {@link UserService}.
 */
@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    /** Handles {@code GET /api/users/{id}}: looks up one user by id. */
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable int id) {
        return this.userService.findById(id);
    }

    /** Handles {@code GET /api/users}: streams every user. */
    @GetMapping
    public Flux<User> getAllUsers() {
        return this.userService.findAll();
    }

    /** Handles {@code POST /api/users}: saves the user from the request body. */
    @PostMapping
    public Mono<User> createUser(@RequestBody User user) {
        return this.userService.save(user);
    }

    /** Handles {@code GET /api/users/search?name=...}: finds users by name. */
    @GetMapping("/search")
    public Flux<User> searchUsers(@RequestParam String name) {
        return this.userService.findByName(name);
    }
}
/**
 * Service layer for users; each method forwards to {@code UserRepository}
 * and returns the repository's reactive result unchanged.
 */
@Service
public class UserService {

    private final UserRepository repository;

    public UserService(UserRepository repository) {
        this.repository = repository;
    }

    /** Looks up a single user by id via the repository. */
    public Mono<User> findById(int id) {
        return this.repository.findById(id);
    }

    /** Returns all users from the repository. */
    public Flux<User> findAll() {
        return this.repository.findAll();
    }

    /** Saves the given user through the repository. */
    public Mono<User> save(User user) {
        return this.repository.save(user);
    }

    /** Finds users through the repository's {@code findByNameContaining} query. */
    public Flux<User> findByName(String name) {
        return this.repository.findByNameContaining(name);
    }
}
背压(Backpressure)
/**
 * Demonstrates Reactor's overflow strategies and a subscriber that pulls
 * elements from the source in fixed-size batches.
 */
public class BackpressureDemo {

    /** Number of elements requested from upstream per batch. */
    private static final int BATCH_SIZE = 10;

    public static void main(String[] args) {
        // The overflow strategies are alternatives, so each gets its own pipeline.
        // Chaining them is pointless: every onBackpressureXxx operator requests
        // unbounded demand from its upstream, so only the last one in a chain
        // ever sees the subscriber's real demand.

        // Strategy 1: buffer up to 100 elements; overflowing the buffer is an error.
        Flux.range(1, 1000)
                .onBackpressureBuffer(100)
                .subscribe();

        // Strategy 2: drop elements the subscriber has not requested, reporting each.
        Flux.range(1, 1000)
                .onBackpressureDrop(i -> System.out.println("丢弃: " + i))
                .subscribe();

        // Strategy 3: keep only the most recent element while demand is missing.
        Flux.range(1, 1000)
                .onBackpressureLatest()
                .subscribe();

        // Control the pace from the subscriber side by requesting in batches.
        Flux.range(1, 100)
                .subscribe(new Subscriber<Integer>() {
                    private Subscription subscription;
                    private int received = 0;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.subscription = s;
                        // Initial demand: one batch.
                        s.request(BATCH_SIZE);
                    }

                    @Override
                    public void onNext(Integer i) {
                        received++;
                        // Ask for the next batch once the current one is consumed.
                        if (received % BATCH_SIZE == 0) {
                            subscription.request(BATCH_SIZE);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Report the failure instead of silently swallowing it.
                        System.err.println("订阅出错: " + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        // Nothing to clean up in this demo.
                    }
                });
    }
}
总结
响应式编程适合处理高并发、异步数据流场景。Reactor 的 Mono 和 Flux 提供了丰富的操作符,结合 WebFlux 可以构建高性能的响应式应用。