找回密码
 立即注册
首页 业界区 业界 AI Agent 框架探秘:拆解 OpenHands(7)--- Agent ...

AI Agent 框架探秘:拆解 OpenHands(7)--- Agent

咸和璧 4 天前
AI Agent 框架探秘:拆解 OpenHands(7)---  Agent


目录

  • AI Agent 框架探秘:拆解 OpenHands(7)---  Agent

    • 0x00 摘要
    • 0x01 状态管理

      • 1.1 设计要点
      • 1.2 State类

    • 0x02 Agent系统

      • 2.1 基类
      • 2.2 Agent 类型

    • 0x03 State

      • 3.1 特色
      • 3.2 State 定义

        • 3.2.1 OpenHands 的 State
        • 3.2.2 其他实现

      • 3.3 生命周期
      • 3.4 联系

        • 3.4.1 State 与AgentController的联系
        • 3.4.2 State 与 Observation/Action 的关系
        • 3.4.3 State 的共享

      • 3.5 持久化和恢复
      • 3.6 小结

    • 0x04 大模型适配层(LLM Adapter)

      • 4.1 LLM

        • 4.1.1 作用
        • 4.1.2 代码

      • 4.2 LLMRegistry

        • 4.2.1 作用
        • 4.2.2 工作流
        • 4.2.3 代码


    • 0xFF 参考


0x00 摘要

An LLM agent runs tools in a loop to achieve a goal.
智能体(Agent)是一种能够感知和理解环境,并使用工具来实现目标的应用程序。LLM能够动态指导自己的过程和工具使用,保持对任务完成方式的控制。Agent的设计旨在更灵活地处理某些任务,其决策由模型决定,而非预定义的规则。
借助 CodeAct 的 LLM 智能体,OpenHands 通过交互式的多轮流程,展现出显著的优势:

  • 智能体能够接收新的观察数据,并据此优化先前的行动方案。这类似于人类在任务执行中,依据新信息灵活调整策略的过程。
  • 依托记忆与反馈机制,智能体可随时间提升自身性能。它能将过往经验铭记于心,并在后续任务中加以运用,不断进步,恰似一名持续学习成长的学生。
  • 此外,智能体还能胜任复杂的流程任务,涵盖模型训练、数据可视化以及自动化决策等。这表明 CodeAct 不仅能处理基础任务,更能驾驭高级且复杂的作业,例如训练机器学习模型、绘制图表以及实施自动决策等。
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 状态管理

1.1 设计要点

多任务并发执行时,任务状态易出现冲突;长流程任务的中间状态(如已完成子任务、待处理步骤)易丢失;异常中断后难以精准恢复到中断前的状态。因此,需要有一个数据结构来维护Agent的状态,这就是State。
在每个 Session(我们的对话线程)中,state 属性就像智能体专用于该特定交互的草稿板,是智能体存储和更新对话期间所需动态细节的地方。
状态管理的价值在于可追溯与可恢复:任何时刻都能回答”当前在哪一步、为什么这样做、结果如何、接下来做什么”。在出现错误或需要人工介入时,可以精确定位问题并从断点恢复。状态设计最佳实践如下:

  • 最小主义: 仅存储必要的、动态的数据。
  • 序列化: 使用基本的、可序列化的类型。
  • 描述性键和前缀: 使用清晰的名称和适当的前缀(user:、app:、temp: 或无前缀)。
  • 浅层结构: 尽可能避免深层嵌套。
  • 标准更新流程: 依赖 append_event。
1.2 State类

State类作为全面的数据容器,用于跟踪 OpenHands 系统中智能体的运行状态。它维护智能体运行、演进和从会话中恢复所需的所有关键信息,包括对话历史、运行指标、迭代控制和错误记录。save_to_session方法支持智能体状态的持久化存储,允许在不同的执行实例之间恢复会话和保持连续性。
对于需要长时间运行且状态不断变化的 Agent 任务来说,State的作用至关重要。

  • 首先,它是 Agent 决策的核心依据,尤其是完整的历史事件记录,为 Agent 提供了不可或缺的上下文信息,让决策不再盲目;
  • 其次,外部系统(比如用户界面或控制器)可以通过调整State来管理 Agent 的生命周期,实现暂停、恢复、终止等操作;
  • 更重要的是,State可以被序列化存储,当任务因意外中断时,系统能通过加载存储的State,让任务从断点处精确恢复,完美解决了长周期任务的连续性问题。
0x02 Agent系统

2.1 基类

实际上,我们要实现一个 AI Agent,最简单的就是以 ReAct 为基础,去构建一个不断循环的推理(Reason),行动(Act)和观察(Observe)。
而在 OpenHands 的技术体系中,Agent 系统凭借高度灵活的模块化架构,实现了多样化专业 Agent 的开发与适配。这一设计的根基,是一个名为Agent的抽象基类 —— 它就像所有 Agent 的 “通用模板”,不仅规定了必须实现的核心接口,还封装了各类 Agent 都需要的基础功能,确保了不同 Agent 在系统中的兼容性。
所有 Agent 都必须遵循Agent基类的规范,其中最核心的就是step()方法。这个方法如同 Agent 的 “决策入口”,接收State作为输入,经过内部逻辑处理后输出Action。这种清晰简洁的接口设计,让系统能轻松接入新的 Agent 实现,或是在不同 Agent 之间切换,大大提升了扩展性。
  1. class Agent(ABC):
  2.     DEPRECATED = False
  3.     """
  4.     This abstract base class is an general interface for an agent dedicated to
  5.     executing a specific instruction and allowing human interaction with the
  6.     agent during execution.
  7.     It tracks the execution status and maintains a history of interactions.
  8.     """
  9.     _registry: dict[str, type['Agent']] = {}
  10.     sandbox_plugins: list[PluginRequirement] = []
  11.     config_model: type[AgentConfig] = AgentConfig
  12.     """Class field that specifies the config model to use for the agent. Subclasses may override with a derived config model if needed."""
  13.     def __init__(
  14.         self,
  15.         config: AgentConfig,
  16.         llm_registry: LLMRegistry,
  17.     ):
  18.         self.llm = llm_registry.get_llm_from_agent_config('agent', config)
  19.         self.llm_registry = llm_registry
  20.         self.config = config
  21.         self._complete = False
  22.         self._prompt_manager: 'PromptManager' | None = None
  23.         self.mcp_tools: dict[str, ChatCompletionToolParam] = {}
  24.         self.tools: list = []
  25.         
  26.     @abstractmethod
  27.     def step(self, state: 'State') -> 'Action':
  28.         """Starts the execution of the assigned instruction. This method should
  29.         be implemented by subclasses to define the specific execution logic.
  30.         """
  31.         pass        
复制代码
2.2 Agent 类型

在 OpenHands 的智能体系中,针对不同任务场景设计了多种专业化 Agent,它们如同分工明确的 “岗位专员”,各自承载着独特的功能使命,共同支撑起系统的多样化能力。

  • CodeActAgent。作为系统的核心力量,CodeActAgent 践行了 CodeAct 的核心理念,将所有行动统一到代码层面,具备极强的通用性。它主要负责处理各类代码相关任务,既能执行 bash 命令,也能运行 Python 代码。其工作原理很巧妙:先向大语言模型提供文件读写、命令执行等 “工具” 的详细定义,再借助模型的函数调用或工具调用能力,让模型根据任务需求自主选择合适的工具完成操作,堪称系统中无所不能的 “技术骨干”。
  • BrowsingAgent。专注于网页交互任务的 BrowsingAgent,就像一位专业的网页操作专员。它会把网页的无障碍树作为上下文信息传递给大语言模型,帮助模型理解网页的结构布局。同时,它还提供了点击、填写表单、滚动页面等一系列网页交互动作,模型通过分析无障碍树制定操作策略,由它来精准执行,高效完成网页相关的任务。
  • ReadOnlyAgent。这是一位坚守 “不修改原则” 的特殊 Agent。它的核心特点是只读不写,只能进行查看类操作,不会执行任何可能改变系统状态或修改数据的动作,在需要保障系统安全、避免数据被误改的场景中发挥着重要作用。
  • VisualBrowsingAgent。作为 BrowsingAgent 的 “视觉增强版”,VisualBrowsingAgent 具备处理视觉信息的能力。它不仅能理解网页的结构,还能识别网页中的图像等视觉内容,针对需要分析视觉元素的网页任务,比如识别图片中的信息、基于视觉布局进行操作等,它能展现出独特的优势。
  • DummyAgent。DummyAgent 是一个结构简单的 Agent,主要承担测试任务。它就像系统的 “测试道具”,开发者可以通过它验证系统的基础功能和交互逻辑,为其他 Agent 的开发和调试提供支持,是保障系统稳定性的重要辅助角色。
  • LocAgent。LocAgent 实现了LocAgent: Graph-Guided LLM Agents for Code Localization 的Agent。LocAgent 首先将代码库解析为一个异构图表示,其中包含了多种类型的代码实体及其依赖关系。在此基础上,系统构建了分层稀疏索引,这不仅支持了高效的内容检索,还使得结构化的探索成为可能。借助这些索引,LocAgent 能够结合图结构与工具接口,执行由 Agent 驱动的逐步搜索过程,从而精准地完成代码定位任务。这种多跳推理的方式,使得 LocAgent 能够逐步接近目标代码,实现高效的代码定位。
作为系统的 “智能决策者”,每个 Agent 的核心动力都来自大型语言模型(LLM)。它的工作目标十分明确:基于当前任务的完整上下文信息(也就是State对象),判断并输出下一步该执行的具体操作(即Action)。这种将决策逻辑完全 “打包” 在 Agent 内部的设计,是实现系统模块化的关键 —— 就像不同的专业工具各司其职,Agent 只需专注于自己的决策任务,无需干扰其他组件。而agenthub的存在更让这套体系如虎添翼,它如同一个 “Agent 人才库”,汇集了具备不同专业技能的 Agent,系统可根据具体任务需求灵活选择或委托相应的 Agent 来处理。
每个 Agent 被设计成一个循环,在每次迭代中,通过调用 agent.step() 方法,以状态 (State)作为输入,输出动作 (Actions)来执行操作或命令,在执行动作的后可能接收到的观察 (Observations)结果。
在实现的过程中,每个 Agent 类都必须实现 step 和 search_memory 方法,以便执行指令和从记忆中查询信息。该抽象类还提供了一些辅助方法,如 reset、register、get_cls、list_agents,帮助管理 Agent 的状态及其注册信息。
而一个Agent最简驱动流程如下。
  1. while True:
  2.   prompt = agent.generate_prompt(state)
  3.   response = llm.completion(prompt)
  4.   action = agent.parse_response(response)
  5.   observation = runtime.run(action)
  6.   state = state.update(action, observation)
复制代码
0x03 State

State 对象是 Agent 执行任务时所依赖的关键信息的集合。它包括以下内容:

  • Agent 采取的动作的历史记录,以及这些动作产生的观察结果(例如文件内容、命令输出)。
  • 自最近一步以来发生的一系列动作和观察的轨迹。
  • 一个 plan 对象,包含主要目标。Agent 可以通过 AddTaskAction 和 ModifyTaskAction 来添加和修改子任务。
3.1 特色

OpenHands State的主要特色如下:

  • 全面的状态跟踪:捕获智能体操作的所有方面,从当前状态和对话历史到性能指标和错误记录。
  • 多智能体支持:通过delegate_level和父指标快照包含委托层级跟踪,用于协调多智能体操作。
  • 持久化机制:通过 pickle 序列化和 base64 编码提供可靠的会话保存,并处理遗留状态文件的向后兼容性。
  • 运行控制:整合迭代和预算控制标志,管理资源使用并防止无限循环。
  • 可扩展性:包含extra_data字段用于特定任务信息,使类适用于不同的使用场景。
  • 状态转换:跟踪当前状态和恢复状态,管理智能体生命周期(LOADING、RUNNING、PAUSED 等)。
  • 历史管理:通过起始 / 结束索引维护事件历史,跟踪相关对话片段。
3.2 State 定义

3.2.1 OpenHands 的 State

State表示 OpenHands 系统中代理的运行状态,保存其操作和记忆的数据,实际上聚合了Agent做出决策所需要的所有信息:

  • 多代理/委托状态:

    • 存储任务(代理与用户之间的对话)
    • 子任务(代理与用户或其他代理之间的对话)
    • 全局和局部迭代次数
    • 多代理交互的委托层级数
    • 几乎卡住的状态

  • 代理的运行状态:

    • 当前代理状态(例如,加载中、运行中、已暂停)
    • 流量控制状态,用于速率限制
    • 确认模式
    • 遇到的最新错误

  • 保存和恢复代理的数据:

    • 保存和从会话中恢复
    • 使用 pickle 和 base64 序列化

  • 保存/恢复关于消息历史的数据:

    • 代理历史中事件的开始和结束 ID
    • 摘要和委托摘要

  • 指标:

    • 当前任务的全局指标
    • 当前子任务的局部指标

  • 额外数据:

    • 额外的任务特定数据"

具体代码如下。
  1. @dataclass
  2. class State:
  3.     """表示OpenHands系统中智能体的运行状态,保存其操作和内存数据。"""
  4.     session_id: str = ''  # 当前会话的唯一标识符
  5.     user_id: Optional[str] = None  # 与会话相关联的用户标识符
  6.     iteration_flag: IterationControlFlag = field(  # 控制迭代限制和进度
  7.         default_factory=lambda: IterationControlFlag(
  8.             limit_increase_amount=100, current_value=0, max_value=100
  9.         )
  10.     )
  11.     conversation_stats: Optional[ConversationStats] = None  # 关于对话历史的统计信息
  12.     budget_flag: Optional[BudgetControlFlag] = None  # 控制资源预算限制
  13.     confirmation_mode: bool = False  # 智能体在执行操作前是否需要确认
  14.     history: List[Event] = field(default_factory=list)  # 智能体操作中的事件记录
  15.     inputs: Dict = field(default_factory=dict)  # 存储智能体的输入参数
  16.     outputs: Dict = field(default_factory=dict)  # 存储智能体生成的输出结果
  17.     agent_state: AgentState = AgentState.LOADING  # 智能体当前的运行状态
  18.     resume_state: Optional[AgentState] = None  # 暂停后要返回的状态
  19.     # 根智能体的层级为0,每个委托层级增加1
  20.     delegate_level: int = 0  # 多智能体委托中的层级结构
  21.     # start_id和end_id跟踪历史中事件的范围
  22.     start_id: int = -1  # 历史中相关事件的起始索引
  23.     end_id: int = -1  # 历史中相关事件的结束索引
  24.     parent_metrics_snapshot: Optional[Metrics] = None  # 父智能体指标的快照
  25.     parent_iteration: int = 100  # 来自父智能体的迭代计数
  26.     # 注意:控制器使用此字段跟踪委托前父级的指标快照
  27.     # 评估任务存储跟踪任务进度/状态所需的额外数据
  28.     extra_data: Dict[str, Any] = field(default_factory=dict)  # 特定于任务的附加数据
  29.     last_error: str = ''  # 最近遇到的错误记录
  30.     # 注意:已弃用的参数,暂时保留以确保向后兼容性
  31.     # 将在30天后移除
  32.     iteration: Optional[int] = None  # 已弃用:使用iteration_flag替代
  33.     local_iteration: Optional[int] = None  # 已弃用:本地迭代计数器
  34.     max_iterations: Optional[int] = None  # 已弃用:最大迭代限制
  35.     traffic_control_state: Optional[TrafficControlState] = None  # 已弃用:速率限制状态
  36.     local_metrics: Optional[Metrics] = None  # 已弃用:使用metrics替代
  37.     delegates: Optional[Dict[Tuple[int, int], Tuple[str, str]]] = None  # 已弃用:委托跟踪
  38.     metrics: Metrics = field(default_factory=Metrics)  # 当前任务的性能指标
  39.     def save_to_session(
  40.         self, sid: str, file_store: FileStore, user_id: Optional[str]
  41.     ) -> None:
  42.         """将当前状态保存到持久存储中,以便以后检索。
  43.         
  44.         参数:
  45.             sid: 与此状态相关联的会话ID
  46.             file_store: 用于保存的存储系统
  47.             user_id: 与此会话相关联的用户ID
  48.         """
  49.         # 暂时移除对话统计信息,因为它们自行处理持久性
  50.         conversation_stats = self.conversation_stats
  51.         self.conversation_stats = None  
  52.         # 序列化状态对象
  53.         pickled = pickle.dumps(self)
  54.         logger.debug(f'Saving state to session {sid}:{self.agent_state}')
  55.         encoded = base64.b64encode(pickled).decode('utf-8')
  56.         
  57.         try:
  58.             # 将编码后的状态写入文件存储
  59.             file_store.write(
  60.                 get_conversation_agent_state_filename(sid, user_id), encoded
  61.             )
  62.             # 在SaaS/远程环境中清理旧的状态文件
  63.             if user_id:
  64.                 old_filename = get_conversation_agent_state_filename(sid)
  65.                 try:
  66.                     file_store.delete(old_filename)
  67.                 except Exception:
  68.                     pass  # 删除旧文件时忽略错误
  69.         except Exception as e:
  70.             logger.error(f'Failed to save state to session: {e}')
  71.             raise e
  72.         finally:
  73.             # 恢复对话统计信息引用
  74.             self.conversation_stats = conversation_stats
复制代码
3.2.2 其他实现

在其他的Agent系统中,也可以用一个保存键值对的集合(字典或 Map)来实现state。它用于存放智能体为让当前对话顺利进行需要记住或追踪的信息:

  • 个性化交互: 记住之前提到的用户偏好(例如,'user_preference_theme': 'dark')。
  • 跟踪任务进度: 在多轮过程中跟踪步骤(例如,'booking_step': 'confirm_payment')。
  • 积累信息: 构建列表或摘要(例如,'shopping_cart_items': ['book', 'pen'])。
  • 做出明智决策: 存储影响下一个响应的标志或值(例如,'user_is_authenticated': True)。
状态键上的前缀定义了它们的作用域和持久性行为,特别是对于持久性服务:

  • 无前缀(会话状态):

    • 作用域: 特定于当前会话(id)。
    • 持久性: 仅在 SessionService 是持久性的(Database、VertexAI)时才持久化。
    • 使用案例: 跟踪当前任务中的进度(例如,'current_booking_step')、此次交互的临时标志(例如,'needs_clarification')。
    • 示例: session.state['current_intent'] = 'book_flight'

  • user: 前缀(用户状态):

    • 作用域: 绑定到 user_id,在该用户的所有会话中共享(在同一个 app_name 内)。
    • 持久性: 在 Database 或 VertexAI 中持久化。(由 InMemory 存储但在重启时丢失)。
    • 使用案例: 用户偏好(例如,'user:theme')、个人资料详情(例如,'user:name')。
    • 示例: session.state['user:preferred_language'] = 'fr'

  • app: 前缀(应用状态):

    • 作用域: 绑定到 app_name,在该应用程序的所有用户和会话中共享。
    • 持久性: 在 Database 或 VertexAI 中持久化。(由 InMemory 存储但在重启时丢失)。
    • 使用案例: 全局设置(例如,'app:api_endpoint')、共享模板。
    • 示例: session.state['app:global_discount_code'] = 'SAVE10'

  • temp: 前缀(临时会话状态):

    • 作用域: 特定于当前会话处理轮次。
    • 持久性: 从不持久化。 保证被丢弃,即使使用持久性服务。
    • 使用案例: 仅在立即需要的中间结果、你明确不想存储的数据。
    • 示例: session.state['temp:raw_api_response'] = {...}

智能体代码通过单一的 session.state 集合(dict/Map)与合并后的状态交互。SessionService 会根据前缀从正确的底层存储获取/合并状态。
3.3 生命周期

State类的生命周期如下:
1.png

3.4 联系

state和其他组件或者数据结构的联系如下。
3.4.1 State 与AgentController的联系

在AgentController._step()中,Agent通过State获取信息。
  1.     async def _step(self) -> None:
  2.         """Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget."""
  3.         if self.get_agent_state() != AgentState.RUNNING:
  4.             self.log(
  5.                 'debug',
  6.                 f'Agent not stepping because state is {self.get_agent_state()} (not RUNNING)',
  7.                 extra={'msg_type': 'STEP_BLOCKED_STATE'},
  8.             )
  9.             return
复制代码
3.4.2 State 与 Observation/Action 的关系

Observation 更新 State。当 Environment 返回 Observation 时,Observation 被添加到 State 历史中。
  1.     def add_history(self, event: Event):
  2.         # if the event is not filtered out, add it to the history
  3.         if self.agent_history_filter.include(event):
  4.             self.state.history.append(event)
复制代码
Agent 基于当前 State 生成 Action。
  1. action = self.agent.step(self.state)
复制代码
另外,虽然前端的 FooterContent 组件不直接使用 State,但整个前端界面的状态管理依赖于后端 State 的同步Backend State -> WebSocket -> Frontend State -> UI Updates。前端组件根据接收到的 State 信息更新界面状态和可用操作。
3.4.3 State 的共享

State 分为全局状态和局部状态。全局指标在委托间共享,比如。
  1.     async def start_delegate(self, action: AgentDelegateAction) -> None:
  2.         """启动一个委托智能体来处理子任务。
  3.         OpenHands 是一个多智能体系统。`任务(task)` 指 OpenHands(整个系统)与用户之间的对话,
  4.         可能包含用户的一个或多个输入。它始于用户的初始输入(通常是任务说明),结束于以下三种情况:
  5.         智能体发起的 `AgentFinishAction`、用户发起的停止操作,或出现错误。
  6.         `子任务(subtask)` 指智能体与用户之间,或智能体与其他智能体之间的对话。如果一个 `任务`
  7.         由单个智能体执行,那么它同时也是一个 `子任务`。否则,一个 `任务` 由多个 `子任务` 组成,
  8.         每个子任务由一个智能体执行。
  9.         参数:
  10.             action (AgentDelegateAction):包含要启动的委托智能体信息的动作对象
  11.         """
  12.         # 根据动作中指定的智能体类型获取对应的智能体类
  13.         agent_cls: type[Agent] = Agent.get_cls(action.agent)
  14.         # 获取该智能体的配置(优先使用动作指定的配置,否则使用当前智能体的配置)
  15.         agent_config = self.agent_configs.get(action.agent, self.agent.config)
  16.         # 确保父智能体与子智能体共享指标,以实现全局累积
  17.         delegate_agent = agent_cls(
  18.             config=agent_config, llm_registry=self.agent.llm_registry
  19.         )
  20.         # 在启动委托智能体前,对当前指标进行快照
  21.         state = State(
  22.             session_id=self.id.removesuffix('-delegate'),  # 会话ID(移除委托后缀)
  23.             user_id=self.user_id,  # 关联的用户ID
  24.             inputs=action.inputs or {},  # 子任务的输入参数(默认为空字典)
  25.             iteration_flag=self.state.iteration_flag,  # 继承迭代控制标志
  26.             budget_flag=self.state.budget_flag,  # 继承预算控制标志
  27.             delegate_level=self.state.delegate_level + 1,  # 委托层级在父级基础上加1
  28.             # 全局指标在父智能体与子智能体间共享
  29.             metrics=self.state.metrics,
  30.             # 从事件流的最新位置开始记录新事件
  31.             start_id=self.event_stream.get_latest_event_id() + 1,
  32.             # 记录委托前父智能体的指标快照
  33.             parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(),
  34.             # 记录父智能体当前的迭代次数
  35.             parent_iteration=self.state.iteration_flag.current_value,
  36.         )
复制代码
不同层级的控制标志不同,比如在上面代码中也有体现:
  1. self.state.iteration_flag # 全局迭代控制
  2. self.state.budget_flag # 全局预算控制
复制代码
3.5 持久化和恢复

OpenHands 通过StateTracker管理状态的持久化,支持会话中断后的状态恢复,确保任务连续性。比如,save_state保存状态到存储。
  1. class StateTracker:
  2.     """管理并同步智能体在其生命周期内的状态。
  3.     它负责:
  4.     1. 维持智能体状态在多个会话间的持久性
  5.     2. 通过过滤和跟踪相关事件来管理智能体历史(以前由智能体控制器执行)
  6.     3. 在控制器和LLM组件之间同步指标
  7.     4. 更新预算和迭代限制的控制标志
  8.     """
  9.     def __init__(
  10.         self, sid: str | None, file_store: FileStore | None, user_id: str | None
  11.     ):
  12.         self.sid = sid  # 会话ID,用于标识当前会话
  13.         self.file_store = file_store  # 文件存储对象,用于持久化状态
  14.         self.user_id = user_id  # 用户ID,关联到特定用户
  15.         # 过滤掉与智能体无关的事件
  16.         # 这些事件将不被包含在智能体历史中
  17.         self.agent_history_filter = EventFilter(
  18.             exclude_types=(
  19.                 NullAction,  # 排除空动作事件
  20.                 NullObservation,  # 排除空观察事件
  21.                 ChangeAgentStateAction,  # 排除更改智能体状态的动作事件
  22.                 AgentStateChangedObservation,  # 排除智能体状态已更改的观察事件
  23.             ),
  24.             exclude_hidden=True,  # 排除隐藏事件
  25.         )
复制代码
3.6 小结

State 在 OpenHands 系统中起到以下关键作用:

  • 信息中枢:聚合所有决策所需信息
  • 控制中心:管理迭代、预算等控制流程
  • 记忆载体:维护交互历史和上下文
  • 协调机制:支持多 Agent 委托和状态共享
  • 持久化基础:支持会话恢复和状态保存
  • 接口桥梁:连接后端逻辑和前端展示
State 是 OpenHands 系统的 “大脑”,确保了整个智能体系统的连贯性和智能决策能力。
0x04 大模型适配层(LLM Adapter)

回归 AI Agent 的根本,其实就是 Loop+Tokens,我们拆解来看看:

  • Loop:其实也就是循环,类比人类解决一个问题,就是不断去尝试,直到解决,这就是一个循环,只不过循环长短不同。
  • Tokens:Loop 中不断的去让大模型思考决策,行动,和收集反馈信息继续下次的计划和执行。
因此,大语言模型是 OpenHands 的 "智能核心",将 LLM 作为动态调度器的设计,是当前 AI Agent 领域的核心实现范式。即,将LLM做为一个主动的任务规划与函数调用引擎。这种模式带来了两大优势:

  • 能力的涌现:由于执行计划是LLM动态生成的,agent能够执行开发者从未明确编码过的行为,甚至自主决定编写并执行一个脚本来完成任务。
  • 业务逻辑复杂度降低:开发者只需不断增加原子能力工具,复杂的业务编排逻辑交给 LLM 来决定,整体代码复杂度得以降低。
此架构范式让应用的能力上限不再受限于开发者预设的控制流,而是取决于 AI 在运行时对可用工具的动态组合与调用,为实现能处理复杂、多步任务,并具备一定自主性的通用 AI agent 提供了可参考的实现路径。
OpenHands 框架通过模块化的设计,实现了对主流 LLM 的无缝集成,既支持云端模型的便捷调用,也兼容本地部署的隐私化需求。
在云端集成方面,OpenHands 系统提供了统一的接口层,封装了 OpenAI、Azure、Mistral AI 等平台的 API 差异。开发者只需配置相应的 API 密钥和模型参数,框架就能自动适配不同模型的输入输出格式,实现 "一键切换"。这种设计的优势在于:当某一模型因负载过高响应缓慢时,系统可自动切换到备用模型,确保任务不中断;同时,也允许开发者根据任务特性选择最适合的模型(例如用代码生成能力突出的模型处理编程任务,用多模态模型处理包含图文的需求)。
对于对数据隐私有严格要求的场景(如医疗、金融领域),OpenHands 支持通过 Ollama 部署本地大语言模型。只需一台配备 GPU 的服务器,开发者就能将模型运行在私有环境中,所有数据处理均在本地完成,避免了敏感信息上传至云端的风险。框架会自动检测本地 GPU 的算力,推荐适配的模型版本,并优化推理参数以平衡速度与精度。
4.1 LLM

4.1.1 作用

LLM 类是 OpenHands 框架中语言模型的核心封装类,继承自重试混入类(RetryMixin)和调试混入类(DebugMixin),提供了统一的大语言模型调用接口。其核心职责是整合 LiteLLM 工具的多模型适配能力,处理模型配置解析、请求参数格式化、函数调用模拟、重试机制、日志记录、成本与延迟统计等全流程逻辑。
LLM 类为各类语言模型提供了统一接口,通过 LiteLLM 支持 100 余种模型提供商,并提供两大 API:一是保证广泛兼容性的标准对话补全 API(Chat Completions API),二是适配最新推理模型的 OpenAI 响应 API(Responses API)。

  • 原生支持推理 / 扩展思考能力:SDK 能够捕获并处理前沿模型的高级原生推理字段 —— 例如 Anthropic 模型的扩展思考字段 ThinkingBlock、OpenAI 模型的推理字段 ReasoningItemModel。SDK 为智能体透明化支持 OpenAI 响应 API,使客户端开发者可直接使用仅在该 API 开放的先进推理模型(如 GPT-5-Codex)。
  • 内置非函数调用模型支持:针对不原生支持函数调用的模型,SDK 实现了 NonNativeToolCallingMixin 混合类 —— 将工具 schema 转换为基于文本的提示指令,并通过结构化提示与正则提取技术,从模型输出中解析工具调用指令。这一设计使无函数调用能力的模型也能胜任智能体任务,大幅拓展了可用模型范围。
  • 多 LLM 路由支持:SDK 内置 RouterLLM(LLM 子类),允许智能体为不同的 LLM 请求匹配不同模型。开发者可通过自定义扩展 RouterLLM 并实现 select_llm () 方法,基于输入内容动态选择适配模型,其适配标准如下。

    • 性能:某些模型在特定任务(例如,编程、推理、创意写作)方面表现出色。
    • 成本:不同模型具有不同的价格点。
    • 功能:模型提供多样化的功能、上下文窗口大小和微调选项。
    • 可用性/冗余:拥有替代方案可以确保即使一个提供商出现问题,应用程序仍能正常运行。

  • 完善的工程化能力:内置重试机制(支持失败重试、延迟策略)、请求 / 响应日志记录(可持久化到文件)、性能指标统计(延迟、成本),同时支持安全设置、缓存提示词等实用功能。
  • 函数调用灵活支持:对不原生支持函数调用的模型,提供基于提示词的模拟转换能力;对支持函数调用的模型,自动适配其参数格式,无需开发者关注底层差异。
  • 配置化驱动:通过 LLMConfig 统一管理模型参数(温度系数、最大输出 token 数、API 密钥等),支持动态调整模型配置,适配不同场景需求。
4.1.2 代码

代码如下。
  1. class LLM(RetryMixin, DebugMixin):
  2.     """语言模型(LLM)实例的封装类,提供统一的模型调用接口。
  3.     属性:
  4.         config: LLMConfig 对象,存储模型的配置参数(如模型名称、API密钥、温度系数等)。
  5.     """
  6.     def __init__(
  7.         self,
  8.         config: LLMConfig,
  9.         service_id: str,
  10.         metrics: Metrics | None = None,
  11.         retry_listener: Callable[[int, int], None] | None = None,
  12.     ) -> None:
  13.         """初始化 LLM 实例。若传入 LLMConfig,其参数将作为默认值;
  14.         直接传入的简单参数会覆盖 config 中的对应配置。
  15.         参数:
  16.             config: 模型配置对象,包含模型调用所需的所有参数。
  17.             service_id: 服务标识,用于关联当前 LLM 实例所属的服务。
  18.             metrics: 指标统计对象,用于记录模型调用的延迟、成本等信息(可选)。
  19.             retry_listener: 重试回调函数,每次重试时触发,接收(当前重试次数,总重试次数)作为参数(可选)。
  20.         """
  21.         # 标记是否已尝试获取模型信息
  22.         self._tried_model_info = False
  23.         # 标记是否支持成本统计指标
  24.         self.cost_metric_supported: bool = True
  25.         # 深拷贝配置对象,避免外部修改影响内部状态
  26.         self.config: LLMConfig = copy.deepcopy(config)
  27.         # 服务标识赋值
  28.         self.service_id = service_id
  29.         # 初始化指标统计对象(若未传入则创建默认实例)
  30.         self.metrics: Metrics = (
  31.             metrics if metrics is not None else Metrics(model_name=config.model)
  32.         )
  33.         # 模型信息(如支持的功能、参数限制等,后续通过 init_model_info 初始化)
  34.         self.model_info: ModelInfo | None = None
  35.         # 标记是否启用函数调用功能
  36.         self._function_calling_active: bool = False
  37.         # 重试回调函数赋值
  38.         self.retry_listener = retry_listener
  39.         # 处理日志记录配置:若启用日志记录,需确保日志文件夹存在
  40.         if self.config.log_completions:
  41.             if self.config.log_completions_folder is None:
  42.                 raise RuntimeError(
  43.                     'log_completions_folder is required when log_completions is enabled'
  44.                 )
  45.             # 创建日志文件夹(若已存在则不报错)
  46.             os.makedirs(self.config.log_completions_folder, exist_ok=True)
  47.         # 调用 init_model_info 初始化模型信息,核心是获取 config.max_output_tokens(后续函数调用需用到)
  48.         # 忽略初始化过程中的警告信息
  49.         with warnings.catch_warnings():
  50.             warnings.simplefilter('ignore')
  51.             self.init_model_info()
  52.         # 打印调试日志:模型是否支持视觉能力
  53.         if self.vision_is_active():
  54.             logger.debug('LLM: model has vision enabled')
  55.         # 打印调试日志:是否启用提示词缓存
  56.         if self.is_caching_prompt_active():
  57.             logger.debug('LLM: caching prompt enabled')
  58.         # 打印调试日志:模型是否支持函数调用
  59.         if self.is_function_calling_active():
  60.             logger.debug('LLM: model supports function calling')
  61.         # 处理自定义分词器:若配置了自定义分词器,按指定路径加载
  62.         if self.config.custom_tokenizer is not None:
  63.             self.tokenizer = create_pretrained_tokenizer(self.config.custom_tokenizer)
  64.         else:
  65.             self.tokenizer = None
  66.         # 初始化模型调用的基础参数
  67.         kwargs: dict[str, Any] = {
  68.             'temperature': self.config.temperature,  # 温度系数,控制输出随机性
  69.             'max_completion_tokens': self.config.max_output_tokens,  # 最大输出token数
  70.         }
  71.         # 若配置了 top_k,添加到参数中(OpenAI 不支持该参数,LiteLLM 会特殊处理)
  72.         if self.config.top_k is not None:
  73.             kwargs['top_k'] = self.config.top_k
  74.         # 若配置了 top_p,添加到参数中(OpenAI 不支持该参数,但 LiteLLM 支持)
  75.         if self.config.top_p is not None:
  76.             kwargs['top_p'] = self.config.top_p
  77.         # 处理 OpenHands 专属模型:重写为 LiteLLM 代理格式
  78.         if self.config.model.startswith('openhands/'):
  79.             model_name = self.config.model.removeprefix('openhands/')
  80.             self.config.model = f'litellm_proxy/{model_name}'
  81.             self.config.base_url = 'https://llm-proxy.app.all-hands.dev/'
  82.             logger.debug(
  83.                 f'Rewrote openhands/{model_name} to {self.config.model} with base URL {self.config.base_url}'
  84.             )
  85.         # 获取当前模型支持的功能特性
  86.         features = get_features(self.config.model)
  87.         # 处理支持推理努力度(reasoning_effort)的模型
  88.         if features.supports_reasoning_effort:
  89.             # Gemini 模型特殊处理:仅将 'low'/'none' 映射为优化的思考预算
  90.             if 'gemini-2.5-pro' in self.config.model:
  91.                 logger.debug(
  92.                     f'Gemini model {self.config.model} with reasoning_effort {self.config.reasoning_effort}'
  93.                 )
  94.                 if self.config.reasoning_effort in {None, 'low', 'none'}:
  95.                     kwargs['thinking'] = {'budget_tokens': 128}  # 思考预算设为 128 token
  96.                     kwargs['allowed_openai_params'] = ['thinking']  # 允许传递 thinking 参数
  97.                     kwargs.pop('reasoning_effort', None)  # 移除原 reasoning_effort 参数
  98.                 else:
  99.                     kwargs['reasoning_effort'] = self.config.reasoning_effort
  100.                 logger.debug(
  101.                     f'Gemini model {self.config.model} with reasoning_effort {self.config.reasoning_effort} mapped to thinking {kwargs.get("thinking")}'
  102.                 )
  103.             # Claude Sonnet 4.5 不支持 reasoning_effort,直接移除该参数
  104.             elif 'claude-sonnet-4-5' in self.config.model:
  105.                 kwargs.pop('reasoning_effort', None)
  106.             # 其他支持的模型,直接传递 reasoning_effort 参数
  107.             else:
  108.                 kwargs['reasoning_effort'] = self.config.reasoning_effort
  109.             # 推理类模型不支持 temperature 和 top_p,移除这两个参数
  110.             kwargs.pop('temperature')
  111.             kwargs.pop('top_p')
  112.         # 处理 Azure 模型的参数兼容问题(参考:https://github.com/All-Hands-AI/OpenHands/issues/6777)
  113.         if self.config.model.startswith('azure'):
  114.             kwargs['max_tokens'] = self.config.max_output_tokens  # Azure 用 max_tokens 而非 max_completion_tokens
  115.             kwargs.pop('max_completion_tokens')
  116.         # 为支持安全设置的模型添加安全配置
  117.         if 'mistral' in self.config.model.lower() and self.config.safety_settings:
  118.             kwargs['safety_settings'] = self.config.safety_settings
  119.         elif 'gemini' in self.config.model.lower() and self.config.safety_settings:
  120.             kwargs['safety_settings'] = self.config.safety_settings
  121.         # 支持 AWS Bedrock 模型:添加 AWS 相关配置参数
  122.         kwargs['aws_region_name'] = self.config.aws_region_name
  123.         if self.config.aws_access_key_id:
  124.             # 从密钥管理中获取 AWS 访问密钥
  125.             kwargs['aws_access_key_id'] = (
  126.                 self.config.aws_access_key_id.get_secret_value()
  127.             )
  128.         if self.config.aws_secret_access_key:
  129.             # 从密钥管理中获取 AWS 密钥
  130.             kwargs['aws_secret_access_key'] = (
  131.                 self.config.aws_secret_access_key.get_secret_value()
  132.             )
  133.         # 禁用 Claude Opus 4.1 的 Anthropic 扩展思考功能(避免需要 'thinking' 内容块,参考:#10510)
  134.         if 'claude-opus-4-1' in self.config.model.lower():
  135.             kwargs['thinking'] = {'type': 'disabled'}
  136.         # Anthropic 约束:Opus 4.1 不能同时接受 temperature 和 top_p,若两者都存在则优先保留 temperature
  137.         _model_lower = self.config.model.lower()
  138.         if ('claude-opus-4-1' in _model_lower) and (
  139.             'temperature' in kwargs and 'top_p' in kwargs
  140.         ):
  141.             kwargs.pop('top_p', None)
  142.         # 绑定 LiteLLM 完成函数,预设固定参数(通过 partial 固化模型配置)
  143.         self._completion = partial(
  144.             litellm.completion,  # LiteLLM 的核心完成函数
  145.             model=self.config.model,  # 模型名称
  146.             # API 密钥(若配置则从密钥管理中获取)
  147.             api_key=self.config.api_key.get_secret_value()
  148.             if self.config.api_key
  149.             else None,
  150.             base_url=self.config.base_url,  # 模型服务基础 URL
  151.             api_version=self.config.api_version,  # API 版本(如 Azure 需指定)
  152.             custom_llm_provider=self.config.custom_llm_provider,  # 自定义 LLM 提供商
  153.             timeout=self.config.timeout,  # 超时时间
  154.             drop_params=self.config.drop_params,  # 是否允许 LiteLLM 丢弃不支持的参数
  155.             seed=self.config.seed,  # 随机种子(保证输出可复现)
  156.             **kwargs,  # 上述拼接的动态参数
  157.         )
  158.         # 保存未包装的原始 completion 函数(用于内部调用)
  159.         self._completion_unwrapped = self._completion
  160.         # 为 completion 函数添加重试装饰器(继承自 RetryMixin)
  161.         @self.retry_decorator(
  162.             num_retries=self.config.num_retries,  # 最大重试次数
  163.             retry_exceptions=LLM_RETRY_EXCEPTIONS,  # 触发重试的异常类型
  164.             retry_min_wait=self.config.retry_min_wait,  # 最小重试等待时间
  165.             retry_max_wait=self.config.retry_max_wait,  # 最大重试等待时间
  166.             retry_multiplier=self.config.retry_multiplier,  # 重试等待时间倍增系数
  167.             retry_listener=self.retry_listener,  # 重试回调函数
  168.         )
  169.         def wrapper(*args: Any, **kwargs: Any) -> Any:
  170.             """LiteLLM 完成函数的包装器,负责:
  171.             1. 解析输入消息(处理 Message 对象与字典格式的转换)
  172.             2. 模拟函数调用(对不支持函数调用的模型)
  173.             3. 日志记录(输入提示词、输出响应)
  174.             4. 性能指标统计(延迟、成本)
  175.             5. 响应格式转换与异常处理
  176.             """
  177.             # 延迟导入以避免循环依赖
  178.             from openhands.io import json
  179.             # 初始化消息参数(存储用户传入的消息)
  180.             messages_kwarg: (
  181.                 dict[str, Any] | Message | list[dict[str, Any]] | list[Message]
  182.             ) = []
  183.             # 标记是否需要模拟函数调用(模型不支持原生函数调用时为 True)
  184.             mock_function_calling = not self.is_function_calling_active()
  185.             # 处理位置参数:部分调用者可能直接传入 (model, messages, **kwargs)
  186.             if len(args) > 1:
  187.                 # 忽略第一个参数(模型名称,已通过 partial 固化)
  188.                 # 设计原则:不允许覆盖已配置的模型参数
  189.                 messages_kwarg = args[1] if len(args) > 1 else args[0]
  190.                 kwargs['messages'] = messages_kwarg
  191.                 # 移除前两个位置参数(已转换为关键字参数)
  192.                 args = args[2:]
  193.             # 处理关键字参数:若已传入 messages 则直接赋值
  194.             elif 'messages' in kwargs:
  195.                 messages_kwarg = kwargs['messages']
  196.             # 确保消息为列表格式(统一处理单条/多条消息)
  197.             messages_list = (
  198.                 messages_kwarg if isinstance(messages_kwarg, list) else [messages_kwarg]
  199.             )
  200.             # 格式化消息:将 Message 对象转换为模型可识别的字典格式
  201.             messages: list[dict] = []
  202.             if messages_list and isinstance(messages_list[0], Message):
  203.                 messages = self.format_messages_for_llm(
  204.                     cast(list[Message], messages_list)
  205.                 )
  206.             else:
  207.                 messages = cast(list[dict[str, Any]], messages_list)
  208.             # 更新 kwargs 中的 messages 为格式化后的结果
  209.             kwargs['messages'] = messages
  210.             # 保存原始函数调用相关消息(用于后续日志记录)
  211.             original_fncall_messages = copy.deepcopy(messages)
  212.             mock_fncall_tools = None
  213.             # 若需要模拟函数调用且传入了工具配置,转换消息格式
  214.             if mock_function_calling and 'tools' in kwargs:
  215.                 # 标记是否添加上下文学习示例(部分模型不需要)
  216.                 add_in_context_learning_example = True
  217.                 if (
  218.                     'openhands-lm' in self.config.model
  219.                     or 'devstral' in self.config.model
  220.                 ):
  221.                     add_in_context_learning_example = False
  222.                 # 将函数调用格式的消息转换为普通文本提示(模拟函数调用)
  223.                 messages = convert_fncall_messages_to_non_fncall_messages(
  224.                     messages,
  225.                     kwargs['tools'],
  226.                     add_in_context_learning_example=add_in_context_learning_example,
  227.                 )
  228.                 kwargs['messages'] = messages
  229.                 # 若模型支持停止词且未禁用,添加默认停止词
  230.                 if (
  231.                     get_features(self.config.model).supports_stop_words
  232.                     and not self.config.disable_stop_word
  233.                 ):
  234.                     kwargs['stop'] = STOP_WORDS
  235.                 # 移除 tools 参数(模拟调用时不需要传递)
  236.                 mock_fncall_tools = kwargs.pop('tools')
  237.                 # OpenHands 自研模型特殊处理:禁用工具调用
  238.                 if 'openhands-lm' in self.config.model:
  239.                     kwargs['tool_choice'] = 'none'
  240.                 else:
  241.                     # 其他模型:移除 tool_choice 参数(模拟调用时不支持)
  242.                     kwargs.pop('tool_choice', None)
  243.             # 校验消息非空:无消息则抛出异常
  244.             if not messages:
  245.                 raise ValueError(
  246.                     'The messages list is empty. At least one message is required.'
  247.                 )
  248.             # 记录 LLM 输入提示词日志
  249.             self.log_prompt(messages)
  250.             # 设置 LiteLLM 是否允许修改参数(默认允许,如为空消息添加默认内容)
  251.             # 注意:该设置为全局,无法通过 partial 覆盖
  252.             litellm.modify_params = self.config.modify_params
  253.             # 非 LiteLLM 代理模型:移除 extra_body 参数(仅代理模型支持)
  254.             if 'litellm_proxy' not in self.config.model:
  255.                 kwargs.pop('extra_body', None)
  256.             # 记录调用开始时间(用于计算延迟)
  257.             start_time = time.time()
  258.             # 抑制 LiteLLM 调用过程中 httpx 库的弃用警告
  259.             # 避免出现 "Use 'content=<...>' to upload raw bytes/text content" 警告
  260.             with warnings.catch_warnings():
  261.                 warnings.filterwarnings(
  262.                     'ignore', category=DeprecationWarning, module='httpx.*'
  263.                 )
  264.                 warnings.filterwarnings(
  265.                     'ignore',
  266.                     message=r'.*content=.*upload.*',
  267.                     category=DeprecationWarning,
  268.                 )
  269.                 # 调用原始 completion 函数(非流式,返回 ModelResponse 对象)
  270.                 resp: ModelResponse = self._completion_unwrapped(*args, **kwargs)
  271.             # 计算调用延迟并记录到指标中
  272.             latency = time.time() - start_time
  273.             response_id = resp.get('id', 'unknown')  # 获取响应 ID(无则设为 'unknown')
  274.             self.metrics.add_response_latency(latency, response_id)  # 记录延迟指标
  275.             # 深拷贝原始响应(用于模拟函数调用场景的日志记录)
  276.             non_fncall_response = copy.deepcopy(resp)
  277.             # 若启用了函数调用模拟且存在工具配置,将响应转换回函数调用格式
  278.             if mock_function_calling and mock_fncall_tools is not None:
  279.                 # 校验响应是否包含有效选项(Gemini 模型曾出现无选项的情况)
  280.                 if len(resp.choices) < 1:
  281.                     raise LLMNoResponseError(
  282.                         'Response choices is less than 1 - This is only seen in Gemini models so far. Response: '
  283.                         + str(resp)
  284.                     )
  285.                 # 获取非函数调用格式的响应消息
  286.                 non_fncall_response_message = resp.choices[0].message
  287.                 # 将 "原始消息 + 非函数调用响应" 转换为函数调用格式的响应
  288.                 fn_call_messages_with_response = (
  289.                     convert_non_fncall_messages_to_fncall_messages(
  290.                         messages + [non_fncall_response_message], mock_fncall_tools
  291.                     )
  292.                 )
  293.                 # 提取转换后的函数调用响应消息
  294.                 fn_call_response_message = fn_call_messages_with_response[-1]
  295.                 # 确保响应消息为 LiteLLMMessage 类型(若为字典则转换)
  296.                 if not isinstance(fn_call_response_message, LiteLLMMessage):
  297.                     fn_call_response_message = LiteLLMMessage(
  298.                         **fn_call_response_message
  299.                     )
  300.                 # 更新响应中的消息为函数调用格式
  301.                 resp.choices[0].message = fn_call_response_message
  302.             # 二次校验响应有效性:确保 choices 非空且至少有一个选项
  303.             if not resp.get('choices') or len(resp['choices']) < 1:
  304.                 raise LLMNoResponseError(
  305.                     'Response choices is less than 1 - This is only seen in Gemini models so far. Response: '
  306.                     + str(resp)
  307.                 )
  308.             # 记录 LLM 响应日志
  309.             self.log_response(resp)
  310.             # 响应后处理:计算调用成本(如 token 消耗对应的费用)
  311.             cost = self._post_completion(resp)
  312.             # 若启用响应日志持久化,将请求/响应数据写入文件
  313.             if self.config.log_completions:
  314.                 # 断言日志文件夹已配置(初始化时已校验,此处防止异常)
  315.                 assert self.config.log_completions_folder is not None
  316.                 # 构造日志文件名:模型名_时间戳.json(替换 '/' 为 '__' 避免路径错误)
  317.                 log_file = os.path.join(
  318.                     self.config.log_completions_folder,
  319.                     f'{self.config.model.replace("/", "__")}-{time.time()}.json',
  320.                 )
  321.                 # 构造日志数据字典
  322.                 _d = {
  323.                     'messages': messages,  # 实际发送给模型的消息(可能是模拟函数调用格式)
  324.                     'response': resp,      # 模型响应(可能是转换后的函数调用格式)
  325.                     'args': args,          # 调用时的位置参数
  326.                     'kwargs': {            # 调用时的关键字参数(过滤敏感/冗余字段)
  327.                         k: v
  328.                         for k, v in kwargs.items()
  329.                         if k not in ('messages', 'client')
  330.                     },
  331.                     'timestamp': time.time(),  # 调用时间戳
  332.                     'cost': cost,              # 调用成本(token 费用)
  333.                 }
  334.                 # 若启用了函数调用模拟,额外记录原始函数调用格式的消息和响应
  335.                 if mock_function_calling:
  336.                     # 覆盖 response 为非函数调用格式(与 messages 保持一致)
  337.                     _d['response'] = non_fncall_response
  338.                     # 新增字段记录原始函数调用格式数据
  339.                     _d['fncall_messages'] = original_fncall_messages
  340.                     _d['fncall_response'] = resp
  341.                 # 将日志数据写入文件(JSON 格式)
  342.                 with open(log_file, 'w') as f:
  343.                     f.write(json.dumps(_d))
  344.             # 返回最终处理后的模型响应
  345.             return resp
  346.         # 将包装后的函数赋值给 _completion,后续通过该属性调用模型
  347.         self._completion = wrapper
复制代码
4.2 LLMRegistry

4.2.1 作用

LLMRegistry 是 OpenHands 框架中管理 LLM 实例的核心组件,负责集中创建、复用和监控所有 LLM 资源。配合 create_registry_and_conversation_stats 函数,它构建了从配置解析到实例管理的完整链路,确保 LLM 资源高效利用且配置一致。LLMRegistry的功能如下。

  • 集中化实例管理:通过 service_to_llm 映射表跟踪所有 LLM 实例,避免重复创建,降低资源消耗;严格检查同一服务 ID 的配置一致性,防止冲突。
  • 灵活的实例获取机制

    • 支持通过服务 ID 直接获取或创建 LLM(get_llm)
    • 支持从代理配置自动推导 LLM 配置(get_llm_from_agent_config)
    • 支持临时补充生成需求(request_extraneous_completion)

  • 多模型路由支持:通过 get_router 方法集成 RouterLLM,实现基于代理配置的动态模型选择,灵活应对复杂任务对不同模型的需求。
  • 事件驱动的可扩展性:提供 subscribe 和 notify 机制,允许外部组件(如 ConversationStats)订阅 LLM 注册事件,轻松扩展统计、监控等功能。
  • 配置隔离与安全性:通过深拷贝配置和严格的实例创建逻辑,确保不同服务的 LLM 配置相互隔离,避免外部修改干扰内部状态。
  • 适配用户个性化设置:结合 create_registry_and_conversation_stats 函数,支持通过用户设置覆盖默认配置,兼顾通用性与个性化需求。
4.2.2 工作流

2.png

4.2.3 代码

LLMRegistry 的代码如下:
  1. class LLMRegistry:
  2.     """
  3.     LLM注册表:管理所有LLM实例的生命周期、配置和事件通知的核心组件。
  4.    
  5.     作用:
  6.     - 集中管理多个LLM实例,避免重复创建
  7.     - 确保同一服务ID的LLM配置一致性
  8.     - 支持事件订阅(如统计、监控)
  9.     - 提供路由LLM(RouterLLM)的创建能力
  10.     """
  11.     def __init__(
  12.         self,
  13.         config: OpenHandsConfig,
  14.         agent_cls: Optional[str] = None,
  15.         retry_listener: Optional[Callable[[int, int], None]] = None,
  16.     ):
  17.         self.registry_id = str(uuid4())  # 注册表唯一标识
  18.         self.config = copy.deepcopy(config)  # 深拷贝配置,避免外部修改影响
  19.         self.retry_listener = retry_listener  # 重试事件监听器(可选)
  20.         # 构建代理到LLM配置的映射(从全局配置中提取)
  21.         self.agent_to_llm_config = self.config.get_agent_to_llm_config_map()
  22.         self.service_to_llm: dict[str, LLM] = {}  # 服务ID到LLM实例的映射
  23.         self.subscriber: Optional[Callable[[Any], None]] = None  # 事件订阅者(如统计器)
  24.         # 确定当前激活的代理类型(用户指定优先,否则使用默认)
  25.         selected_agent_cls = self.config.default_agent
  26.         if agent_cls:
  27.             selected_agent_cls = agent_cls
  28.         # 基于代理类型获取对应的LLM配置
  29.         agent_name = selected_agent_cls if selected_agent_cls is not None else 'agent'
  30.         llm_config = self.config.get_llm_config_from_agent(agent_name)
  31.         # 初始化并激活代理的主LLM实例
  32.         self.active_agent_llm: LLM = self.get_llm('agent', llm_config)
  33.     def _create_new_llm(
  34.         self, service_id: str, config: LLMConfig, with_listener: bool = True
  35.     ) -> LLM:
  36.         """
  37.         内部方法:创建新的LLM实例并注册到注册表中。
  38.         
  39.         参数:
  40.             service_id: 服务唯一标识(用于区分不同LLM实例)
  41.             config: LLM配置
  42.             with_listener: 是否绑定重试监听器
  43.         
  44.         返回:
  45.             新创建的LLM实例
  46.         """
  47.         # 根据是否需要监听器,初始化LLM
  48.         if with_listener:
  49.             llm = LLM(
  50.                 service_id=service_id, config=config, retry_listener=self.retry_listener
  51.             )
  52.         else:
  53.             llm = LLM(service_id=service_id, config=config)
  54.         
  55.         # 记录到映射表中
  56.         self.service_to_llm[service_id] = llm
  57.         # 通知订阅者(如统计器)有新LLM注册
  58.         self.notify(RegistryEvent(llm=llm, service_id=service_id))
  59.         return llm
  60.     def request_extraneous_completion(
  61.         self, service_id: str, llm_config: LLMConfig, messages: list[dict[str, str]]
  62.     ) -> str:
  63.         """
  64.         请求额外的LLM生成(用于非主流程的补充生成需求)。
  65.         
  66.         参数:
  67.             service_id: 服务ID
  68.             llm_config: 对应的LLM配置
  69.             messages: 输入消息列表(格式:[{role: ..., content: ...}, ...])
  70.         
  71.         返回:
  72.             LLM生成的文本内容(去除首尾空白)
  73.         """
  74.         # 若服务ID未注册,则创建新LLM(不绑定监听器,适用于临时任务)
  75.         if service_id not in self.service_to_llm:
  76.             self._create_new_llm(
  77.                 config=llm_config, service_id=service_id, with_listener=False
  78.             )
  79.         # 获取LLM实例并执行生成
  80.         llm = self.service_to_llm[service_id]
  81.         response = llm.completion(messages=messages)
  82.         return response.choices[0].message.content.strip()
  83.     def get_llm_from_agent_config(self, service_id: str, agent_config: AgentConfig):
  84.         """
  85.         根据代理配置获取对应的LLM实例(支持复用已有实例)。
  86.         
  87.         参数:
  88.             service_id: 服务ID
  89.             agent_config: 代理配置对象
  90.         
  91.         返回:
  92.             匹配的LLM实例
  93.         """
  94.         # 从代理配置中提取LLM配置
  95.         llm_config = self.config.get_llm_config_from_agent_config(agent_config)
  96.         # 若实例已存在,直接返回(配置不一致时暂不处理,预留更新逻辑)
  97.         if service_id in self.service_to_llm:
  98.             if self.service_to_llm[service_id].config != llm_config:
  99.                 # TODO: 未来支持动态更新LLM配置
  100.                 # 当代理委托的配置不同时,应复用现有LLM
  101.                 pass
  102.             return self.service_to_llm[service_id]
  103.         # 实例不存在则创建新的
  104.         return self._create_new_llm(config=llm_config, service_id=service_id)
  105.     def get_llm(
  106.         self,
  107.         service_id: str,
  108.         config: Optional[LLMConfig] = None,
  109.     ) -> LLM:
  110.         """
  111.         获取或创建指定服务ID的LLM实例(核心方法)。
  112.         
  113.         参数:
  114.             service_id: 服务唯一标识
  115.             config: LLM配置(新实例必需)
  116.         
  117.         返回:
  118.             对应的LLM实例
  119.         
  120.         异常:
  121.             ValueError: 同一服务ID配置不一致,或创建新实例时无配置
  122.         """
  123.         # 检查同一服务ID的配置是否一致(防止冲突)
  124.         if (
  125.             service_id in self.service_to_llm
  126.             and self.service_to_llm[service_id].config != config
  127.         ):
  128.             raise ValueError(
  129.                 f"Service ID {service_id} requested with different config. Use a new service ID."
  130.             )
  131.         # 实例已存在则直接返回
  132.         if service_id in self.service_to_llm:
  133.             return self.service_to_llm[service_id]
  134.         # 新实例必须提供配置
  135.         if not config:
  136.             raise ValueError("Cannot create new LLM without specifying config.")
  137.         # 创建并返回新实例
  138.         return self._create_new_llm(config=config, service_id=service_id)
  139.     def get_active_llm(self) -> LLM:
  140.         """返回当前激活的代理主LLM实例"""
  141.         return self.active_agent_llm
  142.     def get_router(self, agent_config: AgentConfig) -> LLM:
  143.         """
  144.         获取路由LLM实例(用于多模型路由选择)。
  145.         
  146.         参数:
  147.             agent_config: 代理配置(包含路由规则)
  148.         
  149.         返回:
  150.             路由LLM实例(RouterLLM)或主LLM(当路由为noop时)
  151.         """
  152.         # 从代理配置中获取路由名称
  153.         router_name = agent_config.model_routing.router_name
  154.         # 若为"noop_router"(无操作路由),直接返回主LLM
  155.         if router_name == 'noop_router':
  156.             return self.get_llm_from_agent_config('agent', agent_config)
  157.         # 否则创建并返回路由LLM实例
  158.         return RouterLLM.from_config(
  159.             agent_config=agent_config,
  160.             llm_registry=self,
  161.             retry_listener=self.retry_listener,
  162.         )
  163.     def subscribe(self, callback: Callable[[RegistryEvent], None]) -> None:
  164.         """
  165.         订阅注册表事件(如新LLM创建)。
  166.         
  167.         参数:
  168.             callback: 事件触发时的回调函数
  169.         """
  170.         self.subscriber = callback
  171.         # 订阅后,立即通知已存在的主LLM实例(补报历史事件)
  172.         self.notify(
  173.             RegistryEvent(
  174.                 llm=self.active_agent_llm,
  175.                 service_id=self.active_agent_llm.service_id
  176.             )
  177.         )
  178.     def notify(self, event: RegistryEvent) -> None:
  179.         """
  180.         通知订阅者事件发生(如LLM注册)。
  181.         
  182.         参数:
  183.             event: 注册表事件对象
  184.         """
  185.         if self.subscriber:
  186.             try:
  187.                 self.subscriber(event)
  188.             except Exception as e:
  189.                 logger.warning(f"Failed to notify subscriber of event: {e}")
复制代码
0xFF 参考

https://docs.all-hands.dev/openhands/usage/architecture/backend
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第二篇:Agent 相关核心概念】  克里
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第一篇:系列导读】 克里
Coding Agent之Openhands解析(含代码)  Arrow
OpenHands 源码解读  一力辉
Google Agent 白皮书解析
Agent开发实践:从想法到产品——SSE、上下文工程与流式解析关键技术攻坚
Agent开发实践:从想法到产品——系统架构设计实践

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

4 天前

举报

4 小时前

举报

您需要登录后才可以回帖 登录 | 立即注册