Data Pipeline ETL/ELT Architecture
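Data Pipeline Overview
A data pipeline is an automated process that moves data from source systems to a target system. Its steps run in one of two orders: extract, transform, load (ETL) or extract, load, transform (ELT).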
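ETL Architecture
ETL (Extract, Transform, Load) is the traditional data integration pattern: data is transformed before it is loaded into the target system.
Architecture Diagram
┌──────────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Data sources │ -> │   Extract    │ -> │  Transform   │ -> │     Load     │
└──────────────┘    └──────────────┘    └──────────────┘    └──────────────┘
  Databases           Cleansing           Conversion          Data warehouse
  Files               Validation          Aggregation         Data lake
  APIs                Format conversion   Enrichment          Data mart
ETL Implementation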
// ETL job skeleton.
// Assumes Lombok: @Slf4j provides `log`, @RequiredArgsConstructor provides constructor injection.
@Slf4j
@Component
@RequiredArgsConstructor
public class EtlJob {
    private final Extractor extractor;
    private final Transformer transformer;
    private final Loader loader;

    public void execute(EtlJobConfig config) {
        try {
            // 1. Extract
            List<RawRecord> rawRecords = extractor.extract(config.getSource());
            log.info("Extracted {} records", rawRecords.size());

            // 2. Transform
            List<TransformedRecord> transformedRecords = transformer.transform(rawRecords);
            log.info("Transformed {} records", transformedRecords.size());

            // 3. Load
            loader.load(transformedRecords, config.getTarget());
            log.info("Loaded {} records", transformedRecords.size());
        } catch (Exception e) {
            log.error("ETL job failed", e);
            throw new EtlJobException("ETL job failed", e);
        }
    }
}
// Extractor interface
public interface Extractor {
    List<RawRecord> extract(DataSource source);
}

// Database extractor
@Component
@RequiredArgsConstructor // Lombok (assumed) generates the constructor for the final field
public class DatabaseExtractor implements Extractor {
    private final JdbcTemplate jdbcTemplate;

    @Override
    public List<RawRecord> extract(DataSource source) {
        // Assumes the job is configured with a DatabaseSource; any other type fails the cast
        DatabaseSource dbSource = (DatabaseSource) source;
        String sql = dbSource.getQuery();
        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            // Copy every column of the current row into a name -> value record
            ResultSetMetaData meta = rs.getMetaData();
            RawRecord record = new RawRecord();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                record.put(meta.getColumnName(i), rs.getObject(i));
            }
            return record;
        });
    }
}
// File extractor (simple comma-separated files with a header row)
@Component
public class FileExtractor implements Extractor {
    @Override
    public List<RawRecord> extract(DataSource source) {
        FileSource fileSource = (FileSource) source;
        List<String> columns = fileSource.getColumns();
        List<RawRecord> records = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(fileSource.getPath()))) {
            String line;
            boolean isHeader = true;
            while ((line = reader.readLine()) != null) {
                if (isHeader) {
                    // Skip the header row
                    isHeader = false;
                    continue;
                }
                // Naive split: does not handle quoted fields that contain commas.
                // The -1 limit keeps trailing empty fields.
                String[] fields = line.split(",", -1);
                RawRecord record = new RawRecord();
                // Ignore fields beyond the configured columns instead of failing
                for (int i = 0; i < Math.min(fields.length, columns.size()); i++) {
                    record.put(columns.get(i), fields[i]);
                }
                records.add(record);
            }
        } catch (IOException e) {
            throw new ExtractorException("Failed to read file", e);
        }
        return records;
    }
}
// Transformer interface
public interface Transformer {
    List<TransformedRecord> transform(List<RawRecord> rawRecords);
}

// Record transformer
@Slf4j // Lombok (assumed) provides `log`
@Component
public class DataTransformer implements Transformer {
    @Override
    public List<TransformedRecord> transform(List<RawRecord> rawRecords) {
        // Records that fail transformation or validation come back as null and are dropped
        return rawRecords.stream()
                .map(this::transformRecord)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private TransformedRecord transformRecord(RawRecord rawRecord) {
        try {
            TransformedRecord record = new TransformedRecord();
            // Cleansing
            record.setUserId(cleanUserId(rawRecord.get("user_id")));
            record.setUserName(cleanUserName(rawRecord.get("user_name")));
            record.setEmail(cleanEmail(rawRecord.get("email")));
            // Type conversion
            record.setAmount(convertToBigDecimal(rawRecord.get("amount")));
            record.setOrderDate(convertToLocalDateTime(rawRecord.get("order_date")));
            // Validation
            if (!isValid(record)) {
                log.warn("Invalid record: {}", rawRecord);
                return null;
            }
            return record;
        } catch (Exception e) {
            log.error("Failed to transform record", e);
            return null;
        }
    }

    private String cleanUserId(Object value) {
        if (value == null) return null;
        return value.toString().trim();
    }

    private String cleanUserName(Object value) {
        if (value == null) return null;
        // Collapse runs of whitespace into a single space
        return value.toString().trim().replaceAll("\\s+", " ");
    }

    private String cleanEmail(Object value) {
        if (value == null) return null;
        return value.toString().trim().toLowerCase();
    }

    private BigDecimal convertToBigDecimal(Object value) {
        // A missing amount is treated as zero
        if (value == null) return BigDecimal.ZERO;
        return new BigDecimal(value.toString());
    }

    private LocalDateTime convertToLocalDateTime(Object value) {
        if (value == null) return null;
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        // Expects ISO-8601 text such as 2024-01-01T10:15:30
        return LocalDateTime.parse(value.toString());
    }

    private boolean isValid(TransformedRecord record) {
        return record.getUserId() != null && !record.getUserId().isEmpty()
                && record.getEmail() != null && record.getEmail().contains("@");
    }
}
// Loader interface
public interface Loader {
    void load(List<TransformedRecord> records, DataTarget target);
}

// Database loader
@Component
@RequiredArgsConstructor // Lombok (assumed) generates the constructor for the final field
public class DatabaseLoader implements Loader {
    private final JdbcTemplate jdbcTemplate;

    @Override
    public void load(List<TransformedRecord> records, DataTarget target) {
        DatabaseTarget dbTarget = (DatabaseTarget) target;
        // The table name is concatenated into the SQL, so it must come from trusted configuration
        String tableName = dbTarget.getTableName();
        String sql = "INSERT INTO " + tableName +
                " (user_id, user_name, email, amount, order_date) " +
                "VALUES (?, ?, ?, ?, ?)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                TransformedRecord record = records.get(i);
                ps.setString(1, record.getUserId());
                ps.setString(2, record.getUserName());
                ps.setString(3, record.getEmail());
                ps.setBigDecimal(4, record.getAmount());
                // order_date may be null (see convertToLocalDateTime); Timestamp.valueOf(null) would throw
                LocalDateTime orderDate = record.getOrderDate();
                ps.setTimestamp(5, orderDate == null ? null : Timestamp.valueOf(orderDate));
            }

            @Override
            public int getBatchSize() {
                // All records are sent as a single batch
                return records.size();
            }
        });
    }
}
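The FileExtractor above splits each line with split(","), which misreads quoted fields that themselves contain a comma. As a minimal sketch of the same header-plus-rows extraction with a real CSV parser, the following uses Python's standard csv module. The function name extract_csv and the sample data are illustrative and not part of the original code.

```python
import csv
import io


def extract_csv(text):
    """Parse CSV text with a header row into a list of dicts, one per data row."""
    reader = csv.DictReader(io.StringIO(text))
    return [dict(row) for row in reader]


# The user_name field is quoted and contains a comma.
sample = 'user_id,user_name,email\n1,"Doe, Jane",jane@example.com\n'
rows = extract_csv(sample)
print(rows[0]["user_name"])
```

A plain comma split of the same data line yields four pieces instead of three, which is why a CSV parser is usually the safer choice once fields may contain free text.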
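ELT Architecture
ELT (Extract, Load, Transform) is the modern data integration pattern: data is loaded into the target system first and then transformed inside it.
Architecture Diagram
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Data sources │ -> │   Extract    │ -> │     Load     │
└──────────────┘    └──────────────┘    └──────────────┘
  Databases           Cleansing           Data lake / data warehouse
  Files               Validation          (Snowflake, BigQuery, Redshift)
  APIs                Format conversion
                                        ┌──────────────────────┐
                                        │ Transform            │
                                        │  - Data cleansing    │
                                        │  - Data aggregation  │
                                        │  - Data modeling     │
                                        └──────────────────────┘
ELT Implementation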
// ELT job skeleton.
// Assumes Lombok: @Slf4j provides `log`, @RequiredArgsConstructor provides constructor injection.
@Slf4j
@Component
@RequiredArgsConstructor
public class EltJob {
    private final Extractor extractor;
    private final Loader loader;
    private final DataWarehouseTransformer dwTransformer;

    public void execute(EltJobConfig config) {
        try {
            // 1. Extract
            List<RawRecord> rawRecords = extractor.extract(config.getSource());
            log.info("Extracted {} records", rawRecords.size());
            if (rawRecords.isEmpty()) {
                // Nothing to stage; the schema below is derived from the first record
                log.info("No records extracted; skipping load and transform");
                return;
            }

            // 2. Load the raw records into staging (data lake)
            String stagingPath = loadToStaging(rawRecords, config.getTarget());
            log.info("Loaded {} records to staging", rawRecords.size());

            // 3. Transform inside the data warehouse
            dwTransformer.transform(stagingPath, config.getTarget());
            log.info("Transformed data in data warehouse");
        } catch (Exception e) {
            log.error("ELT job failed", e);
            throw new EltJobException("ELT job failed", e);
        }
    }

    private String loadToStaging(List<RawRecord> records, DataTarget target) {
        // Write to the staging area (S3, GCS, etc.)
        String stagingPath = generateStagingPath(target);
        // Convert the records to Parquet
        try (ParquetWriter<GenericRecord> writer = createParquetWriter(stagingPath)) {
            Schema schema = createSchema(records.get(0));
            for (RawRecord record : records) {
                GenericRecord genericRecord = convertToGenericRecord(record, schema);
                writer.write(genericRecord);
            }
        } catch (IOException e) {
            throw new LoaderException("Failed to load to staging", e);
        }
        return stagingPath;
    }
}
// Data warehouse transformer
@Component
@RequiredArgsConstructor // Lombok (assumed) generates the constructor for the final field
public class DataWarehouseTransformer {
    private final JdbcTemplate jdbcTemplate;

    public void transform(String stagingPath, DataTarget target) {
        // Create the staging table
        createStagingTable(stagingPath);
        try {
            // Run the transformation SQL
            String transformSql = generateTransformSql();
            jdbcTemplate.execute(transformSql);
        } finally {
            // Drop the staging table even if the transformation fails
            dropStagingTable();
        }
    }

    private void createStagingTable(String stagingPath) {
        // parquet_scan is engine-specific; adapt this to how the target warehouse reads staged files.
        // stagingPath is concatenated into the SQL, so it must be generated internally, not user-supplied.
        String sql = "CREATE TABLE staging_data AS " +
                "SELECT * FROM parquet_scan('" + stagingPath + "')";
        jdbcTemplate.execute(sql);
    }

    private String generateTransformSql() {
        // Aggregate orders per user and day, skipping rows without an email
        return "INSERT INTO target_table " +
                "SELECT " +
                " user_id," +
                " user_name," +
                " email," +
                " SUM(amount) as total_amount," +
                " COUNT(*) as order_count," +
                " DATE_TRUNC('day', order_date) as order_date " +
                "FROM staging_data " +
                "WHERE email IS NOT NULL " +
                "GROUP BY user_id, user_name, email, DATE_TRUNC('day', order_date)";
    }

    private void dropStagingTable() {
        jdbcTemplate.execute("DROP TABLE IF EXISTS staging_data");
    }
}
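The load-then-transform order can be tried locally without a warehouse. The sketch below is a hedged stand-in that uses Python's built-in sqlite3 module in place of Snowflake, BigQuery, or Redshift: raw rows go into a staging table unchanged, and the aggregation runs as SQL inside the database. The table and column names mirror the Java code above; the sample rows and the use of SQLite's date() in place of DATE_TRUNC are assumptions made for illustration.

```python
import sqlite3

# Illustrative raw rows: (user_id, user_name, email, amount, order_date)
raw_orders = [
    ("u1", "Ann", "ann@example.com", 10.0, "2024-01-01T09:00:00"),
    ("u1", "Ann", "ann@example.com", 5.5, "2024-01-01T17:30:00"),
    ("u2", "Bob", None, 7.0, "2024-01-01T11:00:00"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE target_table (user_id TEXT, user_name TEXT, email TEXT,"
    " total_amount REAL, order_count INTEGER, order_date TEXT)"
)

# Load: copy the raw rows into a staging table without changing them.
conn.execute(
    "CREATE TABLE staging_data (user_id TEXT, user_name TEXT, email TEXT,"
    " amount REAL, order_date TEXT)"
)
conn.executemany("INSERT INTO staging_data VALUES (?, ?, ?, ?, ?)", raw_orders)

try:
    # Transform: aggregate per user and day inside the database.
    conn.execute(
        "INSERT INTO target_table "
        "SELECT user_id, user_name, email, SUM(amount), COUNT(*), date(order_date) "
        "FROM staging_data WHERE email IS NOT NULL "
        "GROUP BY user_id, user_name, email, date(order_date)"
    )
finally:
    # Drop staging even when the transform fails.
    conn.execute("DROP TABLE IF EXISTS staging_data")
conn.commit()

result = conn.execute("SELECT * FROM target_table").fetchall()
print(result)
```

Keeping the raw rows untouched until the SQL step is the point of ELT: the transformation can be rewritten and rerun against staged data without extracting from the source again.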
Orchestration with Apache Airflow
DAG Definition
# airflow_dags/etl_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for order data',
    schedule_interval='@daily',  # newer Airflow releases name this argument `schedule`
    catchup=False,
)

# Note: the tasks exchange files through /tmp, which assumes they run on the same worker.


def extract_data(**context):
    """Extract one day of orders from the source system."""
    import json
    import psycopg2

    # Connect to the source database.
    # Placeholder credentials; in practice read them from an Airflow connection or a secrets backend.
    conn = psycopg2.connect(
        host='source-db-host',
        database='source_db',
        user='user',
        password='password'
    )
    try:
        # Run the query for the DAG run's logical date
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM orders WHERE date = %s", (context['ds'],))
        # Fetch the rows
        columns = [desc[0] for desc in cursor.description]
        records = cursor.fetchall()
    finally:
        conn.close()

    # Save to a file; default=str serializes Decimal and datetime values as strings
    data = [dict(zip(columns, record)) for record in records]
    path = f'/tmp/orders_{context["ds"]}.json'
    with open(path, 'w') as f:
        json.dump(data, f, default=str)
    return path


def transform_data(**context):
    """Clean and convert the extracted orders."""
    import json
    import pandas as pd

    # Read the extracted data
    with open(f'/tmp/orders_{context["ds"]}.json', 'r') as f:
        data = json.load(f)
    df = pd.DataFrame(data)

    # Cleansing
    df = df.dropna(subset=['user_id', 'email'])
    df['email'] = df['email'].str.lower()
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

    # Conversion: keep only the date part of order_date
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['order_date'] = df['order_date'].dt.date

    # Save the transformed data
    path = f'/tmp/orders_transformed_{context["ds"]}.csv'
    df.to_csv(path, index=False)
    return path


# Task definitions
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

# `context` does not exist when the DAG file is parsed, so the date is injected
# through Jinja templating ({{ ds }}) in the operators' templated fields.
load_to_s3 = LocalFilesystemToS3Operator(
    task_id='load_to_s3',
    filename='/tmp/orders_transformed_{{ ds }}.csv',
    dest_bucket='data-lake',
    dest_key='orders/{{ ds }}/orders.csv',
    replace=True,  # let a retry overwrite the object
    dag=dag,
)

load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='public',
    table='orders',
    s3_bucket='data-lake',
    s3_key='orders/{{ ds }}/orders.csv',
    copy_options=['FORMAT AS CSV', 'IGNOREHEADER 1'],
    dag=dag,
)

# Task dependencies
extract_task >> transform_task >> load_to_s3 >> load_to_redshift
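The extract_data task writes query rows to disk with json.dump. Database drivers commonly return values such as Decimal and date objects, which the json module does not encode on its own. The sketch below shows the failure and one hedged way around it, passing default=str so such values are written as strings. The sample row is illustrative; which types a real orders table returns is an assumption.

```python
import json
from datetime import date
from decimal import Decimal

# Illustrative row with types a database driver might return.
row = {"order_id": 1, "amount": Decimal("19.90"), "order_date": date(2024, 1, 1)}

try:
    json.dumps(row)
    plain_dump_ok = True
except TypeError:
    # Decimal and date are not JSON-serializable out of the box.
    plain_dump_ok = False

# default=str is called for any value json cannot encode natively.
encoded = json.dumps(row, default=str)
decoded = json.loads(encoded)
print(plain_dump_ok, decoded)
```

The values arrive in the next task as strings, which fits the transform step above, since it already re-parses amount with pd.to_numeric and order_date with pd.to_datetime.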
Data Quality Assurance
// Data quality checks
@Component
@RequiredArgsConstructor // Lombok (assumed) generates the constructor for the final field
public class DataQualityChecker {
    private final QualityRuleRepository ruleRepository;

    public QualityReport checkDataQuality(List<TransformedRecord> records, String datasetName) {
        QualityReport report = new QualityReport(datasetName);
        // Load the quality rules configured for this dataset
        List<QualityRule> rules = ruleRepository.findByDatasetName(datasetName);
        for (QualityRule rule : rules) {
            QualityCheckResult result = checkRule(records, rule);
            report.addResult(result);
            // A failed critical rule aborts the run
            if (!result.isPassed() && rule.isCritical()) {
                throw new DataQualityException(
                        "Critical quality check failed: " + rule.getName()
                );
            }
        }
        return report;
    }

    private QualityCheckResult checkRule(List<TransformedRecord> records, QualityRule rule) {
        switch (rule.getType()) {
            case COMPLETENESS:
                return checkCompleteness(records, rule);
            case UNIQUENESS:
                return checkUniqueness(records, rule);
            case VALIDITY:
                return checkValidity(records, rule);
            case CONSISTENCY:
                return checkConsistency(records, rule);
            default:
                throw new IllegalArgumentException("Unknown rule type: " + rule.getType());
        }
    }

    // Share of records that satisfy a check. An empty dataset counts as 1.0
    // (assumed convention) instead of producing NaN from 0/0.
    private double ratio(long matching, long total) {
        return total == 0 ? 1.0 : (double) matching / total;
    }

    private QualityCheckResult checkCompleteness(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        long nonNull = records.stream()
                .filter(record -> record.get(field) != null)
                .count();
        double completeness = ratio(nonNull, records.size());
        boolean passed = completeness >= rule.getThreshold();
        return new QualityCheckResult(
                rule.getName(),
                passed,
                completeness,
                "Completeness: " + (completeness * 100) + "%"
        );
    }

    private QualityCheckResult checkUniqueness(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        long unique = records.stream()
                .map(record -> record.get(field))
                .distinct()
                .count();
        double uniqueness = ratio(unique, records.size());
        boolean passed = uniqueness >= rule.getThreshold();
        return new QualityCheckResult(
                rule.getName(),
                passed,
                uniqueness,
                "Uniqueness: " + (uniqueness * 100) + "%"
        );
    }

    private QualityCheckResult checkValidity(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        String pattern = rule.getPattern();
        long valid = records.stream()
                .filter(record -> {
                    // Compare the text form of the value against the rule's regex
                    Object value = record.get(field);
                    return value != null && value.toString().matches(pattern);
                })
                .count();
        double validity = ratio(valid, records.size());
        boolean passed = validity >= rule.getThreshold();
        return new QualityCheckResult(
                rule.getName(),
                passed,
                validity,
                "Validity: " + (validity * 100) + "%"
        );
    }

    private QualityCheckResult checkConsistency(List<TransformedRecord> records, QualityRule rule) {
        // Placeholder: always passes. Implement consistency checks here,
        // for example logical relationships between fields.
        return new QualityCheckResult(rule.getName(), true, 1.0, "Consistency check passed");
    }
}
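The completeness and uniqueness ratios computed by the checker can be sketched in a few lines of Python. This is a simplified illustration, not a port of the class: records are plain dicts, the function names are hypothetical, and an empty dataset is scored as 1.0 by assumption so the ratio never divides by zero.

```python
def completeness(records, field):
    """Share of records whose field is not None; 1.0 for an empty dataset (assumed)."""
    if not records:
        return 1.0
    return sum(1 for r in records if r.get(field) is not None) / len(records)


def uniqueness(records, field):
    """Share of distinct field values among all records; 1.0 for an empty dataset (assumed)."""
    if not records:
        return 1.0
    return len({r.get(field) for r in records}) / len(records)


records = [
    {"user_id": "u1", "email": "a@example.com"},
    {"user_id": "u2", "email": None},
    {"user_id": "u2", "email": "b@example.com"},
    {"user_id": "u3", "email": "c@example.com"},
]
print(completeness(records, "email"), uniqueness(records, "user_id"))
```

Each ratio is then compared against the rule's threshold, as in the Java code: a threshold of 0.9 on email completeness would fail this sample, while a threshold of 0.7 would pass.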
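Best Practices
- Incremental processing: process only new or changed data instead of reloading everything, which improves efficiency
- Idempotency: ensure ETL jobs can be retried safely
- Data lineage: record where data comes from and how it is transformed
- Monitoring and alerting: monitor pipeline run status and data quality
- Error handling: implement thorough error handling and recovery mechanisms
- Performance optimization: parallel processing, batch loading, index tuning
- Data security: encrypt sensitive data and enforce access control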
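The first two practices, incremental processing and idempotency, can be combined in one small sketch. It is a hedged illustration using Python's built-in sqlite3 module: the watermark table, the updated_at column, and the function run_incremental_load are hypothetical names introduced here, and INSERT OR REPLACE is SQLite's upsert form, so other databases need their own equivalent.

```python
import sqlite3

source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
source.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 10.0, "2024-01-01T00:00:00"), (2, 20.0, "2024-01-02T00:00:00")],
)

target = sqlite3.connect(":memory:")
target.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
target.execute("CREATE TABLE watermark (last_updated_at TEXT)")
target.execute("INSERT INTO watermark VALUES ('')")


def run_incremental_load():
    """Copy rows changed since the stored watermark; return how many rows were read."""
    (last,) = target.execute("SELECT last_updated_at FROM watermark").fetchone()
    rows = source.execute(
        "SELECT id, amount, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (last,),
    ).fetchall()
    # INSERT OR REPLACE keys on the primary key, so replaying a row
    # overwrites it instead of duplicating it.
    target.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?, ?)", rows)
    if rows:
        target.execute("UPDATE watermark SET last_updated_at = ?", (rows[-1][2],))
    target.commit()
    return len(rows)


first = run_incremental_load()   # initial run reads every row
second = run_incremental_load()  # nothing changed since the watermark
source.execute("UPDATE orders SET amount = 25.0, updated_at = '2024-01-03T00:00:00' WHERE id = 2")
third = run_incremental_load()   # only the changed row is read
final_rows = target.execute("SELECT * FROM orders ORDER BY id").fetchall()
print(first, second, third, final_rows)
```

The watermark keeps each run small, and the keyed upsert is what makes a retry safe: if a run is repeated after a failure, or the watermark is reset, the target ends with the same rows rather than duplicates.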