← 返回首页
🌊

Java响应式编程:Reactor与WebFlux

📂 java ⏱ 2 min 297 words

Java响应式编程:Reactor与WebFlux

概述

响应式编程是处理异步数据流的编程范式。本教程介绍Reactor和WebFlux的使用。

1. Reactor基础

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorExample {
    public static void main(String[] args) {
        // Mono:0或1个元素
        Mono<String> mono = Mono.just("Hello");
        mono.subscribe(System.out::println);
        
        // Flux:0到N个元素
        Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
        flux.subscribe(System.out::println);
        
        // 操作符
        Flux.range(1, 10)
            .filter(i -> i % 2 == 0)
            .map(i -> i * 2)
            .subscribe(System.out::println);
        
        // 错误处理
        Flux.error(new RuntimeException("错误"))
            .onErrorResume(e -> Flux.just("恢复"))
            .subscribe(System.out::println);
    }
}

2. WebFlux

import org.springframework.web.reactive.function.server.*;
import org.springframework.web.reactive.function.server.RouterFunctions;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class WebFluxConfig {
    @Bean
    public RouterFunction<ServerResponse> routes(UserHandler handler) {
        return RouterFunctions.route()
            .GET("/users", handler::getAll)
            .GET("/users/{id}", handler::getById)
            .POST("/users", handler::create)
            .build();
    }
}

/**
 * Handler functions for the user endpoints. Each method delegates to
 * {@link UserService} and wraps the result in a {@link ServerResponse}.
 */
@Component
public class UserHandler {
    private final UserService userService;

    /**
     * @param userService service used to load and store users
     */
    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    /**
     * Responds 200 with every user returned by the service.
     *
     * @param request incoming request (not inspected)
     * @return the response carrying the user stream as its body
     */
    public Mono<ServerResponse> getAll(ServerRequest request) {
        Flux<User> users = userService.findAll();
        return ServerResponse.ok().body(users, User.class);
    }

    /**
     * Looks up the user named by the {@code id} path variable.
     *
     * @param request incoming request; must carry an {@code id} path variable
     * @return 200 with the user when the service emits one, 404 when the
     *         service completes without emitting a value
     */
    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(found -> ServerResponse.ok().bodyValue(found))
            // An empty Mono means "no such user": answer 404 rather than
            // 200 with an empty body.
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Decodes a user from the request body, saves it through the service and
     * responds 200 with the saved user.
     *
     * @param request incoming request whose body is decoded as a {@code User}
     * @return the response carrying the saved user as its body
     */
    public Mono<ServerResponse> create(ServerRequest request) {
        Mono<User> user = request.bodyToMono(User.class);
        Mono<User> created = user.flatMap(userService::save);
        return ServerResponse.ok().body(created, User.class);
    }
}

3. 实际应用示例

响应式REST API

/**
 * Annotation-based reactive REST controller for users, mounted at
 * {@code /api/users}. The CRUD methods delegate directly to {@link UserService}.
 */
@RestController
@RequestMapping("/api/users")
public class UserReactiveController {
    private final UserService userService;

    /**
     * @param userService service used to load and store users
     */
    public UserReactiveController(UserService userService) {
        this.userService = userService;
    }

    /**
     * @return every user returned by the service
     */
    @GetMapping
    public Flux<User> getAll() {
        return userService.findAll();
    }

    /**
     * @param id value of the {@code id} path variable
     * @return the service's lookup result for {@code id}
     */
    @GetMapping("/{id}")
    public Mono<User> getById(@PathVariable String id) {
        return userService.findById(id);
    }

    /**
     * @param user user decoded from the request body
     * @return the user as returned by the service after saving
     */
    @PostMapping
    public Mono<User> create(@RequestBody User user) {
        return userService.save(user);
    }

    /**
     * Emits one generated user per second, without end.
     *
     * <p>The sequence never completes, so it is served as Server-Sent Events:
     * each element is written to the client as it is produced. Under the
     * default JSON media type the response would be one JSON array that is
     * never closed.
     *
     * @return an unbounded stream of users named {@code User-0}, {@code User-1}, ...
     */
    @GetMapping(value = "/stream", produces = "text/event-stream")
    public Flux<User> stream() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new User("User-" + i, i));
    }
}

响应式数据库访问

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

/**
 * Reactive MongoDB repository for {@code User} documents keyed by a
 * {@code String} id. The basic operations come from
 * {@code ReactiveMongoRepository}.
 */
public interface UserRepository extends ReactiveMongoRepository<User, String> {
    /**
     * Finds users by name.
     *
     * NOTE(review): no implementation is visible here; presumably Spring Data
     * derives the query from the method name and matches a {@code name}
     * property on {@code User}. Confirm against the entity.
     *
     * @param name name to search for
     * @return the matching users
     */
    Flux<User> findByName(String name);
}

/**
 * Service layer for users. Every operation is forwarded unchanged to
 * {@link UserRepository}.
 */
@Service
public class UserService {
    private final UserRepository userRepository;

    /**
     * @param userRepository repository that performs the actual data access
     */
    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * @return whatever the repository's {@code findAll()} returns
     */
    public Flux<User> findAll() {
        return this.userRepository.findAll();
    }

    /**
     * @param id identifier passed through to the repository
     * @return whatever the repository's {@code findById(id)} returns
     */
    public Mono<User> findById(String id) {
        return this.userRepository.findById(id);
    }

    /**
     * @param user user passed through to the repository
     * @return whatever the repository's {@code save(user)} returns
     */
    public Mono<User> save(User user) {
        return this.userRepository.save(user);
    }
}

4. 最佳实践

  1. 使用操作符:避免手动处理异步逻辑
  2. 错误处理:使用onErrorResume、onErrorReturn
  3. 背压处理:使用onBackpressureBuffer、onBackpressureDrop等操作符选择缓冲或丢弃策略
  4. 并发控制:使用parallel()配合runOn并行处理,或使用publishOn、subscribeOn切换执行线程
  5. 测试验证:使用StepVerifier测试响应式代码

总结

响应式编程是现代Java开发的重要技术。掌握Reactor和WebFlux的使用,可以构建高并发、非阻塞的应用系统。