Kubeflow:在Kubernetes上构建ML管道
---
title: "Kubeflow:在Kubernetes上构建ML管道"
description: "使用Kubeflow在Kubernetes集群上构建和部署端到端的机器学习管道"
tags: ["Kubeflow", "Kubernetes", "ML管道", "容器化", "MLOps"]
category: "llm"
icon: "☸️"
---
Kubeflow:在Kubernetes上构建ML管道
Kubeflow概述
Kubeflow是基于Kubernetes的机器学习平台,提供从数据准备到模型部署的完整ML工作流支持。其中的Kubeflow Pipelines(KFP)将工作流的每个步骤打包为可移植的容器化组件,再把这些组件编排成可复用的管道。
Kubeflow Pipelines
1. 定义管道组件
from kfp import dsl
from kfp.components import load_component_from_text
# 定义组件
@dsl.component(base_image="python:3.9")
def preprocess_data(text: str) -> str:
    """Trim *text* and join its lines with single spaces.

    Every line break recognised by ``str.splitlines`` (LF, CRLF, CR, ...)
    becomes one space, so Windows line endings do not leave stray carriage
    returns in the prompt. Blank lines inside the text still produce
    consecutive spaces, exactly as a plain LF -> space replacement would.

    This step only uses the standard library, so the component installs no
    extra packages (each pip install adds start-up time to the pod).

    Args:
        text: Raw input text.

    Returns:
        The cleaned, single-line text ("" for empty or all-whitespace input).
    """
    return " ".join(text.strip().splitlines())
@dsl.component(
    base_image="python:3.9",
    # The chat.completions client API used below exists only in openai>=1.0.
    packages_to_install=["openai>=1.0"],
)
def llm_generate(prompt: str, model: str = "gpt-4") -> str:
    """Send *prompt* as a single user message to an OpenAI chat model.

    The import lives inside the function because KFP copies only this
    function's source into the component container, so module-level imports
    are not available there.

    The OpenAI client reads its API key from the ``OPENAI_API_KEY``
    environment variable. That variable must be set in the task's pod, e.g.
    injected from a Kubernetes Secret.

    Args:
        prompt: Text sent as the user message.
        model: Chat model name; defaults to ``"gpt-4"``.

    Returns:
        The assistant's reply text, or ``""`` when the response carries no
        text content.
    """
    from openai import OpenAI

    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    # ``message.content`` is Optional in the API (e.g. refusals or tool
    # calls), but a ``str`` component output cannot carry None.
    return response.choices[0].message.content or ""
@dsl.component
def postprocess(output: str) -> str:
    """Post-process the LLM reply by removing leading/trailing whitespace.

    Args:
        output: Raw text produced by the generation step.

    Returns:
        *output* with surrounding whitespace stripped.
    """
    trimmed = output.strip()
    return trimmed
@dsl.component
def evaluate(response: str) -> float:
    """Score a response by its length: 1.0 per 100 characters.

    This is purely a length heuristic and says nothing about answer quality;
    replace it with a real metric for production use.

    Args:
        response: The post-processed model reply.

    Returns:
        ``len(response) / 100.0``.
    """
    char_count = len(response)
    return char_count / 100.0
2. 定义管道
@dsl.pipeline(
    name="llm-pipeline",
    description="LLM处理管道",
)
def llm_pipeline(text: str):
    """Clean *text*, send it to the LLM, post-process and score the reply.

    Inside a ``@dsl.pipeline`` function, calling a component returns a
    ``PipelineTask``, not a value. The result of a single-output component is
    referenced as ``task.output``, and that reference is what must be passed
    to downstream steps. KFP resolves it to the real value at run time.

    Args:
        text: Raw input text for the pipeline run.
    """
    cleaned_task = preprocess_data(text=text)

    # Interpolate the output *reference*: the KFP v2 compiler turns it into a
    # run-time placeholder. Interpolating the task object itself would bake
    # its Python repr into the prompt instead of the cleaned text.
    llm_task = llm_generate(prompt=f"处理以下文本: {cleaned_task.output}")

    post_task = postprocess(output=llm_task.output)

    evaluate(response=post_task.output)
# Compile the pipeline into a YAML package (the file uploaded further below).
from kfp.compiler import Compiler

Compiler().compile(
    pipeline_func=llm_pipeline,
    package_path="llm_pipeline.yaml",
)
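完整ML管道示例
下例中的prepare_data和train_model在后文“组件实现”中给出;evaluate_model和deploy_model未在本文定义,需自行实现,其中evaluate_model须输出名为accuracy的数值,供部署条件判断使用。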
@dsl.pipeline(
    name="end-to-end-ml-pipeline",
    description="端到端机器学习管道",
)
def ml_pipeline(
    data_path: str,
    model_name: str = "gpt-4",
):
    """Prepare data, train, evaluate, and deploy only if accuracy is high enough.

    The wiring relies on named component outputs. ``prepare_data`` must expose
    ``train_data``/``val_data``/``test_data``, ``train_model`` must expose
    ``model``, and ``evaluate_model`` must expose ``accuracy``.
    ``evaluate_model`` and ``deploy_model`` are referenced but not defined in
    this snippet -- TODO confirm they are defined elsewhere.

    Args:
        data_path: Dataset location, forwarded to ``prepare_data``.
        model_name: Name forwarded to ``deploy_model``.
            NOTE(review): the default ``"gpt-4"`` does not describe a model
            fine-tuned by ``train_model``; consider passing an explicit name.
    """
    splits = prepare_data(data_path=data_path)

    trained = train_model(
        train_data=splits.outputs["train_data"],
        val_data=splits.outputs["val_data"],
    )
    model_artifact = trained.outputs["model"]

    metrics = evaluate_model(
        model=model_artifact,
        test_data=splits.outputs["test_data"],
    )

    # Gate deployment on test accuracy; the comparison is compiled into a
    # run-time condition rather than evaluated here in Python.
    with dsl.Condition(metrics.outputs["accuracy"] > 0.8):
        deploy_model(model=model_artifact, model_name=model_name)
组件实现
1. 数据处理组件
@dsl.component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"],
)
def prepare_data(
    data_path: str,
    train_data: dsl.Output[dsl.Dataset],
    val_data: dsl.Output[dsl.Dataset],
    test_data: dsl.Output[dsl.Dataset],
    seed: int = 42,
):
    """Split a JSON dataset into train / validation / test artifacts.

    20% of the rows are held out as the test set. 10% of the remaining rows
    become the validation set, giving roughly 72% / 8% / 20%.

    The three outputs are named ``train_data``, ``val_data`` and ``test_data``,
    so a pipeline consumes them as ``task.outputs["train_data"]`` etc. Callers
    pass only ``data_path`` (and optionally ``seed``). KFP supplies the
    output artifacts.

    Args:
        data_path: Location of the source JSON file, as accepted by
            ``pandas.read_json``. It must be readable from inside the
            component's container.
        train_data: Output artifact receiving the training split (JSON records).
        val_data: Output artifact receiving the validation split (JSON records).
        test_data: Output artifact receiving the test split (JSON records).
        seed: Random seed for both splits, so reruns produce identical splits.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_json(data_path)

    # Hold out the test set first, then carve validation out of the rest.
    train, test = train_test_split(df, test_size=0.2, random_state=seed)
    train, val = train_test_split(train, test_size=0.1, random_state=seed)

    # Files in the container's working directory vanish when the pod exits,
    # and a return value is only captured when declared in the signature.
    # Writing to the artifact paths lets downstream tasks read the splits.
    train.to_json(train_data.path, orient="records")
    val.to_json(val_data.path, orient="records")
    test.to_json(test_data.path, orient="records")
2. 模型训练组件
@dsl.component(
    base_image="python:3.9",
    # eval_strategy (used below) requires transformers>=4.41.
    packages_to_install=["transformers>=4.41", "torch", "pandas"],
)
def train_model(
    train_data: dsl.Input[dsl.Dataset],
    val_data: dsl.Input[dsl.Dataset],
    model: dsl.Output[dsl.Model],
    base_model_name: str = "meta-llama/Llama-2-7b-hf",
    text_field: str = "text",
    max_length: int = 512,
):
    """Fine-tune a causal language model on the prepared splits.

    The fine-tuned weights and the tokenizer are saved into the ``model``
    output artifact, which a pipeline reads as ``task.outputs["model"]``.

    NOTE(review): full fine-tuning of a 7B model needs GPU(s). Request them on
    the task in the pipeline (e.g. ``set_accelerator_type`` /
    ``set_accelerator_limit``) and use a CUDA-enabled base image. The
    ``meta-llama`` repo is gated, so an HF token must be available in the pod.

    Args:
        train_data: Training split, a JSON array of records as written by
            ``prepare_data``.
        val_data: Validation split in the same format, evaluated every epoch.
        model: Output artifact directory for the trained model and tokenizer.
        base_model_name: Hugging Face model id to start from.
        text_field: Record field holding the training text. Assumed to be
            ``"text"`` -- TODO confirm against the dataset schema.
        max_length: Token limit per example; longer texts are truncated.
    """
    import pandas as pd
    import torch
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        DataCollatorForLanguageModeling,
        Trainer,
        TrainingArguments,
    )

    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        # Some tokenizers (e.g. Llama's) define no pad token; batching needs one.
        tokenizer.pad_token = tokenizer.eos_token
    lm = AutoModelForCausalLM.from_pretrained(base_model_name)

    class _TokenizedJson(torch.utils.data.Dataset):
        """Tokenized view of one JSON-records split (Trainer needs a Dataset, not a path)."""

        def __init__(self, path: str):
            frame = pd.read_json(path, orient="records")
            texts = frame[text_field].astype(str).tolist()
            self.encodings = tokenizer(texts, truncation=True, max_length=max_length)

        def __len__(self):
            return len(self.encodings["input_ids"])

        def __getitem__(self, idx):
            return {key: values[idx] for key, values in self.encodings.items()}

    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        learning_rate=2e-5,
        # Formerly ``evaluation_strategy``; the old name was deprecated and
        # then removed, so unpinned installs reject it.
        eval_strategy="epoch",
    )

    trainer = Trainer(
        model=lm,
        args=training_args,
        train_dataset=_TokenizedJson(train_data.path),
        eval_dataset=_TokenizedJson(val_data.path),
        # mlm=False: pad each batch and build causal-LM labels from input_ids.
        data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False),
    )
    trainer.train()

    # Save into the output artifact so downstream tasks can load the model;
    # the container's local filesystem is discarded when the pod exits.
    lm.save_pretrained(model.path)
    tokenizer.save_pretrained(model.path)
部署与管理
# Deploy and run the compiled pipeline with the Kubeflow Pipelines SDK client.
import kfp

# Assumes the KFP API is reachable at this address (e.g. through a
# kubectl port-forward) -- adjust for your cluster.
client = kfp.Client(host="http://localhost:8080")

# Register the compiled package. This fails if a pipeline with this name
# already exists; publish later revisions with client.upload_pipeline_version.
pipeline = client.upload_pipeline(
    pipeline_package_path="llm_pipeline.yaml",
    pipeline_name="llm-pipeline-v1",
)

# Client.run_pipeline takes IDs, not names. create_experiment returns the
# existing experiment if one with this name is already present.
experiment = client.create_experiment(name="llm-experiments")

# Run the default version of the uploaded pipeline.
run = client.run_pipeline(
    experiment_id=experiment.experiment_id,
    job_name="llm-pipeline-v1-run",
    pipeline_id=pipeline.pipeline_id,
    params={"text": "测试输入"},
)
最佳实践
- 组件复用:将常用操作封装为可复用组件
- 版本控制:对管道和组件进行版本管理
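- 资源管理:通过set_cpu_limit、set_memory_limit、set_accelerator_type/set_accelerator_limit等任务方法,为每个步骤合理配置CPU、内存和GPU资源(例如大模型训练步骤必须申请GPU)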
- 监控告警:设置管道执行监控和失败告警
总结
Kubeflow提供了在Kubernetes上构建ML管道的完整解决方案。通过容器化组件和声明式管道定义,可以构建可移植、可扩展的机器学习工作流。