← 返回首页
🧠

LLM编排系统:构建复杂AI工作流

📂 llm ⏱ 3 min 438 words

--- title: "LLM编排系统:构建复杂AI工作流" description: "使用LangChain、CrewAI等工具编排复杂的LLM工作流和多代理系统" tags: ["LLM", "编排", "LangChain", "多代理", "工作流"] category: "llm" icon: "🎯"

LLM编排系统:构建复杂AI工作流

什么是LLM编排

LLM编排(Orchestration)是将多个LLM调用、工具调用和外部服务组织成复杂工作流的技术。它让开发者能够构建超越单次LLM调用能力的复杂AI应用。

LangChain编排

1. 链式编排

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough

# 创建模型
llm = ChatOpenAI(model="gpt-4", temperature=0)

# 定义提示模板
summarize_prompt = ChatPromptTemplate.from_template(
    "请总结以下内容:\n{content}"
)

analyze_prompt = ChatPromptTemplate.from_template(
    "基于以下摘要,分析关键要点:\n{summary}"
)

translate_prompt = ChatPromptTemplate.from_template(
    "将以下内容翻译成英文:\n{analysis}"
)

# 构建链
chain = (
    {"content": RunnablePassthrough()}
    | summarize_prompt
    | llm
    | StrOutputParser()
    | (lambda x: {"summary": x})
    | analyze_prompt
    | llm
    | StrOutputParser()
    | (lambda x: {"analysis": x})
    | translate_prompt
    | llm
    | StrOutputParser()
)

# 执行
result = chain.invoke("大语言模型正在改变软件开发的方式,从代码生成到测试自动化...")
print(result)

2. 条件路由

from langchain.schema.runnable import RunnableBranch

# 定义条件路由
def classify_content(input_text):
    if "代码" in input_text or "编程" in input_text:
        return "technical"
    elif "销售" in input_text or "市场" in input_text:
        return "business"
    return "general"

# 定义分支
branch = RunnableBranch(
    (lambda x: classify_content(x["input"]) == "technical",
     ChatPromptTemplate.from_template("技术专家回答:{input}") | llm),
    (lambda x: classify_content(x["input"]) == "business",
     ChatPromptTemplate.from_template("商业顾问回答:{input}") | llm),
    ChatPromptTemplate.from_template("通用助手回答:{input}") | llm
)

# 执行
result = branch.invoke({"input": "如何优化Python代码性能?"})

CrewAI多代理编排

from crewai import Agent, Task, Crew

# 定义代理
researcher = Agent(
    role="研究分析师",
    goal="收集和分析市场数据",
    backstory="你是一位资深的市场研究分析师",
    verbose=True
)

writer = Agent(
    role="内容创作者",
    goal="基于研究结果创建引人入胜的内容",
    backstory="你是一位专业的商业写手",
    verbose=True
)

reviewer = Agent(
    role="质量审核员",
    goal="确保内容的准确性和质量",
    backstory="你是一位严格的内容审核专家",
    verbose=True
)

# 定义任务
research_task = Task(
    description="研究AI在医疗领域的应用趋势",
    expected_output="一份包含关键趋势和数据的报告",
    agent=researcher
)

writing_task = Task(
    description="基于研究结果撰写一篇行业分析文章",
    expected_output="一篇1500字的专业分析文章",
    agent=writer,
    context=[research_task]
)

review_task = Task(
    description="审核文章的准确性和质量",
    expected_output="修改建议和最终版本",
    agent=reviewer,
    context=[writing_task]
)

# 组建团队
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, writing_task, review_task],
    verbose=True
)

# 执行
result = crew.kickoff()
print(result)

自定义编排框架

from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass

class StepType(Enum):
    LLM = "llm"
    TOOL = "tool"
    CONDITION = "condition"
    PARALLEL = "parallel"

@dataclass
class Step:
    name: str
    type: StepType
    config: Dict[str, Any]
    next_step: Optional[str] = None

class WorkflowOrchestrator:
    def __init__(self):
        self.steps: Dict[str, Step] = {}
        self.tools: Dict[str, Callable] = {}
    
    def add_step(self, step: Step):
        self.steps[step.name] = step
        return self
    
    def register_tool(self, name: str, func: Callable):
        self.tools[name] = func
    
    def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        current_step = list(self.steps.keys())[0]
        context = {"input": input_data, "history": []}
        
        while current_step:
            step = self.steps[current_step]
            print(f"执行步骤: {step.name}")
            
            if step.type == StepType.LLM:
                result = self._execute_llm(step.config, context)
            elif step.type == StepType.TOOL:
                result = self._execute_tool(step.config, context)
            elif step.type == StepType.CONDITION:
                result = self._execute_condition(step.config, context)
            elif step.type == StepType.PARALLEL:
                result = self._execute_parallel(step.config, context)
            
            context["history"].append({"step": step.name, "result": result})
            context["output"] = result
            current_step = step.next_step
        
        return context
    
    def _execute_llm(self, config, context):
        prompt = config["prompt"].format(**context)
        # 实际调用LLM
        return f"LLM处理: {prompt[:50]}..."
    
    def _execute_tool(self, config, context):
        tool_name = config["tool"]
        return self.tools[tool_name](context)
    
    def _execute_condition(self, config, context):
        condition = config["condition"]
        if condition(context):
            return config["true_value"]
        return config["false_value"]
    
    def _execute_parallel(self, config, context):
        results = {}
        for step_config in config["steps"]:
            results[step_config["name"]] = self._execute_llm(step_config, context)
        return results

# 使用示例
orchestrator = WorkflowOrchestrator()
orchestrator.add_step(Step(
    name="analyze",
    type=StepType.LLM,
    config={"prompt": "分析输入: {input}"},
    next_step="process"
))
orchestrator.add_step(Step(
    name="process",
    type=StepType.TOOL,
    config={"tool": "format_output"},
    next_step=None
))

orchestrator.register_tool("format_output", lambda ctx: f"格式化: {ctx['output']}")
result = orchestrator.execute({"content": "测试数据"})

最佳实践

  1. 可观测性:记录每个步骤的输入输出
  2. 错误处理:实现优雅的降级和重试机制
  3. 状态管理:维护工作流的执行状态
  4. 资源管理:控制并发和资源使用

总结

LLM编排是构建复杂AI应用的核心技术。通过链式调用、条件路由和多代理协作,我们可以构建超越单次LLM调用能力的智能系统。