思维模式
Agent Loop 实现了经典的 ReAct(Reasoning + Acting)循环模式。ReAct 模式的核心思想是让 LLM 交替进行”推理”和”行动”:
用户提问 → LLM 推理 → 决定调用工具 → 执行工具 → 将结果反馈给 LLM → 继续推理 → ... → 最终回答
这个循环可能执行多轮,直到 LLM 认为已经收集到足够信息来回答用户问题。
初始化参数
class AgentLoop:
"""
Agent Loop 是 agent 的核心引擎,实现了经典的 ReAct(Reasoning + Acting)循环模式。交替进行推理与行动:
"用户提问 → LLM 推理 → 决定调用工具 → 执行工具 → 将结果反馈给 LLM → 继续推理 → ... → 最终回答",
这个循环可能执行多轮,直到 LLM 认为已经收集到足够信息来回答用户问题。
支持以下能力:
1. 对话历史管理(短期记忆)
2. ReAct 推理循环
3. 调用工具(Function call)
4. 异常处理与重试,重试类型包含:
- 网络/API 异常
- 输出解析异常(如 JSON 格式损坏):捕获异常后,打包成 Prompt(eg “解析失败,输出不是合法的 JSON,请修正后重试:[错误堆栈]”),作为一轮新的 User Input 塞回给 LLM 自纠
- 工具执行异常:将 Exception 的错误信息作为 Observation(观察结果)返回给 LLM。LLM 看到错误信息后,通常会触发 `Thought: 刚才的参数似乎不对,我换个参数重试` 的逻辑。
"""
def __init__(
self,
provider: LLMClient, # LLM Client 实例
workspace: Path, # 工作空间路径
tools: ToolRegistry, # ToolRegistry 实例
context_builder: ContextBuilder | None=None, # 上下文构造器
session_manager: SessionManager | None=None, # 会话管理器
subagent_manager: SubAgentManager | None=None, # 子代理管理器
model: str | None=None, # LLMClient 实例参数
max_iterations: int = 10, # 最大循环次数(安全阀)
max_retries: int = 3, # LLM 调用重试次数
retry_delay: float = 1.0, # 重试延迟
temperature: float = 0.5,
max_tokens: int = 4096, # LLMClient 实例参数
):
Provider
1. 主要作用
provider 的主要目的是将 AgentLoop 的核心调度逻辑与具体的 LLM 厂商的底层 API 细节分离开来。AgentLoop 不需要知道当前调用的是哪个具体模型,它只需要调用 provider 暴露的标准化方法(例如 provider.comlete(messages, tools))即可。
2. 工程实践
- 实现方式
class LLMClient:
"""
LLM客户端:用于调用任何兼容OpenAI接口的服务,并默认使用流式响应。
"""
def __init__(
self,
api_key: str | None = None,
base_url: str | None = None,
model: str | None = None, # 默认模型名称
timeout: float | None = None, # 连接超时时间
temperature: float = 0.5, # 默认温度,0.5 适合 Agent
max_tokens: int = 4096, # 默认单次最大输出限制
):
self.api_key = api_key or os.getenv("LLM_API_KEY")
self.base_url = base_url or os.getenv("LLM_BASE_URL")
self.model = model or os.getenv("LLM_MODEL_ID")
if not all([self.model, self.api_key, self.base_url]):
raise ValueError("Model, API key, and base URL are required!")
self.timeout = float(
timeout if timeout is not None else os.getenv("LLM_TIMEOUT", "60.0")
)
self.temperature = temperature
self.max_tokens = max_tokens
self.client = OpenAI(
api_key=self.api_key, base_url=self.base_url, timeout=self.timeout
)
def complete(
self,
messages: List[Dict[str, Any]], # Any 兼容字符串、列表和嵌套字典等
model: str | None = None, # 允许单次调用覆盖默认模型
temperature: float | None = None, # 允许单次调用覆盖默认温度
max_tokens: int | None = None, # 允许单次调用覆盖默认 token 限制
stream: bool = False, # 默认不支持流式输出,方便代码获取响应结果
) -> str | None:
"""
调用大语言模型并返回完整响应。
"""
# 如果本次调用没有传参 (None),则回退使用 __init__ 里的全局默认值
req_model = model or self.model
req_temp = temperature if temperature is not None else self.temperature
req_tokens = max_tokens if max_tokens is not None else self.max_tokens
print(
f"🧠 正在调用 {req_model} 模型... (temperture: {req_temp}, max tokens: {req_tokens})"
)
try:
response = self.client.chat.completions.create(
model=req_model,
messages=messages,
temperature=req_temp,
max_tokens=req_tokens,
stream=stream,
)
print("🥳 大语言模型响应成功!")
if stream:
answer = []
for chunk in response:
# 部分兼容 OpenAI 接口的模型(或在使用本地 VLLM 时),流式的最后一个 chunk 可能是空壳
# 兼容不同模型的流式结束符可能导致的越界或 NoneType 错误
if not chunk.choices or not chunk.choices[0].delta:
continue
content = chunk.choices[0].delta.content or ""
print(content, end="", flush=True)
answer.append(content)
print()
return "".join(answer)
# 非流式处理
content = response.choices[0].message.content or ""
print(content)
return content
except Exception as e:
print(f"😅 调用 LLM API 时发生错误: {e}")
return None
workspace
1. 主要作用
在 Agent 开发的语境中,workspace(工作空间)通常指的是分配给这个 Agent 专属的本地文件夹路径(Directory Path)。主要作用如下:
- Agent 的文件系统根目录
- Agent 所有读写文件操作都限制在这个目录内(安全沙箱)。
- 工具(Tools)执行文件操作时,通常只能访问 workspace 及其子目录,防止 Agent 随意修改系统其他文件。
- 存储持久化数据
- 记忆文件:MEMORY.md、memory/ 目录下的每日日志等(长期记忆)
- 系统提示文件:AGENTS.md(Agent 角色定义)、SOUL.md(人格/风格)、USER.md(用户偏好)、HEARTBEAT.md(周期性任务)等
- 会话历史:由 session_manager 保存的对话记录文件
- 其他文件:Agent 生成的代码、文档、临时数据、知识库等
- 提供上下文信息
- context_builder 会读取 workspace 下的引导文件(AGENTS.md、SOUL.md 等),把它们注入到系统提示中,让 Agent “知道自己是谁、在哪里、应该怎么做事”。
- Agent 可以感知当前工作空间的文件结构、项目内容,从而更好地完成编码、文档处理等任务。
- 支持多实例 / 多 Agent
- 每个 Agent(或每个项目)可以拥有独立的 workspace,实现隔离。
- 例如:一个 workspace 用于 Python 项目,另一个用于写作,另一个用于日常助手。
2. 工程实践
-
使用方式
在 AgentLoop 的运行过程中,workspace 通常被传递给以下组件:
- ContextBuilder:读取 workspace 中的提示词文件,构建系统上下文
- Memory Store:把长期记忆写入 workspace/memory/ 或 MEMORY.md
- Tools(尤其是文件操作工具、shell 工具):执行 cd、ls、read_file、write_file 等操作时,以 workspace 为根路径
- SessionManager:把会话历史保存到 workspace 下的某个子目录或文件
- SubagentManager:子代理可能有自己的子 workspace
# 初始化时
self.workspace = workspace
# 在构建上下文时
system_prompt = self.context_builder.build_system_prompt(self.workspace)
# 文件工具执行时(安全限制)
if not path.is_relative_to(self.workspace):
raise PermissionError("只能在 workspace 内操作文件")
-
实际例子
初始化后,workspace 通常是:
~/.happybot/workspace/
├── AGENTS.md # Agent 角色和规则
├── SOUL.md # Agent 人格设定
├── USER.md # 用户信息和偏好
├── MEMORY.md # 长期记忆
├── HEARTBEAT.md # 周期任务
├── memory/ # 每日记忆日志
├── projects/ # Agent 生成的项目文件
└── ... # 你让 Agent 创建的任何文件
context_builder
1. 主要作用
AgentLoop 负责整个循环流程,context_builder 只负责构建每次发送给 LLM 的上下文(Context)。 主要作用如下:
-
上下文拼装
- 系统提示(System Prompt,Agent 的身份、角色、规则、工作空间信息等)
- 最近 N 条对话历史(滑动窗口)
- 记忆摘要(长期记忆、短期记忆、用户偏好等)
- 可用工具/技能描述(Tools/Skills)
- 当前用户输入(User Message)
- 其他动态信息(如当前时间、工作目录、环境状态等)
-
上下文裁剪
context_builder内部通常会集成一个 Token 计算器(如tiktoken)。当发现总 Token 逼近模型上限时,它会执行滑动窗口策略(丢弃最老的几轮对话),或者执行摘要压缩(调用小模型把老对话总结成一句话)。
2. 工程实践
- 使用方式
def run(self, user_input: str):
# 1. 使用 context_builder 构建本次 LLM 需要看到的完整上下文
context = self.context_builder.build(
history=self.session_manager.get_history(), # 或类似
memory=self.memory_store.get(),
skills=self.tools.get_skills_description(), # 或单独的 skills loader
current_input=user_input,
workspace=self.workspace,
# 其他参数...
)
# 2. 把构建好的 context 发送给 LLM
response = self.provider.complete(context, model=self.model, ...)
# 3. 处理 response(可能是工具调用,也可能是最终回答)
...
context_builder.build(...) 方法通常返回一个消息列表(list of dict),格式类似:
[
{"role": "system", "content": "你是一个...(完整系统提示)"},
{"role": "system", "content": "记忆内容..."},
{"role": "system", "content": "可用工具:..."},
... # 历史消息
{"role": "user", "content": "当前用户问题"}
]
代码可以默认是 context_builder=None,可以在 __init__ 或 run 创建一个默认的 ContextBuilder(例如 DefaultContextBuilder),或者 AgentLoop 中直接用一个简单的拼接逻辑。
- 常见实现方式
可以注入不同的 ContextBuilder 实现,来适应不同场景:
- 简单版:只拼接历史 + 系统提示
- 高级版:带 token 计数、自动截断/压缩历史、动态注入技能、RAG 检索、上下文压缩等
- 自定义版:根据 workspace 加载 ROLE.md、RULES.md、SOUL.md 等引导文件
- 测试
在单元测试中可以 mock 一个假的 context_builder,不同项目可以有自己的上下文构建策略,而不用改动 AgentLoop 的核心代码
session_manager
1. 主要作用
context_builder是负责把数据精简打磨后送给大模型(相当于内存管理),那么session_manager就是负责把所有原始数据长久保存下来(相当于硬盘管理)
负责 管理整个 Agent 会话(Session)的生命周期和状态,特别是 对话历史(Conversation History) 的存储、读取、追加、清理等操作。主要作用如下:
- 管理对话历史(History)
- 存储每一轮的用户输入(User Message)
- 存储 LLM 的回复(Assistant Message)
- 存储工具调用记录(Tool Call + Tool Result)
- 支持多轮对话的连续性
- 会话生命周期管理
- 创建新会话(new session)
- 加载已有会话
- 保存会话到文件或数据库
- 切换会话、删除会话等
- 历史消息控制
- 限制历史消息数量(防止 token 爆炸)
- 提供方法获取“最近 N 条消息”或“压缩后的历史”
- 支持消息总结(summarization)
- 持久化
- 把对话记录保存到工作空间(workspace)下的文件(如 chat_history.json、session_xxx.json)
- 支持从文件中恢复上一次的会话状态
- 多用户并发隔离
- 维护一个映射表(通常是以
session_id或user_id为主键),达到用户、session 隔离。
- 维护一个映射表(通常是以
2. 与其他组件的关系
- context_builder:依赖 session_manager 提供历史记录来构建 prompt
- subagent_manager:可能也需要 session_manager 来管理子代理的独立会话
- AgentLoop:通过 session_manager 保持多轮对话的连续性和状态
3. 工程实践
- 使用方式
def run(self, user_input: str):
# 1. 从 session_manager 获取当前会话的历史记录
history = self.session_manager.get_history()
# 2. 把历史记录交给 context_builder 去构建上下文
context = self.context_builder.build(
history=history, # ← 这里传入
...
)
# 3. 调用 LLM 得到回复
response = self.provider.complete(context, ...)
# 4. 把用户输入和 LLM 回复保存到会话历史中
self.session_manager.add_user_message(user_input)
self.session_manager.add_assistant_message(response)
# 如果有工具调用,也会记录工具执行结果
if response.has_tool_calls():
self.session_manager.add_tool_calls(...)
- 常见实现方式
- 简单版:只用一个列表在内存中保存消息(重启后丢失)
- 文件版:把历史保存为 JSON 文件(推荐,用于持久化)
- 数据库版:使用 SQLite / Redis 等(适合大规模多会话)
- 带总结版:当历史太长时,自动对旧消息进行总结,只保留摘要
subagent_manager
1. 主要作用
单个 Agent 能力有限:
- 上下文窗口有限(容易 token 超限)
- 容易在复杂任务中迷失或循环
- 不同子任务需要不同的工具集、系统提示或专注度
通过 subagent_manager,主 Agent 可以:
- 把大任务拆分成小任务
- spawning(生成)专门的子代理去处理
- 子代理完成后,把结果汇总回来
- 主 Agent 只负责高层决策和最终合成
这是一种典型的 分层多代理架构(Hierarchical Multi-Agent)。
主 Agent 在某一步发起 subagents(action: spawn, prompt: "去写个脚本并跑通...") → 系统新起一条 Agent 运行(新的 LLM 循环、新的临时上下文)→ 子 Agent 自己“边想边做”(也可以调各种工具)→ 子 Agent 跑完后把最终结果作为这一次 tool call 的返回值还给主 Agent → 主 Agent 继续自己的循环(可能再调别的工具或直接回复用户)。
在这个过程中子 Agent 有独立的对话/上下文,不会把中间的试错、长输出塞进主会话。对主 Agent 来说,“调 subagents”和“调 read_file”一样,都是一次 tool call,只是这个 tool 内部是“跑完一整条 Agent”。
一般来说 subagent_manager 的主要作用如下:
- 创建和启动子代理(Spawn Subagent)
- 主 Agent 通过一个特殊的 SpawnTool(生成子代理工具)调用它。
- 可以指定任务目标(goal)、子代理类型、系统提示、允许使用的工具等。
- 独立运行子代理
- 每个子代理拥有自己的独立 AgentLoop(或简化版循环)。
- 有独立的上下文、历史记录(通常从零开始,避免污染主会话)。
- 限制迭代次数,防止无限循环。
- 工具隔离:子代理通常不允许再调用 “生成子代理” 的工具,防止无限递归。
- 结果收集与通信
- 子代理完成后,通过 MessageBus(消息总线)或直接返回结果给主 Agent。
- 主 Agent 可以等待结果、汇总多个子代理的输出。
- 生命周期管理
- 列出正在运行的子代理(list)
- 终止子代理(kill)
- 监控状态、资源限制等
- 安全与隔离
- 子代理通常使用更少的工具(只给需要的,避免危险操作)
- 独立的工作空间或上下文,互不干扰
2. 工程实践
- 使用方式
# LLM 输出中检测到工具调用:spawn_subagent
if tool_name == "spawn_subagent":
task = tool_args["task"] # 例如 "研究一下 XX 技术"
goal = tool_args["goal"]
# 委托给 subagent_manager
result = self.subagent_manager.spawn(
task_id=uuid.uuid4(),
goal=goal,
description=task,
# 可选:指定子代理类型、工具白名单等
)
# 把子代理的结果加回主上下文,继续循环
self.session_manager.add_tool_result(result)
主 Agent 看到的结果通常是子代理执行后的总结,而不是全部过程(节省 token)。
max_tokens
该参数是 LLM client / provider 实例的参数,核心作用是限制模型单次生成的最大输出长度。在 AgentLoop 架构中,设置这个参数有三个现实意义:
- 防止资金燃烧 (Cost Control):Agent 很容易产生“幻觉”,如果在某一步推理中彻底迷失,它可能会疯狂输出几万字的乱码或重复无意义的废话。设置
max_tokens可以作为物理熔断机制,防止单次 API 调用扣除高额费用。 - 规避解析错误:Agent 需要输出严格的 JSON 格式(包含
Action和Action Input)。如果模型输出太长,导致 JSON 的右括号}被切断,你的代码解析必然报错。合理限制长度,能促使模型长话短说。 - 留出上下文空间:绝大多数模型有一个总上下文窗口限制(Total Context Window,即
Prompt Tokens + Completion Tokens的总和,比如 128K)。限制单次输出的长度,可以为前面不断积累的“历史记忆(Memory)”留出更多空间。
max_iterations
AgentLoop 的核心是一个 while 循环,max_iterations 就是限制这个循环最多只能跑 25 轮(默认值)。
iteration = 0
while iteration < self.max_iterations:
iteration += 1
# ... 构建上下文、调用LLM、执行工具 ...
if 任务已经完成:
break
else:
# 达到最大迭代次数仍未完成
print("达到最大迭代次数,强制停止")
- 简单任务可以设小一点(如 10~15)
- 复杂编程、研究类任务可以设大一点(如 30~40)
- 生产环境建议不要超过 50
max_retries ,retry_delay
当调用 LLM(大语言模型)失败时,允许最多重试几次。常见的失败情况包括:
- API 临时超时(Timeout)
- Rate Limit(请求频率限制)
- 网络波动
- 服务端返回 5xx 错误
- JSON 解析失败(工具调用格式错误时也可能重试)
工作流程:
- 调用 LLM 失败 → 等待 retry_delay 秒
- 重试第 1 次
- 仍然失败 → 再等待 → 重试第 2 次
- ……
- 达到 max_retries 次后仍失败 → 放弃本次调用,抛出异常或进入失败处理流程
max_retries 是防止雪崩的冷却期。当第一次 API 请求失败(例如被 OpenAI 或千问的服务端限流拒绝了),如果代码立刻在 1 毫秒后发起第二次请求,大概率还是会被拒绝,甚至导致你的 IP 被封锁。
常见用法:
- 简单指数退避(Exponential Backoff):
delay = self.retry_delay * (2 ** attempt) # 第1次等1s,第2次等2s,第3次等4s
- 或者固定延迟(当前默认是固定 1 秒)
process_message 消息处理流程
process_message 方法是 AgentLoop 中负责“接收用户消息 → 处理 → 流式返回结果”的核心异步入口。
async def process_message(
self,
message, # 用户输入的消息(str 或 Message 对象)
session_id, # 会话 ID,用于区分不同对话
context=None, # 可选:外部预构建的上下文(优先级高于内部构建)
media=None, # 可选:多模态内容(如图片、文件)
channel=None, # 可选:消息来源渠道(web、telegram、discord 等)
chat_id=None, # 可选:聊天室 ID(多群聊场景)
cancel_token=None # 可选:用于中途取消任务的 token
) -> AsyncIterator[str]:
"""
AgentLoop 的核心消息处理入口
- 异步生成器(AsyncIterator),支持实时流式输出
- 完整处理:会话 → 上下文 → LLM流式调用 → 工具执行 → 最终回复
"""
# ========================================
# 1. 会话准备(Session Setup)
# ========================================
# 通过 session_id 从 session_manager 加载或创建当前会话
session = await self.session_manager.get_or_create_session(session_id)
# 把用户输入追加到对话历史(保持会话连续性)
await self.session_manager.add_user_message(message, session_id=session_id)
# ========================================
# 2. 上下文构建(Context Building)
# ========================================
if context is not None:
# 外部传入上下文时直接使用(用于特殊场景如子代理、预处理)
final_context = context
else:
# 正常情况:调用 context_builder 组装完整 prompt
final_context = self.context_builder.build(
history=await self.session_manager.get_history(session_id),
workspace=self.workspace,
tools=self.tools.get_all_descriptions(), # 工具描述
memory=..., # 长期记忆(可选)
media=media, # 多模态内容
current_message=message,
# 其他动态信息...
)
# ========================================
# 3. 预处理(Pre-processing)
# ========================================
# 处理多模态输入(media):图片、文件等转成 provider 支持的格式
if media:
final_context = await self._process_media(final_context, media)
# 检查是否已被取消(支持用户中途停止)
if cancel_token and await cancel_token.is_cancelled():
yield "已取消"
return
# ========================================
# 4. 异步流式调用 LLM(核心循环)
# ========================================
# 注意:真实实现中通常是一个外层 while 循环,支持多次 LLM 调用(工具调用后继续)
# 这里展示简化的单次流式调用结构,工具调用逻辑在 chunk 处理中
async for chunk in self.provider.stream(
messages=final_context,
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
stream=True, # 关键:开启流式输出
# 其他参数:top_p、stop 等
):
# 对每个 chunk 进行少量处理
# - 解析是否包含工具调用(tool_calls)
# - 日志记录、日志输出等
processed_chunk = await self._process_chunk(chunk)
# 如果检测到工具调用,则:
# 1. 暂停 yield
# 2. 执行工具
# 3. 把工具结果塞回上下文
# 4. 跳出当前 for 循环,继续外层 while(再次调用 LLM)
if processed_chunk.has_tool_calls():
tool_results = await self.tools.execute_all(
processed_chunk.tool_calls,
session_id=session_id
)
# 将工具结果追加到上下文,准备下一轮 LLM 调用
final_context = self.context_builder.append_tool_results(
final_context, tool_results
)
break # 跳出当前流,进入下一轮 LLM 调用
# 正常文本输出:立即 yield 给调用方(实现“打字机”效果)
yield processed_chunk.content # 或 chunk.delta / chunk.text
# ========================================
# 5. 后续流程(Post-processing)
# ========================================
# 把 LLM 的最终回复保存到 session_manager
await self.session_manager.add_assistant_message(
message=processed_chunk.full_content, # 完整回复
session_id=session_id
)
# 如果本次调用涉及子代理(subagent_manager),处理子代理返回的结果
if hasattr(self, 'subagent_manager') and self.subagent_manager.has_pending_results():
subagent_results = await self.subagent_manager.collect_results()
# 可选择是否 yield 子代理结果,或合并到最终回复
# 释放资源、清理临时状态
await self._cleanup(session_id)
process_message 是一个异步流式生成器(返回 AsyncIterator[str]),通过 yield 逐步返回流式响应。这种设计使得调用方可以实时获取 LLM 的输出,而不必等待整个处理完成:
async(异步):使得方法是异步的。yield(生成器):如果不使用yield,程序必须等大模型把几千字全部写完。使用了yield,只需要确认当前大模型吐出的是“可以给用户看的人话”,它就会立刻yield chunk,把这个字推给前端。
cancel_token 取消令牌机制
实现优雅的任务取消机制:
# 全局取消令牌管理
_session_cancel_tokens: dict[str, CancellationToken] = {}
def cancel_session(session_id: str) -> bool:
if session_id in _session_cancel_tokens:
_session_cancel_tokens[session_id].cancel()
return True
return False
在 AgentLoop 的每次迭代中检查取消令牌,确保用户可以随时中断长时间运行的任务。
Python 原生就有 asyncio.Task.cancel(),为什么还要自己手写一个 CancellationToken 类?”
- 优雅退出 (Graceful Shutdown):直接 cancel Task 会在代码的任意 await 位置抛出
CancelledError,如果 Agent 此时正在写入数据库或更新 Memory,会导致数据损坏(脏数据)。使用 Token,Agent 可以在循环的安全点(Safe Point)检查is_cancelled,完成状态保存后再退出。 - 多线程/跨进程扩展:如果 Agent 的某个工具调用(Tool Execution)是同步的或阻塞了线程池,
asyncio.Task.cancel()往往无法中断它。而CancellationToken可以被传递进底层的同步函数中,让函数自行检查状态并提前返回。
工具调用编排
当 LLM 返回 tool_call 时,AgentLoop 执行以下步骤:
- 解析工具调用:从
StreamChunk.tool_call中提取工具名和参数 - 参数验证:通过
Tool.validate_params()验证参数合法性 - 执行工具:调用
ToolRegistry.execute()执行工具 - 结果注入:将工具执行结果作为
tool角色消息注入对话历史 - 继续推理:将更新后的消息列表再次发送给 LLM
# 伪代码展示核心循环
for iteration in range(self.max_iterations):
async for chunk in self.provider.chat_stream(messages, tools):
if chunk.is_content:
yield chunk.content # 流式输出文本
if chunk.is_tool_call:
tool_calls.append(chunk.tool_call)
if not tool_calls:
break # LLM 没有请求工具调用,循环结束
for tc in tool_calls:
result = await self.tools.execute(tc.name, **tc.arguments)
messages.append({"role": "tool", "content": result, "tool_call_id": tc.id})
tool_calls.clear()
错误处理策略
分层错误处理
- LLM 调用层:自动重试 + 指数退避
- 工具执行层:捕获异常并将错误信息作为工具结果返回给 LLM
- 循环层:
max_iterations防止无限循环
工具执行错误的优雅降级
当工具执行失败时,错误信息会被格式化后返回给 LLM,让 LLM 自行决定如何处理:
try:
result = await tool.execute(**arguments)
except Exception as e:
result = f"工具执行失败: {str(e)}"
# 无论成功失败,都将结果反馈给 LLM
messages.append({"role": "tool", "content": result})
这种设计让 LLM 有机会尝试其他方案或向用户解释失败原因。
流式响应
整个链路都是流式的:LLM 流式生成 → AgentLoop 流式 yield → WebSocket 流式推送。用户可以在 LLM 生成第一个 token 时就看到响应开始。
工程实践
- while 循环是实现 ReAct 模式的关键
- process_message 进入调用 agent 的入口
- 如果没有工具调用了,就会退出 ReAct 模型(循环),返回结果
class AgentLoop:
def __init__(
self,
provider: LLMClient, # LLM Client 实例
workspace: Path, # 工作空间路径
tools: ToolRegistry, # ToolRegistry 实例
context_builder: ContextBuilder | None=None, # 上下文构造器
session_manager: SessionManager | None=None, # 会话管理器
subagent_manager: SubAgentManager | None=None, # 子代理管理器
model: str | None=None, # LLMClient 实例参数
max_iterations: int = 10, # 最大循环次数(安全阀)
max_retries: int = 3, # LLM 调用重试次数
retry_delay: float = 1.0, # 重试延迟
temperature: float = 0.5,
max_tokens: int = 4096, # LLMClient 实例参数
):
async def process_message(
self,
message: str, # 用户输入的消息(str 或 Message 对象)
session_id: str, # 会话 ID,用于区分不同对话
context: Optional[Context] = None, # 可选:外部预构建的上下文(优先级高于内部构建)
media: Optional[list[MediaContent]] = None, # 可选:多模态内容(如图片、文件)
channel: Optional[str] = None, # 可选:消息来源渠道(web、telegram、discord 等)
chat_id: Optional[str] = None, # 可选:聊天室 ID(多群聊场景)
cancel_token: Optional[CancelToken] = None, # 可选:用于中途取消任务的 token
) -> AsyncIterator[str]:
"""
AgentLoop 生产级消息处理入口(完整版)
- 支持多轮工具调用(ReAct 风格)
- 严格遵守 max_iterations 安全上限
- 内置重试机制(max_retries + retry_delay)
- 实时流式输出 + 优雅取消 + 完整会话持久化
"""
# ========================================
# 1. 会话准备(Session Setup)
# ========================================
session = await self.session_manager.get_or_create_session(session_id)
# 把用户消息永久记录到历史(即使后面失败也不会丢失)
await self.session_manager.add_user_message(
message=message,
session_id=session_id,
media=media
)
# 多模态处理:将图片/文件转为大模型 API 支持的 base64 或 URL 格式
processed_media = self._process_media(media) if media else None
# ========================================
# 2. 上下文构建(Context Building)—— 初始上下文
# ========================================
current_context = context if context is not None else self.context_builder.build(
history=await self.session_manager.get_history(session_id),
workspace=self.workspace,
tools=self.tools.get_all_descriptions(),
memory=await self.memory_store.get_relevant_memory(message), # 长期记忆
media=processed_media,
current_message=message,
channel=channel,
chat_id=chat_id,
)
# ========================================
# 3. ReAct 核心:循环。条件是持续有工具调用
# ========================================
for iteration in range(self.max_iterations):
if cancel_token and await cancel_token.is_cancelled():
yield "\n[任务已取消]"
return
final_assistant_response = "" # 每轮重置
# ------------------------------
# 3.1 带重试的 LLM 流式调用。重试是 LLM 层:自动重试 + 指数退避
# ------------------------------
for retry in range(self.max_retries + 1):
try:
# 每次重试必须清空残留状态
step_text_content = ""
current_tool_calls = []
tool_call_map = {} # 用来处理增量 tool_calls
async for chunk in self.provider.complete(
messages=current_context,
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
stream=True, # 核心:流式输出
tools=self.tools.get_all_descriptions(),
# tool_choice="auto" 或 "required"(可选,推荐加上)
):
# ------------------------------
# 3.3 Chunk 处理
# ------------------------------
delta = chunk.choices[0].delta
# 1. 处理普通文本(实时 yield),tool_calls 一般 tool_call 的 content 为空
if delta.content is not None:
# 听说工具调用也可能有content内容,但是中间步骤不要放在主 session 里
step_text_content += delta.content
# 最终答案,用于保存到 session
final_assistant_response += delta.content
yield delta.content
if delta.tool_calls:
# 流式积累 tool_calls 到 current_tool_calls 列表(保留原样,后续需要加入到上下文)
for tool_delta in delta.tool_calls:
idx = tool_delta.index
# 如果这是一个新的工具调用请求,初始化一个标准的字典结构
if idx not in tool_call_map:
tool_call_map[idx] = {
"id": "", # 初始化为空
"type": "function",
"function": {
"name": "", # 初始化为空
"arguments": "" # 初始化为空
}
}
# 无差别增量拼接(只要有,就往上加)
# 1. 拼接 ID (应对极端情况下的 ID 碎片化)
if tool_delta.id:
tool_call_map[idx]["id"] += tool_delta.id
if tool_delta.function:
# 2. 拼接 Name
if tool_delta.function.name:
tool_call_map[idx]["function"]["name"] +=
tool_delta.function.name
# 3. 拼接 Arguments
if tool_delta.function.arguments:
tool_call_map[idx]["function"]["arguments"]
+= tool_delta.function.arguments
# 调用 LLM 成功,跳出重试 retry
current_tool_calls = list(tool_call_map.values())
break
except Exception as e:
# 注意第一次不算重试,重试是从第一次后开始算
if retry == self.max_retries:
# 所有重试都失败
error_msg = f"LLM 调用失败(已重试 {self.max_retries} 次):{str(e)}"
yield error_msg
await self.session_manager.add_assistant_message(
message=error_msg, session_id=session_id
)
return # 结束迭代
await asyncio.sleep(self.retry_delay * (2 ** retry)) # 指数退避
# ====================== LLM 流式输出循环结束 ======================
# LLM 直接输出答案,没有请求工具调用,则迭代结束
if not current_tool_calls:
current_context.append({"role": "assistant", "content": final_assistant_response})
yield "[已生成最终答案]"
break
# 工具调用
# 将 LLM 工具返回结果加入上下文
current_context.append({
"role": "assistant",
"content": step_text_content or None, # OpenAI 要求如果没有文本就是 null/None
"tool_calls": current_tool_calls # 把解析好的工具调用原样塞进去
})
for tc in current_tool_calls:
try:
# 给前端透传一点系统提示,增加交互感
yield f"\n[正在执行工具: {tc.function.name} ...]"
tool_result = await self.tools.execute(
tool_call=tc,
session_id=session_id,
workspace=self.workspace
)
except Exception as e:
# 工具执行崩溃了也要当做结果传回去,让模型自行纠错
tool_result = f"Error executing tool {tc.function.name}. Cause by:{str(e)}"
# 将工具结果追加到上下文中
current_context.append({
"role": "tool",
"content": str(tool_result),
"tool_call_id": tc.id
})
# ========================================
# 4. 循环结束,统一落盘清理
# ========================================
# 保存最终回答到会话历史
if final_assistant_response:
await self.session_manager.add_assistant_message(
message=final_assistant_response,
session_id=session_id
)
else:
# 达到 max_iterations 仍未结束
timeout_msg = f"已达到最大迭代次数 ({self.max_iterations}),任务强制结束。"
yield timeout_msg
await self.session_manager.add_assistant_message(
message=timeout_msg, session_id=session_id
)
# 清理临时资源
await self._cleanup(session_id=session_id)
上下文污染:中间工具/子代理调用过程
如果把成百上千 Token 的工具调用 JSON、网页抓取结果、子代理的试错过程全部塞进用户的聊天记录里,不仅会迅速挤爆模型的 Token 上限,还会导致大模型“失忆”——它会忘了用户三轮之前交代的核心需求,因为它满脑子都是刚才查出来的几万字代码和数据。
绝对不应该把中间步骤和原始执行结果当做“聊天记录”永久塞进主会话的上下文里
方案一:阅后即焚(Transient Context Pattern)
这是最干净的做法,其实在我们上一轮修改的代码中已经部分体现了。
- 运作方式:在一次
process_message的执行中,我们克隆一份当下的历史记录作为“草稿本”(current_context)。在这个草稿本上,大模型尽情地调用工具、追加 Observation。 - 如何收尾:一旦大模型给出了最终答案(Final Answer),我们把这句最终答案提取出来,存入数据库(Session)。而那个写满了工具调用记录的“草稿本”(
current_context),随着函数的执行结束,直接丢弃,不存入数据库。 - 结果:下一轮用户提问时,数据库里拉出来的依然是纯净的
User: 查天气 -> Assistant: 北京今天晴天,完全看不到中间调用天气 API 的脏数据。
方案二:轨迹旁路存储(Trace / Trajectory Logging)
如果为了后期 Debug、模型微调(SFT)或者在 UI 上展示给用户看(类似 ChatGPT 那个折叠起来的 “Searching the web…”),直接丢弃中间步骤太可惜了。
- 运作方式:系统不再只有一张
messages表,而是增加一张trajectories(运行轨迹)表。 - 读写分离:中间的工具调用、Observation 统统写进
trajectories表。在构建下一轮对话的 Prompt 时,context_builder只查messages表,坚决不把trajectories里的长篇大论拼接到上下文里。
方案三:摘要坍缩(Summarization & Collapse)
有时候中间步骤的结论对后续聊天是有用的(比如子代理不仅写了代码,还总结了代码逻辑),不能完全丢掉。
- 运作方式:当一个长工具或子代理执行完毕后,主 Agent 不直接接收原始结果,而是经过一个压缩层。
- 例子:子代理写了 500 行代码,并运行输出了结果。存入上下文的不是这 500 行代码,而是一条精简的消息:
[系统摘要:子代理已成功编写数据分析脚本,结论为 2023 年利润上升 15%]。用一句话代替了几千个 Token,极大地保护了主线聊天的纯洁性。