AI Agent架构:规划、记忆与多Agent协作
Agent核心架构
AI Agent是能够自主感知环境、制定计划并执行行动来完成复杂任务的智能系统。核心组件包括:感知模块(接收输入)、规划模块(制定行动方案)、记忆模块(存储经验和知识)、执行模块(调用工具完成任务)。
# Agent核心类定义
import ast
import asyncio
import inspect
import operator
import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Callable, Any
class AgentState(Enum):
    """Lifecycle phase of an agent; each member's value is its lowercase name."""

    IDLE = "idle"  # not executing a task
    THINKING = "thinking"  # producing a Thought / planning the next action
    ACTING = "acting"  # executing a tool call
    WAITING = "waiting"  # NOTE(review): not assigned by the agent code shown
@dataclass
class Tool:
    """A named capability an agent can invoke.

    ``parameters`` describes the call signature and defaults to a fresh,
    per-instance empty dict.
    """

    name: str  # key the agent uses to look the tool up
    description: str  # human-readable summary of what the tool does
    function: Callable[..., Any]  # called with keyword arguments
    parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Action:
    """A request to call one tool with keyword arguments."""

    tool_name: str  # must name a registered tool, otherwise execution reports "not found"
    parameters: Dict[str, Any]  # forwarded to the tool as **kwargs
    # Short random id (the first 8 hex digits of a UUID4) that ties this
    # action to the Observation produced for it.
    action_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
@dataclass
class Observation:
    """Outcome of executing an Action."""

    action_id: str  # id of the Action this observation answers
    result: Any  # tool return value (None on failure)
    success: bool  # True when the tool ran without raising
    error: Optional[str] = None  # failure message when success is False
@dataclass
class Thought:
    """One reasoning step produced by an agent before it acts."""

    content: str  # the conclusion / intent of this step
    reasoning: str  # why the agent reached it
    confidence: float = 1.0  # also used as the memory importance of the episode
class BaseAgent(ABC):
    """Abstract agent that alternates thinking and acting until it is done.

    Subclasses implement ``_think`` (produce a Thought from the context) and
    ``_plan_action`` (turn that thought into an Action, or return None to
    finish). Every executed step is appended to ``context["history"]`` and
    recorded in ``self.memory``.
    """

    def __init__(self, name: str, llm_client, tools: Optional[List[Tool]] = None,
                 max_iterations: int = 10):
        """
        Args:
            name: Agent identifier.
            llm_client: LLM client handle, exposed to subclasses as ``self.llm``.
            tools: Tools the agent may call, indexed by ``Tool.name``.
            max_iterations: Upper bound on think/act cycles per ``run`` call.
        """
        self.name = name
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in (tools or [])}
        self.state = AgentState.IDLE
        self.memory = AgentMemory()
        self.max_iterations = max_iterations

    async def run(self, task: str) -> dict:
        """Run the think -> plan -> act loop for ``task``.

        Returns the ``_finalize`` result as soon as ``_plan_action`` yields
        None, otherwise ``{"status": "max_iterations_reached", "task": task}``
        after ``max_iterations`` cycles. ``self.state`` is reset to IDLE on
        every exit path, including when a step raises.
        """
        context = {"task": task, "history": []}
        try:
            for _ in range(self.max_iterations):
                # Enter THINKING on every cycle, not just the first; previously
                # the state stayed ACTING after the first tool call.
                self.state = AgentState.THINKING
                thought = await self._think(context)
                context["history"].append({"thought": thought})

                action = await self._plan_action(thought, context)
                if action is None:
                    return await self._finalize(context)

                self.state = AgentState.ACTING
                observation = await self._act(action)
                context["history"].append({"action": action, "observation": observation})
                self.memory.add_episode(thought, action, observation)

            return {"status": "max_iterations_reached", "task": task}
        finally:
            self.state = AgentState.IDLE

    @abstractmethod
    async def _think(self, context: Dict) -> Thought:
        """Produce the next Thought from ``context`` (task + history)."""
        pass

    @abstractmethod
    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        """Return the next Action, or None when the task is complete."""
        pass

    async def _act(self, action: Action) -> Observation:
        """Execute ``action`` with the matching tool and wrap the outcome.

        Both ``async def`` and plain synchronous tool functions are supported.
        Unknown tools and exceptions raised by the tool become a failed
        Observation rather than propagating.
        """
        tool = self.tools.get(action.tool_name)
        if not tool:
            return Observation(
                action_id=action.action_id,
                result=None,
                success=False,
                error=f"Tool not found: {action.tool_name}"
            )
        try:
            result = tool.function(**action.parameters)
            # Await only coroutine-producing tools; sync tools return a value.
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            return Observation(
                action_id=action.action_id,
                result=None,
                success=False,
                error=str(e)
            )
        return Observation(
            action_id=action.action_id,
            result=result,
            success=True
        )

    async def _finalize(self, context: Dict) -> dict:
        """Build the completion result for a finished task."""
        self.state = AgentState.IDLE
        # Each cycle appends exactly one {"thought": ...} entry (plus one
        # action entry when it acts), so count thought entries; len(history)
        # double-counted every cycle that executed an action.
        iterations = sum(1 for step in context["history"] if "thought" in step)
        return {
            "status": "completed",
            "task": context["task"],
            "iterations": iterations
        }
任务规划与分解
复杂任务需要分解为可执行的子任务。规划策略包括:递归分解(将大任务拆分为小任务)、层次规划(多级抽象)、动态重规划(根据执行结果调整计划,注意并非算法意义上的"动态规划")。
# 任务规划器
@dataclass
class Task:
    """A unit of planned work, optionally with nested subtasks and dependencies."""

    id: str
    description: str
    status: str = "pending"
    subtasks: List["Task"] = field(default_factory=list)  # child tasks
    dependencies: List[str] = field(default_factory=list)  # ids that must complete first
    result: Any = None  # outcome once executed
class TaskPlanner:
    """Uses an LLM to split a task into subtasks and to pick the next ready ones."""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def decompose(self, task_description: str) -> List[Task]:
        """Ask the LLM for one subtask per line and build a sequential chain.

        Blank lines in the reply are skipped. Subtask ``i`` gets id
        ``subtask_i`` and depends on every earlier subtask.
        """
        prompt = (
            "将以下任务分解为可执行的子任务:\n"
            f"任务:{task_description}\n"
            "请列出需要完成的子任务,每个任务一行:"
        )
        reply = await self.llm.generate(prompt)

        descriptions = []
        for raw_line in reply.split('\n'):
            text = raw_line.strip()
            if text:
                descriptions.append(text)

        ids = [f"subtask_{n}" for n in range(len(descriptions))]
        return [
            Task(id=ids[n], description=desc, dependencies=ids[:n])
            for n, desc in enumerate(descriptions)
        ]

    async def replan(self, current_task: Task,
                     completed_tasks: List[Task]) -> List[Task]:
        """Return the subtasks of ``current_task`` that are ready to run.

        A subtask is ready when it is not yet completed and all of its
        dependencies are. If none are ready, returns ``[]`` without calling
        the LLM.
        """
        done_ids = {t.id for t in completed_tasks}

        def is_ready(sub):
            return sub.id not in done_ids and all(dep in done_ids for dep in sub.dependencies)

        ready = [sub for sub in current_task.subtasks if is_ready(sub)]
        if not ready:
            return []

        done_block = "\n".join(f"- {t.description}: {t.status}" for t in completed_tasks)
        ready_block = "\n".join(f"- {t.description}" for t in ready)
        prompt = (
            "基于已完成的任务,重新规划剩余任务:\n"
            "已完成:\n"
            f"{done_block}\n"
            "剩余任务:\n"
            f"{ready_block}\n"
            "是否需要调整任务?调整后的任务列表:"
        )
        # NOTE(review): simplified example; the LLM reply is requested but not
        # parsed, and the ready list is returned unchanged.
        await self.llm.generate(prompt)
        return ready
class HierarchicalPlanner:
    """Builds a fixed three-phase plan and recursively refines each phase."""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def plan(self, goal: str, depth: int = 3) -> Dict:
        """Return ``{"goal", "levels"}`` for ``goal``.

        ``levels[0]`` is the high-level plan. When ``depth > 1``, each phase
        is planned recursively with ``depth - 1`` and its nested plan dict is
        appended to ``levels`` in phase order.
        """
        top = await self._create_high_level_plan(goal)
        levels = [top]
        if depth > 1:
            for phase in top.get("tasks", []):
                levels.append(await self.plan(phase["description"], depth - 1))
        return {"goal": goal, "levels": levels}

    async def _create_high_level_plan(self, goal: str) -> Dict:
        """Return a fixed prepare / execute / verify template for ``goal``."""
        phases = (
            ("phase_1", "分析和准备"),
            ("phase_2", "执行主要工作"),
            ("phase_3", "验证和完善"),
        )
        return {
            "description": f"Plan for: {goal}",
            "tasks": [{"id": pid, "description": desc} for pid, desc in phases],
        }
记忆系统设计
Agent的记忆系统需要支持短期记忆(当前对话)、工作记忆(任务上下文)和长期记忆(历史经验)。记忆管理包括存储、检索、遗忘和压缩。
# Agent记忆系统
from datetime import datetime
import json
@dataclass
class MemoryEntry:
    """One stored memory item."""

    content: str  # serialized memory payload
    timestamp: datetime  # when the entry was created
    importance: float  # ranking key used when archiving / trimming memory
    access_count: int = 0
    embedding: Optional[Any] = None  # optional vector; add_episode does not set it
    metadata: Dict[str, Any] = field(default_factory=dict)
class AgentMemory:
    """Three-tier memory store for an agent.

    * ``short_term``: recent episodes; once it exceeds ``short_term_limit``
      the less important half is archived.
    * ``working_memory``: free-form key/value context for the current task.
    * ``long_term``: archived episodes, trimmed to ``long_term_limit`` by
      dropping the least important.
    """

    def __init__(self, short_term_limit: int = 20, long_term_limit: int = 1000):
        self.short_term = []  # recent episodes (MemoryEntry)
        self.working_memory = {}  # current task context
        self.long_term = []  # archived episodes (MemoryEntry)
        self.short_term_limit = short_term_limit
        self.long_term_limit = long_term_limit

    def add_episode(self, thought: "Thought", action: "Action",
                    observation: "Observation"):
        """Record one think/act/observe step as a short-term memory entry.

        The entry's importance is ``thought.confidence``; the tool result is
        truncated to 200 characters.
        """
        entry = MemoryEntry(
            # ensure_ascii=False keeps non-ASCII text (e.g. Chinese thoughts)
            # readable and keyword-searchable instead of \uXXXX escapes.
            content=json.dumps({
                "thought": thought.content,
                "action": action.tool_name,
                "result": str(observation.result)[:200]
            }, ensure_ascii=False),
            timestamp=datetime.now(),
            importance=thought.confidence
        )
        self.short_term.append(entry)
        if len(self.short_term) > self.short_term_limit:
            self._consolidate()

    def update_working_memory(self, key: str, value: Any):
        """Set ``key`` in working memory, overwriting any previous value."""
        self.working_memory[key] = value

    def recall(self, query: str, k: int = 5) -> List["MemoryEntry"]:
        """Return up to ``k`` entries (short- and long-term), most relevant first.

        Each returned entry's ``access_count`` is incremented. Entries with
        zero relevance can still be returned when fewer than ``k`` match.
        A non-positive ``k`` returns ``[]``.
        """
        if k <= 0:
            return []
        scored = [(self._calculate_relevance(query, entry), entry)
                  for entry in self.short_term + self.long_term]
        # Stable sort: ties keep short-term-before-long-term order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        top = [entry for _, entry in scored[:k]]
        for entry in top:
            entry.access_count += 1
        return top

    def _consolidate(self):
        """Keep the most important half of short-term memory; archive the rest."""
        self.short_term.sort(key=lambda e: e.importance, reverse=True)
        keep = self.short_term_limit // 2
        to_archive = self.short_term[keep:]
        self.short_term = self.short_term[:keep]
        self.long_term.extend(to_archive)
        if len(self.long_term) > self.long_term_limit:
            self.long_term.sort(key=lambda e: e.importance, reverse=True)
            self.long_term = self.long_term[:self.long_term_limit]

    @staticmethod
    def _tokenize(text: str) -> set:
        """Lower-cased word tokens; punctuation (JSON quotes, braces, commas) is dropped."""
        return set(re.findall(r"\w+", text.lower()))

    def _calculate_relevance(self, query: str, entry: "MemoryEntry") -> float:
        """Fraction of query tokens that also occur in the entry content.

        Simple keyword overlap; a real system would use embedding similarity.
        Tokenizing on word characters (instead of whitespace) matters because
        ``content`` is JSON, where words are glued to quotes and punctuation.
        """
        query_words = self._tokenize(query)
        content_words = self._tokenize(entry.content)
        overlap = len(query_words & content_words)
        return overlap / max(len(query_words), 1)
# 记忆检索器
class MemoryRetriever:
    """Packages AgentMemory.recall hits together with the current working memory."""

    def __init__(self, memory: AgentMemory, embedding_model):
        self.memory = memory
        # NOTE(review): stored but not used by retrieve_with_context.
        self.embedding_model = embedding_model

    async def retrieve_with_context(self, query: str,
                                    context_window: int = 3) -> Dict:
        """Return the top ``context_window`` memories for ``query`` as plain dicts.

        Each memory is serialized as content, ISO-8601 timestamp and
        importance; the live ``working_memory`` dict is included as-is.
        """
        hits = self.memory.recall(query, k=context_window)
        serialized = [
            {
                "content": hit.content,
                "timestamp": hit.timestamp.isoformat(),
                "importance": hit.importance,
            }
            for hit in hits
        ]
        result = {"query": query, "relevant_memories": serialized}
        result["working_memory"] = self.memory.working_memory
        return result
多Agent协作架构
多Agent系统通过Agent间协作解决复杂问题。协作模式包括:主管-工人模式(Supervisor)、辩论模式(Debate)、管道模式(Pipeline)和自组织模式(Swarm)。
# 多Agent协作框架
class MultiAgentSystem:
    """Fans a task out to several named agents and merges their results."""

    def __init__(self, agents: Dict[str, "BaseAgent"],
                 coordinator: str = "supervisor"):
        self.agents = agents
        # NOTE(review): stored but not read anywhere in this class.
        self.coordinator = coordinator
        self.message_queue = []
        self.shared_memory = {}

    async def execute_task(self, task: str) -> dict:
        """Assign parts of ``task`` to agents, run them concurrently, merge results.

        Assignments naming an unknown agent are skipped. ``results`` is keyed
        by agent name in assignment order. If an agent raises, the first
        exception propagates. The same agent object should not be registered
        under two names, because its runs would overlap.
        """
        assignments = await self._assign_task(task)

        names = []
        runs = []
        for agent_name, subtask in assignments.items():
            agent = self.agents.get(agent_name)
            if agent:
                names.append(agent_name)
                runs.append(agent.run(subtask))

        # Actually run in parallel: the previous loop awaited each agent in
        # turn despite being labelled parallel execution.
        outcomes = await asyncio.gather(*runs) if runs else []
        results = dict(zip(names, outcomes))

        final_result = await self._merge_results(results)
        return {
            "task": task,
            "results": results,
            "final": final_result
        }

    async def _assign_task(self, task: str) -> Dict[str, str]:
        """Give every agent a numbered part of ``task``.

        A naive split; a real system should match subtasks to agent abilities.
        """
        return {
            agent_name: f"Part {i+1} of: {task}"
            for i, agent_name in enumerate(self.agents)
        }

    async def _merge_results(self, results: Dict) -> str:
        """Summarize each agent's ``status`` as one ``[name]: status`` line."""
        return "\n".join(
            f"[{agent_name}]: {result.get('status', 'unknown')}"
            for agent_name, result in results.items()
        )

    def send_message(self, from_agent: str, to_agent: str, message: str):
        """Append a timestamped message to the shared message queue."""
        self.message_queue.append({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now()
        })
class SupervisorAgent(BaseAgent):
    """Supervisor agent: delegates the whole task once, then finishes."""

    async def _think(self, context: Dict) -> Thought:
        """Return a fixed coordination thought."""
        return Thought(
            content="分析任务并分配给合适的Agent",
            reasoning="需要协调多个Agent完成复杂任务"
        )

    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        """Delegate via the ``delegate_task`` tool once; return None afterwards.

        Without this stop condition the agent re-issued the same delegation on
        every cycle and ``run`` always ended at ``max_iterations``.
        """
        if any("action" in step for step in context["history"]):
            return None
        return Action(
            tool_name="delegate_task",
            parameters={"task": context["task"]}
        )
class SpecialistAgent(BaseAgent):
    """Specialist agent focused on one ``specialty``; makes a single tool call."""

    def __init__(self, name: str, llm_client, specialty: str,
                 tools: Optional[List[Tool]] = None):
        super().__init__(name, llm_client, tools)
        self.specialty = specialty

    async def _think(self, context: Dict) -> Thought:
        """Return a thought naming this agent's specialty."""
        return Thought(
            content=f"运用{self.specialty}专业知识解决问题",
            reasoning=f"这是{self.specialty}领域的任务"
        )

    async def _plan_action(self, thought: Thought, context: Dict) -> Optional[Action]:
        """Call ``use_specialty_tool`` once; return None afterwards.

        Previously this never returned None, so every run looped until
        ``max_iterations`` repeating the same tool call.
        """
        if any("action" in step for step in context["history"]):
            return None
        return Action(
            tool_name="use_specialty_tool",
            parameters={"input": context["task"]}
        )
# 辩论模式
class DebateSystem:
    """Multi-round debate: every agent argues once per round."""

    def __init__(self, agents: List["BaseAgent"], rounds: int = 3):
        self.agents = agents
        self.rounds = rounds

    async def debate(self, topic: str) -> dict:
        """Run ``self.rounds`` rounds on ``topic`` and synthesize a conclusion.

        Each argument dict holds ``agent``, ``argument`` and the 1-based
        ``round``. Agents see arguments from earlier rounds only; arguments
        within the same round are made independently.
        """
        arguments = []
        for round_num in range(self.rounds):
            round_arguments = []
            for agent in self.agents:
                argument = await self._get_argument(agent, topic, arguments)
                round_arguments.append({
                    "agent": agent.name,
                    "argument": argument,
                    "round": round_num + 1
                })
            arguments.extend(round_arguments)

        conclusion = await self._synthesize(arguments)
        return {
            "topic": topic,
            "rounds": self.rounds,
            "arguments": arguments,
            "conclusion": conclusion
        }

    async def _get_argument(self, agent: "BaseAgent", topic: str,
                            history: List) -> str:
        """Ask ``agent``'s LLM for a position, showing it the earlier arguments.

        ``history`` used to be ignored, so later rounds were independent
        restatements rather than responses to the other side.
        """
        prompt = f"针对话题'{topic}'发表观点"
        if history:
            previous = "\n".join(f"- {item['agent']}: {item['argument']}" for item in history)
            prompt += f"\n此前各方观点:\n{previous}\n请回应以上观点并给出你的论证"
        return await agent.llm.generate(prompt)

    async def _synthesize(self, arguments: List) -> str:
        """Return a fixed placeholder conclusion."""
        # NOTE(review): stub; a real implementation would summarize `arguments`.
        return "综合各方观点得出的结论"
工具调用与函数执行
Agent通过工具调用与外部世界交互。工具系统需要支持参数验证、错误处理、超时控制和结果缓存。
# 工具注册与执行
import inspect
from functools import wraps
class ToolRegistry:
    """Maps tool names to ``Tool`` records; filled via the ``register`` decorator."""

    def __init__(self):
        self.tools = {}  # tool name -> Tool

    def register(self, name: Optional[str] = None, description: str = ""):
        """Return a decorator that registers the decorated function as a tool.

        Args:
            name: Tool name; defaults to the function's ``__name__``.
            description: Tool description; defaults to the function's
                docstring, or ``""`` if it has none.

        The decorated function is returned unchanged, so both ``async def``
        and plain functions work. Re-registering a name replaces the old tool.
        """
        def decorator(func):
            tool_name = name or func.__name__
            tool_desc = description or inspect.getdoc(func) or ""
            self.tools[tool_name] = Tool(
                name=tool_name,
                description=tool_desc,
                function=func,
                parameters=self._describe_parameters(func)
            )
            # The previous pass-through async wrapper added nothing and broke
            # synchronous functions (it awaited their plain return value).
            return func
        return decorator

    @staticmethod
    def _describe_parameters(func) -> Dict[str, Dict[str, Any]]:
        """Summarize ``func``'s signature as ``{param: {"type", "default", "required"}}``."""
        empty = inspect.Parameter.empty
        variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        described = {}
        for param_name, param in inspect.signature(func).parameters.items():
            annotation = param.annotation
            if annotation is empty:
                type_name = "Any"
            else:
                # String annotations (PEP 563) and some typing constructs have
                # no __name__; fall back to their str() form instead of crashing.
                type_name = getattr(annotation, "__name__", None) or str(annotation)
            has_default = param.default is not empty
            described[param_name] = {
                "type": type_name,
                "default": param.default if has_default else None,
                # Distinguishes "no default" from an explicit default of None.
                "required": not has_default and param.kind not in variadic
            }
        return described

    def get_tool(self, name: str) -> Optional[Tool]:
        """Return the tool registered under ``name``, or None."""
        return self.tools.get(name)

    def list_tools(self) -> List[Dict]:
        """Return ``{"name", "description"}`` for every registered tool."""
        return [
            {"name": t.name, "description": t.description}
            for t in self.tools.values()
        ]
# 使用示例
# Module-level registry that the example tools below register themselves into.
registry = ToolRegistry()


@registry.register(name="search", description="搜索互联网")
async def search_web(query: str) -> str:
    """Placeholder search tool: echo ``query`` inside a fixed result string."""
    results = f"Search results for: {query}"
    return results
# Operators the calculator tool accepts; anything else in the expression is rejected.
_CALC_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_CALC_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
# Cap exponents to guard against obvious blow-ups such as "9**9**9".
_CALC_MAX_EXPONENT = 10000


def _eval_arithmetic(node):
    """Evaluate a restricted arithmetic AST (numbers, + - * / // % **, unary +/-)."""
    if isinstance(node, ast.Expression):
        return _eval_arithmetic(node.body)
    if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _CALC_BINARY_OPS:
        left = _eval_arithmetic(node.left)
        right = _eval_arithmetic(node.right)
        if isinstance(node.op, ast.Pow) and abs(right) > _CALC_MAX_EXPONENT:
            raise ValueError("exponent too large")
        return _CALC_BINARY_OPS[type(node.op)](left, right)
    if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_UNARY_OPS:
        return _CALC_UNARY_OPS[type(node.op)](_eval_arithmetic(node.operand))
    raise ValueError(f"Unsupported expression element: {type(node).__name__}")


@registry.register(name="calculate", description="执行数学计算")
async def calculate(expression: str) -> float:
    """Evaluate an arithmetic ``expression`` such as ``"(1 + 2) * 3 / 4"``.

    Returns an int or float. Raises ``SyntaxError`` for unparsable input and
    ``ValueError`` for anything other than plain arithmetic. The original
    ``eval`` executed arbitrary Python on tool input (which an LLM or user
    controls); parsing to an AST and whitelisting nodes closes that hole.
    """
    return _eval_arithmetic(ast.parse(expression, mode="eval"))
@registry.register(name="read_file", description="读取文件内容")
async def read_file(file_path: str, encoding: str = "utf-8") -> str:
    """Return the text contents of ``file_path``.

    Decodes with an explicit ``encoding`` (UTF-8 by default) instead of the
    platform locale default, which differs between systems.

    NOTE(review): the path is used as given. If it comes from an LLM or
    user, restrict it to an allowed directory before exposing this tool.
    """
    with open(file_path, 'r', encoding=encoding) as f:
        return f.read()
# 工具执行器
class ToolExecutor:
    """Runs registry tools by name and caches successful results in memory."""

    def __init__(self, registry: "ToolRegistry"):
        self.registry = registry
        # Unbounded cache: (tool name, canonical JSON params) -> result.
        self.cache = {}

    @staticmethod
    def _cache_key(tool_name: str, params: Dict):
        """Return an exact cache key, or None when params are not JSON-serializable.

        The old key embedded ``hash()`` of the JSON, so two different calls
        could collide and return each other's cached result.
        """
        try:
            return (tool_name, json.dumps(params, sort_keys=True))
        except (TypeError, ValueError):
            return None

    async def execute(self, tool_name: str, params: Dict,
                      use_cache: bool = True) -> dict:
        """Call ``tool_name`` with ``params`` as keyword arguments.

        Returns ``{"result", "cached"}`` on success or ``{"error"}`` on
        failure. Both ``async def`` and synchronous tool functions are
        supported. Pass ``use_cache=False`` for tools with side effects or
        changing results (e.g. file reads). Calls whose params are not JSON
        serializable are executed but never cached.
        """
        tool = self.registry.get_tool(tool_name)
        if not tool:
            return {"error": f"Tool '{tool_name}' not found"}

        cache_key = self._cache_key(tool_name, params) if use_cache else None
        if cache_key is not None and cache_key in self.cache:
            return {"result": self.cache[cache_key], "cached": True}

        try:
            result = tool.function(**params)
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            return {"error": str(e)}

        if cache_key is not None:
            self.cache[cache_key] = result
        return {"result": result, "cached": False}