公开笔记

LangServe 源码解读与异步并发实战

详解LangServe部署原理与add_routes底层机制、FastAPI自动接口生成、SSE流式输出,梳理Python协程/线程/进程及GIL核心区别,讲解Agent单线程协程并发模型、共享文件资源加锁方案、任务拆分、并发限流与容错降级工程实战。

发布于 更新于

LangServe 和 LangGraph 均为 LangChain 生态工具,但定位与功能完全不同:LangServe 是部署层工具(将 LangChain 应用转为 API),LangGraph 是编排层框架(构建复杂有状态 Agent 工作流)。两者常协同使用,但解决的是 LLM 应用开发的不同阶段问题。

add_routes

本质:读取你的 Runnable(可以是 LLM、Chain、Agent) → 自动生成数据验证模型 → 在 FastAPI app 上自动注册一组 HTTP 路由 → 把 HTTP 请求转发给 Runnable 执行 → 把执行结果转回 HTTP 响应。

前置条件:

  1. FastAPI:Python 高性能 Web 框架,负责处理 HTTP 请求、生成接口文档、数据验证
  2. Runnable:LangChain 所有组件(模型、链、代理)的统一接口,固定有 4 个核心方法:
    • invoke():同步单次调用
    • stream():流式返回
    • batch():批量调用
    • stream_log():流式日志
  • 最小可运行的 server.py
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o")

add_routes(
    app,           # FastAPI 实例
    llm,           # 任意 Runnable
    path="/chat",  # 路由前缀
)

# uvicorn server:app --reload

一行 add_routes,自动生成四个端点:

端点作用
POST /chat/invoke单次调用,返回完整结果
POST /chat/batch批量调用
POST /chat/streamSSE 流式输出
GET /chat/playground内置调试 UI

同步 LLM 调用如何变成异步接口?

LangChain 的 Runnable 接口同时实现了同步和异步两套方法:

# 同步
llm.invoke("hello")

# 异步(底层用 asyncio.to_thread 或原生 async)
await llm.ainvoke("hello")

LangServe 生成的每个端点都是 FastAPI 的 async def,内部调用 Runnable 的 ainvoke / astream。 这样整个调用链全程非阻塞,FastAPI 的 event loop 不会被 LLM 的网络 IO 卡住。

# add_routes 内部简化逻辑(伪代码)
@app.post("/chat/invoke")
async def invoke_endpoint(request: InvokeRequest) -> InvokeResponse:
    output = await runnable.ainvoke(
        request.input,
        config=request.config,
    )
    return InvokeResponse(output=output)

如果 Runnable 只有同步的 .invoke(),LangChain 会自动用 asyncio.get_event_loop().run_in_executor(None, sync_fn) 包一层,跑进线程池,不阻塞 event loop。


流式输出(SSE)的处理

/stream 端点使用 Server-Sent Events,底层是 astream

@app.post("/chat/stream")
async def stream_endpoint(request: StreamRequest):
    async def event_generator():
        async for chunk in runnable.astream(request.input):
            yield {"data": serialize(chunk)}

    return EventSourceResponse(event_generator())

客户端收到的是一帧帧 token,而不是等全部生成完才返回。


asyncio 封装自定义同步函数

这是「阶段二:并发与稳定性」真正要学的东西——如何自己实现类似机制

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

def sync_llm_call(prompt: str) -> str:
    # 假设这是一个阻塞的同步调用
    import time; time.sleep(2)
    return f"Response to: {prompt}"

async def async_llm_call(prompt: str) -> str:
    loop = asyncio.get_event_loop()
    # 把同步函数扔进线程池,不阻塞 event loop
    result = await loop.run_in_executor(executor, sync_llm_call, prompt)
    return result

# FastAPI 里使用
@app.post("/invoke")
async def invoke(prompt: str):
    return await async_llm_call(prompt)

核心概念总结

同步 LLM 调用
    ↓  asyncio.run_in_executor (线程池)
async def endpoint   ←─── FastAPI event loop
    ↓  SSE / JSON
HTTP 客户端

add_routes 帮你把这整个流程打包好了,核心价值就是:不用自己写每个 invoke/batch/stream endpoint 的 async 胶水代码

异步、协程、线程

核心概念

LangServe 源码解读_1777297149375

  • 并发(Concurrency):多个任务交替执行,看起来像同时在跑,本质上同一时间只有一个任务在干活(比如单线程协程)。
  • 并行(Parallelism):多个任务同时执行,同一时间真的有多个任务在不同 CPU 核心上同时干活(比如多进程 / 多线程跑 CPU 密集任务)。
  • GILCPython 的全局解释器锁(Global Interpreter Lock),GIL 是一把强制锁,保证同一时间、同一个 Python 进程里,只能有 1 个线程执行 Python 字节码。
    • 为什么会有 GIL? 历史原因:
      • Python 早期没设计多线程安全
      • 为了内存管理、垃圾回收不崩溃
      • 加了一把全局大锁,简单粗暴保证安全
    • GIL 的影响
      • CPU 密集任务:多线程 = 没用!计算任务会一直占着 GIL,多线程反而更慢。→ 必须用 多进程 multiprocessing 才能利用多核。
      • IO 密集任务:多线程 = 有用!网络请求、数据库、文件读写、sleep 时,GIL 会自动释放!线程可以在等待时切换,实现并发。
      • 协程 asyncio:完全不受 GIL 影响。因为协程本来就是 单线程 的,GIL 根本不参与切换。

容易混淆的边界

  • 异步 ≠ 多线程。 异步是一种编程模型——” 不阻塞地发起操作 “;多线程是实现并发的一种机制。异步可以在单线程里实现(Python asyncio 就是)。
  • 协程 ≠ 线程。 协程由程序自己调度(用户态),OS 完全不感知;线程由 OS 调度(内核态)。协程切换几乎没有开销,线程切换需要上下文保存。
  • asyncio ≠ OS 自动管理。 Python asyncio 是用户态调度器,运行在单 OS 线程内。OS 只负责底层 I/O 通知(epoll/kqueue),” 谁来处理通知 ” 完全由 event loop 决定。
操作系统层                  线程1      线程2      线程3
                          ↕          ↕          ↕
Python 进程层               asyncio    asyncio    asyncio
                          loop A     loop B     loop C
                         /  |  \
                        coro coro coro   ← 这些都在同一个 OS 线程里

coroutine(协程)是 Python 中用户态的、轻量级的、由程序主动调度的 “任务单元”,你可以把它理解成 “单线程里的微线程”。

Python 异步编程中,事件循环(eventloop)和协程(coro)是用户态调度的轻量任务单元,与所属 OS 线程强绑定,操作系统无法感知也无法管理其资源绑定关系

对比

维度同步 (Sync)异步 (Async) / 协程 (Coro)线程 (Thread)
调度者程序自身event loop(用户态)操作系统(内核态)
切换时机不切换遇到 await 主动让出OS 抢占,任意时刻
切换成本极低(无系统调用)较高(保存寄存器 / 栈)
并发表现串行执行单线程并发(GIL + 单 loop)多线程并发(多核 CPU)
受 GIL 制约,仅 IO 密集有并发优势
适合场景简单脚本、CPU 密集大量 I/O、网络请求CPU 密集、需真并行
需要加锁通常不需要是(竞态条件)
是否受 GIL 限制-不受影响受限制
  • 协程(asyncio)的 asyncio.gather并发,单线程内切换任务,同一时间只有一段代码在跑。
  • 多线程(threading):IO 密集场景是并发(GIL 限制,同一时间只能一个线程跑 Python 代码);如果是 C 扩展 / IO 等待,会有部分并行效果,但不是真・Python 代码并行。
  • 多进程(multiprocessing):并行,每个进程有独立 GIL,能同时在不同 CPU 核心上执行 Python 代码。

同步(Sync)实现

同步阻塞:依次请求 3 个接口,每个耗时 1 秒

import time

def fetch(url: str) -> str:
    time.sleep(1)          # 模拟网络 I/O
    return f"result:{url}"

def main():
    urls = ["a", "b", "c"]
    results = []
    for url in urls:
        results.append(fetch(url))   # 阻塞,等完再走
    return results

start = time.time()
main()
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 3.0s
  • 执行特征:串行,每个 fetch 都卡住主线程。总耗时 = 各任务耗时之和。
  • OS 调度:无任何异步,CPU 在 sleep 期间完全空等。

异步(Async)/ 协程(Coro)实现

asyncio 协程:并发 (其实是单线程) 请求同样 3 个接口

import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)   # 让出控制权给 event loop
    return f"result:{url}"

async def main():
    urls = ["a", "b", "c"]
    results = await asyncio.gather(
        *[fetch(url) for url in urls]
    )
    return results

import time
start = time.time()
asyncio.run(main())
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 1.0s   ← 3 个协程并发等待

event loop 调度过程

# 时间轴(同一个 OS 线程内)
#  t=0   fetch(a) 启动 → await 让出
#        fetch(b) 启动 → await 让出
#        fetch(c) 启动 → await 让出
#  t=1   三个 I/O 同时完成,loop 依次唤醒
#        全部完成,总耗时 ≈ 1s

loop = asyncio.get_event_loop()
print(f"loop id: {id(loop)}")
# 协程绑定的就是这个 loop;换了 loop 就失效
  • 单线程内的协作式并发,await 是协程主动挂起的信号,event loop 趁机运行其他协程。OS 不参与调度,只负责底层 I/O 通知(epoll/kqueue)。单 OS 线程,无锁,极低切换成本。
  • 为什么是 1 秒而不是 3 秒?
    • time.sleep(1)阻塞当前线程,这 1 秒里 CPU 什么都干不了,只能傻等。
    • await asyncio.sleep(1)主动让出执行权,告诉事件循环:“我要等 1 秒,这期间你可以去跑其他任务,1 秒后再通知我继续执行”。

线程(Thread)实现

threading(I/O 密集):用线程并发执行同样任务

import threading, time

results = {}

def fetch(url: str):
    time.sleep(1) # GIL 会自动释放
    results[url] = f"result:{url}"  # 注意竞态风险

threads = [threading.Thread(target=fetch, args=(u,))
           for u in ["a", "b", "c"]]

start = time.time()
for t in threads: t.start() # 启动线程
for t in threads: t.join() # 等待所有线程执行完毕
print(f"耗时: {time.time()-start:.1f}s")
# 输出: 耗时: 1.0s   ← OS 并发调度 3 个线程

multiprocessing(CPU 密集):绕过 GIL,真正多核并行

from multiprocessing import Pool

def cpu_task(n: int) -> int:
    return sum(i*i for i in range(n))   # 纯计算

with Pool(4) as pool:
    results = pool.map(cpu_task, [10**7]*4)
# 4 个进程真正并行跑,利用多核

**GIL 说明:**Python 的全局解释器锁(GIL)让同一时刻只有一个线程执行 Python 字节码。I/O 期间线程会释放 GIL(所以线程对 I/O 有效),但 CPU 密集任务必须用 multiprocessing 才能真正并行。

Agent 并发情况

绝大多数 Agent 项目(比如 LangChain/LangGraph 构建的),业务逻辑层面都是「单线程协程并发」,只是:

  • 底层的 LLM 调用、工具请求、数据库读写 这些 IO 操作,会被并发执行;
  • Agent 的状态流转、决策逻辑 本身,还是在同一个事件循环的单线程里跑。

为什么 Agent 项目普遍是单线程协程模型?

特性单线程协程的优势多线程 / 多进程的劣势
状态一致性所有节点共享同一个状态对象,单线程下无竞态条件,不需要加锁多线程访问共享状态必须加锁,极易出现死锁、数据不一致
流程可控性线性流转,遇到 await 主动让出,调试和追踪执行路径更简单操作系统抢占式调度,执行路径不可控,难以复现问题
工具 / LLM 调用大量 IO 等待(网络请求),协程非阻塞并发的优势完全发挥多线程切换成本高,还受 GIL 限制,CPU 密集场景无优势

Agent 项目的并发,分为两个层级:

  • ① Agent 内部的单线程并发
    • 每个 Agent 实例,通常跑在一个线程 + 一个事件循环里;
    • 内部的多个工具调用、LLM 请求,通过 asyncio.gather 等方式并发执行;
    • 这些并发请求,都在同一个线程的事件循环里调度。
  • ② 服务端的多请求并发(你说的 “请求不是”)
    • 当你的 Agent 被部署为 API 服务(比如 FastAPI/LangServe)时:
      • FastAPI 会为每个请求创建独立的协程任务;
      • 这些请求级的协程,都在同一个进程的事件循环里并发;
      • 极端高并发场景下,FastAPI 会通过多进程模式,启动多个进程 / 事件循环,实现真正的多进程并发。

所以,一个典型的 Agent 服务,并发模型是这样的:

进程1(事件循环A) → 处理请求1、请求2、请求3(单线程协程并发)
进程2(事件循环B) → 处理请求4、请求5、请求6(单线程协程并发)
...

❌ 我用 LangServe 本地部署了一个 Agent 服务,用户同时启动两个 Session 去修改本地文件(两个请求),需要加锁,因为 FastAPI 会为每个请求创建独立的协程任务,所以有并发风险。 而如果是只有一个协程,这个协程里两个 agent 去写文件就不需要锁

关键点:是否需要加锁,不取决于“是不是 FastAPI / 协程”,而取决于“是否存在并发访问同一资源(文件)”

1️⃣ FastAPI + 多请求的情况

用 FastAPI 时:

  • 每个请求 → 一个独立协程
  • 多个请求 → 并发执行(同一事件循环中交错运行)
  • 如果两个 session 同时写同一个文件 → 竞态条件 👉 结论:必须加锁

否则可能出现:

  • 写入内容交错(文件损坏)
  • 覆盖彼此修改
  • 部分写入(尤其是多步写操作)

2️⃣ “只有一个协程就不用锁”——这个说法不完全对

“如果是只有一个协程,这个协程里两个 agent 去写文件就不需要锁”,这句话只在一个前提下成立:两个 agent 的执行是严格串行的(没有 await 切换)

async def handle():
    await asyncio.gather(
        agent1.run(),
        agent2.run()
    )

这已经是并发执行了(即使在一个请求里),此时:

  • 两个 agent 同时写文件
  • ✅ 必须加锁

正确的判断标准:不要看“协程数量”,看这个:👉 是否可能在同一时间有多个执行路径写同一个文件

✅ 最终结论(写文件)

  • FastAPI 多请求 → 一定要锁

  • 单协程 ≠ 不需要锁(只要有 await / 并发就可能出问题)

  • 判断标准:是否可能并发写同一资源

  • 不要因为 asyncio 是单线程就认为天然安全。单线程避免了 GIL 级别的线程争抢,但由于 await 显式交出控制权的设计,协程级别的状态争用和竞态条件依然存在。操作共享文件,必须加锁。

  • 单线程 ≠ 绝对安全: asyncio 确实在同一线程和事件循环中运行,没有系统级的线程争用,但竞态条件(Race Condition)依然在应用层存在

  • await 即交出控制权: 协程的并发本质是“协作式调度”。一旦执行到 await(如异步写文件),当前 Agent 就会立刻挂起,其他 Agent 会趁机插入执行。

  • 资源共享必加锁: 只要多个并发体(子协程)对同一个共享资源(如文件、内存状态)进行非原子性的操作,必须使用 asyncio.Lock() 将操作边界封装成原子操作,否则一定会产生数据交错(脏写入)。如果试图用同步阻塞(不加 await)来规避,则会导致整个事件循环卡死,属于严重的工程事故。

任务轮转

并发任务轮转:

  • 协程是“用户态轮转”
  • 线程是“内核态抢占”

协程的轮转:自己让出(主动)

协程是靠 event loop:

await something()
  • 当前任务主动让出控制权
  • event loop 选择下一个任务执行 所以协程是:

协作式调度(cooperative scheduling)

特点:

  • 不会被强行打断
  • 切换点完全由 await 决定
  • 可控、可预测

线程的轮转:操作系统强制切换(被动)

线程完全不一样,它不靠你写代码控制。线程的调度是由 操作系统调度器(scheduler) 完成的。

  • 核心机制:时间片(time slice)
  • 操作系统会这样做:
    1. 给每个线程一个时间片(比如几毫秒)
    2. 线程运行
    3. 时间到了 → 强制打断
    4. 切换到另一个线程
  • 这叫:抢占式调度(preemptive scheduling)

为什么线程更容易出竞态?

results[url] = f"result:{url}"  # 注意竞态风险

问题在于:这一行不是原子操作。线程执行可能是这样:

线程A:读 results
⏰ 被打断
线程B:修改 results
线程A:继续写(覆盖)

根本控制不了切换点在哪。

GIL 对线程轮转的影响(Python 特殊点)

在 CPython 里有个东西叫: GIL(全局解释器锁)

它的作用是:

  • 同一时刻只允许一个线程执行 Python 字节码
  • 但!仍然会切换线程(不是不切) 切换时机大致有:
  • 时间片(默认约 5ms)
  • I/O 操作(如 time.sleep()
  • 某些 C 扩展释放 GIL 所以: Python 线程是“假并行 + 真并发”

总结

机制谁控制切换是否可预测是否会被打断
协程你(await)✅ 高❌ 不会
线程操作系统❌ 低✅ 随时

为什么线程一定要加锁,而协程有时可以不用?答案就是:

  • 线程:随时被打断 → 必须锁
  • 协程:只有 await 才切换 → 可控

工程建议:agent + 文件操作 场景

  • 用协程:可以通过“设计避免并发”
    • 串行写文件:单写队列(并发生产,串行消费)
  • 用线程 / FastAPI:不要赌,直接加锁

协程:通过设计避免并发(单写队列)

核心思想是:所有写文件请求都进入一个队列,由唯一一个 writer 协程串行消费。 这样即使有很多 agent 并发运行,也只有一个地方真正落盘。

import asyncio
from pathlib import Path

FILE_PATH = Path("agent_output.txt")
write_queue = asyncio.Queue()


async def file_writer():
    """唯一写文件协程"""
    while True:
        content = await write_queue.get()

        # 串行写入,不会并发
        with open(FILE_PATH, "a", encoding="utf-8") as f:
            f.write(content + "\n")

        write_queue.task_done()


async def agent(name: str):
    # 模拟 agent 推理 / 调用工具
    await asyncio.sleep(1)

    result = f"{name}: generated content"

    # 不直接写文件,只投递任务
    await write_queue.put(result)


async def main():
    # 启动唯一 writer
    writer_task = asyncio.create_task(file_writer())

    # 多个 agent 并发跑
    await asyncio.gather(
        agent("agent-1"),
        agent("agent-2"),
        agent("agent-3"),
    )

    # 等待全部写完
    await write_queue.join()

    writer_task.cancel()


asyncio.run(main())
  • 为什么这里可以不加锁?
    • 因为真正写文件的只有:async def file_writer(),这一个协程。
    • 即使多个 agent 同时:await asyncio.gather(...)。它们只是并发产出结果,不直接写文件。
    • 所以这是:并发生产,串行消费

FastAPI / 多请求:必须加锁

因为多个请求会同时进来:

request A → 写文件
request B → 写文件

可能同时修改同一文件。

from fastapi import FastAPI
import asyncio

app = FastAPI()

file_lock = asyncio.Lock()


@app.post("/write")
async def write_file(content: str):
    async with file_lock:
        with open("demo.txt", "a", encoding="utf-8") as f:
            f.write(content + "\n")

    return {"status": "ok"}
  • 为什么这里必须锁?
    • 因为:async def write_file(...) 每个请求都会产生独立协程,两个协程都可能执行到:with open(...)
    • 所以必须:async with file_lock 让同一时间只能一个请求写。

多线程时的锁(threading)

import threading

lock = threading.Lock()

def write_file(content):
    with lock:
        with open("demo.txt", "a") as f:
            f.write(content + "\n")

因为线程可能在任意时刻被 OS 打断:

线程A 写到一半

切换线程B

文件损坏

并发任务拆分

LangServe 源码解读_1777304794953

GPT Researcher 的核心价值之一,就是将传统的“串行单线程 Research”重构为了高并发、具备容错能力的异步数据流。对于 Agent 工程而言,单纯的 API 调用很简单,但如何稳定、高效地调度几十个并发的检索子任务,才是工程实力的体现。

针对图片中提到的三个核心点,以下是其底层的工程化实现方案与最佳实践剖析:

任务拆分

在进行深度研究时,单一的 Query 无法覆盖全貌。Agent 需要将大目标拆解为多个子目标,映射到具体的执行任务。

  • 工程实现:通过一个专门的“规划阶段(Planning Phase)”,利用 LLM 将主问题分解为一系列独立的搜索词(Search Queries)。
  • 代码映射:生成一个包含多个 Query 的列表,然后通过列表推导式,将每一个 Query 包装成一个独立的协程任务(Coroutine Task)。
# 假设 LLM 返回了 ["Query A", "Query B", "Query C"]
sub_queries = await generate_sub_queries(main_topic)

# 将每一个 query 映射为一个异步抓取/分析的协程任务
# 注意:这里只是创建了协程对象,尚未投入事件循环执行
tasks = [run_sub_research(query) for query in sub_queries]

并发与容错

这是 researcher.py 中最关键的工程处理。在并发请求几十个网页或执行多次搜索时,网络超时、代理失效、被目标网站拦截(403/429)是必然发生的常态。如果一个子任务崩溃导致整个 Agent 崩溃,工程健壮性就不合格。

  • 致命误区:直接使用 await asyncio.gather(*tasks)。这种写法下,只要有一个子任务抛出异常(如 TimeoutError),gather 就会立刻向上抛出该异常,导致其他所有正在进行的子任务被隐式取消或丢弃。
  • 核心真相 (return_exceptions=True):必须将 gatherreturn_exceptions 参数设为 True。这会将异常捕获并作为结果对象返回,从而实现“部分失败(Partial Failure)”的隔离。
  • 并发控制 (Semaphore):为了防止瞬间发起几百个并发请求导致 LLM API 触发限流(Rate Limit)或耗尽系统文件描述符,必须结合 asyncio.Semaphore 进行并发度压制。
# 定义并发信号量,例如最多同时运行 10 个子任务
semaphore = asyncio.Semaphore(10)

async def safe_run_sub_research(query):
    async with semaphore:
        try:
            # 增加单任务维度的超时控制
            return await asyncio.wait_for(run_sub_research(query), timeout=30.0)
        except Exception as e:
            # 在任务内部捕获特定异常,或直接抛出由外层 gather(return_exceptions=True) 兜底
            raise RuntimeError(f"Task {query} failed: {str(e)}")

# 包装所有任务
safe_tasks = [safe_run_sub_research(q) for q in sub_queries]

# 核心:并发执行并隔离异常
results = await asyncio.gather(*safe_tasks, return_exceptions=True)

结果聚合

任务执行完毕后,results 列表中会混杂着成功的数据(如抓取到的文本)和失败的异常对象(如 TimeoutError)。此时需要进行数据清洗。

  • 工程实现:遍历结果,通过类型检查(isinstance)过滤掉异常对象,将有效的上下文(Contexts)拼接起来,最终输入给负责总结(Summarize)的 LLM。
valid_contexts = []
failed_queries_count = 0

for res in results:
    if isinstance(res, Exception):
        # 记录部分失败的日志,不中断主流程
        print(f"[Warning] Sub-task exception caught: {res}")
        failed_queries_count += 1
        continue
    
    # 将成功的返回结果加入上下文池
    valid_contexts.append(res)

# 只要有足够比例的有效上下文,系统就可以继续推进
if not valid_contexts:
    raise ValueError("All sub-research tasks failed. Cannot proceed.")

# 将 valid_contexts 合并,送入最终的 Report Generation 节点
final_report = await generate_report(valid_contexts)

通过这种设计,系统从脆弱的“一环断则全链断”转变为具备弹性降级能力的架构。即使 20 个检索任务中失败了 3 个,系统依然能利用剩下的 17 个有效信息生成高质量报告。

← 返回 Notes

Contact

Contact Me

Leave a message here. The form sends directly from the browser to a form delivery service and then to my email.

Messages are delivered to lzx744008464@gmail.com.