← 返回首页
🤖

AI系统架构:从训练到推理的完整流水线

📂 architecture ⏱ 3 min 449 words

AI系统架构:从训练到推理的完整流水线

AI系统分层架构

现代AI系统采用分层架构设计,每一层承担特定职责。基础设施层提供GPU/TPU计算资源和分布式存储;数据层负责数据采集、清洗和标注;特征层管理特征的离线计算与在线服务;模型层处理训练、评估和版本管理;服务层提供低延迟推理API。

# AI系统核心组件定义
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

class ModelStatus(Enum):
    TRAINING = "training"
    VALIDATING = "validating"
    DEPLOYING = "deploying"
    SERVING = "serving"
    ARCHIVED = "archived"

@dataclass
class AISystemConfig:
    name: str
    feature_store_endpoint: str
    model_registry_endpoint: str
    serving_endpoint: str
    training_cluster: str
    resource_limits: Dict[str, str] = field(default_factory=dict)

@dataclass
class PipelineStage:
    name: str
    type: str
    depends_on: List[str] = field(default_factory=list)
    config: Dict = field(default_factory=dict)

@dataclass
class AIPipeline:
    name: str
    stages: List[PipelineStage]
    config: AISystemConfig
    
    def validate_dag(self) -> bool:
        stage_names = {s.name for s in self.stages}
        for stage in self.stages:
            for dep in stage.depends_on:
                if dep not in stage_names:
                    return False
        return True

训练流水线架构

训练流水线是AI系统的核心,需要支持数据预处理、特征工程、模型训练、超参调优和模型评估等阶段。采用DAG(有向无环图)编排各阶段,支持并行执行和失败重试。常用工具包括Kubeflow Pipelines、Apache Airflow和自研调度系统。

# 分布式训练流水线
import ray
from ray import tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

class TrainingPipeline:
    def __init__(self, config: AISystemConfig):
        self.config = config
        self.stages = []
    
    def add_data_loading(self, source: str, batch_size: int = 32):
        self.stages.append({
            "name": "data_loading",
            "source": source,
            "batch_size": batch_size,
            "parallel_workers": 4
        })
        return self
    
    def add_preprocessing(self, transforms: list):
        self.stages.append({
            "name": "preprocessing",
            "transforms": transforms,
            "gpu_enabled": True
        })
        return self
    
    def add_training(self, model_class, epochs: int, lr: float = 1e-3):
        self.stages.append({
            "name": "training",
            "model_class": model_class,
            "epochs": epochs,
            "learning_rate": lr,
            "distributed": True,
            "num_workers": 8
        })
        return self
    
    def add_evaluation(self, metrics: list):
        self.stages.append({
            "name": "evaluation",
            "metrics": metrics,
            "test_split": 0.2
        })
        return self
    
    def execute(self):
        print(f"Executing pipeline with {len(self.stages)} stages")
        for stage in self.stages:
            print(f"  Running stage: {stage['name']}")
        return {"status": "completed", "stages_completed": len(self.stages)}

特征平台设计

特征平台统一管理离线特征计算和在线特征服务,确保训练与推理使用一致的特征。离线存储采用数据湖(HDFS/S3),在线存储使用Redis或DynamoDB提供毫秒级查询。特征注册中心维护特征的元数据、血缘和版本信息。

# 特征存储接口
class FeatureStore:
    def __init__(self, offline_store, online_store, registry):
        self.offline_store = offline_store
        self.online_store = online_store
        self.registry = registry
    
    def register_feature(self, feature_name: str, entity: str, 
                         dtype: str, description: str):
        self.registry.register({
            "name": feature_name,
            "entity": entity,
            "dtype": dtype,
            "description": description,
            "version": 1,
            "created_at": "2024-01-01"
        })
    
    def get_online_features(self, entity_ids: list, 
                           feature_names: list) -> dict:
        features = {}
        for entity_id in entity_ids:
            key = f"{entity_id}:{':'.join(feature_names)}"
            cached = self.online_store.get(key)
            if cached:
                features[entity_id] = cached
            else:
                features[entity_id] = self._compute_features(
                    entity_id, feature_names
                )
                self.online_store.set(key, features[entity_id])
        return features
    
    def _compute_features(self, entity_id, feature_names):
        return {name: 0.0 for name in feature_names}

模型服务与部署

模型服务层需要支持多种部署策略:金丝雀发布逐步放量、A/B测试对比效果、蓝绿部署快速回滚。推理引擎选择Triton Inference Server或TensorFlow Serving,通过模型并行和流水线并行提升吞吐量。

# 模型服务配置
@dataclass
class ServingConfig:
    model_name: str
    model_version: str
    replicas: int = 3
    gpu_per_replica: int = 1
    max_batch_size: int = 64
    timeout_ms: int = 100
    canary_percentage: float = 0.0

class ModelServer:
    def __init__(self, config: ServingConfig):
        self.config = config
        self.health_status = "healthy"
    
    def predict(self, input_data: dict) -> dict:
        # 模型推理
        return {"prediction": [], "latency_ms": 15}
    
    def get_metrics(self) -> dict:
        return {
            "requests_total": 10000,
            "avg_latency_ms": 15.2,
            "error_rate": 0.001,
            "gpu_utilization": 0.75
        }

监控与可观测性

AI系统需要全方位监控:数据漂移检测输入分布变化,模型性能监控预测质量退化,资源监控GPU/CPU利用率,业务指标追踪模型对业务的实际影响。通过Grafana和Prometheus构建统一监控面板。

# 数据漂移检测
from scipy import stats
import numpy as np

class DriftDetector:
    def __init__(self, reference_data: np.ndarray, threshold: float = 0.05):
        self.reference = reference_data
        self.threshold = threshold
    
    def detect(self, current_data: np.ndarray) -> dict:
        # KS检验检测分布变化
        stat, p_value = stats.ks_2samp(self.reference, current_data)
        return {
            "drift_detected": p_value < self.threshold,
            "ks_statistic": stat,
            "p_value": p_value,
            "recommendation": "retrain" if p_value < self.threshold else "continue"
        }