← 返回首页
🤖

机器学习Pipeline构建

📂 ai ⏱ 3 min 526 words

机器学习Pipeline构建

在实际机器学习项目中,数据预处理和模型训练往往需要串联成一个完整的工作流。Scikit-learn的Pipeline机制能帮你构建可复用、可部署的ML系统。

Pipeline基础

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# 加载数据
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# 创建Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
])

# 整体训练
pipe.fit(X_train, y_train)

# 整体预测
predictions = pipe.predict(X_test)

# 整体评估
score = pipe.score(X_test, y_test)
print(f"Pipeline准确率: {score:.4f}")

# 访问Pipeline中的步骤
print(f"PCA解释方差比: {pipe.named_steps['pca'].explained_variance_ratio_}")

简化的Pipeline创建

from sklearn.pipeline import make_pipeline

# 使用make_pipeline简化创建
pipe = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    LogisticRegression()
)

print("Pipeline步骤:")
for name, step in pipe.steps:
    print(f"  {name}: {step.__class__.__name__}")

# 训练和预测
pipe.fit(X_train, y_train)
print(f"准确率: {pipe.score(X_test, y_test):.4f}")

特征处理Pipeline

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer

# 模拟真实数据
np.random.seed(42)
df = pd.DataFrame({
    'age': np.random.randint(18, 70, 100),
    'salary': np.random.normal(50000, 15000, 100),
    'city': np.random.choice(['北京', '上海', '广州', '深圳'], 100),
    'education': np.random.choice(['高中', '本科', '硕士', '博士'], 100),
    'purchased': np.random.choice([0, 1], 100)
})

# 添加一些缺失值
df.loc[np.random.choice(df.index, 5), 'salary'] = np.nan

# 定义数值特征和类别特征
numeric_features = ['age', 'salary']
categorical_features = ['city', 'education']

# 数值特征处理Pipeline
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# 类别特征处理Pipeline
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# 组合特征处理
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# 完整Pipeline
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])

# 准备数据
X = df.drop('purchased', axis=1)
y = df['purchased']

# 训练
full_pipeline.fit(X, y)
print("Pipeline训练完成")

# 预测新数据
new_data = pd.DataFrame({
    'age': [25, 35],
    'salary': [40000, 80000],
    'city': ['北京', '上海'],
    'education': ['本科', '硕士']
})

predictions = full_pipeline.predict(new_data)
print(f"预测结果: {predictions}")

网格搜索与Pipeline

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

# 创建带参数的Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(random_state=42))
])

# 定义参数网格(使用双下划线访问子步骤参数)
param_grid = {
    'scaler': [StandardScaler(), None],
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [5, 10, None]
}

# 网格搜索
grid_search = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
print(f"测试集分数: {grid_search.score(X_test, y_test):.4f}")

自定义Transformer

from sklearn.base import BaseEstimator, TransformerMixin

# 自定义特征选择Transformer
class FeatureSelector(BaseEstimator, TransformerMixin):
    def __init__(self, threshold=0.1):
        self.threshold = threshold
        self.selected_features_ = None
    
    def fit(self, X, y=None):
        # 计算每个特征与目标的相关性
        if y is not None:
            correlations = np.abs([np.corrcoef(X[:, i], y)[0, 1] for i in range(X.shape[1])])
            self.selected_features_ = np.where(correlations > self.threshold)[0]
        else:
            self.selected_features_ = np.arange(X.shape[1])
        return self
    
    def transform(self, X):
        return X[:, self.selected_features_]

# 使用自定义Transformer
pipe_custom = Pipeline([
    ('selector', FeatureSelector(threshold=0.3)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

pipe_custom.fit(X_train, y_train)
score = pipe_custom.score(X_test, y_test)
print(f"自定义Pipeline准确率: {score:.4f}")

Pipeline持久化

import joblib
import pickle

# 保存Pipeline
joblib.dump(full_pipeline, 'ml_pipeline.pkl')

# 加载Pipeline
loaded_pipeline = joblib.load('ml_pipeline.pkl')

# 使用加载的Pipeline预测
predictions = loaded_pipeline.predict(new_data)
print(f"加载后预测结果: {predictions}")

# 使用pickle保存
with open('ml_pipeline.pkl', 'wb') as f:
    pickle.dump(full_pipeline, f)

# 验证保存的Pipeline
with open('ml_pipeline.pkl', 'rb') as f:
    loaded_pipe = pickle.load(f)

print(f"验证加载的Pipeline: {loaded_pipe.score(X, y):.4f}")

多输出Pipeline

from sklearn.multioutput import MultiOutputClassifier
from sklearn.preprocessing import MultiLabelBinarizer

# 多输出分类Pipeline
np.random.seed(42)
X_multi = np.random.randn(200, 10)
y_multi = np.random.randint(0, 2, (200, 3))  # 3个输出

pipe_multi = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', MultiOutputClassifier(
        RandomForestClassifier(n_estimators=100, random_state=42)
    ))
])

pipe_multi.fit(X_multi, y_multi)
score = pipe_multi.score(X_multi, y_multi)
print(f"多输出Pipeline准确率: {score:.4f}")

完整项目Pipeline

from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
import warnings
warnings.filterwarnings('ignore')

# 完整项目Pipeline
def create_full_pipeline(numeric_features, categorical_features):
    # 数值处理
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    # 类别处理
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    # 特征预处理
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )
    
    # 完整Pipeline
    full_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', GradientBoostingClassifier(random_state=42))
    ])
    
    return full_pipeline

# 使用示例
pipeline = create_full_pipeline(
    numeric_features=['age', 'salary'],
    categorical_features=['city', 'education']
)

# 交叉验证
cv_scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')
print(f"交叉验证准确率: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# 训练最终模型
pipeline.fit(X, y)
print("最终模型训练完成")

# 保存模型
joblib.dump(pipeline, 'final_pipeline.pkl')
print("模型已保存")

总结

Pipeline是构建可复用ML系统的关键工具。通过Pipeline,你可以将数据预处理、特征工程、模型训练串联成一个整体,便于部署和复现。关键要点:使用ColumnTransformer处理异构数据;通过双下划线访问子步骤参数;保存Pipeline实现模型持久化。掌握Pipeline能让你的ML项目更加专业和可维护。