公开笔记

MultiAgent

覆盖 MultiAgent 多智能体系统数据流转与通信机制,详解消息队列、状态图(LangGraph)两大协作模式,包含原理、工程实现、执行流程、优缺点、适用场景、核心对比以及生产环境混合架构落地建议。

发布于 更新于

数据流转

  • Agent 间通过消息队列通信
  • Agent 间通过状态图传递中间产物

通过消息队列

核心思想

Agent 之间完全解耦,采用异步、事件驱动的通信模式。每个 Agent 只监听特定队列(或 topic),处理完后将结构化消息推送到下一个队列。

  • 每个 Agent 独立运行
  • 不直接调用对方,通过消息队列发 / 收结构化消息
  • 中间产物(上下文、结果、记忆、指令)统一格式(JSON/JSONL)

工程实现

  • 消息队列系统:
    • 轻量:Redis Pub/Sub、Redis List
    • 中等:RabbitMQ、NSQ、Kafka(轻量版)
    • 框架内置:LangChain MessageBus、AutoGPT 消息通道
  • 协议层:可结合 A2A (Agent2Agent) Protocol(Google 提出的开放标准)或自定义 MCP(Model Context Protocol),定义消息格式。
  • 中间产物格式:统一使用 Pydantic 模型 或 JSON Schema,例如:
class RetrievalResult(BaseModel):
    query: str
    subgraph: Dict  # 或 GraphRAG 子图结构
    entities: List[str]
    text_units: List[str]
    confidence: float
    source: str
{
  "task_id": "uuid",
  "from_agent": "DataAgent",
  "to_agent": "GraphRAgent",
  "msg_type": "intermediate_output",
  "content": {
    "entities": [...],
    "relations": [...],
    "context": "..."
  },
  "status": "processing",
  "timestamp": 1712345678
}

标准流程(SOP)示例

  1. Planner Agent(规划者)接收用户查询,生成结构化任务消息(TaskMessage),推送到 Research Queue
  2. Retriever Agent(检索者)从 Research Queue 拉取任务,执行 GraphRAG 的 Local SearchGlobal Search,生成 RetrievalResult,推送到 Analyzer Queue
  3. Analyzer Agent(分析者)处理 RetrievalResult,生成分析报告(AnalysisReport),推送到 Validator Queue
  4. Validator Agent(验证者)检查质量,若通过则推送到 Synthesizer Queue,否则返回 Planner Queue 触发重试。
  5. Synthesizer Agent(合成者)生成最终答案并返回给用户。

总结

优点

  • 高可扩展性:可水平扩展 Agent 实例,支持高并发。
  • 容错性强:消息持久化、重试、死信队列天然支持。
  • 解耦彻底:不同 Agent 可使用不同技术栈,甚至不同语言。
  • 适合分布式、生产级部署(微服务架构)。

缺点

  • 调试较复杂(需要追踪消息流)。
  • 端到端延迟可能稍高(异步)。
  • 需要额外管理队列配置和消息一致性。 适用场景
  • 大规模、多租户、生产环境。
  • Agent 数量多、部署在不同机器/容器。
  • 需要事件溯源(Event Sourcing)或监控每个中间产物的流转。 常见框架/库支持
  • LangChain + Redis Queue 自定义 LLM。
  • AutoGen(支持消息队列模式)。
  • LlamaIndex 的部分事件驱动实现。
  • CrewAI + 外部消息 broker。

通过状态图/状态机

核心思想

有向图(Directed Graph) 来显式建模整个工作流。每个 Agent 是一个 Node(节点),中间产物保存在共享的 State(状态对象) 中,通过边(Edges)在节点间流转。典型代表是 LangGraph(LangChain 生态)。

  • 全局维护一个状态机(State Machine)
  • 每个 Agent 只负责一个状态节点
  • 中间产物存在全局状态 / 共享内存
  • 状态流转驱动 Agent 执行

工程实现

  • State:一个共享的 Pydantic/BaseModel 对象,累积所有中间产物。例如:
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]  # 对话历史
    query: str
    retrieval_result: Optional[RetrievalResult]   # GraphRAG 子图等
    analysis: Optional[AnalysisReport]
    validation_score: Optional[float]
    final_answer: Optional[str]
    iteration: int  # 支持循环重试
  • Nodes:每个 Agent 执行函数,接收当前 State,返回更新后的 State(或 Command)。
  • Edges:条件边(conditional edges)决定下一步流向(例如:如果 validation_score < 0.8,则返回重试节点)。

标准流程(SOP)示例

  1. 定义 StateGraph(AgentState)。
  2. 添加节点:
    • planner_node:更新 query 和计划。
    • retriever_node:执行 GraphRAG Local/Global Search,更新 retrieval_result。
    • analyzer_node:基于 retrieval_result 更新 analysis。
    • validator_node:计算分数并决定是否继续。
    • synthesizer_node:生成最终答案。
  3. 设置条件边:例如 tools_condition 或自定义路由函数。
  4. 编译图:graph = workflow.compile(),然后 graph.invoke(initial_state) 或流式执行。

总结

优点

  • 可观察性极强:整个流程可视化(LangGraph Studio 支持图形界面调试)。
  • 状态集中管理:所有中间产物都在一个 State 对象中,易于追踪、持久化(MemorySaver 支持 checkpoint)。
  • 支持循环与分支:天然处理重试、反思(reflection)、多跳推理。
  • 开发体验好:代码即工作流,调试时可单步执行。
  • 与 GraphRAG 结合紧密:Retriever Node 可直接调用 Local Search / Global Search,返回结构化子图。 缺点
  • 单进程/单机时更高效,分布式部署需 LangGraph Server(增加复杂度)。
  • 高度结构化,灵活性略低于纯消息队列(但可通过 Supervisor 节点弥补)。 适用场景
  • 原型开发、复杂推理流程、需要频繁迭代的 Agentic GraphRAG。
  • 需要 Human-in-the-Loop(人工干预节点)。
  • 团队内部开发与调试优先的场景。
  • 与 GraphRAG 深度集成(Local/Global Search 作为工具节点)。 常见框架
  • LangGraph(最主流,支持 MessagesState、自定义 State、持久化、流式)。
  • CrewAI(更高层抽象,但底层也可转为 graph)。
  • AutoGen(早期更消息导向,后续也支持 graph-like 协调)。

总结

维度消息队列(异步解耦)状态图(LangGraph 等)
通信方式异步消息推送/拉取同步/异步状态传递(共享 State)
中间产物传递每条消息独立携带结构化 payload累积在共享 State 对象中
解耦程度最高(微服务友好)中等(节点间通过 graph 连接)
可观察性/调试中等(需日志 + 追踪工具)极高(图形化 + checkpoint)
扩展性优秀(分布式天然支持)良好(需 Server 支持大规模)
容错与重试内置(死信、重试队列)需手动实现循环边或 checkpoint
开发复杂度中高(需管理队列)较低(代码定义图)
与 GraphRAG 集成好(Retriever Agent 调用 GraphRAG)极佳(Retriever 作为 Node 直接返回子图)
推荐场景生产级、高并发、跨服务复杂推理、原型、需要循环的 Agentic 流程
  • 混合使用建议(推荐生产实践):
    • 内部核心流程状态图(LangGraph) 实现精确控制和 GraphRAG Local/Global Search。
    • 跨服务或大规模并行消息队列 连接不同子系统(例如一个 LangGraph 工作流完成后将最终产物推送到 Kafka,触发下游微服务)。
    • 在 GraphRAG 场景中:Planner → Local/Global Search(State Graph 内)→ 分析验证(消息队列解耦)→ 合成。
    • 比如 LangGraph 底层用了类似订阅的消息传递机制 来实现超级步执行和状态更新,但它把这些“订阅细节”封装成了状态图(State Graph) 的编程范式,让开发者不用直接操作队列或 Topic,而是通过定义 State + Nodes + Edges 来构建工作流。这就是为什么 LangGraph 天然支持循环、分支、并行,同时保持状态一致性。
  • 实际落地提示
    • 明确格式 是关键:所有中间产物必须定义严格的 Pydantic Schema,并加入版本、timestamp、confidence 等元数据。
    • 错误处理:无论哪种方式,都需设计统一的 ErrorPayload 和重试机制。
← 返回 Notes

Contact

Contact Me

Leave a message here. The form sends directly from the browser to a form delivery service and then to my email.

Messages are delivered to lzx744008464@gmail.com.