机器学习Pipeline构建
在实际机器学习项目中,数据预处理和模型训练往往需要串联成一个完整的工作流。Scikit-learn的Pipeline机制能把这些步骤封装成一个估计器,帮你构建可复用、可部署的ML系统;在交叉验证和网格搜索中,它还能保证预处理只在训练折上拟合,避免验证数据的信息泄漏到模型中。
Pipeline基础
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Load the iris dataset and hold out 20% of the samples for testing.
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data,
    iris.target,
    test_size=0.2,
    random_state=42,
)

# Chain scaling -> 2-component PCA -> logistic regression into one estimator.
pipeline_steps = [
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression()),
]
pipe = Pipeline(pipeline_steps)

# One fit call fits every step in order on the training data.
pipe.fit(X_train, y_train)

# predict/score push X_test through the fitted transforms before the classifier.
predictions = pipe.predict(X_test)
score = pipe.score(X_test, y_test)
print(f"Pipeline准确率: {score:.4f}")

# Fitted steps remain reachable by name, e.g. to inspect the PCA variance ratios.
pca_step = pipe.named_steps['pca']
print(f"PCA解释方差比: {pca_step.explained_variance_ratio_}")
简化的Pipeline创建
from sklearn.pipeline import make_pipeline
# make_pipeline builds the same kind of Pipeline without explicit step names;
# it generates the names itself (per the scikit-learn docs, the lowercased
# class name, e.g. 'standardscaler'), which the loop below prints.
pipe = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    LogisticRegression(),
)
print("Pipeline步骤:")
for name, step in pipe.steps:
    print(f" {name}: {step.__class__.__name__}")

# Train and evaluate exactly as with an explicitly named Pipeline.
pipe.fit(X_train, y_train)
print(f"准确率: {pipe.score(X_test, y_test):.4f}")
特征处理Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
# Simulated tabular data: numeric (age, salary) and categorical (city, education)
# features plus a randomly drawn binary 'purchased' target.
np.random.seed(42)
df = pd.DataFrame({
    'age': np.random.randint(18, 70, 100),
    'salary': np.random.normal(50000, 15000, 100),
    'city': np.random.choice(['北京', '上海', '广州', '深圳'], 100),
    'education': np.random.choice(['高中', '本科', '硕士', '博士'], 100),
    'purchased': np.random.choice([0, 1], 100),
})

# Blank out 5 salaries so the imputer has work to do. replace=False picks 5
# distinct rows; sampling with replacement could pick the same row twice and
# leave fewer than 5 missing values.
missing_rows = np.random.choice(df.index, size=5, replace=False)
df.loc[missing_rows, 'salary'] = np.nan

# Column groups routed to different preprocessing branches.
numeric_features = ['age', 'salary']
categorical_features = ['city', 'education']

# Numeric branch: fill gaps with the median, then standardize.
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical branch: fill gaps with the most frequent value, then one-hot
# encode. handle_unknown='ignore' means categories unseen during fit do not
# raise at predict time.
# NOTE: `sparse_output` is the scikit-learn >= 1.2 spelling (older releases
# call it `sparse`).
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

# Apply each branch to its own column list and combine the outputs.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ]
)

# Preprocessing + classifier as a single estimator; max_iter=1000 gives the
# solver more iterations to converge.
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000)),
])

# Features are every column except the target.
X = df.drop('purchased', axis=1)
y = df['purchased']

full_pipeline.fit(X, y)
print("Pipeline训练完成")

# New rows only need the raw feature columns; all preprocessing happens
# inside the fitted pipeline.
new_data = pd.DataFrame({
    'age': [25, 35],
    'salary': [40000, 80000],
    'city': ['北京', '上海'],
    'education': ['本科', '硕士'],
})
predictions = full_pipeline.predict(new_data)
print(f"预测结果: {predictions}")
网格搜索与Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# Scaling + random-forest pipeline whose settings GridSearchCV will tune.
pipe = Pipeline(
    steps=[
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ]
)

# "<step>__<param>" keys reach into a step's parameters; a bare step name
# ('scaler') replaces the whole step, and None removes it from the pipeline.
# 2 x 3 x 3 = 18 candidate settings, each cross-validated over 5 folds.
param_grid = {
    'scaler': [StandardScaler(), None],
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [5, 10, None],
}

search_options = dict(cv=5, scoring='accuracy', n_jobs=-1)
grid_search = GridSearchCV(pipe, param_grid, **search_options)
grid_search.fit(X_train, y_train)

print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
# The refitted best pipeline is evaluated on the untouched test split.
print(f"测试集分数: {grid_search.score(X_test, y_test):.4f}")
自定义Transformer
from sklearn.base import BaseEstimator, TransformerMixin
# Custom transformer: keep the columns whose absolute correlation with y is
# above a threshold.
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Select feature columns by absolute Pearson correlation with the target.

    Parameters
    ----------
    threshold : float, default=0.1
        Columns with ``|corr(X[:, i], y)| > threshold`` are kept.

    Attributes
    ----------
    selected_features_ : ndarray of int
        Indices of the kept columns, set by ``fit``. When ``fit`` is called
        without ``y``, every column is kept.
    """

    def __init__(self, threshold=0.1):
        # scikit-learn convention: __init__ only stores hyper-parameters.
        # Fitted state (trailing underscore) is created in fit, so that
        # check_is_fitted does not mistake an unfitted instance for a fitted one.
        self.threshold = threshold

    def fit(self, X, y=None):
        """Decide which columns to keep and return ``self``.

        Raises
        ------
        ValueError
            If ``y`` is given and no column passes the threshold.
        """
        # Accept DataFrames as well as arrays; positional column indexing
        # below requires an ndarray.
        X = np.asarray(X)
        if y is None:
            self.selected_features_ = np.arange(X.shape[1])
            return self

        y = np.asarray(y)
        # A zero-variance column has an undefined (NaN) correlation; silence
        # the divide warning and treat such a column as uncorrelated.
        # Pearson correlation treats y as numeric, so for multi-class labels
        # this is only a rough heuristic.
        with np.errstate(divide='ignore', invalid='ignore'):
            correlations = np.abs(
                [np.corrcoef(X[:, i], y)[0, 1] for i in range(X.shape[1])]
            )
        correlations = np.nan_to_num(correlations, nan=0.0)

        selected = np.flatnonzero(correlations > self.threshold)
        if selected.size == 0:
            # Fail here with a clear message instead of handing a
            # zero-column array to the next pipeline step.
            raise ValueError(
                f"FeatureSelector(threshold={self.threshold}) selected no "
                "features; lower the threshold."
            )
        self.selected_features_ = selected
        return self

    def transform(self, X):
        """Return only the columns of ``X`` chosen during ``fit``."""
        from sklearn.utils.validation import check_is_fitted

        # Raise NotFittedError instead of indexing with a missing selection.
        check_is_fitted(self, 'selected_features_')
        return np.asarray(X)[:, self.selected_features_]
# Put the correlation-based selector in front of scaling and the classifier.
pipe_custom = Pipeline(
    [
        ('selector', FeatureSelector(threshold=0.3)),
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression()),
    ]
)
# Pipeline.fit returns the pipeline itself, so fitting and scoring chain.
score = pipe_custom.fit(X_train, y_train).score(X_test, y_test)
print(f"自定义Pipeline准确率: {score:.4f}")
Pipeline持久化
import joblib
import pickle
# Persist the fitted feature-processing pipeline with joblib and reload it.
# SECURITY: joblib.load and pickle.load can execute arbitrary code while
# loading; only load files you created yourself or otherwise trust.
joblib.dump(full_pipeline, 'ml_pipeline.pkl')
loaded_pipeline = joblib.load('ml_pipeline.pkl')

# The reloaded pipeline predicts directly on raw feature rows.
predictions = loaded_pipeline.predict(new_data)
print(f"加载后预测结果: {predictions}")

# Same round trip with the standard-library pickle module. Use a separate
# file so the joblib artifact written above is not silently overwritten.
pickle_path = 'ml_pipeline_pickle.pkl'
with open(pickle_path, 'wb') as f:
    pickle.dump(full_pipeline, f)

with open(pickle_path, 'rb') as f:
    loaded_pipe = pickle.load(f)

# X, y are the data the pipeline was fitted on, so this is training accuracy:
# a check that the reload worked, not an estimate of generalization.
print(f"验证加载的Pipeline: {loaded_pipe.score(X, y):.4f}")
多输出Pipeline
from sklearn.multioutput import MultiOutputClassifier
from sklearn.preprocessing import MultiLabelBinarizer
# Multi-output classification: MultiOutputClassifier wraps a RandomForest so
# that each of the 3 target columns gets its own classifier.
np.random.seed(42)
X_multi = np.random.randn(200, 10)
y_multi = np.random.randint(0, 2, (200, 3))  # 3 binary outputs per sample

# Evaluate on held-out rows. Scoring a random forest on its own training data
# typically reports a near-perfect number that says nothing about generalization.
Xm_train, Xm_test, ym_train, ym_test = train_test_split(
    X_multi, y_multi, test_size=0.2, random_state=42
)

pipe_multi = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', MultiOutputClassifier(
        RandomForestClassifier(n_estimators=100, random_state=42)
    )),
])
pipe_multi.fit(Xm_train, ym_train)

# For multi-output targets, score() is subset accuracy: a sample counts as
# correct only if all 3 outputs are predicted correctly. The labels here are
# random noise, so expect a value near chance (roughly 0.5 ** 3).
score = pipe_multi.score(Xm_test, ym_test)
print(f"多输出Pipeline准确率: {score:.4f}")
完整项目Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
import warnings
# Warnings are deliberately left enabled: a process-wide
# warnings.filterwarnings('ignore') would also hide convergence and
# deprecation warnings from every later call.


def create_full_pipeline(numeric_features, categorical_features, classifier=None):
    """Build a preprocessing + classification pipeline for mixed-type tabular data.

    Parameters
    ----------
    numeric_features : list of str
        Columns imputed with the median, then standardized.
    categorical_features : list of str
        Columns imputed with the most frequent value, then one-hot encoded;
        categories unseen during fit are ignored at predict time.
    classifier : estimator, optional
        Final step of the pipeline. Defaults to a fresh
        ``GradientBoostingClassifier(random_state=42)``.

    Returns
    -------
    Pipeline
        Unfitted pipeline with steps ``'preprocessor'`` and ``'classifier'``.
    """
    # Build the default inside the call so separate pipelines never share
    # one estimator instance.
    if classifier is None:
        classifier = GradientBoostingClassifier(random_state=42)

    # Numeric branch.
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    # Categorical branch.
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ])

    # Route each column group to its branch.
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features),
        ]
    )

    return Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', classifier),
    ])


# Usage example.
pipeline = create_full_pipeline(
    numeric_features=['age', 'salary'],
    categorical_features=['city', 'education'],
)

# 5-fold cross-validation. Because preprocessing lives inside the pipeline,
# it is re-fitted on each training fold only.
cv_scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')
print(f"交叉验证准确率: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# Fit the final model on all available data.
pipeline.fit(X, y)
print("最终模型训练完成")

# Persist the fitted pipeline for later reuse.
joblib.dump(pipeline, 'final_pipeline.pkl')
print("模型已保存")
总结
Pipeline是构建可复用ML系统的关键工具。通过Pipeline,你可以将数据预处理、特征工程和模型训练串联成一个整体,便于部署和复现,并在交叉验证中避免数据泄漏。关键要点:使用ColumnTransformer分别处理数值、类别等异构特征;在网格搜索中用`步骤名__参数名`的双下划线语法访问子步骤参数;用joblib或pickle保存整个Pipeline实现模型持久化(只加载自己生成或可信来源的文件)。掌握Pipeline能让你的ML项目更加专业、更易维护。