简介
多Agent协作能够将一个复杂的任务拆解成一个个子任务,分配给专门的Agent,从而解决复杂问题,实现复杂的AI工作流。
多Agent协作
不同的Agent,有不同的能力,我们可能会有各种实际需求,例如:实时识别车牌位置(Yolo)->识别车牌内容(qwen-vl)-> LLM管理记录车牌信息。通过多Agent协作的工作流,能够实现拍照答题,自动剪辑,ppt生成等一系列复杂问题。
下面用一个简单的案例,来说明。
简单的多Agent协作
示例
需求:查一下阿里、腾讯、百度的PE,并计算平均值。
- import os
- import operator
- from pydantic import BaseModel, Field
- from langchain_community.chat_models.tongyi import ChatTongyi
- from langchain_core.tools import tool
- from langchain_core.messages import HumanMessage, BaseMessage, ToolMessage, SystemMessage
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
- from typing import Annotated, List, Literal, TypedDict
- from langgraph.graph import StateGraph, END
- # NOTE(review): placeholder API key hard-coded for the tutorial; in real code
- # load the key from the environment/secret store instead of committing it.
- os.environ["DASHSCOPE_API_KEY"] = "apikey"
- # Single shared chat-model instance used by every agent node below.
- llm = ChatTongyi(model="qwen-plus")
@tool
def web_search(query: str):
    """用于查找最新的股票数据、公司财报信息。"""
    # Mocked data source: company keyword -> canned PE quote.
    mock_quotes = {
        "阿里": "阿里巴巴(BABA) PE: 15.5",
        "腾讯": "腾讯控股(0700) PE: 18.2",
        "百度": "百度(BIDU) PE: 11.8",
    }
    # Collect every quote whose keyword appears in the query, in table order.
    hits = [quote for keyword, quote in mock_quotes.items() if keyword in query]
    # Join all matches into one string; signal clearly when nothing matched.
    return " ; ".join(hits) if hits else "未找到数据"
@tool
def python_calculator(code: str):
    """用于计算。输入必须是 python 代码。"""
    # SECURITY NOTE(review): eval() executes arbitrary model-generated code.
    # Fine for this toy demo; in production sandbox it or use a restricted
    # expression evaluator.
    try:
        return f"计算结果: {eval(code)}"
    except Exception as err:
        return f"计算错误: {err}"
def create_agent(state: dict, llm, tools, system_prompt):
    """Run one agent turn.

    Invokes the tool-bound LLM on the conversation (with the role prompt
    prepended), executes every tool call the model requested, and returns
    the new messages as a partial graph-state update: {"messages": [...]}.
    """
    model_with_tools = llm.bind_tools(tools)

    # Prepend the role prompt to the running conversation.
    conversation = [SystemMessage(content=system_prompt)] + state["messages"]
    ai_reply = model_with_tools.invoke(conversation)
    new_messages = [ai_reply]

    # Tool name -> tool object; setdefault keeps the FIRST registration,
    # matching the original first-match lookup semantics.
    registry = {}
    for candidate in tools:
        registry.setdefault(candidate.name, candidate)

    # Execute each requested tool call and record its output as a ToolMessage.
    for call in ai_reply.tool_calls:
        handler = registry.get(call["name"])
        if handler:
            output = handler.invoke(call["args"])
            new_messages.append(ToolMessage(
                content=str(output),
                name=call["name"],
                tool_call_id=call["id"],
            ))
    return {"messages": new_messages}
class State(TypedDict):
    """Shared graph state flowing between nodes."""

    # operator.add tells LangGraph to CONCATENATE each node's returned
    # message list onto the history instead of replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # Route chosen by the supervisor ("Researcher" / "Coder" / "FINISH").
    next: str
def researcher_node(state):
    """Researcher agent: data lookup only, via the web_search tool."""
    role = "你是一个研究员。只负责查数据,找到数据后直接输出原话,不需要计算。"
    return create_agent(state=state, llm=llm, tools=[web_search], system_prompt=role)
def coder_node(state):
    """Coder agent: exact arithmetic via the python_calculator tool."""
    role = "你是一个程序员。根据上面研究员查到的数据,写代码计算平均值。"
    return create_agent(state=state, llm=llm, tools=[python_calculator], system_prompt=role)
def finish_node(state):
    """Finish agent: tool-free, produces the short final summary."""
    role = "任务完成,简短的总结最终结果。"
    return create_agent(state=state, llm=llm, tools=[], system_prompt=role)
def supervisor_node(state):
    """Supervisor agent: reads the conversation so far and picks the next
    worker ("Researcher" or "Coder") or ends the task ("FINISH").

    Returns a partial state update: {"next": <route>}.
    """
    # FIX: the original prompt also told the model to route image tasks to a
    # "Photographer" agent that is neither registered in the graph nor allowed
    # by RouteResponse's Literal, which could steer the model toward an
    # invalid (unparseable) structured output.
    system_prompt = (
        "你是项目经理。根据对话历史决定下一步交给谁。"
        "查数据找 Researcher,计算找 Coder。"
        "如果任务完成,必须选择 FINISH。"
    )
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        ("system", "根据以上情况,请做出选择。"),
    ])

    class RouteResponse(BaseModel):
        # Literal restricts the structured output to exactly these routes.
        next: Literal["Researcher", "Coder", "FINISH"] = Field(
            ...,
            description="下一步交给谁?如果任务完成请选 FINISH"
        )

    # with_structured_output forces the LLM reply to parse as RouteResponse.
    chain = prompt | llm.with_structured_output(RouteResponse)
    response = chain.invoke(state)
    return {"next": response.next}
def init_agent():
    """Assemble and compile the supervisor/worker graph."""
    workflow = StateGraph(State)

    # Register all four agent nodes.
    for node_name, node_fn in (
        ("Researcher", researcher_node),
        ("Coder", coder_node),
        ("Supervisor", supervisor_node),
        ("Finish", finish_node),
    ):
        workflow.add_node(node_name, node_fn)

    # Workers always report back to the supervisor; Finish ends the run.
    workflow.add_edge("Researcher", "Supervisor")
    workflow.add_edge("Coder", "Supervisor")
    workflow.add_edge("Finish", END)

    # The supervisor's "next" field selects the following node.
    workflow.add_conditional_edges(
        "Supervisor",
        lambda s: s["next"],
        {"Researcher": "Researcher", "Coder": "Coder", "FINISH": "Finish"},
    )

    # Every run starts with a supervisor decision.
    workflow.set_entry_point("Supervisor")
    return workflow.compile()
if __name__ == "__main__":
    agent = init_agent()
    task = {"messages": [HumanMessage(content="查一下阿里、腾讯、百度的PE,并计算平均值。")]}
    # Stream node-by-node updates; each step maps node name -> state update.
    for step in agent.stream(task):
        for node_name, update in step.items():
            if node_name == "Supervisor":
                # Supervisor steps only carry the routing decision.
                print(f"[{node_name}] 去向: {update['next']}")
            else:
                # Worker steps carry new messages; show the latest one.
                print(f"[{node_name}] 回复: {update['messages'][-1].content}")
复制代码 代码解释
代码一共用到了4个agent:
- agent Researcher,其有一个工具,负责搜索某些内容
- agent coder,其有一个工具,负责进行精确计算
- agent finish,其没有工具,负责总结内容
- agent Supervisor,其没有工具,负责管理上面3个agent,决定任务的去向
上面案例使用的是langgraph组件,这里就不详细讲解了,请看之前文章。
代码流程: 初始化工具库->初始化agent->构建图->运行
初始化工具库
- @tool
- def web_search(query: str):
- """用于查找最新的股票数据、公司财报信息。"""
- # Mocked data source: appends a canned PE quote per matched company keyword.
- results = []
- if "阿里" in query: results.append("阿里巴巴(BABA) PE: 15.5")
- if "腾讯" in query: results.append("腾讯控股(0700) PE: 18.2")
- if "百度" in query: results.append("百度(BIDU) PE: 11.8")
- 
- # Explicit "not found" string so the LLM sees a clear failure signal.
- if not results:
- return "未找到数据"
- return " ; ".join(results)
- @tool
- def python_calculator(code: str):
- """用于计算。输入必须是 python 代码。"""
- # SECURITY NOTE(review): eval() runs arbitrary LLM-generated code;
- # acceptable for this demo only — sandbox it in real deployments.
- try:
- result = eval(code)
- return f"计算结果: {result}"
- except Exception as e:
- return f"计算错误: {e}"
复制代码 这里的web_search使用的是虚假的模拟信息,上面的工具描述不够完整,但是能用,如果用实际案例,请描述完整,工具的描述参考之前文章。
初始化agent
其他3个agent
def create_agent(state: dict, llm, tools, system_prompt):
    """Run one agent turn: invoke the tool-bound LLM, execute any requested
    tool calls, and return the new messages as a graph-state update.

    FIX: this excerpt dropped the final ``return {"messages": results}``
    (it is present in the complete listing earlier in the article); without
    it the node returns None and LangGraph receives no state update.
    """
    llm_tools = llm.bind_tools(tools)

    # Prepend the role prompt to the conversation so far.
    prompt = [SystemMessage(content=system_prompt)] + state["messages"]
    response = llm_tools.invoke(prompt)
    results = [response]

    # Execute every tool call the model requested and record each output.
    for tool_call in response.tool_calls:
        func_name = tool_call["name"]
        args = tool_call["args"]
        call_id = tool_call["id"]

        # First tool whose registered name matches the requested one.
        func = next((t for t in tools if t.name == func_name), None)
        if func:
            tool_output = func.invoke(args)
            results.append(ToolMessage(
                content=str(tool_output),
                name=func_name,
                tool_call_id=call_id,
            ))
    return {"messages": results}
- # Researcher agent: lookup only, equipped with the web_search tool.
- def researcher_node(state):
- return create_agent(
- state=state,
- llm=llm,
- tools=[web_search],
- system_prompt="你是一个研究员。只负责查数据,找到数据后直接输出原话,不需要计算。"
- )
- # Coder agent: exact arithmetic via the python_calculator tool.
- def coder_node(state):
- return create_agent(
- state=state,
- llm=llm,
- tools=[python_calculator],
- system_prompt="你是一个程序员。根据上面研究员查到的数据,写代码计算平均值。"
- )
- # Finish agent: tool-free, writes the short final summary.
- def finish_node(state):
- return create_agent(
- state=state,
- llm=llm,
- tools=[],
- system_prompt="任务完成,简短的总结最终结果。"
- )
复制代码 流程相对简单,create_agent细节前面文章已经讲解,这里就不废话了。
管理agent
def supervisor_node(state):
    """Supervisor agent: reads the conversation and picks the next worker
    ("Researcher" or "Coder") or ends the task ("FINISH").

    Returns a partial state update: {"next": <route>}.
    """
    # FIX: removed the "Photographer" instruction — no such agent exists in
    # the graph and RouteResponse's Literal does not allow it, so mentioning
    # it could push the model toward an invalid structured output.
    system_prompt = (
        "你是项目经理。根据对话历史决定下一步交给谁。"
        "查数据找 Researcher,计算找 Coder。"
        "如果任务完成,必须选择 FINISH。"
    )
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        ("system", "根据以上情况,请做出选择。"),
    ])

    class RouteResponse(BaseModel):
        # Literal restricts the structured output to exactly these routes.
        next: Literal["Researcher", "Coder", "FINISH"] = Field(
            ...,
            description="下一步交给谁?如果任务完成请选 FINISH"
        )

    # with_structured_output forces the LLM reply to parse as RouteResponse.
    chain = prompt | llm.with_structured_output(RouteResponse)
    response = chain.invoke(state)
    return {"next": response.next}
复制代码 这一步需要简单说明:
- class RouteResponse(BaseModel):
- # Literal limits the model's structured output to these three routes.
- next: Literal["Researcher", "Coder", "FINISH"] = Field(
- ...,
- description="下一步交给谁?如果任务完成请选 FINISH"
- )
- # with_structured_output parses the LLM reply into a RouteResponse instance.
- chain = prompt | llm.with_structured_output(RouteResponse)
复制代码
- LLM中的with_structured_output方法是langchain提供的一个组件,功能是,限定LLM的输出格式,返回相应格式的字典。
- 定义输出格式限定的类:
- 该类是BaseModel的子类
- Literal是选择,要求ai从"Researcher", "Coder", "FINISH"三选一
- Field描述变量,尽量详尽,描述+例子,因为是给大模型看的
- class classname(BaseModel):
- fieldname: fieldtype = Field(..., description="描述")
复制代码 构建图(重要)
- def init_agent():
- workflow = StateGraph(State)
- # Register the four agent nodes.
- workflow.add_node("Researcher", researcher_node)
- workflow.add_node("Coder", coder_node)
- workflow.add_node("Supervisor", supervisor_node)
- workflow.add_node("Finish", finish_node)
- # Workers always report back to the supervisor; Finish ends the run.
- workflow.add_edge("Researcher", "Supervisor")
- workflow.add_edge("Coder", "Supervisor")
- workflow.add_edge("Finish", END)
- # The supervisor's "next" field selects the following node.
- workflow.add_conditional_edges(
- "Supervisor",
- lambda state: state["next"],{
- "Researcher": "Researcher",
- "Coder": "Coder",
- "FINISH": "Finish",
- })
- # Every run starts with a supervisor decision.
- workflow.set_entry_point("Supervisor")
- return workflow.compile()
复制代码 这一步相当于连接工作流,构建的流程图如下:
[code] +---------------------------+ | 开始 | +-------------+-------------+ | v +---------------------------+ | Supervisor | |