← 返回首页
🤖

AI Agent架构:规划、记忆与多Agent协作

📂 architecture ⏱ 7 min 1251 words

AI Agent架构:规划、记忆与多Agent协作

Agent核心架构

AI Agent是能够自主感知环境、制定计划并执行行动来完成复杂任务的智能系统。核心组件包括:感知模块(接收输入)、规划模块(制定行动方案)、记忆模块(存储经验和知识)、执行模块(调用工具完成任务)。

# Agent核心类定义
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from abc import ABC, abstractmethod
from enum import Enum
import uuid

class AgentState(Enum):
    IDLE = "idle"
    THINKING = "thinking"
    ACTING = "acting"
    WAITING = "waiting"

@dataclass
class Tool:
    name: str
    description: str
    function: Callable
    parameters: Dict = field(default_factory=dict)

@dataclass
class Action:
    tool_name: str
    parameters: Dict
    action_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])

@dataclass
class Observation:
    action_id: str
    result: Any
    success: bool
    error: Optional[str] = None

@dataclass
class Thought:
    content: str
    reasoning: str
    confidence: float = 1.0

class BaseAgent(ABC):
    def __init__(self, name: str, llm_client, tools: List[Tool] = None):
        self.name = name
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in (tools or [])}
        self.state = AgentState.IDLE
        self.memory = AgentMemory()
        self.max_iterations = 10
    
    async def run(self, task: str) -> dict:
        self.state = AgentState.THINKING
        context = {"task": task, "history": []}
        
        for i in range(self.max_iterations):
            # 思考
            thought = await self._think(context)
            context["history"].append({"thought": thought})
            
            # 决定行动
            action = await self._plan_action(thought, context)
            
            if action is None:
                # 完成任务
                return await self._finalize(context)
            
            # 执行行动
            self.state = AgentState.ACTING
            observation = await self._act(action)
            context["history"].append({"action": action, "observation": observation})
            
            # 更新记忆
            self.memory.add_episode(thought, action, observation)
        
        return {"status": "max_iterations_reached", "task": task}
    
    @abstractmethod
    async def _think(self, context: Dict) -> Thought:
        pass
    
    @abstractmethod
    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        pass
    
    async def _act(self, action: Action) -> Observation:
        tool = self.tools.get(action.tool_name)
        if not tool:
            return Observation(
                action_id=action.action_id,
                result=None,
                success=False,
                error=f"Tool not found: {action.tool_name}"
            )
        
        try:
            result = await tool.function(**action.parameters)
            return Observation(
                action_id=action.action_id,
                result=result,
                success=True
            )
        except Exception as e:
            return Observation(
                action_id=action.action_id,
                result=None,
                success=False,
                error=str(e)
            )
    
    async def _finalize(self, context: Dict) -> dict:
        self.state = AgentState.IDLE
        return {
            "status": "completed",
            "task": context["task"],
            "iterations": len(context["history"])
        }

任务规划与分解

复杂任务需要分解为可执行的子任务。规划策略包括:递归分解(将大任务拆分为小任务)、层次规划(多级抽象)、动态规划(根据执行结果调整计划)。

# 任务规划器
@dataclass
class Task:
    id: str
    description: str
    status: str = "pending"
    subtasks: List['Task'] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)
    result: Any = None

class TaskPlanner:
    def __init__(self, llm_client):
        self.llm = llm_client
    
    async def decompose(self, task_description: str) -> List[Task]:
        """将复杂任务分解为子任务"""
        prompt = f"""将以下任务分解为可执行的子任务:

任务:{task_description}

请列出需要完成的子任务,每个任务一行:"""
        
        response = await self.llm.generate(prompt)
        subtask_descriptions = [line.strip() for line in response.split('\n') if line.strip()]
        
        tasks = []
        for i, desc in enumerate(subtask_descriptions):
            tasks.append(Task(
                id=f"subtask_{i}",
                description=desc,
                dependencies=[f"subtask_{j}" for j in range(i)] if i > 0 else []
            ))
        
        return tasks
    
    async def replan(self, current_task: Task, 
                     completed_tasks: List[Task]) -> List[Task]:
        """根据执行结果重新规划"""
        completed_ids = {t.id for t in completed_tasks}
        remaining = [t for t in current_task.subtasks 
                    if t.id not in completed_ids and 
                    all(d in completed_ids for d in t.dependencies)]
        
        if not remaining:
            return []
        
        # 重新评估剩余任务
        prompt = f"""基于已完成的任务,重新规划剩余任务:

已完成:
{chr(10).join(f'- {t.description}: {t.status}' for t in completed_tasks)}

剩余任务:
{chr(10).join(f'- {t.description}' for t in remaining)}

是否需要调整任务?调整后的任务列表:"""
        
        response = await self.llm.generate(prompt)
        return remaining  # 简化示例

class HierarchicalPlanner:
    """层次规划器"""
    
    def __init__(self, llm_client):
        self.llm = llm_client
    
    async def plan(self, goal: str, depth: int = 3) -> Dict:
        """多层次规划"""
        plan = {"goal": goal, "levels": []}
        
        # 高层计划
        high_level = await self._create_high_level_plan(goal)
        plan["levels"].append(high_level)
        
        # 逐层细化
        for level_plan in high_level.get("tasks", []):
            if depth > 1:
                detailed = await self.plan(level_plan["description"], depth - 1)
                plan["levels"].append(detailed)
        
        return plan
    
    async def _create_high_level_plan(self, goal: str) -> Dict:
        return {
            "description": f"Plan for: {goal}",
            "tasks": [
                {"id": "phase_1", "description": "分析和准备"},
                {"id": "phase_2", "description": "执行主要工作"},
                {"id": "phase_3", "description": "验证和完善"}
            ]
        }

记忆系统设计

Agent的记忆系统需要支持短期记忆(当前对话)、工作记忆(任务上下文)和长期记忆(历史经验)。记忆管理包括存储、检索、遗忘和压缩。

# Agent记忆系统
from datetime import datetime
import json

@dataclass
class MemoryEntry:
    content: str
    timestamp: datetime
    importance: float
    access_count: int = 0
    embedding: Optional[Any] = None
    metadata: Dict = field(default_factory=dict)

class AgentMemory:
    def __init__(self, short_term_limit: int = 20, long_term_limit: int = 1000):
        self.short_term = []  # 短期记忆(最近交互)
        self.working_memory = {}  # 工作记忆(当前任务上下文)
        self.long_term = []  # 长期记忆(重要经验)
        self.short_term_limit = short_term_limit
        self.long_term_limit = long_term_limit
    
    def add_episode(self, thought: Thought, action: Action, 
                   observation: Observation):
        """添加一个经验到记忆"""
        entry = MemoryEntry(
            content=json.dumps({
                "thought": thought.content,
                "action": action.tool_name,
                "result": str(observation.result)[:200]
            }),
            timestamp=datetime.now(),
            importance=thought.confidence
        )
        
        self.short_term.append(entry)
        
        # 如果短期记忆满了,将不重要的移入长期记忆
        if len(self.short_term) > self.short_term_limit:
            self._consolidate()
    
    def update_working_memory(self, key: str, value: Any):
        """更新工作记忆"""
        self.working_memory[key] = value
    
    def recall(self, query: str, k: int = 5) -> List[MemoryEntry]:
        """检索相关记忆"""
        all_memories = self.short_term + self.long_term
        
        # 简单的相关性计算(实际应用中使用embedding相似度)
        scored = []
        for entry in all_memories:
            score = self._calculate_relevance(query, entry)
            scored.append((score, entry))
        
        scored.sort(key=lambda x: x[0], reverse=True)
        return [entry for _, entry in scored[:k]]
    
    def _consolidate(self):
        """记忆整合:将短期记忆移入长期记忆"""
        # 按重要性排序
        self.short_term.sort(key=lambda x: x.importance, reverse=True)
        
        # 保留最重要的在短期记忆
        to_archive = self.short_term[self.short_term_limit // 2:]
        self.short_term = self.short_term[:self.short_term_limit // 2]
        
        # 添加到长期记忆
        self.long_term.extend(to_archive)
        
        # 如果长期记忆满了,删除最不重要的
        if len(self.long_term) > self.long_term_limit:
            self.long_term.sort(key=lambda x: x.importance, reverse=True)
            self.long_term = self.long_term[:self.long_term_limit]
    
    def _calculate_relevance(self, query: str, entry: MemoryEntry) -> float:
        # 简单的关键词匹配
        query_words = set(query.lower().split())
        content_words = set(entry.content.lower().split())
        overlap = len(query_words & content_words)
        return overlap / max(len(query_words), 1)

# 记忆检索器
class MemoryRetriever:
    def __init__(self, memory: AgentMemory, embedding_model):
        self.memory = memory
        self.embedding_model = embedding_model
    
    async def retrieve_with_context(self, query: str, 
                                   context_window: int = 3) -> Dict:
        """带上下文的记忆检索"""
        relevant_memories = self.memory.recall(query, k=context_window)
        
        context_parts = []
        for mem in relevant_memories:
            context_parts.append({
                "content": mem.content,
                "timestamp": mem.timestamp.isoformat(),
                "importance": mem.importance
            })
        
        return {
            "query": query,
            "relevant_memories": context_parts,
            "working_memory": self.memory.working_memory
        }

多Agent协作架构

多Agent系统通过Agent间协作解决复杂问题。协作模式包括:主管-工人模式(Supervisor)、辩论模式(Debate)、管道模式(Pipeline)和自组织模式(Swarm)。

# 多Agent协作框架
class MultiAgentSystem:
    def __init__(self, agents: Dict[str, BaseAgent], 
                 coordinator: str = "supervisor"):
        self.agents = agents
        self.coordinator = coordinator
        self.message_queue = []
        self.shared_memory = {}
    
    async def execute_task(self, task: str) -> dict:
        """协调多个Agent完成任务"""
        # 分配任务
        assignments = await self._assign_task(task)
        
        # 并行执行
        results = {}
        for agent_name, subtask in assignments.items():
            agent = self.agents.get(agent_name)
            if agent:
                result = await agent.run(subtask)
                results[agent_name] = result
        
        # 合并结果
        final_result = await self._merge_results(results)
        
        return {
            "task": task,
            "results": results,
            "final": final_result
        }
    
    async def _assign_task(self, task: str) -> Dict[str, str]:
        """任务分配"""
        # 简单的轮询分配(实际应用中应根据能力匹配)
        assignments = {}
        agent_names = list(self.agents.keys())
        
        for i, agent_name in enumerate(agent_names):
            assignments[agent_name] = f"Part {i+1} of: {task}"
        
        return assignments
    
    async def _merge_results(self, results: Dict) -> str:
        """合并多个Agent的结果"""
        parts = []
        for agent_name, result in results.items():
            parts.append(f"[{agent_name}]: {result.get('status', 'unknown')}")
        
        return "\n".join(parts)
    
    def send_message(self, from_agent: str, to_agent: str, message: str):
        """Agent间通信"""
        self.message_queue.append({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now()
        })

class SupervisorAgent(BaseAgent):
    """主管Agent:负责任务分配和协调"""
    
    async def _think(self, context: Dict) -> Thought:
        return Thought(
            content="分析任务并分配给合适的Agent",
            reasoning="需要协调多个Agent完成复杂任务"
        )
    
    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        return Action(
            tool_name="delegate_task",
            parameters={"task": context["task"]}
        )

class SpecialistAgent(BaseAgent):
    """专家Agent:专注于特定领域"""
    
    def __init__(self, name: str, llm_client, specialty: str, tools: List[Tool] = None):
        super().__init__(name, llm_client, tools)
        self.specialty = specialty
    
    async def _think(self, context: Dict) -> Thought:
        return Thought(
            content=f"运用{self.specialty}专业知识解决问题",
            reasoning=f"这是{self.specialty}领域的任务"
        )
    
    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        return Action(
            tool_name="use_specialty_tool",
            parameters={"input": context["task"]}
        )

# 辩论模式
class DebateSystem:
    def __init__(self, agents: List[BaseAgent], rounds: int = 3):
        self.agents = agents
        self.rounds = rounds
    
    async def debate(self, topic: str) -> dict:
        """多轮辩论"""
        arguments = []
        
        for round_num in range(self.rounds):
            round_arguments = []
            for agent in self.agents:
                argument = await self._get_argument(agent, topic, arguments)
                round_arguments.append({
                    "agent": agent.name,
                    "argument": argument
                })
            arguments.extend(round_arguments)
        
        # 综合各方观点
        conclusion = await self._synthesize(arguments)
        
        return {
            "topic": topic,
            "rounds": self.rounds,
            "arguments": arguments,
            "conclusion": conclusion
        }
    
    async def _get_argument(self, agent: BaseAgent, topic: str, 
                           history: List) -> str:
        prompt = f"针对话题'{topic}'发表观点"
        return await agent.llm.generate(prompt)
    
    async def _synthesize(self, arguments: List) -> str:
        return "综合各方观点得出的结论"

工具调用与函数执行

Agent通过工具调用与外部世界交互。工具系统需要支持参数验证、错误处理、超时控制和结果缓存。

# 工具注册与执行
import inspect
from functools import wraps

class ToolRegistry:
    def __init__(self):
        self.tools = {}
    
    def register(self, name: str = None, description: str = ""):
        def decorator(func):
            tool_name = name or func.__name__
            tool_desc = description or inspect.getdoc(func) or ""
            
            # 提取参数信息
            sig = inspect.signature(func)
            parameters = {}
            for param_name, param in sig.parameters.items():
                parameters[param_name] = {
                    "type": str(param.annotation.__name__) if param.annotation != inspect.Parameter.empty else "Any",
                    "default": param.default if param.default != inspect.Parameter.empty else None
                }
            
            self.tools[tool_name] = Tool(
                name=tool_name,
                description=tool_desc,
                function=func,
                parameters=parameters
            )
            
            @wraps(func)
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)
            return wrapper
        return decorator
    
    def get_tool(self, name: str) -> Optional[Tool]:
        return self.tools.get(name)
    
    def list_tools(self) -> List[Dict]:
        return [
            {"name": t.name, "description": t.description}
            for t in self.tools.values()
        ]

# 使用示例
registry = ToolRegistry()

@registry.register(name="search", description="搜索互联网")
async def search_web(query: str) -> str:
    return f"Search results for: {query}"

@registry.register(name="calculate", description="执行数学计算")
async def calculate(expression: str) -> float:
    return eval(expression)

@registry.register(name="read_file", description="读取文件内容")
async def read_file(file_path: str) -> str:
    with open(file_path, 'r') as f:
        return f.read()

# 工具执行器
class ToolExecutor:
    def __init__(self, registry: ToolRegistry):
        self.registry = registry
        self.cache = {}
    
    async def execute(self, tool_name: str, params: Dict) -> dict:
        tool = self.registry.get_tool(tool_name)
        if not tool:
            return {"error": f"Tool '{tool_name}' not found"}
        
        # 检查缓存
        cache_key = f"{tool_name}:{hash(json.dumps(params, sort_keys=True))}"
        if cache_key in self.cache:
            return {"result": self.cache[cache_key], "cached": True}
        
        try:
            result = await tool.function(**params)
            self.cache[cache_key] = result
            return {"result": result, "cached": False}
        except Exception as e:
            return {"error": str(e)}