公开笔记

LangChain

覆盖 LangChain 新旧架构对比、LCEL 核心语法、LangGraph 状态机、Runnable 统一协议、数据流转机制、流式处理、Agent 路由逻辑、工具开发、记忆系统、回调机制及Java 生态扩展,详解工程化最佳实践与核心组件实战用法

发布于 更新于

Chain

Chain 已经过时(甚至被官方废弃)的东西:传统的 Chain 类。如果你在看一些半年前或一年前的教程,你会看到大量以 Chain 结尾的类,比如:

  • LLMChain
  • ConversationalRetrievalChain
  • SequentialChain

现状: 这些传统的类已经过时了原因: 它们是高度封装的“黑盒”。当你遇到报错,或者想自定义里面某一个微小步骤时,极难调试。LangChain 官方目前已经在代码库中逐步将这些传统的 Chain 标记为 Deprecated(废弃)。

取代传统 Chain 的新标准:LCEL (LangChain 表达式语言)

官方并没有放弃“把组件串联起来”这个需求,而是推出了 LCEL 来取代传统的 Chain 类。 现在,如果你想写一条线性的处理流程,不再是去实例化一个 LLMChain,而是用类似 Linux 管道符 | 的极其优雅的语法:

# 现代的“链”是这样写的(LCEL)
chain = prompt_template | llm | output_parser
result = chain.invoke({"question": "什么是黑洞?"})

现状: LCEL 完全没有过时,它是目前 LangChain 甚至 LangGraph 底层最核心的基础件。只要你的任务是线性、单向、确定性的(比如:提取文本 -> 生成摘要 -> 翻译成英文),用 LCEL 依然是最高效、最轻量的首选。

因为单向的 LCEL 链条存在一个致命弱点:它不能循环,它没有状态。 如果大模型在生成摘要时发现“提取的文本缺失了信息”,在 LCEL 中程序就只能报错退出。

LangGraph 解决的是“非确定性”和“循环纠错”的问题。 它引入了图(Graph)的概念,允许你的程序:

  1. 无限循环与回退: 做错了?退回到上一步重新思考。
  2. 全局状态记忆(State): 在一个全局的大字典里记录执行到了哪一步,保留所有上下文。
  3. 多智能体协作: 让“程序员 Agent”写代码,让“测试 Agent”检查,如果不通过再打回给程序员重写。

Runnable 协议与 LCEL

在 LangChain 较新的版本中,其底层架构经历了一次重构,引入了 LCEL (LangChain Expression Language)。这是理解它底层实现的关键。

在写 LangChain 代码时,经常会看到这种像 Linux 管道一样的写法:

chain = prompt | model | output_parser
result = chain.invoke({"topic": "人工智能"})

这背后依赖于 Python 的魔术方法重载和统一的接口协议(Runnable 协议):

  • Runnable 接口: LangChain 强制要求所有的核心组件(Prompt、Model、Parser 等)都必须继承一个名为 Runnable 的基类。这个基类规定了几个必须实现的方法:invoke (单次调用)、stream (流式输出)、batch (批量处理)。
  • 重载 | 运算符: Runnable 基类在 Python 底层重写了 __or__ 方法(即 | 运算符)。当代码执行 A | B 时,LangChain 会在底层把它包装成一个 RunnableSequence 对象。
  • 数据流转: 当你调用 chain.invoke() 时,实际上是执行了:B.invoke(A.invoke(输入))。上一个组件的输出,会被强行作为下一个组件的输入。

RunableLambda

函数方法被转换为 RunnableLambda,这为函数添加了批处理和异步支持,以及原生跟踪和调试功能。

RunnableLambda 是 LangChain 和 LangGraph 底层架构中最朴素,但也最重要的组件之一。专门负责把普通的 Python 函数,包装成符合 LangChain 标准流水线规范的 Runnable 对象。

普通函数包装成 Runnable 后,拥有了以下超能力:

  • 标准调用 (.invoke): runnable_math.invoke(5) -> 返回 10
  • 自动并发批量处理 (.batch): 如果你传入一个列表 runnable_math.batch([1, 2, 3]),LangChain 底层会自动起线程池,并发执行你的普通函数,返回 [2, 4, 6]
  • 自动异步转换 (.ainvoke): 即使你写的是同步的 Python 函数,包装后也可以在异步框架(如 FastAPI)中直接用 await runnable_math.ainvoke(5) 调用,LangChain 底层会把它放入异步事件循环的执行器中。
  • 流式回传 (.stream): 虽然普通函数通常是一次性返回,但包装后它依然遵循流式协议(把整个结果当作一个流的数据块发出),保证流水线下游的组件不会崩溃。

魔术方法重载 Magic Method Overloading

在 Python 中,当你对两个对象使用 a | b 时,Python 解释器在底层实际上是在调用一个名叫 __or__ 的隐藏方法,也就是执行:a.__or__(b)

LangChain 就是通过在 Runnable 基类里重写了这个 __or__ 方法,把两个组件“粘”在了一起的。

# ==========================================
# 1. 基类:对应 langchain_core.runnables.Runnable
# ==========================================
class Runnable:
    """所有组件的基类。定义了所有组件都必须具备的标准行为。"""
    
    def invoke(self, input_data):
        """每个子类必须自己实现具体的执行逻辑"""
        raise NotImplementedError("子类必须实现 invoke 方法")

    def __or__(self, other):
        """
        核心魔法:无论是单个节点,还是一个巨大的链条,
        只要你用 '|' 往后面接东西,统统打包成一个新的 Sequence。
        """
        # 注意:这里我们做了一个优化,自动把普通函数转成 Runnable
        if not isinstance(other, Runnable):
            if callable(other):
                other = RunnableLambda(other)
            else:
                raise TypeError(f"无法将 {type(other)} 接入流水线")
        
        # 将当前的(self)和新来的(other)组合成一个新的串联对象
        return RunnableSequence(self, other)

# ==========================================
# 2. 包装器:对应 langchain_core.runnables.RunnableLambda)
# ==========================================
class RunnableLambda(Runnable):
    """最基础的叶子节点,负责包装普通的 Python 函数"""
    def __init__(self, func):
        self.func = func

    def invoke(self, input_data):
        # 执行自己包装的函数
        return self.func(input_data)

# ==========================================
# 3. 序列执行器:对应 langchain_core.runnables.RunnableSequence
# ==========================================
class RunnableSequence(Runnable):
    """
    负责把多个 Runnable 串联起来。
    注意:它自己也是一个 Runnable!这就是“复合模式”的精髓。
    """
    def __init__(self, first, second):
        self.steps = []
        
        # ⭐️ 核心“拍平”逻辑:防止生成多层套娃结构
        # 如果 first 已经是一个 Sequence,就把它的步骤拆解出来
        if isinstance(first, RunnableSequence):
            self.steps.extend(first.steps)
        else:
            self.steps.append(first)
            
        # 同理处理 second
        if isinstance(second, RunnableSequence):
            self.steps.extend(second.steps)
        else:
            self.steps.append(second)

    def invoke(self, input_data):
        """
        Sequence 的执行逻辑:像水管一样,把前一个的输出喂给下一个。
        """
        result = input_data
        for step in self.steps:
            result = step.invoke(result)
        return result
# 定义基础函数
def step1(x): return x + 1
def step2(x): return x * 2
def step3(x): return x - 3

# 实例化叶子节点
node1 = RunnableLambda(step1)
node2 = RunnableLambda(step2)
node3 = RunnableLambda(step3)

# 场景 1:每个方法都能独立执行(具备 Lambda 的能力)
print(f"node1 独立执行: {node1.invoke(5)}")  # 输出: 6

# 场景 2:每个方法都能无限拼接(自动生成 Sequence)
# 执行顺序:先执行 node1 | node2 -> 生成临时的 Sequence_A
# 然后执行 Sequence_A | node3 -> 生成最终的 Sequence_B
chain = node1 | node2 | node3

# 场景 3:连普通函数都能被自动包装(我在 __or__ 里加了处理逻辑)
def final_step(x): 
    return f"最终结果是: {x}"

# 直接把原生的 Python 函数接在链条后面!这就是 LangChain 极度丝滑的原因。
super_chain = chain | final_step 

print(f"完整链路执行: {super_chain.invoke(5)}") # 输出: 最终结果是: 9

普通只用继承 RunnableLambda 就行:普通函数是 RunnableLambda 的实例,而 RunnableLambda 继承自顶级基类 Runnable。在 Runnable 中,写着所有组件共享的组合逻辑:

# 基类中的逻辑,会去实现 RunnableSequence
def __or__(self, other):
    return RunnableSequence(self, other)

解析数据流向

在 LangChain 的现代架构(特别是引入 LCEL - LangChain Expression Language 之后)中,维护数据从链头走向链尾的核心数据结构是一个字典(Dictionary),而维护数据流转方式的核心机制是 Runnable 协议及其 invoke/stream 方法

我们可以把整个数据流转过程想象成一条装配线,数据字典就是传送带上的那个“箱子”。

1. 核心载体:字典 Dictionary 作为输入输出

在大部分复杂的链式调用中,节点之间传递的数据通常不是单一的字符串,而是一个 Python 字典。

  • 初始化: 链的第一个组件接收一个字典作为初始输入(例如:{"topic": "人工智能", "language": "中文"})。
  • 传递与修改: 每个组件(Node/Runnable)接收这个字典,执行自己的任务,然后生成一个新的字典(或者修改原字典并返回),传递给下一个组件。

2. 核心动力:Runnable 协议(LCEL)

LangChain 底层定义了一个名为 Runnable 的基类(Base Class)。所有的核心组件——无论是 Prompt 模板、LLM 模型、输出解析器,还是自定义的 Python 函数,都继承自这个 Runnable 接口。

这个接口规定了数据流转的标准动作:

  1. invoke(input): 同步执行,接收输入,返回输出。
  2. stream(input): 流式执行,接收输入,逐个块(chunk)返回输出。
  3. batch(inputs): 批量执行。

当你使用管道符 | 把它们串起来时:

chain = prompt | model | output_parser

LangChain 在底层会将它们包装成一个 RunnableSequence。当你调用 chain.invoke(初始数据) 时,底层的逻辑其实就是:

# 伪代码:RunnableSequence 的底层流转
data = 初始数据
data = prompt.invoke(data)
data = model.invoke(data)
final_result = output_parser.invoke(data)
return final_result

3. 数据拼接/保留

如果数据只是像接力棒一样传递,那上一步的数据就会被完全覆盖。比如,生成了摘要后,原来的长文本就不见了。但很多时候我们需要在链的后端使用最开始传入的变量。

为了控制字典在传送带上的“拆解”和“合并”,LangChain 提供了几个关键的路由组件:

RunnablePassthrough 透明穿透

它的作用是把接收到的数据原封不动地传递给下一步。经常用于结合字典来补充参数。

from langchain_core.runnables import RunnablePassthrough

# 假设 context_retriever 是一个从数据库取数据的组件
chain = (
    {"context": context_retriever, "question": RunnablePassthrough()} 
    | prompt 
    | model
)

# 传入 "什么是黑洞"
# 1. RunnablePassthrough() 直接拿到 "什么是黑洞" 并赋给 "question" 键
# 2. context_retriever 拿到 "什么是黑洞",去数据库查资料,返回长文本赋给 "context" 键
# 3. 此时传给 prompt 的数据字典变成了:
#    {"context": "黑洞是...", "question": "什么是黑洞"}

RunnableParallel / 字典映射 (并行处理)

当你看到大括号 {} 时,它实际上是 RunnableParallel 的语法糖。它的作用是将输入数据复制并分发给内部的多个组件,然后把它们各自的执行结果打包成一个新的字典往下传。

RunnablePassthrough.assign() 追加合并

这是非常常用的高级特性。它接收当前的字典,执行某个新任务,并将结果追加(Assign) 到原字典中,然后把包含新旧数据的完整字典传给下一步。

# 假设上一步传来的字典是: {"text": "Hello world"}
chain = RunnablePassthrough.assign(
    translated_text=translate_to_chinese_chain  
)
# 这一步执行后,传给下一步的字典会变成:
# {"text": "Hello world", "translated_text": "你好世界"}

总结

在 LangChain(LCEL)中:

  1. 载体是 Python 字典(Dict),它像一个容器,装载着各种中间变量。
  2. 通道Runnable 接口invoke 方法,它规定了“接收前一个输出 -> 处理 -> 返回后一个输入”的刚性规范。
  3. 交通警察RunnablePassthroughRunnableParallel 等组件,它们负责决定字典里的键值对是直接透传、覆盖、提取还是追加合并。

流式处理

stream/astream 和 astream_events

流式传输中间步骤 .astream()

使用 AgentExecutor.astream() 方法,你可以实时获取 Agent 每一步的大动作。这种方法输出的数据包(Chunks)是交替出现的:

  • 动作 (Action): 告诉你 Agent 决定调用什么工具,以及传入了什么参数(例如:决定调用 where_cat_is_hiding 工具)。
  • 观察 (Observation): 告诉你这个工具执行完毕后,返回了什么结果(例如:猫藏在“架子上”)。
  • 最终答案 (Final Answer): 当 Agent 收集够了信息,它会输出最终想要回答给用户的那句话。

在这个过程中,你可以通过解析字典里的 actionssteps 或者 messages 字段,把 Agent 在后台“查资料”的过程展示给用户看,让用户知道 AI 没有卡死,而是在努力工作。

流式传输底层事件和逐字输出

基础的 .astream() 是一步一步抛出结果的(比如工具执行完才会抛出一个结果包)。如果你想做到极致的实时(比如 Agent 最后回答时,真的是一个字一个字地流到屏幕上),你需要使用更高阶的 .astream_events(version="v1") API。

这是一个非常细粒度的事件流 API,它会捕捉整个 Agent 运行周期中的每一个微小节点,例如:

  • on_chain_start: Agent 刚刚启动,收到用户输入。
  • on_tool_start: 工具开始执行。
  • on_tool_end: 工具执行结束。
  • on_chat_model_stream: (最核心的)大模型正在逐个吐出 Token(字)。

通过监听 on_chat_model_stream 等事件,你不仅可以知道 Agent 正在做什么,还可以把最终回答的每一个 Token 实时渲染到前端界面上,实现和 ChatGPT 一模一样的丝滑打字机效果。

工具调用时

在工具调用(Function Calling)时,流式处理可以让你在模型输出参数的过程中就开始“边接收边解析”,甚至在参数还没完全输出完时,就能提前知道模型想要调用哪个工具。

在普通的文本对话中,流式输出的就是一个个字符(比如“你”、“好”、“啊”)。 但在工具调用(Function Calling)中,流式输出的结构会复杂一些。根据 LangChain 文档的说明:

  • 分块传输(Chunks): 模型会将工具调用的信息切分成小块(ToolCallChunk)逐步发送给你。
  • 块的结构: 每个 ToolCallChunk 通常包含:
    • name: 工具的名称(通常只在第一个块中出现,后面的是 None)。
    • args: 工具参数的一部分(一段 JSON 字符串的片段,比如 {"a",然后是 : 3,,接着是 "b": 12})。
    • id: 调用的唯一标识符。
    • index: 索引号(如果模型决定同时调用多个工具,这个索引能帮你区分当前接收的参数属于哪个工具)。

LangChain 如何帮你处理这些“碎片”?

如果你自己手写代码去拼接这些 {"a": 3} 会非常痛苦。LangChain 提供的核心便利在于自动合并块(Chunk Merging)

当你使用异步流式调用(如 async for chunk in llm.astream(query):)时,LangChain 允许你直接将这些消息块“加”在一起(gathered = gathered + chunk)。在这个累加的过程中,LangChain 会在后台做两件事:

  1. 拼接原始字符串: 把参数片段拼接成完整的 JSON 字符串(比如 {"a": 3, "b": 12})。这个存在 gathered.tool_call_chunks 中。
  2. 最佳努力解析(Best-effort Parsing): 尝试将拼到一半的 JSON 实时转换成 Python 的字典对象(Dictionary)。比如当它接收到 {"a": 3 时,它就会尝试解析出一个包含键 a 的字典。这个存在 gathered.tool_calls 中。
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
 
# 注意,这里的文档字符串非常重要,因为它们将与类名一起传递给模型。
class Add(BaseModel):
    """将两个整数相加。"""
    a: int = Field(..., description="第一个整数")
    b: int = Field(..., description="第二个整数")
 
 
class Multiply(BaseModel):
    """将两个整数相乘。"""
    a: int = Field(..., description="第一个整数")
    b: int = Field(..., description="第二个整数")
 
tools = [Add, Multiply]
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
llm_with_tools = llm.bind_tools(tools)

query = "3 * 12是多少?此外,11 + 49是多少?"
async for chunk in llm_with_tools.astream(query):    
	print(chunk.tool_call_chunks)


### 输出
[]
[{'name': 'Multiply', 'args': '', 'id': 'call_d39MsxKM5cmeGJOoYKdGBgzc', 'index': 0}]
[{'name': None, 'args': '{"a"', 'id': None, 'index': 0}]
[{'name': None, 'args': ': 3, ', 'id': None, 'index': 0}]
[{'name': None, 'args': '"b": 1', 'id': None, 'index': 0}]
[{'name': None, 'args': '2}', 'id': None, 'index': 0}]
[{'name': 'Add', 'args': '', 'id': 'call_QJpdxD9AehKbdXzMHxgDMMhs', 'index': 1}]
[{'name': None, 'args': '{"a"', 'id': None, 'index': 1}]
[{'name': None, 'args': ': 11,', 'id': None, 'index': 1}]
[{'name': None, 'args': ' "b": ', 'id': None, 'index': 1}]
[{'name': None, 'args': '49}', 'id': None, 'index': 1}]
[]

Agent 路由

 AgentAction / AgentFinish

  • 旧版 LangChain:使用 AgentExecutor,返回 AgentAction / AgentFinish。
  • 新版 LangChain(推荐):底层使用 LangGraph 构建 agent。 不再直接用 isinstance(response, AgentFinish),而是通过 状态图(StateGraph) + 条件边(conditional edge)来控制流程。 例如:LLM 输出如果包含 tool_calls,就路由到 “tools” 节点;如果没有,就路由到 END(结束)。

虽然实现方式变了,但核心思想完全一样:

  • 继续循环 = 还需要行动(Action)

  • 结束循环 = 已经完成(Finish)

  • example:假设用户问“今天香港天气怎么样?”

    1. Agent 返回 AgentAction(tool=“weather_tool”, tool_input={“city”: “Hong Kong”})
    2. 执行工具后得到结果 → 传回 Agent
    3. Agent 返回 AgentFinish(return_values={“output”: ” 香港今天晴天,温度 22°C”})
    4. 循环停止,返回答案。

如何判断大模型想干什么?

目前主流的判断方式分为两种时代:传统的文本解析(Regex/JSON)现代的工具调用(Function Calling)。我们来逐一拆解。

1. 纯文本解析

在这种模式下,开发者会在 Prompt(提示词)中与大模型定下严格的输出格式。

经典 ReAct Agent
  • 使用 ReActOutputParser 或 ReActSingleInputOutputParser

  • 判断标志主要看 LLM 输出中是否包含 “Final Answer:”(或类似关键词)

  • 解析逻辑伪代码

def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
    # 清理文本...
    if "Final Answer:" in text:          # ← 核心判断标志
        final_answer = text.split("Final Answer:")[-1].strip()
        return AgentFinish(
            return_values={"output": final_answer},
            log=text
        )
    else:
        # 尝试提取 Thought / Action / Action Input
        action = extract_action_from_text(text)   # 正则或字符串查找
        return AgentAction(tool=action.tool, tool_input=action.input, log=text)
  • 模型 Prompt 约束
prompt = 
"你是一个智能助手。你可以使用以下工具:[Search, Calculator]。
请你必须严格按照以下格式输出:
如果你需要使用工具,请输出: 
Action: 工具的名称 
Action Input: 传给工具的参数 
如果你已经得出了最终答案,请输出: 
Final Answer: 你的最终回答"

answer = 

Thought: 我需要计算 125 乘以 34。
Action: Calculator
Action Input: 125 * 34
#### 或者: 
Thought: 我现在知道答案了。
Final Answer: 计算结果是 4250。
MRKL / Zero-Shot Agent
  • 使用 MRKLOutputParser

  • 判断标志:输出中是否包含 “Action”: “Final Answer”(JSON 格式)或 “Final Answer” 关键词。

  • 常见提示词会要求 LLM 以 JSON 形式输出:

    {
      "action": "Final Answer",
      "action_input": "最终答案内容"
    }
    

或者

{
  "action": "search_tool",
  "action_input": "查询内容"
}
自定义 Output Parser

继承 AgentOutputParser 重写 parse() 方法,例如:

class CustomOutputParser(AgentOutputParser):
    def parse(self, llm_output: str):
        if "Final Answer:" in llm_output or "最终答案:" in llm_output:
            return AgentFinish(...)   # 结束
        else:
            return AgentAction(...)   # 调用工具

2. 原生工具调用 (Function Calling / Tool Calling)

因为靠正则表达式去匹配文本太脆弱了(大模型有时候会乱加空格、换行,或者忘记写关键词),后来 OpenAI 等厂商在模型底层直接加入了 Function Calling 的能力。现在,Gemini、GPT-4、Claude 等先进模型,在 API 层面就支持区分“普通对话”和“工具调用”。

  • 使用 OpenAIFunctionsAgentOutputParser 或类似
  • 判断标志:LLM 输出中是否有 tool_calls 字段(结构化)。
    • 如果有 tool_calls → 解析为 AgentAction
    • 如果没有 tool_calls,只有普通内容 → 解析为 AgentFinish

这比文本解析更可靠,因为它是结构化的。

结构化输出

with_structured_outputbing_tools 都可以得到结构化的内容,但是作用和目的都是不同的

with_structured_output
  • 作用:强制性要求输出指定的格式;
  • 目的:关注结果,目的是为了拿到数据结果,只关心数据。流程往往在这里就终结了。你把这个数据存进数据库,或者在网页上展示出来;
  • 用途:生成结果数据、信息抽取、分类器;
bing_tools

作用:提供“工具给模型”选择,模型不一定会选择工具;

目的:赋予模型调用函数能力,可以灵活获得工具选择或者结果。

用途:调用函数。

代码分析
  • with_structured_output 源码伪逻辑(仅供学习)
def with_structured_output(schema):
    # 1. 检查底层模型的能力
    if model.supports_tool_calling():
        # 路线 A: 转成工具,Tool Calling 执行
        json_schema = convert_to_json_schema(schema)
        model_with_tools = model.bind_tools([json_schema], tool_choice="any")
        return model_with_tools | PydanticToolsParser(schema)
        
    elif model.supports_json_mode():
        # 路线 B: 开启 JSON 模式,注入提示词。可靠性通常低于 Tool Calling,尤其在复杂 schema 时容易出现格式错误。
        model_with_json = model.bind(response_format={"type": "json_object"})
        prompt_with_instructions = inject_schema_into_prompt(schema)
        return prompt_with_instructions | model_with_json | JsonOutputParser()
        
    else:
        # 路线 C: 降级为纯文本提示词和正则解析
        # 但是一般情况下不会执行这个逻辑
        # 大多数情况下,如果模型不支持 Tool Calling,with_structured_output 会直接抛出 NotImplementedError
        prompt_with_instructions = inject_strict_instructions(schema)
        return prompt_with_instructions | model | PydanticOutputParser(schema)
  • with_structured_output 真实逻辑(抛弃不稳定分支)
def with_structured_output(
    self, 
    schema: dict[str, Any] | type[BaseModel] | type, 
    *, 
    include_raw: bool = False, 
    method: str | None = None, 
    **kwargs: Any
) -> Runnable[LanguageModelInput, Any]:
    """
    将大模型的输出转换为结构化数据。该方法是 LangChain 提供的最重要的高级功能之一,用于让 LLM 可靠地输出
    Pydantic 对象、JSON 等结构化结果,而不是随意文本。
    
    参数说明:
        schema (dict | BaseModel | type): 
            定义期望的输出结构,支持三种形式:
            - Pydantic BaseModel 类(最推荐)
            - TypedDict 类
            - JSON Schema 字典
            
        include_raw (bool, default=False):
            - False: 只返回解析后的结构化结果(Pydantic 对象或 dict)
            - True: 返回一个字典,包含 'raw'(原始 AIMessage)、'parsed'(解析结果)、
                    'parsing_error'(解析失败时的错误信息),便于调试
            
        method (str | None, default=None):
            强制指定使用哪种结构化输出方式。可选值:
            - "tool_calling" : 使用工具调用(Tool Calling)模式(推荐)
            - "json_mode"    : 使用 JSON Mode 模式
            - None           : 自动根据模型能力选择最佳方式
            
        **kwargs: 
            额外参数会透传给 bind_tools() 或 bind() 方法。
            常见参数如:strict=True(OpenAI 支持严格模式,强制模型严格遵守 schema)
    
    返回值:
        一个 Runnable 对象,可直接 .invoke()、.stream() 使用。
    """

    # 1. 将用户传入的 schema 统一转换为 JSON Schema 格式
    #    支持 Pydantic BaseModel、TypedDict 等多种输入形式
    json_schema = _convert_schema_to_json(schema)

    # 2. 优先使用 Tool Calling 模式(绝大多数现代模型推荐路线)
    #    原理:把结构化输出伪装成“调用一个专门的工具”,让模型通过 tool_calls 返回数据
    #    成功率最高,推荐优先使用
    if self._supports_tool_calling() and (method is None or method == "tool_calling"):
        
        bound_llm = self.bind_tools(
            tools=[json_schema],                    # 将 schema 转为工具格式
            tool_choice="required",                 # 强制模型必须调用此工具(推荐写法)
            **kwargs                                # 透传 strict=True 等参数
        )
        
        # 根据 schema 类型选择合适的解析器(PydanticToolsParser 或 JsonOutputToolsParser 等)
        parser = _get_tools_parser(schema)
        
        runnable = bound_llm | parser

    # 3. 降级使用 JSON Mode(部分模型支持)
    #    原理:让模型直接以 JSON 格式输出,通过 response_format 控制
    elif self._supports_json_mode() or method == "json_mode":
        
        bound_llm = self.bind(
            response_format={"type": "json_object"},   # 开启 JSON Mode
            **kwargs
        )
        
        # 将 schema 的字段描述注入到 Prompt 中,引导模型按要求输出 JSON
        prompt_with_instructions = _create_structured_output_prompt(schema)
        
        runnable = prompt_with_instructions | bound_llm | JsonOutputParser()

    # 4. 如果模型既不支持 Tool Calling 也不支持 JSON Mode,则抛出异常
    else:
        raise NotImplementedError(
            f"with_structured_output is not implemented for this model: {self.__class__.__name__}. "
            "Please use a model that supports tool calling or json_mode."
        )

    # 5. 如果用户设置 include_raw=True,则包装返回结果
    #    返回格式为:{"raw": AIMessage, "parsed": structured_data, "parsing_error": error or None}
    if include_raw:
        runnable = runnable | _wrap_with_raw_output()

    return runnable
  • with_structured_output:不管你传入的 schema 是什么类型 (Pydantic/TypedDict/JSON Schema 字典),LangChain 在发包给 OpenAI 之前,都会把它们翻译成标准的 OpenAI Function Calling JSON Schema
  • tools_calll:无论你传的是 Python 函数还是 Pydantic 类,LangChain 在发包给 OpenAI 之前,都会把它们翻译成标准的 OpenAI Function Calling JSON Schema。如果你愿意,你完全可以直接手写这段原生 JSON 传给 bind_tools
# 这是最底层的原生 JSON Schema 写法
raw_json_schema = {
    "type": "function",
    "function": {
        "name": "extract_weather",
        "description": "提取天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "城市名称"},
                "temperature": {"type": "integer", "description": "温度"}
            },
            "required": ["city", "temperature"]
        }
    }
}

# 直接绑定原生 JSON
llm_with_tools = llm.bind_tools([raw_json_schema])

msg = llm_with_tools.invoke("上海今天大雨,只有18度")

print(msg.tool_calls)
# 输出: [{'name': 'extract_weather', 'args': {'city': '上海', 'temperature': 18}, 'id': 'call_def456'}]
总结
  1. 既然 bind_tools 也能提取格式,那要 with_structured_output 干嘛?区别仅仅在于“最后一步的拆快递(解析)”**:

    • 当你使用 bind_tools([WeatherInfo]): 大模型吐出的是一个原始的 AIMessage 对象。你需要的数据藏得很深,你要自己写代码去扒开字典:city = msg.tool_calls[0]['args']['city']
    • 当你使用 with_structured_output(WeatherInfo): LangChain 在底层不仅调用了 bind_tools,还非常贴心地在链条末尾塞进了一个输出解析器(PydanticToolsParser)。 当大模型吐出字典后,解析器会截获它,并自动帮你实例化一个正儿八经的 Python 对象还给你。你拿到手直接就是 result = ...,然后优雅地调用 result.city 即可。
  2. 调用顺序:如果先 bind_tools,再调用 with_structured_output,有时会覆盖之前的 tools。如果需要同时使用工具和结构化输出:1.先绑定普通工具,再绑定结构化输出;2.直接在 with_structured_output 中传入需要的 tools;

# 1. 定义工具
@tool
def check_order_status(order_id: str) -> str:
    """查询订单状态。当用户询问订单、物流、发货情况时必须调用此工具。"""
    mock_db = {
        "ORD-123": "已发货,预计明天送达。",
        "ORD-456": "正在配货中。"
    }
    return mock_db.get(order_id, f"未找到订单 {order_id}")

# 2. 定义结构化输出 Schema
class UserIntent(BaseModel):
    category: str = Field(..., description="用户意图分类:'查订单', '闲聊', '投诉', '其他'")
    emotion: str = Field(..., description="用户情绪:'积极', '中性', '愤怒'")

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.bind_tools([check_order_status]).with_structured_output(UserIntent, include_raw=True)
  1. 不同模型支持程度不同:OpenAI、Claude、Gemini 支持很好;部分本地模型或旧模型可能抛 NotImplementedError。
  2. include_raw=True 在调试解析失败时非常有用。

LangGraph 实现

在 LangGraph 中,已经不再使用判断 AgentAction / AgentFinish 这种基于 Python 对象类型的 while 循环判断。

LangGraph 采用 状态机 + 图(Graph) 的方式来处理流程,通过 条件边(conditional edges) 来决定下一步是继续调用工具,还是结束并输出最终答案。这种设计更加灵活、可视化、支持持久化、中断和多代理协作。

核心判断逻辑放在一个条件路由函数(conditional edge function)中,最常见的是检查 LLM 输出的 tool_calls 属性:

  • 典型实现(现代 Tool Calling ReAct Agent)
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import tools_condition   # 官方预置函数(最常用)
from langchain_core.messages import AIMessage
from typing import TypedDict, Annotated, Literal

# 1. 定义状态(State)
class AgentState(TypedDict):
    messages: Annotated[list, "add_messages"]   # 消息历史(包含 LLM 输出和工具结果)

# 2. Agent 节点:调用 LLM(绑定了 tools)
def agent_node(state: AgentState):
    response = llm_with_tools.invoke(state["messages"])   # llm_with_tools 是绑定了工具的 LLM
    return {"messages": [response]}

# 3. 工具执行节点(预置的 ToolNode)
tool_node = ToolNode(tools)

# 4. 条件判断函数(关键!相当于旧版的 AgentFinish 判断)
def should_continue(state: AgentState) -> Literal["tools", END]:
    messages = state["messages"]
    last_message = messages[-1]
    
    # 核心判断:
    # 如果最后一条消息(LLM 输出)包含 tool_calls(且不为空),说明需要调用工具 → 继续循环
    # 如果没有 tool_calls,说明 LLM 直接给出了最终答案 → 结束
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"          # 去执行工具节点
    return END                  # 结束图的执行,返回最终结果
  • 构建图(Graph)的完整结构
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("agent", agent_node)      # LLM 思考 + 决定
workflow.add_node("tools", tool_node)       # 执行工具

# 添加边
workflow.set_entry_point("agent")

# 条件边(最重要的一步)
workflow.add_conditional_edges(
    "agent",                    # 从 agent 节点出来
    should_continue,            # 使用上面的判断函数
    {"tools": "tools", END: END}   # 映射:如果返回 "tools" 就去 tools 节点,否则结束
)

# 从 tools 执行完后,总是回到 agent 继续思考
workflow.add_edge("tools", "agent")

app = workflow.compile()
  1. 输入 → agent 节点(LLM 调用)
  2. should_continue 检查最后一条 AIMessage 是否有 tool_calls:
    • 有 → 路由到 “tools” 节点执行工具 → 工具结果作为新消息加入 state → 回到 agent
    • 没有 → 路由到 END,整个 graph 结束,app.invoke() 返回最终的 messages(最后一条就是最终答案)
  3. 这个循环由图自动管理,不需要手动 while 循环。

LangGraph 提供了开箱即用的条件函数,避免自己写 should_continue:

from langgraph.prebuilt import tools_condition

workflow.add_conditional_edges(
    "agent",
    tools_condition,        # 内部逻辑几乎和上面看到的 should_continue 一模一样
    {"tools": "tools", END: END}
)

tools_condition 的核心就是:检查最后一条消息是否有 tool_calls

与 AgentExecutor / AgentFinish 的对比

方面旧版 AgentExecutor (AgentFinish)新版 LangGraph (推荐)
判断方式isinstance(response, AgentFinish) 或 OutputParser 解析 “Final Answer:“检查 last_message.tool_calls 是否存在/为空
循环控制while 循环 + 手动 invokeGraph 的 conditional edges + 自动路由
状态管理intermediate_steps 等有限字段自定义 TypedDict State,支持任意结构 + 消息列表
灵活性较低(固定 ReAct 模式)极高(可加 human-in-loop、多代理、自定义路由等)
是否依赖文本解析依赖(ReAct 模式下看 “Final Answer:” 等字符串)主要依赖结构化 tool_calls(更可靠)
推荐场景简单快速原型生产级、复杂、多步骤、可调试 agent
  • 如果使用纯文本 ReAct Prompt(非 tool calling),判断逻辑可能会改成检查最后消息的内容是否包含 “Final Answer” 等,但现代最佳实践是使用 tool calling(OpenAI、Anthropic、Grok 等模型都支持),判断更稳定。
  • create_react_agent() 函数是 LangGraph 提供的快捷方式,它内部已经搭建好了上面这种 graph(agent + tools + conditional edges)

AgentExecutor

AgentExecutor 是 LangChain 框架中智能体(Agent)的核心运行时与调度管理器。它是连接 LLM 大脑(Agent)、外部工具(Tools) 和 用户输入 的中央控制枢纽,负责驱动整个 “思考 → 调用工具 → 获得结果 → 再思考” 的自动闭环,直到任务完成。

官方定义:管理使用工具的智能体的执行链。实现 Runnable 接口,是 LangChain 中运行 Agent 的标准、最高级封装。

LLM 本身只能生成文本,无法直接运行代码或调用 API。AgentExecutor 是一个封装好的运行环境(Runtime),它把“大模型预测出的动作”真正落地执行,并把执行结果反馈给大模型。

核心工作流程(Agent Loop)

AgentExecutor 本质上是一个带有错误处理和日志记录的 while 循环。当接收到用户的任务时,它会严格按照以下步骤循环运行,直到任务完成:

  1. 输入接收: 接收用户的初始问题或任务。
  2. 思考与决策(调用 Agent): 将问题交给 Agent(包含 LLM 和 Prompt)。LLM 会根据当前上下文进行推理,并决定下一步应该做什么。
  3. 解析意图: AgentExecutor 解析 LLM 的输出。此时通常有两种结果:
    • Action(采取行动): LLM 决定需要调用某个工具(例如:“我需要用计算器算一下 125 * 34”)。
    • Finish(完成任务): LLM 认为已经收集到足够的信息,可以直接回答用户(例如:“最终答案是 4250”)。
  4. 工具执行(如果结果是 Action): AgentExecutor 拦截这个 Action,找到对应的工具(Tool),并将 LLM 生成的参数传入工具中实际运行(比如真正去调用一个 Python 函数或第三方 API)。
  5. 反馈结果(Observation): AgentExecutor 获取工具运行的结果(或者是报错信息),将这个结果作为新的“观察(Observation)”添加到上下文中。
  6. 循环往复: 带着最新的观察结果,回到第 2 步,让 LLM 再次思考。这个 “思考 (Thought) -> 行动 (Action) -> 观察 (Observation)” 的循环(即 ReAct 模式)会一直持续,直到触发 Finish 动作,或者达到最大循环次数(避免死循环)。

解决什么工程问题?

在实际工程中,你不能指望 LLM 每次都完美输出。AgentExecutor 解决了很多现实的工程问题:

  • 处理格式错误: 如果 LLM 输出的工具调用格式不符合要求(比如漏了括号、JSON 格式崩了),AgentExecutor 会捕获这个解析错误,并将错误信息(如“格式不对,请重新输出”)反馈给 LLM,让它自我纠正。
  • 处理工具异常: 如果调用的外部 API 宕机或超时,AgentExecutor 会捕获异常,告诉 LLM “该工具当前不可用”,LLM 可能会选择更换其他工具或改变策略。
  • 防止无限死循环: 它内置了 max_iterations(最大迭代次数)或 max_execution_time(最大执行时间)的限制。如果智能体陷入了“反复调用同一个工具却得不到结果”的死胡同,AgentExecutor 会强行终止循环并返回当前最好的结果。
  • 日志与可观测性: 它记录了智能体每一步的思考过程(Thought)和工具调用细节,这对于开发者调试(Debug)至关重要。

代码

  • 用伪代码理解 AgentExecutor
def AgentExecutor(user_input, agent, tools, max_iterations=15):
    intermediate_steps = [] # 记录之前的动作和结果
    
    for _ in range(max_iterations):
        # 1. 询问大脑(LLM)下一步该怎么做
        decision = agent.plan(user_input, intermediate_steps)
        
        # 2. 如果大脑说可以结束了,就返回最终答案
        if isinstance(decision, AgentFinish):
            return decision.return_values
            
        # 3. 如果大脑说需要行动,就提取工具名和参数
        if isinstance(decision, AgentAction):
            tool_name = decision.tool
            tool_input = decision.tool_input
            
            # 4. 找到对应的工具并执行它
            try:
                tool = find_tool(tools, tool_name)
                observation = tool.run(tool_input)
            except Exception as e:
                # 捕获异常,并将其作为观察结果返回给大模型,让它知道出错了
                observation = f"Tool execution failed: {e}"
                
            # 5. 将行动和结果记录下来,进入下一次循环
            intermediate_steps.append((decision, observation))
            
    # 超过最大循环次数,强制退出
    return "Agent stopped due to iteration limit."

是否支持使用多个 Agent?

AgentExecutor 的设计就是为单个 agent 服务的。看它的核心初始化方式(无论是旧版 from_agent_and_tools 还是新版):

agent_executor = AgentExecutor(
    agent=xxx,          # ← 这里只能传入一个 agent 对象
    tools=tools,
    ...
)
  • agent 参数只接受 一个BaseSingleActionAgentBaseMultiActionAgent(或通过 create_react_agent 等工厂函数创建的单个 agent)。
  • 它内部的执行循环(之前的 while not isinstance(response, AgentFinish))是围绕这一个 agent 反复调用 LLM + 执行工具 + 反馈的逻辑。
  • 它没有内置机制来管理多个独立 agent 之间的协作、路由、状态共享或并行执行。

简单来说:一个 AgentExecutor = 一个 agent + 它的工具 + 执行循环

如果要实现多个 agent(Multi-Agent), LangChain 官方推荐以下两种主流方式:

  1. 用 AgentExecutor 实现“伪多代理”(简单但有限制)

    • 把其他 agent 包装成 Tool,然后给主 agent 使用。
    • 主 agent(用 AgentExecutor 运行)看到的是几个“子工具”,实际上每个子工具内部运行的是另一个完整的 AgentExecutor(子 agent)。
    • 优点:简单,能快速实现 supervisor(监督者) + sub-agents(子代理)模式。
    • 缺点:上下文管理较差、子 agent 没有独立状态、调试复杂、层次深时容易 token 爆炸。
  2. 使用 LangGraph 构建真正的多代理系统(推荐)

    • LangGraph 是 LangChain 官方现在主推的框架,专门用来处理复杂、多 agent、带循环的状态流。 常见多代理模式包括:
      • Supervisor 模式(中央协调者):一个 supervisor agent 把任务路由给多个 specialist agents(每个 specialist 可以是独立的 LangGraph agent 或 AgentExecutor)。中央协调,易控制,适合大多数业务场景。
      • Subagents 模式:主 agent 把子 agents 当作工具调用。
      • Handoff / Router 模式:代理之间直接传递控制权(使用 create_handoff_tool),更去中心化。
      • Hierarchical / Swarm / Parallel:支持并行执行、持久化、human-in-the-loop 等高级特性。

LangGraph 的优势:

  • 用图(Graph)明确定义节点(每个 agent 是一个节点)和边(路由逻辑)。
  • 更好的状态管理、streaming、中断、持久化、调试(LangSmith 可视化)。
  • 支持真正的多 agent 协作,而不只是“把 agent 包成工具”。

具体 LangGraph 是如何实现多代理的,请看:[[LangGraph#Multi-Agent]]

处理解析错误

agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
  1. 默认错误处理(设为 True)

当你在初始化 AgentExecutor 时,设置 handle_parsing_errors=True

  • 工作原理: 当解析器报错时,程序不再直接崩溃退出。相反,它会捕获这个错误,并将一段默认的警告信息(例如:“格式无效,缺少 ‘Action:’”)作为上一步的“观察结果(Observation)”重新喂给大模型。
  • 效果: 大模型看到报错信息后,通常会意识到自己刚才的输出格式写错了,然后在下一轮思考中立刻纠正过来,重新输出正确的格式。
  1. 自定义错误提示语(传入字符串)

有时默认的报错信息不够明确,大模型看了可能还是不知道怎么改。这时你可以直接传一段字符串给 handle_parsing_errors

  • 用法: handle_parsing_errors="检查你的输出并确保其符合规范,请必须使用 Action 和 Action Input 语法!"
  • 效果: 当模型输出格式错误时,这段你亲手写的“严厉提示”会被发送给模型,强制引导它按正确的格式重新输出。
  1. 动态错误处理逻辑(传入自定义函数)

如果你需要更复杂的逻辑,可以传入一个 Python 函数。

  • 用法: 写一个函数接收 error 对象,并返回一个字符串。例如,文档里的例子是截取错误信息的前 50 个字符发给模型:

    def _handle_error(error) -> str:
        return str(error)[:50] # 截取前50个字符避免报错信息太长
    
    agent_executor = AgentExecutor(..., handle_parsing_errors=_handle_error)
    
  • 效果: 这种方法最灵活,你可以根据具体的异常类型(Exception Type)动态生成不同的安抚或纠正话术反馈给大模型。

迭代器 AgentIter

在通常情况下,当我们调用 Agent 执行任务时,它会一口气把所有的“思考 - 调用工具 - 观察”循环跑完,直接给出最终结果。但这篇文章指出,有时候我们希望一步一步地控制 Agent 的执行过程,这在以下场景非常有用:

  1. 添加人工审核(Human-in-the-loop): 在 Agent 执行下一步动作前,让人类确认是否允许继续。
  2. 中间步骤验证: 在代码层面插入自定义的逻辑,检查 Agent 上一步使用工具得到的结果是否正确。

示例:让 Agent 计算“第 998、999 和 1000 个素数的乘积”**。

为了完成这个任务,文档演示了以下步骤:

  1. 定义工具: 给了 Agent 两个工具。一个是 GetPrime(用来查询第 n 个素数是多少),另一个是 Calculator(用来做数学乘法计算)。
  2. 构建 Agent 迭代器: 构建好 OpenAI Functions Agent 后,不再使用常规的 agent_executor.invoke(),而是使用了 agent_executor.iter() 进行 for 循环遍历。
  3. 注入自定义逻辑与人工打断: 在 for 循环内部,代码做了一层拦截:
    • 当检测到 Agent 刚刚调用了 GetPrime 工具拿到了一个数字时,代码会自己运行一个 is_prime() 函数,去验证拿到的这个数到底是不是素数(这就是中间步骤验证)。
    • 验证完之后,代码弹出了一个 input("是否继续(Y/n):\n"),询问控制台前的人类用户是否同意 Agent 继续往下走(这就是人工审核介入)。
####  1. 工具定义
primes = {998: 7901, 999: 7907, 1000: 7919}
 
class CalculatorInput(BaseModel):
    question: str = Field()
 
class PrimeInput(BaseModel):
    n: int = Field()
 
def is_prime(n: int) -> bool:
    if n <= 1 or (n % 2 == 0 and n > 2):
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True
 
def get_prime(n: int, primes: dict = primes) -> str:
    return str(primes.get(int(n)))
 
async def aget_prime(n: int, primes: dict = primes) -> str:
    return str(primes.get(int(n)))
 
tools = [
    Tool(
        name="GetPrime",
        func=get_prime,
        description="用于返回第`n`个素数的工具",
        args_schema=PrimeInput,
        coroutine=aget_prime,
    ),
    Tool.from_function(
        func=llm_math_chain.run,
        name="Calculator",
        description="在需要计算数学表达式时非常有用",
        args_schema=CalculatorInput,
        coroutine=llm_math_chain.arun,
    ),
]

### 2. 创建代理
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

question = "第998、999和1000个素数的乘积是多少?"
 
for step in agent_executor.iter({"input": question}):
    if output := step.get("intermediate_step"):
        action, value = output[0]
        if action.tool == "GetPrime":
            print(f"正在检查 {value} 是否为素数...")
            assert is_prime(int(value))
        # 询问用户是否继续
        _continue = input("是否继续(Y/n):\n") or "Y"
        if _continue.lower() != "y":
            break

工具

创建工具的方法

1. @tool 装饰器

这是目前 LangChain 官方最推荐的写法,非常 Pythonic。你只需要写一个普通的 Python 函数,然后在上面加一个 @tool 装饰器即可。

  • 原理解析: LangChain 会自动读取函数的名字作为工具名,读取函数的 Docstring(多行注释) 作为工具的描述,并自动读取类型提示(Type Hints) 作为参数的 Schema。
  • 适用场景: 90% 的日常开发场景。
from langchain_core.tools import tool
from typing import Annotated

@tool
def get_weather(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
    """Get the current weather in a given location.
    
    Args:
        location: The city and state, e.g. Hong Kong, Tung Chung
    """
    # 这里写实际逻辑(调用 API、数据库等)
    if "hong kong" in location.lower():
        return "Hong Kong today is 22°C, sunny."
    return f"Weather in {location} is unknown."

# 查看生成的工具信息 
print(get_weather.name) # 输出: get_weather
print(get_weather.description) # 输出: Get the current weather in a given location. Args:location: The city and state, e.g. Hong Kong, Tung Chung

# 使用时直接把工具放入列表
tools = [get_weather]

优点

  • 代码最简洁
  • 支持异步(@tool + async def)
  • 自动推断 schema

2. 使用 Tool.from_function() 或直接实例化 Tool

这种方法不需要修改原函数的代码,非常适合用来包装第三方库的函数,或者包装某个类实例的方法

  • 原理解析: 你需要手动把目标函数(func)、工具名(name)和描述(description)拼装在一起。
  • 适用场景: 无法给原函数加装饰器时;需要将一个已经存在的旧函数快速接入 Agent 时。
  • 注意: 标准的 Tool 类默认只支持单一字符串输入(Single String Input)。

最简单创建:Tool.from_function

from langchain_core.tools import Tool
tool = Tool.from_function(
    func=my_function,
    name="tool_name",
    description="tool description"
)

实例化 Tool

将 Runnable 转换为 Tool:把 LangChain 的 Runnable(如 Chain 的 )直接转为工具:

class CalculatorInput(BaseModel): 
	question: str = Field()

llm_math_chain = LLMMathChain.from_llm(llm=llm, verbose=True)

tools = [
    Tool.from_function(
        func=llm_math_chain.run,
        name="Calculator",
        description="在需要计算数学表达式时非常有用",
        args_schema=CalculatorInput,
        coroutine=llm_math_chain.arun,
    ),
]

LLMMathChain 是 LangChain 官方封装好的源码,底层逻辑大致执行流程是这样的:

  1. 构造特定 Prompt: 它内部有一个写死的 Prompt,大概长这样:“你是一个计算器,请把用户的数学问题转换成可以在 Python numexpr 库中运行的代码。不要输出别的话,只输出代码。”
  2. 请求大模型: 它带着这个 Prompt 和用户的算式(7901 * 7907 * 7919),向你传入的 llm(即 GPT-4)发起一次秘密的网络请求。
  3. 大模型写代码: GPT-4 收到请求后,生成一段纯净的代码字符串(比如就叫 7901 * 7907 * 7919)。
  4. 本地沙盒执行:LLMMathChain 拿到大模型生成的这串代码文本后,会调用 Python 本地的 numexpr.evaluate() 或原生的 eval() 函数,直接在你的计算机内存里运行这段动态代码。
# LangChain 底层偷偷执行了类似这样的代码 
result = numexpr.evaluate("7901 * 7907 * 7919")
  1. 返回结果: 最后,.run() 方法把这个数字作为字符串返回给 Agent 的执行流。

3. StructuredTool.from_function()

普通的 Tool 只能接收一个字符串参数。如果你的工具需要接收多个参数(例如同时需要纬度和经度),或者需要接收复杂的结构化数据(如 JSON 字典),你就需要使用 StructuredTool

  • 适用场景: 工具需要多个参数,且你不想(或不能)使用 @tool 装饰器时。
  • 注意: 现在的 @tool 装饰器在底层也会自动将多参数函数转化为 StructuredTool,所以 StructuredTool 多用于手动构建复杂结构的场景。
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

class WeatherInput(BaseModel):
    location: str = Field(..., description="The city and state, e.g. Hong Kong")
    date: str | None = Field(None, description="Optional date in YYYY-MM-DD")

def get_weather_func(input: WeatherInput) -> str:
    return f"Weather in {input.location} is 22°C."

weather_tool = StructuredTool.from_function(
    func=get_weather_func,
    name="get_weather",
    description="Get current or future weather",
    args_schema=WeatherInput,   # 使用 Pydantic 模型
    return_direct=False
)

3. 继承 BaseTool 类

如果你需要对工具自定义校验、错误处理、异步、缓存等高级功能时,使用这种方式。

  • 适用场景: 需要在工具内部维护复杂的连接池(如数据库连接);需要自定义工具发生错误时的回调处理;需要同时严格控制同步 (_run) 和异步 (_arun) 的具体执行逻辑。
from langchain_core.tools import BaseTool
from pydantic import Field
from typing import Type

class WeatherTool(BaseTool):
    name: str = "get_weather"
    description: str = "Useful for when you need to get weather information."
    args_schema: Type[WeatherInput] = WeatherInput   # 可选,使用 Pydantic

    def _run(self, location: str, date: str | None = None) -> str:
        """同步执行逻辑"""
        # 实际调用 API 等
        return f"Weather in {location}: 22°C"

    async def _arun(self, location: str, date: str | None = None) -> str:
        """异步执行逻辑(推荐用于 I/O 操作)"""
        ...

ToolUse

主要讨论 Agent 如何发现、调用和管理工具。

1. 人工介入 HumanInTheLoop

  • 含义Human in the Loop(人参与循环)。
  • 作用:当 Agent 在执行过程中遇到不确定、敏感或需要确认的情况时,暂停执行,把控制权交给人类(用户或管理员)。
  • 常见场景
    • 需要人工审批某些操作(如发送邮件、修改数据库)。
    • 工具返回结果有歧义时,让人判断下一步。
    • 增加安全性和可控性,避免 Agent 完全自主跑偏。
  • 在 LangGraph 中通过 Interrupt(中断) 和 Checkpoint 机制实现,非常强大。

3. 多工具 MultipleTools

  • 含义:Agent 同时使用多个不同的工具
  • 作用:一个 Agent 通常不会只绑定一个工具,而是绑定多个专业工具(例如:搜索工具 + 计算器 + 天气工具 + 数据库工具)。
  • LLM(大模型)会根据任务自动选择合适的工具调用。
  • 这几乎是现代 Agent 的标配。

对于多个工具,我们无法确定模型将调用哪个工具。因此,我们不能硬编码将特定工具放入我们的链条中。相反,我们将添加一个 call_tool_list,即一个 RunnableLambda,它接受 JsonOutputToolsParser 的输出并根据它实际构建链条的末尾,这意味着在运行时附加到链条末尾被调用的工具。我们可以这样做,因为 LCEL 有一个很酷的特性,即在任何可运行序列(LCEL 的核心构建块)中,如果一个组件返回更多的可运行项,则这些项作为链条的一部分运行。

from operator import itemgetter
from typing import Union
 
from langchain.output_parsers import JsonOutputToolsParser
from langchain_core.runnables import (
    Runnable,
    RunnableLambda,
    RunnableMap,
    RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
 
model = ChatOpenAI(model="gpt-3.5-turbo")
tools = [multiply, exponentiate, add]
model_with_tools = model.bind_tools(tools)
tool_map = {tool.name: tool for tool in tools}
 
 
def call_tool(tool_invocation: dict) -> Union[str, Runnable]:
    """根据模型选择的工具动态构建链条末尾的函数。"""
    tool = tool_map[tool_invocation["type"]]
    return RunnablePassthrough.assign(output=itemgetter("args") | tool)
 
 
# .map()允许我们将函数应用于输入列表。
call_tool_list = RunnableLambda(call_tool).map()
chain = model_with_tools | JsonOutputToolsParser() | call_tool_list

4. 并行 Parallel

  • 含义并行工具调用(Parallel Tool Calls)。
  • 作用:当 Agent 需要调用多个工具时,可以同时(并行) 执行,而不是一个接一个顺序执行。
  • 优势
    • 大幅提升速度(例如同时查天气 + 查股票 + 搜索新闻)。
    • 现代支持 Tool Calling 的模型(GPT-4o、Claude、Grok 等)原生支持并行调用。
  • 在 LangGraph 的 ToolNode 中会自动处理并行执行。
# 模型支持并行调用
model = ChatOpenAI(model="gpt-3.5-turbo-1106")
tools = [multiply, exponentiate, add]
model_with_tools = model.bind_tools(tools)
tool_map = {tool.name: tool for tool in tools}
 
def call_tool(tool_invocation: dict) -> Union[str, Runnable]:
    """Function for dynamically constructing the end of the chain based on the model-selected tool."""
    tool = tool_map[tool_invocation["type"]]
    return RunnablePassthrough.assign(output=itemgetter("args") | tool)
 
 
# .map() allows us to apply a function to a list of inputs.
call_tool_list = RunnableLambda(call_tool).map()
chain = model_with_tools | JsonOutputToolsParser() | call_tool_list

chain.invoke( "What's 23 times 7, and what's five times 18 and add a million plus a billion and cube thirty-seven")

当面对包含多个请求的复杂问题时,这类先进模型不再只输出一个工具名称,而是会一口气输出一个包含多个工具调用请求的 JSON 列表(Array)

既然大模型一次性丢过来好几个工具任务,LangChain 是怎么同时处理它们的呢?核心就在这句代码里:

# call_tool 是我们写好的单个工具执行函数
# .map() 是 LCEL 的高阶用法
call_tool_list = RunnableLambda(call_tool).map()

在 LangChain 表达式语言(LCEL)中,.map() 的作用是将一个针对单条数据操作的函数,广播(应用)到一个列表的所有元素上。

实际运行流程如下:

  1. 用户输入发疯式提问: “23 乘以 7 是多少,5 乘以 18 是多少,一百万加十亿是多少,37 的立方是多少?”。只发送一次。
  2. 大模型输出列表: 模型瞬间理解了意图,输出一个包含 4 个工具任务的列表: [任务A(乘法), 任务B(加法), 任务C(加法), 任务D(求幂)]
  3. .map() 并行处理: 当这个列表传递给带有 .map() 的链条时,LangChain 会在底层启动并发执行。它不会等任务 A 算完再算任务 B,而是同时去执行这四个工具函数。
  4. 统一返回: 所有工具执行完毕后,结果会被重新打包成一个列表返回给你。

5. 工具错误处理 ToolErrorHandling

  • 含义Tool Error Handling —— 对工具执行过程中出现的错误进行处理。
  • 作用:工具调用失败是很常见的(API 限流、网络错误、参数错误、返回异常等)。
  • 常见处理方式
    • 捕获错误后返回友好提示给 LLM,让 LLM 重试或换其他工具。
    • 记录错误日志。
    • 设置默认 fallback(备用方案)。
    • 在 LangGraph 的 ToolNode 中可以自定义错误处理逻辑(handleToolErrors 等)。

错误处理 ToolErrorHandling

在构建 AI Agent(智能体)时,我们经常会让大模型去调用外部工具(比如请求天气 API、查数据库、执行代码)。但是,真实的开发环境中充满了不确定性:API 可能会超时、大模型可能传错了参数、数据库可能连不上。

最简单的就是在工具内 try/ catch 错误,但是如果无法覆盖错误,下面的更优的解法:

ToolException + handle_tool_error

LangChain 提供了一套内置机制来处理这种异常:

  • 主动抛出特定异常: 在你编写自定义工具函数时,如果遇到问题(比如输入参数不合法),你可以主动抛出 LangChain 专属的 ToolException
  • 开启错误捕获: 在定义工具时,设置 handle_tool_error=True

效果: 当工具内部抛出 ToolException 时,程序不会崩溃。LangChain 会把这捕捉下来,并把错误信息变成一段文本(比如:” 发生错误:API 速率超限 ”),当做这个工具的返回值,继续喂给大模型。

handle_tool_error 参数不仅可以设为 True,还有其他更灵活的玩法:

  • 传入固定的字符串: 比如 handle_tool_error="该工具目前不可用,请尝试使用其他工具或者直接回答不知道。"。这样一旦报错,大模型就会收到这句明确的指示,从而改变策略。
  • 传入自定义的 Python 函数: 你可以写一个函数,专门用来解析复杂的错误对象(Error),然后提取出最关键的错误信息返回给大模型,防止把长篇大论的报错代码(Traceback)直接塞给模型导致上下文爆炸。

Fallbacks

可以给一个容易出错的工具链加上 .with_fallbacks([备用链])

  • 当主工具报错时,完全不理会大模型,直接在底层无缝切换到“备用工具”或者“备用大模型”。
  • 例如:如果你调用 ChatGPT-4 + 复杂搜索工具 失败了,系统自动降级使用 ChatGPT-3.5 + 简单搜索工具 再试一次。

使用技巧

强制返回结构化数据 100%

通常情况下,大语言模型(LLM)回答问题时总是喜欢用自然语言聊天。但在实际的工程开发中,我们的后续代码往往需要非常严谨的格式提取数据。

不需要 prompt,直接利用 Function Calling(函数调用) 能力来解决这个问题。

1. 定义期望的输出结构(造一个假的“响应函数”)

代码没有去死磕 Prompt 告诉大模型“请你必须输出 JSON 格式,包含 answer 和 sources 字段”,而是耍了个小聪明:定义了一个名为 Response 的假工具(函数)。 在这个工具的参数说明(Schema)里,明确规定了调用它需要传入两个参数:

  • answer:字符串,用来放最终的回答。
  • sources:列表,用来放信息来源。

2. 将“假工具”绑定给大模型

把真正的查询工具(比如 retriever_tool 文档检索工具)和刚才捏造的那个假的 Response 工具,一起绑定给大模型。 这样一来,大模型在思考完问题后,如果它想给用户最终答案,由于设定,它会被引导去“调用”这个 Response 函数。

3. 自定义输出解析器(Output Parser 拦截)

当大模型输出它的思考结果时,写一个自定义的拦截逻辑(parse 函数):

  • 如果大模型选择调用了真正的 retriever_tool,就放行,让它去查资料(返回 AgentAction)。
  • 最精妙的一步来了:如果大模型选择调用了 Response 这个假工具,代码就强行拦截!它会把大模型试图传给 Response 工具的参数(即包含 answersources 的 JSON 字典)直接截获,然后作为最终结果返回给用户(变成 AgentFinish)。

消息

ChatModels 接受一个消息列表作为输入,并返回一个消息。消息有几种不同类型。所有消息都有 rolecontent 属性。role 描述了是谁说了这条消息。LangChain 为不同的角色提供不同的消息类。content 属性描述了消息的内容。这可以是一些不同的内容:

  • 一个字符串(大多数模型是这样的)
  • 一个字典列表(这用于多模式输入,其中字典包含关于该输入类型和输入位置的信息)

此外,消息还有一个 additional_kwargs 属性。这是可以传递有关消息的其他信息的地方。这主要用于 _ 特定于提供商 _ 的输入参数,而不是通用的参数。这方面的最佳示例是 OpenAI 的 function_call

消息类型

HumanMessage

这代表用户发送的消息。通常只包含内容。

AIMessage

这代表模型发送的消息。这可能包含 additional_kwargs - 例如,使用 OpenAI 函数调用时可能包含 functional_call

SystemMessage

这代表系统消息。只有一些模型支持这个。这告诉模型如何行为。通常只包含内容。

FunctionMessage

这代表函数调用的结果。除了 rolecontent,这个消息还有一个 name 参数,表示生成此结果的被调用函数的名称。

ToolMessage

这代表工具调用的结果。为了匹配 OpenAI 的 functiontool 消息类型,这与 FunctionMessage 不同。除了 rolecontent,这个消息还有一个 tool_call_id 参数,表示被调用以生成此结果的工具的 ID。

Memory

常见类型

Memory 类型描述优点缺点适合场景
ConversationBufferMemory完整保存所有对话历史(Human + AI 消息)最简单、上下文最完整Token 消耗大,容易超上下文窗口短对话、原型开发
ConversationBufferWindowMemory只保留最近 K 轮对话(窗口式)控制 Token 数量,节省成本丢失早期重要信息中等长度对话、客服场景
ConversationSummaryMemory用 LLM 对历史对话进行实时总结,只保存总结内容大幅节省 Token,适合长对话总结可能丢失细节,依赖 LLM 质量长对话、创意写作、咨询
ConversationSummaryBufferMemory结合 Buffer + Summary:最近几轮保持原始,早期内容用总结平衡完整性与效率配置稍复杂中长对话,最常用平衡方案
ConversationTokenBufferMemory根据 Token 数量限制保留最近内容精确控制 Token 开销需要指定 LLM 计算 TokenToken 敏感的应用
Entity Memory提取并跟踪对话中的实体(人名、地点、偏好等),用知识图谱方式存储能记住用户具体事实和偏好实现较复杂个人助理、记住用户喜好
VectorStoreRetrieverMemory把历史对话存入向量数据库,支持语义检索召回相关片段支持超长历史、语义搜索需要向量数据库,延迟稍高RAG + 聊天、知识密集型应用
  • 最基础的使用示例(ConversationBufferMemory)
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# 创建 Memory
memory = ConversationBufferMemory(
    memory_key="history",      # Prompt 中引用的变量名
    return_messages=True       # 返回 Message 对象(推荐)
)

# 创建对话链
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True               # 显示思考过程
)

# 多轮对话
print(conversation.predict(input="你好,我叫小明"))
print(conversation.predict(input="我喜欢打篮球"))
print(conversation.predict(input="我最喜欢的运动是什么?"))  # 它应该能记住“小明”和“篮球”

新的 Memory 打法

  • 旧版(Chains + Memory):主要使用上面提到的 ConversationXXXMemory 类,配合 LLMChain、ConversationChain 使用。

  • 新版推荐(Runnable + LangGraph)

    • 使用 RunnableWithMessageHistory 来包装你的 Runnable(Chain 或 Agent)。
    • LangGraph 引入更强大的状态管理(State)Checkpointing(检查点持久化),可以轻松实现线程级(thread-scoped)短期记忆。
    • Long-term Memory:通过 LangGraph 的 Store(如 InMemoryStore、数据库后端)或 LangMem 框架实现跨会话的长期记忆(保存用户偏好、事实等)。
  • 写法示例(RunnableWithMessageHistory):

from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory

# ... 定义你的 chain 或 prompt | llm ...

chain_with_history = RunnableWithMessageHistory(
    your_chain,
    lambda session_id: InMemoryChatMessageHistory(),  # 可换成 Redis、Postgres 等持久化
    input_messages_key="input",
    history_messages_key="history"
)

使用建议

  • 短对话(< 10 轮):直接用 ConversationBufferMemory 或 ConversationBufferWindowMemory(k=5)。
  • 长对话:优先 ConversationSummaryBufferMemory(最均衡)。
  • 需要记住用户偏好:结合 Entity Memory 或 VectorStoreRetrieverMemory。
  • 生产环境:一定要做持久化(Redis、Postgres、DynamoDB 等),避免内存丢失。
  • 多代理 / 复杂 Agent:强烈推荐迁移到 LangGraph + Checkpoint + Long-term Store,Memory 管理会更灵活和强大。
  • Token 优化:永远监控上下文长度,结合 Summary 机制,避免超限。

回调 Callbacks

主要作用

  • 实时流式输出(Streaming):让答案一个字一个字地打印出来,而不是等全部生成完。
  • 日志记录与监控:记录每次 LLM 调用耗时、Token 用量、Prompt 内容等。
  • 调试与追踪:查看 Chain、Agent、Tool 的内部执行过程(哪个步骤出错了?)。
  • 集成第三方工具:如 LangSmith、LangFuse、PromptLayer、Weights & Biases 等观测平台。
  • 自定义行为:计算成本、发送通知、保存日志到数据库、 guardrail(安全检查)等。
  • 错误处理:捕获 LLM 调用失败、解析错误等。

工作原理

LangChain 在执行过程中会触发一系列标准事件,你可以通过实现 BaseCallbackHandler 来监听这些事件。

常见事件类型(按组件分类)

  • LLM / Chat Model
    • on_llm_start:LLM 调用开始
    • on_llm_new_token:生成一个新 token(流式输出核心)
    • on_llm_end:LLM 调用结束
    • on_llm_error:LLM 调用出错
  • Chain
    • on_chain_start、on_chain_end、on_chain_error
  • Tool
    • on_tool_start、on_tool_end、on_tool_error
  • Agent / Retriever 等也有对应的事件(如 on_agent_action、on_agent_finish)

Callback Handler

内置 Callback Handler

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]   # 实时打印输出
)

llm.invoke("介绍一下香港")

其他内置的:

  • StdOutCallbackHandler:简单打印执行过程(调试用)
  • ConsoleCallbackHandler
  • UsageMetadataCallbackHandler:统计 Token 用量

自定义 Callback Handler

from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List

class MyCustomHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        print(f"LLM 开始调用,Prompt 长度: {len(prompts[0])}")

    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)   # 实时输出

    def on_llm_end(self, response, **kwargs):
        print("\nLLM 调用结束")

    def on_chain_error(self, error: Exception, **kwargs):
        print(f"Chain 执行出错: {error}")

# 使用方式
handler = MyCustomHandler()

chain = prompt | llm
chain.invoke({"question": "今天香港天气"}, config={"callbacks": [handler]})

用法

Callback 的传入, 有两种主要方式:

  1. 在构造函数中传入(全局/对象级别):
llm = ChatOpenAI(callbacks=[handler])
  1. 在运行时传入(推荐,单次请求级别):
result = chain.invoke(
    input,
    config={"callbacks": [handler]}   # 或多个 handler
)

注意:新版 LangChain(Runnable 风格)更推荐使用 config={“callbacks”: […]} 的方式,灵活性更高。

在 LangGraph 中的使用情况

  • LangGraph 仍然完全兼容 LangChain 的 Callback 系统。
  • 在 graph.invoke()、graph.stream() 时同样可以通过 config={“callbacks”: [handler]} 传入。
  • LangGraph 还增加了更细粒度的状态和节点级别的可观测性。

不同场景下的使用情况:

  • 流式输出 → 用 StreamingStdOutCallbackHandler 或自定义 on_llm_new_token
  • 生产监控 → 集成 LangSmith(官方)或 LangFuse(开源),它们都通过 Callback 实现自动追踪
  • Token 统计 → 使用 UsageMetadataCallbackHandler
  • 调试复杂 Agent → 自定义 Handler 打印 on_tool_start、on_agent_action 等
  • 异步 → 使用 AsyncCallbackHandler(对应 ainvoke、astream 等)

LangChain4J

更完整的代理实现需要使用 AiServices 或 AgentExecutor 来自动完成工具绑定和调用循环。对于复杂应用,建议深入研究 AiServices(LangChain4j 提供的一种更声明式、更简洁的编程模型)和 AgentExecutor

聊天记忆

底层 API vs 高层 AiServices:

特性底层 API(本示例)高层 AiServices(之前示例)
记忆控制完全手动(灵活)自动处理(便捷)
代码量多(需手动 add 消息)少(框架封装)
适用场景定制化需求(比如过滤消息)快速开发(通用场景)
消息过滤 / 修改易实现(添加前可修改)需自定义扩展
  • 底层 API 脱离 AiServices 封装,手动调用 ChatMemory.add() 管理消息、model.chat() 调用模型,实现更灵活的记忆控制;

相关教程

Quarkus

基于 LangChain4j 库构建,并提供了一个专为 Quarkus 量身定制的声明式编程模型。

Langchain4J 库是一个灵活的工具包。它提供了一套丰富的构建模块来处理大型语言模型,例如代理、工具、检索器和链,使开发人员能够完全控制如何组合和编排它们。

然而,直接使用 LangChain4j 时,您需要负责将这些组件连接起来,配置模型客户端,处理生命周期问题,并将可观测性、配置和内存管理集成到您的应用程序中。这种方法功能强大,但底层且需要手动操作。

Quarkus LangChain4j 扩展基于 LangChain4j 构建,并提供企业级、预设的集成方案,为您处理大部分样板代码:

  • 具有 CDI 和注解的声明式 AI 服务
  • 原生支持 OpenAI、Hugging Face、Ollama 等
  • 使用 Quarkus 配置属性进行集中式配置
  • 构建时间优化和验证
  • 可观测性:日志、指标、追踪
  • 用于嵌入式商店和模型服务的开发服务
  • 用于视觉内省和测试的开发者用户界面
  • 自动聊天内存管理 这样您就可以专注于应用程序逻辑,而不是基础设施搭建。 也就是说,Quarkus LangChain4j 并不限制你只能使用单一模型。你可以自由组合搭配:
  • 使用高级声明式方法(@RegisterAiService)实现快速集成
  • 当您需要更多控制权时,请使用更底层的 LangChain4j API。

Ref

MCP

Ref

https://modelcontextprotocol.io/docs/getting-started/intro

测试与评估

Ref

https://docs.langchain4j.info/tutorials/testing-and-evaluation

← 返回 Notes

Contact

Contact Me

Leave a message here. The form sends directly from the browser to a form delivery service and then to my email.

Messages are delivered to lzx744008464@gmail.com.