找回密码
 立即注册
首页 业界区 业界 基于 LangGraph 的多 Agent 协作系统实战:从架构设计到 ...

基于 LangGraph 的多 Agent 协作系统实战:从架构设计到并行调度

章娅萝 7 小时前
项目地址:https://github.com/sslovett/planMultiAgent
技术栈:FastAPI / LangGraph / LangChain / React / TypeScript / Vite
关键词:LangGraph、多 Agent 协作、Send API 并行 fan-out、ReAct Agent、SSE 流式输出、Supervisor 路由
一、为什么需要多 Agent 协作?

当我们用大模型构建一个旅行助手时,用户的问题往往是复杂的:
"帮我查一下明天北京到上海的高铁,顺便看看上海天气"
这一句话里同时包含了班次查询天气查询两个独立任务。如果用单一 Agent 串行处理,要么让一个 Agent 承担所有能力(Prompt 膨胀、工具冲突),要么串行调用多个工具(延迟叠加)。
更合理的做法是:让专业的 Agent 做专业的事,让它们并行执行,最后汇总结果。
这正是 Supervisor + Worker Agent 并行协作模式 的核心思路。本项目基于 LangGraph 实现了这一架构,下面完整分享从设计到落地的全过程。
二、系统功能一览

2.1 核心功能

功能模块Agent描述示例问句班次查询ticket_search查询火车、高铁、航班班次信息"查一下明天北京到上海的高铁"天气查询weather_search查询城市实时天气"上海今天天气怎么样"政策咨询kb_search退票改签规则、票价政策等知识库 RAG"高铁改签要收手续费吗"闲聊chitchat自然语言闲聊、引导使用"你好,你能帮我做什么"多任务并行多 Agent 协作自动识别多意图,Send API 并行执行"查北京到上海的班次,顺便看看上海天气"2.2 思考链可视化

系统的一大特色是终端风格的思考过程实时展示。前端以类似日志终端的 UI 实时渲染每一步思考,包括 Agent 名称标签、操作类型标识、流式光标动画等。用户能清楚看到:
  1. 01:032  分析  规划    分析用户意图...
  2. 02:156  完成  规划    识别到班次查询+天气查询需求
  3. 03:201  规划  规划    并行执行 2 个任务:智慧出行, 天气
  4. 04:350  执行  出行    智慧出行处理中...
  5. 05:355  执行  天气    天气处理中...
  6. 06:892  完成  出行    智慧出行完成
  7. 07:910  完成  天气    天气完成
  8. 08:920  执行  汇总    汇总 2 个任务结果
复制代码
这种设计参考了 Claude 的 Thinking 展示方式——用户不仅看到最终结果,还能看到系统"是怎么想的"。
1.png

三、架构设计

3.1 整体数据流

2.png
  1. 用户输入
  2.   │
  3.   ▼
  4. ┌──────────────────────────────────────────────────┐
  5. │                FastAPI SSE 接口                    │
  6. │           POST /api/v1/chat/stream                │
  7. │           stream_mode = "custom"                  │
  8. └──────────────────────┬───────────────────────────┘
  9.                        │
  10.                        ▼
  11. ┌──────────────────────────────────────────────────┐
  12. │              LangGraph StateGraph                  │
  13. │                                                    │
  14. │   ┌────────────────────────────────────────────┐ │
  15. │   │            Supervisor Node                   │ │
  16. │   │                                              │ │
  17. │   │   第一层:关键词快速匹配(O(n),无 LLM)      │ │
  18. │   │   第二层:LLM 精确判断(仅必要时触发)        │ │
  19. │   │                                              │ │
  20. │   │   输出:next_agents = ["agent1", "agent2"]   │ │
  21. │   └──────┬──────────────────────────┬────────────┘ │
  22. │          │                          │              │
  23. │    len == 1                    len > 1             │
  24. │   单任务直连               Send API 并行           │
  25. │          │                 ┌────┬────┐             │
  26. │          ▼                 ▼    ▼    ▼             │
  27. │    ┌──────────┐     ┌─────┐ ┌─────┐ ┌─────┐      │
  28. │    │  Agent   │     │Agent│ │Agent│ │Agent│       │
  29. │    │  (ReAct  │     │  A  │ │  B  │ │  C  │       │
  30. │    │  子图)   │     └──┬──┘ └──┬──┘ └──┬──┘       │
  31. │    └────┬─────┘        │       │       │           │
  32. │         │              └───────┼───────┘           │
  33. │         │              Reducer fan-in 聚合          │
  34. │         │                      │                   │
  35. │         ▼                      ▼                   │
  36. │   ┌────────────────────────────────────────────┐  │
  37. │   │           Synthesizer Node                   │  │
  38. │   │                                              │  │
  39. │   │   1 个结果 → 透传(流式已由 Worker 完成)     │  │
  40. │   │   N 个结果 → LLM 流式汇总                    │  │
  41. │   └──────────────────────────────────────────────┘  │
  42. └──────────────────────────────────────────────────┘
  43.                        │
  44.                        ▼
  45.               SSE 事件流 → 前端渲染
复制代码
3.2 三层节点职责

节点职责输入输出Supervisor意图识别、任务分解、路由决策用户消息next_agents 列表Worker Agent执行具体任务(ReAct 子图)用户消息 + 工具agent_outputs 结果Synthesizer结果聚合与流式输出所有 agent_outputsfinal_response3.3 项目目录结构
  1. planMultiAgent/
  2. ├── backend/
  3. │   └── app/
  4. │       ├── agents/
  5. │       │   ├── nodes/
  6. │       │   │   ├── supervisor.py          # Supervisor 节点:双层路由
  7. │       │   │   └── synthesizer.py         # Synthesizer 节点:多结果汇总
  8. │       │   └── tools/
  9. │       │       ├── registry.py            # ToolRegistry 工具注册中心
  10. │       │       ├── __init__.py            # 工具自动注册入口
  11. │       │       ├── ticket_tool.py         # 班次查询工具
  12. │       │       ├── weather_tool.py        # 天气查询工具
  13. │       │       ├── kb_tool.py             # 知识库检索工具
  14. │       │       └── chitchat_tool.py       # 闲聊工具
  15. │       ├── graph/
  16. │       │   ├── state.py                   # AgentState(Annotated reducer)
  17. │       │   ├── agents.py                  # create_react_agent 工厂
  18. │       │   └── langgraph_workflow.py      # 工作流编译、路由函数
  19. │       ├── utils/
  20. │       │   ├── message_helpers.py         # get_writer()、SSE 构造、3.10 兼容
  21. │       │   ├── llm_helper.py             # call_llm / call_llm_stream 统一接口
  22. │       │   └── llm_client.py             # OpenAI 兼容客户端
  23. │       ├── services/                      # 外部服务调用层
  24. │       ├── vectors/                       # 向量数据库适配层(Qdrant/ES)
  25. │       ├── api/
  26. │       │   └── routes.py                 # SSE 流式接口
  27. │       ├── config.py                      # pydantic-settings 配置
  28. │       └── main.py                        # FastAPI 入口 + lifespan
  29. ├── frontend/
  30. │   └── src/
  31. │       ├── components/
  32. │       │   ├── ChatInterface.tsx          # 主界面
  33. │       │   ├── ThinkingChain.tsx          # 终端风格思考链组件
  34. │       │   ├── MessageList.tsx            # 消息列表
  35. │       │   ├── MessageItem.tsx            # 消息气泡
  36. │       │   └── InputBox.tsx               # 输入框
  37. │       ├── api/chat.ts                    # SSE 流解析 + 回调分发
  38. │       └── store/chatStore.ts             # Zustand 状态管理
  39. └── README.md
复制代码
四、核心技术深度解析

4.1 AgentState:自定义 Reducer 解决并行 fan-in 难题

多 Agent 并行执行时最棘手的问题是:多个节点同时写同一个 state 字段,如何避免覆盖?
LangGraph 的解决方案是 Annotated + Reducer 函数。本项目为 agent_outputs 字段设计了一个自定义 Reducer:
  1. # backend/app/graph/state.py
  2. def _reset_on_empty(existing: list, new: list) -> list:
  3.     """
  4.     自定义列表 Reducer:空列表重置,非空追加
  5.     - Supervisor 传入 [] → 清除上一轮旧结果(每轮重置)
  6.     - Agent 传入 [{...}] → 追加到列表(并行 fan-in 安全)
  7.     """
  8.     if not new:
  9.         return []
  10.     return existing + new
  11. class AgentState(TypedDict):
  12.     # LangGraph 原生 reducer,自动管理多轮对话历史
  13.     messages: Annotated[Sequence[BaseMessage], add_messages]
  14.     # 自定义 reducer,支持并行安全聚合 + 每轮自动重置
  15.     agent_outputs: Annotated[list, _reset_on_empty]
  16.     # Supervisor 输出的路由信息
  17.     next_agents: List[str]
  18.     collaboration_mode: str    # "single" | "parallel"
  19.     # 各 Agent 业务上下文(Supervisor 动态填充)
  20.     ticket_context: dict
  21.     weather_context: dict
  22.     consultation_context: dict
  23.     # 最终输出
  24.     final_response: str
复制代码
Supervisor 每轮开始时返回 agent_outputs: [],Reducer 检测到空列表就清除旧数据;Worker Agent 各自返回 [{...}],Reducer 执行 existing + new 追加合并。LangGraph 保证 Reducer 操作的原子性,不存在并发写冲突。
这比手动加锁或用队列收集结果要优雅得多——利用框架的 Reducer 机制,把并发问题消化在状态定义层
4.2 Supervisor 双层路由:关键词快速通道 + LLM 精确补充

路由策略直接影响系统的响应速度意图识别准确率
纯 LLM 路由准确但慢(每次都要调用 LLM),纯关键词路由快但死板(语义模糊时容易误判)。项目采用了"两层漏斗"设计:
  1. # backend/app/agents/nodes/supervisor.py
  2. def _analyze_task(query: str, messages: list, state: AgentState) -> dict:
  3.     """
  4.     双层路由策略:
  5.     第一层:关键词快速匹配(O(n),零 LLM 开销)
  6.     第二层:LLM 精确判断(仅在多工具冲突或无匹配时触发)
  7.     """
  8.     registry = get_tool_registry()
  9.     all_tools = registry.get_all()
  10.     # ====== 第一层:关键词匹配 ======
  11.     matched_tools = registry.match_by_keywords(query)
  12.     if matched_tools:
  13.         # 唯一匹配 → 直接选定
  14.         if len(matched_tools) == 1:
  15.             tool_name = matched_tools[0][0]
  16.             return {"mode": "single", "subtasks": [{"agent": tool_name, ...}]}
  17.         # 最高优先级明显领先(priority 数值越小越高)
  18.         if matched_tools[0][1] < matched_tools[1][1]:
  19.             return {"mode": "single", "subtasks": [...]}
  20.         # 多工具同优先级 → 需要 LLM 判断是否并行
  21.         return _analyze_with_llm(query, messages, all_tools,
  22.                                  matched_tools=same_priority_tools)
  23.     # ====== 第二层:LLM 分析 ======
  24.     return _analyze_with_llm(query, messages, all_tools)
复制代码
性能对比
场景路由方式额外延迟示例"查北京到上海高铁"关键词直连0ms命中"高铁"关键词 → ticket_search"上海天气"关键词直连0ms命中"天气"关键词 → weather_search"查个班次顺便看天气"LLM 判断并行~500ms同优先级冲突 → LLM 决定 parallel"这个可以退吗"LLM 兜底分析~500ms无关键词命中 → LLM 分析意图80% 以上的日常请求都能通过第一层关键词匹配直接路由,省去了不必要的 LLM 调用
4.3 Send API:真正的并行 fan-out

这是本项目的核心亮点。LangGraph 的 Send API 可以在条件路由中返回 Send 对象列表,让多个节点真正并发执行(而非串行调度)。
  1. # backend/app/graph/langgraph_workflow.py
  2. from langgraph.types import Send
  3. from langgraph.graph import StateGraph, END
  4. def route_after_supervisor(state: AgentState):
  5.     """Supervisor → Agent 的路由函数"""
  6.     next_agents = state.get("next_agents", [])
  7.     if not next_agents:
  8.         return END                    # 无任务 → 结束
  9.     if len(next_agents) == 1:
  10.         return next_agents[0]         # 单任务 → 直接路由
  11.     # 多任务 → Send API 并行 fan-out
  12.     return [Send(agent, state) for agent in next_agents]
复制代码
工作流编译——把所有节点和边组装起来:
  1. def create_workflow() -> StateGraph:
  2.     agents = create_worker_agents()  # 从 ToolRegistry 动态创建所有 ReAct Agent
  3.     workflow = StateGraph(AgentState)
  4.     # 三类节点
  5.     workflow.add_node("supervisor", supervisor_node)
  6.     for name, compiled_agent in agents.items():
  7.         workflow.add_node(name, _make_agent_node(name, compiled_agent))
  8.     workflow.add_node("synthesizer", synthesizer_node)
  9.     # 边
  10.     workflow.set_entry_point("supervisor")
  11.     route_map = {name: name for name in agents.keys()}
  12.     route_map[END] = END
  13.     workflow.add_conditional_edges("supervisor", route_after_supervisor, route_map)
  14.     for name in agents.keys():
  15.         workflow.add_edge(name, "synthesizer")  # 所有 Agent → Synthesizer
  16.     workflow.add_edge("synthesizer", END)
  17.     return workflow
  18. def compile_workflow():
  19.     """编译 + MemorySaver checkpointer(会话持久化)"""
  20.     workflow = create_workflow()
  21.     checkpointer = MemorySaver()
  22.     return workflow.compile(checkpointer=checkpointer)
复制代码
编译后的工作流图结构如下:
  1. START → supervisor → [ticket_search, weather_search, kb_search, chitchat] → synthesizer → END
  2.             │                        ↑ Send 并行 fan-out
  3.             │                        ↓ Reducer fan-in 聚合
  4.             └── END(无任务时直接结束)
复制代码
4.4 Worker Agent:ReAct 子图 + 流式双模态

每个 Worker Agent 都是一个 create_react_agent 编译出的独立子图,内置 Reason → Act → Observe 循环。Agent 包装节点根据是否为单 Agent 场景,选择不同的输出策略:
  1. # backend/app/graph/langgraph_workflow.py
  2. def _make_agent_node(agent_name: str, compiled_agent):
  3.     """创建 Agent 包装节点"""
  4.     async def node_fn(state: AgentState, config: RunnableConfig) -> dict:
  5.         writer = get_writer(config)
  6.         is_single_agent = len(state.get("next_agents", [])) == 1
  7.         if is_single_agent:
  8.             # 单 Agent:astream_events 捕获子图内部 LLM token,逐个推送到前端
  9.             response_text = await _stream_agent(compiled_agent, state, writer, ...)
  10.         else:
  11.             # 多 Agent:ainvoke 收集完整结果,交给 Synthesizer 统一汇总
  12.             result = await compiled_agent.ainvoke({"messages": state["messages"]})
  13.             response_text = _extract_response(result)
  14.         return {
  15.             "messages": [AIMessage(content=response_text, name=agent_name)],
  16.             "agent_outputs": [{
  17.                 "agent_name": agent_name,
  18.                 "success": True,
  19.                 "response": response_text,
  20.             }],
  21.         }
  22.     return node_fn
复制代码
为什么要区分单/多 Agent 输出策略?

  • 单 Agent:用户只等一个任务,应该尽快看到第一个 token → 用 astream_events 逐 token 流式推送。
  • 多 Agent:多个任务并行执行,每个结果都只是片段 → 用 ainvoke 收集完整结果,由 Synthesizer 统一汇总后再流式输出,避免多路结果交错。
流式输出的关键实现——从 ReAct 子图中捕获 LLM 的 token 流:
  1. async def _stream_agent(compiled_agent, state, writer, agent_name, display_name):
  2.     """通过 astream_events 捕获 ReAct 子图内部的 LLM token"""
  3.     response_text = ""
  4.     async for event in compiled_agent.astream_events(
  5.         {"messages": state["messages"]}, version="v2"
  6.     ):
  7.         if event["event"] == "on_chat_model_stream":
  8.             chunk = event["data"]["chunk"]
  9.             content = chunk.content or ""
  10.             # 关键:过滤掉工具调用 token,只推送最终回复内容
  11.             if content and not getattr(chunk, "tool_call_chunks", []):
  12.                 response_text += content
  13.                 writer({"type": "content_stream", "block": {
  14.                     "delta": content,
  15.                     "accumulated": response_text,
  16.                 }})
  17.     return response_text
复制代码
这里有一个容易踩的坑:on_chat_model_stream 事件不仅包含最终回复的 token,也包含 tool call 的参数 token。如果不过滤 tool_call_chunks,前端会收到一堆乱码 JSON。
4.5 Synthesizer:智能汇总策略

Synthesizer 根据结果数量采取不同策略:
  1. # backend/app/agents/nodes/synthesizer.py
  2. async def synthesizer_node(state: AgentState, config: RunnableConfig) -> dict:
  3.     writer = get_writer(config)
  4.     agent_outputs = state.get("agent_outputs", [])
  5.     if not agent_outputs:
  6.         return {}
  7.     # 单结果 → 直接透传(Agent wrapper 已完成流式推送)
  8.     if len(agent_outputs) == 1:
  9.         return {"final_response": agent_outputs[0]["response"]}
  10.     # 多结果 → LLM 流式汇总
  11.     prompt = f"""汇总以下 {len(agent_outputs)} 个 Agent 的执行结果..."""
  12.     response = ""
  13.     async for token in call_llm_stream(prompt=prompt, ...):
  14.         response += token
  15.         writer({"type": "content_stream", "block": {
  16.             "delta": token,
  17.             "accumulated": response,
  18.         }})
  19.     return {
  20.         "messages": [AIMessage(content=response)],
  21.         "final_response": response,
  22.     }
复制代码
当 Synthesizer 遇到 LLM 调用失败时,会自动降级为简单拼接模式——把各 Agent 结果按顺序拼接后一次性推送,确保用户始终能得到响应。
4.6 ToolRegistry:插件化的工具注册中心

所有工具的元数据(名称、关键词、优先级、上下文映射)统一由 ToolRegistry 管理。工作流代码不硬编码任何工具名称,完全通过注册中心动态发现。
  1. # backend/app/agents/tools/registry.py
  2. class ToolConfig(BaseModel):
  3.     name: str                           # 技术名称
  4.     display_name: str                   # 前端显示名称
  5.     description: str                    # 工具描述
  6.     keywords: List[str]                 # 关键词列表(第一层路由用)
  7.     priority: int = 3                   # 优先级(1=最高)
  8.     tool_class: Type[BaseTool]          # LangChain 工具类
  9.     required_params: List[str] = []     # 必需参数列表
  10.     context_field: Optional[str]        # 对应 AgentState 中的上下文字段
  11.     context_description: Optional[str]  # 上下文描述模板
  12. class ToolRegistry:
  13.     def register(self, name, display_name, keywords, tool_class, priority, ...): ...
  14.     def match_by_keywords(self, query) -> List[tuple[str, int]]: ...
  15.     def get_tool_instance(self, name) -> BaseTool: ...
复制代码
注册示例
  1. # backend/app/agents/tools/__init__.py
  2. registry.register(
  3.     name="ticket_search",
  4.     display_name="智慧出行",
  5.     description="班次查询:查询车次、班次、出行计划",
  6.     keywords=["火车", "班次", "车票", "高铁", "飞机", "航班", "买票", "订票"],
  7.     tool_class=TicketSearchTool,
  8.     priority=2,
  9.     requires_params=True,
  10.     required_params=["departure", "destination"],
  11.     context_field="ticket_context",
  12.     context_description="用户查询了从 {departure} 到 {destination} 的班次"
  13. )
复制代码
context_field 和 context_description 的设计使得 Supervisor 能自动将解析出的参数填入 AgentState 对应的上下文字段,供 LLM 在后续对话中引用(例如"目的地天气"自动关联到之前查过的目的地城市)。
4.7 SSE 流式协议设计

后端:stream_mode="custom" 统一推送

LangGraph 提供了多种 stream_mode(values、updates、messages、custom)。本项目选择 custom 模式:所有事件通过 get_stream_writer() 手动推送,精确控制每一个 SSE 事件的时机和内容。
  1. # backend/app/api/routes.py
  2. @router.post("/chat/stream")
  3. async def chat_stream(request_body: ChatRequest, request: Request):
  4.     workflow = get_workflow()
  5.     session_id = request_body.session_id or str(uuid.uuid4())
  6.     config = {"configurable": {"thread_id": session_id}}
  7.     async def stream_output():
  8.         yield sse_event("session_id", {"session_id": session_id})
  9.         yield sse_event("thinking_start")
  10.         async for payload in workflow.astream(
  11.             input_state, config=config, stream_mode="custom"
  12.         ):
  13.             event_type = payload.get("type", "")
  14.             if event_type == "thinking_stream":
  15.                 yield sse_event("thinking_stream", payload)
  16.             elif event_type == "content_stream":
  17.                 yield sse_event("content_stream", {
  18.                     "delta": payload["block"]["delta"],
  19.                     "accumulated": payload["block"]["accumulated"],
  20.                 })
  21.         yield sse_event("thinking_end")
  22.         yield sse_event("response_end", {"content": accumulated_content})
  23.     return StreamingResponse(stream_output(), media_type="text/event-stream")
复制代码
SSE 事件协议

事件类型触发时机关键字段session_id请求开始session_idthinking_start思考链启动—thinking_stream每个思考步骤content, agent, agent_display, is_streamingcontent_stream回复 tokendelta(增量), accumulated(累积)thinking_end思考链结束—response_end完整响应结束content, needs_input前端:SSE 流解析与回调分发

前端通过 fetch + ReadableStream 解析 SSE 事件流,按事件类型分发到不同回调:
  1. // frontend/src/api/chat.ts
  2. export const chatApi = {
  3.   sendMessageStream: async (request: ChatRequest, callbacks: StreamCallbacks) => {
  4.     const response = await fetch(`${API_BASE_URL}/chat/stream`, {
  5.       method: 'POST',
  6.       headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' },
  7.       body: JSON.stringify(request),
  8.     });
  9.     const reader = response.body?.getReader();
  10.     // 解析 SSE 事件 → 按 type 分发到 onThinking / onContent / onResponseEnd
  11.     await parseSSEReader(reader, callbacks);
  12.   },
  13. };
复制代码
前端 ThinkingChain 组件以终端日志风格渲染思考过程,每个思考块包含:

  • 时间戳(01:032 格式)
  • 操作类型标签(分析 / 搜索 / 执行 / 完成)
  • Agent 标签(彩色标识不同 Agent)
  • 流式光标(正在思考时闪烁)
4.8 Python 3.10 contextvars 兼容方案

这是开发过程中耗时最长的一个技术难题。
问题:LangGraph 的 get_stream_writer() 依赖 Python 的 contextvars 机制。然而 Python < 3.11 的 asyncio 在 create_task() 和 to_thread() 时不会自动传播 context,导致节点函数中调用 get_stream_writer() 时抛出 RuntimeError。
表现:Supervisor 的思考链推送正常,但 Worker Agent 的思考链全部丢失。
根因:LangGraph 内部通过 var_child_runnable_config 这个 ContextVar 传递运行时配置。Python 3.10 切换协程时不复制 context,子协程中这个变量为空。
解决方案——封装 get_writer() 工具函数,在获取 writer 前手动注入 config:
  1. # backend/app/utils/message_helpers.py
  2. def ensure_config_context(config: RunnableConfig) -> None:
  3.     """手动将 config 注入 contextvar,兼容 Python 3.10"""
  4.     from langchain_core.runnables.config import var_child_runnable_config
  5.     if config and not var_child_runnable_config.get(None):
  6.         var_child_runnable_config.set(config)
  7. def get_writer(config: RunnableConfig = None) -> Callable:
  8.     """获取 stream writer(兼容 Python 3.10+)"""
  9.     if config:
  10.         ensure_config_context(config)
  11.     try:
  12.         from langgraph.config import get_stream_writer
  13.         return get_stream_writer()
  14.     except RuntimeError:
  15.         return lambda x: None  # 降级为空操作
复制代码
使用规范:项目中所有需要推送 SSE 事件的节点函数,签名统一为 (state: AgentState, config: RunnableConfig),函数入口第一行调用 get_writer(config)。
4.9 LLM 统一调用层

项目封装了统一的 LLM 调用接口,支持同步调用和异步流式调用,统一处理日志、重试、JSON 解析等横切关注点:
  1. # backend/app/utils/llm_helper.py
  2. def call_llm(prompt, model=None, temperature=0.3,
  3.              parse_json=False, default_response=None) -> Any:
  4.     """同步 LLM 调用(Supervisor 路由分析用)"""
  5.     client = get_llm_client()
  6.     response = client.chat.completions.create(model=model, messages=[...])
  7.     content = response.choices[0].message.content.strip()
  8.     if parse_json:
  9.         return parse_llm_json(content, default_response)
  10.     return content
  11. async def call_llm_stream(prompt, model=None, temperature=0.3) -> AsyncGenerator:
  12.     """异步流式 LLM 调用(Synthesizer 汇总用)"""
  13.     client = get_async_llm_client()
  14.     stream = await client.chat.completions.create(model=model, messages=[...], stream=True)
  15.     async for chunk in stream:
  16.         if chunk.choices[0].delta.content:
  17.             yield chunk.choices[0].delta.content
复制代码
两个接口使用 OpenAI 兼容格式,可以对接任何兼容 OpenAI API 的模型服务(通义千问、DeepSeek、本地 Ollama 等)。
4.10 会话管理:零代码持久化

传统做法需要手写 SessionManager,维护一个 Dict[session_id, List[Message]] 的映射。本项目直接利用 LangGraph 的 MemorySaver checkpointer + thread_id 配置:
  1. # 编译时注入 checkpointer
  2. checkpointer = MemorySaver()
  3. compiled = workflow.compile(checkpointer=checkpointer)
  4. # 运行时通过 thread_id 区分会话
  5. config = {"configurable": {"thread_id": session_id}}
  6. async for payload in workflow.astream(input_state, config=config, stream_mode="custom"):
  7.     ...
复制代码
MemorySaver 自动持久化 AgentState 中所有带 Reducer 的字段(messages 通过 add_messages 自动追加对话历史),下次同一个 thread_id 的请求会自动加载历史消息——零代码实现多轮对话
五、如何扩展新 Agent

添加新 Agent 只需三步,不改动任何工作流代码
第一步:创建 LangChain Tool
  1. # backend/app/agents/tools/hotel_tool.py
  2. from langchain.tools import BaseTool
  3. from pydantic import BaseModel, Field
  4. class HotelSearchInput(BaseModel):
  5.     city: str = Field(description="城市名称")
  6.     checkin: str = Field(description="入住日期,格式 YYYY-MM-DD")
  7. class HotelSearchTool(BaseTool):
  8.     name: str = "hotel_search"
  9.     description: str = "酒店查询工具,查询城市酒店信息"
  10.     args_schema: type[BaseModel] = HotelSearchInput
  11.     def _run(self, city: str, checkin: str) -> str:
  12.         # 调用酒店 API
  13.         return f"{city} 的酒店信息..."
  14.     async def _arun(self, city: str, checkin: str) -> str:
  15.         return self._run(city, checkin)
复制代码
第二步:注册到 ToolRegistry
  1. # backend/app/agents/tools/__init__.py
  2. from .hotel_tool import HotelSearchTool
  3. registry.register(
  4.     name="hotel_search",
  5.     display_name="酒店",
  6.     description="酒店查询:搜索酒店、住宿信息",
  7.     keywords=["酒店", "住宿", "宾馆", "入住", "房间"],
  8.     tool_class=HotelSearchTool,
  9.     priority=2,
  10.     requires_params=True,
  11.     required_params=["city", "checkin"],
  12. )
复制代码
第三步:添加系统提示词
  1. # backend/app/graph/agents.py
  2. AGENT_PROMPTS["hotel_search"] = (
  3.     "你是酒店查询助手,帮用户搜索和推荐酒店。"
  4.     "请根据用户需求调用工具查询,用友好的中文回复。"
  5. )
复制代码
重启服务,create_worker_agents() 自动从注册中心创建新 Agent 并加入工作流图。Supervisor 会根据"酒店"等关键词自动将相关请求路由到新 Agent。
六、踩坑实录

坑 1:并行 Agent 的 agent_outputs 被覆盖

现象:两个 Agent 并行执行时,agent_outputs 只有后完成那个 Agent 的结果。
原因:state 字段默认行为是覆盖写入,不是追加。
解决:使用 Annotated[list, _reset_on_empty] 自定义 Reducer,将覆盖语义改为追加语义。LangGraph 的 Reducer 是并行安全的。
坑 2:astream_events 输出乱码 JSON

现象:前端 content_stream 收到类似 {"departure": "北京", "destin 的碎片文本。
原因:on_chat_model_stream 事件包含工具调用参数的 token(tool_call_chunks),和最终回复 token 混在一起。
解决:过滤 tool_call_chunks:
  1. if content and not getattr(chunk, "tool_call_chunks", []):
  2.     writer({"type": "content_stream", ...})
复制代码
坑 3:Python 3.10 下 get_stream_writer() 失效

现象:Supervisor 的思考链正常推送,Worker Agent 节点的思考链全部丢失。
原因:Python 3.10 的 asyncio 不传播 contextvars。
解决:封装 get_writer(config) 函数,手动注入 var_child_runnable_config(详见 4.8 节)。
坑 4:MemorySaver 导致 agent_outputs 跨轮污染

现象:第二轮对话的响应中混入了第一轮的 Agent 结果。
原因:MemorySaver 持久化整个 state,agent_outputs 在第二轮开始时仍保留第一轮的值。
解决:Supervisor 每轮返回 agent_outputs: [],触发 _reset_on_empty Reducer 清空旧数据。
坑 5:stream_mode="messages" 消息重复

现象:使用 stream_mode="messages" 时,同一条 AI 消息被推送了多次。
原因:messages 模式会输出每个节点的 state 变化,多个 Agent 节点都写入了 messages 字段。
解决:改用 stream_mode="custom",所有 SSE 事件通过 get_stream_writer() 手动推送,精确控制输出内容。
七、技术选型

技术版本要求角色选型理由LangGraph≥1.0.0Agent 工作流编排原生 Send API 并行、Annotated Reducer、MemorySaver、create_react_agentLangChain≥0.3.0工具层抽象BaseTool 体系成熟,与 LangGraph 无缝集成FastAPI≥0.109.0Web 框架原生异步、SSE StreamingResponse、Pydantic 校验React + ViteReact 18+前端快速开发、TypeScript 支持Pydantic v2≥2.5.0配置管理pydantic-settings 统一管理 .env 配置通义千问qwen-maxLLMOpenAI 兼容接口、中文能力强、性价比高
注:LLM 层使用 OpenAI 兼容接口,可替换为 DeepSeek、GPT-4o、本地 Ollama 等任何兼容服务。
八、快速上手

环境要求


  • Python 3.10+
  • Node.js 18+
克隆与配置
  1. git clone https://github.com/sslovett/planMultiAgent
  2. cd planMultiAgent
复制代码
在 backend/ 目录下创建 .env 文件:
  1. # LLM 配置(支持 OpenAI 兼容接口)
  2. LLM_API_KEY=sk-xxxxxxxxxxxxxxxx
  3. LLM_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1
  4. LLM_MODEL=qwen-max
  5. LLM_TEMPERATURE=0.3
  6. # 服务端口
  7. API_PORT=8002
  8. # 外部服务(可选)
  9. TICKET_API_URL=http://localhost:9001
  10. # Qdrant知识库
  11. QDRANT_URL=http://localhost:6333
  12. QDRANT_COLLECTION=travel_kb
复制代码
启动
  1. # 后端
  2. cd backend
  3. pip install -r requirements.txt
  4. python -m uvicorn app.main:app --reload --port 8002
  5. # 前端(新终端)
  6. cd frontend
  7. npm install
  8. npm run dev
复制代码
访问 http://localhost:5173,开始对话。
API 文档:http://localhost:8002/docs
九、总结

回顾整个项目的设计,核心是四个原则:

  • 职责单一:Supervisor 只路由,Worker 只执行,Synthesizer 只汇总。每个节点的代码量控制在 100 行以内。
  • 并行优先:通过 LangGraph Send API 实现真正的并行 fan-out,对比串行执行,两个 Agent 并行时总延迟从 T1 + T2 降低到 max(T1, T2)。
  • 插件化扩展:ToolRegistry 将工具元数据与工作流解耦。添加新 Agent 只需创建工具类 + 注册 + 写 Prompt,不改动任何工作流代码。
  • 流式体验:stream_mode="custom" + get_stream_writer() 实现思考链和回复内容的实时推送,用户从发送消息到看到第一个 token 的延迟控制在 1-2 秒。
如果你也在探索多 Agent 协作的架构设计,希望这篇文章能提供一些有价值的参考。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册