实时架构全链路设计
实时架构全链路设计
实时架构概述
实时架构是指能够快速响应事件和数据变化的系统架构。它广泛应用于实时监控、实时推荐、实时风控、实时分析等场景。
全链路架构
架构图
┌─────────────────────────────────────────────────────────────────┐
│ 数据采集层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ App埋点 │ │ 日志采集 │ │ 数据库CDC │ │ API接入 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据传输层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Kafka │ │ RocketMQ │ │ Pulsar │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据处理层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Flink │ │Spark流处理│ │Kafka流处理│ │ 自研引擎 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Redis │ │ HBase │ │ClickHouse│ │Elasticsearch│ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ API网关 │ │ 业务服务 │ │ 推荐服务 │ │ 风控服务 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据展示层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 实时大屏 │ │ 监控告警 │ │ 报表分析 │ │ 用户界面 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
数据采集
埋点采集
// 埋点SDK
@Component
public class EventTracker {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void track(String eventName, Map<String, Object> properties) {
TrackEvent event = new TrackEvent();
event.setEventName(eventName);
event.setProperties(properties);
event.setTimestamp(System.currentTimeMillis());
event.setUserId(getUserId());
event.setDeviceId(getDeviceId());
event.setSessionId(getSessionId());
// 异步发送到Kafka
kafkaTemplate.send("track-events", objectMapper.writeValueAsString(event));
}
public void trackPageView(String pageName) {
Map<String, Object> properties = new HashMap<>();
properties.put("page_name", pageName);
properties.put("page_url", getCurrentPageUrl());
track("page_view", properties);
}
public void trackClick(String elementId, String elementName) {
Map<String, Object> properties = new HashMap<>();
properties.put("element_id", elementId);
properties.put("element_name", elementName);
track("click", properties);
}
}
// 埋点数据模型
@Data
public class TrackEvent {
private String eventName;
private Map<String, Object> properties;
private long timestamp;
private String userId;
private String deviceId;
private String sessionId;
}
数据库CDC
// Debezium CDC配置
@Configuration
public class CdcConfig {
@Bean
public Map<String, String> debeziumConfig() {
Map<String, String> config = new HashMap<>();
config.put("database.hostname", "mysql-host");
config.put("database.port", "3306");
config.put("database.user", "debezium");
config.put("database.password", "password");
config.put("database.server.id", "184054");
config.put("database.server.name", "dbserver1");
config.put("database.include.list", "inventory");
config.put("database.history.kafka.bootstrap.servers", "kafka:9092");
config.put("database.history.kafka.topic", "schema-changes.inventory");
return config;
}
}
// CDC事件处理器
@Component
public class CdcEventHandler {
private final KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "dbserver1.inventory.orders")
public void handleOrderChange(String message) {
CdcEvent event = objectMapper.readValue(message, CdcEvent.class);
switch (event.getOp()) {
case "c": // create
handleOrderCreated(event);
break;
case "u": // update
handleOrderUpdated(event);
break;
case "d": // delete
handleOrderDeleted(event);
break;
}
}
private void handleOrderCreated(CdcEvent event) {
OrderCreatedEvent orderEvent = new OrderCreatedEvent();
orderEvent.setOrderId(event.getAfter().get("id").toString());
orderEvent.setCustomerId(event.getAfter().get("customer_id").toString());
orderEvent.setAmount(new BigDecimal(event.getAfter().get("amount").toString()));
kafkaTemplate.send("order-events", objectMapper.writeValueAsString(orderEvent));
}
}
实时处理
Flink实时处理
// 实时统计作业
public class RealTimeStatisticsJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置检查点
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 从Kafka读取订单流
DataStream<OrderEvent> orderStream = env.addSource(
new FlinkKafkaConsumer<>(
"order-events",
new OrderEventSchema(),
getKafkaProperties()
)
);
// 实时统计每分钟订单数
DataStream<WindowStatistics> statistics = orderStream
.keyBy(OrderEvent::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderStatisticsAggregator());
// 输出到Redis
statistics.addSink(new RedisSink<>(getRedisConfig(), new StatisticsRedisMapper()));
// 输出到控制台
statistics.print();
env.execute("Real-time Statistics Job");
}
}
// 聚合器
public class OrderStatisticsAggregator implements AggregateFunction<OrderEvent, OrderAccumulator, WindowStatistics> {
@Override
public OrderAccumulator createAccumulator() {
return new OrderAccumulator();
}
@Override
public OrderAccumulator add(OrderEvent event, OrderAccumulator accumulator) {
accumulator.setCount(accumulator.getCount() + 1);
accumulator.setTotalAmount(accumulator.getTotalAmount().add(event.getAmount()));
accumulator.setRegion(event.getRegion());
accumulator.setWindowStart(event.getTimestamp());
return accumulator;
}
@Override
public WindowStatistics getResult(OrderAccumulator accumulator) {
return new WindowStatistics(
accumulator.getRegion(),
accumulator.getCount(),
accumulator.getTotalAmount(),
accumulator.getWindowStart()
);
}
@Override
public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
a.setCount(a.getCount() + b.getCount());
a.setTotalAmount(a.getTotalAmount().add(b.getTotalAmount()));
return a;
}
}
class OrderAccumulator {
private long count = 0;
private BigDecimal totalAmount = BigDecimal.ZERO;
private String region;
private long windowStart;
// getters and setters
}
实时风控
// 实时风控引擎
@Component
public class RealTimeRiskEngine {
private final RedisTemplate<String, String> redisTemplate;
private final RiskRuleRepository ruleRepository;
public RiskResult evaluate(RiskEvent event) {
// 1. 检查黑名单
if (isBlacklisted(event.getUserId())) {
return RiskResult.block("User is blacklisted");
}
// 2. 检查频率限制
if (exceedsFrequencyLimit(event)) {
return RiskResult.block("Frequency limit exceeded");
}
// 3. 检查金额限制
if (exceedsAmountLimit(event)) {
return RiskResult.review("Amount limit exceeded, needs review");
}
// 4. 检查地理位置异常
if (isGeographicallyAnomaly(event)) {
return RiskResult.review("Geographic anomaly detected");
}
// 5. 检查设备指纹
if (isSuspiciousDevice(event)) {
return RiskResult.review("Suspicious device detected");
}
return RiskResult.allow();
}
private boolean isBlacklisted(String userId) {
return redisTemplate.opsForSet().isMember("blacklist", userId);
}
private boolean exceedsFrequencyLimit(RiskEvent event) {
String key = "frequency:" + event.getUserId() + ":" + event.getEventType();
Long count = redisTemplate.opsForValue().increment(key);
if (count == 1) {
redisTemplate.expire(key, 1, TimeUnit.MINUTES);
}
return count > getFrequencyLimit(event.getEventType());
}
private boolean exceedsAmountLimit(RiskEvent event) {
if (event.getAmount() == null) {
return false;
}
String key = "amount:" + event.getUserId();
BigDecimal totalAmount = new BigDecimal(
redisTemplate.opsForValue().getOrDefault(key, "0")
);
totalAmount = totalAmount.add(event.getAmount());
redisTemplate.opsForValue().set(key, totalAmount.toString(), 1, TimeUnit.DAYS);
return totalAmount.compareTo(getDailyAmountLimit()) > 0;
}
private boolean isGeographicallyAnomaly(RiskEvent event) {
String lastLocation = redisTemplate.opsForValue().get("location:" + event.getUserId());
if (lastLocation == null) {
return false;
}
// 检查地理位置是否异常(如短时间内跨越很远距离)
return isLocationAnomaly(lastLocation, event.getLocation());
}
private boolean isSuspiciousDevice(RiskEvent event) {
String deviceFingerprint = event.getDeviceFingerprint();
String userId = event.getUserId();
// 检查设备是否被多个用户使用
Set<String> users = redisTemplate.opsForSet().members("device:" + deviceFingerprint);
return users != null && users.size() > 3;
}
}
// 风控结果
@Data
public class RiskResult {
private RiskLevel level;
private String reason;
private Map<String, Object> details;
public static RiskResult allow() {
return new RiskResult(RiskLevel.ALLOW, "Passed all checks", null);
}
public static RiskResult block(String reason) {
return new RiskResult(RiskLevel.BLOCK, reason, null);
}
public static RiskResult review(String reason) {
return new RiskResult(RiskLevel.REVIEW, reason, null);
}
}
enum RiskLevel {
ALLOW, BLOCK, REVIEW
}
实时存储
Redis缓存
// 实时数据缓存
@Component
public class RealTimeCache {
private final RedisTemplate<String, String> redisTemplate;
// 实时计数器
public void incrementCounter(String key, long delta) {
redisTemplate.opsForValue().increment(key, delta);
}
public Long getCounter(String key) {
String value = redisTemplate.opsForValue().get(key);
return value != null ? Long.parseLong(value) : 0L;
}
// 实时排行榜
public void addToLeaderboard(String boardName, String memberId, double score) {
redisTemplate.opsForZSet().add(boardName, memberId, score);
}
public Set<String> getTopMembers(String boardName, int count) {
return redisTemplate.opsForZSet().reverseRange(boardName, 0, count - 1);
}
// 实时统计
public void recordEvent(String eventType, String data) {
String key = "events:" + eventType + ":" + LocalDate.now();
redisTemplate.opsForList().rightPush(key, data);
redisTemplate.expire(key, 7, TimeUnit.DAYS);
}
public List<String> getEvents(String eventType, LocalDate date) {
String key = "events:" + eventType + ":" + date;
return redisTemplate.opsForList().range(key, 0, -1);
}
}
ClickHouse分析
// ClickHouse实时分析
@Component
public class ClickHouseAnalytics {
private final JdbcTemplate jdbcTemplate;
// 创建分析表
public void createAnalyticsTable() {
String sql = "CREATE TABLE IF NOT EXISTS order_analytics (" +
" order_id String," +
" user_id String," +
" product_id String," +
" amount Decimal(10,2)," +
" region String," +
" order_time DateTime," +
" event_date Date" +
") ENGINE = MergeTree()" +
"PARTITION BY toYYYYMM(event_date)" +
"ORDER BY (user_id, order_time)";
jdbcTemplate.execute(sql);
}
// 实时插入数据
public void insertOrder(OrderEvent event) {
String sql = "INSERT INTO order_analytics VALUES (?, ?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(sql,
event.getOrderId(),
event.getUserId(),
event.getProductId(),
event.getAmount(),
event.getRegion(),
event.getOrderTime(),
event.getOrderTime().toLocalDate()
);
}
// 实时查询统计
public List<RegionStatistics> getRegionStatistics(LocalDate date) {
String sql = "SELECT " +
" region," +
" COUNT(*) as order_count," +
" SUM(amount) as total_amount," +
" AVG(amount) as avg_amount " +
"FROM order_analytics " +
"WHERE event_date = ? " +
"GROUP BY region " +
"ORDER BY total_amount DESC";
return jdbcTemplate.query(sql, new Object[]{date}, (rs, rowNum) -> {
RegionStatistics stats = new RegionStatistics();
stats.setRegion(rs.getString("region"));
stats.setOrderCount(rs.getLong("order_count"));
stats.setTotalAmount(rs.getBigDecimal("total_amount"));
stats.setAvgAmount(rs.getBigDecimal("avg_amount"));
return stats;
});
}
// 实时用户行为分析
public List<UserBehavior> getUserBehavior(String userId, int days) {
String sql = "SELECT " +
" toDate(order_time) as event_date," +
" COUNT(*) as order_count," +
" SUM(amount) as total_amount " +
"FROM order_analytics " +
"WHERE user_id = ? " +
" AND order_time >= now() - INTERVAL ? DAY " +
"GROUP BY event_date " +
"ORDER BY event_date";
return jdbcTemplate.query(sql, new Object[]{userId, days}, (rs, rowNum) -> {
UserBehavior behavior = new UserBehavior();
behavior.setUserId(userId);
behavior.setDate(rs.getDate("event_date").toLocalDate());
behavior.setOrderCount(rs.getLong("order_count"));
behavior.setTotalAmount(rs.getBigDecimal("total_amount"));
return behavior;
});
}
}
高并发设计
限流策略
// 限流器实现
@Component
public class RateLimiter {
private final RedisTemplate<String, String> redisTemplate;
// 令牌桶算法
public boolean tryAcquire(String key, int maxTokens, int refillRate) {
String script =
"local tokens = redis.call('get', KEYS[1]) " +
"tokens = tokens and tonumber(tokens) or " + maxTokens + " " +
"if tokens > 0 then " +
" tokens = tokens - 1 " +
" redis.call('set', KEYS[1], tokens) " +
" redis.call('expire', KEYS[1], 1) " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key)
);
return result != null && result == 1L;
}
// 滑动窗口算法
public boolean tryAcquireSlidingWindow(String key, int maxRequests, int windowSeconds) {
long now = System.currentTimeMillis();
long windowStart = now - (windowSeconds * 1000);
String script =
"redis.call('zremrangebyscore', KEYS[1], 0, " + windowStart + ") " +
"local count = redis.call('zcard', KEYS[1]) " +
"if count < " + maxRequests + " then " +
" redis.call('zadd', KEYS[1], " + now + ", " + now + ") " +
" redis.call('expire', KEYS[1], " + windowSeconds + ") " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key)
);
return result != null && result == 1L;
}
}
缓存策略
// 多级缓存
@Component
public class MultiLevelCache {
private final Cache<String, Object> localCache;
private final RedisTemplate<String, String> redisTemplate;
private final ObjectMapper objectMapper;
public <T> T get(String key, Class<T> type) {
// 1. 查本地缓存
Object localValue = localCache.getIfPresent(key);
if (localValue != null) {
return type.cast(localValue);
}
// 2. 查Redis缓存
String redisValue = redisTemplate.opsForValue().get(key);
if (redisValue != null) {
T value = objectMapper.readValue(redisValue, type);
localCache.put(key, value);
return value;
}
// 3. 查数据库
T dbValue = loadFromDatabase(key);
if (dbValue != null) {
// 写入Redis缓存
redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(dbValue));
// 写入本地缓存
localCache.put(key, dbValue);
}
return dbValue;
}
public void put(String key, Object value) {
// 写入本地缓存
localCache.put(key, value);
// 写入Redis缓存
redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(value));
}
public void invalidate(String key) {
// 清除本地缓存
localCache.invalidate(key);
// 清除Redis缓存
redisTemplate.delete(key);
}
}
监控告警
// 实时监控
@Component
public class RealTimeMonitor {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
public void recordMetric(String name, double value) {
meterRegistry.gauge(name, value);
// 检查是否需要告警
checkAlert(name, value);
}
public void incrementCounter(String name) {
meterRegistry.counter(name).increment();
}
public void recordHistogram(String name, double value) {
meterRegistry.timer(name).record(Duration.ofMillis((long) value));
}
private void checkAlert(String metricName, double value) {
AlertRule rule = alertService.getRule(metricName);
if (rule != null && value > rule.getThreshold()) {
Alert alert = new Alert();
alert.setMetricName(metricName);
alert.setCurrentValue(value);
alert.setThreshold(rule.getThreshold());
alert.setTimestamp(LocalDateTime.now());
alert.setLevel(rule.getLevel());
alertService.sendAlert(alert);
}
}
}
// 告警规则
@Data
public class AlertRule {
private String metricName;
private double threshold;
private AlertLevel level;
private String message;
}
enum AlertLevel {
INFO, WARNING, ERROR, CRITICAL
}
最佳实践
- 端到端延迟:优化全链路延迟,从数据采集到展示
- 数据一致性:确保实时数据的准确性和一致性
- 容错设计:处理各种故障场景,确保系统可用性
- 弹性伸缩:根据负载自动扩缩容
- 监控告警:建立完善的监控和告警体系
- 性能测试:定期进行性能测试和压测
- 数据安全:保护实时数据的安全性和隐私