找回密码
 立即注册
首页 业界区 业界 AI Agent 框架探秘:拆解 OpenHands(5)--- 交互&会话 ...

AI Agent 框架探秘:拆解 OpenHands(5)--- 交互&会话

班闵雨 2 小时前
AI Agent 框架探秘:拆解 OpenHands(5)--- 交互&会话


目录

  • AI Agent 框架探秘:拆解 OpenHands(5)--- 交互&会话

    • 0x00 概述
    • 0x01 背景

      • 1.1 会话的意义
      • 1.2 会话系统的常见功能
      • 1.3 Session 常见内容
      • 1.4 会话生命周期
      • 1.5  前文回顾

    • 0x02 OpenHands 会话系统

      • 2.1 对话管理接口 ConversationManager

        • 2.1.1 StandaloneConversationManager
        • 2.1.2 Session初始化

      • 2.2 session.py(WebSession)

        • 2.2.1 WebSession 类概述
        • 2.2.2 WebSession 核心属性
        • 2.2.3 主要功能模块
        • 2.2.4 与系统其他组件的关系
        • 2.2.5 初始化 AgentSession

      • 2.3 agent_session

        • 2.3.1 AgentSession
        • 2.4.2 初始化Agent

          • 核心作用
          • 核心特色
          • 初始化流程图


      • 2.4 用户交互(oh_user_action)逻辑

        • 2.4.1 用户发消息
        • 2.4.2 事件添加
        • 2.4.3 事件处理


    • 0xFF 参考


0x00 概述

有意义的多轮对话要求智能体能够理解上下文。就像人类一样,智能体需要记住对话历史:已经说过和做过什么,以保持连贯性并避免重复。
以下是OpenHands Applications的示例图,本篇就来看看会话和交互如何进行。

因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 背景

本节基于 Google ADK 来进行背景介绍。
1.1 会话的意义

就像你不会每次发短信都从头开始一样,智能体也需要了解当前交互的上下文。常见的Agent系统会通过 Session、State 和 Memory 提供了结构化的上下文管理方式。

  • Session:当前对话线程(可以将你与智能体的不同对话实例视为独立的对话线程,它们可能会利用长期知识

    • 表示用户与你的智能体系统之间单次、持续的交互
    • 包含该特定交互期间,智能体采取的消息和动作(称为 Events)的时间顺序序列。
    • 一个 Session 还可以保存仅在本次对话期间相关的临时数据(State)。

  • State:当前对话中的数据

    • 存储在特定 Session 内的数据。
    • 用于管理当前(单次)、活跃对话线程相关的信息(例如,本次对话中的购物车商品,本 Session 中提到的用户偏好)。
    • 关注如何高效地读取、写入和管理 session 专属数据。

  • Memory:可检索的跨 Session 信息

    • 表示可能跨越多个过去 Session或包含外部数据源的信息存储。
    • 它作为一个知识库,智能体可以检索以回忆超出当前对话的信息或上下文。

因此,Agent系统一般有如下两套组件或者服务:

  • SessionService:管理不同的对话线程(Session 对象)负责生命周期管理:创建、检索、更新(追加 Events、修改 State)和删除单个 Session。
  • MemoryService:管理长期知识存储(Memory),负责将信息(通常来自已完成的 Session)导入长期存储。提供基于查询检索已存储知识的方法。
本篇介绍对话服务,后续另外介绍内存服务。
1.2 会话系统的常见功能

用户通常不会直接创建或管理 Session 对象,而是通过 SessionService。该服务作为会话生命周期的中央管理者。其核心职责包括:

  • 开启新对话: 当用户开始交互时,创建新的 Session 对象。
  • 恢复已有对话: 通过 ID 检索特定 Session,让智能体可以从上次中断处继续。
  • 保存进度: 将新的交互(Event 对象)追加到 session 历史。这也是 session state 更新的机制(详见 State 章节)。
  • 列出对话: 查找特定用户和应用的活跃会话线程。
  • 清理: 当对话结束或不再需要时,删除 Session 及其相关数据。
选择合适的 SessionService 是决定智能体对话历史和临时数据如何存储与持久化的关键。
1.3 Session 常见内容

一般来说,当用户开始与智能体交互时,SessionService 会创建一个 Session 对象。该对象作为单个对话线程相关所有内容的容器。其主要属性如下:

  • 标识信息(id, appName, userId):
用于唯一标记对话的核心字段,具体说明如下:

  • id:当前对话线程的唯一标识符,是后续检索该对话的关键依据。一个 SessionService 对象可管理多个 Session(会话)实例,此字段用于明确当前操作对应的具体会话对象。示例值:"test_id_modification"。
  • app_name:标识当前对话所属的智能体应用。示例值:"id_modifier_workflow"。
  • userId:将对话与特定用户关联的关联字段,用于用户维度的对话管理与权限控制。

  • 对话历史(events):
按时间顺序排列的交互序列,包含当前对话线程中发生的所有交互行为(以 Event 对象形式存储),涵盖用户消息、智能体响应、工具调用动作等全量交互记录。

  • 会话状态(state):
用于存储仅与当前活跃对话相关的临时数据,相当于智能体在交互过程中的 “临时草稿本”。下一节将详细介绍 state 的具体使用与管理方式。

  • 活动追踪(lastUpdateTime):
时间戳字段,记录当前对话线程中最后一次交互事件的发生时间,用于会话活跃度判断与过期管理。
1.4 会话生命周期

2.png

以下是 Session 与 SessionService 在一次对话轮次中协作的简化流程:

  • 开始或恢复: 你的应用程序需要使用 SessionService 来要么 create_session(用于新聊天),要么使用现有的 session id。
  • 提供上下文: Runner 从适当的服务方法获取相应的 Session 对象,为智能体提供对相应 Session 的 state 和 events 的访问权限。
  • 智能体处理: 用户用查询提示智能体。智能体分析查询以及可能的 session state 和 events 历史来确定响应。
  • 响应和状态更新: 智能体生成响应(并可能标记要在 state 中更新的数据)。Runner 将其打包为 Event。
  • 保存交互: Runner 调用 sessionService.append_event(session, event),将 session 和新的 event 作为参数。服务将 Event 添加到历史记录中,并根据事件中的信息更新存储中的 session state。session 的 last_update_time 也会得到更新。
  • 准备下一次: 智能体的响应发送给用户。更新后的 Session 现在由 SessionService 存储,准备进行下一轮(这通常会在当前会话中继续对话,从步骤 1 重新开始循环)。
  • 结束对话: 当对话结束时,你的应用程序调用 sessionService.delete_session(...) 来清理存储的会话数据(如果不再需要的话)。
此循环突出显示了 SessionService 如何通过管理每个 Session 对象的历史和状态,确保对话的连续性。
1.5  前文回顾

我们首先回顾前文介绍的,OpenHands项目关于对话的服务器端组件。

  • session.py 文件定义了Session类,它代表与客户端的WebSocket会话。
  • agent_session.py 文件包含AgentSession类,它管理会话内Agent的生命周期。
  • conversation_manager.py 文件定义了ConversationManager类,它负责管理多个客户端会话。
  • listen.py 文件是主服务器文件,它设置FastAPI应用程序并定义各种API端点。此处关键一步为与会话管理器 ConversationManager 建立连接。
以上几步展示了服务器组件如何构建会话,因此我们由此而入。
0x02 OpenHands 会话系统

会话是专门设计用于跟踪和管理单独对话线程的对象。会话就可以理解为AI代理的临时工作空间,就像你为特定项目准备的办公桌。它包含当前对话的所有必要工具、笔记和参考资料,一切都是即时可访问的,但也具有临时性和任务特定性。
具体而言,在OpenHands中:

  • WebSession 是一个 Web 服务器绑定的会话包装器,负责管理单个 Web 客户端连接并协调 AgentSession 生命周期。是 OpenHands 系统中连接前端用户界面和后端Agent执行的核心桥梁,负责协调整个交互流程。
  • AgentSession 是 OpenHands 框架中 Agent运行的 “上下文容器”,核心作用是封装Agent执行所需的所有组件(Agent、控制器、运行时、内存、事件流),统一管理它们的生命周期(初始化、启动、通信、关闭),并提供会话级的配置隔离、数据持久化和状态管理,是Agent能够独立、稳定执行任务的基础。
2.1 对话管理接口 ConversationManager

ConversationManager 类定义了对话管理的接口,适用于单机模式和集群模式。它负责处理对话的全生命周期,
包括创建、附加、分离和清理。这是OpenHands的一个扩展点,基于它构建的应用可通过服务器配置修改行为,无需改动其核心代码。应用程序可通过以下方式提供自定义实现:

  • 创建一个继承自ConversationManager的类
  • 实现所有必需的抽象方法
  • 将server_config.conversation_manager_class设置为该实现类的全限定名
ConversationManager 定义如下。
  1. class ConversationManager(ABC):
  2.     """OpenHands中对话管理的抽象基类。
  3.     应用程序可能需要在以下场景中自定义实现:
  4.     - 具有分布式对话状态的集群部署
  5.     - 自定义持久化或缓存策略
  6.     - 与外部对话管理系统集成
  7.     - 增强的监控或日志能力
  8.     实现类通过openhands.server.shared.py中的get_impl()方法实例化。
  9.     """
  10.     sio: socketio.AsyncServer  # Socket.IO异步服务器实例,用于实时通信
  11.     config: OpenHandsConfig    # OpenHands配置对象,存储系统参数
  12.     file_store: FileStore      # 文件存储实例,用于管理对话相关文件
  13.     conversation_store: ConversationStore  # 对话存储实例,用于持久化对话数据
复制代码
2.1.1 StandaloneConversationManager

StandaloneConversationManager 是ConversationManager的子类。是默认实现,适用于单服务器部署场景。
  1. @dataclass
  2. class StandaloneConversationManager(ConversationManager):
  3.     """Default implementation of ConversationManager for single-server deployments.
  4.     See ConversationManager for extensibility details.
  5.     """
  6.     sio: socketio.AsyncServer
  7.     config: OpenHandsConfig
  8.     file_store: FileStore
  9.     server_config: ServerConfig
  10.     # Defaulting monitoring_listener for temp backward compatibility.
  11.     monitoring_listener: MonitoringListener = MonitoringListener()
  12.     _local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict)
  13.     _local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict)
  14.     _active_conversations: dict[str, tuple[ServerConversation, int]] = field(
  15.         default_factory=dict
  16.     )
  17.     _detached_conversations: dict[str, tuple[ServerConversation, float]] = field(
  18.         default_factory=dict
  19.     )
  20.     _conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
  21.     _cleanup_task: asyncio.Task | None = None
  22.     _conversation_store_class: type[ConversationStore] | None = None
  23.     _loop: asyncio.AbstractEventLoop | None = None
复制代码
2.1.2 Session初始化

Session初始化的流程图如下。
3.png

StandaloneConversationManager 的  join_conversation 函数如下,其调用了 maybe_start_agent_loop 初始化Agent。需要注意,此处 Session 是 WebSession。
  1.     from openhands.server.session.session import WebSession as Session
  2.     async def join_conversation(
  3.         self,
  4.         sid: str,
  5.         connection_id: str,
  6.         settings: Settings,
  7.         user_id: str | None,
  8.     ) -> AgentLoopInfo:
  9.         await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
  10.         self._local_connection_id_to_session_id[connection_id] = sid
  11.         # 此处调用 maybe_start_agent_loop
  12.         agent_loop_info = await self.maybe_start_agent_loop(sid, settings, user_id)
  13.         return agent_loop_info
复制代码
maybe_start_agent_loop 调用了 _start_agent_loop 初始化 Session。
  1. class ConversationManager:
  2.     def __init__(self, config: OpenHandsConfig, sio: Any, file_store: Any):
  3.         self.config = config  # 框架全局配置
  4.         self.sio = sio  # SocketIO实例(用于客户端通信)
  5.         self.file_store = file_store  # 文件存储实例(用于会话数据持久化)
  6.         self._local_agent_loops_by_sid: Dict[str, Session] = {}  # 会话ID到Session实例的映射(缓存活跃会话)
  7.         self._loop = asyncio.get_event_loop()  # 事件循环实例
  8.     async def maybe_start_agent_loop(
  9.         self,
  10.         sid: str,  # 会话ID(唯一标识一个对话)
  11.         settings: Settings,  # 用户/会话设置(含Agent类型、LLM配置等)
  12.         user_id: Optional[str] = None,  # 用户ID(可选,用于用户级并发控制)
  13.         initial_user_msg: Optional[MessageAction] = None,  # 初始用户消息(可选,会话启动时的第一条消息)
  14.         replay_json: Optional[str] = None,  # 回放JSON字符串(可选,用于会话回放场景)
  15.     ) -> AgentLoopInfo:
  16.         """
  17.         尝试启动Agent循环:优先复用已存在的会话,不存在则新建。
  18.         
  19.         核心逻辑:
  20.         - 检查会话ID对应的会话是否已存在(缓存于_local_agent_loops_by_sid)
  21.         - 存在则直接返回会话信息,不存在则调用_start_agent_loop新建会话
  22.         - 返回标准化的Agent循环信息(供外部调用者使用)
  23.         """     
  24.         # 从缓存中获取会话(复用已有会话,避免重复初始化)
  25.         session = self._local_agent_loops_by_sid.get(sid)
  26.         if not session:
  27.             # 会话不存在,新建Agent循环
  28.             session = await self._start_agent_loop(
  29.                 sid, settings, user_id, initial_user_msg, replay_json
  30.             )
  31.         
  32.         # 将Session实例转换为标准化的AgentLoopInfo返回
  33.         return self._agent_loop_info_from_session(session)
复制代码
_start_agent_loop 该代码是 OpenHands 框架中 Agent循环(Agent Loop)的启动与管理核心,负责会话的创建、复用、并发控制和初始化,是连接用户请求与Agent执行的关键枢纽。其核心使命是:在遵守并发限制的前提下,为用户会话提供完整的组件初始化(LLM 注册表、统计、事件订阅),确保Agent能够顺畅启动并运行。
_start_agent_loop  的核心特色如下:

  • 会话复用机制:通过 _local_agent_loops_by_sid 缓存活跃会话,避免重复初始化,提升响应速度和资源利用率(例如用户重新连接同一会话时直接复用)。
  • 智能并发控制:基于用户 ID 限制最大并发会话数,超限后自动关闭最早的会话,同时向客户端发送友好通知,平衡资源占用与用户体验。
  • 组件化初始化:整合 create_registry_and_conversation_stats 函数,一键完成 LLM 注册表、对话统计、配置适配三大核心组件的初始化,架构清晰且解耦。
  • 异步非阻塞设计:Agent初始化(session.initialize_agent)通过 asyncio.create_task 异步执行,不阻塞会话创建流程,提升系统吞吐量。
  • 事件驱动扩展:自动订阅会话事件流,通过回调函数响应会话更新,支持后续扩展监控、统计等功能,具备良好的可扩展性。
  • 容错与兼容性:处理重复订阅异常,避免报错;支持会话回放(replay_json)和初始消息(initial_user_msg),适配正常对话、回放等多种场景。
_start_agent_loop 代码如下。
  1.     async def _start_agent_loop(
  2.         self,
  3.         sid: str,
  4.         settings: Settings,
  5.         user_id: Optional[str] = None,
  6.         initial_user_msg: Optional[MessageAction] = None,
  7.         replay_json: Optional[str] = None,
  8.     ) -> Session:
  9.         """
  10.         内部方法:实际创建并启动Agent循环,包含并发控制、会话初始化、事件订阅等核心流程。
  11.         """
  12.         # 1. 并发会话数量控制:检查用户当前活跃会话数是否超过上限
  13.         # 获取用户当前运行中的所有会话ID
  14.         running_session_ids = await self.get_running_agent_loops(user_id)
  15.         # 若超过最大并发数,关闭最早的会话以释放资源
  16.         if len(running_session_ids) >= self.config.max_concurrent_conversations:
  17.            
  18.             # 获取用户的会话存储实例,读取所有活跃会话的元数据
  19.             conversation_store = await self._get_conversation_store(user_id)
  20.             conversations = await conversation_store.get_all_metadata(running_session_ids)
  21.             # 按最后更新时间排序(最新的在前, oldest的在后)
  22.             conversations.sort(key=_last_updated_at_key, reverse=True)
  23.             # 循环关闭最早的会话,直到并发数符合限制
  24.             while len(conversations) >= self.config.max_concurrent_conversations:
  25.                 oldest_conversation = conversations.pop()  # 取出最早的会话
  26.                 oldest_sid = oldest_conversation.conversation_id
  27.                
  28.                 # 向客户端发送错误通知(告知会话已关闭)
  29.                 status_update = {
  30.                     'status_update': True,
  31.                     'type': 'error',
  32.                     'id': 'AGENT_ERROR$TOO_MANY_CONVERSATIONS',
  33.                     'message': '同时开启的会话数已达上限。若仍需使用该会话,可发送消息重新激活Agent',
  34.                 }
  35.                 # 在事件循环中发送SocketIO事件(定向到该会话的房间)
  36.                 await run_in_loop(
  37.                     self.sio.emit(
  38.                         'oh_event',
  39.                         status_update,
  40.                         to=ROOM_KEY.format(sid=oldest_sid),  # 按会话ID定向发送
  41.                     ),
  42.                     self._loop,
  43.                 )
  44.                
  45.                 # 关闭最早的会话(释放资源)
  46.                 await self.close_session(oldest_sid)
  47.         # 2. 初始化核心组件:LLM注册表、对话统计、最终配置
  48.         llm_registry, conversation_stats, final_config = (
  49.             create_registry_and_conversation_stats(self.config, sid, user_id, settings)
  50.         )
  51.         
  52.         # 3. 创建Session实例(封装会话的所有状态和组件)
  53.         session = Session(
  54.             sid=sid,
  55.             file_store=self.file_store,  # 绑定文件存储
  56.             config=final_config,  # 绑定最终配置
  57.             llm_registry=llm_registry,  # 绑定LLM注册表
  58.             conversation_stats=conversation_stats,  # 绑定对话统计
  59.             sio=self.sio,  # 绑定SocketIO实例
  60.             user_id=user_id,  # 绑定用户ID
  61.         )
  62.         
  63.         # 4. 将新会话缓存到本地(供后续复用)
  64.         self._local_agent_loops_by_sid[sid] = session
  65.         
  66.         # 5. 异步初始化Agent(不阻塞当前流程):加载Agent、处理初始消息、回放会话(若有)
  67.         asyncio.create_task(
  68.             session.initialize_agent(settings, initial_user_msg, replay_json)
  69.         )
  70.         
  71.         # 6. 订阅会话事件流:监听会话更新事件(仅新建会话时订阅,复用会话跳过)
  72.         try:
  73.             session.agent_session.event_stream.subscribe(
  74.                 subscriber=EventStreamSubscriber.SERVER,  # 订阅者类型:服务器
  75.                 callback=self._create_conversation_update_callback(
  76.                     user_id, sid, settings, llm_registry  # 会话更新回调函数
  77.                 ),
  78.                 callback_id=UPDATED_AT_CALLBACK_ID,  # 回调ID(用于后续取消订阅)
  79.             )
  80.         except ValueError:
  81.             # 若已存在相同ID的订阅,忽略该操作(避免重复订阅)
  82.         # 返回创建好的Session实例
  83.         return session
复制代码
2.2 session.py(WebSession)

session.py 定义了 WebSession 类,它是 OpenHands 系统中管理 Web 客户端会话的核心组件。
2.2.1 WebSession 类概述

WebSession 是一个 Web 服务器绑定的会话包装器,负责管理单个 Web 客户端连接并协调 AgentSession 生命周期。WebSession 的关键设计模式为:

  • 异步队列模式:使用 asyncio.Queue 管理事件发布,确保非阻塞操作
  • 事件驱动架构:通过事件订阅/发布机制实现组件解耦
  • 状态管理模式:跟踪会话状态和连接状态
  • 错误处理机制:全面的异常捕获和错误报告
WebSession 是 OpenHands 系统中连接前端用户界面和后端Agent执行的核心桥梁,负责协调整个交互流程。
2.2.2 WebSession 核心属性

WebSession 核心属性为:

  • sid:稳定的会话 ID,跨传输保持一致
  • sio:Socket.IO 服务器,用于向 Web 客户端发送事件
  • agent_session:核心Agent会话,协调运行时和 LLM
  • config:有效的 OpenHands 配置
  • llm_registry:负责 LLM 访问和重试钩子的注册表
  • file_store:会话的文件存储接口
  • user_id:可选的多租户用户标识符
WebSession会订阅 EventStreamSubscriber.SERVER。
  1. class WebSession:
  2.     """Web server-bound session wrapper.
  3.     This was previously named `Session`. We keep `Session` as a compatibility alias
  4.     (see openhands.server.session.__init__) so downstream imports/tests continue to
  5.     work. The class manages a single web client connection and orchestrates the
  6.     AgentSession lifecycle for that conversation.
  7.     """
  8.     sid: str
  9.     sio: socketio.AsyncServer | None
  10.     last_active_ts: int = 0
  11.     is_alive: bool = True
  12.     agent_session: AgentSession
  13.     loop: asyncio.AbstractEventLoop
  14.     config: OpenHandsConfig
  15.     llm_registry: LLMRegistry
  16.     file_store: FileStore
  17.     user_id: str | None
  18.     logger: LoggerAdapter
复制代码
2.2.3 主要功能模块

WebSession 的主要功能模块为:

  • 初始化和配置管理

    • init 方法设置会话的基本配置和组件
    • 订阅 EventStream 的 SERVER 事件
    • 初始化异步事件发布队列
    • Agent初始化(initialize_agent)
    • 配置Agent、LLM 和运行时环境
    • 处理 MCP(Model Context Protocol)配置
    • 设置 condenser(压缩器)配置
    • 启动 AgentSession
    • 错误处理和验证



  • 事件处理(on_event 和 _on_event)

    • 处理来自Agent的事件

      • 过滤 NullAction 和 NullObservation
      • 根据事件源决定如何处理和转发事件
      • 将环境反馈作为Agent事件发送给 UI


  • 消息分发(dispatch)

    • 处理来自用户的事件
    • 验证图像支持
    • 将事件添加到事件流中

  • 异步消息发送(send, _monitor_publish_queue, _send)

    • 使用队列机制异步发送消息
    • 确保 WebSocket 连接稳定后再发送
    • 处理连接状态和错误

  • 状态管理

    • close 方法清理会话资源
    • queue_status_message 和 _send_status_message 处理状态更新消息

2.2.4 与系统其他组件的关系

WebSession 与系统其他组件的关系如下:

  • 与 EventStream 集成

    • 作为 SERVER 订阅者接收事件
    • 处理来自Agent和用户的事件流

  • 与 AgentSession 协作:

    • 管理 AgentSession 生命周期
    • 转发用户事件到Agent
    • 将Agent响应发送给客户端

  • 与 Socket.IO 集成:

    • 使用 Socket.IO 向客户端发送实时事件
    • 管理 WebSocket 连接状态

2.2.5 初始化 AgentSession

WebSession 的初始化会中完成对AgentSession的初始化,AgentSession的初始化中又做了EventStream的初始化,所以整个会话的EventStream也就是在这里创建的,这里在事件流中订阅了EventStreamSubscriber.SERVER,事件回调函数中将需要向前端广播的事件通过socket进行发送。
  1. class WebSession:
  2.     def __init__(
  3.         self,
  4.         sid: str,
  5.         config: OpenHandsConfig,
  6.         llm_registry: LLMRegistry,
  7.         conversation_stats: ConversationStats,
  8.         file_store: FileStore,
  9.         sio: socketio.AsyncServer | None,
  10.         user_id: str | None = None,
  11.     ):
  12.         self.sid = sid
  13.         self.sio = sio
  14.         self.last_active_ts = int(time.time())
  15.         self.file_store = file_store
  16.         self.logger = OpenHandsLoggerAdapter(extra={'session_id': sid})
  17.         self.llm_registry = llm_registry
  18.         self.conversation_stats = conversation_stats
  19.         self.agent_session = AgentSession(
  20.             sid,
  21.             file_store,
  22.             llm_registry=self.llm_registry,
  23.             conversation_stats=conversation_stats,
  24.             status_callback=self.queue_status_message,
  25.             user_id=user_id,
  26.         )
  27.         self.agent_session.event_stream.subscribe(
  28.             EventStreamSubscriber.SERVER, self.on_event, self.sid
  29.         )
  30.         self.config = config
  31.         # Lazy import to avoid ircular dependency
  32.         from openhands.experiments.experiment_manager import ExperimentManagerImpl
  33.         self.config = ExperimentManagerImpl.run_config_variant_test(
  34.             user_id, sid, self.config
  35.         )
  36.         self.loop = asyncio.get_event_loop()
  37.         self.user_id = user_id
  38.         self._publish_queue: asyncio.Queue = asyncio.Queue()
  39.         self._monitor_publish_queue_task: asyncio.Task = self.loop.create_task(
  40.             self._monitor_publish_queue()
  41.         )
  42.         self._wait_websocket_initial_complete: bool = True
复制代码
agent_session.start()中完成了security_analyzer、runtime、memory、controller的初始化。
2.3 agent_session

2.3.1 AgentSession

AgentSession 是 OpenHands 框架中 Agent运行的 “上下文容器”,核心作用是封装Agent执行所需的所有组件(Agent、控制器、运行时、内存、事件流),统一管理它们的生命周期(初始化、启动、通信、关闭),并提供会话级的配置隔离、数据持久化和状态管理,是Agent能够独立、稳定执行任务的基础。
AgentSession 核心特色如下:

  • 全组件生命周期管理:集中初始化并关联Agent、控制器、运行时、内存、事件流等核心组件,确保组件间通信顺畅,生命周期一致(启动 / 关闭同步)。
  • 灵活的环境配置:支持 Git 仓库集成、自定义密钥注入、MCP 工具扩展等,适配代码开发、第三方服务调用等复杂场景,满足多样化任务需求。
  • 会话状态安全管控:严格校验会话状态(避免重复启动、已关闭会话启动失败),通过状态标记(_starting/_closed)确保流程安全性,减少异常。
  • 支持会话回放与状态恢复:提供 _run_replay 接口支持从 JSON 数据恢复历史会话,便于调试、任务续跑和场景复现。
  • 精细化日志与监控:集成带会话上下文的日志器,记录启动耗时、成功状态、状态恢复等元数据,便于问题排查和系统监控。
  • 安全与隔离设计:通过自定义密钥处理器(UserSecrets)安全管理第三方密钥,运行时环境隔离执行代码,避免敏感信息泄露和系统风险。
  • 状态驱动的事件机制:启动时根据是否有初始消息自动设置Agent状态(运行中 / 等待用户输入),并通过事件流同步状态,确保组件间状态一致性。
AgentSession 定义如下:
  1. class AgentSession:
  2.     """
  3.         Agent会话类:封装Agent运行的完整上下文,管理Agent、控制器、运行时、内存等核心组件的生命周期。
  4.         属性说明:
  5.         controller: Agent控制器实例(负责调度Agent执行流程)
  6.         sid: 会话唯一标识
  7.         user_id: 用户ID(可选)
  8.         event_stream: 事件流(组件间通信核心)
  9.         llm_registry: LLM注册表(管理LLM实例)
  10.         file_store: 文件存储(持久化会话数据)
  11.         runtime: 运行时环境(如沙盒,执行代码/命令)
  12.         memory: Agent内存(存储会话历史、上下文等)
  13.         _starting: 会话启动中标记
  14.         _started_at: 会话启动时间戳
  15.         _closed: 会话关闭标记
  16.         loop: 异步事件循环
  17.         logger: 带会话上下文的日志器
  18.     """
  19.     sid: str
  20.     user_id: Optional[str]
  21.     event_stream: EventStream
  22.     llm_registry: LLMRegistry
  23.     file_store: FileStore
  24.     controller: Optional[AgentController] = None
  25.     runtime: Optional[Runtime] = None
  26.     memory: Optional[Memory] = None
  27.     _starting: bool = False
  28.     _started_at: float = 0
  29.     _closed: bool = False
  30.     loop: Optional[asyncio.AbstractEventLoop] = None
  31.     logger: LoggerAdapter
  32.     def __init__(
  33.         self,
  34.         sid: str,
  35.         file_store: FileStore,
  36.         llm_registry: LLMRegistry,
  37.         conversation_stats: ConversationStats,
  38.         status_callback: Optional[Callable] = None,
  39.         user_id: Optional[str] = None,
  40.     ) -> None:
  41.         """
  42.         初始化AgentSession实例。
  43.         参数:
  44.             sid: 会话ID(唯一标识)
  45.             file_store: 文件存储实例(用于事件流、内存数据持久化)
  46.             llm_registry: LLM注册表实例(提供LLM资源)
  47.             conversation_stats: 对话统计实例(记录会话相关统计数据)
  48.             status_callback: 状态回调函数(可选,会话状态变更时触发)
  49.             user_id: 用户ID(可选,用于用户级数据隔离)
  50.         """
  51.         self.sid = sid
  52.         # 初始化事件流(会话内组件通信的核心枢纽)
  53.         self.event_stream = EventStream(sid, file_store, user_id)
  54.         self.file_store = file_store
  55.         self._status_callback = status_callback  # 状态变更回调(如通知客户端)
  56.         self.user_id = user_id
  57.         # 初始化带会话上下文的日志器(便于追踪会话级日志)
  58.         self.logger = OpenHandsLoggerAdapter(
  59.             extra={'session_id': sid, 'user_id': user_id}
  60.         )
  61.         self.llm_registry = llm_registry  # 绑定LLM注册表
  62.         self.conversation_stats = conversation_stats  # 绑定对话统计实例
复制代码
AgentSession 初始化完成后向事件流中添加ChangeAgentStateAction事件。
4.png

具体代码如下:
  1.     async def start(
  2.         self,
  3.         runtime_name: str,  # 运行时名称(如"sandbox",指定运行环境类型)
  4.         config: OpenHandsConfig,  # 框架全局配置
  5.         agent: Agent,  # 已初始化的Agent实例
  6.         max_iterations: int,  # Agent执行的最大迭代次数(防止无限循环)
  7.         git_provider_tokens: Optional[PROVIDER_TOKEN_TYPE] = None,  # Git提供商令牌(如GitHub令牌)
  8.         custom_secrets: Optional[CUSTOM_SECRETS_TYPE] = None,  # 自定义密钥(第三方服务访问用)
  9.         max_budget_per_task: Optional[float] = None,  # 单任务最大预算(可选)
  10.         agent_to_llm_config: Optional[Dict[str, LLMConfig]] = None,  # Agent-LLM配置映射
  11.         agent_configs: Optional[Dict[str, AgentConfig]] = None,  # 所有Agent配置字典
  12.         selected_repository: Optional[str] = None,  # 选中的Git仓库地址(可选)
  13.         selected_branch: Optional[str] = None,  # 选中的仓库分支(可选)
  14.         initial_message: Optional[MessageAction] = None,  # 初始用户消息(可选)
  15.         conversation_instructions: Optional[str] = None,  # 会话指令(自定义Agent行为)
  16.         replay_json: Optional[str] = None,  # 会话回放JSON数据(可选)
  17.     ) -> None:
  18.         """
  19.         启动Agent会话:初始化运行时、内存、控制器,触发Agent开始执行。
  20.         
  21.         核心流程:
  22.         1. 校验会话状态(避免重复启动)
  23.         2. 创建运行时环境(如沙盒)
  24.         3. 配置Git令牌、自定义密钥
  25.         4. 创建Agent内存(存储上下文、会话指令等)
  26.         5. (可选)添加MCP工具到Agent
  27.         6. (可选)执行会话回放
  28.         7. 创建Agent控制器(调度Agent执行)
  29.         8. 发送初始事件(启动状态/等待用户输入)
  30.         """
  31.         # 校验会话状态:已存在控制器或运行时 → 抛出异常(避免重复启动)
  32.         if self.controller or self.runtime:
  33.             raise RuntimeError(
  34.                 'Session already started. You need to close this session and start a new one.'
  35.             )
  36.         # 会话已关闭 → 日志警告并返回
  37.         if self._closed:
  38.             self.logger.warning('Session closed before starting')
  39.             return
  40.         
  41.         self._starting = True  # 标记会话启动中
  42.         started_at = time.time()
  43.         self._started_at = started_at
  44.         finished = False  # 执行完成标记(用于监控)
  45.         runtime_connected = False  # 运行时连接成功标记
  46.         restored_state = False  # 状态恢复标记(会话回放/恢复场景)
  47.         
  48.         # 初始化自定义密钥处理器(管理第三方服务密钥)
  49.         custom_secrets_handler = UserSecrets(
  50.             custom_secrets=custom_secrets if custom_secrets else {}
  51.         )
  52.         try:
  53.             # 1. 创建运行时环境(如Docker沙盒)并连接
  54.             runtime_connected = await self._create_runtime(
  55.                 runtime_name=runtime_name,
  56.                 config=config,
  57.                 agent=agent,
  58.                 git_provider_tokens=git_provider_tokens,
  59.                 custom_secrets=custom_secrets,
  60.                 selected_repository=selected_repository,
  61.                 selected_branch=selected_branch,
  62.             )
  63.             # 提取仓库目录名(若指定了Git仓库)
  64.             repo_directory = None
  65.             if self.runtime and runtime_connected and selected_repository:
  66.                 repo_directory = selected_repository.split('/')[-1]  # 从仓库地址提取目录名(如"openhands")
  67.             # 2. 配置Git提供商令牌(若有)
  68.             if git_provider_tokens:
  69.                 provider_handler = ProviderHandler(provider_tokens=git_provider_tokens)
  70.                 await provider_handler.set_event_stream_secrets(self.event_stream)  # 注入令牌到事件流
  71.             # 3. 配置自定义密钥(若有)
  72.             if custom_secrets:
  73.                 custom_secrets_handler.set_event_stream_secrets(self.event_stream)  # 注入自定义密钥到事件流
  74.             # 4. 创建Agent内存(存储会话上下文、仓库信息、会话指令等)
  75.             self.memory = await self._create_memory(
  76.                 selected_repository=selected_repository,
  77.                 repo_directory=repo_directory,
  78.                 selected_branch=selected_branch,
  79.                 conversation_instructions=conversation_instructions,
  80.                 custom_secrets_descriptions=custom_secrets_handler.get_custom_secrets_descriptions(),  # 密钥描述(供Agent参考)
  81.                 working_dir=config.workspace_mount_path_in_sandbox,  # 沙盒中的工作目录路径
  82.             )
  83.             # 5. (可选)添加MCP工具到Agent(需运行时已连接且Agent启用MCP)
  84.             if self.runtime and runtime_connected and agent.config.enable_mcp:
  85.                 await add_mcp_tools_to_agent(agent, self.runtime, self.memory)
  86.             # 6. (可选)执行会话回放(从replay_json恢复历史会话)
  87.             if replay_json:
  88.                 initial_message = self._run_replay(
  89.                     initial_message,
  90.                     replay_json,
  91.                     agent,
  92.                     config,
  93.                     max_iterations,
  94.                     max_budget_per_task,
  95.                     agent_to_llm_config,
  96.                     agent_configs,
  97.                 )
  98.             # 7. (正常场景)创建Agent控制器(调度Agent执行流程)
  99.             else:
  100.                 self.controller, restored_state = self._create_controller(
  101.                     agent,
  102.                     config.security.confirmation_mode,  # 安全确认模式(如自动确认/手动确认)
  103.                     max_iterations,
  104.                     max_budget_per_task=max_budget_per_task,
  105.                     agent_to_llm_config=agent_to_llm_config,
  106.                     agent_configs=agent_configs,
  107.                 )
  108.             # 8. 发送初始事件(根据是否有初始消息设置Agent状态)
  109.             if not self._closed:
  110.                 if initial_message:
  111.                     # 有初始消息 → 向事件流添加用户消息,设置Agent状态为"运行中"
  112.                     self.event_stream.add_event(initial_message, EventSource.USER)
  113.                     self.event_stream.add_event(
  114.                         ChangeAgentStateAction(AgentState.RUNNING),
  115.                         EventSource.ENVIRONMENT,
  116.                     )
  117.                 else:
  118.                     # 无初始消息 → 设置Agent状态为"等待用户输入"
  119.                     self.event_stream.add_event(
  120.                         ChangeAgentStateAction(AgentState.AWAITING_USER_INPUT),
  121.                         EventSource.ENVIRONMENT,
  122.                     )
  123.             finished = True  # 标记执行完成
  124.         finally:
  125.             self._starting = False  # 取消启动中标记
  126.             # 计算启动结果(是否成功:执行完成且运行时连接成功)
  127.             success = finished and runtime_connected
  128.             duration = time.time() - started_at  # 计算启动耗时
  129.             # 日志元数据(用于监控和分析)
  130.             # 记录启动结果日志
复制代码
2.4.2 初始化Agent

_start_agent_loop 会调用 initialize_agent 初始化 Agent。session初始化完成后调用initialize_agent进一步完成agent的其余模块初始化工作,首先创建llm和agent,然后调用agent_session.start()。
核心作用

initialize_agent 是 OpenHands 框架中 Agent实例的初始化核心方法,负责将用户设置、默认配置、第三方服务配置融合为最终运行配置,创建Agent实例并启动Agent会话,是连接配置与Agent运行的关键桥梁,确保Agent具备完成任务所需的所有能力(工具访问、LLM 支持、安全控制等)。
核心特色


  • 配置融合机制:用户设置优先于默认配置,支持安全、沙盒、Git、第三方服务等多维度配置的灵活覆盖,兼顾通用性与个性化需求。
  • 模块化压缩器设计:默认启用三阶段上下文压缩器流水线,按 “对话窗口→浏览器输出→LLM 总结” 顺序优化上下文,平衡上下文相关性与模型输入长度限制。
  • 完整的服务配置:自动配置 MCP 服务器(Agent与工具的通信枢纽),支持自定义 MCP 配置扩展,适配不同部署环境的工具通信需求。
  • 安全与隐私保护:敏感信息(如沙盒 API 密钥)通过 get_secret_value() 安全提取,未知错误仅返回错误类型,避免泄露敏感配置。
  • 丰富的扩展参数:支持 Git 仓库访问、自定义密钥、会话指令等扩展参数,适配代码开发、第三方服务集成等复杂场景。
  • 精细化错误处理:区分不同类型错误(微Agent验证错误、值错误、未知错误),返回针对性错误信息,便于问题排查。
  • 状态可视化:初始化开始时发送 AgentState.LOADING 状态事件,让客户端实时感知Agent启动进度,提升用户体验。
初始化流程图

5.png

initialize_agent 代码如下:
  1.     async def initialize_agent(
  2.         self,
  3.         settings: Settings,  # 用户/会话设置(含Agent类型、安全配置等)
  4.         initial_message: Optional[MessageAction] = None,  # 初始用户消息(可选)
  5.         replay_json: Optional[str] = None,  # 会话回放JSON(可选)
  6.     ) -> None:
  7.         """
  8.         初始化Agent核心流程:
  9.         1. 更新会话配置(融合用户设置与默认配置)
  10.         2. 配置MCP服务器(用于工具通信)
  11.         3. 初始化Agent配置(含上下文压缩器)
  12.         4. 创建Agent实例并启动Agent会话
  13.         """
  14.         # 1. 发送Agent状态变更事件:标记为"加载中"
  15.         self.agent_session.event_stream.add_event(
  16.             AgentStateChangedObservation('', AgentState.LOADING),
  17.             EventSource.ENVIRONMENT,  # 事件来源:环境
  18.         )
  19.         # 2. 融合用户设置与默认配置(用户设置优先)
  20.         # 确定Agent类型(用户设置优先,否则用默认)
  21.         agent_cls = settings.agent or self.config.default_agent
  22.         
  23.         # 安全配置:确认模式(用户设置优先)
  24.         self.config.security.confirmation_mode = (
  25.             self.config.security.confirmation_mode
  26.             if settings.confirmation_mode is None
  27.             else settings.confirmation_mode
  28.         )
  29.         
  30.         # 安全配置:安全分析器(用户设置优先)
  31.         self.config.security.security_analyzer = (
  32.             self.config.security.security_analyzer
  33.             if settings.security_analyzer is None
  34.             else settings.security_analyzer
  35.         )
  36.         
  37.         # 沙盒配置:基础容器镜像(用户设置优先)
  38.         self.config.sandbox.base_container_image = (
  39.             settings.sandbox_base_container_image
  40.             or self.config.sandbox.base_container_image
  41.         )
  42.         
  43.         # 沙盒配置:运行时容器镜像(用户设置优先,逻辑:基础镜像或运行时镜像有一个设置则用用户值)
  44.         self.config.sandbox.runtime_container_image = (
  45.             settings.sandbox_runtime_container_image
  46.             if settings.sandbox_base_container_image
  47.             or settings.sandbox_runtime_container_image
  48.             else self.config.sandbox.runtime_container_image
  49.         )
  50.         # 3. Git配置:若用户设置提供,覆盖默认值
  51.         git_user_name = getattr(settings, 'git_user_name', None)
  52.         if git_user_name is not None:
  53.             self.config.git_user_name = git_user_name
  54.         git_user_email = getattr(settings, 'git_user_email', None)
  55.         if git_user_email is not None:
  56.             self.config.git_user_email = git_user_email
  57.         
  58.         # 4. 任务配置:最大迭代次数(用户设置优先)
  59.         max_iterations = settings.max_iterations or self.config.max_iterations
  60.         
  61.         # 5. 任务配置:单任务最大预算(用户设置优先,支持None值)
  62.         max_budget_per_task = (
  63.             settings.max_budget_per_task
  64.             if settings.max_budget_per_task is not None
  65.             else self.config.max_budget_per_task
  66.         )
  67.         # 6. 第三方服务配置:搜索API密钥、沙盒API密钥
  68.         self.config.search_api_key = settings.search_api_key
  69.         if settings.sandbox_api_key:
  70.             # 提取沙盒API密钥的实际值(get_secret_value()用于安全存储的密钥)
  71.             self.config.sandbox.api_key = settings.sandbox_api_key.get_secret_value()
  72.         # 7. MCP服务器配置(用于Agent与工具的通信)
  73.         
  74.         # 若用户设置提供自定义MCP配置,合并到全局配置
  75.         mcp_config = getattr(settings, 'mcp_config', None)
  76.         if mcp_config is not None:
  77.             self.config.mcp = self.config.mcp.merge(mcp_config)
  78.         
  79.         # 默认添加OpenHands的MCP服务器(HTTP + STDIO类型)
  80.         openhands_mcp_server, openhands_mcp_stdio_servers = (
  81.             OpenHandsMCPConfigImpl.create_default_mcp_server_config(
  82.                 self.config.mcp_host, self.config, self.user_id
  83.             )
  84.         )
  85.         if openhands_mcp_server:
  86.             self.config.mcp.shttp_servers.append(openhands_mcp_server)
  87.             self.config.mcp.stdio_servers.extend(openhands_mcp_stdio_servers)
  88.         # 8. Agent配置初始化
  89.         # 获取指定Agent类型的配置
  90.         agent_config = self.config.get_agent_config(agent_cls)
  91.         # 传递运行时信息到Agent配置(用于工具的运行时特定行为)
  92.         agent_config.runtime = self.config.runtime
  93.         # 获取Agent对应的LLM配置
  94.         agent_name = agent_cls if agent_cls is not None else 'agent'
  95.         llm_config = self.config.get_llm_config_from_agent(agent_name)
  96.         
  97.         # 若启用默认上下文压缩器,配置压缩器流水线
  98.         if settings.enable_default_condenser:
  99.             """
  100.             默认压缩器流水线包含三个阶段(顺序重要):
  101.             1. 对话窗口压缩器:处理显式的压缩请求
  102.             2. 浏览器输出压缩器:限制浏览器观察结果的大小(注意力窗口=2)
  103.             3. LLM总结压缩器:限制传递给LLM的上下文大小
  104.             顺序设计原因:先处理浏览器输出,可减少总结成本(仅保留最新浏览器输出)
  105.             """
  106.             max_events_for_condenser = settings.condenser_max_size or 120  # 压缩器最大事件数(默认120)
  107.             default_condenser_config = CondenserPipelineConfig(
  108.                 condensers=[
  109.                     ConversationWindowCondenserConfig(),
  110.                     BrowserOutputCondenserConfig(attention_window=2),
  111.                     LLMSummarizingCondenserConfig(
  112.                         llm_config=llm_config,
  113.                         keep_first=4,  # 保留前4个事件(不压缩)
  114.                         max_size=max_events_for_condenser,  # 压缩后最大事件数
  115.                     ),
  116.                 ]
  117.             )
  118.             agent_config.condenser = default_condenser_config
  119.         
  120.         # 9. 创建Agent实例(通过Agent工厂方法获取对应类型的Agent类)
  121.         agent = Agent.get_cls(agent_cls)(agent_config, self.llm_registry)
  122.         # 10. 绑定LLM重试监听器(Agent会话中LLM重试时触发通知)
  123.         self.llm_registry.retry_listener = self._notify_on_llm_retry
  124.         # 11. 提取ConversationInitData类型设置中的扩展参数(若适用)
  125.         git_provider_tokens = None  # Git提供商令牌(用于代码仓库访问)
  126.         selected_repository = None  # 选中的代码仓库
  127.         selected_branch = None  # 选中的仓库分支
  128.         custom_secrets = None  # 自定义密钥(用于第三方服务访问)
  129.         conversation_instructions = None  # 会话指令(自定义Agent行为)
  130.         if isinstance(settings, ConversationInitData):
  131.             git_provider_tokens = settings.git_provider_tokens
  132.             selected_repository = settings.selected_repository
  133.             selected_branch = settings.selected_branch
  134.             custom_secrets = settings.custom_secrets
  135.             conversation_instructions = settings.conversation_instructions
  136.         # 12. 启动Agent会话(核心步骤)
  137.         try:
  138.             await self.agent_session.start(
  139.                 runtime_name=self.config.runtime,  # 运行时名称(如沙盒类型)
  140.                 config=self.config,  # 完整配置
  141.                 agent=agent,  # Agent实例
  142.                 max_iterations=max_iterations,  # 最大迭代次数
  143.                 max_budget_per_task=max_budget_per_task,  # 单任务最大预算
  144.                 agent_to_llm_config=self.config.get_agent_to_llm_config_map(),  # Agent-LLM配置映射
  145.                 agent_configs=self.config.get_agent_configs(),  # 所有Agent配置
  146.                 git_provider_tokens=git_provider_tokens,  # Git令牌
  147.                 custom_secrets=custom_secrets,  # 自定义密钥
  148.                 selected_repository=selected_repository,  # 选中仓库
  149.                 selected_branch=selected_branch,  # 选中分支
  150.                 initial_message=initial_message,  # 初始用户消息
  151.                 conversation_instructions=conversation_instructions,  # 会话指令
  152.                 replay_json=replay_json,  # 会话回放数据
  153.             )
  154.         except MicroagentValidationError as e:
  155.             # 微Agent验证错误:输出详细错误信息(帮助用户排查配置问题)
  156.             return
  157.         except ValueError as e:
  158.             # 值错误:区分微Agent相关错误和普通值错误
  159.             self.logger.exception(f"创建Agent会话失败: {e}")
  160.             error_message = str(e)
  161.             return
  162.         except Exception as e:
  163.             # 其他未知错误:仅输出错误类型(避免泄露敏感信息)
  164.             return
复制代码
2.4 用户交互(oh_user_action)逻辑

2.4.1 用户发消息

当用户发消息,就是通过socket在 oh_user_action 函数通过conversation_manager向事件中心中添加一条事件。代码位于 openhands\server\listen_socket.py。
  1. @sio.event
  2. async def oh_user_action(connection_id: str, data: dict[str, Any]) -> None:
  3.     await conversation_manager.send_to_event_stream(connection_id, data)
复制代码
旧API 会调用到这里。
  1. @sio.event
  2. async def oh_action(connection_id: str, data: dict[str, Any]) -> None:
  3.     # TODO: Remove this handler once all clients are updated to use oh_user_action
  4.     # Keeping for backward compatibility with in-progress sessions
  5.     await conversation_manager.send_to_event_stream(connection_id, data)
复制代码
2.4.2 事件添加

send_to_event_stream 位于 openhands\server\conversation_manager\standalone_conversation_manager.py。最终调用到 session.dispatch 向事件流发送事件。
  1.     async def send_to_event_stream(self, connection_id: str, data: dict):
  2.         # If there is a local session running, send to that
  3.         sid = self._local_connection_id_to_session_id.get(connection_id)
  4.         if not sid:
  5.             raise RuntimeError(f'no_connected_session:{connection_id}')
  6.         await self.send_event_to_conversation(sid, data)
  7.     async def send_event_to_conversation(self, sid: str, data: dict):
  8.         session = self._local_agent_loops_by_sid.get(sid)
  9.         if not session:
  10.             raise RuntimeError(f'no_conversation:{sid}')
  11.         await session.dispatch(data)
复制代码
dispatch 代码如下。
  1. async def dispatch(self, data: dict) -> None:
  2.     # ...
  3.     self.agent_session.event_stream.add_event(event, EventSource.USER)
复制代码
2.4.3 事件处理

当一条事件被加入系统的事件流后,究竟会触发哪些模块的响应?要解答这个问题,我们首先需要梳理系统中已订阅事件流的核心模块 —— 它们就像时刻待命的接收者,各自守在专属的消息通道旁:Session 模块订阅了 SERVER 通道,Runtime 模块对应 RUNTIME 通道,Memory 模块监听着 MEMORY 通道,而 AgentController 模块则聚焦于 AGENT_CONTROLLER 通道。
当用户发送一条消息并进入事件流后,这条消息会以广播的形式扩散到所有订阅通道,各个模块会通过预先注册的回调函数启动相应处理流程,具体流程如下:
AgentController 模块会在 _on_event 函数里面处理。
  1.     async def _on_event(self, event: Event) -> None:
  2.         if hasattr(event, 'hidden') and event.hidden:
  3.             return
  4.         self.state_tracker.add_history(event)
  5.         if isinstance(event, Action):
  6.             await self._handle_action(event)
  7.         elif isinstance(event, Observation):
  8.             await self._handle_observation(event)
  9.         should_step = self.should_step(event)
  10.         if should_step:
  11.             self.log(
  12.                 'debug',
  13.                 f'Stepping agent after event: {type(event).__name__}',
  14.                 extra={'msg_type': 'STEPPING_AGENT'},
  15.             )
  16.             await self._step_with_exception_handling()
  17.         elif isinstance(event, MessageAction) and event.source == EventSource.USER:
  18.             # If we received a user message but aren't stepping, log why
  19.             self.log(
  20.                 'warning',
  21.                 f'Not stepping agent after user message. Current state: {self.get_agent_state()}',
  22.                 extra={'msg_type': 'NOT_STEPPING_AFTER_USER_MESSAGE'},
  23.             )
复制代码
_handle_action 会调用  _handle_message_action。
  1.     async def _handle_action(self, action: Action) -> None:
  2.         """Handles an Action from the agent or delegate."""
  3.         if isinstance(action, ChangeAgentStateAction):
  4.             await self.set_agent_state_to(action.agent_state)  # type: ignore
  5.         elif isinstance(action, MessageAction):
  6.             await self._handle_message_action(action)
  7.         elif isinstance(action, AgentDelegateAction):
  8.             await self.start_delegate(action)
  9.             assert self.delegate is not None
  10.             # Post a MessageAction with the task for the delegate
  11.             if 'task' in action.inputs:
  12.                 self.event_stream.add_event(
  13.                     MessageAction(content='TASK: ' + action.inputs['task']),
  14.                     EventSource.USER,
  15.                 )
  16.                 await self.delegate.set_agent_state_to(AgentState.RUNNING)
  17.             return
  18.         elif isinstance(action, AgentFinishAction):
  19.             self.state.outputs = action.outputs
  20.             await self.set_agent_state_to(AgentState.FINISHED)
  21.         elif isinstance(action, AgentRejectAction):
  22.             self.state.outputs = action.outputs
  23.             await self.set_agent_state_to(AgentState.REJECTED)
复制代码
_handle_message_action 中,AgentController 模块会生成一条 RecallAction 事件并再次推入事件流,该事件的类型会根据当前是否为用户首次输入来判定:若是首次输入则设为 RecallType.WORKSPACE_CONTEXT,反之则设为 RecallType.KNOWLEDGE;
  1.     async def _handle_message_action(self, action: MessageAction) -> None:
  2.         """Handles message actions from the event stream.
  3.         Args:
  4.             action (MessageAction): The message action to handle.
  5.         """
  6.         if action.source == EventSource.USER:
  7.             # Use info level if LOG_ALL_EVENTS is set
  8.             log_level = (
  9.                 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug'
  10.             )
  11.             self.log(
  12.                 log_level,
  13.                 str(action),
  14.                 extra={'msg_type': 'ACTION', 'event_source': EventSource.USER},
  15.             )
  16.             # if this is the first user message for this agent, matters for the microagent info type
  17.             first_user_message = self._first_user_message()
  18.             is_first_user_message = (
  19.                 action.id == first_user_message.id if first_user_message else False
  20.             )
  21.             recall_type = (
  22.                 RecallType.WORKSPACE_CONTEXT
  23.                 if is_first_user_message
  24.                 else RecallType.KNOWLEDGE
  25.             )
  26.             recall_action = RecallAction(query=action.content, recall_type=recall_type)
  27.             self._pending_action = recall_action
  28.             # this is source=USER because the user message is the trigger for the microagent retrieval
  29.             self.event_stream.add_event(recall_action, EventSource.USER)
  30.             if self.get_agent_state() != AgentState.RUNNING:
  31.                 await self.set_agent_state_to(AgentState.RUNNING)
  32.         elif action.source == EventSource.AGENT:
  33.             # If the agent is waiting for a response, set the appropriate state
  34.             if action.wait_for_response:
  35.                 await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
复制代码
AgentController 模块会在 _on_event 函数里面调用 agent.step 方法,启动智能体的核心处理流程。
这里的 agent.step 的具体逻辑取决于配置文件中指定的智能体类型。以常见的 CodeActAgent 为例,该方法会指向 OpenHands/openhands/agenthub/codeact_agent/codeact_agent.py 中的 step 函数。最终将整理后的信息转化为符合大语言模型输入格式的 messages。
  1.     def step(self, state: State) -> 'Action':
  2.         """Performs one step using the CodeAct Agent.
  3.         This includes gathering info on previous steps and prompting the model to make a command to execute.
  4.         Parameters:
  5.         - state (State): used to get updated info
  6.         Returns:
  7.         - CmdRunAction(command) - bash command to run
  8.         - IPythonRunCellAction(code) - IPython code to run
  9.         - AgentDelegateAction(agent, inputs) - delegate action for (sub)task
  10.         - MessageAction(content) - Message action to run (e.g. ask for clarification)
  11.         - AgentFinishAction() - end the interaction
  12.         - CondensationAction(...) - condense conversation history by forgetting specified events and optionally providing a summary
  13.         - FileReadAction(path, ...) - read file content from specified path
  14.         - FileEditAction(path, ...) - edit file using LLM-based (deprecated) or ACI-based editing
  15.         - AgentThinkAction(thought) - log agent's thought/reasoning process
  16.         - CondensationRequestAction() - request condensation of conversation history
  17.         - BrowseInteractiveAction(browser_actions) - interact with browser using specified actions
  18.         - MCPAction(name, arguments) - interact with MCP server tools
  19.         """
  20.         # Continue with pending actions if any
  21.         if self.pending_actions:
  22.             return self.pending_actions.popleft()
  23.         # if we're done, go back
  24.         latest_user_message = state.get_last_user_message()
  25.         if latest_user_message and latest_user_message.content.strip() == '/exit':
  26.             return AgentFinishAction()
  27.         # Condense the events from the state. If we get a view we'll pass those
  28.         # to the conversation manager for processing, but if we get a condensation
  29.         # event we'll just return that instead of an action. The controller will
  30.         # immediately ask the agent to step again with the new view.
  31.         condensed_history: list[Event] = []
  32.         match self.condenser.condensed_history(state):
  33.             case View(events=events):
  34.                 condensed_history = events
  35.             case Condensation(action=condensation_action):
  36.                 return condensation_action
  37.         initial_user_message = self._get_initial_user_message(state.history)
  38.         messages = self._get_messages(condensed_history, initial_user_message)
  39.         params: dict = {
  40.             'messages': messages,
  41.         }
  42.         params['tools'] = check_tools(self.tools, self.llm.config)
  43.         params['extra_body'] = {
  44.             'metadata': state.to_llm_metadata(
  45.                 model_name=self.llm.config.model, agent_name=self.name
  46.             )
  47.         }
  48.         response = self.llm.completion(**params)
  49.         logger.debug(f'Response from LLM: {response}')
  50.         actions = self.response_to_actions(response)
  51.         logger.debug(f'Actions after response_to_actions: {actions}')
  52.         for action in actions:
  53.             self.pending_actions.append(action)
  54.         return self.pending_actions.popleft()
复制代码
当大语言模型处理完成并返回结果后,该结果会被封装成 Action 对象重新送入事件流。假设模型直接返回了 MessageAction 类型的结果,Session 模块便会捕获这条事件,并将其中的内容转发至前端界面,完成向用户的反馈展示。
0xFF 参考

https://docs.all-hands.dev/openhands/usage/architecture/backend
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第二篇:Agent 相关核心概念】  克里
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第一篇:系列导读】 克里
Coding Agent之Openhands解析(含代码)  Arrow
OpenHands 源码解读  一力辉
https://adk.wiki/

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册