← 返回首页
🌐

实时架构全链路设计

📂 architecture ⏱ 9 min 1677 words

实时架构全链路设计

实时架构概述

实时架构是指能够在秒级甚至毫秒级内感知并响应事件和数据变化的系统架构，其核心目标是尽量缩短数据从产生到被消费的端到端延迟。它广泛应用于实时监控、实时推荐、实时风控、实时分析等场景。

全链路架构

架构图

┌─────────────────────────────────────────────────────────────────┐
│                        数据采集层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  App埋点  │  │  日志采集  │  │ 数据库CDC │  │  API接入  │       │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        数据传输层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                     │
│  │   Kafka   │  │  RocketMQ │  │   Pulsar │                     │
│  └──────────┘  └──────────┘  └──────────┘                     │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        数据处理层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  Flink    │  │Spark流处理│  │Kafka流处理│  │ 自研引擎  │       │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        数据存储层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  Redis    │  │  HBase   │  │ClickHouse│  │Elasticsearch│     │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        应用服务层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  API网关  │  │  业务服务  │  │  推荐服务 │  │  风控服务  │       │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                        数据展示层                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  实时大屏 │  │  监控告警  │  │  报表分析 │  │  用户界面  │       │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────────────────────┘

数据采集

埋点采集

// Event-tracking SDK: wraps each tracked action in a TrackEvent envelope and publishes it to Kafka as JSON.
@Component
public class EventTracker {

    /** Kafka topic that receives every tracked event. */
    private static final String TOPIC = "track-events";

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Creates the tracker with its collaborators (constructor injection; final fields need a constructor).
     *
     * @param kafkaTemplate producer used to publish events
     * @param objectMapper  serializer that turns a TrackEvent into JSON
     */
    public EventTracker(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Builds a TrackEvent stamped with the current wall-clock time and the current user, device and
     * session ids, then sends it to Kafka. The send is asynchronous; the returned future is not awaited,
     * so delivery failures are not observed here.
     *
     * @param eventName  event name, e.g. "page_view"
     * @param properties free-form event attributes
     * @throws java.io.UncheckedIOException if the event cannot be serialized to JSON
     */
    public void track(String eventName, Map<String, Object> properties) {
        // NOTE(review): getUserId/getDeviceId/getSessionId are not shown in this snippet; presumably
        // they resolve ids from the current request/client context.
        String userId = getUserId();

        TrackEvent event = new TrackEvent();
        event.setEventName(eventName);
        event.setProperties(properties);
        event.setTimestamp(System.currentTimeMillis());
        event.setUserId(userId);
        event.setDeviceId(getDeviceId());
        event.setSessionId(getSessionId());

        String payload;
        try {
            payload = objectMapper.writeValueAsString(event);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to serialize track event '" + eventName + "'", e);
        }

        // Keying by user id routes all of one user's events to the same partition, so consumers see
        // them in order. A null key (anonymous user) falls back to the producer's default partitioning.
        kafkaTemplate.send(TOPIC, userId, payload);
    }

    /**
     * Tracks a "page_view" event.
     *
     * @param pageName logical page name, stored as "page_name"; the current URL is stored as "page_url"
     */
    public void trackPageView(String pageName) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("page_name", pageName);
        properties.put("page_url", getCurrentPageUrl());
        track("page_view", properties);
    }

    /**
     * Tracks a "click" event.
     *
     * @param elementId   id of the clicked element, stored as "element_id"
     * @param elementName display name of the clicked element, stored as "element_name"
     */
    public void trackClick(String elementId, String elementName) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("element_id", elementId);
        properties.put("element_name", elementName);
        track("click", properties);
    }
}

// Tracking event data model: the envelope EventTracker#track fills in and serializes to JSON.
// Lombok @Data generates getters, setters, equals/hashCode and toString for these fields.
@Data
public class TrackEvent {
    // Event name, e.g. "page_view" or "click".
    private String eventName;
    // Free-form event attributes (e.g. page_name, element_id).
    private Map<String, Object> properties;
    // Wall-clock time in epoch milliseconds; EventTracker sets it from System.currentTimeMillis().
    private long timestamp;
    // Identifiers of the acting user, device and session.
    private String userId;
    private String deviceId;
    private String sessionId;
}

数据库CDC

// Debezium CDC configuration for capturing changes from the MySQL "inventory" database.
@Configuration
public class CdcConfig {

    /** Environment variable that supplies the Debezium database password. */
    private static final String PASSWORD_ENV = "DEBEZIUM_DB_PASSWORD";

    /**
     * Builds the Debezium MySQL connector properties.
     *
     * <p>Both the Debezium 1.x property names ({@code database.server.name}, {@code database.history.*})
     * and their 2.x replacements ({@code topic.prefix}, {@code schema.history.internal.*}) are set, so
     * the map works with either major version.
     *
     * @return a mutable map of connector properties
     * @throws IllegalStateException if the {@value #PASSWORD_ENV} environment variable is not set
     */
    @Bean
    public Map<String, String> debeziumConfig() {
        // Credentials must not live in source code; read them from the environment and fail fast if absent.
        String password = System.getenv(PASSWORD_ENV);
        if (password == null || password.isEmpty()) {
            throw new IllegalStateException("Environment variable " + PASSWORD_ENV + " must be set");
        }

        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        config.put("database.hostname", "mysql-host");
        config.put("database.port", "3306");
        config.put("database.user", "debezium");
        config.put("database.password", password);
        config.put("database.server.id", "184054");
        // Logical server name / topic prefix: change topics are named <prefix>.<database>.<table>,
        // e.g. dbserver1.inventory.orders.
        config.put("database.server.name", "dbserver1"); // Debezium 1.x
        config.put("topic.prefix", "dbserver1");          // Debezium 2.x
        config.put("database.include.list", "inventory");
        // Schema-history storage (1.x names, then the 2.x names).
        config.put("database.history.kafka.bootstrap.servers", "kafka:9092");
        config.put("database.history.kafka.topic", "schema-changes.inventory");
        config.put("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
        config.put("schema.history.internal.kafka.topic", "schema-changes.inventory");
        // Emit DECIMAL columns as plain strings. The default "precise" mode encodes them as bytes,
        // which consumers that do new BigDecimal(value.toString()) cannot parse.
        config.put("decimal.handling.mode", "string");
        return config;
    }
}

// Consumes Debezium change events for the inventory.orders table and republishes them as order domain events.
@Component
public class CdcEventHandler {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * @param kafkaTemplate producer used to publish order domain events
     * @param objectMapper  JSON (de)serializer for change events and domain events
     */
    public CdcEventHandler(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Parses one Debezium change event and dispatches it by its operation code.
     *
     * @param message the JSON change event, or {@code null} for a tombstone record
     * @throws java.io.UncheckedIOException if the message cannot be parsed as a CdcEvent
     */
    @KafkaListener(topics = "dbserver1.inventory.orders")
    public void handleOrderChange(String message) {
        // By default Debezium follows each delete with a tombstone (null value) for log compaction.
        // The delete itself arrives as a separate "d" event, so a tombstone carries nothing to handle.
        if (message == null) {
            return;
        }
        CdcEvent event = readJson(message, CdcEvent.class);

        // String.valueOf turns a missing op into "null", which reaches the default branch instead of
        // throwing a NullPointerException from the switch.
        switch (String.valueOf(event.getOp())) {
            case "r": // snapshot read: a row that existed before capture started; publish it like a create
            case "c": // create
                handleOrderCreated(event);
                break;
            case "u": // update
                handleOrderUpdated(event);
                break;
            case "d": // delete
                handleOrderDeleted(event);
                break;
            default:
                // Other operations (e.g. "t" for truncate) produce no order event.
                break;
        }
    }

    /** Maps a create/snapshot row image to an OrderCreatedEvent and publishes it keyed by order id. */
    private void handleOrderCreated(CdcEvent event) {
        if (event.getAfter() == null) {
            throw new IllegalArgumentException("CDC create event has no 'after' row image");
        }
        String orderId = requireColumn(event.getAfter().get("id"), "id");

        OrderCreatedEvent orderEvent = new OrderCreatedEvent();
        orderEvent.setOrderId(orderId);
        orderEvent.setCustomerId(requireColumn(event.getAfter().get("customer_id"), "customer_id"));
        // NOTE(review): assumes DECIMAL columns arrive as strings or numbers (decimal.handling.mode=string
        // or double); Debezium's default "precise" mode encodes them as bytes.
        orderEvent.setAmount(new BigDecimal(requireColumn(event.getAfter().get("amount"), "amount")));

        // Keying by order id routes events for the same order to the same partition.
        kafkaTemplate.send("order-events", orderId, writeJson(orderEvent));
    }

    // NOTE(review): handleOrderUpdated / handleOrderDeleted are referenced above but not shown in this snippet.

    /** Returns {@code value.toString()}, failing with a descriptive message when the column is absent. */
    private static String requireColumn(Object value, String column) {
        if (value == null) {
            throw new IllegalArgumentException("CDC event is missing column '" + column + "'");
        }
        return value.toString();
    }

    /** Deserializes JSON, converting Jackson's checked exception into an unchecked one. */
    private <T> T readJson(String json, Class<T> type) {
        try {
            return objectMapper.readValue(json, type);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to parse CDC event as " + type.getSimpleName(), e);
        }
    }

    /** Serializes a value to JSON, converting Jackson's checked exception into an unchecked one. */
    private String writeJson(Object value) {
        try {
            return objectMapper.writeValueAsString(value);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to serialize " + value.getClass().getSimpleName(), e);
        }
    }
}

实时处理

Flink实时处理

// 实时统计作业
public class RealTimeStatisticsJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 设置检查点
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // 从Kafka读取订单流
        DataStream<OrderEvent> orderStream = env.addSource(
            new FlinkKafkaConsumer<>(
                "order-events",
                new OrderEventSchema(),
                getKafkaProperties()
            )
        );
        
        // 实时统计每分钟订单数
        DataStream<WindowStatistics> statistics = orderStream
            .keyBy(OrderEvent::getRegion)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new OrderStatisticsAggregator());
        
        // 输出到Redis
        statistics.addSink(new RedisSink<>(getRedisConfig(), new StatisticsRedisMapper()));
        
        // 输出到控制台
        statistics.print();
        
        env.execute("Real-time Statistics Job");
    }
}

// Incremental aggregator for per-region order statistics within one window.
public class OrderStatisticsAggregator implements AggregateFunction<OrderEvent, OrderAccumulator, WindowStatistics> {

    /** Starts an empty accumulator (count 0, total 0). */
    @Override
    public OrderAccumulator createAccumulator() {
        return new OrderAccumulator();
    }

    /**
     * Folds one order into the accumulator. "windowStart" tracks the earliest event timestamp seen,
     * which is independent of arrival order. It is not the window boundary; pair the aggregate with
     * a ProcessWindowFunction and use window.getStart() if the exact boundary is needed.
     * A null amount is counted as an order but adds nothing to the total.
     */
    @Override
    public OrderAccumulator add(OrderEvent event, OrderAccumulator accumulator) {
        long eventTime = event.getTimestamp();
        // Checked before the count is incremented: count == 0 means this is the first event.
        if (accumulator.getCount() == 0 || eventTime < accumulator.getWindowStart()) {
            accumulator.setWindowStart(eventTime);
        }
        accumulator.setCount(accumulator.getCount() + 1);
        if (event.getAmount() != null) {
            accumulator.setTotalAmount(accumulator.getTotalAmount().add(event.getAmount()));
        }
        accumulator.setRegion(event.getRegion());
        return accumulator;
    }

    /** Converts the accumulator into the emitted window result. */
    @Override
    public WindowStatistics getResult(OrderAccumulator accumulator) {
        return new WindowStatistics(
            accumulator.getRegion(),
            accumulator.getCount(),
            accumulator.getTotalAmount(),
            accumulator.getWindowStart()
        );
    }

    /**
     * Merges b into a (used by merging window assigners). Besides counts and totals, it keeps the
     * earliest windowStart and fills in the region when a has not seen any events yet.
     */
    @Override
    public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
        if (b.getCount() == 0) {
            return a;
        }
        if (a.getCount() == 0 || b.getWindowStart() < a.getWindowStart()) {
            a.setWindowStart(b.getWindowStart());
        }
        if (a.getRegion() == null) {
            a.setRegion(b.getRegion());
        }
        a.setCount(a.getCount() + b.getCount());
        a.setTotalAmount(a.getTotalAmount().add(b.getTotalAmount()));
        return a;
    }
}

// Mutable accumulator used by OrderStatisticsAggregator for one region's window.
// NOTE(review): Flink only treats a type as a POJO (instead of falling back to Kryo) when the class is
// public with a public no-arg constructor and getters/setters; consider making it public in its own file.
class OrderAccumulator {
    // Number of orders folded in so far.
    private long count = 0;
    // Sum of order amounts; starts at zero so add() never sees null.
    private BigDecimal totalAmount = BigDecimal.ZERO;
    // Region key of the window.
    private String region;
    // Timestamp recorded by the aggregator for the window (epoch ms).
    private long windowStart;

    public OrderAccumulator() {
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public void setTotalAmount(BigDecimal totalAmount) {
        this.totalAmount = totalAmount;
    }

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public long getWindowStart() {
        return windowStart;
    }

    public void setWindowStart(long windowStart) {
        this.windowStart = windowStart;
    }
}

实时风控

// 实时风控引擎
@Component
public class RealTimeRiskEngine {
    
    private final RedisTemplate<String, String> redisTemplate;
    private final RiskRuleRepository ruleRepository;
    
    public RiskResult evaluate(RiskEvent event) {
        // 1. 检查黑名单
        if (isBlacklisted(event.getUserId())) {
            return RiskResult.block("User is blacklisted");
        }
        
        // 2. 检查频率限制
        if (exceedsFrequencyLimit(event)) {
            return RiskResult.block("Frequency limit exceeded");
        }
        
        // 3. 检查金额限制
        if (exceedsAmountLimit(event)) {
            return RiskResult.review("Amount limit exceeded, needs review");
        }
        
        // 4. 检查地理位置异常
        if (isGeographicallyAnomaly(event)) {
            return RiskResult.review("Geographic anomaly detected");
        }
        
        // 5. 检查设备指纹
        if (isSuspiciousDevice(event)) {
            return RiskResult.review("Suspicious device detected");
        }
        
        return RiskResult.allow();
    }
    
    private boolean isBlacklisted(String userId) {
        return redisTemplate.opsForSet().isMember("blacklist", userId);
    }
    
    private boolean exceedsFrequencyLimit(RiskEvent event) {
        String key = "frequency:" + event.getUserId() + ":" + event.getEventType();
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count == 1) {
            redisTemplate.expire(key, 1, TimeUnit.MINUTES);
        }
        
        return count > getFrequencyLimit(event.getEventType());
    }
    
    private boolean exceedsAmountLimit(RiskEvent event) {
        if (event.getAmount() == null) {
            return false;
        }
        
        String key = "amount:" + event.getUserId();
        BigDecimal totalAmount = new BigDecimal(
            redisTemplate.opsForValue().getOrDefault(key, "0")
        );
        
        totalAmount = totalAmount.add(event.getAmount());
        redisTemplate.opsForValue().set(key, totalAmount.toString(), 1, TimeUnit.DAYS);
        
        return totalAmount.compareTo(getDailyAmountLimit()) > 0;
    }
    
    private boolean isGeographicallyAnomaly(RiskEvent event) {
        String lastLocation = redisTemplate.opsForValue().get("location:" + event.getUserId());
        
        if (lastLocation == null) {
            return false;
        }
        
        // 检查地理位置是否异常(如短时间内跨越很远距离)
        return isLocationAnomaly(lastLocation, event.getLocation());
    }
    
    private boolean isSuspiciousDevice(RiskEvent event) {
        String deviceFingerprint = event.getDeviceFingerprint();
        String userId = event.getUserId();
        
        // 检查设备是否被多个用户使用
        Set<String> users = redisTemplate.opsForSet().members("device:" + deviceFingerprint);
        return users != null && users.size() > 3;
    }
}

// Outcome of a risk evaluation: a decision level, a human-readable reason, and optional details.
@Data
public class RiskResult {
    private RiskLevel level;
    private String reason;
    private Map<String, Object> details;

    /**
     * No-arg constructor (e.g. for Jackson). @Data stops generating one once an explicit constructor
     * exists, so it is written out here.
     */
    public RiskResult() {
    }

    /**
     * All-fields constructor used by the factory methods (@Data does not generate one).
     *
     * @param level   decision
     * @param reason  human-readable explanation
     * @param details optional extra data; may be null
     */
    public RiskResult(RiskLevel level, String reason, Map<String, Object> details) {
        this.level = level;
        this.reason = reason;
        this.details = details;
    }

    /** @return an ALLOW result with reason "Passed all checks" */
    public static RiskResult allow() {
        return new RiskResult(RiskLevel.ALLOW, "Passed all checks", null);
    }

    /** @return a BLOCK result with the given reason */
    public static RiskResult block(String reason) {
        return new RiskResult(RiskLevel.BLOCK, reason, null);
    }

    /** @return a REVIEW result with the given reason */
    public static RiskResult review(String reason) {
        return new RiskResult(RiskLevel.REVIEW, reason, null);
    }
}

/**
 * Decision returned by the risk engine, declared in the order ALLOW, BLOCK, REVIEW.
 */
enum RiskLevel {
    /** The event passed every check. */
    ALLOW,
    /** The event is rejected outright. */
    BLOCK,
    /** The event is held for manual review. */
    REVIEW;
}

实时存储

Redis缓存

// Real-time data cache on Redis: counters, leaderboards (sorted sets) and per-day event lists.
@Component
public class RealTimeCache {

    private final RedisTemplate<String, String> redisTemplate;

    public RealTimeCache(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Adds {@code delta} to the counter at {@code key} (INCRBY), treating a missing key as 0. */
    public void incrementCounter(String key, long delta) {
        redisTemplate.opsForValue().increment(key, delta);
    }

    /** Returns the counter at {@code key}, or 0 when the key does not exist. */
    public Long getCounter(String key) {
        String value = redisTemplate.opsForValue().get(key);
        return value != null ? Long.parseLong(value) : 0L;
    }

    /** Sets {@code memberId}'s score on leaderboard {@code boardName}, replacing any previous score. */
    public void addToLeaderboard(String boardName, String memberId, double score) {
        redisTemplate.opsForZSet().add(boardName, memberId, score);
    }

    /**
     * Returns up to {@code count} members with the highest scores, highest first.
     *
     * @param count maximum number of members; a non-positive value yields an empty set
     */
    public Set<String> getTopMembers(String boardName, int count) {
        // reverseRange(0, count - 1) with count <= 0 becomes (0, -1) or lower, and -1 means "to the end",
        // i.e. the whole board would be returned instead of nothing.
        if (count <= 0) {
            return java.util.Collections.emptySet();
        }
        return redisTemplate.opsForZSet().reverseRange(boardName, 0, count - 1);
    }

    /**
     * Appends {@code data} to today's list for {@code eventType} ("today" in the JVM default time zone).
     * The TTL is reset to 7 days on each write.
     */
    public void recordEvent(String eventType, String data) {
        String key = "events:" + eventType + ":" + LocalDate.now();
        redisTemplate.opsForList().rightPush(key, data);
        redisTemplate.expire(key, 7, TimeUnit.DAYS);
    }

    /** Returns every event recorded for {@code eventType} on {@code date} (the whole list, unpaged). */
    public List<String> getEvents(String eventType, LocalDate date) {
        String key = "events:" + eventType + ":" + date;
        return redisTemplate.opsForList().range(key, 0, -1);
    }
}

ClickHouse分析

// ClickHouse real-time analytics on the order_analytics table.
@Component
public class ClickHouseAnalytics {

    /** INSERT with an explicit column list, so it does not depend on the table's column order. */
    private static final String INSERT_ORDER_SQL =
        "INSERT INTO order_analytics " +
        "(order_id, user_id, product_id, amount, region, order_time, event_date) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?)";

    private final JdbcTemplate jdbcTemplate;

    public ClickHouseAnalytics(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Creates order_analytics if it does not exist. It is a MergeTree table partitioned by the month of
     * event_date and sorted by (user_id, order_time).
     */
    public void createAnalyticsTable() {
        // Every clause fragment ends with a space, so clauses are not glued together ("MergeTree()PARTITION").
        String sql = "CREATE TABLE IF NOT EXISTS order_analytics (" +
                     "  order_id String," +
                     "  user_id String," +
                     "  product_id String," +
                     "  amount Decimal(10,2)," +
                     "  region String," +
                     "  order_time DateTime," +
                     "  event_date Date" +
                     ") ENGINE = MergeTree() " +
                     "PARTITION BY toYYYYMM(event_date) " +
                     "ORDER BY (user_id, order_time)";

        jdbcTemplate.execute(sql);
    }

    /**
     * Inserts one order row; event_date is derived from order_time.
     * Each ClickHouse INSERT creates a new MergeTree data part, so high-throughput callers should batch
     * rows with {@link #insertOrders(List)}.
     */
    public void insertOrder(OrderEvent event) {
        jdbcTemplate.update(INSERT_ORDER_SQL, toRow(event));
    }

    /**
     * Inserts many orders in one JDBC batch. A null or empty list is a no-op.
     *
     * @param events orders to insert
     */
    public void insertOrders(List<OrderEvent> events) {
        if (events == null || events.isEmpty()) {
            return;
        }
        List<Object[]> rows = new java.util.ArrayList<>(events.size());
        for (OrderEvent event : events) {
            rows.add(toRow(event));
        }
        jdbcTemplate.batchUpdate(INSERT_ORDER_SQL, rows);
    }

    /** Maps an order to the parameter array matching INSERT_ORDER_SQL's column list. */
    private static Object[] toRow(OrderEvent event) {
        return new Object[] {
            event.getOrderId(),
            event.getUserId(),
            event.getProductId(),
            event.getAmount(),
            event.getRegion(),
            event.getOrderTime(),
            event.getOrderTime().toLocalDate()
        };
    }

    /**
     * Per-region order count, total and average amount for one day, sorted by total amount descending.
     *
     * @param date the event_date to aggregate
     */
    public List<RegionStatistics> getRegionStatistics(LocalDate date) {
        String sql = "SELECT " +
                     "  region," +
                     "  COUNT(*) as order_count," +
                     "  SUM(amount) as total_amount," +
                     "  AVG(amount) as avg_amount " +
                     "FROM order_analytics " +
                     "WHERE event_date = ? " +
                     "GROUP BY region " +
                     "ORDER BY total_amount DESC";

        // query(sql, RowMapper, args...) replaces the query(sql, Object[], RowMapper) overload deprecated in Spring 5.3.
        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            RegionStatistics stats = new RegionStatistics();
            stats.setRegion(rs.getString("region"));
            stats.setOrderCount(rs.getLong("order_count"));
            stats.setTotalAmount(rs.getBigDecimal("total_amount"));
            stats.setAvgAmount(rs.getBigDecimal("avg_amount"));
            return stats;
        }, date);
    }

    /**
     * Daily order count and total amount for one user over the last {@code days} days, oldest day first.
     *
     * @param userId user to analyse
     * @param days   look-back window in days, relative to the ClickHouse server's now()
     */
    public List<UserBehavior> getUserBehavior(String userId, int days) {
        // The alias "order_date" avoids shadowing the existing event_date column.
        String sql = "SELECT " +
                     "  toDate(order_time) as order_date," +
                     "  COUNT(*) as order_count," +
                     "  SUM(amount) as total_amount " +
                     "FROM order_analytics " +
                     "WHERE user_id = ? " +
                     "  AND order_time >= now() - INTERVAL ? DAY " +
                     "GROUP BY order_date " +
                     "ORDER BY order_date";

        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            UserBehavior behavior = new UserBehavior();
            behavior.setUserId(userId);
            behavior.setDate(rs.getDate("order_date").toLocalDate());
            behavior.setOrderCount(rs.getLong("order_count"));
            behavior.setTotalAmount(rs.getBigDecimal("total_amount"));
            return behavior;
        }, userId, days);
    }
}

高并发设计

限流策略

// Redis-backed rate limiters. Each algorithm runs as one Lua script, so check-and-update is atomic.
// The scripts are constants and all values travel in KEYS/ARGV. Concatenating values into the script
// text would make every call a distinct script that Redis caches, growing its script cache without bound.
// NOTE(review): ARGV values are passed as Strings, which assumes String serializers on redisTemplate
// (e.g. a StringRedisTemplate) — confirm.
@Component
public class RateLimiter {

    /**
     * Token bucket. KEYS[1] is a hash with fields "tokens" and "ts" (last refill time, epoch ms).
     * ARGV: [1] capacity, [2] refill rate in tokens/second, [3] now in epoch ms, [4] key TTL in seconds.
     * Tokens are refilled lazily from the elapsed time; an absent key means a full bucket.
     * Returns 1 when a token was taken, 0 otherwise.
     */
    private static final DefaultRedisScript<Long> TOKEN_BUCKET_SCRIPT = new DefaultRedisScript<>(
        "local capacity = tonumber(ARGV[1]) " +
        "local rate = tonumber(ARGV[2]) " +
        "local now = tonumber(ARGV[3]) " +
        "local state = redis.call('hmget', KEYS[1], 'tokens', 'ts') " +
        "local tokens = tonumber(state[1]) " +
        "local ts = tonumber(state[2]) " +
        "if tokens == nil or ts == nil then " +
        "  tokens = capacity " +
        "  ts = now " +
        "end " +
        "if now > ts then " +
        "  tokens = math.min(capacity, tokens + (now - ts) * rate / 1000) " +
        "  ts = now " +
        "end " +
        "local allowed = 0 " +
        "if tokens >= 1 then " +
        "  tokens = tokens - 1 " +
        "  allowed = 1 " +
        "end " +
        "redis.call('hmset', KEYS[1], 'tokens', tokens, 'ts', ts) " +
        "redis.call('expire', KEYS[1], ARGV[4]) " +
        "return allowed",
        Long.class);

    /**
     * Sliding-window log. KEYS[1] is a sorted set of request members scored by time (epoch ms).
     * ARGV: [1] window start (entries at or before it are dropped), [2] max requests, [3] now,
     * [4] unique member, [5] key TTL in seconds. Returns 1 when the request is admitted, 0 otherwise.
     */
    private static final DefaultRedisScript<Long> SLIDING_WINDOW_SCRIPT = new DefaultRedisScript<>(
        "redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1]) " +
        "if redis.call('zcard', KEYS[1]) < tonumber(ARGV[2]) then " +
        "  redis.call('zadd', KEYS[1], ARGV[3], ARGV[4]) " +
        "  redis.call('expire', KEYS[1], ARGV[5]) " +
        "  return 1 " +
        "end " +
        "return 0",
        Long.class);

    private final RedisTemplate<String, String> redisTemplate;

    public RateLimiter(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Token-bucket limiter: allows bursts of up to {@code maxTokens} and refills {@code refillRate}
     * tokens per second. Time is taken from this JVM's clock, so app servers should be NTP-synced.
     *
     * @param key        bucket identifier
     * @param maxTokens  bucket capacity; a non-positive capacity never grants a token
     * @param refillRate tokens added per second; must be positive
     * @return true if a token was taken
     * @throws IllegalArgumentException if {@code refillRate <= 0}
     */
    public boolean tryAcquire(String key, int maxTokens, int refillRate) {
        if (refillRate <= 0) {
            throw new IllegalArgumentException("refillRate must be positive: " + refillRate);
        }
        if (maxTokens <= 0) {
            return false;
        }
        // Keep the key only as long as a full refill takes; after that, an absent key (= full bucket)
        // is equivalent state.
        long ttlSeconds = (long) Math.ceil((double) maxTokens / refillRate) + 1;

        Long result = redisTemplate.execute(
            TOKEN_BUCKET_SCRIPT,
            Collections.singletonList(key),
            String.valueOf(maxTokens),
            String.valueOf(refillRate),
            String.valueOf(System.currentTimeMillis()),
            String.valueOf(ttlSeconds));

        return result != null && result == 1L;
    }

    /**
     * Sliding-window limiter: admits at most {@code maxRequests} requests in any trailing
     * {@code windowSeconds}-second window.
     *
     * @param key           limiter identifier
     * @param maxRequests   requests allowed per window
     * @param windowSeconds window length in seconds
     * @return true if the request is admitted
     */
    public boolean tryAcquireSlidingWindow(String key, int maxRequests, int windowSeconds) {
        long now = System.currentTimeMillis();
        // 1000L forces long arithmetic; int multiplication overflows for large windows.
        long windowStart = now - windowSeconds * 1000L;
        // Each request needs its own member. With the timestamp alone, requests in the same millisecond
        // would overwrite each other and be counted once.
        String member = now + "-" + java.util.UUID.randomUUID();

        Long result = redisTemplate.execute(
            SLIDING_WINDOW_SCRIPT,
            Collections.singletonList(key),
            String.valueOf(windowStart),
            String.valueOf(maxRequests),
            String.valueOf(now),
            member,
            String.valueOf(windowSeconds));

        return result != null && result == 1L;
    }
}

缓存策略

// Two-level cache: an in-process local cache in front of Redis, with a database loader behind both.
// Redis entries are written without a TTL, so they live until invalidate() is called.
// invalidate() only clears this instance's local cache; other instances keep their local copies.
@Component
public class MultiLevelCache {

    private final Cache<String, Object> localCache;
    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;

    public MultiLevelCache(Cache<String, Object> localCache,
                           RedisTemplate<String, String> redisTemplate,
                           ObjectMapper objectMapper) {
        this.localCache = localCache;
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Looks up {@code key} in the local cache, then Redis, then the database, back-filling the faster
     * levels on a hit.
     *
     * @return the value, or null if the database has none (null results are not cached)
     * @throws ClassCastException if the locally cached value is not a {@code type}
     * @throws java.io.UncheckedIOException if a database value cannot be serialized for Redis
     */
    public <T> T get(String key, Class<T> type) {
        // 1. Local cache
        Object localValue = localCache.getIfPresent(key);
        if (localValue != null) {
            return type.cast(localValue);
        }

        // 2. Redis
        String redisValue = redisTemplate.opsForValue().get(key);
        if (redisValue != null) {
            T value = fromJson(redisValue, type);
            if (value != null) {
                localCache.put(key, value);
                return value;
            }
            // Unreadable entry (e.g. written with an older schema): drop it and reload from the database.
            redisTemplate.delete(key);
        }

        // 3. Database
        T dbValue = loadFromDatabase(key);
        if (dbValue != null) {
            redisTemplate.opsForValue().set(key, toJson(dbValue));
            localCache.put(key, dbValue);
        }

        return dbValue;
    }

    /**
     * Writes {@code value} to Redis and the local cache. Serialization happens first, so a failure
     * leaves both levels untouched.
     *
     * @throws java.io.UncheckedIOException if the value cannot be serialized
     */
    public void put(String key, Object value) {
        String json = toJson(value);
        redisTemplate.opsForValue().set(key, json);
        localCache.put(key, value);
    }

    /**
     * Removes {@code key} from Redis, then from this instance's local cache. Clearing Redis first
     * narrows the window in which a concurrent get() could re-populate the local cache from the stale
     * Redis copy.
     */
    public void invalidate(String key) {
        redisTemplate.delete(key);
        localCache.invalidate(key);
    }

    /** Parses a Redis entry; returns null (treated as a cache miss by get) if it is not valid for {@code type}. */
    private <T> T fromJson(String json, Class<T> type) {
        try {
            return objectMapper.readValue(json, type);
        } catch (java.io.IOException e) {
            return null;
        }
    }

    /** Serializes a value for Redis, converting Jackson's checked exception into an unchecked one. */
    private String toJson(Object value) {
        try {
            return objectMapper.writeValueAsString(value);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to serialize cache value", e);
        }
    }
}

监控告警

// Real-time monitoring: records Micrometer metrics and raises an alert when a recorded gauge value
// exceeds the threshold of that metric's rule.
@Component
public class RealTimeMonitor {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    /**
     * Strongly held backing state for each gauge, keyed by metric name. Micrometer registers a gauge
     * once per name and keeps only a weak reference to its state object. Passing a freshly boxed
     * double on every call therefore loses all later values, and the gauge reports NaN once the boxed
     * value is garbage-collected. The double is stored as raw long bits because the JDK has no
     * AtomicDouble.
     */
    private final java.util.concurrent.ConcurrentMap<String, java.util.concurrent.atomic.AtomicLong> gaugeValues =
        new java.util.concurrent.ConcurrentHashMap<>();

    public RealTimeMonitor(MeterRegistry meterRegistry, AlertService alertService) {
        this.meterRegistry = meterRegistry;
        this.alertService = alertService;
    }

    /**
     * Sets gauge {@code name} to {@code value} and checks it against the alert rule for that metric.
     *
     * @param name  metric name
     * @param value current value
     */
    public void recordMetric(String name, double value) {
        long bits = Double.doubleToLongBits(value);
        java.util.concurrent.atomic.AtomicLong holder = gaugeValues.computeIfAbsent(name,
            metricName -> meterRegistry.gauge(
                metricName,
                new java.util.concurrent.atomic.AtomicLong(bits),
                state -> Double.longBitsToDouble(state.get())));
        holder.set(bits);

        // Alert if the value breaches the rule's threshold.
        checkAlert(name, value);
    }

    /** Increments counter {@code name} by one. */
    public void incrementCounter(String name) {
        meterRegistry.counter(name).increment();
    }

    /**
     * Records {@code value}, interpreted as milliseconds, on timer {@code name}.
     * The value is converted to nanoseconds so fractional milliseconds are not truncated to zero.
     */
    public void recordHistogram(String name, double value) {
        meterRegistry.timer(name).record(Duration.ofNanos(Math.round(value * 1_000_000d)));
    }

    /** Sends an alert when a rule exists for the metric and {@code value} is strictly above its threshold. */
    private void checkAlert(String metricName, double value) {
        AlertRule rule = alertService.getRule(metricName);

        if (rule != null && value > rule.getThreshold()) {
            Alert alert = new Alert();
            alert.setMetricName(metricName);
            alert.setCurrentValue(value);
            alert.setThreshold(rule.getThreshold());
            alert.setTimestamp(LocalDateTime.now());
            alert.setLevel(rule.getLevel());

            alertService.sendAlert(alert);
        }
    }
}

// Alert rule: RealTimeMonitor#checkAlert raises an alert when a metric's value exceeds the threshold.
// Lombok @Data generates getters, setters, equals/hashCode and toString for these fields.
@Data
public class AlertRule {
    // Name of the metric this rule applies to.
    private String metricName;
    // An alert fires when the recorded value is strictly greater than this threshold.
    private double threshold;
    // Severity copied onto alerts raised by this rule.
    private AlertLevel level;
    // Alert message text. NOTE(review): not read by RealTimeMonitor#checkAlert — confirm where it is used.
    private String message;
}

/**
 * Alert severity, declared in the order INFO, WARNING, ERROR, CRITICAL.
 */
enum AlertLevel {
    INFO,
    WARNING,
    ERROR,
    CRITICAL;
}

最佳实践

  1. 端到端延迟：优化从数据采集到展示的全链路延迟，并为每个环节设定可度量的延迟指标
  2. 数据一致性：确保实时数据的准确性和一致性，例如借助检查点与幂等写入避免数据重复或丢失
  3. 容错设计：处理各种故障场景（节点宕机、网络分区、下游积压），确保系统可用性
  4. 弹性伸缩：根据负载自动扩缩容
  5. 监控告警：建立完善的监控和告警体系
  6. 性能测试：定期进行性能测试和压测
  7. 数据安全：保护实时数据的安全性和隐私，避免在代码中硬编码凭据