← 返回首页

Java响应式编程入门

📂 java ⏱ 3 min 442 words

Java响应式编程入门

什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。核心思想是:当数据变化时,依赖这些数据的计算会自动更新。

核心概念

// 传统命令式编程
public List<String> traditional() {
    List<String> result = new ArrayList<>();
    for (User user : users) {
        if (user.isActive()) {
            result.add(user.getName().toUpperCase());
        }
    }
    return result;
}

// 响应式编程
public Flux<String> reactive() {
    return Flux.fromIterable(users)
        .filter(User::isActive)
        .map(user -> user.getName().toUpperCase());
}

Reactor框架

依赖配置

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>

Mono和Flux

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorDemo {
    public static void main(String[] args) {
        // Mono:0或1个元素
        Mono<String> mono = Mono.just("Hello");
        Mono<Void> empty = Mono.empty();
        Mono<String> error = Mono.error(new RuntimeException("错误"));

        // Flux:0或N个元素
        Flux<String> flux = Flux.just("A", "B", "C");
        Flux<Integer> range = Flux.range(1, 10);
        Flux<String> fromArray = Flux.fromArray(new String[]{"X", "Y", "Z"});
    }
}

操作符

public class ReactorOperators {
    public static void main(String[] args) {
        // map:转换
        Flux.range(1, 5)
            .map(i -> i * 2)
            .subscribe(System.out::println);
        // 2, 4, 6, 8, 10

        // filter:过滤
        Flux.range(1, 10)
            .filter(i -> i % 2 == 0)
            .subscribe(System.out::println);
        // 2, 4, 6, 8, 10

        // flatMap:扁平化
        Flux.just("hello", "world")
            .flatMap(s -> Flux.fromArray(s.split("")))
            .subscribe(System.out::println);

        // reduce:归约
        Mono<Integer> sum = Flux.range(1, 100)
            .reduce(0, Integer::sum);

        // collectList:收集为List
        Mono<List<Integer>> list = Flux.range(1, 5)
            .collectList();

        // distinct:去重
        Flux.just(1, 2, 2, 3, 3, 4)
            .distinct()
            .subscribe(System.out::println);

        // take:取前N个
        Flux.range(1, 100)
            .take(5)
            .subscribe(System.out::println);

        // skip:跳过前N个
        Flux.range(1, 10)
            .skip(5)
            .subscribe(System.out::println);
    }
}

错误处理

public class ReactorErrorHandling {
    public static void main(String[] args) {
        // onErrorReturn:异常时返回默认值
        Flux.error(new RuntimeException("错误"))
            .onErrorReturn("默认值")
            .subscribe(System.out::println);

        // onErrorResume:异常时切换到备用流
        Flux.error(new RuntimeException("错误"))
            .onResume(e -> Flux.just("恢复", "数据"))
            .subscribe(System.out::println);

        // retry:重试
        Flux.error(new RuntimeException("错误"))
            .retry(3)
            .subscribe(
                System.out::println,
                e -> System.out.println("最终失败: " + e.getMessage())
            );

        // doOnError:记录异常
        Flux.error(new RuntimeException("错误"))
            .doOnError(e -> System.err.println("记录异常: " + e.getMessage()))
            .subscribe();
    }
}

实际应用:WebFlux

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable int id) {
        return userService.findById(id);
    }

    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll();
    }

    @PostMapping
    public Mono<User> createUser(@RequestBody User user) {
        return userService.save(user);
    }

    @GetMapping("/search")
    public Flux<User> searchUsers(@RequestParam String name) {
        return userService.findByName(name);
    }
}

@Service
public class UserService {
    private final UserRepository repository;

    public UserService(UserRepository repository) {
        this.repository = repository;
    }

    public Mono<User> findById(int id) {
        return repository.findById(id);
    }

    public Flux<User> findAll() {
        return repository.findAll();
    }

    public Mono<User> save(User user) {
        return repository.save(user);
    }

    public Flux<User> findByName(String name) {
        return repository.findByNameContaining(name);
    }
}

背压(Backpressure)

public class BackpressureDemo {
    public static void main(String[] args) {
        // 背压策略
        Flux.range(1, 1000)
            .onBackpressureBuffer(100)      // 缓冲100个
            .onBackpressureDrop(i -> log.info("丢弃: {}", i))
            .onBackpressureLatest()         // 只保留最新
            .subscribe();

        // 使用subscriber控制节奏
        Flux.range(1, 100)
            .subscribe(new Subscriber<>() {
                private Subscription subscription;
                private int count = 0;

                @Override
                public void onSubscribe(Subscription s) {
                    this.subscription = s;
                    s.request(10); // 每次请求10个
                }

                @Override
                public void onNext(Integer i) {
                    count++;
                    if (count % 10 == 0) {
                        subscription.request(10);
                    }
                }

                @Override
                public void onError(Throwable t) {}
                @Override
                public void onComplete() {}
            });
    }
}

总结

响应式编程适合处理高并发、异步数据流场景。Reactor的Mono和Flux提供了丰富的操作符,结合WebFlux可以构建高性能的响应式应用。