LLM编排系统:构建复杂AI工作流
--- title: "LLM编排系统:构建复杂AI工作流" description: "使用LangChain、CrewAI等工具编排复杂的LLM工作流和多代理系统" tags: ["LLM", "编排", "LangChain", "多代理", "工作流"] category: "llm" icon: "🎯"
LLM编排系统:构建复杂AI工作流
什么是LLM编排
LLM编排(Orchestration)是将多个LLM调用、工具调用和外部服务组织成复杂工作流的技术。它让开发者能够构建超越单次LLM调用能力的复杂AI应用。
LangChain编排
1. 链式编排
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
# 创建模型
llm = ChatOpenAI(model="gpt-4", temperature=0)
# 定义提示模板
summarize_prompt = ChatPromptTemplate.from_template(
"请总结以下内容:\n{content}"
)
analyze_prompt = ChatPromptTemplate.from_template(
"基于以下摘要,分析关键要点:\n{summary}"
)
translate_prompt = ChatPromptTemplate.from_template(
"将以下内容翻译成英文:\n{analysis}"
)
# 构建链
chain = (
{"content": RunnablePassthrough()}
| summarize_prompt
| llm
| StrOutputParser()
| (lambda x: {"summary": x})
| analyze_prompt
| llm
| StrOutputParser()
| (lambda x: {"analysis": x})
| translate_prompt
| llm
| StrOutputParser()
)
# 执行
result = chain.invoke("大语言模型正在改变软件开发的方式,从代码生成到测试自动化...")
print(result)
2. 条件路由
from langchain.schema.runnable import RunnableBranch
# 定义条件路由
def classify_content(input_text):
if "代码" in input_text or "编程" in input_text:
return "technical"
elif "销售" in input_text or "市场" in input_text:
return "business"
return "general"
# 定义分支
branch = RunnableBranch(
(lambda x: classify_content(x["input"]) == "technical",
ChatPromptTemplate.from_template("技术专家回答:{input}") | llm),
(lambda x: classify_content(x["input"]) == "business",
ChatPromptTemplate.from_template("商业顾问回答:{input}") | llm),
ChatPromptTemplate.from_template("通用助手回答:{input}") | llm
)
# 执行
result = branch.invoke({"input": "如何优化Python代码性能?"})
CrewAI多代理编排
from crewai import Agent, Task, Crew
# 定义代理
researcher = Agent(
role="研究分析师",
goal="收集和分析市场数据",
backstory="你是一位资深的市场研究分析师",
verbose=True
)
writer = Agent(
role="内容创作者",
goal="基于研究结果创建引人入胜的内容",
backstory="你是一位专业的商业写手",
verbose=True
)
reviewer = Agent(
role="质量审核员",
goal="确保内容的准确性和质量",
backstory="你是一位严格的内容审核专家",
verbose=True
)
# 定义任务
research_task = Task(
description="研究AI在医疗领域的应用趋势",
expected_output="一份包含关键趋势和数据的报告",
agent=researcher
)
writing_task = Task(
description="基于研究结果撰写一篇行业分析文章",
expected_output="一篇1500字的专业分析文章",
agent=writer,
context=[research_task]
)
review_task = Task(
description="审核文章的准确性和质量",
expected_output="修改建议和最终版本",
agent=reviewer,
context=[writing_task]
)
# 组建团队
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, writing_task, review_task],
verbose=True
)
# 执行
result = crew.kickoff()
print(result)
自定义编排框架
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass
class StepType(Enum):
LLM = "llm"
TOOL = "tool"
CONDITION = "condition"
PARALLEL = "parallel"
@dataclass
class Step:
name: str
type: StepType
config: Dict[str, Any]
next_step: Optional[str] = None
class WorkflowOrchestrator:
def __init__(self):
self.steps: Dict[str, Step] = {}
self.tools: Dict[str, Callable] = {}
def add_step(self, step: Step):
self.steps[step.name] = step
return self
def register_tool(self, name: str, func: Callable):
self.tools[name] = func
def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
current_step = list(self.steps.keys())[0]
context = {"input": input_data, "history": []}
while current_step:
step = self.steps[current_step]
print(f"执行步骤: {step.name}")
if step.type == StepType.LLM:
result = self._execute_llm(step.config, context)
elif step.type == StepType.TOOL:
result = self._execute_tool(step.config, context)
elif step.type == StepType.CONDITION:
result = self._execute_condition(step.config, context)
elif step.type == StepType.PARALLEL:
result = self._execute_parallel(step.config, context)
context["history"].append({"step": step.name, "result": result})
context["output"] = result
current_step = step.next_step
return context
def _execute_llm(self, config, context):
prompt = config["prompt"].format(**context)
# 实际调用LLM
return f"LLM处理: {prompt[:50]}..."
def _execute_tool(self, config, context):
tool_name = config["tool"]
return self.tools[tool_name](context)
def _execute_condition(self, config, context):
condition = config["condition"]
if condition(context):
return config["true_value"]
return config["false_value"]
def _execute_parallel(self, config, context):
results = {}
for step_config in config["steps"]:
results[step_config["name"]] = self._execute_llm(step_config, context)
return results
# 使用示例
orchestrator = WorkflowOrchestrator()
orchestrator.add_step(Step(
name="analyze",
type=StepType.LLM,
config={"prompt": "分析输入: {input}"},
next_step="process"
))
orchestrator.add_step(Step(
name="process",
type=StepType.TOOL,
config={"tool": "format_output"},
next_step=None
))
orchestrator.register_tool("format_output", lambda ctx: f"格式化: {ctx['output']}")
result = orchestrator.execute({"content": "测试数据"})
最佳实践
- 可观测性:记录每个步骤的输入输出
- 错误处理:实现优雅的降级和重试机制
- 状态管理:维护工作流的执行状态
- 资源管理:控制并发和资源使用
总结
LLM编排是构建复杂AI应用的核心技术。通过链式调用、条件路由和多代理协作,我们可以构建超越单次LLM调用能力的智能系统。