Streaming Architecture: Flink / Kafka Streams
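Stream Processing Overview
Stream processing is a data processing paradigm that handles data continuously as it arrives, instead of first collecting it into batches. It suits scenarios such as real-time analytics, real-time monitoring, and real-time recommendation.
Core Concepts
Data Stream
A data stream is an unbounded sequence of records: new data keeps arriving, and each record is processed as it comes in.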
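// Flink data stream example
public class StreamProcessingExample {
    // Static, so the serializable lambdas below do not need to capture it
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the data stream from Kafka
        // (FlinkKafkaConsumer/FlinkKafkaProducer are deprecated in recent Flink releases in favor
        // of KafkaSource/KafkaSink; kafkaProperties() is an application-defined helper)
        DataStream<String> inputStream = env.addSource(
            new FlinkKafkaConsumer<>(
                "input-topic",
                new SimpleStringSchema(),
                kafkaProperties()
            )
        );

        // Process the data stream
        DataStream<ProcessedEvent> processedStream = inputStream
            .map(json -> objectMapper.readValue(json, RawEvent.class))
            // Event-time windows only fire when watermarks advance, so assign them here
            // (assumes RawEvent.getTimestamp() returns epoch milliseconds)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<RawEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getTimestamp())
            )
            .filter(RawEvent::isValid)
            .flatMap(new EventEnricher())
            .keyBy(ProcessedEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventAggregator());

        // Write to Kafka
        processedStream.addSink(
            new FlinkKafkaProducer<>(
                "output-topic",
                new ProcessedEventSchema(),
                kafkaProperties()
            )
        );

        env.execute("Stream Processing Job");
    }
}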
Windows
A window splits an unbounded stream into finite chunks so that records can be grouped and aggregated.
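// Flink window examples
public class WindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event-time windows need timestamps and watermarks on the stream
        DataStream<Event> events = env.addSource(new EventSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getTimestamp())
            );

        // Tumbling window: fixed-size, non-overlapping 5-minute windows
        DataStream<WindowAggregation> tumblingResult = events
            .keyBy(Event::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventCountAggregator());

        // Sliding window: 10-minute windows starting every minute (each event lands in 10 windows)
        DataStream<WindowAggregation> slidingResult = events
            .keyBy(Event::getUserId)
            .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
            .aggregate(new EventCountAggregator());

        // Session window: closes after 30 minutes without events for the key
        DataStream<WindowAggregation> sessionResult = events
            .keyBy(Event::getUserId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .aggregate(new EventCountAggregator());

        // Global window: one window over the whole non-keyed stream, so windowAll() is
        // required (it runs with parallelism 1); the trigger re-emits the running
        // aggregate every 5 minutes of event time
        DataStream<WindowAggregation> globalResult = events
            .windowAll(GlobalWindows.create())
            .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))
            .aggregate(new EventCountAggregator());

        env.execute("Window Example");
    }
}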
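Window assignment for the time-based windows above comes down to simple arithmetic. The plain-Java sketch below follows the logic of Flink's `TimeWindow.getWindowStartWithOffset` and of `SlidingEventTimeWindows` with the offset fixed at 0; it does not use the Flink API, and the class and method names are hypothetical. Windows are half-open intervals `[start, end)` over epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of event-time window assignment (offset fixed at 0).
final class WindowAssigner {

    // Half-open interval [start, end)
    record Window(long start, long end) {}

    // Start of the window of length `size` that contains `timestamp`
    static long windowStart(long timestamp, long size) {
        long remainder = timestamp % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    // Tumbling: every timestamp belongs to exactly one window
    static Window tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, size);
        return new Window(start, start + size);
    }

    // Sliding: walk back from the latest window start in steps of `slide`
    // until the window would end before the timestamp
    static List<Window> sliding(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute + 30_000L; // 00:07:30 after the epoch
        System.out.println(tumbling(ts, 5 * minute));                 // prints Window[start=300000, end=600000]
        System.out.println(sliding(ts, 10 * minute, minute).size());  // prints 10
    }
}
```

With a 10-minute size and a 1-minute slide every event lands in 10 windows, each holding its own accumulator, so sliding windows keep more state than tumbling ones. Session windows work differently: Flink first gives each event its own window `[timestamp, timestamp + gap)` and then merges overlapping windows, which is why the aggregate function must implement `merge`.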
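// Window aggregate function: counts events incrementally per key and window
public class EventCountAggregator implements AggregateFunction<Event, WindowAccumulator, WindowAggregation> {
    @Override
    public WindowAccumulator createAccumulator() {
        return new WindowAccumulator();
    }

    @Override
    public WindowAccumulator add(Event value, WindowAccumulator accumulator) {
        accumulator.count++;
        accumulator.userId = value.getUserId();
        // An AggregateFunction never sees the window itself, so track the earliest event time;
        // combine it with a ProcessWindowFunction if the exact window bounds are needed
        accumulator.earliestTimestamp = Math.min(accumulator.earliestTimestamp, value.getTimestamp());
        return accumulator;
    }

    @Override
    public WindowAggregation getResult(WindowAccumulator accumulator) {
        return new WindowAggregation(
            accumulator.userId,
            accumulator.count,
            accumulator.earliestTimestamp
        );
    }

    // Called when windows merge (e.g. session windows), so every field must be combined
    @Override
    public WindowAccumulator merge(WindowAccumulator a, WindowAccumulator b) {
        a.count += b.count;
        if (a.userId == null) {
            a.userId = b.userId;
        }
        a.earliestTimestamp = Math.min(a.earliestTimestamp, b.earliestTimestamp);
        return a;
    }
}

// A public class with public fields and a public no-arg constructor is treated by Flink
// as a POJO; otherwise Flink falls back to generic Kryo serialization
public class WindowAccumulator {
    public long count = 0;
    public String userId;
    public long earliestTimestamp = Long.MAX_VALUE;
}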
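The four methods of an `AggregateFunction` form a small contract: accumulators built from separate parts of the input must `merge` into the same result as a single pass over all of it. The sketch below checks that property in plain Java. The `Aggregate` interface is a hypothetical stand-in for Flink's `AggregateFunction<IN, ACC, OUT>` so the code runs without Flink, and its fields mirror the corrected `WindowAccumulator`.

```java
import java.util.List;

// Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT> contract
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

final class CountAggregateDemo {
    record Event(String userId, long timestamp) {}
    record Result(String userId, long count, long earliestTimestamp) {}

    static final class Acc {
        long count;
        String userId;
        long earliest = Long.MAX_VALUE;
    }

    static final Aggregate<Event, Acc, Result> COUNT = new Aggregate<>() {
        public Acc createAccumulator() { return new Acc(); }

        public Acc add(Event e, Acc acc) {
            acc.count++;
            acc.userId = e.userId();
            acc.earliest = Math.min(acc.earliest, e.timestamp());
            return acc;
        }

        public Result getResult(Acc acc) {
            return new Result(acc.userId, acc.count, acc.earliest);
        }

        // Every field is combined, not just the count
        public Acc merge(Acc a, Acc b) {
            a.count += b.count;
            if (a.userId == null) a.userId = b.userId;
            a.earliest = Math.min(a.earliest, b.earliest);
            return a;
        }
    };

    // Fold a batch of events into a fresh accumulator, as a window would
    static Acc fold(List<Event> events) {
        Acc acc = COUNT.createAccumulator();
        for (Event e : events) acc = COUNT.add(e, acc);
        return acc;
    }
}
```

Folding the events `(u1, 300)` and `(u1, 100)` into one accumulator and `(u1, 50)` into another, then merging, yields `Result[userId=u1, count=3, earliestTimestamp=50]`, exactly what one pass over all three events produces.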
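Time Semantics
Flink distinguishes event time (when an event actually happened, carried in the record) from processing time (the wall-clock time of the machine processing it). Event-time results depend on watermarks, which tell operators how far event time has progressed even when records arrive out of order.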
// Flink time semantics example
public class TimeSemanticsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Since Flink 1.12 event time is the default and setStreamTimeCharacteristic(...)
        // is deprecated; assigning a WatermarkStrategy is all that is needed
        DataStream<Event> events = env.addSource(new EventSource())
            .assignTimestampsAndWatermarks(
                // Tolerate events arriving up to 5 seconds out of order
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            );

        // Process event-time data: emit one count per key and window
        events
            .keyBy(Event::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<Event> elements, Collector<Result> out) {
                    // Unlike an AggregateFunction, a ProcessWindowFunction can read the window bounds
                    long windowStart = context.window().getStart();
                    long windowEnd = context.window().getEnd();
                    long count = StreamSupport.stream(elements.spliterator(), false).count();
                    out.collect(new Result(key, windowStart, windowEnd, count));
                }
            })
            .print();

        env.execute("Time Semantics Example");
    }
}
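How `forBoundedOutOfOrderness(Duration.ofSeconds(5))` decides when a window is complete can be simulated in a few lines. The sketch assumes the behavior of Flink's built-in bounded-out-of-orderness generator (watermark = highest timestamp seen - out-of-orderness - 1 ms) and that an event-time window `[start, end)` fires once the watermark reaches `end - 1`. `WatermarkTracker` is a hypothetical name, not a Flink class.

```java
// Hypothetical simulation of bounded-out-of-orderness watermarks (times in epoch ms)
final class WatermarkTracker {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    WatermarkTracker(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark is Long.MIN_VALUE without overflowing
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // Out-of-order events never move the watermark backwards
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    // A window [start, end) is complete once the watermark reaches its last millisecond
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    // An event is late if the tumbling window it belongs to has already fired
    boolean isLate(long eventTimestamp, long windowSize) {
        long windowEnd = eventTimestamp - Math.floorMod(eventTimestamp, windowSize) + windowSize;
        return windowFires(windowEnd);
    }
}
```

With a 5-second bound, the first 5-minute window `[0, 300000)` fires only after an event with timestamp 305000 or later has been seen. An event at 298000 arriving afterwards is late; Flink's window operators drop such events by default unless `allowedLateness(...)` or `sideOutputLateData(...)` is configured.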
Apache Flink
Flink Architecture
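// Flink job example
public class FlinkJob {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the default parallelism
        env.setParallelism(4);

        // Checkpoint configuration
        env.enableCheckpointing(60000); // every 60 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30 s between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(120000);         // abort a checkpoint after 2 minutes
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // State backend configuration (RocksDB keeps large state on local disk)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Read from Kafka; the start position is set on the consumer, not on the DataStream
        // (getKafkaProperties() is an application-defined helper)
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "input-topic",
            new SimpleStringSchema(),
            getKafkaProperties()
        );
        consumer.setStartFromLatest();
        DataStream<String> inputStream = env.addSource(consumer);

        // Process the data
        DataStream<Result> result = inputStream
            .map(json -> objectMapper.readValue(json, Event.class))
            // The event-time timers and windows below require watermarks
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getTimestamp())
            )
            .keyBy(Event::getUserId)
            .process(new EventProcessor())
            // process() returns a non-keyed stream, so key it again before windowing
            .keyBy(ProcessedEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new ResultAggregator());

        // Write results to Kafka. This constructor delivers at-least-once; end-to-end
        // exactly-once also needs a transactional sink (FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
        result.addSink(new FlinkKafkaProducer<>(
            "output-topic",
            new ResultSchema(),
            getKafkaProperties()
        ));

        // Also print to stdout
        result.print();

        // Run the job
        env.execute("Flink Event Processing Job");
    }
}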
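`env.setParallelism(4)` determines how many parallel subtasks run each keyed operator, and `keyBy` decides which subtask owns which key. The sketch below shows the two-step routing used by Flink's `KeyGroupRangeAssignment`: key to key group (fixed by the maximum parallelism, which defaults to 128 for small jobs like this one), then key group to subtask as a contiguous range. Assumption: `stableHash` is a stand-in for Flink's internal murmur hash, so concrete subtask numbers differ from a real job; only the scheme is illustrated, and the class name is hypothetical.

```java
// Hypothetical sketch of Flink-style key routing via key groups
final class KeyRouting {
    // Stand-in for Flink's murmur hash of key.hashCode(); NOT the real function
    static int stableHash(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return h & 0x7fffffff; // force a non-negative value
    }

    // Step 1: key -> key group. Keyed state is stored per key group,
    // so this mapping never changes while maxParallelism stays fixed
    static int keyGroup(Object key, int maxParallelism) {
        return stableHash(key) % maxParallelism;
    }

    // Step 2: key group -> subtask. Each subtask owns a contiguous range of key groups
    static int subtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With a maximum parallelism of 128 and parallelism 4, subtask 0 owns key groups 0 to 31, subtask 1 owns 32 to 63, and so on. Rescaling from a savepoint therefore moves whole key groups between subtasks instead of rehashing individual keys.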
// Flink keyed state handling
public class EventProcessor extends KeyedProcessFunction<String, Event, ProcessedEvent> {
    // Value state: running count and total amount for the current key
    private ValueState<Long> countState;
    private ValueState<BigDecimal> amountState;
    // List state: events received for the key (kept until the timer clears it)
    private ListState<Event> eventListState;
    // Map state: number of events per event type
    private MapState<String, Long> eventCountState;
    // Timestamp of the registered timer, or null if none is pending
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        // Register state descriptors; values are automatically scoped to the current key
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class)
        );
        amountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("amount", BigDecimal.class)
        );
        eventListState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("events", Event.class)
        );
        eventCountState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("eventCounts", String.class, Long.class)
        );
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class)
        );
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out) throws Exception {
        // Update the running count
        Long count = countState.value();
        if (count == null) {
            count = 0L;
        }
        countState.update(count + 1);

        // Update the running amount
        BigDecimal amount = amountState.value();
        if (amount == null) {
            amount = BigDecimal.ZERO;
        }
        amountState.update(amount.add(event.getAmount()));

        // Append to the event list
        eventListState.add(event);

        // Update the per-type event count
        String eventType = event.getType();
        Long eventCount = eventCountState.get(eventType);
        if (eventCount == null) {
            eventCount = 0L;
        }
        eventCountState.put(eventType, eventCount + 1);

        // Register a single event-time timer, 5 minutes after the key's first event
        long timer = event.getTimestamp() + 5 * 60 * 1000;
        if (timerState.value() == null) {
            ctx.timerService().registerEventTimeTimer(timer);
            timerState.update(timer);
        }

        // Emit the event together with the updated running totals
        out.collect(new ProcessedEvent(
            event.getId(),
            event.getUserId(),
            event.getType(),
            count + 1,
            amount.add(event.getAmount())
        ));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ProcessedEvent> out) throws Exception {
        // The watermark has passed the timer: emit a summary for the key
        Long count = countState.value();
        BigDecimal amount = amountState.value();
        if (count != null && amount != null) {
            out.collect(new ProcessedEvent(
                "timer-" + timestamp,
                ctx.getCurrentKey(),
                "TIMER",
                count,
                amount
            ));
        }
        // Clear all state for this key so it does not grow without bound
        countState.clear();
        amountState.clear();
        eventListState.clear();
        eventCountState.clear();
        timerState.clear();
    }
}
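The state-and-timer lifecycle of `EventProcessor` can be traced without a cluster. This plain-Java simulation assumes one timer per key, registered 5 minutes after the key's first event, timers firing once the watermark reaches their timestamp, and all of the key's state cleared on firing. `HashMap` and `TreeMap` stand in for Flink's keyed state and timer service; all names are hypothetical.

```java
import java.math.BigDecimal;
import java.util.*;

// Hypothetical simulation of keyed state plus event-time timers (not Flink code)
final class KeyedTimerSimulation {
    static final long TIMER_DELAY_MS = 5 * 60 * 1000;

    record Summary(String key, long count, BigDecimal amount, long firedAt) {}

    private static final class KeyState {
        long count;
        BigDecimal amount = BigDecimal.ZERO;
    }

    private final Map<String, KeyState> state = new HashMap<>();
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // timestamp -> keys

    void processElement(String key, BigDecimal amount, long timestamp) {
        KeyState s = state.get(key);
        if (s == null) {
            // First event for the key: create state and register its single timer
            s = new KeyState();
            state.put(key, s);
            timers.computeIfAbsent(timestamp + TIMER_DELAY_MS, t -> new ArrayList<>()).add(key);
        }
        s.count++;
        s.amount = s.amount.add(amount);
    }

    // Fire every timer with timestamp <= watermark, emit a summary, and clear that key's state
    List<Summary> advanceWatermark(long watermark) {
        List<Summary> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                KeyState s = state.remove(key);
                out.add(new Summary(key, s.count, s.amount, due.getKey()));
            }
        }
        return out;
    }

    int keysWithState() {
        return state.size();
    }
}
```

Feeding `u1` events at 0 ms and 60000 ms and a `u2` event at 120000 ms, nothing fires at watermark 299999; at 300000 only `u1` fires with a count of 2, while `u2` keeps its state until its own timer at 420000. A key that receives events again after its timer fired simply starts a fresh cycle, as in the Flink version.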
Flink SQL
// Flink SQL example
public class FlinkSqlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create the Kafka source table; the WATERMARK clause tolerates 5 seconds of out-of-order events
        tableEnv.executeSql(
            "CREATE TABLE orders (" +
            "  order_id STRING," +
            "  user_id STRING," +
            "  product_id STRING," +
            "  amount DECIMAL(10,2)," +
            "  order_time TIMESTAMP(3)," +
            "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'orders'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );

        // Create the output table
        tableEnv.executeSql(
            "CREATE TABLE order_aggregations (" +
            "  user_id STRING," +
            "  window_start TIMESTAMP(3)," +
            "  window_end TIMESTAMP(3)," +
            "  order_count BIGINT," +
            "  total_amount DECIMAL(10,2)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-aggregations'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );

        // Run the aggregation with the TUMBLE window table-valued function.
        // executeSql() submits the INSERT job asynchronously; await() blocks until it ends.
        tableEnv.executeSql(
            "INSERT INTO order_aggregations " +
            "SELECT " +
            "  user_id," +
            "  window_start," +
            "  window_end," +
            "  COUNT(*) AS order_count," +
            "  SUM(amount) AS total_amount " +
            "FROM TABLE(" +
            "  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)" +
            ") " +
            "GROUP BY user_id, window_start, window_end"
        ).await();
    }
}
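On a bounded sample, the TUMBLE query produces the same rows as the grouping sketched below: `TUMBLE` adds `window_start` and `window_end` columns to every order, and `GROUP BY user_id, window_start, window_end` then computes `COUNT(*)` and `SUM(amount)`. This is plain Java, not Flink's planner; it assumes the window length divides an hour evenly and ignores watermarks, which in the real job decide when each row is emitted (roughly 5 seconds of event time after `window_end`, given the table's WATERMARK clause). All names are hypothetical.

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;

// Hypothetical plain-Java equivalent of the TUMBLE + GROUP BY query on a bounded list
final class TumbleQuerySketch {
    record Order(String userId, BigDecimal amount, LocalDateTime orderTime) {}
    record Row(String userId, LocalDateTime windowStart, LocalDateTime windowEnd,
               long orderCount, BigDecimal totalAmount) {}
    private record GroupKey(String userId, LocalDateTime windowStart) {}

    // Floor a timestamp to its window start (assumes windowMinutes divides 60)
    static LocalDateTime windowStart(LocalDateTime t, int windowMinutes) {
        LocalDateTime minute = t.truncatedTo(ChronoUnit.MINUTES);
        return minute.withMinute(minute.getMinute() - minute.getMinute() % windowMinutes);
    }

    static List<Row> aggregate(List<Order> orders, int windowMinutes) {
        // LinkedHashMap keeps groups in first-seen order
        Map<GroupKey, Row> groups = new LinkedHashMap<>();
        for (Order o : orders) {
            LocalDateTime start = windowStart(o.orderTime(), windowMinutes);
            Row single = new Row(o.userId(), start, start.plusMinutes(windowMinutes), 1, o.amount());
            // COUNT(*) adds one per order; SUM(amount) adds the amounts
            groups.merge(new GroupKey(o.userId(), start), single,
                (a, b) -> new Row(a.userId(), a.windowStart(), a.windowEnd(),
                                  a.orderCount() + b.orderCount(),
                                  a.totalAmount().add(b.totalAmount())));
        }
        return new ArrayList<>(groups.values());
    }
}
```

Orders by `u1` at 10:02, 10:04, and 10:06 yield two rows: `[10:00, 10:05)` with two orders and `[10:05, 10:10)` with one.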
Kafka Streams
Core Concepts
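// Kafka Streams configuration
public class KafkaStreamsConfig {
    public StreamsConfig getConfig() {
        Properties props = new Properties();
        // The application ID is also the consumer group ID and the prefix of internal topics
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Commit processing progress every second
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Record cache for state stores (newer Kafka versions replace this key with
        // STATESTORE_CACHE_MAX_BYTES_CONFIG)
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // High availability: replicate internal (changelog/repartition) topics and
        // have the producer wait for all in-sync replicas
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        return new StreamsConfig(props);
    }
}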
// Kafka Streams topology (Processor API)
@Component
public class KafkaStreamsTopology {
    public Topology buildTopology() {
        Topology topology = new Topology();
        // Add the source
        topology.addSource("source", "input-topic")
            // Add the processor (the supplier must return a new instance per call)
            .addProcessor("processor",
                () -> new EventProcessor(),
                "source")
            // Add a state store and connect it to the processor
            .addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("event-store"),
                    Serdes.String(),
                    Serdes.String()
                ),
                "processor"
            )
            // Add the sink
            .addSink("sink", "output-topic", "processor");
        return topology;
    }
}
// Kafka Streams processor
public class EventProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // The store name must match the one registered in the topology
        this.store = context.getStateStore("event-store");
    }

    @Override
    public void process(Record<String, String> record) {
        String key = record.key();
        String value = record.value();
        // Process the event
        String processedValue = processEvent(value);
        // Update the state store
        store.put(key, processedValue);
        // Forward the result; withValue() keeps the key, timestamp and headers
        context.forward(record.withValue(processedValue));
    }

    private String processEvent(String event) {
        // Event processing logic
        return "processed-" + event;
    }

    @Override
    public void close() {
        // Release resources (state stores are closed by Kafka Streams itself)
    }
}
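The behavior of `process()` is easy to check in isolation. This plain-Java mirror replaces the `event-store` KeyValueStore with a `HashMap` and collects forwarded records in a list. It is a hypothetical test double, not the Kafka Streams API; real topologies can be tested with Kafka's own `TopologyTestDriver`.

```java
import java.util.*;

// Hypothetical in-memory mirror of the EventProcessor above (not Kafka Streams code)
final class InMemoryEventProcessor {
    record Rec(String key, String value, long timestamp) {}

    private final Map<String, String> store = new HashMap<>();  // stands in for "event-store"
    private final List<Rec> forwarded = new ArrayList<>();      // what context.forward(...) would emit

    void process(Rec record) {
        String processed = "processed-" + record.value();
        // put() overwrites, so the store holds only the latest value per key
        store.put(record.key(), processed);
        // Key and timestamp are preserved, as with record.withValue(...)
        forwarded.add(new Rec(record.key(), processed, record.timestamp()));
    }

    String get(String key) {
        return store.get(key);
    }

    List<Rec> forwarded() {
        return forwarded;
    }
}
```

Two records for the same key leave one entry in the store but two forwarded records downstream. In the real topology the persistent store is also written to a changelog topic by default, so its contents survive a restart or a move to another instance.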
Kafka Streams DSL
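// Kafka Streams DSL example
@Component
public class KafkaStreamsDslExample {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public void process() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsBuilder builder = new StreamsBuilder();

        // Read the input stream (no default serdes are configured, so pass them explicitly)
        KStream<String, String> inputStream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Process the stream
        KStream<String, ProcessedEvent> processedStream = inputStream
            .mapValues(value -> parse(value))
            .filter((key, event) -> event.isValid())
            .mapValues(event -> new ProcessedEvent(event.getId(), event.getData()));

        // Group by key and aggregate in 5-minute tumbling windows
        KTable<Windowed<String>, AggregatedResult> aggregatedTable = processedStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                AggregatedResult::new,
                (key, value, aggregate) -> aggregate.add(value),
                // Explicit type arguments: Java cannot infer them through the chained calls
                Materialized.<String, AggregatedResult, WindowStore<Bytes, byte[]>>as("aggregated-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new AggregatedResultSerde())
            );

        // Turn the windowed table into a stream keyed by the original key
        KStream<String, AggregatedResult> aggregatedStream = aggregatedTable
            .toStream()
            .map((windowedKey, value) ->
                KeyValue.pair(windowedKey.key(), value)
            );

        // Write to Kafka with an explicit value serde
        aggregatedStream.to("output-topic", Produced.with(Serdes.String(), new AggregatedResultSerde()));

        // Build and start the topology; close it cleanly on JVM shutdown
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    // readValue() throws a checked exception, which a ValueMapper lambda cannot propagate
    private static RawEvent parse(String json) {
        try {
            return objectMapper.readValue(json, RawEvent.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}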
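`TimeWindows.ofSizeWithNoGrace` makes late records disappear silently, which is easy to miss. The sketch below reproduces the grace-period rule as commonly documented for Kafka Streams windowed aggregations: stream time is the highest record timestamp seen so far (tracked per task), tumbling windows are aligned to the epoch, and a record is dropped when its window end plus the grace period is at or before stream time. The class and its string-keyed counts are hypothetical.

```java
import java.util.*;

// Hypothetical sketch of the grace-period rule for epoch-aligned tumbling windows
final class GraceRuleSketch {
    private final long size;
    private final long grace;
    private long streamTime = Long.MIN_VALUE;
    private final Map<String, Long> counts = new TreeMap<>(); // "key@windowStart" -> count
    private int dropped;

    GraceRuleSketch(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    void process(String key, long timestamp) {
        // Stream time only moves forward
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, size);
        long windowEnd = windowStart + size;
        if (windowEnd + grace <= streamTime) {
            dropped++; // the window is already closed
            return;
        }
        counts.merge(key + "@" + windowStart, 1L, Long::sum);
    }

    long count(String key, long windowStart) {
        return counts.getOrDefault(key + "@" + windowStart, 0L);
    }

    int dropped() {
        return dropped;
    }
}
```

With no grace, a record at 20000 ms that arrives after one at 310000 ms is dropped, because its window `[0, 300000)` has already closed; a 60-second grace period would still accept it. A windowed KTable also emits intermediate updates (subject to record caching) rather than one final value per window; `suppress(Suppressed.untilWindowCloses(...))` before `toStream()` emits only final results. Note also that `windowedKey.key()` discards the window bounds, so consumers of `output-topic` cannot tell windows apart unless the window start is kept in the value.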
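Comparison
| Feature | Flink | Kafka Streams |
|---|---|---|
| Deployment | Separate cluster (standalone, YARN, Kubernetes) | Library embedded in the application |
| State management | Built-in state backends (heap or RocksDB) | Local RocksDB state stores backed by Kafka changelog topics |
| Fault tolerance | Distributed checkpoints | Replay of changelog topics stored (and replicated) in Kafka |
| Latency | Milliseconds | Milliseconds |
| Throughput | High (scales with cluster parallelism) | Medium (parallelism limited by the number of input partitions) |
| Complex event processing | Supported (FlinkCEP) | Not built in |
| SQL support | Supported (Flink SQL) | Not built in |
| Typical use | Complex stream processing | Simpler stream processing |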
Use Cases
Real-Time Monitoring
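// Real-time monitoring and alerting
@Component
public class RealTimeMonitoring {
    public void monitor() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "monitoring-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsBuilder builder = new StreamsBuilder();

        // Read the metric stream (MetricSerde and AlertSerde are application-defined serdes)
        KStream<String, Metric> metricStream =
            builder.stream("metrics-topic", Consumed.with(Serdes.String(), new MetricSerde()));

        // Detect anomalies: keep only metrics above their threshold
        KStream<String, Alert> alertStream = metricStream
            .filter((key, metric) -> metric.getValue() > metric.getThreshold())
            .mapValues(metric -> new Alert(
                metric.getName(),
                metric.getValue(),
                metric.getThreshold(),
                LocalDateTime.now()   // processing time of the alert, not the metric's event time
            ));

        // Send alerts
        alertStream.to("alerts-topic", Produced.with(Serdes.String(), new AlertSerde()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}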
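The filter-and-map chain above can be unit-tested as an ordinary function. The sketch assumes a hypothetical `Metric` that carries its own event timestamp and reuses it in the alert instead of `LocalDateTime.now()`, so replaying the same input reproduces identical alerts; the record and class names are illustrative only.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical plain-Java version of the threshold check on a bounded list
final class ThresholdAlerts {
    record Metric(String name, double value, double threshold, Instant timestamp) {}
    record Alert(String name, double value, double threshold, Instant timestamp) {}

    static List<Alert> detect(List<Metric> metrics) {
        return metrics.stream()
            // strictly above the threshold, as in the original filter
            .filter(m -> m.value() > m.threshold())
            // carry the metric's own timestamp into the alert
            .map(m -> new Alert(m.name(), m.value(), m.threshold(), m.timestamp()))
            .toList();
    }
}
```

A metric exactly at its threshold does not raise an alert, matching the strict `>` comparison in the stream.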
Real-Time Recommendation
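// Real-time recommendation system
@Component
public class RealTimeRecommendation {
    public void recommend() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "recommendation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsBuilder builder = new StreamsBuilder();

        // Read the user-behavior stream, keyed by user ID
        // (UserBehaviorSerde, UserInterestSerde and RecommendationSerde are application-defined serdes)
        KStream<String, UserBehavior> behaviorStream =
            builder.stream("user-behavior-topic", Consumed.with(Serdes.String(), new UserBehaviorSerde()));

        // Build each user's interests over 30-minute windows
        KTable<Windowed<String>, UserInterest> userInterestTable = behaviorStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(30)))
            .aggregate(
                UserInterest::new,
                (key, behavior, interest) -> interest.addBehavior(behavior),
                Materialized.<String, UserInterest, WindowStore<Bytes, byte[]>>as("user-interest-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new UserInterestSerde())
            );

        // Generate recommendations; the user ID comes from the window key,
        // because the initializer UserInterest::new does not know it
        KStream<String, Recommendation> recommendationStream = userInterestTable
            .toStream()
            .map((windowedKey, interest) ->
                KeyValue.pair(windowedKey.key(), generateRecommendation(windowedKey.key(), interest))
            );

        // Write the recommendation results
        recommendationStream.to("recommendations-topic",
            Produced.with(Serdes.String(), new RecommendationSerde()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    private Recommendation generateRecommendation(String userId, UserInterest interest) {
        // Recommend the user's top 10 interests
        List<String> recommendedItems = interest.getTopInterests(10);
        return new Recommendation(userId, recommendedItems);
    }
}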
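The original does not say how `UserInterest` scores interests. One possible shape, assuming interest is simply the number of interactions per item, is sketched below; `UserInterestSketch`, its item-ID argument, and the counting rule are all hypothetical. `addBehavior` returns the same instance so it fits the aggregator lambda, and `getTopInterests(n)` breaks ties by item ID to keep results deterministic.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical UserInterest: counts interactions per item and returns the most frequent ones
final class UserInterestSketch {
    private final Map<String, Long> itemCounts = new HashMap<>();

    UserInterestSketch addBehavior(String itemId) {
        itemCounts.merge(itemId, 1L, Long::sum);
        return this; // return the updated aggregate, as a Kafka Streams Aggregator must
    }

    List<String> getTopInterests(int n) {
        return itemCounts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey())) // tie-break by item ID
            .limit(n)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }
}
```

Whatever the real scoring is, the state held in `user-interest-store` must be serializable by its value serde, so a compact structure such as a map of counts keeps both the store and its changelog topic small.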
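Best Practices
- State management: design state stores carefully and keep state bounded, for example by clearing per-key state once it is no longer needed, as the timer in the Flink EventProcessor does
- Fault tolerance: enable checkpointing (Flink) or changelog-backed state stores (Kafka Streams, on by default) so jobs can recover from failures
- Performance tuning: adjust parallelism, buffer and cache sizes, and similar parameters
- Monitoring and alerting: track the latency, throughput, and state size of streaming jobs
- Data consistency: end-to-end exactly-once requires a replayable source, checkpointed state, and a transactional or idempotent sink; in Kafka Streams it is enabled with processing.guarantee=exactly_once_v2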
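Of the sink options in the last point, idempotent writes are the simplest to illustrate. The sketch below upserts each result under a deterministic ID (key plus window start), so results replayed after a failure under at-least-once delivery overwrite earlier rows instead of duplicating them. It is a hypothetical in-memory example of that one technique, not how Flink's transactional Kafka sink works, and it assumes results are deterministic for a given key and window.

```java
import java.util.*;

// Hypothetical idempotent sink: replays overwrite rows instead of adding duplicates
final class IdempotentSink {
    private final Map<String, Long> table = new LinkedHashMap<>();
    private int writes;

    // The same key and window always map to the same row ID
    static String resultId(String key, long windowStart) {
        return key + "|" + windowStart;
    }

    void upsert(String key, long windowStart, long count) {
        table.put(resultId(key, windowStart), count);
        writes++;
    }

    int rows() {
        return table.size();
    }

    int writes() {
        return writes;
    }

    Long get(String key, long windowStart) {
        return table.get(resultId(key, windowStart));
    }
}
```

Writing two results, then replaying both after a simulated failure, produces four writes but still only two rows with unchanged values.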