← 返回首页
🌐

数据管道ETL/ELT架构

📂 architecture ⏱ 7 min 1375 words

数据管道ETL/ELT架构

数据管道概述

数据管道是将数据从源系统移动到目标系统的自动化流程。它包括数据抽取、转换、加载(ETL)或抽取、加载、转换(ELT)等步骤。

ETL架构

ETL(Extract, Transform, Load)是传统的数据集成模式,数据在加载到目标系统之前进行转换。

架构图

┌─────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  数据源  │ -> │  抽取(Extract)│ -> │  转换(Transform)│ -> │  加载(Load) │
└─────────┘    └─────────────┘    └─────────────┘    └─────────────┘
   数据库           数据清洗            数据转换           数据仓库
   文件             数据验证            数据聚合           数据湖
   API              格式转换            数据增强           数据集市

ETL实现

// ETL作业框架
@Component
public class EtlJob {
    
    private final Extractor extractor;
    private final Transformer transformer;
    private final Loader loader;
    
    public void execute(EtlJobConfig config) {
        try {
            // 1. 抽取阶段
            List<RawRecord> rawRecords = extractor.extract(config.getSource());
            log.info("Extracted {} records", rawRecords.size());
            
            // 2. 转换阶段
            List<TransformedRecord> transformedRecords = transformer.transform(rawRecords);
            log.info("Transformed {} records", transformedRecords.size());
            
            // 3. 加载阶段
            loader.load(transformedRecords, config.getTarget());
            log.info("Loaded {} records", transformedRecords.size());
            
        } catch (Exception e) {
            log.error("ETL job failed", e);
            throw new EtlJobException("ETL job failed", e);
        }
    }
}

/**
 * Extract stage of a pipeline: reads records from a source.
 */
public interface Extractor {
    /**
     * Reads the records available from {@code source}.
     *
     * @param source description of where to read from
     * @return the extracted records
     */
    List<RawRecord> extract(DataSource source);
}

/**
 * Extractor that runs the query configured on a {@link DatabaseSource} and returns one
 * {@link RawRecord} per result row.
 */
@Component
public class DatabaseExtractor implements Extractor {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Executes the source's query and copies every column of every row into a record.
     *
     * @param source must be a {@link DatabaseSource}
     * @return one record per row, keyed by column label
     * @throws ClassCastException if {@code source} is not a {@link DatabaseSource}
     */
    @Override
    public List<RawRecord> extract(DataSource source) {
        DatabaseSource dbSource = (DatabaseSource) source;

        return jdbcTemplate.query(dbSource.getQuery(), (rs, rowNum) -> {
            // Fetch the metadata once per row instead of twice per column.
            java.sql.ResultSetMetaData meta = rs.getMetaData();
            int columnCount = meta.getColumnCount();

            RawRecord record = new RawRecord();
            for (int i = 1; i <= columnCount; i++) {
                // getColumnLabel honours "SELECT x AS alias"; getColumnName may return the
                // underlying column name and silently ignore the alias.
                record.put(meta.getColumnLabel(i), rs.getObject(i));
            }
            return record;
        });
    }
}

/**
 * Extractor for simple comma-separated text files described by a {@link FileSource}.
 *
 * <p>Each line is split on every comma; quoted fields that contain commas are not supported.
 */
@Component
public class FileExtractor implements Extractor {

    /**
     * Reads the file line by line, skips the header line, and maps each remaining line onto
     * the column names configured on the source.
     *
     * @param source must be a {@link FileSource}
     * @return one record per non-blank data line
     * @throws ExtractorException if the file cannot be read
     * @throws ClassCastException if {@code source} is not a {@link FileSource}
     */
    @Override
    public List<RawRecord> extract(DataSource source) {
        FileSource fileSource = (FileSource) source;
        List<RawRecord> records = new ArrayList<>();
        int columnCount = fileSource.getColumns().size();

        // NOTE(review): FileReader decodes with the platform default charset on older JDKs;
        // confirm the input encoding matches the runtime.
        try (BufferedReader reader = new BufferedReader(new FileReader(fileSource.getPath()))) {
            // The first line is the header; column names come from the source configuration.
            String line = reader.readLine();

            while ((line = reader.readLine()) != null) {
                // A blank line (typically a trailing newline) is not a record.
                if (line.trim().isEmpty()) {
                    continue;
                }

                // Limit -1 keeps trailing empty fields, so "a,b," still yields three values.
                String[] fields = line.split(",", -1);

                // Fields beyond the configured columns have no name to map to and are ignored
                // rather than failing the whole extraction with an index error.
                int usable = Math.min(fields.length, columnCount);

                RawRecord record = new RawRecord();
                for (int i = 0; i < usable; i++) {
                    record.put(fileSource.getColumns().get(i), fields[i]);
                }
                records.add(record);
            }
        } catch (IOException e) {
            throw new ExtractorException("Failed to read file", e);
        }

        return records;
    }
}

/**
 * Transform stage of a pipeline: turns raw records into transformed records.
 */
public interface Transformer {
    /**
     * Converts the given raw records.
     *
     * @param rawRecords records produced by the extract stage
     * @return the transformed records
     */
    List<TransformedRecord> transform(List<RawRecord> rawRecords);
}

/**
 * Transformer that cleans, type-converts and validates order records.
 *
 * <p>Records that fail conversion or validation are logged and dropped; they never abort the
 * batch.
 */
@Component
public class DataTransformer implements Transformer {

    /**
     * Transforms every raw record and keeps only those that converted and validated.
     *
     * @param rawRecords records produced by the extract stage
     * @return the successfully transformed records, in input order
     */
    @Override
    public List<TransformedRecord> transform(List<RawRecord> rawRecords) {
        return rawRecords.stream()
            .map(this::transformRecord)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Converts a single record.
     *
     * @return the transformed record, or {@code null} if it is invalid or conversion failed
     */
    private TransformedRecord transformRecord(RawRecord rawRecord) {
        try {
            TransformedRecord record = new TransformedRecord();

            // Cleaning
            record.setUserId(cleanUserId(rawRecord.get("user_id")));
            record.setUserName(cleanUserName(rawRecord.get("user_name")));
            record.setEmail(cleanEmail(rawRecord.get("email")));

            // Type conversion
            record.setAmount(convertToBigDecimal(rawRecord.get("amount")));
            record.setOrderDate(convertToLocalDateTime(rawRecord.get("order_date")));

            // Validation
            if (!isValid(record)) {
                log.warn("Invalid record: {}", rawRecord);
                return null;
            }

            return record;
        } catch (Exception e) {
            // Deliberately broad: one malformed record must not fail the whole batch.
            log.error("Failed to transform record", e);
            return null;
        }
    }

    /** Trims surrounding whitespace; {@code null} stays {@code null}. */
    private String cleanUserId(Object value) {
        if (value == null) return null;
        return value.toString().trim();
    }

    /** Trims and collapses internal whitespace runs to a single space. */
    private String cleanUserName(Object value) {
        if (value == null) return null;
        return value.toString().trim().replaceAll("\\s+", " ");
    }

    /** Trims and lower-cases the address. */
    private String cleanEmail(Object value) {
        if (value == null) return null;
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (for example, a Turkish locale maps 'I' to a dotless 'ı').
        return value.toString().trim().toLowerCase(java.util.Locale.ROOT);
    }

    /** Converts to {@link BigDecimal}; a missing amount becomes zero. */
    private BigDecimal convertToBigDecimal(Object value) {
        if (value == null) return BigDecimal.ZERO;
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        // File-based sources may deliver padded text such as " 12.50".
        return new BigDecimal(value.toString().trim());
    }

    /**
     * Converts to {@link LocalDateTime}. Accepts the JDBC temporal types as well as ISO-8601
     * text, because {@code Timestamp.toString()} ("yyyy-mm-dd hh:mm:ss.f") is not parseable
     * by {@link LocalDateTime#parse(CharSequence)}.
     */
    private LocalDateTime convertToLocalDateTime(Object value) {
        if (value == null) return null;
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof java.sql.Timestamp) {
            return ((java.sql.Timestamp) value).toLocalDateTime();
        }
        if (value instanceof java.sql.Date) {
            return ((java.sql.Date) value).toLocalDate().atStartOfDay();
        }
        return LocalDateTime.parse(value.toString().trim());
    }

    /** A record needs a non-empty user id and an e-mail containing '@'. */
    private boolean isValid(TransformedRecord record) {
        return record.getUserId() != null && !record.getUserId().isEmpty()
                && record.getEmail() != null && record.getEmail().contains("@");
    }
}

/**
 * Load stage of a pipeline: writes transformed records to a target.
 */
public interface Loader {
    /**
     * Writes {@code records} to {@code target}.
     *
     * @param records records produced by the transform stage
     * @param target description of where to write to
     */
    void load(List<TransformedRecord> records, DataTarget target);
}

/**
 * Loader that batch-inserts transformed records into the table named by a
 * {@link DatabaseTarget}.
 */
@Component
public class DatabaseLoader implements Loader {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Inserts all records in a single JDBC batch.
     *
     * @param records records to insert; an empty list is a no-op
     * @param target must be a {@link DatabaseTarget}
     * @throws ClassCastException if {@code target} is not a {@link DatabaseTarget}
     */
    @Override
    public void load(List<TransformedRecord> records, DataTarget target) {
        DatabaseTarget dbTarget = (DatabaseTarget) target;
        String tableName = dbTarget.getTableName();

        if (records.isEmpty()) {
            return;
        }

        // NOTE(review): the table name is concatenated because identifiers cannot be bound as
        // parameters; it must come from trusted configuration, never from user input.
        String sql = "INSERT INTO " + tableName +
                     " (user_id, user_name, email, amount, order_date) " +
                     "VALUES (?, ?, ?, ?, ?)";

        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                TransformedRecord record = records.get(i);
                ps.setString(1, record.getUserId());
                ps.setString(2, record.getUserName());
                ps.setString(3, record.getEmail());
                ps.setBigDecimal(4, record.getAmount());
                // Timestamp.valueOf(null) throws, so a missing order date is written as NULL.
                if (record.getOrderDate() == null) {
                    ps.setNull(5, java.sql.Types.TIMESTAMP);
                } else {
                    ps.setTimestamp(5, Timestamp.valueOf(record.getOrderDate()));
                }
            }

            @Override
            public int getBatchSize() {
                return records.size();
            }
        });
    }
}

ELT架构

ELT(Extract, Load, Transform)是现代数据集成模式,数据先加载到目标系统,然后在目标系统中进行转换。

架构图

┌─────────┐    ┌─────────────┐    ┌─────────────┐
│  数据源  │ -> │  抽取(Extract)│ -> │  加载(Load)  │
└─────────┘    └─────────────┘    └─────────────┘
   数据库           数据清洗           数据湖/数据仓库
   文件             数据验证           (Snowflake, BigQuery, Redshift)
   API              格式转换
                                    ┌─────────────────────┐
                                    │  转换(Transform)     │
                                    │  - 数据清洗          │
                                    │  - 数据聚合          │
                                    │  - 数据建模          │
                                    └─────────────────────┘

ELT实现

/**
 * ELT job skeleton: extracts raw records, stages them as Parquet, then lets the data
 * warehouse perform the transformation.
 */
@Component
public class EltJob {

    private final Extractor extractor;
    private final Loader loader;
    private final DataWarehouseTransformer dwTransformer;

    /**
     * Runs one extract -> load -> transform pass. An extraction that yields no records ends
     * the job successfully without staging or transforming anything.
     *
     * @param config supplies the source to read from and the target to write to
     * @throws EltJobException wrapping any exception raised by one of the stages
     */
    public void execute(EltJobConfig config) {
        try {
            // 1. Extract
            List<RawRecord> rawRecords = extractor.extract(config.getSource());
            log.info("Extracted {} records", rawRecords.size());

            // Nothing to stage: the Parquet schema is derived from the first record, so an
            // empty batch would otherwise fail with an index error.
            if (rawRecords.isEmpty()) {
                log.info("No records extracted; skipping load and transform");
                return;
            }

            // 2. Load into the staging area
            String stagingPath = loadToStaging(rawRecords, config.getTarget());
            log.info("Loaded {} records to staging", rawRecords.size());

            // 3. Transform inside the data warehouse
            dwTransformer.transform(stagingPath, config.getTarget());
            log.info("Transformed data in data warehouse");

        } catch (Exception e) {
            log.error("ELT job failed", e);
            throw new EltJobException("ELT job failed", e);
        }
    }

    /**
     * Writes the records to a staging location in Parquet format.
     *
     * @param records non-empty list of records; the first record defines the schema
     * @param target used to derive the staging path
     * @return the path the records were written to
     * @throws LoaderException if writing fails
     */
    private String loadToStaging(List<RawRecord> records, DataTarget target) {
        String stagingPath = generateStagingPath(target);

        try (ParquetWriter<GenericRecord> writer = createParquetWriter(stagingPath)) {
            Schema schema = createSchema(records.get(0));

            for (RawRecord record : records) {
                GenericRecord genericRecord = convertToGenericRecord(record, schema);
                writer.write(genericRecord);
            }
        } catch (IOException e) {
            throw new LoaderException("Failed to load to staging", e);
        }

        return stagingPath;
    }
}

/**
 * Runs the in-warehouse transformation step of the ELT job: materialises the staged Parquet
 * data as a temporary table, aggregates it into the target table, and removes the temporary
 * table again.
 */
@Component
public class DataWarehouseTransformer {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Transforms the staged data.
     *
     * <p>NOTE(review): the staging table name is fixed, so two concurrent runs against the
     * same warehouse would collide.
     *
     * @param stagingPath location of the staged Parquet data
     * @param target currently unused by this implementation
     */
    public void transform(String stagingPath, DataTarget target) {
        // A previous run that died half-way may have left the table behind, which would make
        // CREATE TABLE fail.
        dropStagingTable();
        createStagingTable(stagingPath);
        try {
            jdbcTemplate.execute(generateTransformSql());
        } finally {
            // Always clean up, even when the transformation SQL fails.
            dropStagingTable();
        }
    }

    /** Creates {@code staging_data} from the Parquet data at {@code stagingPath}. */
    private void createStagingTable(String stagingPath) {
        // The path cannot be bound as a parameter here, so escape embedded single quotes to
        // keep it inside the SQL string literal.
        String quotedPath = stagingPath.replace("'", "''");
        String sql = "CREATE TABLE staging_data AS " +
                     "SELECT * FROM parquet_scan('" + quotedPath + "')";
        jdbcTemplate.execute(sql);
    }

    /** Builds the SQL that aggregates staged orders per user and day into the target table. */
    private String generateTransformSql() {
        return "INSERT INTO target_table " +
               "SELECT " +
               "  user_id," +
               "  user_name," +
               "  email," +
               "  SUM(amount) as total_amount," +
               "  COUNT(*) as order_count," +
               "  DATE_TRUNC('day', order_date) as order_date " +
               "FROM staging_data " +
               "WHERE email IS NOT NULL " +
               "GROUP BY user_id, user_name, email, DATE_TRUNC('day', order_date)";
    }

    /** Drops {@code staging_data} if it exists. */
    private void dropStagingTable() {
        jdbcTemplate.execute("DROP TABLE IF EXISTS staging_data");
    }
}

使用Apache Airflow编排

DAG定义

# airflow_dags/etl_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Task-level defaults, passed to the DAG below as default_args.
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@example.com'],  # recipients for failure e-mails
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # wait between attempts
}

# Daily order-data pipeline; the tasks defined further down attach themselves via dag=dag.
dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for order data',
    schedule_interval='@daily',
    catchup=False,
)

def extract_data(**context):
    """Extract one day's orders from the source database into a JSON file.

    Reads every row of ``orders`` whose ``date`` equals the run's ``ds`` value and
    writes them as a list of column->value dicts to ``/tmp/orders_<ds>.json``.

    Args:
        **context: Airflow task context; only ``context['ds']`` is read.

    Returns:
        The path of the JSON file that was written.
    """
    import json

    import psycopg2

    output_path = f'/tmp/orders_{context["ds"]}.json'

    # NOTE(review): connection details are hard-coded for illustration; in a real
    # deployment read them from an Airflow Connection or a secrets backend.
    conn = psycopg2.connect(
        host='source-db-host',
        database='source_db',
        user='user',
        password='password'
    )

    # try/finally so the connection is released even when the query fails.
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM orders WHERE date = %s", (context['ds'],))
            columns = [desc[0] for desc in cursor.description]
            records = cursor.fetchall()
    finally:
        conn.close()

    data = [dict(zip(columns, record)) for record in records]
    with open(output_path, 'w') as f:
        # default=str: database rows routinely contain Decimal and datetime values,
        # which the json module cannot serialise on its own.
        json.dump(data, f, default=str)

    return output_path

def transform_data(**context):
    """Clean the extracted orders and write them as CSV.

    Reads ``/tmp/orders_<ds>.json``, drops rows without ``user_id`` or ``email``,
    lower-cases e-mails, coerces ``amount`` to a number (unparseable values become
    NaN) and truncates ``order_date`` to a date. The result is written to
    ``/tmp/orders_transformed_<ds>.csv``.

    An input with no rows produces an empty CSV instead of failing, so a day
    without orders does not break the pipeline.

    Args:
        **context: Airflow task context; only ``context['ds']`` is read.

    Returns:
        The path of the CSV file that was written.
    """
    import json

    import pandas as pd

    ds = context["ds"]
    input_path = f'/tmp/orders_{ds}.json'
    output_path = f'/tmp/orders_transformed_{ds}.csv'

    with open(input_path, 'r') as f:
        data = json.load(f)

    df = pd.DataFrame(data)

    # An empty extract yields a frame with no columns at all; touching
    # 'user_id' / 'email' on it would raise KeyError.
    if not df.empty:
        # Cleaning
        df = df.dropna(subset=['user_id', 'email'])
        df['email'] = df['email'].str.lower()
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

        # Conversion
        df['order_date'] = pd.to_datetime(df['order_date'])
        df['order_date'] = df['order_date'].dt.date

    df.to_csv(output_path, index=False)

    return output_path

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

# There is no `context` at module level (the file is only parsed here), so
# per-run values are written as Jinja templates that Airflow renders at run time.
# NOTE(review): handing files over via /tmp assumes all tasks run on the same worker.
load_to_s3 = LocalFilesystemToS3Operator(
    task_id='load_to_s3',
    filename='/tmp/orders_transformed_{{ ds }}.csv',
    dest_bucket='data-lake',
    dest_key='orders/{{ ds }}/orders.csv',
    replace=True,  # let a retried or re-run task overwrite its earlier upload
    dag=dag,
)

load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='public',
    table='orders',
    s3_bucket='data-lake',
    s3_key='orders/{{ ds }}/orders.csv',
    copy_options=['FORMAT AS CSV', 'IGNOREHEADER 1'],
    dag=dag,
)

# Define task dependencies
extract_task >> transform_task >> load_to_s3 >> load_to_redshift

数据质量保证

/**
 * Runs the quality rules registered for a dataset against a batch of transformed records.
 */
@Component
public class DataQualityChecker {

    private final QualityRuleRepository ruleRepository;

    /**
     * Evaluates every rule stored for {@code datasetName}.
     *
     * @param records the batch to check
     * @param datasetName selects the rules and labels the report
     * @return a report containing one result per evaluated rule
     * @throws DataQualityException as soon as a rule marked critical fails; later rules are
     *         not evaluated
     */
    public QualityReport checkDataQuality(List<TransformedRecord> records, String datasetName) {
        QualityReport report = new QualityReport(datasetName);

        List<QualityRule> rules = ruleRepository.findByDatasetName(datasetName);

        for (QualityRule rule : rules) {
            QualityCheckResult result = checkRule(records, rule);
            report.addResult(result);

            if (!result.isPassed() && rule.isCritical()) {
                throw new DataQualityException(
                    "Critical quality check failed: " + rule.getName()
                );
            }
        }

        return report;
    }

    /** Dispatches to the check matching the rule's type. */
    private QualityCheckResult checkRule(List<TransformedRecord> records, QualityRule rule) {
        switch (rule.getType()) {
            case COMPLETENESS:
                return checkCompleteness(records, rule);
            case UNIQUENESS:
                return checkUniqueness(records, rule);
            case VALIDITY:
                return checkValidity(records, rule);
            case CONSISTENCY:
                return checkConsistency(records, rule);
            default:
                throw new IllegalArgumentException("Unknown rule type: " + rule.getType());
        }
    }

    /** Share of records whose field is non-null. */
    private QualityCheckResult checkCompleteness(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        long nonNull = records.stream()
                .filter(record -> record.get(field) != null)
                .count();

        return ratioResult(rule, "Completeness", nonNull, records.size());
    }

    /** Number of distinct field values relative to the number of records. */
    private QualityCheckResult checkUniqueness(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        long unique = records.stream()
                .map(record -> record.get(field))
                .distinct()
                .count();

        return ratioResult(rule, "Uniqueness", unique, records.size());
    }

    /** Share of records whose field is non-null and fully matches the rule's regex. */
    private QualityCheckResult checkValidity(List<TransformedRecord> records, QualityRule rule) {
        String field = rule.getField();
        String pattern = rule.getPattern();
        if (pattern == null) {
            // A validity rule without a pattern is a configuration error; fail fast.
            throw new IllegalArgumentException("Validity rule has no pattern: " + rule.getName());
        }

        // Compile once instead of once per record (String.matches recompiles every call).
        java.util.regex.Pattern compiled = java.util.regex.Pattern.compile(pattern);

        long valid = records.stream()
                .filter(record -> {
                    String value = record.get(field);
                    return value != null && compiled.matcher(value).matches();
                })
                .count();

        return ratioResult(rule, "Validity", valid, records.size());
    }

    /** Placeholder: no cross-field consistency logic is implemented yet; always passes. */
    private QualityCheckResult checkConsistency(List<TransformedRecord> records, QualityRule rule) {
        return new QualityCheckResult(rule.getName(), true, 1.0, "Consistency check passed");
    }

    /**
     * Builds the result for a ratio-based check.
     *
     * <p>An empty batch would give 0/0 = NaN and a "NaN%" message. It is reported as a failed
     * check with ratio 0.0 instead, which keeps the original pass/fail outcome (NaN never met
     * a threshold) while producing a readable result.
     */
    private QualityCheckResult ratioResult(QualityRule rule, String label, long matching, long total) {
        if (total == 0) {
            return new QualityCheckResult(rule.getName(), false, 0.0, label + ": no records to check");
        }

        double ratio = (double) matching / total;
        boolean passed = ratio >= rule.getThreshold();

        return new QualityCheckResult(
            rule.getName(),
            passed,
            ratio,
            label + ": " + (ratio * 100) + "%"
        );
    }
}

最佳实践

  1. 增量处理:使用增量处理而不是全量处理,提高效率
  2. 幂等性:确保ETL作业可以安全地重试
  3. 数据血缘:记录数据的来源和转换过程
  4. 监控告警:监控数据管道的执行状态和数据质量
  5. 错误处理:实现完善的错误处理和恢复机制
  6. 性能优化:并行处理、批量加载、索引优化
  7. 数据安全:加密敏感数据,实施访问控制