← 返回首页
💼

数据平台架构:采集、存储、计算与可视化

📂 architecture ⏱ 3 min 427 words

数据平台架构:采集、存储、计算与可视化

数据平台架构概览

数据平台是企业数据资产的核心基础设施,需要处理海量数据的采集、存储、计算和分析。主要模块包括:数据采集、数据存储、数据计算、数据服务、数据可视化。

数据平台架构:

数据源:业务数据库、日志、埋点、第三方数据
采集层:
  - 实时采集:Kafka、Flink CDC、Canal
  - 批量采集:Sqoop、DataX、Airflow
存储层:
  - 数据湖:HDFS、S3、Delta Lake
  - 数据仓库:Hive、ClickHouse、StarRocks
  - 缓存层:Redis、Alluxio
计算层:
  - 批处理:Spark、Hive
  - 流处理:Flink、Spark Streaming
  - OLAP:ClickHouse、Presto、Trino
服务层:
  - 数据API:RESTful、GraphQL
  - 数据查询:SQL接口、BI工具
可视化层:报表、Dashboard、数据大屏

数据采集架构

数据采集是数据平台的第一步,需要保证数据的完整性和实时性。

# 数据采集服务
class DataCollectionService:
    def __init__(self):
        self.kafka_producer = KafkaProducer()
        self.batch_collector = BatchCollector()
    
    def collect_realtime(self, data_source):
        """实时数据采集"""
        # 1. 连接数据源
        connector = self.get_connector(data_source)
        
        # 2. 读取数据
        data_stream = connector.read_stream()
        
        # 3. 数据转换
        transformed = data_stream.map(self.transform_data)
        
        # 4. 发送到Kafka
        for record in transformed:
            self.kafka_producer.send(
                topic=data_source.get_topic(),
                key=record.get_key(),
                value=record.get_value()
            )
    
    def collect_batch(self, data_source, start_time, end_time):
        """批量数据采集"""
        # 1. 读取批量数据
        data = self.batch_collector.collect(
            source=data_source,
            start_time=start_time,
            end_time=end_time
        )
        
        # 2. 数据清洗
        cleaned_data = self.clean_data(data)
        
        # 3. 写入数据湖
        self.write_to_data_lake(cleaned_data, data_source)
    
    def get_connector(self, data_source):
        """获取数据源连接器"""
        connectors = {
            'mysql': MySQLConnector(),
            'postgresql': PostgreSQLConnector(),
            'mongodb': MongoDBConnector(),
            'elasticsearch': ESConnector(),
            'api': APIConnector(),
        }
        return connectors.get(data_source.get_type())

数据存储设计

数据存储需要根据数据特点选择合适的存储方案,支持不同的查询模式。

// 数据存储服务
@Service
public class DataStorageService {
    @Autowired
    private HDFSClient hdfsClient;
    @Autowired
    private ClickHouseClient clickhouseClient;
    @Autowired
    private S3Client s3Client;
    
    // 写入数据湖
    public void writeToDataLake(DataSet dataSet) {
        // 1. 根据数据量选择存储方案
        if (dataSet.getSize() > 1024 * 1024 * 1024) {
            // 大数据量写入HDFS
            writeToHDFS(dataSet);
        } else {
            // 小数据量写入S3
            writeToS3(dataSet);
        }
    }
    
    // 写入数据仓库
    public void writeToDataWarehouse(DataSet dataSet) {
        // 1. 数据格式转换(Parquet/ORC)
        DataSet formatted = convertToColumnarFormat(dataSet);
        
        // 2. 写入ClickHouse
        clickhouseClient.insert(
            dataSet.getTableName(),
            formatted.getColumns(),
            formatted.getRows()
        );
    }
    
    // 查询数据
    public DataSet queryData(String sql) {
        // 1. 解析SQL,确定查询范围
        QueryPlan plan = parseSQL(sql);
        
        // 2. 选择查询引擎
        if (plan.isOLAP()) {
            // OLAP查询走ClickHouse
            return clickhouseClient.query(sql);
        } else {
            // 简单查询走Presto
            return prestoClient.query(sql);
        }
    }
}

数据计算框架

数据计算包括批处理和流处理,需要根据场景选择合适的计算框架。

# 流处理服务(Flink)
class StreamProcessingService:
    def __init__(self):
        self.flink_env = StreamExecutionEnvironment.get_execution_environment()
    
    def process_realtime_metrics(self):
        """实时指标计算"""
        # 1. 从Kafka读取数据流
        data_stream = self.flink_env.add_source(
            FlinkKafkaConsumer(
                topics=['user_events'],
                deserialization_schema=JSONDeserializationSchema()
            )
        )
        
        # 2. 窗口聚合
        windowed_stream = data_stream \
            .key_by(lambda x: x['user_id']) \
            .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
            .aggregate(CountAggregator())
        
        # 3. 写入结果存储
        windowed_stream.add_sink(
            ClickHouseSink(table='realtime_metrics')
        )
        
        # 4. 启动执行
        self.flink_env.execute("Realtime Metrics Job")
    
    def process_user_behavior(self):
        """用户行为分析"""
        # 1. 读取用户行为日志
        behavior_stream = self.flink_env.add_source(
            FlinkKafkaConsumer(
                topics=['user_behavior'],
                deserialization_schema=BehaviorDeserializer()
            )
        )
        
        # 2. 实时计算用户会话
        session_stream = behavior_stream \
            .key_by(lambda x: x['user_id']) \
            .process(SessionWindowProcessFunction())
        
        # 3. 输出会话分析结果
        session_stream.add_sink(
            KafkaSink(topic='user_sessions')
        )

数据可视化

数据可视化是数据平台的展示层,需要支持多种图表类型和交互方式。

// 数据可视化服务
@Service
public class VisualizationService {
    @Autowired
    private ChartRepository chartRepo;
    @Autowired
    private DataQueryService queryService;
    
    // 获取图表数据
    public ChartData getChartData(String chartId, Map<String, Object> params) {
        // 1. 获取图表配置
        ChartConfig config = chartRepo.getConfig(chartId);
        
        // 2. 构建查询
        String sql = buildSQL(config, params);
        
        // 3. 执行查询
        DataSet dataSet = queryService.query(sql);
        
        // 4. 转换为图表数据
        return ChartData.builder()
            .type(config.getChartType())
            .labels(extractLabels(dataSet, config))
            .datasets(extractDatasets(dataSet, config))
            .build();
    }
    
    // 构建SQL
    private String buildSQL(ChartConfig config, Map<String, Object> params) {
        StringBuilder sql = new StringBuilder();
        
        sql.append("SELECT ");
        sql.append(String.join(", ", config.getMetrics()));
        sql.append(" FROM ");
        sql.append(config.getTableName());
        
        if (config.getGroupBy() != null) {
            sql.append(" GROUP BY ");
            sql.append(config.getGroupBy());
        }
        
        if (config.getOrderBy() != null) {
            sql.append(" ORDER BY ");
            sql.append(config.getOrderBy());
        }
        
        return sql.toString();
    }
}

数据质量保障

数据质量是数据平台的关键,需要从多个维度保障数据质量。

数据质量保障体系:

完整性:
  - 数据缺失检测
  - 字段完整性校验
  - 记录数对账

准确性:
  - 数据格式校验
  - 业务规则校验
  - 跨源数据一致性

时效性:
  - 数据延迟监控
  - SLA告警
  - 数据新鲜度评估

一致性:
  - 跨系统数据一致性
  - 历史数据版本管理
  - 数据血缘追踪

处理流程:
  1. 数据入库前校验
  2. 数据处理后校验
  3. 定期数据质量巡检
  4. 数据质量问题告警和修复