Chain
Chain 已经过时(甚至被官方废弃)的东西:传统的 Chain 类。如果你在看一些半年前或一年前的教程,你会看到大量以 Chain 结尾的类,比如:
LLMChainConversationalRetrievalChainSequentialChain
现状: 这些传统的类已经过时了。 原因: 它们是高度封装的“黑盒”。当你遇到报错,或者想自定义里面某一个微小步骤时,极难调试。LangChain 官方目前已经在代码库中逐步将这些传统的 Chain 标记为 Deprecated(废弃)。
取代传统 Chain 的新标准:LCEL (LangChain 表达式语言)
官方并没有放弃“把组件串联起来”这个需求,而是推出了 LCEL 来取代传统的 Chain 类。 现在,如果你想写一条线性的处理流程,不再是去实例化一个 LLMChain,而是用类似 Linux 管道符 | 的极其优雅的语法:
# 现代的“链”是这样写的(LCEL)
chain = prompt_template | llm | output_parser
result = chain.invoke({"question": "什么是黑洞?"})
现状: LCEL 完全没有过时,它是目前 LangChain 甚至 LangGraph 底层最核心的基础件。只要你的任务是线性、单向、确定性的(比如:提取文本 -> 生成摘要 -> 翻译成英文),用 LCEL 依然是最高效、最轻量的首选。
因为单向的 LCEL 链条存在一个致命弱点:它不能循环,它没有状态。 如果大模型在生成摘要时发现“提取的文本缺失了信息”,在 LCEL 中程序就只能报错退出。
LangGraph 解决的是“非确定性”和“循环纠错”的问题。 它引入了图(Graph)的概念,允许你的程序:
- 无限循环与回退: 做错了?退回到上一步重新思考。
- 全局状态记忆(State): 在一个全局的大字典里记录执行到了哪一步,保留所有上下文。
- 多智能体协作: 让“程序员 Agent”写代码,让“测试 Agent”检查,如果不通过再打回给程序员重写。
Runnable 协议与 LCEL
在 LangChain 较新的版本中,其底层架构经历了一次重构,引入了 LCEL (LangChain Expression Language)。这是理解它底层实现的关键。
在写 LangChain 代码时,经常会看到这种像 Linux 管道一样的写法:
chain = prompt | model | output_parser
result = chain.invoke({"topic": "人工智能"})
这背后依赖于 Python 的魔术方法重载和统一的接口协议(Runnable 协议):
- Runnable 接口: LangChain 强制要求所有的核心组件(Prompt、Model、Parser 等)都必须继承一个名为
Runnable的基类。这个基类规定了几个必须实现的方法:invoke(单次调用)、stream(流式输出)、batch(批量处理)。 - 重载
|运算符:Runnable基类在 Python 底层重写了__or__方法(即|运算符)。当代码执行A | B时,LangChain 会在底层把它包装成一个RunnableSequence对象。 - 数据流转: 当你调用
chain.invoke()时,实际上是执行了:B.invoke(A.invoke(输入))。上一个组件的输出,会被强行作为下一个组件的输入。
RunableLambda
函数方法被转换为 RunnableLambda,这为函数添加了批处理和异步支持,以及原生跟踪和调试功能。
RunnableLambda 是 LangChain 和 LangGraph 底层架构中最朴素,但也最重要的组件之一。专门负责把普通的 Python 函数,包装成符合 LangChain 标准流水线规范的 Runnable 对象。
普通函数包装成 Runnable 后,拥有了以下超能力:
- 标准调用 (
.invoke):runnable_math.invoke(5)-> 返回10。 - 自动并发批量处理 (
.batch): 如果你传入一个列表runnable_math.batch([1, 2, 3]),LangChain 底层会自动起线程池,并发执行你的普通函数,返回[2, 4, 6]。 - 自动异步转换 (
.ainvoke): 即使你写的是同步的 Python 函数,包装后也可以在异步框架(如 FastAPI)中直接用await runnable_math.ainvoke(5)调用,LangChain 底层会把它放入异步事件循环的执行器中。 - 流式回传 (
.stream): 虽然普通函数通常是一次性返回,但包装后它依然遵循流式协议(把整个结果当作一个流的数据块发出),保证流水线下游的组件不会崩溃。
魔术方法重载 Magic Method Overloading
在 Python 中,当你对两个对象使用 a | b 时,Python 解释器在底层实际上是在调用一个名叫 __or__ 的隐藏方法,也就是执行:a.__or__(b)。
LangChain 就是通过在 Runnable 基类里重写了这个 __or__ 方法,把两个组件“粘”在了一起的。
# ==========================================
# 1. 基类:对应 langchain_core.runnables.Runnable
# ==========================================
class Runnable:
"""所有组件的基类。定义了所有组件都必须具备的标准行为。"""
def invoke(self, input_data):
"""每个子类必须自己实现具体的执行逻辑"""
raise NotImplementedError("子类必须实现 invoke 方法")
def __or__(self, other):
"""
核心魔法:无论是单个节点,还是一个巨大的链条,
只要你用 '|' 往后面接东西,统统打包成一个新的 Sequence。
"""
# 注意:这里我们做了一个优化,自动把普通函数转成 Runnable
if not isinstance(other, Runnable):
if callable(other):
other = RunnableLambda(other)
else:
raise TypeError(f"无法将 {type(other)} 接入流水线")
# 将当前的(self)和新来的(other)组合成一个新的串联对象
return RunnableSequence(self, other)
# ==========================================
# 2. 包装器:对应 langchain_core.runnables.RunnableLambda)
# ==========================================
class RunnableLambda(Runnable):
"""最基础的叶子节点,负责包装普通的 Python 函数"""
def __init__(self, func):
self.func = func
def invoke(self, input_data):
# 执行自己包装的函数
return self.func(input_data)
# ==========================================
# 3. 序列执行器:对应 langchain_core.runnables.RunnableSequence
# ==========================================
class RunnableSequence(Runnable):
"""
负责把多个 Runnable 串联起来。
注意:它自己也是一个 Runnable!这就是“复合模式”的精髓。
"""
def __init__(self, first, second):
self.steps = []
# ⭐️ 核心“拍平”逻辑:防止生成多层套娃结构
# 如果 first 已经是一个 Sequence,就把它的步骤拆解出来
if isinstance(first, RunnableSequence):
self.steps.extend(first.steps)
else:
self.steps.append(first)
# 同理处理 second
if isinstance(second, RunnableSequence):
self.steps.extend(second.steps)
else:
self.steps.append(second)
def invoke(self, input_data):
"""
Sequence 的执行逻辑:像水管一样,把前一个的输出喂给下一个。
"""
result = input_data
for step in self.steps:
result = step.invoke(result)
return result
# 定义基础函数
def step1(x): return x + 1
def step2(x): return x * 2
def step3(x): return x - 3
# 实例化叶子节点
node1 = RunnableLambda(step1)
node2 = RunnableLambda(step2)
node3 = RunnableLambda(step3)
# 场景 1:每个方法都能独立执行(具备 Lambda 的能力)
print(f"node1 独立执行: {node1.invoke(5)}") # 输出: 6
# 场景 2:每个方法都能无限拼接(自动生成 Sequence)
# 执行顺序:先执行 node1 | node2 -> 生成临时的 Sequence_A
# 然后执行 Sequence_A | node3 -> 生成最终的 Sequence_B
chain = node1 | node2 | node3
# 场景 3:连普通函数都能被自动包装(我在 __or__ 里加了处理逻辑)
def final_step(x):
return f"最终结果是: {x}"
# 直接把原生的 Python 函数接在链条后面!这就是 LangChain 极度丝滑的原因。
super_chain = chain | final_step
print(f"完整链路执行: {super_chain.invoke(5)}") # 输出: 最终结果是: 9
普通只用继承 RunnableLambda 就行:普通函数是 RunnableLambda 的实例,而 RunnableLambda 继承自顶级基类 Runnable。在 Runnable 中,写着所有组件共享的组合逻辑:
# 基类中的逻辑,会去实现 RunnableSequence
def __or__(self, other):
return RunnableSequence(self, other)
解析数据流向
在 LangChain 的现代架构(特别是引入 LCEL - LangChain Expression Language 之后)中,维护数据从链头走向链尾的核心数据结构是一个字典(Dictionary),而维护数据流转方式的核心机制是 Runnable 协议及其 invoke/stream 方法。
我们可以把整个数据流转过程想象成一条装配线,数据字典就是传送带上的那个“箱子”。
1. 核心载体:字典 Dictionary 作为输入输出
在大部分复杂的链式调用中,节点之间传递的数据通常不是单一的字符串,而是一个 Python 字典。
- 初始化: 链的第一个组件接收一个字典作为初始输入(例如:
{"topic": "人工智能", "language": "中文"})。 - 传递与修改: 每个组件(Node/Runnable)接收这个字典,执行自己的任务,然后生成一个新的字典(或者修改原字典并返回),传递给下一个组件。
2. 核心动力:Runnable 协议(LCEL)
LangChain 底层定义了一个名为 Runnable 的基类(Base Class)。所有的核心组件——无论是 Prompt 模板、LLM 模型、输出解析器,还是自定义的 Python 函数,都继承自这个 Runnable 接口。
这个接口规定了数据流转的标准动作:
invoke(input): 同步执行,接收输入,返回输出。stream(input): 流式执行,接收输入,逐个块(chunk)返回输出。batch(inputs): 批量执行。
当你使用管道符 | 把它们串起来时:
chain = prompt | model | output_parser
LangChain 在底层会将它们包装成一个 RunnableSequence。当你调用 chain.invoke(初始数据) 时,底层的逻辑其实就是:
# 伪代码:RunnableSequence 的底层流转
data = 初始数据
data = prompt.invoke(data)
data = model.invoke(data)
final_result = output_parser.invoke(data)
return final_result
3. 数据拼接/保留
如果数据只是像接力棒一样传递,那上一步的数据就会被完全覆盖。比如,生成了摘要后,原来的长文本就不见了。但很多时候我们需要在链的后端使用最开始传入的变量。
为了控制字典在传送带上的“拆解”和“合并”,LangChain 提供了几个关键的路由组件:
RunnablePassthrough 透明穿透
它的作用是把接收到的数据原封不动地传递给下一步。经常用于结合字典来补充参数。
from langchain_core.runnables import RunnablePassthrough
# 假设 context_retriever 是一个从数据库取数据的组件
chain = (
{"context": context_retriever, "question": RunnablePassthrough()}
| prompt
| model
)
# 传入 "什么是黑洞"
# 1. RunnablePassthrough() 直接拿到 "什么是黑洞" 并赋给 "question" 键
# 2. context_retriever 拿到 "什么是黑洞",去数据库查资料,返回长文本赋给 "context" 键
# 3. 此时传给 prompt 的数据字典变成了:
# {"context": "黑洞是...", "question": "什么是黑洞"}
RunnableParallel / 字典映射 (并行处理)
当你看到大括号 {} 时,它实际上是 RunnableParallel 的语法糖。它的作用是将输入数据复制并分发给内部的多个组件,然后把它们各自的执行结果打包成一个新的字典往下传。
RunnablePassthrough.assign() 追加合并
这是非常常用的高级特性。它接收当前的字典,执行某个新任务,并将结果追加(Assign) 到原字典中,然后把包含新旧数据的完整字典传给下一步。
# 假设上一步传来的字典是: {"text": "Hello world"}
chain = RunnablePassthrough.assign(
translated_text=translate_to_chinese_chain
)
# 这一步执行后,传给下一步的字典会变成:
# {"text": "Hello world", "translated_text": "你好世界"}
总结
在 LangChain(LCEL)中:
- 载体是 Python 字典(Dict),它像一个容器,装载着各种中间变量。
- 通道是
Runnable接口 的invoke方法,它规定了“接收前一个输出 -> 处理 -> 返回后一个输入”的刚性规范。 - 交通警察是
RunnablePassthrough和RunnableParallel等组件,它们负责决定字典里的键值对是直接透传、覆盖、提取还是追加合并。
流式处理
stream/astream 和 astream_events
流式传输中间步骤 .astream()
使用 AgentExecutor.astream() 方法,你可以实时获取 Agent 每一步的大动作。这种方法输出的数据包(Chunks)是交替出现的:
- 动作 (Action): 告诉你 Agent 决定调用什么工具,以及传入了什么参数(例如:决定调用
where_cat_is_hiding工具)。 - 观察 (Observation): 告诉你这个工具执行完毕后,返回了什么结果(例如:猫藏在“架子上”)。
- 最终答案 (Final Answer): 当 Agent 收集够了信息,它会输出最终想要回答给用户的那句话。
在这个过程中,你可以通过解析字典里的 actions、steps 或者 messages 字段,把 Agent 在后台“查资料”的过程展示给用户看,让用户知道 AI 没有卡死,而是在努力工作。
流式传输底层事件和逐字输出
基础的 .astream() 是一步一步抛出结果的(比如工具执行完才会抛出一个结果包)。如果你想做到极致的实时(比如 Agent 最后回答时,真的是一个字一个字地流到屏幕上),你需要使用更高阶的 .astream_events(version="v1") API。
这是一个非常细粒度的事件流 API,它会捕捉整个 Agent 运行周期中的每一个微小节点,例如:
on_chain_start: Agent 刚刚启动,收到用户输入。on_tool_start: 工具开始执行。on_tool_end: 工具执行结束。on_chat_model_stream: (最核心的)大模型正在逐个吐出 Token(字)。
通过监听 on_chat_model_stream 等事件,你不仅可以知道 Agent 正在做什么,还可以把最终回答的每一个 Token 实时渲染到前端界面上,实现和 ChatGPT 一模一样的丝滑打字机效果。
工具调用时
在工具调用(Function Calling)时,流式处理可以让你在模型输出参数的过程中就开始“边接收边解析”,甚至在参数还没完全输出完时,就能提前知道模型想要调用哪个工具。
在普通的文本对话中,流式输出的就是一个个字符(比如“你”、“好”、“啊”)。 但在工具调用(Function Calling)中,流式输出的结构会复杂一些。根据 LangChain 文档的说明:
- 分块传输(Chunks): 模型会将工具调用的信息切分成小块(
ToolCallChunk)逐步发送给你。 - 块的结构: 每个
ToolCallChunk通常包含:name: 工具的名称(通常只在第一个块中出现,后面的是None)。args: 工具参数的一部分(一段 JSON 字符串的片段,比如{"a",然后是: 3,,接着是"b": 12})。id: 调用的唯一标识符。index: 索引号(如果模型决定同时调用多个工具,这个索引能帮你区分当前接收的参数属于哪个工具)。
LangChain 如何帮你处理这些“碎片”?
如果你自己手写代码去拼接这些 {"a" 和 : 3} 会非常痛苦。LangChain 提供的核心便利在于自动合并块(Chunk Merging):
当你使用异步流式调用(如 async for chunk in llm.astream(query):)时,LangChain 允许你直接将这些消息块“加”在一起(gathered = gathered + chunk)。在这个累加的过程中,LangChain 会在后台做两件事:
- 拼接原始字符串: 把参数片段拼接成完整的 JSON 字符串(比如
{"a": 3, "b": 12})。这个存在gathered.tool_call_chunks中。 - 最佳努力解析(Best-effort Parsing): 尝试将拼到一半的 JSON 实时转换成 Python 的字典对象(Dictionary)。比如当它接收到
{"a": 3时,它就会尝试解析出一个包含键a的字典。这个存在gathered.tool_calls中。
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
# 注意,这里的文档字符串非常重要,因为它们将与类名一起传递给模型。
class Add(BaseModel):
"""将两个整数相加。"""
a: int = Field(..., description="第一个整数")
b: int = Field(..., description="第二个整数")
class Multiply(BaseModel):
"""将两个整数相乘。"""
a: int = Field(..., description="第一个整数")
b: int = Field(..., description="第二个整数")
tools = [Add, Multiply]
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
llm_with_tools = llm.bind_tools(tools)
query = "3 * 12是多少?此外,11 + 49是多少?"
async for chunk in llm_with_tools.astream(query):
print(chunk.tool_call_chunks)
### 输出
[]
[{'name': 'Multiply', 'args': '', 'id': 'call_d39MsxKM5cmeGJOoYKdGBgzc', 'index': 0}]
[{'name': None, 'args': '{"a"', 'id': None, 'index': 0}]
[{'name': None, 'args': ': 3, ', 'id': None, 'index': 0}]
[{'name': None, 'args': '"b": 1', 'id': None, 'index': 0}]
[{'name': None, 'args': '2}', 'id': None, 'index': 0}]
[{'name': 'Add', 'args': '', 'id': 'call_QJpdxD9AehKbdXzMHxgDMMhs', 'index': 1}]
[{'name': None, 'args': '{"a"', 'id': None, 'index': 1}]
[{'name': None, 'args': ': 11,', 'id': None, 'index': 1}]
[{'name': None, 'args': ' "b": ', 'id': None, 'index': 1}]
[{'name': None, 'args': '49}', 'id': None, 'index': 1}]
[]
Agent 路由
AgentAction / AgentFinish
- 旧版 LangChain:使用 AgentExecutor,返回 AgentAction / AgentFinish。
- 新版 LangChain(推荐):底层使用 LangGraph 构建 agent。 不再直接用 isinstance(response, AgentFinish),而是通过 状态图(StateGraph) + 条件边(conditional edge)来控制流程。 例如:LLM 输出如果包含 tool_calls,就路由到 “tools” 节点;如果没有,就路由到 END(结束)。
虽然实现方式变了,但核心思想完全一样:
-
继续循环 = 还需要行动(Action)
-
结束循环 = 已经完成(Finish)
-
example:假设用户问“今天香港天气怎么样?”
- Agent 返回 AgentAction(tool=“weather_tool”, tool_input={“city”: “Hong Kong”})
- 执行工具后得到结果 → 传回 Agent
- Agent 返回 AgentFinish(return_values={“output”: ” 香港今天晴天,温度 22°C”})
- 循环停止,返回答案。
如何判断大模型想干什么?
目前主流的判断方式分为两种时代:传统的文本解析(Regex/JSON) 和 现代的工具调用(Function Calling)。我们来逐一拆解。
1. 纯文本解析
在这种模式下,开发者会在 Prompt(提示词)中与大模型定下严格的输出格式。
经典 ReAct Agent
-
使用 ReActOutputParser 或 ReActSingleInputOutputParser
-
判断标志主要看 LLM 输出中是否包含 “Final Answer:”(或类似关键词)
-
解析逻辑伪代码
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
# 清理文本...
if "Final Answer:" in text: # ← 核心判断标志
final_answer = text.split("Final Answer:")[-1].strip()
return AgentFinish(
return_values={"output": final_answer},
log=text
)
else:
# 尝试提取 Thought / Action / Action Input
action = extract_action_from_text(text) # 正则或字符串查找
return AgentAction(tool=action.tool, tool_input=action.input, log=text)
- 模型 Prompt 约束
prompt =
"你是一个智能助手。你可以使用以下工具:[Search, Calculator]。
请你必须严格按照以下格式输出:
如果你需要使用工具,请输出:
Action: 工具的名称
Action Input: 传给工具的参数
如果你已经得出了最终答案,请输出:
Final Answer: 你的最终回答"
answer =
Thought: 我需要计算 125 乘以 34。
Action: Calculator
Action Input: 125 * 34
#### 或者:
Thought: 我现在知道答案了。
Final Answer: 计算结果是 4250。
MRKL / Zero-Shot Agent
-
使用 MRKLOutputParser
-
判断标志:输出中是否包含 “Action”: “Final Answer”(JSON 格式)或 “Final Answer” 关键词。
-
常见提示词会要求 LLM 以 JSON 形式输出:
{ "action": "Final Answer", "action_input": "最终答案内容" }
或者
{
"action": "search_tool",
"action_input": "查询内容"
}
自定义 Output Parser
继承 AgentOutputParser 重写 parse() 方法,例如:
class CustomOutputParser(AgentOutputParser):
def parse(self, llm_output: str):
if "Final Answer:" in llm_output or "最终答案:" in llm_output:
return AgentFinish(...) # 结束
else:
return AgentAction(...) # 调用工具
2. 原生工具调用 (Function Calling / Tool Calling)
因为靠正则表达式去匹配文本太脆弱了(大模型有时候会乱加空格、换行,或者忘记写关键词),后来 OpenAI 等厂商在模型底层直接加入了 Function Calling 的能力。现在,Gemini、GPT-4、Claude 等先进模型,在 API 层面就支持区分“普通对话”和“工具调用”。
- 使用
OpenAIFunctionsAgentOutputParser或类似 - 判断标志:LLM 输出中是否有
tool_calls字段(结构化)。- 如果有
tool_calls→ 解析为AgentAction - 如果没有
tool_calls,只有普通内容 → 解析为AgentFinish
- 如果有
这比文本解析更可靠,因为它是结构化的。
结构化输出
with_structured_output 和 bing_tools 都可以得到结构化的内容,但是作用和目的都是不同的
with_structured_output
- 作用:强制性要求输出指定的格式;
- 目的:关注结果,目的是为了拿到数据结果,只关心数据。流程往往在这里就终结了。你把这个数据存进数据库,或者在网页上展示出来;
- 用途:生成结果数据、信息抽取、分类器;
bing_tools
作用:提供“工具给模型”选择,模型不一定会选择工具;
目的:赋予模型调用函数能力,可以灵活获得工具选择或者结果。
用途:调用函数。
代码分析
with_structured_output源码伪逻辑(仅供学习)
def with_structured_output(schema):
# 1. 检查底层模型的能力
if model.supports_tool_calling():
# 路线 A: 转成工具,Tool Calling 执行
json_schema = convert_to_json_schema(schema)
model_with_tools = model.bind_tools([json_schema], tool_choice="any")
return model_with_tools | PydanticToolsParser(schema)
elif model.supports_json_mode():
# 路线 B: 开启 JSON 模式,注入提示词。可靠性通常低于 Tool Calling,尤其在复杂 schema 时容易出现格式错误。
model_with_json = model.bind(response_format={"type": "json_object"})
prompt_with_instructions = inject_schema_into_prompt(schema)
return prompt_with_instructions | model_with_json | JsonOutputParser()
else:
# 路线 C: 降级为纯文本提示词和正则解析
# 但是一般情况下不会执行这个逻辑
# 大多数情况下,如果模型不支持 Tool Calling,with_structured_output 会直接抛出 NotImplementedError
prompt_with_instructions = inject_strict_instructions(schema)
return prompt_with_instructions | model | PydanticOutputParser(schema)
with_structured_output真实逻辑(抛弃不稳定分支)
def with_structured_output(
self,
schema: dict[str, Any] | type[BaseModel] | type,
*,
include_raw: bool = False,
method: str | None = None,
**kwargs: Any
) -> Runnable[LanguageModelInput, Any]:
"""
将大模型的输出转换为结构化数据。该方法是 LangChain 提供的最重要的高级功能之一,用于让 LLM 可靠地输出
Pydantic 对象、JSON 等结构化结果,而不是随意文本。
参数说明:
schema (dict | BaseModel | type):
定义期望的输出结构,支持三种形式:
- Pydantic BaseModel 类(最推荐)
- TypedDict 类
- JSON Schema 字典
include_raw (bool, default=False):
- False: 只返回解析后的结构化结果(Pydantic 对象或 dict)
- True: 返回一个字典,包含 'raw'(原始 AIMessage)、'parsed'(解析结果)、
'parsing_error'(解析失败时的错误信息),便于调试
method (str | None, default=None):
强制指定使用哪种结构化输出方式。可选值:
- "tool_calling" : 使用工具调用(Tool Calling)模式(推荐)
- "json_mode" : 使用 JSON Mode 模式
- None : 自动根据模型能力选择最佳方式
**kwargs:
额外参数会透传给 bind_tools() 或 bind() 方法。
常见参数如:strict=True(OpenAI 支持严格模式,强制模型严格遵守 schema)
返回值:
一个 Runnable 对象,可直接 .invoke()、.stream() 使用。
"""
# 1. 将用户传入的 schema 统一转换为 JSON Schema 格式
# 支持 Pydantic BaseModel、TypedDict 等多种输入形式
json_schema = _convert_schema_to_json(schema)
# 2. 优先使用 Tool Calling 模式(绝大多数现代模型推荐路线)
# 原理:把结构化输出伪装成“调用一个专门的工具”,让模型通过 tool_calls 返回数据
# 成功率最高,推荐优先使用
if self._supports_tool_calling() and (method is None or method == "tool_calling"):
bound_llm = self.bind_tools(
tools=[json_schema], # 将 schema 转为工具格式
tool_choice="required", # 强制模型必须调用此工具(推荐写法)
**kwargs # 透传 strict=True 等参数
)
# 根据 schema 类型选择合适的解析器(PydanticToolsParser 或 JsonOutputToolsParser 等)
parser = _get_tools_parser(schema)
runnable = bound_llm | parser
# 3. 降级使用 JSON Mode(部分模型支持)
# 原理:让模型直接以 JSON 格式输出,通过 response_format 控制
elif self._supports_json_mode() or method == "json_mode":
bound_llm = self.bind(
response_format={"type": "json_object"}, # 开启 JSON Mode
**kwargs
)
# 将 schema 的字段描述注入到 Prompt 中,引导模型按要求输出 JSON
prompt_with_instructions = _create_structured_output_prompt(schema)
runnable = prompt_with_instructions | bound_llm | JsonOutputParser()
# 4. 如果模型既不支持 Tool Calling 也不支持 JSON Mode,则抛出异常
else:
raise NotImplementedError(
f"with_structured_output is not implemented for this model: {self.__class__.__name__}. "
"Please use a model that supports tool calling or json_mode."
)
# 5. 如果用户设置 include_raw=True,则包装返回结果
# 返回格式为:{"raw": AIMessage, "parsed": structured_data, "parsing_error": error or None}
if include_raw:
runnable = runnable | _wrap_with_raw_output()
return runnable
with_structured_output:不管你传入的 schema 是什么类型 (Pydantic/TypedDict/JSON Schema 字典),LangChain 在发包给 OpenAI 之前,都会把它们翻译成标准的 OpenAI Function Calling JSON Schema。tools_calll:无论你传的是 Python 函数还是 Pydantic 类,LangChain 在发包给 OpenAI 之前,都会把它们翻译成标准的 OpenAI Function Calling JSON Schema。如果你愿意,你完全可以直接手写这段原生 JSON 传给bind_tools。
# 这是最底层的原生 JSON Schema 写法
raw_json_schema = {
"type": "function",
"function": {
"name": "extract_weather",
"description": "提取天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
"temperature": {"type": "integer", "description": "温度"}
},
"required": ["city", "temperature"]
}
}
}
# 直接绑定原生 JSON
llm_with_tools = llm.bind_tools([raw_json_schema])
msg = llm_with_tools.invoke("上海今天大雨,只有18度")
print(msg.tool_calls)
# 输出: [{'name': 'extract_weather', 'args': {'city': '上海', 'temperature': 18}, 'id': 'call_def456'}]
总结
-
既然
bind_tools也能提取格式,那要with_structured_output干嘛?区别仅仅在于“最后一步的拆快递(解析)”**:- 当你使用
bind_tools([WeatherInfo]): 大模型吐出的是一个原始的AIMessage对象。你需要的数据藏得很深,你要自己写代码去扒开字典:city = msg.tool_calls[0]['args']['city']。 - 当你使用
with_structured_output(WeatherInfo): LangChain 在底层不仅调用了bind_tools,还非常贴心地在链条末尾塞进了一个输出解析器(PydanticToolsParser)。 当大模型吐出字典后,解析器会截获它,并自动帮你实例化一个正儿八经的 Python 对象还给你。你拿到手直接就是result = ...,然后优雅地调用result.city即可。
- 当你使用
-
调用顺序:如果先
bind_tools,再调用with_structured_output,有时会覆盖之前的 tools。如果需要同时使用工具和结构化输出:1.先绑定普通工具,再绑定结构化输出;2.直接在 with_structured_output 中传入需要的 tools;
# 1. 定义工具
@tool
def check_order_status(order_id: str) -> str:
"""查询订单状态。当用户询问订单、物流、发货情况时必须调用此工具。"""
mock_db = {
"ORD-123": "已发货,预计明天送达。",
"ORD-456": "正在配货中。"
}
return mock_db.get(order_id, f"未找到订单 {order_id}")
# 2. 定义结构化输出 Schema
class UserIntent(BaseModel):
category: str = Field(..., description="用户意图分类:'查订单', '闲聊', '投诉', '其他'")
emotion: str = Field(..., description="用户情绪:'积极', '中性', '愤怒'")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.bind_tools([check_order_status]).with_structured_output(UserIntent, include_raw=True)
- 不同模型支持程度不同:OpenAI、Claude、Gemini 支持很好;部分本地模型或旧模型可能抛 NotImplementedError。
- include_raw=True 在调试解析失败时非常有用。
LangGraph 实现
在 LangGraph 中,已经不再使用判断 AgentAction / AgentFinish 这种基于 Python 对象类型的 while 循环判断。
LangGraph 采用 状态机 + 图(Graph) 的方式来处理流程,通过 条件边(conditional edges) 来决定下一步是继续调用工具,还是结束并输出最终答案。这种设计更加灵活、可视化、支持持久化、中断和多代理协作。
核心判断逻辑放在一个条件路由函数(conditional edge function)中,最常见的是检查 LLM 输出的 tool_calls 属性:
- 典型实现(现代 Tool Calling ReAct Agent)
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import tools_condition # 官方预置函数(最常用)
from langchain_core.messages import AIMessage
from typing import TypedDict, Annotated, Literal
# 1. 定义状态(State)
class AgentState(TypedDict):
messages: Annotated[list, "add_messages"] # 消息历史(包含 LLM 输出和工具结果)
# 2. Agent 节点:调用 LLM(绑定了 tools)
def agent_node(state: AgentState):
response = llm_with_tools.invoke(state["messages"]) # llm_with_tools 是绑定了工具的 LLM
return {"messages": [response]}
# 3. 工具执行节点(预置的 ToolNode)
tool_node = ToolNode(tools)
# 4. 条件判断函数(关键!相当于旧版的 AgentFinish 判断)
def should_continue(state: AgentState) -> Literal["tools", END]:
messages = state["messages"]
last_message = messages[-1]
# 核心判断:
# 如果最后一条消息(LLM 输出)包含 tool_calls(且不为空),说明需要调用工具 → 继续循环
# 如果没有 tool_calls,说明 LLM 直接给出了最终答案 → 结束
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools" # 去执行工具节点
return END # 结束图的执行,返回最终结果
- 构建图(Graph)的完整结构
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node) # LLM 思考 + 决定
workflow.add_node("tools", tool_node) # 执行工具
# 添加边
workflow.set_entry_point("agent")
# 条件边(最重要的一步)
workflow.add_conditional_edges(
"agent", # 从 agent 节点出来
should_continue, # 使用上面的判断函数
{"tools": "tools", END: END} # 映射:如果返回 "tools" 就去 tools 节点,否则结束
)
# 从 tools 执行完后,总是回到 agent 继续思考
workflow.add_edge("tools", "agent")
app = workflow.compile()
- 输入 → agent 节点(LLM 调用)
- should_continue 检查最后一条 AIMessage 是否有 tool_calls:
- 有 → 路由到 “tools” 节点执行工具 → 工具结果作为新消息加入 state → 回到 agent
- 没有 → 路由到 END,整个 graph 结束,app.invoke() 返回最终的 messages(最后一条就是最终答案)
- 这个循环由图自动管理,不需要手动 while 循环。
LangGraph 提供了开箱即用的条件函数,避免自己写 should_continue:
from langgraph.prebuilt import tools_condition
workflow.add_conditional_edges(
"agent",
tools_condition, # 内部逻辑几乎和上面看到的 should_continue 一模一样
{"tools": "tools", END: END}
)
tools_condition 的核心就是:检查最后一条消息是否有 tool_calls。
与 AgentExecutor / AgentFinish 的对比
| 方面 | 旧版 AgentExecutor (AgentFinish) | 新版 LangGraph (推荐) |
|---|---|---|
| 判断方式 | isinstance(response, AgentFinish) 或 OutputParser 解析 “Final Answer:“ | 检查 last_message.tool_calls 是否存在/为空 |
| 循环控制 | while 循环 + 手动 invoke | Graph 的 conditional edges + 自动路由 |
| 状态管理 | intermediate_steps 等有限字段 | 自定义 TypedDict State,支持任意结构 + 消息列表 |
| 灵活性 | 较低(固定 ReAct 模式) | 极高(可加 human-in-loop、多代理、自定义路由等) |
| 是否依赖文本解析 | 依赖(ReAct 模式下看 “Final Answer:” 等字符串) | 主要依赖结构化 tool_calls(更可靠) |
| 推荐场景 | 简单快速原型 | 生产级、复杂、多步骤、可调试 agent |
- 如果使用纯文本 ReAct Prompt(非 tool calling),判断逻辑可能会改成检查最后消息的内容是否包含 “Final Answer” 等,但现代最佳实践是使用 tool calling(OpenAI、Anthropic、Grok 等模型都支持),判断更稳定。
- create_react_agent() 函数是 LangGraph 提供的快捷方式,它内部已经搭建好了上面这种 graph(agent + tools + conditional edges)。
AgentExecutor
AgentExecutor 是 LangChain 框架中智能体(Agent)的核心运行时与调度管理器。它是连接 LLM 大脑(Agent)、外部工具(Tools) 和 用户输入 的中央控制枢纽,负责驱动整个 “思考 → 调用工具 → 获得结果 → 再思考” 的自动闭环,直到任务完成。
官方定义:管理使用工具的智能体的执行链。实现 Runnable 接口,是 LangChain 中运行 Agent 的标准、最高级封装。
LLM 本身只能生成文本,无法直接运行代码或调用 API。AgentExecutor 是一个封装好的运行环境(Runtime),它把“大模型预测出的动作”真正落地执行,并把执行结果反馈给大模型。
核心工作流程(Agent Loop)
AgentExecutor 本质上是一个带有错误处理和日志记录的 while 循环。当接收到用户的任务时,它会严格按照以下步骤循环运行,直到任务完成:
- 输入接收: 接收用户的初始问题或任务。
- 思考与决策(调用 Agent): 将问题交给 Agent(包含 LLM 和 Prompt)。LLM 会根据当前上下文进行推理,并决定下一步应该做什么。
- 解析意图: AgentExecutor 解析 LLM 的输出。此时通常有两种结果:
- Action(采取行动): LLM 决定需要调用某个工具(例如:“我需要用计算器算一下 125 * 34”)。
- Finish(完成任务): LLM 认为已经收集到足够的信息,可以直接回答用户(例如:“最终答案是 4250”)。
- 工具执行(如果结果是 Action): AgentExecutor 拦截这个 Action,找到对应的工具(Tool),并将 LLM 生成的参数传入工具中实际运行(比如真正去调用一个 Python 函数或第三方 API)。
- 反馈结果(Observation): AgentExecutor 获取工具运行的结果(或者是报错信息),将这个结果作为新的“观察(Observation)”添加到上下文中。
- 循环往复: 带着最新的观察结果,回到第 2 步,让 LLM 再次思考。这个 “思考 (Thought) -> 行动 (Action) -> 观察 (Observation)” 的循环(即 ReAct 模式)会一直持续,直到触发 Finish 动作,或者达到最大循环次数(避免死循环)。
解决什么工程问题?
在实际工程中,你不能指望 LLM 每次都完美输出。AgentExecutor 解决了很多现实的工程问题:
- 处理格式错误: 如果 LLM 输出的工具调用格式不符合要求(比如漏了括号、JSON 格式崩了),AgentExecutor 会捕获这个解析错误,并将错误信息(如“格式不对,请重新输出”)反馈给 LLM,让它自我纠正。
- 处理工具异常: 如果调用的外部 API 宕机或超时,AgentExecutor 会捕获异常,告诉 LLM “该工具当前不可用”,LLM 可能会选择更换其他工具或改变策略。
- 防止无限死循环: 它内置了
max_iterations(最大迭代次数)或max_execution_time(最大执行时间)的限制。如果智能体陷入了“反复调用同一个工具却得不到结果”的死胡同,AgentExecutor 会强行终止循环并返回当前最好的结果。 - 日志与可观测性: 它记录了智能体每一步的思考过程(Thought)和工具调用细节,这对于开发者调试(Debug)至关重要。
代码
- 用伪代码理解 AgentExecutor
def AgentExecutor(user_input, agent, tools, max_iterations=15):
intermediate_steps = [] # 记录之前的动作和结果
for _ in range(max_iterations):
# 1. 询问大脑(LLM)下一步该怎么做
decision = agent.plan(user_input, intermediate_steps)
# 2. 如果大脑说可以结束了,就返回最终答案
if isinstance(decision, AgentFinish):
return decision.return_values
# 3. 如果大脑说需要行动,就提取工具名和参数
if isinstance(decision, AgentAction):
tool_name = decision.tool
tool_input = decision.tool_input
# 4. 找到对应的工具并执行它
try:
tool = find_tool(tools, tool_name)
observation = tool.run(tool_input)
except Exception as e:
# 捕获异常,并将其作为观察结果返回给大模型,让它知道出错了
observation = f"Tool execution failed: {e}"
# 5. 将行动和结果记录下来,进入下一次循环
intermediate_steps.append((decision, observation))
# 超过最大循环次数,强制退出
return "Agent stopped due to iteration limit."
是否支持使用多个 Agent?
AgentExecutor 的设计就是为单个 agent 服务的。看它的核心初始化方式(无论是旧版 from_agent_and_tools 还是新版):
agent_executor = AgentExecutor(
agent=xxx, # ← 这里只能传入一个 agent 对象
tools=tools,
...
)
- agent 参数只接受
一个BaseSingleActionAgent或BaseMultiActionAgent(或通过create_react_agent等工厂函数创建的单个 agent)。 - 它内部的执行循环(之前的
while not isinstance(response, AgentFinish))是围绕这一个 agent 反复调用 LLM + 执行工具 + 反馈的逻辑。 - 它没有内置机制来管理多个独立 agent 之间的协作、路由、状态共享或并行执行。
简单来说:一个 AgentExecutor = 一个 agent + 它的工具 + 执行循环。
如果要实现多个 agent(Multi-Agent), LangChain 官方推荐以下两种主流方式:
-
用 AgentExecutor 实现“伪多代理”(简单但有限制)
- 把其他 agent 包装成 Tool,然后给主 agent 使用。
- 主 agent(用 AgentExecutor 运行)看到的是几个“子工具”,实际上每个子工具内部运行的是另一个完整的 AgentExecutor(子 agent)。
- 优点:简单,能快速实现 supervisor(监督者) + sub-agents(子代理)模式。
- 缺点:上下文管理较差、子 agent 没有独立状态、调试复杂、层次深时容易 token 爆炸。
-
使用 LangGraph 构建真正的多代理系统(推荐)
- LangGraph 是 LangChain 官方现在主推的框架,专门用来处理复杂、多 agent、带循环的状态流。 常见多代理模式包括:
- Supervisor 模式(中央协调者):一个 supervisor agent 把任务路由给多个 specialist agents(每个 specialist 可以是独立的 LangGraph agent 或 AgentExecutor)。中央协调,易控制,适合大多数业务场景。
- Subagents 模式:主 agent 把子 agents 当作工具调用。
- Handoff / Router 模式:代理之间直接传递控制权(使用 create_handoff_tool),更去中心化。
- Hierarchical / Swarm / Parallel:支持并行执行、持久化、human-in-the-loop 等高级特性。
- LangGraph 是 LangChain 官方现在主推的框架,专门用来处理复杂、多 agent、带循环的状态流。 常见多代理模式包括:
LangGraph 的优势:
- 用图(Graph)明确定义节点(每个 agent 是一个节点)和边(路由逻辑)。
- 更好的状态管理、streaming、中断、持久化、调试(LangSmith 可视化)。
- 支持真正的多 agent 协作,而不只是“把 agent 包成工具”。
具体 LangGraph 是如何实现多代理的,请看:[[LangGraph#Multi-Agent]]
处理解析错误
agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
- 默认错误处理(设为 True)
当你在初始化 AgentExecutor 时,设置 handle_parsing_errors=True。
- 工作原理: 当解析器报错时,程序不再直接崩溃退出。相反,它会捕获这个错误,并将一段默认的警告信息(例如:“格式无效,缺少 ‘Action:’”)作为上一步的“观察结果(Observation)”重新喂给大模型。
- 效果: 大模型看到报错信息后,通常会意识到自己刚才的输出格式写错了,然后在下一轮思考中立刻纠正过来,重新输出正确的格式。
- 自定义错误提示语(传入字符串)
有时默认的报错信息不够明确,大模型看了可能还是不知道怎么改。这时你可以直接传一段字符串给 handle_parsing_errors。
- 用法:
handle_parsing_errors="检查你的输出并确保其符合规范,请必须使用 Action 和 Action Input 语法!" - 效果: 当模型输出格式错误时,这段你亲手写的“严厉提示”会被发送给模型,强制引导它按正确的格式重新输出。
- 动态错误处理逻辑(传入自定义函数)
如果你需要更复杂的逻辑,可以传入一个 Python 函数。
-
用法: 写一个函数接收
error对象,并返回一个字符串。例如,文档里的例子是截取错误信息的前 50 个字符发给模型:def _handle_error(error) -> str: return str(error)[:50] # 截取前50个字符避免报错信息太长 agent_executor = AgentExecutor(..., handle_parsing_errors=_handle_error) -
效果: 这种方法最灵活,你可以根据具体的异常类型(Exception Type)动态生成不同的安抚或纠正话术反馈给大模型。
迭代器 AgentIter
在通常情况下,当我们调用 Agent 执行任务时,它会一口气把所有的“思考 - 调用工具 - 观察”循环跑完,直接给出最终结果。但这篇文章指出,有时候我们希望一步一步地控制 Agent 的执行过程,这在以下场景非常有用:
- 添加人工审核(Human-in-the-loop): 在 Agent 执行下一步动作前,让人类确认是否允许继续。
- 中间步骤验证: 在代码层面插入自定义的逻辑,检查 Agent 上一步使用工具得到的结果是否正确。
示例:让 Agent 计算“第 998、999 和 1000 个素数的乘积”**。
为了完成这个任务,文档演示了以下步骤:
- 定义工具: 给了 Agent 两个工具。一个是
GetPrime(用来查询第 n 个素数是多少),另一个是Calculator(用来做数学乘法计算)。 - 构建 Agent 迭代器: 构建好 OpenAI Functions Agent 后,不再使用常规的
agent_executor.invoke(),而是使用了agent_executor.iter()进行for循环遍历。 - 注入自定义逻辑与人工打断: 在
for循环内部,代码做了一层拦截:- 当检测到 Agent 刚刚调用了
GetPrime工具拿到了一个数字时,代码会自己运行一个is_prime()函数,去验证拿到的这个数到底是不是素数(这就是中间步骤验证)。 - 验证完之后,代码弹出了一个
input("是否继续(Y/n):\n"),询问控制台前的人类用户是否同意 Agent 继续往下走(这就是人工审核介入)。
- 当检测到 Agent 刚刚调用了
#### 1. 工具定义
primes = {998: 7901, 999: 7907, 1000: 7919}
class CalculatorInput(BaseModel):
question: str = Field()
class PrimeInput(BaseModel):
n: int = Field()
def is_prime(n: int) -> bool:
if n <= 1 or (n % 2 == 0 and n > 2):
return False
for i in range(3, int(n**0.5) + 1, 2):
if n % i == 0:
return False
return True
def get_prime(n: int, primes: dict = primes) -> str:
return str(primes.get(int(n)))
async def aget_prime(n: int, primes: dict = primes) -> str:
return str(primes.get(int(n)))
tools = [
Tool(
name="GetPrime",
func=get_prime,
description="用于返回第`n`个素数的工具",
args_schema=PrimeInput,
coroutine=aget_prime,
),
Tool.from_function(
func=llm_math_chain.run,
name="Calculator",
description="在需要计算数学表达式时非常有用",
args_schema=CalculatorInput,
coroutine=llm_math_chain.arun,
),
]
### 2. 创建代理
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
question = "第998、999和1000个素数的乘积是多少?"
for step in agent_executor.iter({"input": question}):
if output := step.get("intermediate_step"):
action, value = output[0]
if action.tool == "GetPrime":
print(f"正在检查 {value} 是否为素数...")
assert is_prime(int(value))
# 询问用户是否继续
_continue = input("是否继续(Y/n):\n") or "Y"
if _continue.lower() != "y":
break
工具
创建工具的方法
1. @tool 装饰器
这是目前 LangChain 官方最推荐的写法,非常 Pythonic。你只需要写一个普通的 Python 函数,然后在上面加一个 @tool 装饰器即可。
- 原理解析: LangChain 会自动读取函数的名字作为工具名,读取函数的 Docstring(多行注释) 作为工具的描述,并自动读取类型提示(Type Hints) 作为参数的 Schema。
- 适用场景: 90% 的日常开发场景。
from langchain_core.tools import tool
from typing import Annotated
@tool
def get_weather(location: Annotated[str, "The city and state, e.g. San Francisco, CA"]) -> str:
"""Get the current weather in a given location.
Args:
location: The city and state, e.g. Hong Kong, Tung Chung
"""
# 这里写实际逻辑(调用 API、数据库等)
if "hong kong" in location.lower():
return "Hong Kong today is 22°C, sunny."
return f"Weather in {location} is unknown."
# 查看生成的工具信息
print(get_weather.name) # 输出: get_weather
print(get_weather.description) # 输出: Get the current weather in a given location. Args:location: The city and state, e.g. Hong Kong, Tung Chung
# 使用时直接把工具放入列表
tools = [get_weather]
优点:
- 代码最简洁
- 支持异步(@tool + async def)
- 自动推断 schema
2. 使用 Tool.from_function() 或直接实例化 Tool
这种方法不需要修改原函数的代码,非常适合用来包装第三方库的函数,或者包装某个类实例的方法。
- 原理解析: 你需要手动把目标函数(
func)、工具名(name)和描述(description)拼装在一起。 - 适用场景: 无法给原函数加装饰器时;需要将一个已经存在的旧函数快速接入 Agent 时。
- 注意: 标准的
Tool类默认只支持单一字符串输入(Single String Input)。
最简单创建:Tool.from_function
from langchain_core.tools import Tool
tool = Tool.from_function(
func=my_function,
name="tool_name",
description="tool description"
)
实例化 Tool
将 Runnable 转换为 Tool:把 LangChain 的 Runnable(如 Chain 的 )直接转为工具:
class CalculatorInput(BaseModel):
question: str = Field()
llm_math_chain = LLMMathChain.from_llm(llm=llm, verbose=True)
tools = [
Tool.from_function(
func=llm_math_chain.run,
name="Calculator",
description="在需要计算数学表达式时非常有用",
args_schema=CalculatorInput,
coroutine=llm_math_chain.arun,
),
]
LLMMathChain 是 LangChain 官方封装好的源码,底层逻辑大致执行流程是这样的:
- 构造特定 Prompt: 它内部有一个写死的 Prompt,大概长这样:“你是一个计算器,请把用户的数学问题转换成可以在 Python
numexpr库中运行的代码。不要输出别的话,只输出代码。” - 请求大模型: 它带着这个 Prompt 和用户的算式(7901 * 7907 * 7919),向你传入的
llm(即 GPT-4)发起一次秘密的网络请求。 - 大模型写代码: GPT-4 收到请求后,生成一段纯净的代码字符串(比如就叫
7901 * 7907 * 7919)。 - 本地沙盒执行:
LLMMathChain拿到大模型生成的这串代码文本后,会调用 Python 本地的numexpr.evaluate()或原生的eval()函数,直接在你的计算机内存里运行这段动态代码。
# LangChain 底层偷偷执行了类似这样的代码
result = numexpr.evaluate("7901 * 7907 * 7919")
- 返回结果: 最后,
.run()方法把这个数字作为字符串返回给 Agent 的执行流。
3. StructuredTool.from_function()
普通的 Tool 只能接收一个字符串参数。如果你的工具需要接收多个参数(例如同时需要纬度和经度),或者需要接收复杂的结构化数据(如 JSON 字典),你就需要使用 StructuredTool
- 适用场景: 工具需要多个参数,且你不想(或不能)使用
@tool装饰器时。 - 注意: 现在的
@tool装饰器在底层也会自动将多参数函数转化为StructuredTool,所以StructuredTool多用于手动构建复杂结构的场景。
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
class WeatherInput(BaseModel):
location: str = Field(..., description="The city and state, e.g. Hong Kong")
date: str | None = Field(None, description="Optional date in YYYY-MM-DD")
def get_weather_func(input: WeatherInput) -> str:
return f"Weather in {input.location} is 22°C."
weather_tool = StructuredTool.from_function(
func=get_weather_func,
name="get_weather",
description="Get current or future weather",
args_schema=WeatherInput, # 使用 Pydantic 模型
return_direct=False
)
3. 继承 BaseTool 类
如果你需要对工具自定义校验、错误处理、异步、缓存等高级功能时,使用这种方式。
- 适用场景: 需要在工具内部维护复杂的连接池(如数据库连接);需要自定义工具发生错误时的回调处理;需要同时严格控制同步 (
_run) 和异步 (_arun) 的具体执行逻辑。
from langchain_core.tools import BaseTool
from pydantic import Field
from typing import Type
class WeatherTool(BaseTool):
name: str = "get_weather"
description: str = "Useful for when you need to get weather information."
args_schema: Type[WeatherInput] = WeatherInput # 可选,使用 Pydantic
def _run(self, location: str, date: str | None = None) -> str:
"""同步执行逻辑"""
# 实际调用 API 等
return f"Weather in {location}: 22°C"
async def _arun(self, location: str, date: str | None = None) -> str:
"""异步执行逻辑(推荐用于 I/O 操作)"""
...
ToolUse
主要讨论 Agent 如何发现、调用和管理工具。
1. 人工介入 HumanInTheLoop
- 含义:Human in the Loop(人参与循环)。
- 作用:当 Agent 在执行过程中遇到不确定、敏感或需要确认的情况时,暂停执行,把控制权交给人类(用户或管理员)。
- 常见场景:
- 需要人工审批某些操作(如发送邮件、修改数据库)。
- 工具返回结果有歧义时,让人判断下一步。
- 增加安全性和可控性,避免 Agent 完全自主跑偏。
- 在 LangGraph 中通过 Interrupt(中断) 和 Checkpoint 机制实现,非常强大。
3. 多工具 MultipleTools
- 含义:Agent 同时使用多个不同的工具。
- 作用:一个 Agent 通常不会只绑定一个工具,而是绑定多个专业工具(例如:搜索工具 + 计算器 + 天气工具 + 数据库工具)。
- LLM(大模型)会根据任务自动选择合适的工具调用。
- 这几乎是现代 Agent 的标配。
对于多个工具,我们无法确定模型将调用哪个工具。因此,我们不能硬编码将特定工具放入我们的链条中。相反,我们将添加一个 call_tool_list,即一个 RunnableLambda,它接受 JsonOutputToolsParser 的输出并根据它实际构建链条的末尾,这意味着在运行时附加到链条末尾被调用的工具。我们可以这样做,因为 LCEL 有一个很酷的特性,即在任何可运行序列(LCEL 的核心构建块)中,如果一个组件返回更多的可运行项,则这些项作为链条的一部分运行。
from operator import itemgetter
from typing import Union
from langchain.output_parsers import JsonOutputToolsParser
from langchain_core.runnables import (
Runnable,
RunnableLambda,
RunnableMap,
RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo")
tools = [multiply, exponentiate, add]
model_with_tools = model.bind_tools(tools)
tool_map = {tool.name: tool for tool in tools}
def call_tool(tool_invocation: dict) -> Union[str, Runnable]:
"""根据模型选择的工具动态构建链条末尾的函数。"""
tool = tool_map[tool_invocation["type"]]
return RunnablePassthrough.assign(output=itemgetter("args") | tool)
# .map()允许我们将函数应用于输入列表。
call_tool_list = RunnableLambda(call_tool).map()
chain = model_with_tools | JsonOutputToolsParser() | call_tool_list
4. 并行 Parallel
- 含义:并行工具调用(Parallel Tool Calls)。
- 作用:当 Agent 需要调用多个工具时,可以同时(并行) 执行,而不是一个接一个顺序执行。
- 优势:
- 大幅提升速度(例如同时查天气 + 查股票 + 搜索新闻)。
- 现代支持 Tool Calling 的模型(GPT-4o、Claude、Grok 等)原生支持并行调用。
- 在 LangGraph 的 ToolNode 中会自动处理并行执行。
# 模型支持并行调用
model = ChatOpenAI(model="gpt-3.5-turbo-1106")
tools = [multiply, exponentiate, add]
model_with_tools = model.bind_tools(tools)
tool_map = {tool.name: tool for tool in tools}
def call_tool(tool_invocation: dict) -> Union[str, Runnable]:
"""Function for dynamically constructing the end of the chain based on the model-selected tool."""
tool = tool_map[tool_invocation["type"]]
return RunnablePassthrough.assign(output=itemgetter("args") | tool)
# .map() allows us to apply a function to a list of inputs.
call_tool_list = RunnableLambda(call_tool).map()
chain = model_with_tools | JsonOutputToolsParser() | call_tool_list
chain.invoke( "What's 23 times 7, and what's five times 18 and add a million plus a billion and cube thirty-seven")
当面对包含多个请求的复杂问题时,这类先进模型不再只输出一个工具名称,而是会一口气输出一个包含多个工具调用请求的 JSON 列表(Array)。
既然大模型一次性丢过来好几个工具任务,LangChain 是怎么同时处理它们的呢?核心就在这句代码里:
# call_tool 是我们写好的单个工具执行函数
# .map() 是 LCEL 的高阶用法
call_tool_list = RunnableLambda(call_tool).map()
在 LangChain 表达式语言(LCEL)中,.map() 的作用是将一个针对单条数据操作的函数,广播(应用)到一个列表的所有元素上。
实际运行流程如下:
- 用户输入发疯式提问: “23 乘以 7 是多少,5 乘以 18 是多少,一百万加十亿是多少,37 的立方是多少?”。只发送一次。
- 大模型输出列表: 模型瞬间理解了意图,输出一个包含 4 个工具任务的列表:
[任务A(乘法), 任务B(加法), 任务C(加法), 任务D(求幂)] .map()并行处理: 当这个列表传递给带有.map()的链条时,LangChain 会在底层启动并发执行。它不会等任务 A 算完再算任务 B,而是同时去执行这四个工具函数。- 统一返回: 所有工具执行完毕后,结果会被重新打包成一个列表返回给你。
5. 工具错误处理 ToolErrorHandling
- 含义:Tool Error Handling —— 对工具执行过程中出现的错误进行处理。
- 作用:工具调用失败是很常见的(API 限流、网络错误、参数错误、返回异常等)。
- 常见处理方式:
- 捕获错误后返回友好提示给 LLM,让 LLM 重试或换其他工具。
- 记录错误日志。
- 设置默认 fallback(备用方案)。
- 在 LangGraph 的 ToolNode 中可以自定义错误处理逻辑(handleToolErrors 等)。
错误处理 ToolErrorHandling
在构建 AI Agent(智能体)时,我们经常会让大模型去调用外部工具(比如请求天气 API、查数据库、执行代码)。但是,真实的开发环境中充满了不确定性:API 可能会超时、大模型可能传错了参数、数据库可能连不上。
最简单的就是在工具内 try/ catch 错误,但是如果无法覆盖错误,下面的更优的解法:
ToolException + handle_tool_error
LangChain 提供了一套内置机制来处理这种异常:
- 主动抛出特定异常: 在你编写自定义工具函数时,如果遇到问题(比如输入参数不合法),你可以主动抛出 LangChain 专属的
ToolException。 - 开启错误捕获: 在定义工具时,设置
handle_tool_error=True。
效果: 当工具内部抛出 ToolException 时,程序不会崩溃。LangChain 会把这捕捉下来,并把错误信息变成一段文本(比如:” 发生错误:API 速率超限 ”),当做这个工具的返回值,继续喂给大模型。
handle_tool_error 参数不仅可以设为 True,还有其他更灵活的玩法:
- 传入固定的字符串: 比如
handle_tool_error="该工具目前不可用,请尝试使用其他工具或者直接回答不知道。"。这样一旦报错,大模型就会收到这句明确的指示,从而改变策略。 - 传入自定义的 Python 函数: 你可以写一个函数,专门用来解析复杂的错误对象(Error),然后提取出最关键的错误信息返回给大模型,防止把长篇大论的报错代码(Traceback)直接塞给模型导致上下文爆炸。
Fallbacks
可以给一个容易出错的工具链加上 .with_fallbacks([备用链]):
- 当主工具报错时,完全不理会大模型,直接在底层无缝切换到“备用工具”或者“备用大模型”。
- 例如:如果你调用
ChatGPT-4+复杂搜索工具失败了,系统自动降级使用ChatGPT-3.5+简单搜索工具再试一次。
使用技巧
强制返回结构化数据 100%
通常情况下,大语言模型(LLM)回答问题时总是喜欢用自然语言聊天。但在实际的工程开发中,我们的后续代码往往需要非常严谨的格式提取数据。
不需要 prompt,直接利用 Function Calling(函数调用) 能力来解决这个问题。
1. 定义期望的输出结构(造一个假的“响应函数”)
代码没有去死磕 Prompt 告诉大模型“请你必须输出 JSON 格式,包含 answer 和 sources 字段”,而是耍了个小聪明:定义了一个名为 Response 的假工具(函数)。 在这个工具的参数说明(Schema)里,明确规定了调用它需要传入两个参数:
answer:字符串,用来放最终的回答。sources:列表,用来放信息来源。
2. 将“假工具”绑定给大模型
把真正的查询工具(比如 retriever_tool 文档检索工具)和刚才捏造的那个假的 Response 工具,一起绑定给大模型。 这样一来,大模型在思考完问题后,如果它想给用户最终答案,由于设定,它会被引导去“调用”这个 Response 函数。
3. 自定义输出解析器(Output Parser 拦截)
当大模型输出它的思考结果时,写一个自定义的拦截逻辑(parse 函数):
- 如果大模型选择调用了真正的
retriever_tool,就放行,让它去查资料(返回AgentAction)。 - 最精妙的一步来了:如果大模型选择调用了
Response这个假工具,代码就强行拦截!它会把大模型试图传给Response工具的参数(即包含answer和sources的 JSON 字典)直接截获,然后作为最终结果返回给用户(变成AgentFinish)。
消息
ChatModels 接受一个消息列表作为输入,并返回一个消息。消息有几种不同类型。所有消息都有 role 和 content 属性。role 描述了是谁说了这条消息。LangChain 为不同的角色提供不同的消息类。content 属性描述了消息的内容。这可以是一些不同的内容:
- 一个字符串(大多数模型是这样的)
- 一个字典列表(这用于多模式输入,其中字典包含关于该输入类型和输入位置的信息)
此外,消息还有一个 additional_kwargs 属性。这是可以传递有关消息的其他信息的地方。这主要用于 _ 特定于提供商 _ 的输入参数,而不是通用的参数。这方面的最佳示例是 OpenAI 的 function_call。
消息类型
HumanMessage
这代表用户发送的消息。通常只包含内容。
AIMessage
这代表模型发送的消息。这可能包含 additional_kwargs - 例如,使用 OpenAI 函数调用时可能包含 functional_call。
SystemMessage
这代表系统消息。只有一些模型支持这个。这告诉模型如何行为。通常只包含内容。
FunctionMessage
这代表函数调用的结果。除了 role 和 content,这个消息还有一个 name 参数,表示生成此结果的被调用函数的名称。
ToolMessage
这代表工具调用的结果。为了匹配 OpenAI 的 function 和 tool 消息类型,这与 FunctionMessage 不同。除了 role 和 content,这个消息还有一个 tool_call_id 参数,表示被调用以生成此结果的工具的 ID。
Memory
常见类型
| Memory 类型 | 描述 | 优点 | 缺点 | 适合场景 |
|---|---|---|---|---|
| ConversationBufferMemory | 完整保存所有对话历史(Human + AI 消息) | 最简单、上下文最完整 | Token 消耗大,容易超上下文窗口 | 短对话、原型开发 |
| ConversationBufferWindowMemory | 只保留最近 K 轮对话(窗口式) | 控制 Token 数量,节省成本 | 丢失早期重要信息 | 中等长度对话、客服场景 |
| ConversationSummaryMemory | 用 LLM 对历史对话进行实时总结,只保存总结内容 | 大幅节省 Token,适合长对话 | 总结可能丢失细节,依赖 LLM 质量 | 长对话、创意写作、咨询 |
| ConversationSummaryBufferMemory | 结合 Buffer + Summary:最近几轮保持原始,早期内容用总结 | 平衡完整性与效率 | 配置稍复杂 | 中长对话,最常用平衡方案 |
| ConversationTokenBufferMemory | 根据 Token 数量限制保留最近内容 | 精确控制 Token 开销 | 需要指定 LLM 计算 Token | Token 敏感的应用 |
| Entity Memory | 提取并跟踪对话中的实体(人名、地点、偏好等),用知识图谱方式存储 | 能记住用户具体事实和偏好 | 实现较复杂 | 个人助理、记住用户喜好 |
| VectorStoreRetrieverMemory | 把历史对话存入向量数据库,支持语义检索召回相关片段 | 支持超长历史、语义搜索 | 需要向量数据库,延迟稍高 | RAG + 聊天、知识密集型应用 |
- 最基础的使用示例(ConversationBufferMemory)
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# 创建 Memory
memory = ConversationBufferMemory(
memory_key="history", # Prompt 中引用的变量名
return_messages=True # 返回 Message 对象(推荐)
)
# 创建对话链
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True # 显示思考过程
)
# 多轮对话
print(conversation.predict(input="你好,我叫小明"))
print(conversation.predict(input="我喜欢打篮球"))
print(conversation.predict(input="我最喜欢的运动是什么?")) # 它应该能记住“小明”和“篮球”
新的 Memory 打法
-
旧版(Chains + Memory):主要使用上面提到的 ConversationXXXMemory 类,配合 LLMChain、ConversationChain 使用。
-
新版推荐(Runnable + LangGraph):
- 使用 RunnableWithMessageHistory 来包装你的 Runnable(Chain 或 Agent)。
- LangGraph 引入更强大的状态管理(State) 和 Checkpointing(检查点持久化),可以轻松实现线程级(thread-scoped)短期记忆。
- Long-term Memory:通过 LangGraph 的 Store(如 InMemoryStore、数据库后端)或 LangMem 框架实现跨会话的长期记忆(保存用户偏好、事实等)。
-
写法示例(RunnableWithMessageHistory):
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
# ... 定义你的 chain 或 prompt | llm ...
chain_with_history = RunnableWithMessageHistory(
your_chain,
lambda session_id: InMemoryChatMessageHistory(), # 可换成 Redis、Postgres 等持久化
input_messages_key="input",
history_messages_key="history"
)
使用建议
- 短对话(< 10 轮):直接用 ConversationBufferMemory 或 ConversationBufferWindowMemory(k=5)。
- 长对话:优先 ConversationSummaryBufferMemory(最均衡)。
- 需要记住用户偏好:结合 Entity Memory 或 VectorStoreRetrieverMemory。
- 生产环境:一定要做持久化(Redis、Postgres、DynamoDB 等),避免内存丢失。
- 多代理 / 复杂 Agent:强烈推荐迁移到 LangGraph + Checkpoint + Long-term Store,Memory 管理会更灵活和强大。
- Token 优化:永远监控上下文长度,结合 Summary 机制,避免超限。
回调 Callbacks
主要作用
- 实时流式输出(Streaming):让答案一个字一个字地打印出来,而不是等全部生成完。
- 日志记录与监控:记录每次 LLM 调用耗时、Token 用量、Prompt 内容等。
- 调试与追踪:查看 Chain、Agent、Tool 的内部执行过程(哪个步骤出错了?)。
- 集成第三方工具:如 LangSmith、LangFuse、PromptLayer、Weights & Biases 等观测平台。
- 自定义行为:计算成本、发送通知、保存日志到数据库、 guardrail(安全检查)等。
- 错误处理:捕获 LLM 调用失败、解析错误等。
工作原理
LangChain 在执行过程中会触发一系列标准事件,你可以通过实现 BaseCallbackHandler 来监听这些事件。
常见事件类型(按组件分类):
- LLM / Chat Model:
- on_llm_start:LLM 调用开始
- on_llm_new_token:生成一个新 token(流式输出核心)
- on_llm_end:LLM 调用结束
- on_llm_error:LLM 调用出错
- Chain:
- on_chain_start、on_chain_end、on_chain_error
- Tool:
- on_tool_start、on_tool_end、on_tool_error
- Agent / Retriever 等也有对应的事件(如 on_agent_action、on_agent_finish)
Callback Handler
内置 Callback Handler
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="gpt-4o",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()] # 实时打印输出
)
llm.invoke("介绍一下香港")
其他内置的:
- StdOutCallbackHandler:简单打印执行过程(调试用)
- ConsoleCallbackHandler
- UsageMetadataCallbackHandler:统计 Token 用量
自定义 Callback Handler
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List
class MyCustomHandler(BaseCallbackHandler):
def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
print(f"LLM 开始调用,Prompt 长度: {len(prompts[0])}")
def on_llm_new_token(self, token: str, **kwargs):
print(token, end="", flush=True) # 实时输出
def on_llm_end(self, response, **kwargs):
print("\nLLM 调用结束")
def on_chain_error(self, error: Exception, **kwargs):
print(f"Chain 执行出错: {error}")
# 使用方式
handler = MyCustomHandler()
chain = prompt | llm
chain.invoke({"question": "今天香港天气"}, config={"callbacks": [handler]})
用法
Callback 的传入, 有两种主要方式:
- 在构造函数中传入(全局/对象级别):
llm = ChatOpenAI(callbacks=[handler])
- 在运行时传入(推荐,单次请求级别):
result = chain.invoke(
input,
config={"callbacks": [handler]} # 或多个 handler
)
注意:新版 LangChain(Runnable 风格)更推荐使用 config={“callbacks”: […]} 的方式,灵活性更高。
在 LangGraph 中的使用情况
- LangGraph 仍然完全兼容 LangChain 的 Callback 系统。
- 在 graph.invoke()、graph.stream() 时同样可以通过 config={“callbacks”: [handler]} 传入。
- LangGraph 还增加了更细粒度的状态和节点级别的可观测性。
不同场景下的使用情况:
- 流式输出 → 用 StreamingStdOutCallbackHandler 或自定义 on_llm_new_token
- 生产监控 → 集成 LangSmith(官方)或 LangFuse(开源),它们都通过 Callback 实现自动追踪
- Token 统计 → 使用 UsageMetadataCallbackHandler
- 调试复杂 Agent → 自定义 Handler 打印 on_tool_start、on_agent_action 等
- 异步 → 使用 AsyncCallbackHandler(对应 ainvoke、astream 等)
LangChain4J
更完整的代理实现需要使用 AiServices 或 AgentExecutor 来自动完成工具绑定和调用循环。对于复杂应用,建议深入研究 AiServices(LangChain4j 提供的一种更声明式、更简洁的编程模型)和 AgentExecutor。
聊天记忆
底层 API vs 高层 AiServices:
| 特性 | 底层 API(本示例) | 高层 AiServices(之前示例) |
|---|---|---|
| 记忆控制 | 完全手动(灵活) | 自动处理(便捷) |
| 代码量 | 多(需手动 add 消息) | 少(框架封装) |
| 适用场景 | 定制化需求(比如过滤消息) | 快速开发(通用场景) |
| 消息过滤 / 修改 | 易实现(添加前可修改) | 需自定义扩展 |
- 底层 API 脱离
AiServices封装,手动调用ChatMemory.add()管理消息、model.chat()调用模型,实现更灵活的记忆控制;
相关教程
Quarkus
基于 LangChain4j 库构建,并提供了一个专为 Quarkus 量身定制的声明式编程模型。
Langchain4J 库是一个灵活的工具包。它提供了一套丰富的构建模块来处理大型语言模型,例如代理、工具、检索器和链,使开发人员能够完全控制如何组合和编排它们。
然而,直接使用 LangChain4j 时,您需要负责将这些组件连接起来,配置模型客户端,处理生命周期问题,并将可观测性、配置和内存管理集成到您的应用程序中。这种方法功能强大,但底层且需要手动操作。
Quarkus LangChain4j 扩展基于 LangChain4j 构建,并提供企业级、预设的集成方案,为您处理大部分样板代码:
- 具有 CDI 和注解的声明式 AI 服务
- 原生支持 OpenAI、Hugging Face、Ollama 等
- 使用 Quarkus 配置属性进行集中式配置
- 构建时间优化和验证
- 可观测性:日志、指标、追踪
- 用于嵌入式商店和模型服务的开发服务
- 用于视觉内省和测试的开发者用户界面
- 自动聊天内存管理 这样您就可以专注于应用程序逻辑,而不是基础设施搭建。 也就是说,Quarkus LangChain4j 并不限制你只能使用单一模型。你可以自由组合搭配:
- 使用高级声明式方法(
@RegisterAiService)实现快速集成 - 当您需要更多控制权时,请使用更底层的 LangChain4j API。
Ref
MCP
Ref
https://modelcontextprotocol.io/docs/getting-started/intro
测试与评估
Ref
https://docs.langchain4j.info/tutorials/testing-and-evaluation