LangServe 和 LangGraph 均为 LangChain 生态工具,但定位与功能完全不同:LangServe 是部署层工具(将 LangChain 应用转为 API),LangGraph 是编排层框架(构建复杂有状态 Agent 工作流)。两者常协同使用,但解决的是 LLM 应用开发的不同阶段问题。
add_routes
本质:读取你的 Runnable(可以是 LLM、Chain、Agent) → 自动生成数据验证模型 → 在 FastAPI app 上自动注册一组 HTTP 路由 → 把 HTTP 请求转发给 Runnable 执行 → 把执行结果转回 HTTP 响应。
前置条件:
- FastAPI:Python 高性能 Web 框架,负责处理 HTTP 请求、生成接口文档、数据验证
- Runnable:LangChain 所有组件(模型、链、代理)的统一接口,固定有 4 个核心方法:
invoke():同步单次调用stream():流式返回batch():批量调用stream_log():流式日志
- 最小可运行的 server.py
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
app = FastAPI()
llm = ChatOpenAI(model="gpt-4o")
add_routes(
app, # FastAPI 实例
llm, # 任意 Runnable
path="/chat", # 路由前缀
)
# uvicorn server:app --reload
一行 add_routes,自动生成四个端点:
| 端点 | 作用 |
|---|---|
POST /chat/invoke | 单次调用,返回完整结果 |
POST /chat/batch | 批量调用 |
POST /chat/stream | SSE 流式输出 |
GET /chat/playground | 内置调试 UI |
同步 LLM 调用如何变成异步接口?
LangChain 的 Runnable 接口同时实现了同步和异步两套方法:
# 同步
llm.invoke("hello")
# 异步(底层用 asyncio.to_thread 或原生 async)
await llm.ainvoke("hello")
LangServe 生成的每个端点都是 FastAPI 的 async def,内部调用 Runnable 的 ainvoke / astream。 这样整个调用链全程非阻塞,FastAPI 的 event loop 不会被 LLM 的网络 IO 卡住。
# add_routes 内部简化逻辑(伪代码)
@app.post("/chat/invoke")
async def invoke_endpoint(request: InvokeRequest) -> InvokeResponse:
output = await runnable.ainvoke(
request.input,
config=request.config,
)
return InvokeResponse(output=output)
如果 Runnable 只有同步的 .invoke(),LangChain 会自动用 asyncio.get_event_loop().run_in_executor(None, sync_fn) 包一层,跑进线程池,不阻塞 event loop。
流式输出(SSE)的处理
/stream 端点使用 Server-Sent Events,底层是 astream:
@app.post("/chat/stream")
async def stream_endpoint(request: StreamRequest):
async def event_generator():
async for chunk in runnable.astream(request.input):
yield {"data": serialize(chunk)}
return EventSourceResponse(event_generator())
客户端收到的是一帧帧 token,而不是等全部生成完才返回。
用 asyncio 封装自定义同步函数
这是「阶段二:并发与稳定性」真正要学的东西——如何自己实现类似机制:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
def sync_llm_call(prompt: str) -> str:
# 假设这是一个阻塞的同步调用
import time; time.sleep(2)
return f"Response to: {prompt}"
async def async_llm_call(prompt: str) -> str:
loop = asyncio.get_event_loop()
# 把同步函数扔进线程池,不阻塞 event loop
result = await loop.run_in_executor(executor, sync_llm_call, prompt)
return result
# FastAPI 里使用
@app.post("/invoke")
async def invoke(prompt: str):
return await async_llm_call(prompt)
核心概念总结
同步 LLM 调用
↓ asyncio.run_in_executor (线程池)
async def endpoint ←─── FastAPI event loop
↓ SSE / JSON
HTTP 客户端
add_routes 帮你把这整个流程打包好了,核心价值就是:不用自己写每个 invoke/batch/stream endpoint 的 async 胶水代码。
异步、协程、线程
核心概念

- 并发(Concurrency):多个任务交替执行,看起来像同时在跑,本质上同一时间只有一个任务在干活(比如单线程协程)。
- 并行(Parallelism):多个任务同时执行,同一时间真的有多个任务在不同 CPU 核心上同时干活(比如多进程 / 多线程跑 CPU 密集任务)。
GIL: CPython 的全局解释器锁(Global Interpreter Lock),GIL 是一把强制锁,保证同一时间、同一个 Python 进程里,只能有 1 个线程执行 Python 字节码。- 为什么会有 GIL?
历史原因:
- Python 早期没设计多线程安全
- 为了内存管理、垃圾回收不崩溃
- 加了一把全局大锁,简单粗暴保证安全
- GIL 的影响
- CPU 密集任务:多线程 = 没用!计算任务会一直占着 GIL,多线程反而更慢。→ 必须用 多进程 multiprocessing 才能利用多核。
- IO 密集任务:多线程 = 有用!网络请求、数据库、文件读写、sleep 时,GIL 会自动释放!线程可以在等待时切换,实现并发。
- 协程 asyncio:完全不受 GIL 影响。因为协程本来就是 单线程 的,GIL 根本不参与切换。
- 为什么会有 GIL?
历史原因:
容易混淆的边界
- 异步 ≠ 多线程。 异步是一种编程模型——” 不阻塞地发起操作 “;多线程是实现并发的一种机制。异步可以在单线程里实现(Python asyncio 就是)。
- 协程 ≠ 线程。 协程由程序自己调度(用户态),OS 完全不感知;线程由 OS 调度(内核态)。协程切换几乎没有开销,线程切换需要上下文保存。
- asyncio ≠ OS 自动管理。 Python asyncio 是用户态调度器,运行在单 OS 线程内。OS 只负责底层 I/O 通知(epoll/kqueue),” 谁来处理通知 ” 完全由 event loop 决定。
操作系统层 线程1 线程2 线程3
↕ ↕ ↕
Python 进程层 asyncio asyncio asyncio
loop A loop B loop C
/ | \
coro coro coro ← 这些都在同一个 OS 线程里
coroutine(协程)是 Python 中用户态的、轻量级的、由程序主动调度的 “任务单元”,你可以把它理解成 “单线程里的微线程”。
Python 异步编程中,事件循环(eventloop)和协程(coro)是用户态调度的轻量任务单元,与所属 OS 线程强绑定,操作系统无法感知也无法管理其资源绑定关系;
对比
| 维度 | 同步 (Sync) | 异步 (Async) / 协程 (Coro) | 线程 (Thread) |
|---|---|---|---|
| 调度者 | 程序自身 | event loop(用户态) | 操作系统(内核态) |
| 切换时机 | 不切换 | 遇到 await 主动让出 | OS 抢占,任意时刻 |
| 切换成本 | — | 极低(无系统调用) | 较高(保存寄存器 / 栈) |
| 并发表现 | 串行执行 | 单线程并发(GIL + 单 loop) | 多线程并发(多核 CPU) 受 GIL 制约,仅 IO 密集有并发优势 |
| 适合场景 | 简单脚本、CPU 密集 | 大量 I/O、网络请求 | CPU 密集、需真并行 |
| 需要加锁 | 否 | 通常不需要 | 是(竞态条件) |
| 是否受 GIL 限制 | - | 不受影响 | 受限制 |
- 协程(asyncio)的
asyncio.gather:并发,单线程内切换任务,同一时间只有一段代码在跑。 - 多线程(threading):IO 密集场景是并发(GIL 限制,同一时间只能一个线程跑 Python 代码);如果是 C 扩展 / IO 等待,会有部分并行效果,但不是真・Python 代码并行。
- 多进程(multiprocessing):并行,每个进程有独立 GIL,能同时在不同 CPU 核心上执行 Python 代码。
同步(Sync)实现
同步阻塞:依次请求 3 个接口,每个耗时 1 秒
import time
def fetch(url: str) -> str:
time.sleep(1) # 模拟网络 I/O
return f"result:{url}"
def main():
urls = ["a", "b", "c"]
results = []
for url in urls:
results.append(fetch(url)) # 阻塞,等完再走
return results
start = time.time()
main()
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 3.0s
- 执行特征:串行,每个 fetch 都卡住主线程。总耗时 = 各任务耗时之和。
- OS 调度:无任何异步,CPU 在
sleep期间完全空等。
异步(Async)/ 协程(Coro)实现
asyncio 协程:并发 (其实是单线程) 请求同样 3 个接口
import asyncio
async def fetch(url: str) -> str:
await asyncio.sleep(1) # 让出控制权给 event loop
return f"result:{url}"
async def main():
urls = ["a", "b", "c"]
results = await asyncio.gather(
*[fetch(url) for url in urls]
)
return results
import time
start = time.time()
asyncio.run(main())
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 1.0s ← 3 个协程并发等待
event loop 调度过程
# 时间轴(同一个 OS 线程内)
# t=0 fetch(a) 启动 → await 让出
# fetch(b) 启动 → await 让出
# fetch(c) 启动 → await 让出
# t=1 三个 I/O 同时完成,loop 依次唤醒
# 全部完成,总耗时 ≈ 1s
loop = asyncio.get_event_loop()
print(f"loop id: {id(loop)}")
# 协程绑定的就是这个 loop;换了 loop 就失效
- 单线程内的协作式并发,
await是协程主动挂起的信号,event loop 趁机运行其他协程。OS 不参与调度,只负责底层 I/O 通知(epoll/kqueue)。单 OS 线程,无锁,极低切换成本。 - 为什么是 1 秒而不是 3 秒?
time.sleep(1):阻塞当前线程,这 1 秒里 CPU 什么都干不了,只能傻等。await asyncio.sleep(1):主动让出执行权,告诉事件循环:“我要等 1 秒,这期间你可以去跑其他任务,1 秒后再通知我继续执行”。
线程(Thread)实现
threading(I/O 密集):用线程并发执行同样任务
import threading, time
results = {}
def fetch(url: str):
time.sleep(1) # GIL 会自动释放
results[url] = f"result:{url}" # 注意竞态风险
threads = [threading.Thread(target=fetch, args=(u,))
for u in ["a", "b", "c"]]
start = time.time()
for t in threads: t.start() # 启动线程
for t in threads: t.join() # 等待所有线程执行完毕
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 1.0s ← OS 并发调度 3 个线程
multiprocessing(CPU 密集):绕过 GIL,真正多核并行
from multiprocessing import Pool
def cpu_task(n: int) -> int:
return sum(i*i for i in range(n)) # 纯计算
with Pool(4) as pool:
results = pool.map(cpu_task, [10**7]*4)
# 4 个进程真正并行跑,利用多核
**GIL 说明:**Python 的全局解释器锁(GIL)让同一时刻只有一个线程执行 Python 字节码。I/O 期间线程会释放 GIL(所以线程对 I/O 有效),但 CPU 密集任务必须用 multiprocessing 才能真正并行。
Agent 并发情况
绝大多数 Agent 项目(比如 LangChain/LangGraph 构建的),业务逻辑层面都是「单线程协程并发」,只是:
- 底层的 LLM 调用、工具请求、数据库读写 这些 IO 操作,会被并发执行;
- 但 Agent 的状态流转、决策逻辑 本身,还是在同一个事件循环的单线程里跑。
为什么 Agent 项目普遍是单线程协程模型?
| 特性 | 单线程协程的优势 | 多线程 / 多进程的劣势 |
|---|---|---|
| 状态一致性 | 所有节点共享同一个状态对象,单线程下无竞态条件,不需要加锁 | 多线程访问共享状态必须加锁,极易出现死锁、数据不一致 |
| 流程可控性 | 线性流转,遇到 await 主动让出,调试和追踪执行路径更简单 | 操作系统抢占式调度,执行路径不可控,难以复现问题 |
| 工具 / LLM 调用 | 大量 IO 等待(网络请求),协程非阻塞并发的优势完全发挥 | 多线程切换成本高,还受 GIL 限制,CPU 密集场景无优势 |
Agent 项目的并发,分为两个层级:
- ① Agent 内部的单线程并发
- 每个 Agent 实例,通常跑在一个线程 + 一个事件循环里;
- 内部的多个工具调用、LLM 请求,通过
asyncio.gather等方式并发执行; - 这些并发请求,都在同一个线程的事件循环里调度。
- ② 服务端的多请求并发(你说的 “请求不是”)
- 当你的 Agent 被部署为 API 服务(比如 FastAPI/LangServe)时:
- FastAPI 会为每个请求创建独立的协程任务;
- 这些请求级的协程,都在同一个进程的事件循环里并发;
- 极端高并发场景下,FastAPI 会通过多进程模式,启动多个进程 / 事件循环,实现真正的多进程并发。
- 当你的 Agent 被部署为 API 服务(比如 FastAPI/LangServe)时:
所以,一个典型的 Agent 服务,并发模型是这样的:
进程1(事件循环A) → 处理请求1、请求2、请求3(单线程协程并发)
进程2(事件循环B) → 处理请求4、请求5、请求6(单线程协程并发)
...
❌ 我用 LangServe 本地部署了一个 Agent 服务,用户同时启动两个 Session 去修改本地文件(两个请求),需要加锁,因为 FastAPI 会为每个请求创建独立的协程任务,所以有并发风险。 而如果是只有一个协程,这个协程里两个 agent 去写文件就不需要锁
关键点:是否需要加锁,不取决于“是不是 FastAPI / 协程”,而取决于“是否存在并发访问同一资源(文件)”。
1️⃣ FastAPI + 多请求的情况
用 FastAPI 时:
- 每个请求 → 一个独立协程
- 多个请求 → 并发执行(同一事件循环中交错运行)
- 如果两个 session 同时写同一个文件 → 竞态条件 👉 结论:必须加锁
否则可能出现:
- 写入内容交错(文件损坏)
- 覆盖彼此修改
- 部分写入(尤其是多步写操作)
2️⃣ “只有一个协程就不用锁”——这个说法不完全对
“如果是只有一个协程,这个协程里两个 agent 去写文件就不需要锁”,这句话只在一个前提下成立:两个 agent 的执行是严格串行的(没有 await 切换)
async def handle():
await asyncio.gather(
agent1.run(),
agent2.run()
)
这已经是并发执行了(即使在一个请求里),此时:
- 两个 agent 同时写文件
- ✅ 必须加锁
正确的判断标准:不要看“协程数量”,看这个:👉 是否可能在同一时间有多个执行路径写同一个文件
✅ 最终结论(写文件)
-
FastAPI 多请求 → 一定要锁
-
单协程 ≠ 不需要锁(只要有 await / 并发就可能出问题)
-
判断标准:是否可能并发写同一资源
-
不要因为
asyncio是单线程就认为天然安全。单线程避免了 GIL 级别的线程争抢,但由于await显式交出控制权的设计,协程级别的状态争用和竞态条件依然存在。操作共享文件,必须加锁。 -
单线程 ≠ 绝对安全:
asyncio确实在同一线程和事件循环中运行,没有系统级的线程争用,但竞态条件(Race Condition)依然在应用层存在。 -
await即交出控制权: 协程的并发本质是“协作式调度”。一旦执行到await(如异步写文件),当前 Agent 就会立刻挂起,其他 Agent 会趁机插入执行。 -
资源共享必加锁: 只要多个并发体(子协程)对同一个共享资源(如文件、内存状态)进行非原子性的操作,必须使用
asyncio.Lock()将操作边界封装成原子操作,否则一定会产生数据交错(脏写入)。如果试图用同步阻塞(不加await)来规避,则会导致整个事件循环卡死,属于严重的工程事故。
任务轮转
并发任务轮转:
- 协程是“用户态轮转”
- 线程是“内核态抢占”
协程的轮转:自己让出(主动)
协程是靠 event loop:
await something()
- 当前任务主动让出控制权
- event loop 选择下一个任务执行 所以协程是:
✅ 协作式调度(cooperative scheduling)
特点:
- 不会被强行打断
- 切换点完全由
await决定 - 可控、可预测
线程的轮转:操作系统强制切换(被动)
线程完全不一样,它不靠你写代码控制。线程的调度是由 操作系统调度器(scheduler) 完成的。
- 核心机制:时间片(time slice)
- 操作系统会这样做:
- 给每个线程一个时间片(比如几毫秒)
- 线程运行
- 时间到了 → 强制打断
- 切换到另一个线程
- 这叫:抢占式调度(preemptive scheduling)
为什么线程更容易出竞态?
results[url] = f"result:{url}" # 注意竞态风险
问题在于:这一行不是原子操作。线程执行可能是这样:
线程A:读 results
⏰ 被打断
线程B:修改 results
线程A:继续写(覆盖)
根本控制不了切换点在哪。
GIL 对线程轮转的影响(Python 特殊点)
在 CPython 里有个东西叫: GIL(全局解释器锁)
它的作用是:
- 同一时刻只允许一个线程执行 Python 字节码
- 但!仍然会切换线程(不是不切) 切换时机大致有:
- 时间片(默认约 5ms)
- I/O 操作(如
time.sleep()) - 某些 C 扩展释放 GIL 所以: Python 线程是“假并行 + 真并发”
总结
| 机制 | 谁控制切换 | 是否可预测 | 是否会被打断 |
|---|---|---|---|
| 协程 | 你(await) | ✅ 高 | ❌ 不会 |
| 线程 | 操作系统 | ❌ 低 | ✅ 随时 |
为什么线程一定要加锁,而协程有时可以不用?答案就是:
- 线程:随时被打断 → 必须锁
- 协程:只有 await 才切换 → 可控
工程建议:agent + 文件操作 场景
- 用协程:可以通过“设计避免并发”
- 串行写文件:单写队列(并发生产,串行消费)
- 用线程 / FastAPI:不要赌,直接加锁
协程:通过设计避免并发(单写队列)
核心思想是:所有写文件请求都进入一个队列,由唯一一个 writer 协程串行消费。 这样即使有很多 agent 并发运行,也只有一个地方真正落盘。
import asyncio
from pathlib import Path
FILE_PATH = Path("agent_output.txt")
write_queue = asyncio.Queue()
async def file_writer():
"""唯一写文件协程"""
while True:
content = await write_queue.get()
# 串行写入,不会并发
with open(FILE_PATH, "a", encoding="utf-8") as f:
f.write(content + "\n")
write_queue.task_done()
async def agent(name: str):
# 模拟 agent 推理 / 调用工具
await asyncio.sleep(1)
result = f"{name}: generated content"
# 不直接写文件,只投递任务
await write_queue.put(result)
async def main():
# 启动唯一 writer
writer_task = asyncio.create_task(file_writer())
# 多个 agent 并发跑
await asyncio.gather(
agent("agent-1"),
agent("agent-2"),
agent("agent-3"),
)
# 等待全部写完
await write_queue.join()
writer_task.cancel()
asyncio.run(main())
- 为什么这里可以不加锁?
- 因为真正写文件的只有:
async def file_writer(),这一个协程。 - 即使多个 agent 同时:
await asyncio.gather(...)。它们只是并发产出结果,不直接写文件。 - 所以这是:并发生产,串行消费。
- 因为真正写文件的只有:
FastAPI / 多请求:必须加锁
因为多个请求会同时进来:
request A → 写文件
request B → 写文件
可能同时修改同一文件。
from fastapi import FastAPI
import asyncio
app = FastAPI()
file_lock = asyncio.Lock()
@app.post("/write")
async def write_file(content: str):
async with file_lock:
with open("demo.txt", "a", encoding="utf-8") as f:
f.write(content + "\n")
return {"status": "ok"}
- 为什么这里必须锁?
- 因为:
async def write_file(...)每个请求都会产生独立协程,两个协程都可能执行到:with open(...) - 所以必须:
async with file_lock让同一时间只能一个请求写。
- 因为:
多线程时的锁(threading)
import threading
lock = threading.Lock()
def write_file(content):
with lock:
with open("demo.txt", "a") as f:
f.write(content + "\n")
因为线程可能在任意时刻被 OS 打断:
线程A 写到一半
↓
切换线程B
↓
文件损坏
并发任务拆分

GPT Researcher 的核心价值之一,就是将传统的“串行单线程 Research”重构为了高并发、具备容错能力的异步数据流。对于 Agent 工程而言,单纯的 API 调用很简单,但如何稳定、高效地调度几十个并发的检索子任务,才是工程实力的体现。
针对图片中提到的三个核心点,以下是其底层的工程化实现方案与最佳实践剖析:
任务拆分
在进行深度研究时,单一的 Query 无法覆盖全貌。Agent 需要将大目标拆解为多个子目标,映射到具体的执行任务。
- 工程实现:通过一个专门的“规划阶段(Planning Phase)”,利用 LLM 将主问题分解为一系列独立的搜索词(Search Queries)。
- 代码映射:生成一个包含多个 Query 的列表,然后通过列表推导式,将每一个 Query 包装成一个独立的协程任务(Coroutine Task)。
# 假设 LLM 返回了 ["Query A", "Query B", "Query C"]
sub_queries = await generate_sub_queries(main_topic)
# 将每一个 query 映射为一个异步抓取/分析的协程任务
# 注意:这里只是创建了协程对象,尚未投入事件循环执行
tasks = [run_sub_research(query) for query in sub_queries]
并发与容错
这是 researcher.py 中最关键的工程处理。在并发请求几十个网页或执行多次搜索时,网络超时、代理失效、被目标网站拦截(403/429)是必然发生的常态。如果一个子任务崩溃导致整个 Agent 崩溃,工程健壮性就不合格。
- 致命误区:直接使用
await asyncio.gather(*tasks)。这种写法下,只要有一个子任务抛出异常(如TimeoutError),gather就会立刻向上抛出该异常,导致其他所有正在进行的子任务被隐式取消或丢弃。 - 核心真相 (
return_exceptions=True):必须将gather的return_exceptions参数设为True。这会将异常捕获并作为结果对象返回,从而实现“部分失败(Partial Failure)”的隔离。 - 并发控制 (
Semaphore):为了防止瞬间发起几百个并发请求导致 LLM API 触发限流(Rate Limit)或耗尽系统文件描述符,必须结合asyncio.Semaphore进行并发度压制。
# 定义并发信号量,例如最多同时运行 10 个子任务
semaphore = asyncio.Semaphore(10)
async def safe_run_sub_research(query):
async with semaphore:
try:
# 增加单任务维度的超时控制
return await asyncio.wait_for(run_sub_research(query), timeout=30.0)
except Exception as e:
# 在任务内部捕获特定异常,或直接抛出由外层 gather(return_exceptions=True) 兜底
raise RuntimeError(f"Task {query} failed: {str(e)}")
# 包装所有任务
safe_tasks = [safe_run_sub_research(q) for q in sub_queries]
# 核心:并发执行并隔离异常
results = await asyncio.gather(*safe_tasks, return_exceptions=True)
结果聚合
任务执行完毕后,results 列表中会混杂着成功的数据(如抓取到的文本)和失败的异常对象(如 TimeoutError)。此时需要进行数据清洗。
- 工程实现:遍历结果,通过类型检查(
isinstance)过滤掉异常对象,将有效的上下文(Contexts)拼接起来,最终输入给负责总结(Summarize)的 LLM。
valid_contexts = []
failed_queries_count = 0
for res in results:
if isinstance(res, Exception):
# 记录部分失败的日志,不中断主流程
print(f"[Warning] Sub-task exception caught: {res}")
failed_queries_count += 1
continue
# 将成功的返回结果加入上下文池
valid_contexts.append(res)
# 只要有足够比例的有效上下文,系统就可以继续推进
if not valid_contexts:
raise ValueError("All sub-research tasks failed. Cannot proceed.")
# 将 valid_contexts 合并,送入最终的 Report Generation 节点
final_report = await generate_report(valid_contexts)
通过这种设计,系统从脆弱的“一环断则全链断”转变为具备弹性降级能力的架构。即使 20 个检索任务中失败了 3 个,系统依然能利用剩下的 17 个有效信息生成高质量报告。