找回密码
 立即注册
首页 业界区 业界 Langchain 1.0后astream_events事件类型及生命周期简析 ...

Langchain 1.0后astream_events事件类型及生命周期简析

眸胝 2026-2-9 18:45:18
本文为博客园用户“孤舟晓月”原创,发布于博客园,备份与B站。若你在其他站点看到,说明它被盗了......
前置知识

langchain使用流式输出通常采用stream(同步)和astream(异步)两种模式,类似与下面的代码段:
  1. print("开始流式输出...")# 流式输出for chunk in graph.stream(initial_state, config=my_config):    print(f"流式块: {chunk}")print("流式输出测试完成!")
复制代码
chunk就是大模型返回的信息。通常只包含少量的必需字段。也就存在很多问题:
比如无法取得token的细分使用量,部分模型(比如deepseek-reasoner)的封装无法在流式输出中实时呈现推理内容等等。
一、astream_event产生的事件类型

当然,langchain在1.0中也给出了解法,即使用astream_events方法。该方法会返回一系列的关键事件,以便咱们精准检测整个智能体的运行情况乃至修改相关数据。
langchain官方的参考地址为:astream_events
1. 官方文档

放个截图,方便后面的内容展开

其中我遇到过的事件类型部分摘录如下:

  • on_chat_model_start
  • on_chat_model_stream
  • on_chat_model_end
  • on_chain_start
  • on_chain_stream
  • on_chain_end
  • on_tool_start
  • on_tool_end
    .......
2. 所有事件的共性字段

上面的文档翻译一下,就是:astream_event 会迭代返回多个StreamEvent对象,所有的StreamEvent都具有下列共有字段:
字段名类型描述与作用eventstring事件类型的唯一标识,如 ”on_chain_start”。namestring产生事件的组件或对象的名称,例如 ”LangGraph”、”model”、”tools”、”ChatDeepSeekCustomized”、”weather_tool”。run_idstring当前事件运行的唯一ID。用于标识一个特定的执行实例。parent_idsarray父级运行的ID列表。清晰展示了执行的层级和调用关系。例如,ChatDeepSeekCustomized 事件的 parent_ids 会包含其所属的 model 链和顶级 LangGraph 的 run_id。tagsarray标签列表,用于分类或标记运行。常见标签如 ”graph: step: 1″、”seq: step: 1″。metadataobject元数据字典,包含执行的上下文信息。不同事件类型的元数据丰富程度不同,但以下LangGraph相关字段非常常见:  
• thread_id: 执行线程ID。  
• langgraph_node: 当前所在的图节点名。  
• langgraph_step: 执行步骤。  
• langgraph_checkpoint_ns: 检查点命名空间。dataobject与该事件相关的数据。该字段的内容 这取决于活动的类型。!!!重要!!!在官方文档中,密密麻麻列举了N多事件,遗憾的是截止2026年2月9日,官方文档并没有给出这些事件的生命周期。 于是就有了本篇文章。
接下来,我会先给出测试的DEMO和框架的输出,当然,输出已经被转换为标准的JSON格式。
坐稳,这就出发!
二、测试DEMO

[code]"""  使用LangGraph create_agent方法创建智能体,ChatDeepSeekCustomized与多个工具的协作  该测试模拟一个复杂的任务场景,需要智能体调用多个工具才能完成  ChatDeepSeekCustomized为我基于langchain框架定制的deepseek封装,支持使用reasoner模型发起的tool-call和推理思维链透传。如果你想运行这个测试,请:1. 切换为langchain-deepseek库提供的ChatDeepseek封装。2. 自行注册deepseek的API"""  import asyncio  import os  import sys    # 添加项目根目录到Python路径  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))    from langchain_core.messages import SystemMessage, HumanMessage, AIMessage  from langchain_core.tools import tool  from langchain.agents import create_agent  from langgraph.checkpoint.memory import MemorySaver  from extends.custom_deepseek_chat import ChatDeepSeekCustomized      @tool  def calculator_tool(operation: str) -> str:      """计算器工具,执行基本数学运算"""      try:          # 解析操作          result = eval(operation)          return f"计算结果: {result}"      except Exception as e:          return f"计算错误: {str(e)}"      @tool  def search_tool(query: str) -> str:      """搜索工具,模拟信息检索"""      return f"搜索结果: 关于'{query}'的信息是模拟的,实际应用中应连接真实搜索引擎"      @tool  def analysis_tool(data: str) -> str:      """分析工具,对数据进行分析"""      return f"分析结果: '{data}'的数据分析已完成"      @tool  def weather_tool(city: str) -> str:      """天气工具,模拟查询天气"""      return f"天气预报: {city}的天气是晴朗的,温度约22°C"      @tool  def database_query_tool(table: str, condition: str) -> str:      """数据库查询工具,模拟数据库查询"""      return f"数据库查询结果: 从表'{table}'中找到满足条件'{condition}'的记录共5条"      def create_complex_task_agent():      """创建一个需要多次工具调用才能完成复杂任务的智能体"""      print("创建使用ChatDeepSeekCustomed的LangGraph智能体...")        # 设置API密钥     # !!!----------填入自己的API KEY-----------!!!    DEEPSEEK_API_KEY = "sk-你的API KEY"        # 为了获得比较清晰的events,直接使用ChatDeepSeekCustomized,启用reasoning    model = ChatDeepSeekCustomized(          model="deepseek-reasoner",          api_key=DEEPSEEK_API_KEY,          temperature=0.1,          include_reasoning_content=True  # 启用推理内容输出      )        # 定义工具列表      tools = [calculator_tool, search_tool, analysis_tool, weather_tool, database_query_tool]        # 创建系统提示 - 字符串格式      system_prompt_str = """      你是一个高级AI助手,能够使用多种工具解决问题。你需要合理规划工具调用顺序,完成复杂任务。      在每次调用工具后,你会收到结果,然后根据结果决定下一步行动。    请使用推理来决定如何最好地解决问题。    你可以进行多步思考,并在必要时调用工具。      """      # 定义checkpoint存储类型      my_cp_memory = MemorySaver()        # 创建Agent - 使用正确的参数名      graph = create_agent(          tools=tools,          model=model,  # 使用我们的自定义模型          system_prompt=system_prompt_str,          checkpointer=my_cp_memory      )        return graph, tools        async def test_streaming():      """测试流式输出,观察推理内容是否正常输出"""      print("\n开始测试流式输出...")      print("开始测试复杂任务...")      try:          from langchain.agents import AgentState          from langchain_core.runnables import RunnableConfig            # 创建智能体          graph, tools = create_complex_task_agent()            # 定义一个复杂任务,需要多次工具调用          complex_task = (              "我需要规划一次去北京的旅行。请帮我:\n"              "1. 查询北京的当前天气\n"              "2. 计算从上海到北京的距离(假设直线距离约为1000公里)\n"              "3. 基于距离计算旅行所需的油费(假设每公里油耗费用为0.8元)\n"              "4. 分析旅游预算的合理性\n"              "5. 最后给我一个完整的旅行建议"          )            print(f"任务: {complex_task}")          print("\n开始执行任务...")              class StateInAgent(AgentState):              pass            initial_state = StateInAgent(messages=[HumanMessage(content=complex_task)])            # 配置          my_config = RunnableConfig()          my_config["configurable"] = {              "thread_id": "test_streaming",          }            print("开始流式输出...")          events = []          # 测试流式输出          async for event in graph.astream_events(initial_state, config=my_config):              if event['event'] == 'on_chat_model_stream':                  if not should_stop_append_on_chat_model_stream(events):                      events.append(event)                      print(event)              else:                  events.append(event)                  print(event)          print("流式输出测试完成!")          return True      except Exception as e:          print(f"流式输出测试失败: {e}")          import traceback          traceback.print_exc()          return False    def should_stop_append_on_chat_model_stream(event_list, max_count=10):     """   因为大模型一次简单的推理就可能产生数百个on_chat_model_stream事件,这里做一个   定制化判断,如果事件队列末尾有连续10个on_chat_model_stream事件了,就停止入队列。   """      stop_flag =  False      # 判断最近的12个事件中,有多少个on_chat_model_stream事件,如果超过10个,则返回true      last_max_events = event_list[-max_count:]      for event in last_max_events:          if event['event'] == 'on_chat_model_stream':              stop_flag = True          else:              stop_flag = False      return stop_flag    async def main():      """主函数"""      print("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2026-2-13 22:17:28

举报

您需要登录后才可以回帖 登录 | 立即注册