← 返回首页
🤖

ML流水线架构:数据处理、训练、评估与部署

📂 architecture ⏱ 4 min 678 words

ML流水线架构:数据处理、训练、评估与部署

流水线架构设计原则

ML流水线的核心目标是将机器学习工作流从手动脚本转变为可重复、可追溯、可扩展的自动化流程。设计时需遵循原子性(每个步骤职责单一)、幂等性(重复执行结果一致)、可观测性(完整日志和指标)三大原则。

# ML流水线基类设计
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class StepResult:
    """Outcome record for a single executed pipeline step.

    Attributes:
        step_id: Identifier of the step that produced this result.
        status: Outcome label; the pipeline in this file writes
            ``"success"`` or ``"failed"``.
        output: Value returned by the step's ``execute`` call.
        metrics: Named numeric measurements; each instance gets its own
            empty dict by default.
        duration_ms: Execution time in milliseconds.
        error: Stringified exception for a failed step, otherwise ``None``.
    """

    step_id: str
    status: str
    output: Any
    metrics: Dict[str, float] = field(default_factory=dict)
    duration_ms: float = 0.0
    error: Optional[str] = None

class PipelineStep(ABC):
    """Abstract unit of work that a pipeline executes.

    Subclasses must implement ``execute``. ``validate_input`` may be
    overridden to reject a context before the step runs.
    """

    def __init__(self, name: str):
        """Assign a short random identifier and remember the step name.

        Args:
            name: Human-readable name of the step.
        """
        # First 8 hex digits of a random UUID4: compact, but not guaranteed
        # to be unique across steps.
        self.step_id = uuid.uuid4().hex[:8]
        self.name = name

    @abstractmethod
    def execute(self, context: Dict) -> Any:
        """Run the step against ``context`` and return its output."""
        ...

    def validate_input(self, context: Dict) -> bool:
        """Report whether ``context`` is acceptable; the default accepts all."""
        return True

class MLPipeline:
    """Sequential runner that executes registered steps against a shared context.

    Steps run in the order they were added. Each step's output is stored in
    the context under the step's name so later steps can read it.
    """

    def __init__(self, name: str):
        """Create an empty pipeline.

        Args:
            name: Pipeline name, used in log output and in the run summary.
        """
        self.name = name
        self.steps: List[PipelineStep] = []
        self.results: List[StepResult] = []

    def add_step(self, step: PipelineStep):
        """Append ``step`` to the pipeline and return ``self`` for chaining."""
        self.steps.append(step)
        return self

    def run(self, initial_context: Optional[Dict] = None) -> Dict:
        """Execute every step in order, stopping at the first failing step.

        Args:
            initial_context: Starting context. A non-empty dict is used as-is
                and therefore mutated in place; ``None`` or an empty dict is
                replaced by a fresh dict.

        Returns:
            A summary dict with keys ``pipeline`` (name), ``status``
            (``"completed"`` when every recorded step succeeded, otherwise
            ``"failed"``), ``steps_completed`` (number of successful steps)
            and ``context`` (the final context).

        Raises:
            ValueError: If a step's ``validate_input`` rejects the context.
        """
        context = initial_context or {}
        # Start from a clean slate so a previous run's results (and failures)
        # cannot leak into this run's status.
        self.results = []
        print(f"Starting pipeline: {self.name}")

        for step in self.steps:
            if not step.validate_input(context):
                raise ValueError(f"Input validation failed for step: {step.name}")

            start_time = datetime.now()
            try:
                output = step.execute(context)
            except Exception as e:
                # Record how long the step ran before it failed.
                duration = (datetime.now() - start_time).total_seconds() * 1000
                self.results.append(
                    StepResult(
                        step_id=step.step_id,
                        status="failed",
                        output=None,
                        duration_ms=duration,
                        error=str(e),
                    )
                )
                print(f"  ✗ {step.name} failed: {e}")
                break

            duration = (datetime.now() - start_time).total_seconds() * 1000
            self.results.append(
                StepResult(
                    step_id=step.step_id,
                    status="success",
                    output=output,
                    duration_ms=duration,
                )
            )
            # Publish the output under the step name for downstream steps.
            context[step.name] = output
            print(f"  ✓ {step.name} completed in {duration:.0f}ms")

        succeeded = sum(1 for r in self.results if r.status == "success")
        return {
            "pipeline": self.name,
            "status": "completed" if succeeded == len(self.results) else "failed",
            # Only successful steps count as completed; the failed step is
            # still available in self.results.
            "steps_completed": succeeded,
            "context": context,
        }

数据预处理阶段

数据预处理是ML流水线的基础,包括数据清洗、特征工程、数据增强和数据验证。生产环境中可以使用Great Expectations等工具进行数据质量检查,确保进入训练的数据符合预期分布和格式要求;下面的示例用手写规则(非空、取值范围、唯一性)演示同类检查。

# 数据预处理步骤
import pandas as pd
import numpy as np

class DataPreprocessingStep(PipelineStep):
    """Clean the raw DataFrame found in the context under ``"raw_data"``.

    Recognised ``config`` keys (all optional):
        required_columns: Columns that must be non-null; rows with a null in
            any of them are dropped.
        numeric_columns: Columns filtered with the 1.5 * IQR outlier rule.
        type_conversions: Mapping of column name to target dtype.
    """

    def __init__(self, config: dict):
        """Register the step as ``"data_preprocessing"`` and keep ``config``."""
        super().__init__("data_preprocessing")
        self.config = config

    def execute(self, context: Dict) -> pd.DataFrame:
        """Return a cleaned copy of ``context["raw_data"]``.

        Args:
            context: Pipeline context; must contain a DataFrame under
                ``"raw_data"``.

        Returns:
            A new DataFrame with incomplete rows, duplicates and outliers
            removed and the configured dtype conversions applied.

        Raises:
            ValueError: If ``"raw_data"`` is missing from the context.
        """
        raw_data = context.get("raw_data")
        if raw_data is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError("context['raw_data'] is required for data preprocessing")

        # Data cleaning: drop incomplete rows, then exact duplicates.
        cleaned = raw_data.dropna(subset=self.config.get("required_columns", []))
        cleaned = cleaned.drop_duplicates()

        # Outlier handling: keep values inside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
        # NaN never satisfies the range test, so rows with NaN in a numeric
        # column are dropped here as well.
        for col in self.config.get("numeric_columns", []):
            q1 = cleaned[col].quantile(0.25)
            q3 = cleaned[col].quantile(0.75)
            iqr = q3 - q1
            cleaned = cleaned[cleaned[col].between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)]

        # Type conversion: astype with a mapping returns a new frame, which
        # avoids assigning into a filtered slice of the input.
        type_conversions = self.config.get("type_conversions", {})
        if type_conversions:
            cleaned = cleaned.astype(type_conversions)

        print(f"Data cleaned: {len(raw_data)} → {len(cleaned)} rows")
        return cleaned

class DataValidationStep(PipelineStep):
    """Check the preprocessed DataFrame against a list of declarative rules.

    Each rule is a dict with a ``"type"`` key. The supported types are
    ``"not_null"``, ``"range"`` (needs ``"min"`` and ``"max"``) and
    ``"unique"``; each of them also needs a ``"column"`` key. A rule of any
    other type is treated as passed.
    """

    def __init__(self, rules: list):
        """Register the step as ``"data_validation"`` and keep ``rules``."""
        super().__init__("data_validation")
        self.rules = rules

    @staticmethod
    def _rule_passes(frame, rule):
        """Evaluate one rule against ``frame`` and return whether it holds."""
        kind = rule["type"]
        if kind == "not_null":
            return frame[rule["column"]].isnull().sum() == 0
        if kind == "range":
            low, high = rule["min"], rule["max"]
            series = frame[rule["column"]]
            return ((series < low) | (series > high)).sum() == 0
        if kind == "unique":
            return frame[rule["column"]].duplicated().sum() == 0
        # Unrecognised rule types are not enforced.
        return True

    def execute(self, context: Dict) -> dict:
        """Run every rule against ``context["data_preprocessing"]``.

        Returns:
            ``{"validation": "passed", "checks": <number of rules>}`` when all
            rules hold.

        Raises:
            ValueError: If at least one rule fails; the message lists the
                failed rules.
        """
        frame = context.get("data_preprocessing")
        outcomes = [
            {"rule": rule, "passed": self._rule_passes(frame, rule)}
            for rule in self.rules
        ]

        failures = [entry for entry in outcomes if not entry["passed"]]
        if failures:
            raise ValueError(f"Validation failed: {failures}")

        return {"validation": "passed", "checks": len(outcomes)}

模型训练与评估

模型训练阶段可以扩展为支持分布式训练、超参调优和自动模型选择。评估阶段应使用离线指标(AUC、F1)和业务指标(转化率、收益)全面评估模型效果。下面的示例只演示最基本的单机训练循环,评估步骤中的指标数值为占位值,实际使用时需替换为真实的模型推理与指标计算。

# 模型训练步骤
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

class ModelTrainingStep(PipelineStep):
    """Train a PyTorch model and checkpoint the best validation epoch.

    ``training_config`` keys:
        lr, batch_size, epochs: Required.
        model_params: Optional keyword arguments for ``model_class``.
        model_path: Optional checkpoint path; defaults to ``"best_model.pth"``.
    """

    def __init__(self, model_class, training_config: dict):
        """Register the step as ``"model_training"``.

        Args:
            model_class: Callable that builds the model from ``model_params``.
            training_config: Training settings (see class docstring).
        """
        super().__init__("model_training")
        self.model_class = model_class
        self.config = training_config

    def execute(self, context: Dict) -> dict:
        """Train on ``context["train_data"]``, validate on ``context["val_data"]``.

        Each batch is indexed with ``"features"`` and ``"labels"``. The model
        state is saved whenever the average validation loss improves.

        Returns:
            Dict with ``model_path`` (``None`` if no checkpoint was written,
            e.g. with zero epochs), ``best_val_loss`` and ``training_history``
            (one entry per epoch).

        Raises:
            ValueError: If either dataset is missing or yields no batches.
        """
        train_data = context.get("train_data")
        val_data = context.get("val_data")
        if train_data is None or val_data is None:
            raise ValueError("context must provide both 'train_data' and 'val_data'")

        model_path = self.config.get("model_path", "best_model.pth")

        model = self.model_class(**self.config.get("model_params", {}))
        optimizer = torch.optim.Adam(model.parameters(), lr=self.config["lr"])
        criterion = nn.CrossEntropyLoss()

        train_loader = DataLoader(train_data, batch_size=self.config["batch_size"])
        val_loader = DataLoader(val_data, batch_size=self.config["batch_size"])
        num_train_batches = len(train_loader)
        num_val_batches = len(val_loader)
        if num_train_batches == 0 or num_val_batches == 0:
            # The per-epoch averages below would otherwise divide by zero.
            raise ValueError("train_data and val_data must each yield at least one batch")

        best_val_loss = float("inf")
        checkpoint_saved = False
        training_history = []

        for epoch in range(self.config["epochs"]):
            model.train()
            train_loss = 0.0
            for batch in train_loader:
                optimizer.zero_grad()
                output = model(batch["features"])
                loss = criterion(output, batch["labels"])
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for batch in val_loader:
                    output = model(batch["features"])
                    val_loss += criterion(output, batch["labels"]).item()

            avg_val_loss = val_loss / num_val_batches
            training_history.append({
                "epoch": epoch,
                "train_loss": train_loss / num_train_batches,
                "val_loss": avg_val_loss
            })

            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), model_path)
                checkpoint_saved = True

        return {
            # Do not advertise a checkpoint that this run never wrote.
            "model_path": model_path if checkpoint_saved else None,
            "best_val_loss": best_val_loss,
            "training_history": training_history
        }

class ModelEvaluationStep(PipelineStep):
    """Report evaluation metrics and decide whether the model may go live.

    NOTE: the metric values are fixed placeholders; this step does not load
    the model or run inference. Replace ``_PLACEHOLDER_SCORES`` lookups with
    real metric computation before relying on the result.
    """

    # Minimum acceptable value per metric, used when no thresholds are passed.
    DEFAULT_THRESHOLDS = {"accuracy": 0.9, "f1_score": 0.85, "auc": 0.9}
    # Hard-coded stand-ins for real metric computation.
    _PLACEHOLDER_SCORES = {"accuracy": 0.95, "f1_score": 0.93, "auc": 0.97}

    def __init__(self, metrics: list, thresholds: Optional[dict] = None):
        """Register the step as ``"model_evaluation"``.

        Args:
            metrics: Names of the metrics to report; names other than
                ``"accuracy"``, ``"f1_score"`` and ``"auc"`` are ignored.
            thresholds: Optional mapping of metric name to minimum value;
                defaults to ``DEFAULT_THRESHOLDS``.
        """
        super().__init__("model_evaluation")
        self.metrics = metrics
        self.thresholds = dict(
            self.DEFAULT_THRESHOLDS if thresholds is None else thresholds
        )

    def execute(self, context: Dict) -> dict:
        """Return the requested metrics and the go-live decision.

        Returns:
            Dict with ``metrics`` (name to value), ``passed`` (bool) and
            ``model_path`` (taken from the ``"model_training"`` output, or
            ``None`` when absent).
        """
        model_path = context.get("model_training", {}).get("model_path")

        evaluation_results = {
            name: self._PLACEHOLDER_SCORES[name]
            for name in self.metrics
            if name in self._PLACEHOLDER_SCORES
        }

        # Go-live check: only metrics that were actually evaluated are held to
        # their threshold. A metric that was not requested must not count as
        # a score of 0, and at least one metric must have been evaluated.
        passed = bool(evaluation_results) and all(
            score >= self.thresholds[name]
            for name, score in evaluation_results.items()
            if name in self.thresholds
        )

        return {
            "metrics": evaluation_results,
            "passed": passed,
            "model_path": model_path
        }

流水线编排与调度

生产环境中通常使用DAG编排流水线步骤,以支持并行执行、失败重试和条件分支,并集成Airflow或Kubeflow实现定时调度和事件触发。下面的示例为简化起见,按添加顺序串行执行各步骤。

# 完整流水线示例
def build_training_pipeline(config: dict) -> MLPipeline:
    """Assemble the end-to-end training pipeline from ``config``.

    Reads the keys ``data_source``, ``preprocessing``, ``validation_rules``,
    ``features``, ``training``, ``evaluation_metrics`` and ``registry``; a
    missing key raises ``KeyError``.

    NOTE(review): ``DataLoadingStep``, ``FeatureEngineeringStep``,
    ``ModelRegistrationStep`` and ``MyModel`` are not defined in the visible
    part of this file and must be provided elsewhere.
    """
    # add_step returns the pipeline itself, so the stages can be chained in
    # execution order.
    return (
        MLPipeline("model_training_pipeline")
        .add_step(DataLoadingStep(config["data_source"]))
        .add_step(DataPreprocessingStep(config["preprocessing"]))
        .add_step(DataValidationStep(config["validation_rules"]))
        .add_step(FeatureEngineeringStep(config["features"]))
        .add_step(
            ModelTrainingStep(
                model_class=MyModel,
                training_config=config["training"],
            )
        )
        .add_step(ModelEvaluationStep(config["evaluation_metrics"]))
        .add_step(ModelRegistrationStep(config["registry"]))
    )

# 执行流水线
config = {
    "data_source": {"path": "s3://data/train.csv"},
    "preprocessing": {
        "required_columns": ["id", "target"],
        "numeric_columns": ["feature_1", "feature_2"]
    },
    "validation_rules": [
        {"type": "not_null", "column": "target"},
        {"type": "range", "column": "feature_1", "min": 0, "max": 100}
    ],
    # build_training_pipeline indexes config["features"] and
    # config["registry"]; without these keys it raises KeyError. Their schemas
    # belong to FeatureEngineeringStep / ModelRegistrationStep, which are not
    # shown here, so the empty dicts are placeholders.
    # TODO: fill in the real feature and registry settings.
    "features": {},
    "training": {"epochs": 10, "lr": 0.001, "batch_size": 32},
    "evaluation_metrics": ["accuracy", "f1_score", "auc"],
    "registry": {}
}

pipeline = build_training_pipeline(config)
result = pipeline.run()