← 返回首页
🧠

Kubeflow:在Kubernetes上构建ML管道

📂 llm ⏱ 2 min 302 words

---
title: "Kubeflow:在Kubernetes上构建ML管道"
description: "使用Kubeflow在Kubernetes集群上构建和部署端到端的机器学习管道"
tags: ["Kubeflow", "Kubernetes", "ML管道", "容器化", "MLOps"]
category: "llm"
icon: "☸️"
---

Kubeflow:在Kubernetes上构建ML管道

Kubeflow概述

Kubeflow是基于Kubernetes的机器学习平台,提供从数据准备、模型训练到模型部署的完整ML工作流支持。其中的Kubeflow Pipelines将工作流中的每个步骤打包为可移植的容器化组件,并把这些组件编排成可重复运行的管道。

Kubeflow Pipelines

1. 定义管道组件

from kfp import dsl
from kfp.components import load_component_from_text

# 定义组件
@dsl.component(
    base_image="python:3.9",
)
def preprocess_data(text: str) -> str:
    """Normalize raw input text before it is sent to the LLM.

    Strips leading/trailing whitespace, then replaces each newline
    character (``\\n``) with a single space.

    Args:
        text: Raw input text.

    Returns:
        The cleaned text.
    """
    # Pure string manipulation: no third-party packages are required, so
    # nothing extra is pip-installed when this step's container starts.
    return text.strip().replace("\n", " ")

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["openai"]
)
def llm_generate(prompt: str, model: str = "gpt-4") -> str:
    """Send a single-turn chat request to an OpenAI model and return the reply.

    Args:
        prompt: Text sent as the single user message.
        model: Chat model name. Defaults to "gpt-4".

    Returns:
        The text content of the first choice, or an empty string when the
        API returns no text content.
    """
    # Imported inside the function: a lightweight KFP component runs only this
    # function's source in its container, where `openai` is pip-installed.
    from openai import OpenAI

    # NOTE: OpenAI() reads credentials from the environment (OPENAI_API_KEY),
    # so that secret must be made available to this task's pod.
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    # `message.content` may be None (e.g. refusals or tool-call replies);
    # returning None would not satisfy the declared `str` output.
    return response.choices[0].message.content or ""

@dsl.component
def postprocess(output: str) -> str:
    """Tidy the raw LLM reply.

    Args:
        output: Text produced by the generation step.

    Returns:
        ``output`` with leading and trailing whitespace removed.
    """
    tidied = output.strip()
    return tidied

@dsl.component
def evaluate(response: str) -> float:
    """Score a response with a simple length heuristic.

    This is a placeholder metric: the score is the character count divided
    by 100, so it is not bounded to [0, 1].

    Args:
        response: Final text produced by the pipeline.

    Returns:
        ``len(response) / 100.0``.
    """
    char_count = len(response)
    return char_count / 100.0

2. 定义管道

@dsl.component
def build_prompt(text: str, prefix: str = "处理以下文本: ") -> str:
    """Prepend an instruction prefix to the cleaned text.

    This runs as its own step because, while the pipeline function is being
    defined, an upstream result is only a placeholder (a task or channel
    object), not the actual string. Formatting a task object with an
    f-string embeds the object's repr instead of the upstream text.

    Args:
        text: Cleaned input text produced by ``preprocess_data``.
        prefix: Instruction placed in front of the text.

    Returns:
        ``prefix + text``.
    """
    return prefix + text


@dsl.pipeline(
    name="llm-pipeline",
    description="LLM处理管道"
)
def llm_pipeline(text: str):
    """LLM pipeline: preprocess -> build prompt -> generate -> postprocess -> evaluate.

    Args:
        text: Raw input text supplied when a run is created.
    """
    # Each component call returns a PipelineTask. Downstream steps must
    # consume its `.output` (the single return value), not the task itself.
    cleaned = preprocess_data(text=text)
    prompt = build_prompt(text=cleaned.output)
    generated = llm_generate(prompt=prompt.output)
    final = postprocess(output=generated.output)
    # The score is recorded as this task's output; no later step consumes it.
    evaluate(response=final.output)

# Compile the pipeline function into a YAML package that can be uploaded to
# a Kubeflow Pipelines server.
from kfp.compiler import Compiler

Compiler().compile(
    pipeline_func=llm_pipeline,
    package_path="llm_pipeline.yaml",
)

完整ML管道示例(示例中的evaluate_model与deploy_model组件需自行实现,本文未给出)

@dsl.pipeline(
    name="end-to-end-ml-pipeline",
    description="端到端机器学习管道"
)
def ml_pipeline(
    data_path: str,
    model_name: str = "gpt-4"
):
    """End-to-end ML pipeline: split data, train, evaluate, then conditionally deploy.

    Args:
        data_path: Location of the raw dataset, forwarded to ``prepare_data``.
        model_name: Name forwarded to ``deploy_model``.

    NOTE(review): ``evaluate_model`` and ``deploy_model`` are not defined in
    this article. ``prepare_data`` and ``train_model`` are defined only after
    this pipeline. KFP v2's ``@dsl.pipeline`` appears to trace the function
    when the decorator runs, so all four components likely need to be
    defined before this point. Please confirm.
    """
    # Stage 1: split the raw data into train / validation / test outputs.
    split = prepare_data(data_path=data_path)
    train_set = split.outputs["train_data"]
    val_set = split.outputs["val_data"]
    test_set = split.outputs["test_data"]

    # Stage 2: train on the train and validation splits.
    training = train_model(train_data=train_set, val_data=val_set)
    trained_model = training.outputs["model"]

    # Stage 3: score the trained model on the held-out test split.
    scoring = evaluate_model(model=trained_model, test_data=test_set)

    # Stage 4: deploy only when the reported accuracy is above 0.8.
    with dsl.Condition(scoring.outputs["accuracy"] > 0.8):
        deploy_model(model=trained_model, model_name=model_name)

组件实现

1. 数据处理组件

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def prepare_data(
    data_path: str,
    train_data: dsl.Output[dsl.Dataset],
    val_data: dsl.Output[dsl.Dataset],
    test_data: dsl.Output[dsl.Dataset],
    test_size: float = 0.2,
    val_size: float = 0.1,
    random_state: int = 42,
):
    """Split a JSON dataset into train / validation / test Dataset artifacts.

    Each split is written as a JSON array of records (``orient="records"``)
    to the path KFP assigns to the matching output artifact. Downstream steps
    run in other pods and consume these as ``outputs["train_data"]``,
    ``outputs["val_data"]`` and ``outputs["test_data"]``.

    Args:
        data_path: Path or URL readable by ``pandas.read_json`` from inside
            the component container.
        train_data: Output artifact for the training split (supplied by KFP).
        val_data: Output artifact for the validation split (supplied by KFP).
        test_data: Output artifact for the test split (supplied by KFP).
        test_size: Fraction of all rows held out for the test split.
        val_size: Fraction of the remaining (non-test) rows used for validation.
        random_state: Seed used for both splits, so reruns produce the same
            partition.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_json(data_path)

    # Hold out the test split first, then carve validation out of the rest.
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=random_state
    )
    train_df, val_df = train_test_split(
        train_df, test_size=val_size, random_state=random_state
    )

    # Write to the artifact paths. Files in the working directory are lost
    # when this pod exits and would never reach the training step.
    for frame, artifact in (
        (train_df, train_data),
        (val_df, val_data),
        (test_df, test_data),
    ):
        frame.to_json(artifact.path, orient="records")

2. 模型训练组件

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["transformers>=4.41", "torch"]
)
def train_model(
    train_data: dsl.Input[dsl.Dataset],
    val_data: dsl.Input[dsl.Dataset],
    model: dsl.Output[dsl.Model],
    base_model_name: str = "meta-llama/Llama-2-7b-hf",
    text_field: str = "text",
    num_train_epochs: int = 3,
    per_device_train_batch_size: int = 4,
    learning_rate: float = 2e-5,
    max_length: int = 512,
):
    """Fine-tune a causal language model and save it as a Model artifact.

    Args:
        train_data: Training split, a JSON array of records.
        val_data: Validation split, in the same format as ``train_data``.
        model: Output artifact. The fine-tuned weights and tokenizer are saved
            to ``model.path`` and exposed downstream as ``outputs["model"]``.
        base_model_name: Hugging Face model id to start from. Gated models
            such as Llama-2 also need a Hugging Face token in the pod.
        text_field: Record key that holds the training text. This assumes
            ``"text"``; confirm it against the real dataset schema.
        num_train_epochs: Number of training epochs.
        per_device_train_batch_size: Batch size per device.
        learning_rate: Optimizer learning rate.
        max_length: Token length at which each example is truncated.
    """
    import json

    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        DataCollatorForLanguageModeling,
        Trainer,
        TrainingArguments,
    )

    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        # Some causal-LM tokenizers (e.g. Llama-2) define no pad token; the
        # collator needs one to pad batches.
        tokenizer.pad_token = tokenizer.eos_token
    lm = AutoModelForCausalLM.from_pretrained(base_model_name)

    def load_split(path):
        # Read the JSON records written upstream and tokenize their text.
        with open(path, encoding="utf-8") as fh:
            records = json.load(fh)
        encoded = tokenizer(
            [record[text_field] for record in records],
            truncation=True,
            max_length=max_length,
        )
        return [
            {"input_ids": ids, "attention_mask": mask}
            for ids, mask in zip(encoded["input_ids"], encoded["attention_mask"])
        ]

    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=num_train_epochs,
        per_device_train_batch_size=per_device_train_batch_size,
        learning_rate=learning_rate,
        # `evaluation_strategy` was deprecated in transformers 4.41 in favor
        # of `eval_strategy` and later removed.
        eval_strategy="epoch",
    )

    trainer = Trainer(
        model=lm,
        args=training_args,
        train_dataset=load_split(train_data.path),
        eval_dataset=load_split(val_data.path),
        # mlm=False: pad each batch and derive labels from input_ids for the
        # causal-LM loss.
        data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False),
    )
    trainer.train()

    # Save to the artifact path so the model outlives this pod and is passed
    # to downstream steps.
    trainer.save_model(model.path)
    tokenizer.save_pretrained(model.path)

部署与管理

# Upload the compiled pipeline to a Kubeflow Pipelines server and start a run.
import kfp

KFP_HOST = "http://localhost:8080"
PIPELINE_PACKAGE = "llm_pipeline.yaml"

client = kfp.Client(host=KFP_HOST)

# Register the compiled package so it appears in the KFP UI.
# NOTE(review): re-uploading under an existing pipeline_name is likely
# rejected by the server; bump the name or upload a new version instead.
client.upload_pipeline(
    pipeline_package_path=PIPELINE_PACKAGE,
    pipeline_name="llm-pipeline-v1",
)

# Client.run_pipeline() takes an experiment id and job name, not
# experiment_name / pipeline_name. create_run_from_pipeline_package()
# accepts the experiment by name and runs the compiled package directly.
run = client.create_run_from_pipeline_package(
    PIPELINE_PACKAGE,
    arguments={"text": "测试输入"},
    experiment_name="llm-experiments",
)

最佳实践

  1. 组件复用:将常用操作封装为可复用组件
  2. 版本控制:对管道和组件进行版本管理
  3. 资源管理:合理配置CPU/GPU资源,例如在管道中对任务调用set_cpu_limit()、set_memory_limit()、set_accelerator_limit()等方法
  4. 监控告警:设置管道执行监控和失败告警

总结

Kubeflow提供了在Kubernetes上构建ML管道的完整解决方案。通过容器化组件和声明式管道定义,可以构建可移植、可扩展的机器学习工作流。