数据平台架构:采集、存储、计算与可视化
数据平台架构:采集、存储、计算与可视化
数据平台架构概览
数据平台是企业数据资产的核心基础设施,需要处理海量数据的采集、存储、计算和分析。主要模块包括:数据采集、数据存储、数据计算、数据服务、数据可视化。
数据平台架构:
数据源:业务数据库、日志、埋点、第三方数据
采集层:
- 实时采集:Kafka、Flink CDC、Canal
- 批量采集:Sqoop、DataX、Airflow
存储层:
- 数据湖:HDFS、S3、Delta Lake
- 数据仓库:Hive、ClickHouse、StarRocks
- 缓存层:Redis、Alluxio
计算层:
- 批处理:Spark、Hive
- 流处理:Flink、Spark Streaming
- OLAP:ClickHouse、Presto、Trino
服务层:
- 数据API:RESTful、GraphQL
- 数据查询:SQL接口、BI工具
可视化层:报表、Dashboard、数据大屏
数据采集架构
数据采集是数据平台的第一步,需要保证数据的完整性和实时性。
# 数据采集服务
class DataCollectionService:
def __init__(self):
self.kafka_producer = KafkaProducer()
self.batch_collector = BatchCollector()
def collect_realtime(self, data_source):
"""实时数据采集"""
# 1. 连接数据源
connector = self.get_connector(data_source)
# 2. 读取数据
data_stream = connector.read_stream()
# 3. 数据转换
transformed = data_stream.map(self.transform_data)
# 4. 发送到Kafka
for record in transformed:
self.kafka_producer.send(
topic=data_source.get_topic(),
key=record.get_key(),
value=record.get_value()
)
def collect_batch(self, data_source, start_time, end_time):
"""批量数据采集"""
# 1. 读取批量数据
data = self.batch_collector.collect(
source=data_source,
start_time=start_time,
end_time=end_time
)
# 2. 数据清洗
cleaned_data = self.clean_data(data)
# 3. 写入数据湖
self.write_to_data_lake(cleaned_data, data_source)
def get_connector(self, data_source):
"""获取数据源连接器"""
connectors = {
'mysql': MySQLConnector(),
'postgresql': PostgreSQLConnector(),
'mongodb': MongoDBConnector(),
'elasticsearch': ESConnector(),
'api': APIConnector(),
}
return connectors.get(data_source.get_type())
数据存储设计
数据存储需要根据数据特点选择合适的存储方案,支持不同的查询模式。
// 数据存储服务
@Service
public class DataStorageService {
@Autowired
private HDFSClient hdfsClient;
@Autowired
private ClickHouseClient clickhouseClient;
@Autowired
private S3Client s3Client;
// 写入数据湖
public void writeToDataLake(DataSet dataSet) {
// 1. 根据数据量选择存储方案
if (dataSet.getSize() > 1024 * 1024 * 1024) {
// 大数据量写入HDFS
writeToHDFS(dataSet);
} else {
// 小数据量写入S3
writeToS3(dataSet);
}
}
// 写入数据仓库
public void writeToDataWarehouse(DataSet dataSet) {
// 1. 数据格式转换(Parquet/ORC)
DataSet formatted = convertToColumnarFormat(dataSet);
// 2. 写入ClickHouse
clickhouseClient.insert(
dataSet.getTableName(),
formatted.getColumns(),
formatted.getRows()
);
}
// 查询数据
public DataSet queryData(String sql) {
// 1. 解析SQL,确定查询范围
QueryPlan plan = parseSQL(sql);
// 2. 选择查询引擎
if (plan.isOLAP()) {
// OLAP查询走ClickHouse
return clickhouseClient.query(sql);
} else {
// 简单查询走Presto
return prestoClient.query(sql);
}
}
}
数据计算框架
数据计算包括批处理和流处理,需要根据场景选择合适的计算框架。
# 流处理服务(Flink)
class StreamProcessingService:
def __init__(self):
self.flink_env = StreamExecutionEnvironment.get_execution_environment()
def process_realtime_metrics(self):
"""实时指标计算"""
# 1. 从Kafka读取数据流
data_stream = self.flink_env.add_source(
FlinkKafkaConsumer(
topics=['user_events'],
deserialization_schema=JSONDeserializationSchema()
)
)
# 2. 窗口聚合
windowed_stream = data_stream \
.key_by(lambda x: x['user_id']) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.aggregate(CountAggregator())
# 3. 写入结果存储
windowed_stream.add_sink(
ClickHouseSink(table='realtime_metrics')
)
# 4. 启动执行
self.flink_env.execute("Realtime Metrics Job")
def process_user_behavior(self):
"""用户行为分析"""
# 1. 读取用户行为日志
behavior_stream = self.flink_env.add_source(
FlinkKafkaConsumer(
topics=['user_behavior'],
deserialization_schema=BehaviorDeserializer()
)
)
# 2. 实时计算用户会话
session_stream = behavior_stream \
.key_by(lambda x: x['user_id']) \
.process(SessionWindowProcessFunction())
# 3. 输出会话分析结果
session_stream.add_sink(
KafkaSink(topic='user_sessions')
)
数据可视化
数据可视化是数据平台的展示层,需要支持多种图表类型和交互方式。
// 数据可视化服务
@Service
public class VisualizationService {
@Autowired
private ChartRepository chartRepo;
@Autowired
private DataQueryService queryService;
// 获取图表数据
public ChartData getChartData(String chartId, Map<String, Object> params) {
// 1. 获取图表配置
ChartConfig config = chartRepo.getConfig(chartId);
// 2. 构建查询
String sql = buildSQL(config, params);
// 3. 执行查询
DataSet dataSet = queryService.query(sql);
// 4. 转换为图表数据
return ChartData.builder()
.type(config.getChartType())
.labels(extractLabels(dataSet, config))
.datasets(extractDatasets(dataSet, config))
.build();
}
// 构建SQL
private String buildSQL(ChartConfig config, Map<String, Object> params) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT ");
sql.append(String.join(", ", config.getMetrics()));
sql.append(" FROM ");
sql.append(config.getTableName());
if (config.getGroupBy() != null) {
sql.append(" GROUP BY ");
sql.append(config.getGroupBy());
}
if (config.getOrderBy() != null) {
sql.append(" ORDER BY ");
sql.append(config.getOrderBy());
}
return sql.toString();
}
}
数据质量保障
数据质量是数据平台的关键,需要从多个维度保障数据质量。
数据质量保障体系:
完整性:
- 数据缺失检测
- 字段完整性校验
- 记录数对账
准确性:
- 数据格式校验
- 业务规则校验
- 跨源数据一致性
时效性:
- 数据延迟监控
- SLA告警
- 数据新鲜度评估
一致性:
- 跨系统数据一致性
- 历史数据版本管理
- 数据血缘追踪
处理流程:
1. 数据入库前校验
2. 数据处理后校验
3. 定期数据质量巡检
4. 数据质量问题告警和修复