← 返回首页
🧠

Prefect与LLM:现代工作流编排

📂 llm ⏱ 2 min 391 words

---
title: "Prefect与LLM:现代工作流编排"
description: "使用Prefect构建灵活、可观察的LLM工作流"
tags: ["Prefect", "LLM", "工作流", "Python", "编排"]
category: "llm"
icon: "🔮"
---

Prefect与LLM:现代工作流编排

Prefect概述

Prefect是一个现代工作流编排框架，使用Python原生语法（@flow 与 @task 装饰器）定义工作流，并提供优秀的可观测性和错误处理能力（如自动重试、结果缓存与运行日志）。

基础工作流定义

1. 基本流程

from prefect import flow, task
from prefect.tasks import task_input_hash
from openai import OpenAI
from datetime import timedelta

@task(retry_delay_seconds=10, retries=3)
def fetch_data(source: str) -> list:
    """Load raw records from ``source``.

    Placeholder implementation: it only announces the source on stdout and
    returns a single sample record. If the task raises, Prefect retries it
    up to 3 times, waiting 10 seconds between attempts.

    Args:
        source: Name of the data source to read from.

    Returns:
        A list of record dicts, each carrying a ``"content"`` key.
    """
    print(f"从 {source} 获取数据...")
    sample_record = {"content": "示例内容"}
    return [sample_record]

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=10,
)
def process_text(text: str) -> str:
    """Send ``text`` to the chat model and return its reply.

    Results are cached by an input hash for one hour, so re-running with the
    same text reuses the earlier reply instead of calling the API again.
    Any exception raised by the API call is retried up to 3 times with a
    10-second delay, the same retry policy ``fetch_data`` uses.

    Args:
        text: The text to process.

    Returns:
        The model's reply, or an empty string if the reply carried no text
        content.
    """
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"处理文本: {text}"}],
    )
    # ``message.content`` is Optional in the OpenAI SDK (e.g. refusals or
    # tool calls); normalise to "" so the declared ``str`` return holds.
    content = response.choices[0].message.content
    return content if content is not None else ""

@task
def analyze_results(results: list) -> dict:
    """Summarise the processed outputs.

    Args:
        results: The per-item outputs to summarise.

    Returns:
        A dict with ``"total"`` (number of results) and ``"processed"``
        (the same list object that was passed in, not a copy).
    """
    summary = dict(total=len(results), processed=results)
    return summary

@flow(name="llm-workflow")
def llm_workflow(source: str):
    """Main LLM pipeline: fetch records, process each one, then summarise.

    Args:
        source: Data source name forwarded to ``fetch_data``.

    Returns:
        The summary dict produced by ``analyze_results``.
    """
    records = fetch_data(source)
    # Calling the task directly (rather than via ``.submit``) processes the
    # records one after another.
    outputs = [process_text(record["content"]) for record in records]
    return analyze_results(outputs)

# Run the workflow when this file is executed as a script.
if __name__ == "__main__":
    outcome = llm_workflow("database")
    print(f"工作流完成: {outcome}")

2. 条件分支

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=10)
def classify_input(text: str) -> str:
    """Ask the chat model to label ``text`` as 技术, 业务 or 其他.

    Any exception raised by the API call is retried up to 3 times with a
    10-second delay.

    Args:
        text: The content to classify.

    Returns:
        The model's label with surrounding whitespace removed, or an empty
        string if the reply carried no text content.
    """
    # This snippet only imports from prefect; import the client here so the
    # example runs on its own.
    from openai import OpenAI

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"将以下内容分类为: 技术、业务、其他。只返回分类名称。\n{text}"
        }]
    )
    # ``message.content`` is Optional in the OpenAI SDK; calling ``.strip()``
    # on None would raise AttributeError.
    content = response.choices[0].message.content
    return (content or "").strip()

@task
def handle_technical(text: str) -> str:
    """Build the result string for content classified as technical."""
    template = "技术分析: {}"
    return template.format(text)

@task
def handle_business(text: str) -> str:
    """Build the result string for content classified as business."""
    template = "业务分析: {}"
    return template.format(text)

@task
def handle_other(text: str) -> str:
    """Build the fallback result string for any other content."""
    template = "通用处理: {}"
    return template.format(text)

@flow(name="conditional-llm")
def conditional_workflow(text: str):
    """Classify ``text`` and route it to the matching handler task.

    The label is matched by substring: 技术 is checked before 业务, and
    anything else falls through to ``handle_other``.

    Args:
        text: The content to classify and handle.

    Returns:
        The chosen handler's result string.
    """
    category = classify_input(text)
    print(f"分类结果: {category}")

    # Ordered (keyword, handler) pairs; the first keyword found wins.
    routes = (("技术", handle_technical), ("业务", handle_business))
    handler = next(
        (target for keyword, target in routes if keyword in category),
        handle_other,
    )
    return handler(text)

高级特性

1. 并行执行

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task(retries=3, retry_delay_seconds=10)
def parallel_process(item: dict) -> dict:
    """Process a single item with the chat model.

    Any exception raised by the API call is retried up to 3 times with a
    10-second delay.

    Args:
        item: A record with a ``"content"`` key.

    Returns:
        ``{"original": item, "result": reply}``, where ``reply`` is the
        model's text, or "" if the reply carried no text content.
    """
    # This snippet does not import OpenAI; import it locally so the example
    # is self-contained.
    from openai import OpenAI

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"处理: {item['content']}"}]
    )
    # ``message.content`` is Optional in the OpenAI SDK.
    content = response.choices[0].message.content
    return {"original": item, "result": content if content is not None else ""}

# NOTE(review): `max_workers` is accepted where ConcurrentTaskRunner is an
# alias of Prefect 3's ThreadPoolTaskRunner; Prefect 2's ConcurrentTaskRunner
# appears to take no arguments — confirm the Prefect version this targets.
@flow(task_runner=ConcurrentTaskRunner(max_workers=5))
def parallel_workflow(items: list):
    """Fan ``items`` out to ``parallel_process`` and gather the results.

    Every item is submitted before any result is awaited, which lets the
    task runner execute them concurrently. Results are returned in the same
    order as ``items``.

    Args:
        items: Records to process, each with a ``"content"`` key.

    Returns:
        A list of ``parallel_process`` result dicts.
    """
    pending = []
    for entry in items:
        pending.append(parallel_process.submit(entry))
    return [future.result() for future in pending]

# Usage example. Guarded so that importing this module does not trigger ten
# LLM calls; matches the entry-point style used by the other snippets.
if __name__ == "__main__":
    items = [{"content": f"项目{i}"} for i in range(10)]
    results = parallel_workflow(items)

2. 子流程调用

@flow(name="sub-process")
def sub_process(data: dict) -> dict:
    """Child flow: process one record's content.

    When called from inside another flow, this runs as a nested subflow.

    Args:
        data: A record with a ``"content"`` key.

    Returns:
        ``{"processed": <text returned by process_text>}``.
    """
    return {"processed": process_text(data["content"])}

@flow(name="main-process")
def main_process(data_list: list):
    """Parent flow: run ``sub_process`` once per record, in order.

    Args:
        data_list: Records to hand to the child flow.

    Returns:
        A list with one ``sub_process`` result per element of ``data_list``.
    """
    return [sub_process(record) for record in data_list]

3. 异步执行

from prefect import flow, task
import asyncio

@task
async def async_process(text: str) -> str:
    """Simulate an asynchronous operation on ``text``.

    Sleeps for one second as a stand-in for real async I/O, then returns
    the labelled text.
    """
    simulated_latency = 1  # seconds
    await asyncio.sleep(simulated_latency)
    return "异步处理: {}".format(text)

@flow
async def async_workflow(items: list):
    """Await ``async_process`` for every item together via ``asyncio.gather``.

    Args:
        items: Values to process.

    Returns:
        The results, in the same order as ``items``.
    """
    coroutines = (async_process(entry) for entry in items)
    return await asyncio.gather(*coroutines)

# Run the async workflow when this file is executed as a script.
if __name__ == "__main__":
    sample_items = ["a", "b", "c"]
    asyncio.run(async_workflow(sample_items))

监控与部署

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Register a scheduled deployment of llm_workflow. Guarded so that merely
# importing this module does not create/update the deployment on the server.
# NOTE(review): Deployment.build_from_flow is a Prefect 2.x API that appears
# to be removed in Prefect 3 (which uses llm_workflow.deploy(...) or
# llm_workflow.serve(...) instead) — confirm the Prefect version in use.
if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=llm_workflow,
        name="daily-llm-workflow",
        parameters={"source": "database"},
        # Daily at 09:00; no timezone is given, so confirm which zone applies.
        schedule=CronSchedule(cron="0 9 * * *"),
        work_pool_name="default",
    )
    deployment.apply()

最佳实践

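  1. 任务缓存：为LLM任务配置 cache_key_fn 与 cache_expiration，避免对相同输入重复调用模型
  2. 重试机制：为LLM调用设置合理的重试策略（retries、retry_delay_seconds），应对限流、超时等临时故障
  3. 并发控制：通过任务运行器（task runner）限制并发数，避免超出API速率限制
  4. 日志记录：使用 get_run_logger() 或在 flow 上设置 log_prints=True，让输出进入Prefect日志与UI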

总结

Prefect为LLM工作流提供了现代化的编排方案。借助Python原生语法、优秀的可观测性和灵活的部署选项，可以快速构建可靠的AI工作流。