AI系统架构:从训练到推理的完整流水线
AI系统架构:从训练到推理的完整流水线
AI系统分层架构
现代AI系统采用分层架构设计,每一层承担特定职责。基础设施层提供GPU/TPU计算资源和分布式存储;数据层负责数据采集、清洗和标注;特征层管理特征的离线计算与在线服务;模型层处理训练、评估和版本管理;服务层提供低延迟推理API。
# AI系统核心组件定义
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class ModelStatus(Enum):
    """Lifecycle states a model can be in.

    Each member's value is the lowercase string form of its name, so a
    status can be rebuilt from stored text with ``ModelStatus("serving")``.
    """

    TRAINING = "training"
    VALIDATING = "validating"
    DEPLOYING = "deploying"
    SERVING = "serving"
    ARCHIVED = "archived"
@dataclass
class AISystemConfig:
    """Connection and resource settings shared by the pipeline components.

    All endpoint and cluster fields are plain strings; this class does not
    validate or interpret them.
    """

    name: str
    feature_store_endpoint: str
    model_registry_endpoint: str
    serving_endpoint: str
    training_cluster: str
    # default_factory gives every instance its own dict instead of a shared one.
    resource_limits: Dict[str, str] = field(default_factory=dict)
@dataclass
class PipelineStage:
    """One named node of a pipeline graph.

    ``depends_on`` lists the names of the stages this stage depends on;
    ``config`` carries free-form per-stage settings.
    """

    name: str
    # NOTE: shadows the ``type`` builtin inside the class body only; kept
    # because the field name is part of the constructor interface.
    type: str
    depends_on: List[str] = field(default_factory=list)
    config: Dict = field(default_factory=dict)
@dataclass
class AIPipeline:
    """A named collection of pipeline stages plus the system configuration."""

    name: str
    # Quoted so the annotations are forward references and do not require
    # the referenced classes to exist when this class body is evaluated.
    stages: List["PipelineStage"]
    config: "AISystemConfig"

    def validate_dag(self) -> bool:
        """Return True when the stages form a valid directed acyclic graph.

        The graph is rejected (False) when:
          * two stages share the same name (dependencies are resolved by
            name, so duplicates make the graph ambiguous),
          * a stage depends on a name that is not a stage of this pipeline,
          * the dependencies contain a cycle, including a stage that
            depends on itself.

        An empty pipeline is considered valid.
        """
        dependencies = {}
        for stage in self.stages:
            if stage.name in dependencies:
                return False
            # A set collapses a dependency that is listed more than once.
            dependencies[stage.name] = set(stage.depends_on)

        for deps in dependencies.values():
            if any(dep not in dependencies for dep in deps):
                return False

        # Kahn's algorithm: repeatedly remove stages whose dependencies are
        # all resolved. Stages that can never be removed sit on a cycle.
        pending = {name: len(deps) for name, deps in dependencies.items()}
        dependents = {name: [] for name in dependencies}
        for name, deps in dependencies.items():
            for dep in deps:
                dependents[dep].append(name)

        ready = [name for name, count in pending.items() if count == 0]
        resolved = 0
        while ready:
            current = ready.pop()
            resolved += 1
            for child in dependents[current]:
                pending[child] -= 1
                if pending[child] == 0:
                    ready.append(child)

        return resolved == len(dependencies)
训练流水线架构
训练流水线是AI系统的核心,需要支持数据预处理、特征工程、模型训练、超参调优和模型评估等阶段。采用DAG(有向无环图)编排各阶段,支持并行执行和失败重试。常用工具包括Kubeflow Pipelines、Apache Airflow和自研调度系统。
# 分布式训练流水线
import ray
from ray import tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
class TrainingPipeline:
    """Fluent builder that collects training stage specifications.

    Each ``add_*`` method appends one dict describing a stage to
    ``self.stages`` and returns ``self`` so calls can be chained.
    """

    def __init__(self, config: "AISystemConfig"):
        """Store the system configuration and start with no stages."""
        self.config = config
        self.stages = []

    def add_data_loading(self, source: str, batch_size: int = 32, *,
                         parallel_workers: int = 4) -> "TrainingPipeline":
        """Append a ``data_loading`` stage.

        Args:
            source: Recorded as the stage's ``source``.
            batch_size: Recorded as the stage's ``batch_size``.
            parallel_workers: Recorded as the stage's ``parallel_workers``.

        Returns:
            This pipeline, for chaining.
        """
        self.stages.append({
            "name": "data_loading",
            "source": source,
            "batch_size": batch_size,
            "parallel_workers": parallel_workers,
        })
        return self

    def add_preprocessing(self, transforms: list, *,
                          gpu_enabled: bool = True) -> "TrainingPipeline":
        """Append a ``preprocessing`` stage.

        Args:
            transforms: Recorded as the stage's ``transforms`` (stored by
                reference, not copied).
            gpu_enabled: Recorded as the stage's ``gpu_enabled`` flag.

        Returns:
            This pipeline, for chaining.
        """
        self.stages.append({
            "name": "preprocessing",
            "transforms": transforms,
            "gpu_enabled": gpu_enabled,
        })
        return self

    def add_training(self, model_class, epochs: int, lr: float = 1e-3, *,
                     distributed: bool = True,
                     num_workers: int = 8) -> "TrainingPipeline":
        """Append a ``training`` stage.

        Args:
            model_class: Recorded as the stage's ``model_class``.
            epochs: Recorded as the stage's ``epochs``.
            lr: Recorded as the stage's ``learning_rate``.
            distributed: Recorded as the stage's ``distributed`` flag.
            num_workers: Recorded as the stage's ``num_workers``.

        Returns:
            This pipeline, for chaining.
        """
        self.stages.append({
            "name": "training",
            "model_class": model_class,
            "epochs": epochs,
            "learning_rate": lr,
            "distributed": distributed,
            "num_workers": num_workers,
        })
        return self

    def add_evaluation(self, metrics: list, *,
                       test_split: float = 0.2) -> "TrainingPipeline":
        """Append an ``evaluation`` stage.

        Args:
            metrics: Recorded as the stage's ``metrics`` (stored by
                reference, not copied).
            test_split: Recorded as the stage's ``test_split``.

        Returns:
            This pipeline, for chaining.
        """
        self.stages.append({
            "name": "evaluation",
            "metrics": metrics,
            "test_split": test_split,
        })
        return self

    def execute(self):
        """Walk the collected stages in insertion order and report completion.

        This implementation only prints each stage name; no work is
        dispatched for a stage.

        Returns:
            A dict with ``status`` set to ``"completed"`` and
            ``stages_completed`` set to the number of collected stages.
        """
        print(f"Executing pipeline with {len(self.stages)} stages")
        for stage in self.stages:
            print(f" Running stage: {stage['name']}")
        return {"status": "completed", "stages_completed": len(self.stages)}
特征平台设计
特征平台统一管理离线特征计算和在线特征服务,确保训练与推理使用一致的特征。离线存储采用数据湖(HDFS/S3),在线存储使用Redis或DynamoDB提供毫秒级查询。特征注册中心维护特征的元数据、血缘和版本信息。
# 特征存储接口
class FeatureStore:
    """Facade over an offline store, an online store and a feature registry.

    The collaborators are injected. The methods here only call
    ``registry.register(dict)``, ``online_store.get(key)`` and
    ``online_store.set(key, value)``; ``offline_store`` is stored but not
    used by any method of this class.
    """

    def __init__(self, offline_store, online_store, registry):
        """Keep references to the three injected collaborators."""
        self.offline_store = offline_store
        self.online_store = online_store
        self.registry = registry

    def register_feature(self, feature_name: str, entity: str,
                         dtype: str, description: str):
        """Register a feature's metadata with the registry.

        The record is always created with ``version`` 1 and ``created_at``
        set to the current UTC date in ``YYYY-MM-DD`` form.
        """
        # Local import: the file's import header does not provide datetime.
        from datetime import datetime, timezone

        self.registry.register({
            "name": feature_name,
            "entity": entity,
            "dtype": dtype,
            "description": description,
            "version": 1,
            "created_at": datetime.now(timezone.utc).date().isoformat(),
        })

    def get_online_features(self, entity_ids: list,
                            feature_names: list) -> dict:
        """Return the requested features for each entity, using the online store as a cache.

        Args:
            entity_ids: Entities to look up.
            feature_names: Feature names to fetch; their order is part of
                the cache key, so the same names in a different order use
                a different cache entry.

        Returns:
            A dict mapping each entity id to its feature values.
        """
        suffix = ":".join(feature_names)  # identical for every entity
        features = {}
        for entity_id in entity_ids:
            key = f"{entity_id}:{suffix}"
            cached = self.online_store.get(key)
            # Compare against None so that a legitimately falsy cached value
            # (empty dict, 0) counts as a hit instead of being recomputed.
            # Assumes the store signals a miss with None - verify against
            # the concrete online store.
            if cached is not None:
                features[entity_id] = cached
            else:
                features[entity_id] = self._compute_features(
                    entity_id, feature_names
                )
                self.online_store.set(key, features[entity_id])
        return features

    def _compute_features(self, entity_id, feature_names):
        """Placeholder computation: every requested feature is 0.0."""
        return {name: 0.0 for name in feature_names}
模型服务与部署
模型服务层需要支持多种部署策略:金丝雀发布逐步放量、A/B测试对比效果、蓝绿部署快速回滚。推理引擎选择Triton Inference Server或TensorFlow Serving,通过模型并行和流水线并行提升吞吐量。
# 模型服务配置
@dataclass
class ServingConfig:
    """Deployment settings for serving one model version.

    The values are stored as given; this class performs no validation.
    """

    model_name: str
    model_version: str
    replicas: int = 3
    gpu_per_replica: int = 1
    max_batch_size: int = 64
    timeout_ms: int = 100
    # Defaults to 0.0; presumably the share of traffic sent to a canary
    # release - confirm the expected scale (fraction vs. percent) with callers.
    canary_percentage: float = 0.0
class ModelServer:
    """Placeholder model server.

    ``predict`` and ``get_metrics`` return fixed sample values; no model
    is loaded or invoked by this class.
    """

    def __init__(self, config: "ServingConfig"):
        """Store the serving configuration and mark the server healthy."""
        self.config = config
        self.health_status = "healthy"

    def predict(self, input_data: dict) -> dict:
        """Return a stub inference result.

        ``input_data`` is currently ignored; the result always has an
        empty ``prediction`` list and a ``latency_ms`` of 15.
        """
        # Model inference would go here.
        return {"prediction": [], "latency_ms": 15}

    def get_metrics(self) -> dict:
        """Return a fixed sample metrics snapshot (not measured values)."""
        return {
            "requests_total": 10000,
            "avg_latency_ms": 15.2,
            "error_rate": 0.001,
            "gpu_utilization": 0.75,
        }
监控与可观测性
AI系统需要全方位监控:数据漂移监控用于检测输入分布的变化,模型性能监控用于发现预测质量的退化,资源监控跟踪GPU/CPU利用率,业务指标监控追踪模型对业务的实际影响。通过Prometheus采集指标、Grafana进行可视化,构建统一监控面板。
# 数据漂移检测
from scipy import stats
import numpy as np
class DriftDetector:
    """Flag distribution drift between a reference sample and new data.

    Uses the two-sample Kolmogorov-Smirnov test from ``scipy.stats``.
    """

    def __init__(self, reference_data: np.ndarray, threshold: float = 0.05):
        """Store the reference sample and the p-value threshold.

        Args:
            reference_data: Sample that new data is compared against.
            threshold: Drift is reported when the test's p-value is
                strictly below this value.
        """
        self.reference = reference_data
        self.threshold = threshold

    def detect(self, current_data: np.ndarray) -> dict:
        """Compare ``current_data`` with the reference sample.

        Returns:
            A dict of builtin Python values (safe to pass to ``json.dumps``):
            ``drift_detected`` (bool), ``ks_statistic`` (float),
            ``p_value`` (float) and ``recommendation`` (``"retrain"`` when
            drift is detected, otherwise ``"continue"``).
        """
        # KS test for a change in distribution.
        stat, p_value = stats.ks_2samp(self.reference, current_data)
        # Evaluate the decision once and convert it to a builtin bool:
        # the raw comparison yields a NumPy boolean, which json.dumps rejects.
        drifted = bool(p_value < self.threshold)
        return {
            "drift_detected": drifted,
            "ks_statistic": float(stat),
            "p_value": float(p_value),
            "recommendation": "retrain" if drifted else "continue",
        }