Prefect与LLM:现代工作流编排
---
title: "Prefect与LLM:现代工作流编排"
description: "使用Prefect构建灵活、可观察的LLM工作流"
tags: ["Prefect", "LLM", "工作流", "Python", "编排"]
category: "llm"
icon: "🔮"
---
Prefect与LLM:现代工作流编排
Prefect概述
Prefect是一个现代工作流编排框架,使用Python原生语法定义工作流,提供优秀的可观测性和错误处理能力。
基础工作流定义
1. 基本流程
from prefect import flow, task
from prefect.tasks import task_input_hash
from openai import OpenAI
from datetime import timedelta
@task(retries=3, retry_delay_seconds=10)
def fetch_data(source: str) -> list:
    """Announce the source being read and return placeholder records.

    The task is declared with ``retries=3`` and ``retry_delay_seconds=10``.

    Args:
        source: Label of the data source; it is only used in the printed
            message.

    Returns:
        A one-element list holding a dict with a ``"content"`` key.
    """
    print(f"从 {source} 获取数据...")
    records = [{"content": "示例内容"}]
    return records
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def process_text(text: str) -> str:
    """Send the text to the chat model and return the reply content.

    The task is declared with an input-hash cache key and a one-hour cache
    expiration.

    Args:
        text: The text to embed in the prompt.

    Returns:
        The ``content`` of the first choice in the completion response.
    """
    prompt = f"处理文本: {text}"
    completion = OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
@task
def analyze_results(results: list) -> dict:
    """Summarize the processed results.

    Args:
        results: The processed items.

    Returns:
        A dict with ``"total"`` (the number of items) and ``"processed"``
        (the same list object that was passed in).
    """
    summary = {}
    summary["total"] = len(results)
    summary["processed"] = results
    return summary
@flow(name="llm-workflow")
def llm_workflow(source: str):
    """Fetch records, process each record's text, and summarize the output.

    Args:
        source: Data source label forwarded to ``fetch_data``.

    Returns:
        Whatever ``analyze_results`` returns for the processed texts.
    """
    records = fetch_data(source)

    # One process_text call per record, in the order the records arrive.
    outputs = [process_text(record["content"]) for record in records]

    return analyze_results(outputs)
# Entry point: run the workflow when this file is executed as a script
if __name__ == "__main__":
    source_name = "database"
    result = llm_workflow(source_name)
    print(f"工作流完成: {result}")
2. 条件分支
from prefect import flow, task
from prefect.tasks import task_input_hash
@task
def classify_input(text: str) -> str:
    """Ask the chat model to label the text as 技术, 业务 or 其他.

    Args:
        text: The content to classify.

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string when the reply carries no text content.
    """
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"将以下内容分类为: 技术、业务、其他。只返回分类名称。\n{text}"
        }]
    )
    # The SDK types message.content as Optional[str]; guard so that a reply
    # without text yields "" instead of raising AttributeError on .strip().
    content = response.choices[0].message.content
    return (content or "").strip()
@task
def handle_technical(text: str) -> str:
    """Return the text prefixed with the technical-analysis label.

    Args:
        text: The content routed to the technical branch.

    Returns:
        The labelled string.
    """
    labelled = f"技术分析: {text}"
    return labelled
@task
def handle_business(text: str) -> str:
    """Return the text prefixed with the business-analysis label.

    Args:
        text: The content routed to the business branch.

    Returns:
        The labelled string.
    """
    labelled = f"业务分析: {text}"
    return labelled
@task
def handle_other(text: str) -> str:
    """Return the text prefixed with the generic-handling label.

    Args:
        text: The content that matched neither specific branch.

    Returns:
        The labelled string.
    """
    labelled = f"通用处理: {text}"
    return labelled
@flow(name="conditional-llm")
def conditional_workflow(text: str):
    """Classify the text and dispatch it to the matching handler task.

    Args:
        text: The content to classify and handle.

    Returns:
        The result of ``handle_technical``, ``handle_business`` or
        ``handle_other``, depending on the classification.
    """
    category = classify_input(text)
    print(f"分类结果: {category}")

    # Substring checks run in this order; the first keyword found wins.
    routes = (
        ("技术", handle_technical),
        ("业务", handle_business),
    )
    for keyword, handler in routes:
        if keyword in category:
            return handler(text)

    # Neither keyword appeared in the classification.
    return handle_other(text)
高级特性
1. 并行执行
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
@task
def parallel_process(item: dict) -> dict:
    """Send one item's content to the chat model and pair it with the reply.

    Args:
        item: A dict with a ``"content"`` key.

    Returns:
        A dict with ``"original"`` (the input item) and ``"result"`` (the
        content of the first choice in the completion response).
    """
    prompt = f"处理: {item['content']}"
    completion = OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    reply = completion.choices[0].message.content
    return {"original": item, "result": reply}
# NOTE(review): `max_workers` appears to be accepted only by the Prefect 3
# thread-pool runner; the Prefect 2 ConcurrentTaskRunner constructor is
# believed to take no arguments. Confirm against the installed version.
@flow(task_runner=ConcurrentTaskRunner(max_workers=5))
def parallel_workflow(items: list):
    """Submit every item to ``parallel_process`` and collect the results.

    Args:
        items: The items to process; each is passed to ``parallel_process``.

    Returns:
        A list of results in the same order as ``items``.
    """
    # Submit everything first so that no result is awaited before all
    # tasks have been handed to the task runner.
    futures = []
    for item in items:
        futures.append(parallel_process.submit(item))

    return [future.result() for future in futures]
# Example usage: build ten sample items and run them through the flow
items = list({"content": f"项目{i}"} for i in range(10))
results = parallel_workflow(items)
2. 子流程调用
@flow(name="sub-process")
def sub_process(data: dict) -> dict:
    """Process a single record's content as its own flow.

    Args:
        data: A dict with a ``"content"`` key.

    Returns:
        A dict with a single ``"processed"`` key holding the
        ``process_text`` result.
    """
    content = data["content"]
    return {"processed": process_text(content)}
@flow(name="main-process")
def main_process(data_list: list):
    """Run ``sub_process`` once per record and gather the outputs.

    Args:
        data_list: The records to hand to ``sub_process`` one at a time.

    Returns:
        A list of ``sub_process`` results in input order.
    """
    return [sub_process(data) for data in data_list]
3. 异步执行
from prefect import flow, task
import asyncio
@task
async def async_process(text: str) -> str:
    """Wait one second, then return the text with the async label.

    Args:
        text: The content to label.

    Returns:
        The labelled string.
    """
    await asyncio.sleep(1)  # stand-in for a real asynchronous operation
    outcome = f"异步处理: {text}"
    return outcome
@flow
async def async_workflow(items: list):
    """Call ``async_process`` for every item and await them together.

    Args:
        items: The values to pass to ``async_process``.

    Returns:
        The list produced by ``asyncio.gather``, ordered like ``items``.
    """
    # Create every awaitable up front, then wait on all of them at once.
    pending = [async_process(item) for item in items]
    return await asyncio.gather(*pending)
# Entry point: drive the async flow to completion with asyncio.run
if __name__ == "__main__":
    sample_items = ["a", "b", "c"]
    asyncio.run(async_workflow(sample_items))
监控与部署
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
# Build and register a scheduled deployment of llm_workflow.
# NOTE(review): Deployment.build_from_flow is believed to be a Prefect 2 API
# that was removed in Prefect 3 (replaced by flow.serve / flow.deploy).
# Confirm against the installed Prefect version.
daily_schedule = CronSchedule(cron="0 9 * * *")  # runs every day at 9:00
deployment = Deployment.build_from_flow(
    flow=llm_workflow,
    name="daily-llm-workflow",
    work_pool_name="default",
    schedule=daily_schedule,
    parameters={"source": "database"},
)
deployment.apply()
最佳实践
- 任务缓存:使用缓存避免重复计算
- 重试机制:为LLM调用设置合理的重试策略
- 并发控制:使用任务运行器控制并发数
- 日志记录:充分利用Prefect的日志功能
总结
Prefect为LLM工作流提供了现代化的编排方案。通过Python原生语法、优秀的可观测性和灵活的部署选项,可以快速构建可靠的AI工作流。