数据流转
- Agent 间通过消息队列通信
- Agent 间通过状态图传递中间产物
通过消息队列
核心思想
Agent 之间完全解耦,采用异步、事件驱动的通信模式。每个 Agent 只监听特定队列(或 topic),处理完后将结构化消息推送到下一个队列。
- 每个 Agent 独立运行
- 不直接调用对方,通过消息队列发 / 收结构化消息
- 中间产物(上下文、结果、记忆、指令)统一格式(JSON/JSONL)
工程实现
- 消息队列系统:
- 轻量:Redis Pub/Sub、Redis List
- 中等:RabbitMQ、NSQ、Kafka(轻量版)
- 框架内置:LangChain MessageBus、AutoGPT 消息通道
- 协议层:可结合 A2A (Agent2Agent) Protocol(Google 提出的开放标准)或自定义 MCP(Model Context Protocol),定义消息格式。
- 中间产物格式:统一使用 Pydantic 模型 或 JSON Schema,例如:
class RetrievalResult(BaseModel):
query: str
subgraph: Dict # 或 GraphRAG 子图结构
entities: List[str]
text_units: List[str]
confidence: float
source: str
{
"task_id": "uuid",
"from_agent": "DataAgent",
"to_agent": "GraphRAgent",
"msg_type": "intermediate_output",
"content": {
"entities": [...],
"relations": [...],
"context": "..."
},
"status": "processing",
"timestamp": 1712345678
}
标准流程(SOP)示例
- Planner Agent(规划者)接收用户查询,生成结构化任务消息(TaskMessage),推送到 Research Queue。
- Retriever Agent(检索者)从 Research Queue 拉取任务,执行 GraphRAG 的 Local Search 或 Global Search,生成 RetrievalResult,推送到 Analyzer Queue。
- Analyzer Agent(分析者)处理 RetrievalResult,生成分析报告(AnalysisReport),推送到 Validator Queue。
- Validator Agent(验证者)检查质量,若通过则推送到 Synthesizer Queue,否则返回 Planner Queue 触发重试。
- Synthesizer Agent(合成者)生成最终答案并返回给用户。
总结
优点:
- 高可扩展性:可水平扩展 Agent 实例,支持高并发。
- 容错性强:消息持久化、重试、死信队列天然支持。
- 解耦彻底:不同 Agent 可使用不同技术栈,甚至不同语言。
- 适合分布式、生产级部署(微服务架构)。
缺点:
- 调试较复杂(需要追踪消息流)。
- 端到端延迟可能稍高(异步)。
- 需要额外管理队列配置和消息一致性。 适用场景:
- 大规模、多租户、生产环境。
- Agent 数量多、部署在不同机器/容器。
- 需要事件溯源(Event Sourcing)或监控每个中间产物的流转。 常见框架/库支持:
- LangChain + Redis Queue 自定义 LLM。
- AutoGen(支持消息队列模式)。
- LlamaIndex 的部分事件驱动实现。
- CrewAI + 外部消息 broker。
通过状态图/状态机
核心思想
用 有向图(Directed Graph) 来显式建模整个工作流。每个 Agent 是一个 Node(节点),中间产物保存在共享的 State(状态对象) 中,通过边(Edges)在节点间流转。典型代表是 LangGraph(LangChain 生态)。
- 全局维护一个状态机(State Machine)
- 每个 Agent 只负责一个状态节点
- 中间产物存在全局状态 / 共享内存中
- 状态流转驱动 Agent 执行
工程实现
- State:一个共享的 Pydantic/BaseModel 对象,累积所有中间产物。例如:
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages] # 对话历史
query: str
retrieval_result: Optional[RetrievalResult] # GraphRAG 子图等
analysis: Optional[AnalysisReport]
validation_score: Optional[float]
final_answer: Optional[str]
iteration: int # 支持循环重试
- Nodes:每个 Agent 执行函数,接收当前 State,返回更新后的 State(或 Command)。
- Edges:条件边(conditional edges)决定下一步流向(例如:如果 validation_score < 0.8,则返回重试节点)。
标准流程(SOP)示例
- 定义 StateGraph(AgentState)。
- 添加节点:
- planner_node:更新 query 和计划。
- retriever_node:执行 GraphRAG Local/Global Search,更新 retrieval_result。
- analyzer_node:基于 retrieval_result 更新 analysis。
- validator_node:计算分数并决定是否继续。
- synthesizer_node:生成最终答案。
- 设置条件边:例如 tools_condition 或自定义路由函数。
- 编译图:graph = workflow.compile(),然后 graph.invoke(initial_state) 或流式执行。
总结
优点:
- 可观察性极强:整个流程可视化(LangGraph Studio 支持图形界面调试)。
- 状态集中管理:所有中间产物都在一个 State 对象中,易于追踪、持久化(MemorySaver 支持 checkpoint)。
- 支持循环与分支:天然处理重试、反思(reflection)、多跳推理。
- 开发体验好:代码即工作流,调试时可单步执行。
- 与 GraphRAG 结合紧密:Retriever Node 可直接调用 Local Search / Global Search,返回结构化子图。 缺点:
- 单进程/单机时更高效,分布式部署需 LangGraph Server(增加复杂度)。
- 高度结构化,灵活性略低于纯消息队列(但可通过 Supervisor 节点弥补)。 适用场景:
- 原型开发、复杂推理流程、需要频繁迭代的 Agentic GraphRAG。
- 需要 Human-in-the-Loop(人工干预节点)。
- 团队内部开发与调试优先的场景。
- 与 GraphRAG 深度集成(Local/Global Search 作为工具节点)。 常见框架:
- LangGraph(最主流,支持 MessagesState、自定义 State、持久化、流式)。
- CrewAI(更高层抽象,但底层也可转为 graph)。
- AutoGen(早期更消息导向,后续也支持 graph-like 协调)。
总结
| 维度 | 消息队列(异步解耦) | 状态图(LangGraph 等) |
|---|---|---|
| 通信方式 | 异步消息推送/拉取 | 同步/异步状态传递(共享 State) |
| 中间产物传递 | 每条消息独立携带结构化 payload | 累积在共享 State 对象中 |
| 解耦程度 | 最高(微服务友好) | 中等(节点间通过 graph 连接) |
| 可观察性/调试 | 中等(需日志 + 追踪工具) | 极高(图形化 + checkpoint) |
| 扩展性 | 优秀(分布式天然支持) | 良好(需 Server 支持大规模) |
| 容错与重试 | 内置(死信、重试队列) | 需手动实现循环边或 checkpoint |
| 开发复杂度 | 中高(需管理队列) | 较低(代码定义图) |
| 与 GraphRAG 集成 | 好(Retriever Agent 调用 GraphRAG) | 极佳(Retriever 作为 Node 直接返回子图) |
| 推荐场景 | 生产级、高并发、跨服务 | 复杂推理、原型、需要循环的 Agentic 流程 |
- 混合使用建议(推荐生产实践):
- 内部核心流程 用 状态图(LangGraph) 实现精确控制和 GraphRAG Local/Global Search。
- 跨服务或大规模并行 用 消息队列 连接不同子系统(例如一个 LangGraph 工作流完成后将最终产物推送到 Kafka,触发下游微服务)。
- 在 GraphRAG 场景中:Planner → Local/Global Search(State Graph 内)→ 分析验证(消息队列解耦)→ 合成。
- 比如 LangGraph 底层用了类似订阅的消息传递机制 来实现超级步执行和状态更新,但它把这些“订阅细节”封装成了状态图(State Graph) 的编程范式,让开发者不用直接操作队列或 Topic,而是通过定义 State + Nodes + Edges 来构建工作流。这就是为什么 LangGraph 天然支持循环、分支、并行,同时保持状态一致性。
- 实际落地提示:
- 明确格式 是关键:所有中间产物必须定义严格的 Pydantic Schema,并加入版本、timestamp、confidence 等元数据。
- 错误处理:无论哪种方式,都需设计统一的 ErrorPayload 和重试机制。