ML流水线架构：数据处理、训练、评估与部署
流水线架构设计原则
ML流水线的核心目标是将机器学习工作流从手动脚本转变为可重复、可追溯、可扩展的自动化流程。设计时需遵循三大原则：原子性（每个步骤职责单一，可独立执行和重试）、幂等性（以相同输入重复执行，结果一致）、可观测性（记录完整的日志和指标）。
# ML流水线基类设计
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
@dataclass
class StepResult:
    """Outcome of a single PipelineStep execution, as recorded by MLPipeline.run."""

    step_id: str  # PipelineStep.step_id of the step that produced this result
    status: str  # "success" or "failed" (the two values MLPipeline.run writes)
    output: Any  # value returned by step.execute(); None when the step failed
    metrics: Dict[str, float] = field(default_factory=dict)  # optional per-step metrics; run() leaves it empty
    duration_ms: float = 0.0  # elapsed execution time of execute(), in milliseconds
    error: Optional[str] = None  # str(exception) when status == "failed"


class PipelineStep(ABC):
    """Base class for one unit of work in an MLPipeline.

    Subclasses implement ``execute``. MLPipeline stores its return value in the
    shared context under ``self.name`` so later steps can read it.
    """

    def __init__(self, name: str):
        self.name = name
        # Short random id (first 8 characters of a UUID4) used to tag StepResults.
        self.step_id = str(uuid.uuid4())[:8]

    @abstractmethod
    def execute(self, context: Dict) -> Any:
        """Run the step against the shared ``context`` and return its output."""

    def validate_input(self, context: Dict) -> bool:
        """Return False to abort the pipeline before ``execute`` runs. Default: accept."""
        return True


class MLPipeline:
    """Run a sequence of PipelineSteps in order, threading a shared context dict."""

    def __init__(self, name: str):
        self.name = name
        self.steps: List[PipelineStep] = []
        # StepResults of the most recent run() only (reset at the start of each run).
        self.results: List[StepResult] = []

    def add_step(self, step: PipelineStep) -> "MLPipeline":
        """Append ``step`` and return ``self`` so calls can be chained."""
        self.steps.append(step)
        return self

    def run(self, initial_context: Optional[Dict] = None) -> Dict:
        """Execute every step in order, stopping at the first one that raises.

        Args:
            initial_context: seed values for the shared context. The dict is
                copied, so the caller's object is not modified.

        Returns:
            ``{"pipeline": name, "status": "completed" | "failed",
            "steps_completed": number of steps executed (including a failed
            one), "context": final context dict}``.

        Raises:
            ValueError: if a step's ``validate_input`` returns False.
        """
        # Shallow copy: step outputs are added to this dict, not the caller's.
        context: Dict = dict(initial_context) if initial_context else {}
        # Reset per run; previously results accumulated across calls, so a
        # second run() double-counted steps and could report a stale failure.
        self.results = []
        print(f"Starting pipeline: {self.name}")
        for step in self.steps:
            if not step.validate_input(context):
                raise ValueError(f"Input validation failed for step: {step.name}")
            # perf_counter is monotonic, unlike wall-clock datetime.now().
            started = time.perf_counter()
            try:
                output = step.execute(context)
            except Exception as e:
                # Pipeline boundary: record the failure and stop; later steps
                # would depend on this step's missing output.
                elapsed_ms = (time.perf_counter() - started) * 1000
                self.results.append(StepResult(
                    step_id=step.step_id,
                    status="failed",
                    output=None,
                    duration_ms=elapsed_ms,
                    error=str(e),
                ))
                print(f" ✗ {step.name} failed: {e}")
                break
            elapsed_ms = (time.perf_counter() - started) * 1000
            context[step.name] = output
            self.results.append(StepResult(
                step_id=step.step_id,
                status="success",
                output=output,
                duration_ms=elapsed_ms,
            ))
            print(f" ✓ {step.name} completed in {elapsed_ms:.0f}ms")
        return {
            "pipeline": self.name,
            "status": "completed" if all(r.status == "success" for r in self.results) else "failed",
            "steps_completed": len(self.results),
            "context": context,
        }
数据预处理阶段
数据预处理是ML流水线的基础，包括数据清洗、特征工程、数据增强和数据验证。生产环境中可使用Great Expectations等工具进行数据质量检查；下面的示例则用一组轻量的自定义规则实现同样的目的，确保进入训练的数据符合预期的分布和格式要求。
# 数据预处理步骤
import pandas as pd
import numpy as np
class DataPreprocessingStep(PipelineStep):
    """Clean the raw DataFrame found at ``context["raw_data"]``.

    Recognised ``config`` keys (all optional):
        required_columns: rows with a null in any of these columns are dropped.
        numeric_columns: columns filtered with the IQR outlier rule.
        iqr_multiplier: fence width in IQRs (default 1.5).
        type_conversions: ``{column: dtype}`` mapping applied with ``astype``.
    """

    def __init__(self, config: dict):
        super().__init__("data_preprocessing")
        self.config = config

    def execute(self, context: Dict) -> pd.DataFrame:
        """Drop incomplete/duplicate rows, remove IQR outliers, cast dtypes.

        Raises:
            ValueError: if ``context`` has no ``"raw_data"`` entry.
        """
        raw_data = context.get("raw_data")
        if raw_data is None:
            raise ValueError("DataPreprocessingStep requires 'raw_data' in the pipeline context")

        # Data cleaning
        cleaned = raw_data.dropna(subset=self.config.get("required_columns", []))
        cleaned = cleaned.drop_duplicates()

        # Outlier removal. Fences for every column are computed on the same
        # deduplicated frame and combined into one mask, so the result does not
        # depend on the order of numeric_columns (previously each column's
        # quartiles were recomputed on rows already filtered by earlier columns).
        k = self.config.get("iqr_multiplier", 1.5)
        keep = np.ones(len(cleaned), dtype=bool)
        for col in self.config.get("numeric_columns", []):
            q1 = cleaned[col].quantile(0.25)
            q3 = cleaned[col].quantile(0.75)
            iqr = q3 - q1
            in_range = cleaned[col].between(q1 - k * iqr, q3 + k * iqr)
            # between() is inclusive on both ends; missing values count as
            # out of range and are dropped, as the plain comparisons did for NaN.
            keep &= in_range.to_numpy(dtype=bool, na_value=False)
        # copy() yields an independent frame so the column assignments below
        # don't hit pandas' SettingWithCopyWarning on a filtered slice.
        cleaned = cleaned.loc[keep].copy()

        # Data type conversion
        for col, dtype in self.config.get("type_conversions", {}).items():
            cleaned[col] = cleaned[col].astype(dtype)

        print(f"Data cleaned: {len(raw_data)} → {len(cleaned)} rows")
        return cleaned
class DataValidationStep(PipelineStep):
    """Check the preprocessed DataFrame against simple declarative rules.

    Supported rule dicts:
        {"type": "not_null", "column": c}                   -> no nulls in c
        {"type": "range", "column": c, "min": a, "max": b}  -> no value < a or > b
        {"type": "unique", "column": c}                     -> no duplicated values in c
    Rules of any other type are not evaluated: a warning is printed and they
    are counted as skipped. They do not fail validation.
    """

    def __init__(self, rules: list):
        super().__init__("data_validation")
        self.rules = rules

    def execute(self, context: Dict) -> dict:
        """Evaluate every rule against ``context["data_preprocessing"]``.

        Returns:
            ``{"validation": "passed", "checks": number of rules,
            "skipped": number of rules with an unknown type}``.

        Raises:
            ValueError: if the input frame is missing, or any rule has at least
                one violation. The message lists each failing rule with its
                violation count.
        """
        data = context.get("data_preprocessing")
        if data is None:
            raise ValueError("DataValidationStep requires 'data_preprocessing' output in the pipeline context")

        validation_results = []
        skipped = 0
        for rule in self.rules:
            rule_type = rule["type"]
            if rule_type == "not_null":
                violations = int(data[rule["column"]].isnull().sum())
            elif rule_type == "range":
                column = data[rule["column"]]
                violations = int(((column < rule["min"]) | (column > rule["max"])).sum())
            elif rule_type == "unique":
                violations = int(data[rule["column"]].duplicated().sum())
            else:
                # Unknown types used to pass silently, so a typo such as
                # "not-null" disabled the check with no signal at all.
                print(f"Warning: skipping unknown validation rule type: {rule_type!r}")
                skipped += 1
                violations = 0
            validation_results.append({
                "rule": rule,
                "passed": violations == 0,
                "violations": violations,
            })

        failed_rules = [r for r in validation_results if not r["passed"]]
        if failed_rules:
            raise ValueError(f"Validation failed: {failed_rules}")
        return {"validation": "passed", "checks": len(validation_results), "skipped": skipped}
模型训练与评估
模型训练阶段可进一步扩展为支持分布式训练、超参调优和自动模型选择；下面的示例是单机训练的基础实现，按验证集损失保存最优模型。评估阶段应同时使用离线指标（AUC、F1）和业务指标（转化率、收益）全面评估模型效果；示例中的评估步骤仅返回占位指标值，并用阈值判断模型是否达到上线标准。
# 模型训练步骤
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
class ModelTrainingStep(PipelineStep):
    """Train ``model_class`` with Adam + cross-entropy, checkpointing the best epoch.

    ``training_config`` keys:
        lr, batch_size, epochs: required.
        model_params: kwargs passed to ``model_class`` (default {}).
        shuffle: reshuffle training batches every epoch (default True).
        model_path: checkpoint file for the best weights (default "best_model.pth").
    """

    def __init__(self, model_class, training_config: dict):
        super().__init__("model_training")
        self.model_class = model_class
        self.config = training_config

    @staticmethod
    def _train_epoch(model, loader, optimizer, criterion) -> float:
        """Run one optimisation pass over ``loader``; return the mean batch loss."""
        model.train()
        total = 0.0
        for batch in loader:
            optimizer.zero_grad()
            loss = criterion(model(batch["features"]), batch["labels"])
            loss.backward()
            optimizer.step()
            total += loss.item()
        return total / len(loader)

    @staticmethod
    def _evaluate(model, loader, criterion) -> float:
        """Return the mean batch loss over ``loader`` without updating weights."""
        model.eval()
        total = 0.0
        with torch.no_grad():
            for batch in loader:
                total += criterion(model(batch["features"]), batch["labels"]).item()
        return total / len(loader)

    def execute(self, context: Dict) -> dict:
        """Train on ``context["train_data"]``, validate on ``context["val_data"]``.

        Batches are indexed as ``batch["features"]`` / ``batch["labels"]``;
        presumably the datasets yield dicts with those keys (verify against the
        step that produces them).

        Returns:
            ``{"model_path": checkpoint path, or None if no checkpoint was
            written this run, "best_val_loss": float, "best_epoch": int | None,
            "training_history": [{"epoch", "train_loss", "val_loss"}, ...]}``.

        Raises:
            ValueError: if either dataset is missing or yields no batches.
        """
        train_data = context.get("train_data")
        val_data = context.get("val_data")
        if train_data is None or val_data is None:
            raise ValueError("ModelTrainingStep requires 'train_data' and 'val_data' in the pipeline context")

        cfg = self.config
        model = self.model_class(**cfg.get("model_params", {}))
        optimizer = torch.optim.Adam(model.parameters(), lr=cfg["lr"])
        criterion = nn.CrossEntropyLoss()
        batch_size = cfg["batch_size"]
        model_path = cfg.get("model_path", "best_model.pth")

        # Shuffle training batches each epoch; previously the model always saw
        # samples in dataset order.
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=cfg.get("shuffle", True))
        val_loader = DataLoader(val_data, batch_size=batch_size)
        # Both epoch averages divide by the number of batches.
        if len(train_loader) == 0 or len(val_loader) == 0:
            raise ValueError("train_data and val_data must each contain at least one sample")

        best_val_loss = float("inf")
        best_epoch = None
        training_history = []
        for epoch in range(cfg["epochs"]):
            train_loss = self._train_epoch(model, train_loader, optimizer, criterion)
            val_loss = self._evaluate(model, val_loader, criterion)
            training_history.append({
                "epoch": epoch,
                "train_loss": train_loss,
                "val_loss": val_loss,
            })
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_epoch = epoch
                torch.save(model.state_dict(), model_path)

        return {
            # None when nothing was saved (epochs == 0, or val loss never
            # improved on inf, e.g. NaN losses), so downstream steps don't load
            # a stale checkpoint left by an earlier run.
            "model_path": model_path if best_epoch is not None else None,
            "best_val_loss": best_val_loss,
            "best_epoch": best_epoch,
            "training_history": training_history,
        }
class ModelEvaluationStep(PipelineStep):
    """Score the trained model and decide whether it meets the release gate.

    NOTE(review): metric values are hard-coded placeholders; no checkpoint is
    loaded and no inference is run on ``context["test_data"]``. Replace
    ``_PLACEHOLDER_SCORES`` with real evaluation before relying on ``passed``.
    """

    # Placeholder value reported for each supported metric name.
    _PLACEHOLDER_SCORES = {"accuracy": 0.95, "f1_score": 0.93, "auc": 0.97}
    # Default minimum value per metric required for the model to pass.
    DEFAULT_THRESHOLDS = {"accuracy": 0.9, "f1_score": 0.85, "auc": 0.9}

    def __init__(self, metrics: list, thresholds: Optional[Dict[str, float]] = None):
        super().__init__("model_evaluation")
        self.metrics = metrics
        # Copy so per-instance changes never touch the class-level default.
        self.thresholds = dict(self.DEFAULT_THRESHOLDS if thresholds is None else thresholds)

    def execute(self, context: Dict) -> dict:
        """Compute the requested metrics and apply the release gate.

        Reads ``context["model_training"]["model_path"]`` if present.

        Returns:
            ``{"metrics": {name: value}, "passed": bool, "model_path": path or None}``.
            ``passed`` is True when every *requested* metric that has a
            threshold meets it. A requested metric with a threshold but no
            computed value counts as 0 and fails. Previously all thresholds
            were checked, so requesting a subset of metrics could never pass.
        """
        # `or {}` also covers an explicit None stored under "model_training".
        model_path = (context.get("model_training") or {}).get("model_path")
        # Names without a placeholder entry are not reported.
        evaluation_results = {
            name: self._PLACEHOLDER_SCORES[name]
            for name in self.metrics
            if name in self._PLACEHOLDER_SCORES
        }
        passed = all(
            evaluation_results.get(name, 0) >= minimum
            for name, minimum in self.thresholds.items()
            if name in self.metrics
        )
        return {
            "metrics": evaluation_results,
            "passed": passed,
            "model_path": model_path,
        }
流水线编排与调度
生产环境中通常用DAG编排流水线步骤，以支持并行执行、失败重试和条件分支，并集成Airflow或Kubeflow实现定时调度和事件触发。下面示例中的MLPipeline是按顺序执行的简化版本：某一步失败后即停止执行后续步骤。
# 完整流水线示例
def build_training_pipeline(config: dict, model_class=None) -> MLPipeline:
    """Assemble the standard training pipeline from a config dict.

    Args:
        config: must contain "data_source", "preprocessing", "validation_rules",
            "training" and "evaluation_metrics". "features" and "registry" are
            optional: the feature-engineering and model-registration steps are
            added only when their section is present. Previously a missing
            section raised KeyError, including for the example config.
        model_class: model to train; defaults to ``MyModel``.

    Returns:
        An MLPipeline whose steps run in the order added below.
    """
    if model_class is None:
        model_class = MyModel

    pipeline = MLPipeline("model_training_pipeline")
    pipeline.add_step(DataLoadingStep(config["data_source"]))
    pipeline.add_step(DataPreprocessingStep(config["preprocessing"]))
    pipeline.add_step(DataValidationStep(config["validation_rules"]))
    if "features" in config:
        pipeline.add_step(FeatureEngineeringStep(config["features"]))
    pipeline.add_step(ModelTrainingStep(
        model_class=model_class,
        training_config=config["training"],
    ))
    pipeline.add_step(ModelEvaluationStep(config["evaluation_metrics"]))
    if "registry" in config:
        pipeline.add_step(ModelRegistrationStep(config["registry"]))
    return pipeline
# 执行流水线
# Example configuration consumed by build_training_pipeline.
config = {
    "data_source": {"path": "s3://data/train.csv"},
    "preprocessing": {
        "required_columns": ["id", "target"],
        "numeric_columns": ["feature_1", "feature_2"],
    },
    "validation_rules": [
        {"type": "not_null", "column": "target"},
        {"type": "range", "column": "feature_1", "min": 0, "max": 100},
    ],
    "training": {"epochs": 10, "lr": 0.001, "batch_size": 32},
    "evaluation_metrics": ["accuracy", "f1_score", "auc"],
}

# Run the example only when executed as a script, so importing this module
# (to reuse the step classes or this config) does not start a training run.
if __name__ == "__main__":
    pipeline = build_training_pipeline(config)
    result = pipeline.run()
    print(f"Pipeline finished: {result['status']} ({result['steps_completed']} steps run)")