公开笔记

Agent Loop

覆盖 Agent 核心引擎 Agent Loop 的 ReAct 推理行动循环模式、十大核心初始化参数、process_message 异步流式处理流程,详解取消令牌、工具调用编排、分层错误处理、全链路流式响应四大关键机制,包含上下文污染解决方案与生产级工程实践。

发布于 更新于

思维模式

Agent Loop 实现了经典的 ReAct(Reasoning + Acting)循环模式。ReAct 模式的核心思想是让 LLM 交替进行”推理”和”行动”:

用户提问 → LLM 推理 → 决定调用工具 → 执行工具 → 将结果反馈给 LLM → 继续推理 → ... → 最终回答

这个循环可能执行多轮,直到 LLM 认为已经收集到足够信息来回答用户问题。

初始化参数

class AgentLoop: 
	"""
    Agent Loop 是 agent 的核心引擎,实现了经典的 ReAct(Reasoning + Acting)循环模式。交替进行推理与行动:
    "用户提问 → LLM 推理 → 决定调用工具 → 执行工具 → 将结果反馈给 LLM → 继续推理 → ... → 最终回答",
    这个循环可能执行多轮,直到 LLM 认为已经收集到足够信息来回答用户问题。

    支持以下能力:

    1. 对话历史管理(短期记忆)
    2. ReAct 推理循环
    3. 调用工具(Function call)
    4. 异常处理与重试,重试类型包含:
	    - 网络/API 异常
        - 输出解析异常(如 JSON 格式损坏):捕获异常后,打包成 Prompt(eg “解析失败,输出不是合法的 JSON,请修正后重试:[错误堆栈]”),作为一轮新的 User Input 塞回给 LLM 自纠
        - 工具执行异常:将 Exception 的错误信息作为 Observation(观察结果)返回给 LLM。LLM 看到错误信息后,通常会触发 `Thought: 刚才的参数似乎不对,我换个参数重试` 的逻辑。
    """
    def __init__(
            self,
            provider: LLMClient,                            # LLM Client 实例
            workspace: Path,                                # 工作空间路径
            tools: ToolRegistry,                            # ToolRegistry 实例
            context_builder: ContextBuilder | None=None,    # 上下文构造器
            session_manager: SessionManager | None=None,    # 会话管理器
            subagent_manager: SubAgentManager | None=None,  # 子代理管理器
            model: str | None=None,                         # LLMClient 实例参数
            max_iterations: int = 10,                       # 最大循环次数(安全阀)
            max_retries: int = 3,                           # LLM 调用重试次数
            retry_delay: float = 1.0,                       # 重试延迟
            temperature: float = 0.5,
            max_tokens: int = 4096,                         # LLMClient 实例参数
    ):

Provider

1. 主要作用

provider 的主要目的是将 AgentLoop 的核心调度逻辑与具体的 LLM 厂商的底层 API 细节分离开来。AgentLoop 不需要知道当前调用的是哪个具体模型,它只需要调用 provider 暴露的标准化方法(例如 provider.comlete(messages, tools))即可。

2. 工程实践

  1. 实现方式
class LLMClient:
    """
    LLM客户端:用于调用任何兼容OpenAI接口的服务,并默认使用流式响应。
    """

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str | None = None,
        model: str | None = None,  # 默认模型名称
        timeout: float | None = None,  # 连接超时时间
        temperature: float = 0.5,  # 默认温度,0.5 适合 Agent
        max_tokens: int = 4096,  # 默认单次最大输出限制
    ):
        self.api_key = api_key or os.getenv("LLM_API_KEY")
        self.base_url = base_url or os.getenv("LLM_BASE_URL")
        self.model = model or os.getenv("LLM_MODEL_ID")

        if not all([self.model, self.api_key, self.base_url]):
            raise ValueError("Model, API key, and base URL are required!")

        self.timeout = float(
            timeout if timeout is not None else os.getenv("LLM_TIMEOUT", "60.0")
        )

        self.temperature = temperature
        self.max_tokens = max_tokens

        self.client = OpenAI(
            api_key=self.api_key, base_url=self.base_url, timeout=self.timeout
        )

    def complete(
        self,
        messages: List[Dict[str, Any]],  # Any 兼容字符串、列表和嵌套字典等
        model: str | None = None,  # 允许单次调用覆盖默认模型
        temperature: float | None = None,  # 允许单次调用覆盖默认温度
        max_tokens: int | None = None,  # 允许单次调用覆盖默认 token 限制
        stream: bool = False,  # 默认不支持流式输出,方便代码获取响应结果
    ) -> str | None:
        """
        调用大语言模型并返回完整响应。
        """
        # 如果本次调用没有传参 (None),则回退使用 __init__ 里的全局默认值
        req_model = model or self.model
        req_temp = temperature if temperature is not None else self.temperature
        req_tokens = max_tokens if max_tokens is not None else self.max_tokens

        print(
            f"🧠 正在调用 {req_model} 模型... (temperture: {req_temp}, max tokens: {req_tokens})"
        )

        try:
            response = self.client.chat.completions.create(
                model=req_model,
                messages=messages,
                temperature=req_temp,
                max_tokens=req_tokens,
                stream=stream,
            )

            print("🥳 大语言模型响应成功!")
            if stream:
                answer = []
                for chunk in response:
                    # 部分兼容 OpenAI 接口的模型(或在使用本地 VLLM 时),流式的最后一个 chunk 可能是空壳
                    # 兼容不同模型的流式结束符可能导致的越界或 NoneType 错误
                    if not chunk.choices or not chunk.choices[0].delta:
                        continue

                    content = chunk.choices[0].delta.content or ""
                    print(content, end="", flush=True)
                    answer.append(content)
                print()
                return "".join(answer)

            # 非流式处理
            content = response.choices[0].message.content or ""
            print(content)
            return content

        except Exception as e:
            print(f"😅 调用 LLM API 时发生错误: {e}")
            return None

workspace

1. 主要作用

在 Agent 开发的语境中,workspace(工作空间)通常指的是分配给这个 Agent 专属的本地文件夹路径(Directory Path)。主要作用如下:

  1. Agent 的文件系统根目录
    • Agent 所有读写文件操作都限制在这个目录内(安全沙箱)。
    • 工具(Tools)执行文件操作时,通常只能访问 workspace 及其子目录,防止 Agent 随意修改系统其他文件。
  2. 存储持久化数据
    • 记忆文件:MEMORY.md、memory/ 目录下的每日日志等(长期记忆)
    • 系统提示文件:AGENTS.md(Agent 角色定义)、SOUL.md(人格/风格)、USER.md(用户偏好)、HEARTBEAT.md(周期性任务)等
    • 会话历史:由 session_manager 保存的对话记录文件
    • 其他文件:Agent 生成的代码、文档、临时数据、知识库等
  3. 提供上下文信息
    • context_builder 会读取 workspace 下的引导文件(AGENTS.md、SOUL.md 等),把它们注入到系统提示中,让 Agent “知道自己是谁、在哪里、应该怎么做事”。
    • Agent 可以感知当前工作空间的文件结构、项目内容,从而更好地完成编码、文档处理等任务。
  4. 支持多实例 / 多 Agent
    • 每个 Agent(或每个项目)可以拥有独立的 workspace,实现隔离。
    • 例如:一个 workspace 用于 Python 项目,另一个用于写作,另一个用于日常助手。

2. 工程实践

  1. 使用方式

    在 AgentLoop 的运行过程中,workspace 通常被传递给以下组件:

    • ContextBuilder:读取 workspace 中的提示词文件,构建系统上下文
    • Memory Store:把长期记忆写入 workspace/memory/ 或 MEMORY.md
    • Tools(尤其是文件操作工具、shell 工具):执行 cd、ls、read_file、write_file 等操作时,以 workspace 为根路径
    • SessionManager:把会话历史保存到 workspace 下的某个子目录或文件
    • SubagentManager:子代理可能有自己的子 workspace
# 初始化时
self.workspace = workspace

# 在构建上下文时
system_prompt = self.context_builder.build_system_prompt(self.workspace)

# 文件工具执行时(安全限制)
if not path.is_relative_to(self.workspace):
    raise PermissionError("只能在 workspace 内操作文件")
  1. 实际例子

    初始化后,workspace 通常是:

~/.happybot/workspace/
├── AGENTS.md          # Agent 角色和规则
├── SOUL.md            # Agent 人格设定
├── USER.md            # 用户信息和偏好
├── MEMORY.md          # 长期记忆
├── HEARTBEAT.md       # 周期任务
├── memory/            # 每日记忆日志
├── projects/          # Agent 生成的项目文件
└── ...                # 你让 Agent 创建的任何文件

context_builder

1. 主要作用

AgentLoop 负责整个循环流程,context_builder 只负责构建每次发送给 LLM 的上下文(Context)。 主要作用如下:

  1. 上下文拼装

    • 系统提示(System Prompt,Agent 的身份、角色、规则、工作空间信息等)
    • 最近 N 条对话历史(滑动窗口)
    • 记忆摘要(长期记忆、短期记忆、用户偏好等)
    • 可用工具/技能描述(Tools/Skills)
    • 当前用户输入(User Message)
    • 其他动态信息(如当前时间、工作目录、环境状态等)
  2. 上下文裁剪

    • context_builder 内部通常会集成一个 Token 计算器(如 tiktoken)。当发现总 Token 逼近模型上限时,它会执行滑动窗口策略(丢弃最老的几轮对话),或者执行摘要压缩(调用小模型把老对话总结成一句话)。

2. 工程实践

  1. 使用方式
def run(self, user_input: str):
    # 1. 使用 context_builder 构建本次 LLM 需要看到的完整上下文
    context = self.context_builder.build(
        history=self.session_manager.get_history(),   # 或类似
        memory=self.memory_store.get(),
        skills=self.tools.get_skills_description(),   # 或单独的 skills loader
        current_input=user_input,
        workspace=self.workspace,
        # 其他参数...
    )
    
    # 2. 把构建好的 context 发送给 LLM
    response = self.provider.complete(context, model=self.model, ...)
    
    # 3. 处理 response(可能是工具调用,也可能是最终回答)
    ...

context_builder.build(...) 方法通常返回一个消息列表(list of dict),格式类似:

[
    {"role": "system", "content": "你是一个...(完整系统提示)"},
    {"role": "system", "content": "记忆内容..."},
    {"role": "system", "content": "可用工具:..."},
    ...  # 历史消息
    {"role": "user", "content": "当前用户问题"}
]

代码可以默认是 context_builder=None,可以在 __init__run 创建一个默认的 ContextBuilder(例如 DefaultContextBuilder),或者 AgentLoop 中直接用一个简单的拼接逻辑。

  1. 常见实现方式

可以注入不同的 ContextBuilder 实现,来适应不同场景:

  • 简单版:只拼接历史 + 系统提示
  • 高级版:带 token 计数、自动截断/压缩历史、动态注入技能、RAG 检索、上下文压缩等
  • 自定义版:根据 workspace 加载 ROLE.md、RULES.md、SOUL.md 等引导文件
  1. 测试

在单元测试中可以 mock 一个假的 context_builder,不同项目可以有自己的上下文构建策略,而不用改动 AgentLoop 的核心代码


session_manager

1. 主要作用

context_builder 是负责把数据精简打磨后送给大模型(相当于内存管理),那么 session_manager 就是负责把所有原始数据长久保存下来(相当于硬盘管理)

负责 管理整个 Agent 会话(Session)的生命周期和状态,特别是 对话历史(Conversation History) 的存储、读取、追加、清理等操作。主要作用如下:

  1. 管理对话历史(History)
    • 存储每一轮的用户输入(User Message)
    • 存储 LLM 的回复(Assistant Message)
    • 存储工具调用记录(Tool Call + Tool Result)
    • 支持多轮对话的连续性
  2. 会话生命周期管理
    • 创建新会话(new session)
    • 加载已有会话
    • 保存会话到文件或数据库
    • 切换会话、删除会话等
  3. 历史消息控制
    • 限制历史消息数量(防止 token 爆炸)
    • 提供方法获取“最近 N 条消息”或“压缩后的历史”
    • 支持消息总结(summarization)
  4. 持久化
    • 把对话记录保存到工作空间(workspace)下的文件(如 chat_history.json、session_xxx.json)
    • 支持从文件中恢复上一次的会话状态
  5. 多用户并发隔离
    • 维护一个映射表(通常是以 session_iduser_id 为主键),达到用户、session 隔离。

2. 与其他组件的关系

  • context_builder:依赖 session_manager 提供历史记录来构建 prompt
  • subagent_manager:可能也需要 session_manager 来管理子代理的独立会话
  • AgentLoop:通过 session_manager 保持多轮对话的连续性和状态

3. 工程实践

  1. 使用方式
def run(self, user_input: str):
    # 1. 从 session_manager 获取当前会话的历史记录
    history = self.session_manager.get_history()
    
    # 2. 把历史记录交给 context_builder 去构建上下文
    context = self.context_builder.build(
        history=history,           # ← 这里传入
        ...
    )
    
    # 3. 调用 LLM 得到回复
    response = self.provider.complete(context, ...)
    
    # 4. 把用户输入和 LLM 回复保存到会话历史中
    self.session_manager.add_user_message(user_input)
    self.session_manager.add_assistant_message(response)
    
    # 如果有工具调用,也会记录工具执行结果
    if response.has_tool_calls():
        self.session_manager.add_tool_calls(...)
  1. 常见实现方式
  • 简单版:只用一个列表在内存中保存消息(重启后丢失)
  • 文件版:把历史保存为 JSON 文件(推荐,用于持久化)
  • 数据库版:使用 SQLite / Redis 等(适合大规模多会话)
  • 带总结版:当历史太长时,自动对旧消息进行总结,只保留摘要

subagent_manager

1. 主要作用

单个 Agent 能力有限:

  • 上下文窗口有限(容易 token 超限)
  • 容易在复杂任务中迷失或循环
  • 不同子任务需要不同的工具集、系统提示或专注度

通过 subagent_manager,主 Agent 可以:

  • 把大任务拆分成小任务
  • spawning(生成)专门的子代理去处理
  • 子代理完成后,把结果汇总回来
  • 主 Agent 只负责高层决策和最终合成

这是一种典型的 分层多代理架构(Hierarchical Multi-Agent)

主 Agent 在某一步发起 subagents(action: spawn, prompt: "去写个脚本并跑通...") → 系统新起一条 Agent 运行(新的 LLM 循环、新的临时上下文)→ 子 Agent 自己“边想边做”(也可以调各种工具)→ 子 Agent 跑完后把最终结果作为这一次 tool call 的返回值还给主 Agent → 主 Agent 继续自己的循环(可能再调别的工具或直接回复用户)。

在这个过程中子 Agent 有独立的对话/上下文,不会把中间的试错、长输出塞进主会话。对主 Agent 来说,“调 subagents”和“调 read_file”一样,都是一次 tool call,只是这个 tool 内部是“跑完一整条 Agent”。

一般来说 subagent_manager 的主要作用如下:

  1. 创建和启动子代理(Spawn Subagent)
    • 主 Agent 通过一个特殊的 SpawnTool(生成子代理工具)调用它。
    • 可以指定任务目标(goal)、子代理类型、系统提示、允许使用的工具等。
  2. 独立运行子代理
    • 每个子代理拥有自己的独立 AgentLoop(或简化版循环)。
    • 有独立的上下文、历史记录(通常从零开始,避免污染主会话)。
    • 限制迭代次数,防止无限循环。
    • 工具隔离:子代理通常不允许再调用 “生成子代理” 的工具,防止无限递归。
  3. 结果收集与通信
    • 子代理完成后,通过 MessageBus(消息总线)或直接返回结果给主 Agent。
    • 主 Agent 可以等待结果、汇总多个子代理的输出。
  4. 生命周期管理
    • 列出正在运行的子代理(list)
    • 终止子代理(kill)
    • 监控状态、资源限制等
  5. 安全与隔离
    • 子代理通常使用更少的工具(只给需要的,避免危险操作)
    • 独立的工作空间或上下文,互不干扰

2. 工程实践

  1. 使用方式
# LLM 输出中检测到工具调用:spawn_subagent
if tool_name == "spawn_subagent":
    task = tool_args["task"]          # 例如 "研究一下 XX 技术"
    goal = tool_args["goal"]
    
    # 委托给 subagent_manager
    result = self.subagent_manager.spawn(
        task_id=uuid.uuid4(),
        goal=goal,
        description=task,
        # 可选:指定子代理类型、工具白名单等
    )
    
    # 把子代理的结果加回主上下文,继续循环
    self.session_manager.add_tool_result(result)

主 Agent 看到的结果通常是子代理执行后的总结,而不是全部过程(节省 token)。


max_tokens

该参数是 LLM client / provider 实例的参数,核心作用是限制模型单次生成的最大输出长度。在 AgentLoop 架构中,设置这个参数有三个现实意义:

  • 防止资金燃烧 (Cost Control):Agent 很容易产生“幻觉”,如果在某一步推理中彻底迷失,它可能会疯狂输出几万字的乱码或重复无意义的废话。设置 max_tokens 可以作为物理熔断机制,防止单次 API 调用扣除高额费用。
  • 规避解析错误:Agent 需要输出严格的 JSON 格式(包含 ActionAction Input)。如果模型输出太长,导致 JSON 的右括号 } 被切断,你的代码解析必然报错。合理限制长度,能促使模型长话短说。
  • 留出上下文空间:绝大多数模型有一个总上下文窗口限制(Total Context Window,即 Prompt Tokens + Completion Tokens 的总和,比如 128K)。限制单次输出的长度,可以为前面不断积累的“历史记忆(Memory)”留出更多空间。

max_iterations 

AgentLoop 的核心是一个 while 循环,max_iterations 就是限制这个循环最多只能跑 25 轮(默认值)。

iteration = 0
while iteration < self.max_iterations:
    iteration += 1
    # ... 构建上下文、调用LLM、执行工具 ...
    
    if 任务已经完成:
        break
else:
    # 达到最大迭代次数仍未完成
    print("达到最大迭代次数,强制停止")
  • 简单任务可以设小一点(如 10~15)
  • 复杂编程、研究类任务可以设大一点(如 30~40)
  • 生产环境建议不要超过 50

max_retries ,retry_delay

当调用 LLM(大语言模型)失败时,允许最多重试几次。常见的失败情况包括:

  • API 临时超时(Timeout)
  • Rate Limit(请求频率限制)
  • 网络波动
  • 服务端返回 5xx 错误
  • JSON 解析失败(工具调用格式错误时也可能重试)

工作流程

  1. 调用 LLM 失败 → 等待 retry_delay 秒
  2. 重试第 1 次
  3. 仍然失败 → 再等待 → 重试第 2 次
  4. ……
  5. 达到 max_retries 次后仍失败 → 放弃本次调用,抛出异常或进入失败处理流程

max_retries 是防止雪崩的冷却期。当第一次 API 请求失败(例如被 OpenAI 或千问的服务端限流拒绝了),如果代码立刻在 1 毫秒后发起第二次请求,大概率还是会被拒绝,甚至导致你的 IP 被封锁。

常见用法

  • 简单指数退避(Exponential Backoff):
delay = self.retry_delay * (2 ** attempt)   # 第1次等1s,第2次等2s,第3次等4s
  • 或者固定延迟(当前默认是固定 1 秒)

process_message 消息处理流程

process_message 方法是 AgentLoop 中负责“接收用户消息 → 处理 → 流式返回结果”的核心异步入口。

async def process_message(
    self,
    message,           # 用户输入的消息(str 或 Message 对象)
    session_id,        # 会话 ID,用于区分不同对话
    context=None,      # 可选:外部预构建的上下文(优先级高于内部构建)
    media=None,        # 可选:多模态内容(如图片、文件)
    channel=None,      # 可选:消息来源渠道(web、telegram、discord 等)
    chat_id=None,      # 可选:聊天室 ID(多群聊场景)
    cancel_token=None  # 可选:用于中途取消任务的 token
) -> AsyncIterator[str]:
    """
    AgentLoop 的核心消息处理入口
    - 异步生成器(AsyncIterator),支持实时流式输出
    - 完整处理:会话 → 上下文 → LLM流式调用 → 工具执行 → 最终回复
    """

    # ========================================
    # 1. 会话准备(Session Setup)
    # ========================================
    # 通过 session_id 从 session_manager 加载或创建当前会话
    session = await self.session_manager.get_or_create_session(session_id)
    
    # 把用户输入追加到对话历史(保持会话连续性)
    await self.session_manager.add_user_message(message, session_id=session_id)

    # ========================================
    # 2. 上下文构建(Context Building)
    # ========================================
    if context is not None:
        # 外部传入上下文时直接使用(用于特殊场景如子代理、预处理)
        final_context = context
    else:
        # 正常情况:调用 context_builder 组装完整 prompt
        final_context = self.context_builder.build(
            history=await self.session_manager.get_history(session_id),
            workspace=self.workspace,
            tools=self.tools.get_all_descriptions(),   # 工具描述
            memory=...,                                # 长期记忆(可选)
            media=media,                               # 多模态内容
            current_message=message,
            # 其他动态信息...
        )

    # ========================================
    # 3. 预处理(Pre-processing)
    # ========================================
    # 处理多模态输入(media):图片、文件等转成 provider 支持的格式
    if media:
        final_context = await self._process_media(final_context, media)

    # 检查是否已被取消(支持用户中途停止)
    if cancel_token and await cancel_token.is_cancelled():
        yield "已取消"
        return

    # ========================================
    # 4. 异步流式调用 LLM(核心循环)
    # ========================================
    # 注意:真实实现中通常是一个外层 while 循环,支持多次 LLM 调用(工具调用后继续)
    # 这里展示简化的单次流式调用结构,工具调用逻辑在 chunk 处理中
    async for chunk in self.provider.stream(
        messages=final_context,
        model=self.model,
        temperature=self.temperature,
        max_tokens=self.max_tokens,
        stream=True,                     # 关键:开启流式输出
        # 其他参数:top_p、stop 等
    ):
        # 对每个 chunk 进行少量处理
        # - 解析是否包含工具调用(tool_calls)
        # - 日志记录、日志输出等
        processed_chunk = await self._process_chunk(chunk)

        # 如果检测到工具调用,则:
        #   1. 暂停 yield
        #   2. 执行工具
        #   3. 把工具结果塞回上下文
        #   4. 跳出当前 for 循环,继续外层 while(再次调用 LLM)
        if processed_chunk.has_tool_calls():
            tool_results = await self.tools.execute_all(
                processed_chunk.tool_calls,
                session_id=session_id
            )
            # 将工具结果追加到上下文,准备下一轮 LLM 调用
            final_context = self.context_builder.append_tool_results(
                final_context, tool_results
            )
            break  # 跳出当前流,进入下一轮 LLM 调用

        # 正常文本输出:立即 yield 给调用方(实现“打字机”效果)
        yield processed_chunk.content   # 或 chunk.delta / chunk.text

    # ========================================
    # 5. 后续流程(Post-processing)
    # ========================================
    # 把 LLM 的最终回复保存到 session_manager
    await self.session_manager.add_assistant_message(
        message=processed_chunk.full_content,   # 完整回复
        session_id=session_id
    )

    # 如果本次调用涉及子代理(subagent_manager),处理子代理返回的结果
    if hasattr(self, 'subagent_manager') and self.subagent_manager.has_pending_results():
        subagent_results = await self.subagent_manager.collect_results()
        # 可选择是否 yield 子代理结果,或合并到最终回复

    # 释放资源、清理临时状态
    await self._cleanup(session_id)

process_message 是一个异步流式生成器(返回 AsyncIterator[str]),通过 yield 逐步返回流式响应。这种设计使得调用方可以实时获取 LLM 的输出,而不必等待整个处理完成:

  • async (异步):使得方法是异步的。
  • yield (生成器):如果不使用 yield,程序必须等大模型把几千字全部写完。使用了 yield,只需要确认当前大模型吐出的是“可以给用户看的人话”,它就会立刻 yield chunk,把这个字推给前端。

cancel_token 取消令牌机制

实现优雅的任务取消机制:

# 全局取消令牌管理
_session_cancel_tokens: dict[str, CancellationToken] = {}

def cancel_session(session_id: str) -> bool:
    if session_id in _session_cancel_tokens:
        _session_cancel_tokens[session_id].cancel()
        return True
    return False

在 AgentLoop 的每次迭代中检查取消令牌,确保用户可以随时中断长时间运行的任务。

Python 原生就有 asyncio.Task.cancel(),为什么还要自己手写一个 CancellationToken 类?”

  1. 优雅退出 (Graceful Shutdown):直接 cancel Task 会在代码的任意 await 位置抛出 CancelledError,如果 Agent 此时正在写入数据库或更新 Memory,会导致数据损坏(脏数据)。使用 Token,Agent 可以在循环的安全点(Safe Point)检查 is_cancelled,完成状态保存后再退出。
  2. 多线程/跨进程扩展:如果 Agent 的某个工具调用(Tool Execution)是同步的或阻塞了线程池,asyncio.Task.cancel() 往往无法中断它。而 CancellationToken 可以被传递进底层的同步函数中,让函数自行检查状态并提前返回。

工具调用编排

当 LLM 返回 tool_call 时,AgentLoop 执行以下步骤:

  1. 解析工具调用:从 StreamChunk.tool_call 中提取工具名和参数
  2. 参数验证:通过 Tool.validate_params() 验证参数合法性
  3. 执行工具:调用 ToolRegistry.execute() 执行工具
  4. 结果注入:将工具执行结果作为 tool 角色消息注入对话历史
  5. 继续推理:将更新后的消息列表再次发送给 LLM
# 伪代码展示核心循环
for iteration in range(self.max_iterations):
    async for chunk in self.provider.chat_stream(messages, tools):
        if chunk.is_content:
            yield chunk.content  # 流式输出文本
        if chunk.is_tool_call:
            tool_calls.append(chunk.tool_call)
    
    if not tool_calls:
        break  # LLM 没有请求工具调用,循环结束
    
    for tc in tool_calls:
        result = await self.tools.execute(tc.name, **tc.arguments)
        messages.append({"role": "tool", "content": result, "tool_call_id": tc.id})
    
    tool_calls.clear()

错误处理策略

分层错误处理

  1. LLM 调用层:自动重试 + 指数退避
  2. 工具执行层:捕获异常并将错误信息作为工具结果返回给 LLM
  3. 循环层max_iterations 防止无限循环

工具执行错误的优雅降级

当工具执行失败时,错误信息会被格式化后返回给 LLM,让 LLM 自行决定如何处理:

try:
    result = await tool.execute(**arguments)
except Exception as e:
    result = f"工具执行失败: {str(e)}"
# 无论成功失败,都将结果反馈给 LLM
messages.append({"role": "tool", "content": result})

这种设计让 LLM 有机会尝试其他方案或向用户解释失败原因。

流式响应

整个链路都是流式的:LLM 流式生成 → AgentLoop 流式 yield → WebSocket 流式推送。用户可以在 LLM 生成第一个 token 时就看到响应开始。

工程实践

  • while 循环是实现 ReAct 模式的关键
  • process_message 进入调用 agent 的入口
  • 如果没有工具调用了,就会退出 ReAct 模型(循环),返回结果
class AgentLoop:
    def __init__(
        self,
        provider: LLMClient,                            # LLM Client 实例
        workspace: Path,                                # 工作空间路径
        tools: ToolRegistry,                            # ToolRegistry 实例
        context_builder: ContextBuilder | None=None,    # 上下文构造器
        session_manager: SessionManager | None=None,    # 会话管理器
        subagent_manager: SubAgentManager | None=None,  # 子代理管理器
        model: str | None=None,                         # LLMClient 实例参数
        max_iterations: int = 10,                       # 最大循环次数(安全阀)
        max_retries: int = 3,                           # LLM 调用重试次数
        retry_delay: float = 1.0,                       # 重试延迟
        temperature: float = 0.5,
        max_tokens: int = 4096,                        # LLMClient 实例参数
    ):
    
    async def process_message(
        self,
        message: str,          # 用户输入的消息(str 或 Message 对象)
        session_id: str,        # 会话 ID,用于区分不同对话
        context: Optional[Context] = None,     # 可选:外部预构建的上下文(优先级高于内部构建)
        media: Optional[list[MediaContent]] = None,       # 可选:多模态内容(如图片、文件)
        channel: Optional[str] = None,      # 可选:消息来源渠道(web、telegram、discord 等)
        chat_id: Optional[str] = None,     # 可选:聊天室 ID(多群聊场景)
        cancel_token: Optional[CancelToken] = None,    # 可选:用于中途取消任务的 token
    ) -> AsyncIterator[str]:
        """
        AgentLoop 生产级消息处理入口(完整版)
        - 支持多轮工具调用(ReAct 风格)
        - 严格遵守 max_iterations 安全上限
        - 内置重试机制(max_retries + retry_delay)
        - 实时流式输出 + 优雅取消 + 完整会话持久化
        """
    
        # ========================================
        # 1. 会话准备(Session Setup)
        # ========================================
        session = await self.session_manager.get_or_create_session(session_id)
    
        # 把用户消息永久记录到历史(即使后面失败也不会丢失)
        await self.session_manager.add_user_message(
            message=message,
            session_id=session_id,
            media=media
        )
    
        # 多模态处理:将图片/文件转为大模型 API 支持的 base64 或 URL 格式 
        processed_media = self._process_media(media) if media else None
    
        # ========================================
        # 2. 上下文构建(Context Building)—— 初始上下文
        # ========================================
        current_context = context if context is not None else self.context_builder.build( 
            history=await self.session_manager.get_history(session_id),
            workspace=self.workspace, 
            tools=self.tools.get_all_descriptions(),
            memory=await self.memory_store.get_relevant_memory(message),  # 长期记忆
            media=processed_media, 
            current_message=message,
            channel=channel,
            chat_id=chat_id,
        )
            
        # ========================================
        # 3. ReAct 核心:循环。条件是持续有工具调用
        # ========================================
        for iteration in range(self.max_iterations):
            if cancel_token and await cancel_token.is_cancelled():
                yield "\n[任务已取消]"
                return
                
            final_assistant_response = "" # 每轮重置
            
            # ------------------------------
            # 3.1 带重试的 LLM 流式调用。重试是 LLM 层:自动重试 + 指数退避
            # ------------------------------
            for retry in range(self.max_retries + 1):
                try:          
                    # 每次重试必须清空残留状态
                    step_text_content = ""
                    current_tool_calls = []
                    tool_call_map = {}        # 用来处理增量 tool_calls
                    
                    async for chunk in self.provider.complete(
                        messages=current_context,
                        model=self.model,
                        temperature=self.temperature,
                        max_tokens=self.max_tokens,
                        stream=True,                     # 核心:流式输出
                        tools=self.tools.get_all_descriptions(), 
                        # tool_choice="auto" 或 "required"(可选,推荐加上)
                    ):
                        # ------------------------------
                        # 3.3 Chunk 处理
                        # ------------------------------
                        delta = chunk.choices[0].delta
                        
                        # 1. 处理普通文本(实时 yield),tool_calls 一般 tool_call 的 content 为空
                        if delta.content is not None:
                            # 听说工具调用也可能有content内容,但是中间步骤不要放在主 session 里
                            step_text_content += delta.content          
                            # 最终答案,用于保存到 session
                            final_assistant_response += delta.content
                            yield delta.content
                            
                        if delta.tool_calls:
                            # 流式积累 tool_calls 到 current_tool_calls 列表(保留原样,后续需要加入到上下文)
                            for tool_delta in delta.tool_calls: 
                                idx = tool_delta.index 
                                # 如果这是一个新的工具调用请求,初始化一个标准的字典结构 
                                if idx not in tool_call_map: 
                                    tool_call_map[idx] = { 
                                        "id": "", # 初始化为空 
                                        "type": "function", 
                                        "function": { 
                                            "name": "", # 初始化为空 
                                            "arguments": "" # 初始化为空 
                                        } 
                                    } 
                                # 无差别增量拼接(只要有,就往上加) 
                                # 1. 拼接 ID (应对极端情况下的 ID 碎片化) 
                                if tool_delta.id: 
                                    tool_call_map[idx]["id"] += tool_delta.id 
                                if tool_delta.function: 
                                    # 2. 拼接 Name
                                    if tool_delta.function.name: 
                                        tool_call_map[idx]["function"]["name"] += 
                                            tool_delta.function.name 
                                    # 3. 拼接 Arguments 
                                    if tool_delta.function.arguments: 
                                        tool_call_map[idx]["function"]["arguments"] 
	                                        += tool_delta.function.arguments
                    # 调用 LLM 成功,跳出重试 retry
                    current_tool_calls = list(tool_call_map.values())
                    break
                    
                except Exception as e:
                    # 注意第一次不算重试,重试是从第一次后开始算
                    if retry == self.max_retries:
                        # 所有重试都失败
                        error_msg = f"LLM 调用失败(已重试 {self.max_retries} 次):{str(e)}"
                        yield error_msg
                        await self.session_manager.add_assistant_message(
                            message=error_msg, session_id=session_id
                        )
                        return # 结束迭代
                    await asyncio.sleep(self.retry_delay * (2 ** retry))  # 指数退避

    
            # ====================== LLM 流式输出循环结束 ======================				
            # LLM 直接输出答案,没有请求工具调用,则迭代结束
            if not current_tool_calls:
                current_context.append({"role": "assistant", "content": final_assistant_response})
                yield "[已生成最终答案]"
                break 
            
            # 工具调用	    
            # 将 LLM 工具返回结果加入上下文
            current_context.append({ 
                "role": "assistant", 
                "content": step_text_content or None, # OpenAI 要求如果没有文本就是 null/None 
                "tool_calls": current_tool_calls # 把解析好的工具调用原样塞进去 
            })
            
            for tc in current_tool_calls: 
                try: 
                    # 给前端透传一点系统提示,增加交互感 
                    yield f"\n[正在执行工具: {tc.function.name} ...]" 
                    tool_result = await self.tools.execute( 
                        tool_call=tc, 
                        session_id=session_id, 
                        workspace=self.workspace 
                    ) 
                except Exception as e: 
                    # 工具执行崩溃了也要当做结果传回去,让模型自行纠错 
                    tool_result = f"Error executing tool {tc.function.name}. Cause by:{str(e)}" 
                    
                # 将工具结果追加到上下文中 
                current_context.append({ 
                    "role": "tool", 
                    "content": str(tool_result), 
                    "tool_call_id": tc.id 
                })					
                    
        # ========================================
        # 4. 循环结束,统一落盘清理
        # ========================================
        # 保存最终回答到会话历史
        if final_assistant_response:
            await self.session_manager.add_assistant_message(
                message=final_assistant_response,
                session_id=session_id
            )
        else:
            # 达到 max_iterations 仍未结束
            timeout_msg = f"已达到最大迭代次数 ({self.max_iterations}),任务强制结束。"
            yield timeout_msg
            await self.session_manager.add_assistant_message(
                message=timeout_msg, session_id=session_id
            )
    
        # 清理临时资源
        await self._cleanup(session_id=session_id)

上下文污染:中间工具/子代理调用过程

如果把成百上千 Token 的工具调用 JSON、网页抓取结果、子代理的试错过程全部塞进用户的聊天记录里,不仅会迅速挤爆模型的 Token 上限,还会导致大模型“失忆”——它会忘了用户三轮之前交代的核心需求,因为它满脑子都是刚才查出来的几万字代码和数据。

绝对不应该把中间步骤和原始执行结果当做“聊天记录”永久塞进主会话的上下文里

方案一:阅后即焚(Transient Context Pattern)

这是最干净的做法,其实在我们上一轮修改的代码中已经部分体现了。

  • 运作方式:在一次 process_message 的执行中,我们克隆一份当下的历史记录作为“草稿本”(current_context)。在这个草稿本上,大模型尽情地调用工具、追加 Observation。
  • 如何收尾:一旦大模型给出了最终答案(Final Answer),我们把这句最终答案提取出来,存入数据库(Session)。而那个写满了工具调用记录的“草稿本”(current_context),随着函数的执行结束,直接丢弃,不存入数据库。
  • 结果:下一轮用户提问时,数据库里拉出来的依然是纯净的 User: 查天气 -> Assistant: 北京今天晴天,完全看不到中间调用天气 API 的脏数据。

方案二:轨迹旁路存储(Trace / Trajectory Logging)

如果为了后期 Debug、模型微调(SFT)或者在 UI 上展示给用户看(类似 ChatGPT 那个折叠起来的 “Searching the web…”),直接丢弃中间步骤太可惜了。

  • 运作方式:系统不再只有一张 messages 表,而是增加一张 trajectories(运行轨迹)表。
  • 读写分离:中间的工具调用、Observation 统统写进 trajectories 表。在构建下一轮对话的 Prompt 时,context_builder 只查 messages 表,坚决不把 trajectories 里的长篇大论拼接到上下文里。

方案三:摘要坍缩(Summarization & Collapse)

有时候中间步骤的结论对后续聊天是有用的(比如子代理不仅写了代码,还总结了代码逻辑),不能完全丢掉。

  • 运作方式:当一个长工具或子代理执行完毕后,主 Agent 不直接接收原始结果,而是经过一个压缩层。
  • 例子:子代理写了 500 行代码,并运行输出了结果。存入上下文的不是这 500 行代码,而是一条精简的消息:[系统摘要:子代理已成功编写数据分析脚本,结论为 2023 年利润上升 15%]。用一句话代替了几千个 Token,极大地保护了主线聊天的纯洁性。
← 返回 Notes

Contact

Contact Me

Leave a message here. The form sends directly from the browser to a form delivery service and then to my email.

Messages are delivered to lzx744008464@gmail.com.