找回密码
 立即注册
首页 业界区 业界 langchain 快速入门(六):实现多agent协作

langchain 快速入门(六):实现多agent协作

搁胱 6 天前
简介

多Agent协作能够将一个复杂的任务拆解成一个个子任务给专门的agent,能够解决复杂问题,实现复杂的ai工作流。
多Agent协作

不同的Agent,有不同的能力,我们可能会有各种实际需求,例如:实时识别车牌位置(Yolo)->识别车牌内容(qwen-vl)-> LLM管理记录车牌信息。通过多Agent协作的工作流,能够实现拍照答题,自动剪辑,ppt生成等一系列复杂问题。
下面用一个简单的案例,来说明。
简单的多Agent协作

示例

需求:查一下阿里、腾讯、百度的PE,并计算平均值。
  1. import os
  2. import operator
  3. from pydantic import BaseModel, Field
  4. from langchain_community.chat_models.tongyi import ChatTongyi
  5. from langchain_core.tools import tool
  6. from langchain_core.messages import HumanMessage, BaseMessage, ToolMessage, SystemMessage
  7. from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
  8. from typing import Annotated, List, Literal, TypedDict
  9. from langgraph.graph import StateGraph, END
  10. os.environ["DASHSCOPE_API_KEY"] = "apikey"
  11. llm = ChatTongyi(model="qwen-plus")
  12. @tool
  13. def web_search(query: str):
  14.     """用于查找最新的股票数据、公司财报信息。"""
  15.     results = []
  16.     if "阿里" in query: results.append("阿里巴巴(BABA) PE: 15.5")
  17.     if "腾讯" in query: results.append("腾讯控股(0700) PE: 18.2")
  18.     if "百度" in query: results.append("百度(BIDU) PE: 11.8")
  19.    
  20.     if not results:
  21.         return "未找到数据"
  22.     return " ; ".join(results)
  23. @tool
  24. def python_calculator(code: str):
  25.     """用于计算。输入必须是 python 代码。"""
  26.     try:
  27.         result = eval(code)
  28.         return f"计算结果: {result}"
  29.     except Exception as e:
  30.         return f"计算错误: {e}"
  31. def create_agent(state: dict, llm, tools, system_prompt):
  32.     llm_tools = llm.bind_tools(tools)
  33.    
  34.     prompt = [SystemMessage(content=system_prompt)] + state["messages"]
  35.     response = llm_tools.invoke(prompt)
  36.     results = [response]
  37.     for tool_call in response.tool_calls:
  38.         func_name = tool_call["name"]
  39.         args = tool_call["args"]
  40.         call_id = tool_call["id"]
  41.         
  42.         func = next((t for t in tools if t.name == func_name), None)
  43.         if func:
  44.             tool_output = func.invoke(args)
  45.             tool_msg = ToolMessage(
  46.                 content=str(tool_output),
  47.                 name=func_name,
  48.                 tool_call_id=call_id
  49.             )
  50.             results.append(tool_msg)
  51.     return {"messages": results}
  52. class State(TypedDict):
  53.     messages: Annotated[List[BaseMessage], operator.add]
  54.     next: str
  55. def researcher_node(state):
  56.     return create_agent(
  57.         state=state,
  58.         llm=llm,
  59.         tools=[web_search],
  60.         system_prompt="你是一个研究员。只负责查数据,找到数据后直接输出原话,不需要计算。"
  61.     )
  62. def coder_node(state):
  63.     return create_agent(
  64.         state=state,
  65.         llm=llm,
  66.         tools=[python_calculator],
  67.         system_prompt="你是一个程序员。根据上面研究员查到的数据,写代码计算平均值。"
  68.     )
  69. def finish_node(state):
  70.     return create_agent(
  71.         state=state,
  72.         llm=llm,
  73.         tools=[],
  74.         system_prompt="任务完成,简短的总结最终结果。"
  75.     )
  76. def supervisor_node(state):
  77.     system_prompt = (
  78.         "你是项目经理。根据对话历史决定下一步交给谁。"
  79.         "查数据找 Researcher,计算找 Coder,识别图片找 Photographer。"
  80.         "如果任务完成,必须选择 FINISH。"
  81.     )
  82.     prompt = ChatPromptTemplate.from_messages([
  83.         ("system", system_prompt),
  84.         MessagesPlaceholder(variable_name="messages"),
  85.         ("system", "根据以上情况,请做出选择。"),
  86.     ])
  87.     class RouteResponse(BaseModel):
  88.         next: Literal["Researcher", "Coder", "FINISH"] = Field(
  89.             ...,
  90.             description="下一步交给谁?如果任务完成请选 FINISH"
  91.         )
  92.     chain = prompt | llm.with_structured_output(RouteResponse)
  93.     response = chain.invoke(state)
  94.     return {"next": response.next}
  95. def init_agent():
  96.     workflow = StateGraph(State)
  97.     workflow.add_node("Researcher", researcher_node)
  98.     workflow.add_node("Coder", coder_node)
  99.     workflow.add_node("Supervisor", supervisor_node)
  100.     workflow.add_node("Finish", finish_node)
  101.     workflow.add_edge("Researcher", "Supervisor")
  102.     workflow.add_edge("Coder", "Supervisor")
  103.     workflow.add_edge("Finish", END)
  104.     workflow.add_conditional_edges(
  105.         "Supervisor",
  106.         lambda state: state["next"],{
  107.         "Researcher": "Researcher",
  108.         "Coder": "Coder",
  109.         "FINISH": "Finish",
  110.     })
  111.     workflow.set_entry_point("Supervisor")
  112.     return workflow.compile()
  113. if __name__ == "__main__":
  114.     agent = init_agent()
  115.     for result in agent.stream({
  116.         "messages": [HumanMessage(content="查一下阿里、腾讯、百度的PE,并计算平均值。")]
  117.         }):
  118.         for key, value in result.items():
  119.             if key == "Supervisor":
  120.                 print("[" + key + "] 去向: " + value["next"])
  121.             else:
  122.                 print("[" + key + "] 回复: " + value['messages'][-1].content)
复制代码
代码解释

代码一共用到了4个agent:

  • agent Researcher,其有一个工具,负责搜索某些内容
  • agent coder,其有一个工具,负责进行精确计算
  • agent finish,其没有工具,负责总结内容
  • agent Supervisor,其没有工具,负责管理上面3个agent,决定任务的去向
    上面案例使用的是langgraph组件,这里就不详细讲解了,请看之前文章。
代码流程: 初始化工具库->初始化agent->构建图->运行
初始化工具库
  1. @tool
  2. def web_search(query: str):
  3.     """用于查找最新的股票数据、公司财报信息。"""
  4.     results = []
  5.     if "阿里" in query: results.append("阿里巴巴(BABA) PE: 15.5")
  6.     if "腾讯" in query: results.append("腾讯控股(0700) PE: 18.2")
  7.     if "百度" in query: results.append("百度(BIDU) PE: 11.8")
  8.    
  9.     if not results:
  10.         return "未找到数据"
  11.     return " ; ".join(results)
  12. @tool
  13. def python_calculator(code: str):
  14.     """用于计算。输入必须是 python 代码。"""
  15.     try:
  16.         result = eval(code)
  17.         return f"计算结果: {result}"
  18.     except Exception as e:
  19.         return f"计算错误: {e}"
复制代码
这里的web_search使用的是虚假的模拟信息,上面的工具描述不够完整,但是能用,如果用实际案例,请描述完整,工具的描述参考之前文章。
初始化agent

其他3个agent
  1. def create_agent(state: dict, llm, tools, system_prompt):
  2.     llm_tools = llm.bind_tools(tools)
  3.    
  4.     prompt = [SystemMessage(content=system_prompt)] + state["messages"]
  5.     response = llm_tools.invoke(prompt)
  6.     results = [response]
  7.     for tool_call in response.tool_calls:
  8.         func_name = tool_call["name"]
  9.         args = tool_call["args"]
  10.         call_id = tool_call["id"]
  11.         
  12.         func = next((t for t in tools if t.name == func_name), None)
  13.         if func:
  14.             tool_output = func.invoke(args)
  15.             tool_msg = ToolMessage(
  16.                 content=str(tool_output),
  17.                 name=func_name,
  18.                 tool_call_id=call_id
  19.             )
  20.             results.append(tool_msg)
  21. def researcher_node(state):
  22.     return create_agent(
  23.         state=state,
  24.         llm=llm,
  25.         tools=[web_search],
  26.         system_prompt="你是一个研究员。只负责查数据,找到数据后直接输出原话,不需要计算。"
  27.     )
  28. def coder_node(state):
  29.     return create_agent(
  30.         state=state,
  31.         llm=llm,
  32.         tools=[python_calculator],
  33.         system_prompt="你是一个程序员。根据上面研究员查到的数据,写代码计算平均值。"
  34.     )
  35. def finish_node(state):
  36.     return create_agent(
  37.         state=state,
  38.         llm=llm,
  39.         tools=[],
  40.         system_prompt="任务完成,简短的总结最终结果。"
  41.     )
复制代码
流程相对简单,create_agent细节前面文章已经讲解,这里就不废话了。
管理agent
  1. def supervisor_node(state):
  2.     system_prompt = (
  3.         "你是项目经理。根据对话历史决定下一步交给谁。"
  4.         "查数据找 Researcher,计算找 Coder,识别图片找 Photographer。"
  5.         "如果任务完成,必须选择 FINISH。"
  6.     )
  7.     prompt = ChatPromptTemplate.from_messages([
  8.         ("system", system_prompt),
  9.         MessagesPlaceholder(variable_name="messages"),
  10.         ("system", "根据以上情况,请做出选择。"),
  11.     ])
  12.     class RouteResponse(BaseModel):
  13.         next: Literal["Researcher", "Coder", "FINISH"] = Field(
  14.             ...,
  15.             description="下一步交给谁?如果任务完成请选 FINISH"
  16.         )
  17.     chain = prompt | llm.with_structured_output(RouteResponse)
  18.     response = chain.invoke(state)
  19.     return {"next": response.next}
复制代码
这一步需要简单说明:
  1. class RouteResponse(BaseModel):
  2.     next: Literal["Researcher", "Coder", "FINISH"] = Field(
  3.         ...,
  4.         description="下一步交给谁?如果任务完成请选 FINISH"
  5.     )
  6. chain = prompt | llm.with_structured_output(RouteResponse)
复制代码

  • LLM中的with_structured_output方法是langchain提供的一个组件,功能是,限定LLM的输出格式,返回相应格式的字典。
  • 定义输出格式限定的类:

  • 该类是BaseModel的子类
  • Literal是选择,要求ai从"Researcher", "Coder", "FINISH"三选一
  • Field描述变量,尽量详尽,描述+例子,因为是给大模型看的
  1. class classname(BaseModel):
  2.         fieldname: fieldtype = Field(..., description="描述")
复制代码
构建图(重要)
  1. def init_agent():
  2.     workflow = StateGraph(State)
  3.     workflow.add_node("Researcher", researcher_node)
  4.     workflow.add_node("Coder", coder_node)
  5.     workflow.add_node("Supervisor", supervisor_node)
  6.     workflow.add_node("Finish", finish_node)
  7.     workflow.add_edge("Researcher", "Supervisor")
  8.     workflow.add_edge("Coder", "Supervisor")
  9.     workflow.add_edge("Finish", END)
  10.     workflow.add_conditional_edges(
  11.         "Supervisor",
  12.         lambda state: state["next"],{
  13.         "Researcher": "Researcher",
  14.         "Coder": "Coder",
  15.         "FINISH": "Finish",
  16.     })
  17.     workflow.set_entry_point("Supervisor")
  18.     return workflow.compile()
复制代码
这一步相当于连接工作流,构建的流程图如下:
[code]       +---------------------------+       |           开始             |       +-------------+-------------+                     |                     v       +---------------------------+       |       Supervisor          |

相关推荐

您需要登录后才可以回帖 登录 | 立即注册