找回密码
 立即注册
首页 业界区 业界 AI Agent框架探秘:拆解 OpenHands(9)--- AgentContr ...

AI Agent框架探秘:拆解 OpenHands(9)--- AgentController

阴昭昭 昨天 21:40
AI Agent框架探秘:拆解 OpenHands(9)---  AgentController


目录

  • AI Agent框架探秘:拆解 OpenHands(9)---  AgentController

    • 0x00 概要
    • 0x01 为何需要 AgentController ?

      • 1.1 问题所在

        • 1.1.1 概率偏差
        • 1.1.2 长任务的心跳与续命
        • 1.1.3 模型即智能体?逻辑闭环≠系统闭环

      • 1.2 解决思路
      • 1.3 Anthropic 博客

    • 0x02 AgentController

      • 2.1 定义
      • 2.2 核心职责
      • 2.3 具体功能
      • 2.4 组织架构
      • 2.5 多个实例
      • 2.6 工作流程

    • 0x03 重点功能

      • 3.1 Agent路由

        • 3.1.1 流程
        • 3.1.2 代码

      • 3.2 代理生命周期管理

        • 3.2.1 流程
        • 3.2.2 代码

      • 3.3  代理执行控制

        • 3.3.1 流程
        • 3.3.2 代码

      • 3.4 回调

        • 3.4.1 核心特色
        • 3.4.2 流程
        • 3.4.3 代码

      • 3.5 全链路可观测
      • 3.6 驯服决策的 “不确定性”

        • 3.6.1 流程
        • 3.6.2 代码


    • 0xFF 参考


0x00 概要

一个成熟的 Agent 系统,必须在不干扰 Agent 自主决策的前提下,提供外部管控接口。比如用户可能需要暂停任务进行参数调整,或是在发现错误时终止执行,甚至在任务中途切换代理角色。这种全生命周期的可控性,需要工作流层设计出灵活的状态机和事件触发机制,在自主性与可控性之间找到完美平衡。
AgentController 是 OpenHands 框架中管理智能体事件处理与状态流转的核心控制器,负责接收事件流回调、转发事件至子智能体(delegate)、处理动作(Action)与观察结果(Observation)等核心事件,并决定是否触发智能体下一步操作,是协调智能体运行节奏的关键组件。
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 为何需要 AgentController ?

我们来看看为何需要 AgentController。
1.1 问题所在

1.1.1 概率偏差

Agent在受控环境里往往乖巧听话,可一旦投进真实的生产线,就经常出错。具体看来,这些偏差大致分为如下基几类:

  • 焦点失准——对话跑偏,回复冗长,用户真正需求被淹没;
  • 范围失准——分不清哪些任务属于自己,硬着头皮揽活,结果一团糟;
  • 幻觉——凭空捏造事实,把错误信息包装得信誓旦旦;
  • 合规失准——最高危的一条,触碰业务红线或法律高压线,例如擅自给出金融、医疗建议;
  • 指令失准——开发者写在系统提示里的“铁律”被它视而不见,关键逻辑瞬间失效。
这暴露出当下开发方法的根本短板——我们正试图用一段柔软的自然语言提示,去勒住一个以概率为引擎的黑盒。其本质原因是:眼下主流的“大语言模型代理”本质上是跟概率打交道:同一个提问,两次回答可能截然不同。于是,代理的最终行为像极了抽盲盒——上线前谁也说不准它会抽到哪一面,生产环境因此危机四伏。
有研究者提出,Agent的运行过程本质上就是一条概率链(Probability Chain)
既然本质是概率,设计工作就不再是“教模型说话”,而是“操纵概率”。无论代理是在写代码、做客服还是玩游戏,它的终极目标只有一个:在给定背景的前提下,把“正确动作序列”出现的概率推到最大。
我们管理的并非具备严谨逻辑推理的实体,而是一个靠模式匹配运转的概率装置。因此,想用不断膨胀的自然语言“君子协定”(Prompt)来根治这些偏差,已被证明低效且不可靠。唯有把概率链拆成可度量、可校准、可验证的环节,才能让代理在生产线里真正听话。
1.1.2 长任务的心跳与续命

与传统 “请求 - 响应” 模式不同,AI Agent 常需处理持续数分钟乃至数小时的长周期任务。这类任务的特殊性在于,执行过程中需应对网络波动、资源调度等突发状况,同时维持状态连续性 —— 这要求底层工作流架构具备强大的状态持久化能力,精准记录任务进展的每一处细节,确保即便出现中断,也能从断点无缝恢复,这对数据存储与流程设计提出了严苛要求。
在支撑长周期任务的智能系统中,AgentController 扮演着 “心脏” 与 “指挥中枢” 的核心角色。它既不直接编写任务代码(这是 Agent 的职责),也不构建运行环境(这是 Runtime 的功能),而是专注于保障任务全生命周期的顺畅推进,专门解决长周期任务面临的工程挑战,包括状态连续性维护、生命周期管理与系统可观测性保障。其核心价值在于将 “调度逻辑” 与 “决策逻辑” 解耦:让 Agent 聚焦于任务本身的思考与执行,而 AgentController 则掌控全局流程,使系统更稳定、可控。
1.1.3 模型即智能体?逻辑闭环≠系统闭环

如今行业内流传着一种观点:“未来每个大语言模型(LLM)都会成为一个智能体”。从技术发展的表象来看,这一判断似乎不无道理。无论是 Claude、ChatGPT 还是 Gemini,这些主流大语言模型都已嵌入工具调用、函数执行乃至文件处理等能力,表面上已经具备了 “思考 + 行动” 的基础条件。
但深入剖析便会发现,这些能力仅仅构成了 “逻辑闭环”,远未达到 “系统闭环” 的层面。基于此,我更看好 n8n 这类 “智能体运行时层(Agent runtime layer)” 的发展潜力,核心原因有三:

  • 并非所有团队都具备搭建复杂系统的工程能力;
  • 不少企业出于数据安全考量,不愿将核心数据交付给封闭平台;
  • 智能体在云端执行过程中难免出现各类问题,并非都能实现无差错运行。
这其中最关键的差异在于:模型能 “调用” 工具,却无法真正 “执行” 系统操作。以 Claude 的代码解释器、GPT 的函数调用功能为例,它们本质上只是返回一段调用描述,真正的执行动作仍需依赖AgentController这样的外部系统。这就像模型是智能体的 “大脑”,能够做出精准决策,却缺乏将决策落地的 “身体”;而AgentController 正是那个让抽象指令转化为具体行为的 “执行层”。尤其当应用场景具备高度定制化且高频使用的特点时,最优解绝不是每次都向大语言模型发起请求,而是构建一套可反复运行、可实时监控、可长期维护的完整系统。
再聪明的模型也会在第 N 次调用后撞墙——成本墙、合规墙、幻觉墙。未来比拼的不是谁家的 LLM 更会“思考”,而是谁家能把思考封装成“可反复、可验证、可审计”的长生命周期系统。Agent 的决胜点,不在参数量的军备竞赛,而在“心跳不停、流程不乱、预算不超”的工程耐力。大脑可以迭代,身体必须长情;掌控了这具“身体”的人,才真正握住了智能体时代的门票。
1.2 解决思路

一个有效的解决思路,是从“期望模型自发达标”转向“主动构建架构引导”。具体来说,就是通过一套设计精良的外部框架,主动、系统地引导模型行为,同时对其进行约束与验证。
工程实践中最关键的认知的是:你要做的不是简单“调用一次模型”,而是设计一个能持续多轮交互的闭环系统。从“大模型”到“Agentic AI”的演进,本质上是从单纯的“堆砌算力与智力”,走向更复杂的“系统工程构建”。其核心原则是,把 Agent 当作“完整系统”来设计,而非仅仅看作“模型能力的叠加”。
基于这个原则,有两个关键方向需要把握:

  • 要同等重视模型选择、工具设计、流程编排策略与运行阶段的治理;
  • 要把工具视为具有严格契约的能力单元,避免将其设计成功能模糊的“万能函数”。
这样做的目的,是确保大语言模型(LLM)的强大能力,始终在预设的安全框架内发挥作用。这种思路的核心,正是AI Agent开发从依赖经验的“炼丹式尝试”,向系统化工程学方法的转变,从依赖经验与运气的“艺术化创作”,逐步走向可预测、可管理、可验证的“严谨工程实践”。
1.3 Anthropic 博客

Anthropic 在其博客中指出了长运行 Agent 的失效模式与解决方案,具体如下:
问题初始化Agent行为编码Agent行为Claude 过早宣布整个项目胜利建立功能清单文件:依据输入规格,生成一份结构化 JSON 文件,列出端到端的功能描述。会话伊始读取功能清单文件,仅选择一项功能开始实现。Claude 留下带有 Bug 或未记录进展的环境初始化 Git 仓库并撰写进展笔记文件。会话开始时阅读进展笔记与 Git 提交日志,并在开发服务器上运行基础测试以捕捉未记录的缺陷;会话结束时提交 Git 并更新进展。Claude 过早将功能标记为完成建立功能清单文件。对所有功能进行自我验证,仅在仔细测试后将其标记为“通过”。Claude 需花费时间研究如何运行应用编写可启动开发服务器的 init.sh 脚本。会话开始时阅读 init.sh。0x02 AgentController

现在的 AI Agent 系统基本都遵循一个通用架构,主要包含三大件:

  • 负责"思考"的 LLM 后端
  • 负责"执行"的工具框架
  • 负责协调的控制循环
AgentController 类就对应第三部分,这是用户查询的主入口和协调者。负责管理整个事件循环,接收执行逻辑产出的事件,与其他组件协作处理并提交事件操作,并将处理后的事件转发到上游(如 UI)。它本质上根据产出的事件逐轮驱动对话。
2.1 定义

AgentController的主要代码如下:
  1. class AgentController:
  2.     # 控制器唯一标识ID
  3.     id: str
  4.     # 被控制的Agent实例(核心决策组件)
  5.     agent: Agent
  6.     # Agent执行的最大迭代次数(防止无限循环)
  7.     max_iterations: int
  8.     # 事件流实例(组件间通信的核心枢纽)
  9.     event_stream: EventStream
  10.     # 当前系统状态(包含完整上下文信息)
  11.     state: State
  12.     # 动作确认模式开关(开启时需确认后才执行Agent动作)
  13.     confirmation_mode: bool
  14.     # Agent名称到LLM配置的映射(用于委托代理场景)
  15.     agent_to_llm_config: dict[str, LLMConfig]
  16.     # Agent名称到Agent配置的映射(用于委托代理场景)
  17.     agent_configs: dict[str, AgentConfig]
  18.     # 父控制器实例(存在层级委托时非空)
  19.     parent: 'AgentController | None' = None
  20.     # 委托的子控制器实例(当前控制器委托任务时非空)
  21.     delegate: 'AgentController | None' = None
  22.     # 待处理的动作信息:元组包含动作对象和时间戳(记录动作创建时间)
  23.     _pending_action_info: tuple[Action, float] | None = None
  24.     # 控制器关闭状态标记(True表示已关闭,不再处理任务)
  25.     _closed: bool = False
  26.     # 缓存的第一条用户消息(用于初始化上下文等场景)
  27.     _cached_first_user_message: MessageAction | None = None
  28.     def __init__(
  29.         self,
  30.         agent: Agent,
  31.         event_stream: EventStream,
  32.         conversation_stats: ConversationStats,
  33.         iteration_delta: int,
  34.         budget_per_task_delta: float | None = None,
  35.         agent_to_llm_config: dict[str, LLMConfig] | None = None,
  36.         agent_configs: dict[str, AgentConfig] | None = None,
  37.         sid: str | None = None,
  38.         file_store: FileStore | None = None,
  39.         user_id: str | None = None,
  40.         confirmation_mode: bool = False,
  41.         initial_state: State | None = None,
  42.         is_delegate: bool = False,
  43.         headless_mode: bool = True,
  44.         status_callback: Callable | None = None,
  45.         replay_events: list[Event] | None = None,
  46.         security_analyzer: 'SecurityAnalyzer | None' = None,
  47.     ):
  48.         """初始化AgentController类的新实例。
  49.         参数:
  50.             agent: 被控制的Agent实例。
  51.             event_stream: 用于发布事件的事件流实例。
  52.             conversation_stats: 对话统计信息实例(记录交互指标等)。
  53.             iteration_delta: Agent可执行的最大迭代次数。
  54.             budget_per_task_delta: 每个任务允许的最大预算(单位:美元),超出则停止Agent。
  55.             agent_to_llm_config: Agent名称到LLM配置的映射字典(用于委托给其他Agent时)。
  56.             agent_configs: Agent名称到Agent配置的映射字典(用于委托给其他Agent时)。
  57.             sid: Agent的会话ID。
  58.             file_store: 文件存储实例(用于状态持久化等)。
  59.             user_id: 用户唯一标识。
  60.             confirmation_mode: 是否启用Agent动作的确认模式。
  61.             initial_state: 控制器的初始状态。
  62.             is_delegate: 该控制器是否为委托控制器(子控制器)。
  63.             headless_mode: Agent是否以无头模式运行(无GUI交互)。
  64.             status_callback: 处理状态更新的可选回调函数。
  65.             replay_events: 用于回放的事件日志列表。
  66.             security_analyzer: 安全分析器实例(用于动作安全校验)。
  67.         """
  68.         # 初始化控制器ID:优先使用传入的sid,否则使用事件流的sid
  69.         self.id = sid or event_stream.sid
  70.         # 记录用户ID
  71.         self.user_id = user_id
  72.         # 记录文件存储实例
  73.         self.file_store = file_store
  74.         # 绑定被控制的Agent
  75.         self.agent = agent
  76.         # 记录无头模式状态
  77.         self.headless_mode = headless_mode
  78.         # 标记当前控制器是否为委托控制器
  79.         self.is_delegate = is_delegate
  80.         # 绑定对话统计实例
  81.         self.conversation_stats = conversation_stats
  82.         # 先设置事件流,后续可能需要订阅事件
  83.         self.event_stream = event_stream
  84.         # 非委托控制器需要订阅事件流,以接收并处理系统事件
  85.         if not self.is_delegate:
  86.             self.event_stream.subscribe(
  87.                 EventStreamSubscriber.AGENT_CONTROLLER,  # 订阅者类型(标识为Agent控制器)
  88.                 self.on_event,  # 事件回调处理函数
  89.                 self.id  # 订阅者ID(当前控制器ID)
  90.             )
  91.         # 初始化状态跟踪器:负责状态的管理、持久化与恢复
  92.         self.state_tracker = StateTracker(sid, file_store, user_id)
  93.         # 设置初始状态:支持从历史会话状态、父Agent状态或全新状态初始化
  94.         self.set_initial_state(
  95.             state=initial_state,  # 传入的初始状态(可能为None)
  96.             conversation_stats=conversation_stats,  # 对话统计信息
  97.             max_iterations=iteration_delta,  # 最大迭代次数
  98.             max_budget_per_task=budget_per_task_delta,  # 任务最大预算
  99.             confirmation_mode=confirmation_mode,  # 动作确认模式
  100.         )
  101.         # 将状态跟踪器中的状态赋值给控制器的state属性
  102.         # 注意:此处为了向后兼容暂时共享状态,后续应将状态逻辑统一迁移到状态管理器
  103.         self.state = self.state_tracker.state
  104.         # 初始化Agent到LLM配置的映射:无传入配置则设为空字典
  105.         self.agent_to_llm_config = agent_to_llm_config if agent_to_llm_config else {}
  106.         # 初始化Agent配置映射:无传入配置则设为空字典
  107.         self.agent_configs = agent_configs if agent_configs else {}
  108.         # 记录初始的最大迭代次数(用于后续重置等场景)
  109.         self._initial_max_iterations = iteration_delta
  110.         # 记录初始的任务最大预算(用于后续重置等场景)
  111.         self._initial_max_budget_per_task = budget_per_task_delta
  112.         # 初始化卡顿检测器:用于识别Agent是否陷入执行卡顿
  113.         self._stuck_detector = StuckDetector(self.state)
  114.         # 绑定状态回调函数(用于对外通知状态更新)
  115.         self.status_callback = status_callback
  116.         # 初始化回放管理器:用于处理事件回放场景
  117.         self._replay_manager = ReplayManager(replay_events)
  118.         # 记录动作确认模式状态
  119.         self.confirmation_mode = confirmation_mode
  120.         # 绑定安全分析器实例(用于动作的安全校验)
  121.         self.security_analyzer = security_analyzer
  122.         # 向事件流中添加系统消息(初始化Agent的系统上下文等)
  123.         self._add_system_message()
复制代码
2.2 核心职责

AgentController 的核心职责可归结为三大支柱:

  • 监听事件流。它是系统事件流的核心订阅者,能够捕捉所有关键变化 —— 无论是用户指令、Agent 的决策输出,还是环境的反馈结果。这一能力使其成为系统的 “感知中枢”,确保对全局动态的全面掌控。
  • 管理状态机。它负责维护任务的当前状态,并根据接收到的事件触发精准的状态转换。例如,当 Agent 执行过程中需要用户确认时,AgentController 会主动暂停系统运行,等待用户输入后再恢复流程,保障任务执行的连贯性,比如:

    • 状态管理:维护代理的运行状态(RUNNING, STOPPED, ERROR, FINISHED等)
    • 初始化控制(创建和配置代理实例
    • 运行控制:控制代理的执行步骤和迭代
    • 关闭处理:优雅地关闭代理并清理资源

  • 驱动 Agent 运行。它通过 step() 方法推动系统迭代前进,这一方法并非无限循环,而是采用 “事件触发” 机制 —— 仅在接收到特定观察结果后才被激活,促使 Agent 针对新情况思考下一步行动,既保证了任务推进的效率,又避免了无效消耗。
AgentController 带来的架构化、确定性方法,并非要限制大语言模型的潜力。恰恰相反,这套方法是为这些潜力搭建一个“容器”,让其能安全地在生产环境中释放价值。它的核心作用,是把行为不确定的概率模型,封装成行为相对可控、风险可预判的软件组件。对于那些希望将AI技术真正落地到关键业务中的开发者和决策者来说,未来的核心竞争力,不仅在于能否用上最先进的模型,更在于是否掌握了驾驭这些模型的成熟方法论与稳健框架。给AI Agent套上“确定性”的缰绳,让其行为可控、风险可防,是它从“技术炫技”走向“实际创造价值”的必经之路。
或者说,Agent 框架应该做与模型能力提升正交的事,因为只要模型进步,工程上的工作就会白费。
2.3 具体功能

AgentController的具体功能如下:

  • 代理生命周期管理
    状态管理:维护代理的运行状态(RUNNING, STOPPED, ERROR, FINISHED等)
    初始化控制:创建和配置代理实例
    运行控制:控制代理的执行步骤和迭代
    关闭处理:优雅地关闭代理并清理资源
  • 事件处理与分发
    事件订阅:订阅 EventStream 中的事件
    事件路由:将事件分发给相应的处理逻辑
    动作处理:处理代理产生的各种动作(Action)
    观察处理:处理环境返回的观察结果(Observation)
  • 代理执行控制
    步进执行:通过_step() 方法控制代理逐步执行
    迭代限制:控制代理的最大执行步数
    预算管理:管理任务的预算限制(基于成本)
    卡死检测:检测并处理代理陷入循环的情况
  • 委托机制管理
    子代理创建:支持创建委托代理处理子任务
    层级管理:管理代理间的层级关系
    结果聚合:收集和处理委托代理的执行结果
  • 安全与确认机制
    安全分析:集成安全分析器评估动作风险
    确认模式:在高风险操作前请求用户确认
    权限控制:控制代理可执行的操作类型
  • 状态跟踪与持久化
    状态追踪:通过 StateTracker 跟踪代理状态
    历史记录:维护代理执行历史
    状态保存:持久化代理状态以便恢复
  • 错误处理与恢复
    异常处理:捕获并处理各种运行时异常
    错误状态:将代理置于适当的错误状态
    恢复机制:提供从错误状态恢复的能力
  • 指标监控
    成本跟踪:跟踪API调用成本
    令牌使用:监控提示和完成令牌的使用情况
    性能指标:收集执行性能数据
  • 重放与调试
    事件重放:支持重放历史事件用于调试
    轨迹记录:记录代理执行轨迹
  • 多代理协调
    父子代理协调:管理父子代理间的消息传递
    资源共享:在代理间共享资源和状态
2.4 组织架构

AgentController的架构如下。
1.jpeg

2.5 多个实例

从代码中可以看出:AgentController 不是单例模式。可以创建多个实例。
AgentController 类有一个标准的 init 方法,允许创建多个实例每个实例都有唯一的 id 属性(通常是 session ID):

  • 委托模式:代码中明确支持 parent-delegate 关系,其中一个 AgentController 可以创建另一个 AgentController作为委托
  • 每个会话一个实例:每个用户会话或任务通常会创建一个独立的 AgentController 实例,通过 sid(session ID)参数区分不同的控制器实例
  • 实例属性:每个实例都有自己的状态(state)、事件流(event_stream)、代理(agent)等独立的属性
因此,AgentController 是一个普通的类,不是单例,可以创建多个实例来管理不同的代理会话。项目中 AgentController 的数量是动态的,取决于:

  • 同时进行的会话数量(每个会话一个主控制器)
  • 每个会话中代理委托任务的数量(每个委托任务一个委托控制器)
系统会根据实际使用情况动态创建和销毁AgentController 。每个会话一个主控制器:

  • 每当用户开始一个新的会话或任务时,会创建一个主 AgentController 实例委托代理控制器。
  • 当主代理需要委托子任务给其他代理时,会为每个委托任务创建一个新的 AgentController 实例在 start_delegate 方法中创建委托控制器实例创建方式。
2.6 工作流程

AgentController 主要工作流程是初始化代理,管理状态,并驱动主循环,逐步推动代理前进。具体如下:

  • 初始化:创建代理实例并设置初始状态
  • 事件监听:订阅事件流并处理传入事件
  • 决策执行:根据事件决定是否执行代理步骤
  • 动作生成:让代理生成下一步动作
  • 结果处理:处理动作执行后的观察结果
  • 状态更新:更新代理状态并决定下一步行动
2.jpeg

0x03 重点功能

3.1 Agent路由

在多智能体系统里,“路由” 本质是让任务精准流转到合适的处理单元,工程上这一步通常靠工具调用机制落地。具体来说,当系统触发某个工具调用时,会先解析其中的AgentDelegateAction指令 —— 这个指令在AgentController里有专门的处理逻辑,一旦匹配成功,就会自动启动一个全新的委托智能体(Delegate Agent),之后所有相关的事件流都会定向转发给这个委托智能体,由它完成后续处理。
OpenHands 这套路由方案,更适合简单的单向任务流转场景,要是需要多级路由,就得靠嵌套AgentController来实现。但从实际工程实践来看,不太建议用多智能体路由的思路解决问题。核心原因是这种方式会让系统架构变得臃肿,不仅增加开发、调试的复杂度,还会引入很多不可控因素 —— 比如智能体间的通信延迟、状态同步误差等。
其实很多时候,单智能体搭配多个工具的方案就足够应对需求,完全没必要非得用多智能体。如果遇到单智能体处理起来吃力的场景,也可以先试试MicroAgent这类提示词增强方案,通过优化指令逻辑提升单智能体的处理能力,从而避开多智能体带来的复杂设计。
3.1.1 流程

OpenHands 多智能体系统的动作分发与委托智能体管理是系统任务流转和多智能体协同的核心逻辑。主要功能包括:

  • 动作类型路由:根据输入动作的类型(状态变更、消息、委托启动、任务完成 / 拒绝),分发到对应的处理方法,实现逻辑解耦。
  • 消息处理:区分用户 / 智能体来源的消息,执行日志记录、动态召回策略(首次消息召回上下文,非首次召回知识库)、智能体状态切换。
  • 委托智能体启动:支持子任务拆分,通过创建委托智能体处理细分任务,继承父智能体配置(迭代限制、预算、指标),同时维护独立会话标识和事件记录范围。
  • 状态与指标管理:确保父子智能体状态同步、全局指标累积,支持任务完成 / 拒绝后的状态标记和结果保存。
3.png

3.1.2 代码

主要特色

  • 模块化设计:动作处理按类型拆分,每个类型对应独立逻辑,易于扩展和维护(如新增动作类型只需添加分支判断)。
  • 多智能体协同:通过委托层级标记、共享事件流、指标复用,实现父子智能体高效协同,支持复杂任务的拆分与并行处理。
  • 动态适配能力:消息处理中根据是否为首次用户交互,自动切换召回策略,提升信息检索的精准性。
  • 鲁棒性保障:通过断言校验、默认配置兜底、状态继承机制,减少多智能体协同中的不可控因素,确保系统稳定性。
  • 可追溯性:委托智能体会话 ID、事件起始 ID、父指标快照等设计,便于任务流转和问题排查。
代码如下。
  1.     async def _handle_action(self, action: Action) -> None:
  2.         """处理来自智能体或委托智能体的动作。
  3.         根据动作的不同类型,分发到对应的处理逻辑,实现状态变更、消息处理、委托启动等核心功能。
  4.         """
  5.         # 处理"更改智能体状态"动作:直接更新智能体状态
  6.         if isinstance(action, ChangeAgentStateAction):
  7.             await self.set_agent_state_to(action.agent_state)  # type: ignore
  8.         # 处理"消息"动作:转发到专门的消息处理方法
  9.         elif isinstance(action, MessageAction):
  10.             await self._handle_message_action(action)
  11.         # 处理"启动委托智能体"动作:初始化并启动子智能体处理子任务
  12.         elif isinstance(action, AgentDelegateAction):
  13.             await self.start_delegate(action)
  14.             # 断言委托智能体已成功创建(确保后续逻辑安全执行)
  15.             assert self.delegate is not None
  16.             # 如果动作中包含"task"参数,向事件流添加任务消息,通知委托智能体
  17.             if 'task' in action.inputs:
  18.                 self.event_stream.add_event(
  19.                     MessageAction(content='TASK: ' + action.inputs['task']),
  20.                     EventSource.USER,
  21.                 )
  22.                 # 将委托智能体状态设置为运行中,开始处理子任务
  23.                 await self.delegate.set_agent_state_to(AgentState.RUNNING)
  24.             return  # 委托启动后,当前动作处理结束
  25.         # 处理"智能体完成任务"动作:保存输出结果并标记状态为完成
  26.         elif isinstance(action, AgentFinishAction):
  27.             self.state.outputs = action.outputs
  28.             await self.set_agent_state_to(AgentState.FINISHED)
  29.         # 处理"智能体拒绝任务"动作:保存输出结果并标记状态为拒绝
  30.         elif isinstance(action, AgentRejectAction):
  31.             self.state.outputs = action.outputs
  32.             await self.set_agent_state_to(AgentState.REJECTED)
  33.     async def _handle_message_action(self, action: MessageAction) -> None:
  34.         """处理事件流中的消息动作。
  35.         区分用户来源和智能体来源的消息,分别执行日志记录、信息召回、状态更新等逻辑。
  36.         参数:
  37.             action (MessageAction):待处理的消息动作对象,包含消息内容、来源等信息
  38.         """
  39.         # 处理用户来源的消息
  40.         if action.source == EventSource.USER:
  41.             # 日志级别控制:如果开启了"记录所有事件",则用info级别,否则用debug级别
  42.             log_level = (
  43.                 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug'
  44.             )
  45.             # 输出日志,附加消息类型和事件来源元数据
  46.             self.log(
  47.                 log_level,
  48.                 str(action),
  49.                 extra={'msg_type': 'ACTION', 'event_source': EventSource.USER},
  50.             )
  51.             # 判断当前消息是否为该智能体接收的第一条用户消息(影响后续召回策略)
  52.             first_user_message = self._first_user_message()
  53.             is_first_user_message = (
  54.                 action.id == first_user_message.id if first_user_message else False
  55.             )
  56.             # 首次消息:召回工作空间上下文;非首次:召回知识库内容
  57.             recall_type = (
  58.                 RecallType.WORKSPACE_CONTEXT
  59.                 if is_first_user_message
  60.                 else RecallType.KNOWLEDGE
  61.             )
  62.             # 创建召回动作,用于检索相关信息
  63.             recall_action = RecallAction(query=action.content, recall_type=recall_type)
  64.             # 记录待处理的召回动作
  65.             self._pending_action = recall_action
  66.             # 添加召回动作到事件流(来源标记为用户,因用户消息是召回触发源)
  67.             self.event_stream.add_event(recall_action, EventSource.USER)
  68.             # 如果智能体当前未处于运行状态,将其切换为运行状态
  69.             if self.get_agent_state() != AgentState.RUNNING:
  70.                 await self.set_agent_state_to(AgentState.RUNNING)
  71.         # 处理智能体来源的消息
  72.         elif action.source == EventSource.AGENT:
  73.             # 如果智能体标记需要等待用户响应,将状态切换为"等待用户输入"
  74.             if action.wait_for_response:
  75.                 await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
  76.     async def start_delegate(self, action: AgentDelegateAction) -> None:
  77.         """启动委托智能体以处理子任务。
  78.         OpenHands 是多智能体系统:
  79.         - 「任务(task)」:系统与用户之间的完整对话,始于用户初始输入(通常是任务描述),
  80.           终于智能体发起的完成动作、用户停止操作或错误触发。
  81.         - 「子任务(subtask)」:智能体与用户或其他智能体之间的对话。
  82.           若单个智能体即可完成任务,则任务与子任务合一;否则任务由多个子任务组成,每个子任务由独立智能体处理。
  83.         参数:
  84.             action (AgentDelegateAction):包含待启动委托智能体信息的动作对象
  85.         """
  86.         # 根据动作中指定的智能体名称,获取对应的智能体类
  87.         agent_cls: Type[Agent] = Agent.get_cls(action.agent)
  88.         # 获取智能体配置:优先使用动作指定的配置,未指定则复用当前智能体的配置
  89.         agent_config = self.agent_configs.get(action.agent, self.agent.config)
  90.         # 创建委托智能体实例(确保父子智能体共享LLM注册信息)
  91.         # 注:父子智能体共享指标,实现全局指标累积
  92.         delegate_agent = agent_cls(
  93.             config=agent_config, llm_registry=self.agent.llm_registry
  94.         )
  95.         # 启动委托智能体前,创建初始状态(继承父智能体关键配置)
  96.         state = State(
  97.             session_id=self.id.removesuffix('-delegate'),  # 会话ID:移除父智能体的委托后缀
  98.             user_id=self.user_id,  # 继承用户ID,保持用户关联
  99.             inputs=action.inputs or {},  # 子任务输入参数(默认为空字典)
  100.             iteration_flag=self.state.iteration_flag,  # 继承迭代控制标志(限制迭代次数)
  101.             budget_flag=self.state.budget_flag,  # 继承预算控制标志(限制资源使用)
  102.             delegate_level=self.state.delegate_level + 1,  # 委托层级+1(标识子智能体层级)
  103.             metrics=self.state.metrics,  # 共享全局指标(父子智能体指标统一累积)
  104.             start_id=self.event_stream.get_latest_event_id() + 1,  # 事件起始ID:从最新事件后开始记录
  105.             parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(),  # 父智能体指标快照(用于后续对比)
  106.             parent_iteration=self.state.iteration_flag.current_value,  # 父智能体当前迭代次数
  107.         )
  108.         # 输出调试日志:记录委托智能体启动信息
  109.         self.log(
  110.             'debug',
  111.             f'start delegate, creating agent {delegate_agent.name}',
  112.         )
  113.         # 创建委托智能体的控制器(核心:标记is_delegate=True,避免直接订阅事件流)
  114.         self.delegate = AgentController(
  115.             sid=self.id + '-delegate',  # 会话ID:在父ID后添加委托后缀,唯一标识
  116.             file_store=self.file_store,  # 继承文件存储对象(用于状态持久化)
  117.             user_id=self.user_id,  # 继承用户ID
  118.             agent=delegate_agent,  # 待管理的委托智能体实例
  119.             event_stream=self.event_stream,  # 共享事件流(父子智能体事件互通)
  120.             conversation_stats=self.conversation_stats,  # 继承对话统计信息
  121.             iteration_delta=self._initial_max_iterations,  # 迭代次数增量(子任务的最大迭代限制)
  122.             budget_per_task_delta=self._initial_max_budget_per_task,  # 单任务预算增量(子任务的资源限制)
  123.             agent_to_llm_config=self.agent_to_llm_config,  # 继承LLM配置映射
  124.             agent_configs=self.agent_configs,  # 继承智能体配置字典
  125.             initial_state=state,  # 初始状态(继承父智能体配置后的状态)
  126.             is_delegate=True,  # 标记为委托智能体(关键:避免重复订阅事件流)
  127.             headless_mode=self.headless_mode,  # 继承无头模式(无交互界面)配置
  128.             security_analyzer=self.security_analyzer,  # 继承安全分析器(用于安全校验)
  129.         )
复制代码
3.2 代理生命周期管理

对于Agent Controller来说,Agent 的状态管理是个很重要的工作,比如维护代理的运行状态,这就是 set_agent_state_to完成的工作。在各种函数中都有调用调用 set_agent_state_to。比如:_react_to_exception,_handle_action,_handle_observation,_handle_message_action 这些函数。
3.2.1 流程

该方法是 OpenHands 智能体的状态管理核心接口,负责统一处理智能体状态变更的全流程,维护代理的运行状态(RUNNING, STOPPED, ERROR, FINISHED等)确保状态一致性、可追溯性和系统稳定性。主要功能包括:

  • 状态变更校验:避免重复设置相同状态,减少无效操作。
  • 关联逻辑触发:状态切换为停止 / 错误时执行重置(释放资源),错误恢复为运行时调整控制限制(如迭代次数上限)。
  • 动作确认处理:用户确认 / 拒绝后,更新待处理动作的确认状态并分发到事件流,完成动作闭环。
  • 事件分发:状态变更后生成 AgentStateChangedObservation 事件,携带错误原因(错误状态时),供其他模块订阅响应。
  • 状态持久化:任何状态变更都强制保存,防止崩溃或意外导致状态丢失。
主要特色

  • 原子性设计:状态更新流程一气呵成,先更新状态再处理副作用,确保后续逻辑基于最新状态。
  • 副作用闭环:状态变更关联的重置、限制调整、动作处理、事件分发等逻辑统一封装,避免分散冗余。
  • 可追溯性:状态变更日志、事件携带的错误原因、持久化存储,形成完整的状态追溯链路。
  • 鲁棒性保障:重复状态拦截、崩溃防护(强制保存)、属性安全访问(hasattr 校验),提升系统稳定性。
  • 扩展性强:新增状态时只需在枚举中添加,核心流程无需大幅修改,适配不同业务场景。
4.png

3.2.2 代码
  1.     async def set_agent_state_to(self, new_state: AgentState) -> None:
  2.         """更新智能体状态并处理副作用。
  3.         核心职责:同步状态变更、触发关联逻辑(重置、限制调整、事件分发)、持久化状态,
  4.         确保状态变更的一致性和可追溯性。
  5.         参数:
  6.             new_state (AgentState):智能体要切换到的新状态
  7.         """
  8.         # 输出状态变更日志:包含智能体名称、旧状态、新状态
  9.         self.log(
  10.             'info',
  11.             f'Setting agent({self.agent.name}) state from {self.state.agent_state} to {new_state}',
  12.         )
  13.         # 状态未变更(新状态与当前状态一致):直接返回,避免重复处理
  14.         if new_state == self.state.agent_state:
  15.             return
  16.         # 保存旧状态:用于后续控制限制校验(如错误恢复后的限制调整)
  17.         old_state = self.state.agent_state
  18.         # 先更新状态:确保后续 _reset() 等方法能获取到最新状态
  19.         self.state.agent_state = new_state
  20.         # 状态切换为停止/错误时:执行重置逻辑(清空临时数据、释放资源)
  21.         if new_state in (AgentState.STOPPED, AgentState.ERROR):
  22.             self._reset()
  23.         # 错误状态恢复为运行状态时:尝试调整控制标志限制(如迭代次数上限、预算)
  24.         if old_state == AgentState.ERROR and new_state == AgentState.RUNNING:
  25.             self.state_tracker.maybe_increase_control_flags_limits(self.headless_mode)
  26.         # 待处理动作存在,且新状态为用户确认/拒绝:更新动作确认状态并分发事件
  27.         if self._pending_action is not None and (
  28.             new_state in (AgentState.USER_CONFIRMED, AgentState.USER_REJECTED)
  29.         ):
  30.             # 清空动作的思考过程(若有该属性)
  31.             if hasattr(self._pending_action, 'thought'):
  32.                 self._pending_action.thought = ''  # type: ignore[union-attr]
  33.             # 根据新状态设置动作确认状态
  34.             confirmation_state = (
  35.                 ActionConfirmationStatus.CONFIRMED
  36.                 if new_state == AgentState.USER_CONFIRMED
  37.                 else ActionConfirmationStatus.REJECTED
  38.             )
  39.             self._pending_action.confirmation_state = confirmation_state  
  40.             self._pending_action._id = None  # type: ignore[attr-defined]  # 清空动作ID(避免重复)
  41.             # 将更新后的动作添加到事件流,供其他模块处理
  42.             self.event_stream.add_event(self._pending_action, EventSource.AGENT)
  43.         # 构建状态变更观察事件:错误状态需携带错误原因
  44.         reason = self.state.last_error if new_state == AgentState.ERROR else ""
  45.         self.event_stream.add_event(
  46.             AgentStateChangedObservation('', self.state.agent_state, reason),
  47.             EventSource.ENVIRONMENT,  # 状态变更由环境触发
  48.         )
  49.         # 状态变更时强制保存状态:防止崩溃或意外情况导致状态丢失
  50.         self.save_state()
复制代码
3.3  代理执行控制

OpenHands 通过_step() 方法控制代理逐步执行,代理的最大执行步数,管理任务的预算限制(基于成本),检测并处理代理陷入循环的情况等。
3.3.1 流程

该代码是 OpenHands 智能体的核心单步执行逻辑,封装了智能体单步生命周期的完整流程,是系统任务执行的核心入口。主要功能包括:

  • 前置条件拦截:校验智能体状态(仅 RUNNING 状态可执行)和待处理动作(无 Pending 动作时才继续),避免并行执行冲突。
  • 核心可控性校验:同步预算与全局指标确保资源不超支,检测智能体是否陷入循环,校验迭代次数 / 预算等控制标志,防止系统失控。
  • 动作生成与回放适配:支持回放模式(直接从回放轨迹获取动作)和正常模式(调用智能体生成动作),适配不同使用场景。
  • 异常精细化处理:针对动作格式错误、LLM 响应异常、函数调用异常等,添加错误事件到事件流;针对上下文窗口溢出,支持历史截断或抛出统一异常,兼容不同 LLM 的错误提示格式。
  • 高危动作安全校验:对命令执行、文件操作、交互式浏览等风险动作,通过安全分析器标记风险等级,在确认模式下触发用户确认,保障执行安全。
  • 状态与事件管理:待确认动作自动切换智能体状态为 “等待用户确认”,非空动作同步到事件流供其他模块订阅,同时准备前端展示指标,兼顾系统协同与可视化需求。
5.png

3.3.2 代码
  1.     async def _step(self) -> None:
  2.         """执行父智能体或委托智能体的单步逻辑。
  3.         核心职责:检查智能体运行状态、拦截阻塞条件、执行控制标志校验、生成/回放动作、处理安全确认。
  4.         同时处理上下文溢出、动作异常等场景,确保单步执行的稳定性和安全性。
  5.         """
  6.         # 检查智能体状态:非运行状态则跳过执行
  7.         if self.get_agent_state() != AgentState.RUNNING:
  8.             self.log(
  9.                 'debug',
  10.                 f'Agent not stepping because state is {self.get_agent_state()} (not RUNNING)',
  11.                 extra={'msg_type': 'STEP_BLOCKED_STATE'},
  12.             )
  13.             return
  14.         # 检查是否有待处理动作:存在则跳过(避免并行执行冲突)
  15.         if self._pending_action:
  16.             action_id = getattr(self._pending_action, 'id', 'unknown')
  17.             action_type = type(self._pending_action).__name__
  18.             self.log(
  19.                 'debug',
  20.                 f'Agent not stepping because of pending action: {action_type} (id={action_id})',
  21.                 extra={'msg_type': 'STEP_BLOCKED_PENDING_ACTION'},
  22.             )
  23.             return
  24.         # 输出步骤日志:包含委托层级、本地步骤数、全局迭代数(便于调试追踪)
  25.         self.log(
  26.             'debug',
  27.             f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.get_local_step()} GLOBAL STEP {self.state.iteration_flag.current_value}',
  28.             extra={'msg_type': 'STEP'},
  29.         )
  30.         # 1. 同步预算与指标:确保所有 LLM 服务的消耗不超过预算限制
  31.         self.state_tracker.sync_budget_flag_with_metrics()
  32.         # 2. 检查智能体是否陷入循环:是则抛出异常并处理
  33.         if self._is_stuck():
  34.             await self._react_to_exception(
  35.                 AgentStuckInLoopError('Agent got stuck in a loop')
  36.             )
  37.             return
  38.         # 3. 执行控制标志校验:检查迭代次数、预算等限制是否超限
  39.         try:
  40.             self.state_tracker.run_control_flags()
  41.         except Exception as e:
  42.             logger.warning('Control flag limits hit')
  43.             await self._react_to_exception(e)
  44.             return
  45.         # 初始化动作:默认为空动作
  46.         action: Action = NullAction()
  47.         # 4. 处理回放模式:不执行智能体逻辑,直接从回放轨迹中获取动作
  48.         if self._replay_manager.should_replay():
  49.             action = self._replay_manager.step()
  50.         else:
  51.             # 非回放模式:调用智能体生成动作
  52.             try:
  53.                 action = self.agent.step(self.state)
  54.                 # 检查动作是否为空:空动作则抛出异常
  55.                 if action is None:
  56.                     raise LLMNoActionError('No action was returned')
  57.                 # 标记动作来源为智能体
  58.                 action._source = EventSource.AGENT  
  59.             except (
  60.                 LLMMalformedActionError,
  61.                 LLMNoActionError,
  62.                 LLMResponseError,
  63.                 FunctionCallValidationError,
  64.                 FunctionCallNotExistsError,
  65.             ) as e:
  66.                 # 处理动作生成相关异常:添加错误观察事件到事件流,直接返回
  67.                 self.event_stream.add_event(
  68.                     ErrorObservation(
  69.                         content=str(e),
  70.                     ),
  71.                     EventSource.AGENT,
  72.                 )
  73.                 return
  74.             except (ContextWindowExceededError, BadRequestError, OpenAIError) as e:
  75.                 # 处理上下文窗口溢出相关异常(兼容不同 LLM 的错误提示格式)
  76.                 error_str = str(e).lower()
  77.                 # 通过关键词匹配判断是否为上下文溢出(因部分 LLM 未统一异常类型)
  78.                 if (
  79.                     'contextwindowexceedederror' in error_str
  80.                     or 'prompt is too long' in error_str
  81.                     or 'input length and `max_tokens` exceed context limit' in error_str
  82.                     or 'please reduce the length of either one' in error_str
  83.                     or 'the request exceeds the available context size' in error_str
  84.                     or 'context length exceeded' in error_str
  85.                     # OpenRouter 上下文溢出错误关键词
  86.                     or (
  87.                         'sambanovaexception' in error_str
  88.                         and 'maximum context length' in error_str
  89.                     )
  90.                     # SambaNova 上下文溢出错误(需同时匹配两个关键词)
  91.                     or isinstance(e, ContextWindowExceededError)
  92.                 ):
  93.                     # 启用上下文截断:添加压缩请求动作,触发历史上下文截断
  94.                     if self.agent.config.enable_history_truncation:
  95.                         self.event_stream.add_event(
  96.                             CondensationRequestAction(), EventSource.AGENT
  97.                         )
  98.                         return
  99.                     else:
  100.                         # 未启用截断:抛出系统统一的上下文溢出异常
  101.                         raise LLMContextWindowExceedError()
  102.                 else:
  103.                     # 非上下文溢出错误:向上抛出原始异常
  104.                     raise e
  105.         # 5. 处理可执行动作的安全确认逻辑
  106.         if action.runnable:
  107.             # 筛选需要安全确认的动作类型(命令执行、文件操作、交互式浏览等)
  108.             if self.state.confirmation_mode and (
  109.                 type(action) is CmdRunAction
  110.                 or type(action) is IPythonRunCellAction
  111.                 or type(action) is BrowseInteractiveAction
  112.                 or type(action) is FileEditAction
  113.                 or type(action) is FileReadAction
  114.             ):
  115.                 # 调用安全分析器检测动作风险
  116.                 await self._handle_security_analyzer(action)
  117.                 # 获取动作的安全风险等级(由 LLM 或安全分析器标记)
  118.                 security_risk = getattr(
  119.                     action, 'security_risk', ActionSecurityRisk.UNKNOWN
  120.                 )
  121.                 # 定义需要用户确认的场景:高风险 或 未知风险且无安全分析器
  122.                 is_high_security_risk = security_risk == ActionSecurityRisk.HIGH
  123.                 is_ask_for_every_action = (
  124.                     security_risk == ActionSecurityRisk.UNKNOWN
  125.                     and not self.security_analyzer
  126.                 )
  127.                 # 命令行模式:强制标记为待确认状态(CLI 自行处理确认逻辑)
  128.                 if self.agent.config.cli_mode:
  129.                     action.confirmation_state = (  
  130.                         ActionConfirmationStatus.AWAITING_CONFIRMATION
  131.                     )
  132.                 # 非命令行模式:高风险/未知风险动作需用户确认
  133.                 elif (
  134.                     is_high_security_risk or is_ask_for_every_action
  135.                 ) and self.confirmation_mode:
  136.                     logger.debug(
  137.                         f'[non-CLI mode] Detected HIGH security risk in action: {action}. Ask for confirmation'
  138.                     )
  139.                     action.confirmation_state = (  
  140.                         ActionConfirmationStatus.AWAITING_CONFIRMATION
  141.                     )
  142.             # 将可执行动作标记为待处理(避免重复执行)
  143.             self._pending_action = action
  144.         # 6. 非空动作的后续处理:状态更新、指标准备、事件分发
  145.         if not isinstance(action, NullAction):
  146.             # 待确认动作:将智能体状态切换为"等待用户确认"
  147.             if (
  148.                 hasattr(action, 'confirmation_state')
  149.                 and action.confirmation_state
  150.                 == ActionConfirmationStatus.AWAITING_CONFIRMATION
  151.             ):
  152.                 await self.set_agent_state_to(AgentState.AWAITING_USER_CONFIRMATION)
  153.             # 准备前端展示的指标数据(如动作类型、执行时间等)
  154.             self._prepare_metrics_for_frontend(action)
  155.             # 将动作添加到事件流(供其他模块订阅处理)
  156.             self.event_stream.add_event(action, action._source)  
  157.         # 输出动作日志:根据全局配置决定日志级别
  158.         log_level = 'info' if LOG_ALL_EVENTS else 'debug'
  159.         self.log(log_level, str(action), extra={'msg_type': 'ACTION'})
复制代码
3.4 回调

AgentController注册了事件回调函数on_event,回调函数里设置标识符_pending_action,同时轮询任务_start_step_loop调用Agent的step方法,该方法用于预测下一个action。
3.4.1 核心特色


  • 事件转发机制:当存在活跃子智能体时,自动将事件转发给子智能体处理,确保层级化智能体协作的连贯性。
  • 状态判断逻辑:通过检查子智能体状态(完成、错误、拒绝等),决定是否终止子智能体并恢复父智能体处理流程。
  • 事件分类处理:区分动作(Action)和观察结果(Observation)事件,分别调用对应处理方法,保证事件处理的针对性。
  • 步骤触发控制:通过 should_step 方法判断是否触发智能体下一步操作,结合用户消息等场景做特殊日志记录,增强流程可观测性。
3.4.2 流程

6.png

3.4.3 代码
  1.     def on_event(self, event: Event) -> None:
  2.         """事件流的回调方法,通知控制器有新事件传入。
  3.         参数:
  4.             event (Event): 待处理的传入事件。
  5.         """
  6.         # 若存在子智能体且未完成/未出错,将事件转发给子智能体
  7.         if self.delegate is not None:
  8.             # 获取子智能体当前状态
  9.             delegate_state = self.delegate.get_agent_state()
  10.             # 判断子智能体是否仍活跃(未完成、未出错、未被拒绝),或因超迭代/超预算报错
  11.             if (
  12.                 delegate_state
  13.                 not in (
  14.                     AgentState.FINISHED,  # 已完成
  15.                     AgentState.ERROR,     # 错误
  16.                     AgentState.REJECTED,  # 被拒绝
  17.                 )
  18.                 or 'RuntimeError: Agent reached maximum iteration.'
  19.                 in self.delegate.state.last_error  # 达到最大迭代次数
  20.                 or 'RuntimeError:Agent reached maximum budget for conversation'
  21.                 in self.delegate.state.last_error  # 达到对话最大预算
  22.             ):
  23.                 # 将事件转发给子智能体,跳过父智能体处理
  24.                 asyncio.get_event_loop().run_until_complete(
  25.                     self.delegate._on_event(event)
  26.                 )
  27.                 return
  28.             else:
  29.                 # 子智能体已完成或出错,终止子智能体流程
  30.                 self.end_delegate()
  31.                 return
  32.         # 仅当无活跃子智能体时,继续父智能体的事件处理
  33.         asyncio.get_event_loop().run_until_complete(self._on_event(event))
  34.     async def _on_event(self, event: Event) -> None:
  35.         """父智能体内部的事件处理方法(无活跃子智能体时调用)。"""
  36.         # 若事件为隐藏类型,则忽略处理
  37.         if hasattr(event, 'hidden') and event.hidden:
  38.             return
  39.         # 将事件添加到状态追踪器的历史记录中
  40.         self.state_tracker.add_history(event)
  41.         # 根据事件类型分发处理
  42.         if isinstance(event, Action):
  43.             # 处理动作类型事件
  44.             await self._handle_action(event)
  45.         elif isinstance(event, Observation):
  46.             # 处理观察结果类型事件
  47.             await self._handle_observation(event)
  48.         # 判断是否需要触发智能体下一步操作
  49.         should_step = self.should_step(event)
  50.         if should_step:
  51.             self.log(
  52.                 'debug',
  53.                 f'Stepping agent after event: {type(event).__name__}',
  54.                 extra={'msg_type': 'STEPPING_AGENT'},
  55.             )
  56.             # 带异常处理的下一步操作
  57.             await self._step_with_exception_handling()
  58.         elif isinstance(event, MessageAction) and event.source == EventSource.USER:
  59.             # 若收到用户消息但未触发下一步,记录警告日志
  60.             self.log(
  61.                 'warning',
  62.                 f'Not stepping agent after user message. Current state: {self.get_agent_state()}',
  63.                 extra={'msg_type': 'NOT_STEPPING_AFTER_USER_MESSAGE'},
  64.             )
复制代码
3.5 全链路可观测

State作用 是存储整个agent工作过程的产生的所有事件、执行状态、任务plan等信息
在任务执行的任意时刻,用户或开发者都需要清晰掌握 Agent 的运行状态:是处于思考决策阶段,还是正在执行具体命令?是等待外部资源响应,还是因参数错误陷入停滞?这种可观测性需要工作流架构对每一个环节进行日志记录、状态标记和实时反馈,构建起透明的监控体系,为问题排查和性能优化提供支撑。
OpenHands 在 start_delegate,_step 和 end_delegate 都会对监控做出处理。
  1.     async def start_delegate(self, action: AgentDelegateAction) -> None:
  2.         # 创建委托智能体实例(确保父子智能体共享LLM注册信息)
  3.         # 注:父子智能体共享指标,实现全局指标累积
  4.         delegate_agent = agent_cls(
  5.             config=agent_config, llm_registry=self.agent.llm_registry
  6.         )
  7.         # 启动委托智能体前,创建初始状态(继承父智能体关键配置)
  8.         state = State(
  9.             session_id=self.id.removesuffix('-delegate'),  # 会话ID:移除父智能体的委托后缀
  10.             user_id=self.user_id,  # 继承用户ID,保持用户关联
  11.             inputs=action.inputs or {},  # 子任务输入参数(默认为空字典)
  12.             iteration_flag=self.state.iteration_flag,  # 继承迭代控制标志(限制迭代次数)
  13.             budget_flag=self.state.budget_flag,  # 继承预算控制标志(限制资源使用)
  14.             delegate_level=self.state.delegate_level + 1,  # 委托层级+1(标识子智能体层级)
  15.             metrics=self.state.metrics,  # 共享全局指标(父子智能体指标统一累积)
  16.             start_id=self.event_stream.get_latest_event_id() + 1,  # 事件起始ID:从最新事件后开始记录
  17.             parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(),  # 父智能体指标快照(用于后续对比)
  18.             parent_iteration=self.state.iteration_flag.current_value,  # 父智能体当前迭代次数
  19.         )
  20.         
  21.     def end_delegate(self) -> None:
  22.         # Calculate delegate-specific metrics before closing the delegate
  23.         delegate_metrics = self.state.get_local_metrics()
  24.         logger.info(f'Local metrics for delegate: {delegate_metrics}')     
  25.         
  26.    async def _step(self) -> None:        
  27.         # Create and log metrics for frontend display
  28.         self._prepare_metrics_for_frontend(action)              
  29.         
  30.     def _prepare_metrics_for_frontend(self, action: Action) -> None:
  31.         """Create a minimal metrics object for frontend display and log it.
  32.         To avoid performance issues with long conversations, we only keep:
  33.         - accumulated_cost: The current total cost
  34.         - accumulated_token_usage: Accumulated token statistics across all API calls
  35.         - max_budget_per_task: The maximum budget allowed for the task
  36.         This includes metrics from both the agent's LLM and the condenser's LLM if it exists.
  37.         Args:
  38.             action: The action to attach metrics to
  39.         """
  40.         # Get metrics from agent LLM
  41.         metrics = self.conversation_stats.get_combined_metrics()
  42.         # Create a clean copy with only the fields we want to keep
  43.         clean_metrics = Metrics()
  44.         clean_metrics.accumulated_cost = metrics.accumulated_cost
  45.         clean_metrics._accumulated_token_usage = copy.deepcopy(
  46.             metrics.accumulated_token_usage
  47.         )
  48.         # Add max_budget_per_task to metrics
  49.         if self.state.budget_flag:
  50.             clean_metrics.max_budget_per_task = self.state.budget_flag.max_value
  51.         action.llm_metrics = clean_metrics
  52.         # Log the metrics information for debugging
  53.         # Get the latest usage directly from the agent's metrics
  54.         latest_usage = None
  55.         if self.state.metrics.token_usages:
  56.             latest_usage = self.state.metrics.token_usages[-1]
  57.         accumulated_usage = self.state.metrics.accumulated_token_usage        
复制代码
3.6 驯服决策的 “不确定性”

LLM 的生成特性决定了其输出天然带有随机性。Agent 在执行任务时,可能突然产生无效操作 —— 比如调用不存在的工具,或是在循环中重复相同步骤。这种非确定性如果缺乏有效管控,会直接导致任务失败。因此,系统必须构建一套智能纠错机制,能够实时检测异常行为,并通过重试、回滚或重新规划等方式修正路径,这需要对任务逻辑和模型行为有深刻的理解。
3.6.1 流程

OpenHands 智能体的异常捕获与统一处理机制是保障系统稳定性和容错能力的关键模块。主要功能包括:

  • 异常包裹:通过 _step_with_exception_handling 方法包裹核心业务逻辑(_step),捕获所有执行过程中的异常,避免系统崩溃。
  • 异常分类处理:在 _react_to_exception 中,根据异常类型细分错误状态(如 LLM 认证失败、服务不可用、预算耗尽等),提供精准的错误定位依据。
  • 状态与错误存储:捕获异常后,更新智能体状态(如 ERROR、RATE_LIMITED),并存储错误详情,便于后续排查。
  • 外部通知:通过 status_callback 回调函数,将错误状态同步给外部系统,支持监控和告警。
  • 容错优化:对未知异常进行包装,返回用户友好提示;对速率限制异常区分 “重试耗尽” 和 “可重试” 场景,提升系统灵活性。
主要特色为:

  • 全面的异常覆盖:捕获所有 Exception 类型,避免因未处理异常导致系统宕机,保障运行稳定性。
  • 精细化错误分类:针对 LLM 相关常见异常(认证、服务、预算、内容政策等)细分错误状态,便于问题定位和监控。
  • 友好的用户体验:未知异常自动包装为用户易懂的提示,减少用户困惑;速率限制场景区分处理,支持后续重试。
  • 可扩展性强:通过回调函数解耦错误通知逻辑,外部系统可灵活对接(如监控告警、日志上报等)。
  • 完整的错误追溯:日志包含会话 ID、异常堆栈、错误类型,结合状态中存储的错误信息,便于问题排查。
7.png

3.6.2 代码
  1.     async def _step(self) -> None:
  2.         """智能体核心执行步骤(实际业务逻辑,如处理动作、调用 LLM 等)"""
  3.         pass
  4.     async def _react_to_exception(
  5.         self,
  6.         e: Exception,
  7.     ) -> None:
  8.         """处理异常:将智能体状态设置为错误,并发送状态消息。
  9.         根据异常类型细分错误状态,便于外部监控和问题定位。
  10.         参数:
  11.             e (Exception):捕获到的异常实例
  12.         """
  13.         # 在设置智能体状态前,先存储错误原因(异常类型 + 异常信息)
  14.         self.state.last_error = f'{type(e).__name__}: {str(e)}'
  15.         # 如果存在状态回调函数,触发回调通知外部系统错误状态
  16.         if self.status_callback is not None:
  17.             # 默认错误状态为通用错误
  18.             runtime_status = RuntimeStatus.ERROR
  19.             # 根据异常类型细分错误状态
  20.             if isinstance(e, AuthenticationError):
  21.                 # LLM 认证失败(如密钥无效、权限不足)
  22.                 runtime_status = RuntimeStatus.ERROR_LLM_AUTHENTICATION
  23.                 self.state.last_error = runtime_status.value  # 更新错误信息为标准化描述
  24.             elif isinstance(
  25.                 e,
  26.                 (
  27.                     ServiceUnavailableError,
  28.                     APIConnectionError,
  29.                     APIError,
  30.                 ),
  31.             ):
  32.                 # LLM 服务不可用(如服务宕机、网络连接失败)
  33.                 runtime_status = RuntimeStatus.ERROR_LLM_SERVICE_UNAVAILABLE
  34.                 self.state.last_error = runtime_status.value
  35.             elif isinstance(e, InternalServerError):
  36.                 # LLM 内部服务器错误
  37.                 runtime_status = RuntimeStatus.ERROR_LLM_INTERNAL_SERVER_ERROR
  38.                 self.state.last_error = runtime_status.value
  39.             elif isinstance(e, BadRequestError) and 'ExceededBudget' in str(e):
  40.                 # LLM 预算耗尽(通过错误信息中的关键词识别)
  41.                 runtime_status = RuntimeStatus.ERROR_LLM_OUT_OF_CREDITS
  42.                 self.state.last_error = runtime_status.value
  43.             elif isinstance(e, ContentPolicyViolationError) or (
  44.                 isinstance(e, BadRequestError)
  45.                 and 'ContentPolicyViolationError' in str(e)
  46.             ):
  47.                 # 内容违反 LLM 政策(直接匹配异常类型或错误信息关键词)
  48.                 runtime_status = RuntimeStatus.ERROR_LLM_CONTENT_POLICY_VIOLATION
  49.                 self.state.last_error = runtime_status.value
  50.             elif isinstance(e, RateLimitError):
  51.                 # 速率限制异常:判断是否已耗尽所有重试次数
  52.                 if (
  53.                     hasattr(e, 'retry_attempt')
  54.                     and hasattr(e, 'max_retries')
  55.                     and e.retry_attempt >= e.max_retries
  56.                 ):
  57.                     # 所有重试均失败,设置为最终错误状态并更新错误信息
  58.                     self.state.last_error = (
  59.                         RuntimeStatus.AGENT_RATE_LIMITED_STOPPED_MESSAGE.value
  60.                     )
  61.                     await self.set_agent_state_to(AgentState.ERROR)
  62.                 else:
  63.                     # 仍有重试次数,设置为速率限制状态(后续可自动重试)
  64.                     await self.set_agent_state_to(AgentState.RATE_LIMITED)
  65.                 return  # 速率限制异常处理完毕,直接返回
  66.             # 触发回调函数,将错误状态和错误信息通知外部
  67.             self.status_callback('error', runtime_status, self.state.last_error)
  68.         # 无论是否有回调,最终将智能体状态设置为错误
  69.         await self.set_agent_state_to(AgentState.ERROR)
  70.     async def _step_with_exception_handling(self) -> None:
  71.         """带异常处理的智能体核心步骤执行方法。
  72.         包裹 _step 方法(核心业务逻辑),捕获所有异常并统一处理,确保系统稳定性。
  73.         """
  74.         try:
  75.             # 执行智能体核心业务逻辑(如处理动作、调用 LLM、任务执行等)
  76.             await self._step()
  77.         except Exception as e:
  78.             # 构建对外上报的异常实例:优先使用已知异常类型,未知类型包装为通用 RuntimeError
  79.             if (
  80.                 isinstance(e, Timeout)
  81.                 or isinstance(e, APIError)
  82.                 or isinstance(e, BadRequestError)
  83.                 or isinstance(e, NotFoundError)
  84.                 or isinstance(e, InternalServerError)
  85.                 or isinstance(e, AuthenticationError)
  86.                 or isinstance(e, RateLimitError)
  87.                 or isinstance(e, ContentPolicyViolationError)
  88.                 or isinstance(e, LLMContextWindowExceedError)
  89.             ):
  90.                 # 已知异常类型:直接上报原始异常
  91.                 reported = e
  92.             else:
  93.                 # 未知异常类型:输出警告日志,并包装为用户友好的 RuntimeError
  94.             # 调用异常响应方法,统一处理异常(设置状态、通知外部等)
  95.             await self._react_to_exception(reported)
复制代码
0xFF 参考

https://docs.all-hands.dev/openhands/usage/architecture/backend
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第二篇:Agent 相关核心概念】  克里
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第一篇:系列导读】 克里
Coding Agent之Openhands解析(含代码)  Arrow
OpenHands 源码解读  一力辉
拆解完十几家Agent平台,我发现——最能落地的不是LangChain,不是Manus,也不是Coze,而是n8n  贝叶斯不司
AI程序员之OpenDevin源码剖析  goofy
你的Agent可能设计错了:UIUC & 斯坦福等联合发文,重构Agent适配2X2
深度解析开源Agent框架Parlant:为失控的AI套上“确定性”缰绳
Effective harnesses for long-running agents

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册