本文为博客园用户“孤舟晓月”原创,发布于博客园,备份与B站。若你在其他站点看到,说明它被盗了......
前置知识
langchain使用流式输出通常采用stream(同步)和astream(异步)两种模式,类似与下面的代码段:- print("开始流式输出...")# 流式输出for chunk in graph.stream(initial_state, config=my_config): print(f"流式块: {chunk}")print("流式输出测试完成!")
复制代码 chunk就是大模型返回的信息。通常只包含少量的必需字段。也就存在很多问题:
比如无法取得token的细分使用量,部分模型(比如deepseek-reasoner)的封装无法在流式输出中实时呈现推理内容等等。
一、astream_event产生的事件类型
当然,langchain在1.0中也给出了解法,即使用astream_events方法。该方法会返回一系列的关键事件,以便咱们精准检测整个智能体的运行情况乃至修改相关数据。
langchain官方的参考地址为:astream_events
1. 官方文档
放个截图,方便后面的内容展开
其中我遇到过的事件类型部分摘录如下:
- on_chat_model_start
- on_chat_model_stream
- on_chat_model_end
- on_chain_start
- on_chain_stream
- on_chain_end
- on_tool_start
- on_tool_end
.......
2. 所有事件的共性字段
上面的文档翻译一下,就是:astream_event 会迭代返回多个StreamEvent对象,所有的StreamEvent都具有下列共有字段:
字段名类型描述与作用eventstring事件类型的唯一标识,如 ”on_chain_start”。namestring产生事件的组件或对象的名称,例如 ”LangGraph”、”model”、”tools”、”ChatDeepSeekCustomized”、”weather_tool”。run_idstring当前事件运行的唯一ID。用于标识一个特定的执行实例。parent_idsarray父级运行的ID列表。清晰展示了执行的层级和调用关系。例如,ChatDeepSeekCustomized 事件的 parent_ids 会包含其所属的 model 链和顶级 LangGraph 的 run_id。tagsarray标签列表,用于分类或标记运行。常见标签如 ”graph: step: 1″、”seq: step: 1″。metadataobject元数据字典,包含执行的上下文信息。不同事件类型的元数据丰富程度不同,但以下LangGraph相关字段非常常见:
• thread_id: 执行线程ID。
• langgraph_node: 当前所在的图节点名。
• langgraph_step: 执行步骤。
• langgraph_checkpoint_ns: 检查点命名空间。dataobject与该事件相关的数据。该字段的内容 这取决于活动的类型。!!!重要!!!在官方文档中,密密麻麻列举了N多事件,遗憾的是截止2026年2月9日,官方文档并没有给出这些事件的生命周期。 于是就有了本篇文章。
接下来,我会先给出测试的DEMO和框架的输出,当然,输出已经被转换为标准的JSON格式。
坐稳,这就出发!
二、测试DEMO
[code]""" 使用LangGraph create_agent方法创建智能体,ChatDeepSeekCustomized与多个工具的协作 该测试模拟一个复杂的任务场景,需要智能体调用多个工具才能完成 ChatDeepSeekCustomized为我基于langchain框架定制的deepseek封装,支持使用reasoner模型发起的tool-call和推理思维链透传。如果你想运行这个测试,请:1. 切换为langchain-deepseek库提供的ChatDeepseek封装。2. 自行注册deepseek的API""" import asyncio import os import sys # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.tools import tool from langchain.agents import create_agent from langgraph.checkpoint.memory import MemorySaver from extends.custom_deepseek_chat import ChatDeepSeekCustomized @tool def calculator_tool(operation: str) -> str: """计算器工具,执行基本数学运算""" try: # 解析操作 result = eval(operation) return f"计算结果: {result}" except Exception as e: return f"计算错误: {str(e)}" @tool def search_tool(query: str) -> str: """搜索工具,模拟信息检索""" return f"搜索结果: 关于'{query}'的信息是模拟的,实际应用中应连接真实搜索引擎" @tool def analysis_tool(data: str) -> str: """分析工具,对数据进行分析""" return f"分析结果: '{data}'的数据分析已完成" @tool def weather_tool(city: str) -> str: """天气工具,模拟查询天气""" return f"天气预报: {city}的天气是晴朗的,温度约22°C" @tool def database_query_tool(table: str, condition: str) -> str: """数据库查询工具,模拟数据库查询""" return f"数据库查询结果: 从表'{table}'中找到满足条件'{condition}'的记录共5条" def create_complex_task_agent(): """创建一个需要多次工具调用才能完成复杂任务的智能体""" print("创建使用ChatDeepSeekCustomed的LangGraph智能体...") # 设置API密钥 # !!!----------填入自己的API KEY-----------!!! DEEPSEEK_API_KEY = "sk-你的API KEY" # 为了获得比较清晰的events,直接使用ChatDeepSeekCustomized,启用reasoning model = ChatDeepSeekCustomized( model="deepseek-reasoner", api_key=DEEPSEEK_API_KEY, temperature=0.1, include_reasoning_content=True # 启用推理内容输出 ) # 定义工具列表 tools = [calculator_tool, search_tool, analysis_tool, weather_tool, database_query_tool] # 创建系统提示 - 字符串格式 system_prompt_str = """ 你是一个高级AI助手,能够使用多种工具解决问题。你需要合理规划工具调用顺序,完成复杂任务。 在每次调用工具后,你会收到结果,然后根据结果决定下一步行动。 请使用推理来决定如何最好地解决问题。 你可以进行多步思考,并在必要时调用工具。 """ # 定义checkpoint存储类型 my_cp_memory = MemorySaver() # 创建Agent - 使用正确的参数名 graph = create_agent( tools=tools, model=model, # 使用我们的自定义模型 system_prompt=system_prompt_str, checkpointer=my_cp_memory ) return graph, tools async def test_streaming(): """测试流式输出,观察推理内容是否正常输出""" print("\n开始测试流式输出...") print("开始测试复杂任务...") try: from langchain.agents import AgentState from langchain_core.runnables import RunnableConfig # 创建智能体 graph, tools = create_complex_task_agent() # 定义一个复杂任务,需要多次工具调用 complex_task = ( "我需要规划一次去北京的旅行。请帮我:\n" "1. 查询北京的当前天气\n" "2. 计算从上海到北京的距离(假设直线距离约为1000公里)\n" "3. 基于距离计算旅行所需的油费(假设每公里油耗费用为0.8元)\n" "4. 分析旅游预算的合理性\n" "5. 最后给我一个完整的旅行建议" ) print(f"任务: {complex_task}") print("\n开始执行任务...") class StateInAgent(AgentState): pass initial_state = StateInAgent(messages=[HumanMessage(content=complex_task)]) # 配置 my_config = RunnableConfig() my_config["configurable"] = { "thread_id": "test_streaming", } print("开始流式输出...") events = [] # 测试流式输出 async for event in graph.astream_events(initial_state, config=my_config): if event['event'] == 'on_chat_model_stream': if not should_stop_append_on_chat_model_stream(events): events.append(event) print(event) else: events.append(event) print(event) print("流式输出测试完成!") return True except Exception as e: print(f"流式输出测试失败: {e}") import traceback traceback.print_exc() return False def should_stop_append_on_chat_model_stream(event_list, max_count=10): """ 因为大模型一次简单的推理就可能产生数百个on_chat_model_stream事件,这里做一个 定制化判断,如果事件队列末尾有连续10个on_chat_model_stream事件了,就停止入队列。 """ stop_flag = False # 判断最近的12个事件中,有多少个on_chat_model_stream事件,如果超过10个,则返回true last_max_events = event_list[-max_count:] for event in last_max_events: if event['event'] == 'on_chat_model_stream': stop_flag = True else: stop_flag = False return stop_flag async def main(): """主函数""" print("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |