← 返回首页
🌐

流式架构Flink/Kafka Streams

📂 architecture ⏱ 7 min 1299 words

流式架构Flink/Kafka Streams

流式处理概述

流式处理是一种数据处理范式,它持续地处理到达的数据流,而不是批量处理。流式处理适用于实时分析、实时监控、实时推荐等场景。

核心概念

数据流(Data Stream)

数据流是无界的数据序列,数据不断到达并被处理。

// Flink数据流示例
public class StreamProcessingExample {
    
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 从Kafka读取数据流
        DataStream<String> inputStream = env.addSource(
            new FlinkKafkaConsumer<>(
                "input-topic",
                new SimpleStringSchema(),
                kafkaProperties()
            )
        );
        
        // 处理数据流
        DataStream<ProcessedEvent> processedStream = inputStream
            .map(json -> objectMapper.readValue(json, RawEvent.class))
            .filter(event -> event.isValid())
            .flatMap(new EventEnricher())
            .keyBy(ProcessedEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventAggregator());
        
        // 输出到Kafka
        processedStream.addSink(
            new FlinkKafkaProducer<>(
                "output-topic",
                new ProcessedEventSchema(),
                kafkaProperties()
            )
        );
        
        env.execute("Stream Processing Job");
    }
}

窗口(Window)

窗口是将无限流分割成有限块的机制,用于对数据进行分组聚合。

// Flink窗口示例
public class WindowExample {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<Event> events = env.addSource(new EventSource());
        
        // 滚动窗口(Tumbling Window)
        DataStream<WindowAggregation> tumblingResult = events
            .keyBy(Event::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventCountAggregator());
        
        // 滑动窗口(Sliding Window)
        DataStream<WindowAggregation> slidingResult = events
            .keyBy(Event::getUserId)
            .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
            .aggregate(new EventCountAggregator());
        
        // 会话窗口(Session Window)
        DataStream<WindowAggregation> sessionResult = events
            .keyBy(Event::getUserId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .aggregate(new EventCountAggregator());
        
        // 全局窗口(Global Window)
        DataStream<WindowAggregation> globalResult = events
            .window(GlobalWindows.create())
            .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))
            .aggregate(new EventCountAggregator());
    }
}

// 窗口聚合函数
public class EventCountAggregator implements AggregateFunction<Event, WindowAccumulator, WindowAggregation> {
    
    @Override
    public WindowAccumulator createAccumulator() {
        return new WindowAccumulator();
    }
    
    @Override
    public WindowAccumulator add(Event value, WindowAccumulator accumulator) {
        accumulator.count++;
        accumulator.userId = value.getUserId();
        accumulator.windowStart = value.getTimestamp();
        return accumulator;
    }
    
    @Override
    public WindowAggregation getResult(WindowAccumulator accumulator) {
        return new WindowAggregation(
            accumulator.userId,
            accumulator.count,
            accumulator.windowStart
        );
    }
    
    @Override
    public WindowAccumulator merge(WindowAccumulator a, WindowAccumulator b) {
        a.count += b.count;
        return a;
    }
}

class WindowAccumulator {
    long count = 0;
    String userId;
    long windowStart;
}

时间语义

// Flink时间语义示例
public class TimeSemanticsExample {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 设置事件时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<Event> events = env.addSource(new EventSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            );
        
        // 处理事件时间数据
        events
            .keyBy(Event::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<Event> elements, Collector<Result> out) {
                    long windowStart = context.window().getStart();
                    long windowEnd = context.window().getEnd();
                    long count = StreamSupport.stream(elements.spliterator(), false).count();
                    
                    out.collect(new Result(key, windowStart, windowEnd, count));
                }
            });
    }
}

Apache Flink

Flink架构

// Flink作业示例
public class FlinkJob {
    
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 设置并行度
        env.setParallelism(4);
        
        // 检查点配置
        env.enableCheckpointing(60000); // 60秒
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        env.getCheckpointConfig().setCheckpointTimeout(120000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        
        // 状态后端配置
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        
        // 从Kafka读取数据
        DataStream<String> inputStream = env.addSource(
            new FlinkKafkaConsumer<>(
                "input-topic",
                new SimpleStringSchema(),
                getKafkaProperties()
            )
        ).setStartFromLatest();
        
        // 处理数据
        DataStream<Result> result = inputStream
            .map(json -> objectMapper.readValue(json, Event.class))
            .keyBy(Event::getUserId)
            .process(new EventProcessor())
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new ResultAggregator());
        
        // 输出结果
        result.addSink(new FlinkKafkaProducer<>(
            "output-topic",
            new ResultSchema(),
            getKafkaProperties()
        ));
        
        // 输出到控制台
        result.print();
        
        // 执行作业
        env.execute("Flink Event Processing Job");
    }
}

// Flink状态处理
public class EventProcessor extends KeyedProcessFunction<String, Event, ProcessedEvent> {
    
    // 值状态
    private ValueState<Long> countState;
    private ValueState<BigDecimal> amountState;
    
    // 列表状态
    private ListState<Event> eventListState;
    
    // 映射状态
    private MapState<String, Long> eventCountState;
    
    // 定时器状态
    private ValueState<Long> timerState;
    
    @Override
    public void open(Configuration parameters) {
        // 初始化状态
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class)
        );
        
        amountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("amount", BigDecimal.class)
        );
        
        eventListState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("events", Event.class)
        );
        
        eventCountState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("eventCounts", String.class, Long.class)
        );
        
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class)
        );
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out) throws Exception {
        // 更新状态
        Long count = countState.value();
        if (count == null) {
            count = 0L;
        }
        countState.update(count + 1);
        
        BigDecimal amount = amountState.value();
        if (amount == null) {
            amount = BigDecimal.ZERO;
        }
        amountState.update(amount.add(event.getAmount()));
        
        // 更新事件列表
        eventListState.add(event);
        
        // 更新事件计数
        String eventType = event.getType();
        Long eventCount = eventCountState.get(eventType);
        if (eventCount == null) {
            eventCount = 0L;
        }
        eventCountState.put(eventType, eventCount + 1);
        
        // 注册定时器
        long timer = event.getTimestamp() + 5 * 60 * 1000; // 5分钟后
        if (timerState.value() == null) {
            ctx.timerService().registerEventTimeTimer(timer);
            timerState.update(timer);
        }
        
        // 输出处理后的事件
        out.collect(new ProcessedEvent(
            event.getId(),
            event.getUserId(),
            event.getType(),
            count + 1,
            amount.add(event.getAmount())
        ));
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ProcessedEvent> out) throws Exception {
        // 定时器触发时执行清理
        Long count = countState.value();
        BigDecimal amount = amountState.value();
        
        if (count != null && amount != null) {
            out.collect(new ProcessedEvent(
                "timer-" + timestamp,
                ctx.getCurrentKey(),
                "TIMER",
                count,
                amount
            ));
        }
        
        // 清理状态
        countState.clear();
        amountState.clear();
        eventListState.clear();
        eventCountState.clear();
        timerState.clear();
    }
}

Flink SQL

// Flink SQL示例
public class FlinkSqlExample {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // 创建Kafka源表
        tableEnv.executeSql(
            "CREATE TABLE orders (" +
            "  order_id STRING," +
            "  user_id STRING," +
            "  product_id STRING," +
            "  amount DECIMAL(10,2)," +
            "  order_time TIMESTAMP(3)," +
            "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'orders'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );
        
        // 创建输出表
        tableEnv.executeSql(
            "CREATE TABLE order_aggregations (" +
            "  user_id STRING," +
            "  window_start TIMESTAMP(3)," +
            "  window_end TIMESTAMP(3)," +
            "  order_count BIGINT," +
            "  total_amount DECIMAL(10,2)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-aggregations'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );
        
        // 执行聚合查询
        tableEnv.executeSql(
            "INSERT INTO order_aggregations " +
            "SELECT " +
            "  user_id," +
            "  window_start," +
            "  window_end," +
            "  COUNT(*) AS order_count," +
            "  SUM(amount) AS total_amount " +
            "FROM TABLE(" +
            "  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)" +
            ") " +
            "GROUP BY user_id, window_start, window_end"
        );
    }
}

Kafka Streams

核心概念

// Kafka Streams配置
public class KafkaStreamsConfig {
    
    public StreamsConfig getConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        
        // 高可用配置
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        
        return new StreamsConfig(props);
    }
}

// Kafka Streams拓扑
@Component
public class KafkaStreamsTopology {
    
    public Topology buildTopology() {
        Topology topology = new Topology();
        
        // 添加源
        topology.addSource("source", "input-topic")
            // 添加处理器
            .addProcessor("processor", 
                () -> new EventProcessor(), 
                "source")
            // 添加状态存储
            .addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("event-store"),
                    Serdes.String(),
                    Serdes.String()
                ),
                "processor"
            )
            // 添加汇
            .addSink("sink", "output-topic", "processor");
        
        return topology;
    }
}

// Kafka Streams处理器
public class EventProcessor implements Processor<String, String, String, String> {
    
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("event-store");
    }
    
    @Override
    public void process(Record<String, String> record) {
        String key = record.key();
        String value = record.value();
        
        // 处理事件
        String processedValue = processEvent(value);
        
        // 更新状态存储
        store.put(key, processedValue);
        
        // 发送结果
        context.forward(new Record<>(key, processedValue, record.timestamp()));
    }
    
    private String processEvent(String event) {
        // 事件处理逻辑
        return "processed-" + event;
    }
    
    @Override
    public void close() {
        // 清理资源
    }
}

Kafka Streams DSL

// Kafka Streams DSL示例
@Component
public class KafkaStreamsDslExample {
    
    public void process() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 读取输入流
        KStream<String, String> inputStream = builder.stream("input-topic");
        
        // 处理流
        KStream<String, ProcessedEvent> processedStream = inputStream
            .mapValues(value -> objectMapper.readValue(value, RawEvent.class))
            .filter((key, event) -> event.isValid())
            .mapValues(event -> new ProcessedEvent(event.getId(), event.getData()));
        
        // 分组和聚合
        KTable<Windowed<String>, AggregatedResult> aggregatedTable = processedStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                AggregatedResult::new,
                (key, value, aggregate) -> aggregate.add(value),
                Materialized.as("aggregated-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new AggregatedResultSerde())
            );
        
        // 聚合结果流
        KStream<String, AggregatedResult> aggregatedStream = aggregatedTable
            .toStream()
            .map((windowedKey, value) -> 
                KeyValue.pair(windowedKey.key(), value)
            );
        
        // 输出到Kafka
        aggregatedStream.to("output-topic");
        
        // 创建并启动拓扑
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

方案对比

特性 Flink Kafka Streams
部署模式 独立集群 嵌入应用
状态管理 内置 基于Kafka
容错机制 检查点 Kafka复制
延迟 毫秒级 毫秒级
吞吐量 中等
复杂事件处理 支持 不支持
SQL支持 支持 不支持
适用场景 复杂流处理 简单流处理

使用场景

实时监控

// 实时监控告警
@Component
public class RealTimeMonitoring {
    
    public void monitor() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "monitoring-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 读取指标流
        KStream<String, Metric> metricStream = builder.stream("metrics-topic");
        
        // 检测异常
        KStream<String, Alert> alertStream = metricStream
            .filter((key, metric) -> metric.getValue() > metric.getThreshold())
            .mapValues(metric -> new Alert(
                metric.getName(),
                metric.getValue(),
                metric.getThreshold(),
                LocalDateTime.now()
            ));
        
        // 发送告警
        alertStream.to("alerts-topic");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

实时推荐

// 实时推荐系统
@Component
public class RealTimeRecommendation {
    
    public void recommend() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "recommendation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 读取用户行为流
        KStream<String, UserBehavior> behaviorStream = builder.stream("user-behavior-topic");
        
        // 分析用户兴趣
        KTable<Windowed<String>, UserInterest> userInterestTable = behaviorStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30)))
            .aggregate(
                UserInterest::new,
                (key, behavior, interest) -> interest.addBehavior(behavior),
                Materialized.as("user-interest-store")
            );
        
        // 生成推荐
        KStream<String, Recommendation> recommendationStream = userInterestTable
            .toStream()
            .map((windowedKey, interest) -> 
                KeyValue.pair(windowedKey.key(), generateRecommendation(interest))
            );
        
        // 输出推荐结果
        recommendationStream.to("recommendations-topic");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
    
    private Recommendation generateRecommendation(UserInterest interest) {
        // 基于用户兴趣生成推荐
        List<String> recommendedItems = interest.getTopInterests(10);
        return new Recommendation(interest.getUserId(), recommendedItems);
    }
}

实践建议

  1. 状态管理:合理设计状态存储,避免状态过大
  2. 容错机制:启用检查点和故障恢复机制
  3. 性能优化:调整并行度、缓冲区大小等参数
  4. 监控告警:监控流处理作业的延迟、吞吐量和状态大小
  5. 数据一致性:确保端到端的exactly-once语义