Reactive Programming in Java: Reactor and WebFlux
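Overview
Reactive programming is a programming paradigm built around asynchronous data streams: code declares how values are transformed as they arrive instead of blocking while it waits for them. This tutorial introduces Project Reactor and Spring WebFlux.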
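To make "asynchronous data stream" concrete before turning to Reactor, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. It is not Reactor code. The class FlowSketch, the subscriber CollectingSubscriber, and the method publishAndCollect are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    // Hypothetical subscriber: requests one item at a time and records what it receives.
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count asynchronously and returns the items the subscriber received.
    static List<Integer> publishAndCollect(int count) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete after the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(5)); // items arrive in submission order
    }
}
```

The subscriber, not the publisher, decides how fast items flow by calling request(n). Reactor's Mono and Flux follow the same subscribe, request, onNext, onComplete sequence, but hide it behind operators.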
1. Reactor Basics
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorExample {
    public static void main(String[] args) {
        // Mono: emits zero or one element
        Mono<String> mono = Mono.just("Hello");
        mono.subscribe(System.out::println);

        // Flux: emits zero to N elements
        Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
        flux.subscribe(System.out::println);

        // Operators: keep the even numbers, then double them (prints 4, 8, 12, 16, 20)
        Flux.range(1, 10)
            .filter(i -> i % 2 == 0)
            .map(i -> i * 2)
            .subscribe(System.out::println);

        // Error handling: switch to a fallback stream when the source fails
        Flux.<String>error(new RuntimeException("error"))
            .onErrorResume(e -> Flux.just("recovered"))
            .subscribe(System.out::println);
    }
}
2. WebFlux
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Routing: map each HTTP method and path to a handler function
@Configuration
public class WebFluxConfig {
    @Bean
    public RouterFunction<ServerResponse> routes(UserHandler handler) {
        return RouterFunctions.route()
            .GET("/users", handler::getAll)
            .GET("/users/{id}", handler::getById)
            .POST("/users", handler::create)
            .build();
    }
}

// Handlers: build a response from the reactive result of the service
@Component
public class UserHandler {
    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    public Mono<ServerResponse> getAll(ServerRequest request) {
        Flux<User> users = userService.findAll();
        return ServerResponse.ok().body(users, User.class);
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            // An empty Mono means no such user: answer 404 rather than an empty 200
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        Mono<User> user = request.bodyToMono(User.class);
        Mono<User> created = user.flatMap(userService::save);
        return ServerResponse.ok().body(created, User.class);
    }
}
3. Practical Examples
Reactive REST API
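import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/users")
public class UserReactiveController {
    private final UserService userService;

    public UserReactiveController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping
    public Flux<User> getAll() {
        return userService.findAll();
    }

    @GetMapping("/{id}")
    public Mono<User> getById(@PathVariable String id) {
        return userService.findById(id);
    }

    @PostMapping
    public Mono<User> create(@RequestBody User user) {
        return userService.save(user);
    }

    // Server-sent events: each element is written to the client as soon as it is emitted
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> stream() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new User("User-" + i, i));
    }
}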
Reactive Database Access
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface UserRepository extends ReactiveMongoRepository<User, String> {
    // Derived query: Spring Data builds the query from the method name
    Flux<User> findByName(String name);
}

@Service
public class UserService {
    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    public Mono<User> findById(String id) {
        return userRepository.findById(id);
    }

    public Mono<User> save(User user) {
        return userRepository.save(user);
    }
}
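4. Best Practices
- Use operators: compose asynchronous logic with operators such as map, flatMap, and filter instead of coordinating callbacks or threads by hand
- Error handling: use onErrorResume to switch to a fallback stream and onErrorReturn to emit a default value
- Backpressure: choose an overflow strategy such as onBackpressureBuffer or onBackpressureDrop when the consumer is slower than the producer
- Concurrency control: use parallel() together with runOn(Scheduler) to spread work across threads
- Testing: use StepVerifier to test reactive code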
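The error-handling, backpressure, concurrency, and testing items above can be sketched together as follows. This is an illustrative sketch, not a production pattern: it assumes reactor-core and reactor-test are on the classpath, and the class BestPracticeSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

class BestPracticeSketch {

    // Error handling: "x" fails to parse, so the stream ends with a default value.
    static Flux<String> withFallback() {
        return Flux.just("1", "2", "x")
            .map(Integer::parseInt)
            .map(i -> "value-" + i)
            .onErrorReturn("fallback");
    }

    // Backpressure: items that arrive while the subscriber has no outstanding
    // demand are handed to the callback and discarded.
    static Flux<Integer> dropOverflow(List<Integer> dropped) {
        return Flux.range(1, 10)
            .onBackpressureDrop(dropped::add);
    }

    // Concurrency: split the work into 4 rails, run them on the parallel
    // scheduler, then merge back into one Flux (emission order is not guaranteed).
    static Flux<Integer> squaresInParallel() {
        return Flux.range(1, 8)
            .parallel(4)
            .runOn(Schedulers.parallel())
            .map(i -> i * i)
            .sequential();
    }

    // Testing: StepVerifier subscribes and throws AssertionError on a mismatch.
    static void verifyAll() {
        StepVerifier.create(withFallback())
            .expectNext("value-1", "value-2", "fallback")
            .verifyComplete();

        List<Integer> dropped = new ArrayList<>();
        StepVerifier.create(dropOverflow(dropped), 3) // request only 3 items
            .expectNext(1, 2, 3)
            .verifyComplete();
        if (!dropped.equals(List.of(4, 5, 6, 7, 8, 9, 10))) {
            throw new AssertionError("unexpected dropped items: " + dropped);
        }

        // Sort before comparing because the rails finish in no fixed order.
        StepVerifier.create(squaresInParallel().collectSortedList())
            .expectNext(List.of(1, 4, 9, 16, 25, 36, 49, 64))
            .verifyComplete();
    }

    public static void main(String[] args) {
        verifyAll();
        System.out.println("all checks passed");
    }
}
```

Passing an initial request of 3 to StepVerifier.create simulates a slow consumer, which is what lets the drop strategy be observed deterministically. In application code the demand usually comes from the downstream subscriber, such as the HTTP connection in WebFlux.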
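Summary
Reactive programming is an important technique in modern Java development. With Reactor and WebFlux you can build non-blocking applications that handle high concurrency.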