找回密码
 立即注册
首页 业界区 业界 【OpenClaw】通过 Nanobot 源码学习架构 ---(4)SubAge ...

【OpenClaw】通过 Nanobot 源码学习架构 ---(4)SubAgent

府扔影 9 小时前
【OpenClaw】通过 Nanobot 源码学习架构 ---(4)SubAgent


目录

  • 【OpenClaw】通过 Nanobot 源码学习架构 ---(4)SubAgent

    • 0x00 概要
    • 0x01 基础背景

      • 1.1 原理:为什么需要 SubAgent?
      • 1.2 架构拓扑的演进
      • 1.3 领域隔离
      • 1.4 工作场景:Skills vs SubAgent,如何选择?
      • 1.5 工作流:主从协作的典型模式

    • 0x02 Nanobot SubAgent 功能

      • 2.1 SubAgent 与主 Agent 的区别

        • 2.1.1 设计目的
        • 2.1.2 Subagent 优势

      • 2.2 SubagentManager 类

        • 2.2.1 初始化参数
        • 2.2.2 内部状态管理

      • 2.3 创建子代理流程

        • 2.3.1 spawn() 方法详解
        • 2.3.2 子代理创建步骤

      • 2.4 子代理执行逻辑

        • 2.4.1. 构建子代理专用工具集
        • 2.4.2. 构建子代理专用提示
        • 2.4.3. 运行 Agent 循环(限制迭代次数)
        • 2.4.4. 处理工具调用
        • 2.4.5. 处理完成条件
        • 2.4.6. 处理未完成情况
        • 2.4.7. 通知结果
        • 2.4.8. 错误处理

      • 2.5 结果通知机制

        • 2.5.1 通知内容构建
        • 2.5.2 注入消息总线

      • 2.6 会话级取消机制
      • 2.7 状态查询方法

    • 0x03 Subagent 与 Main Agent 的关系分析

      • 3.1 相同的组件

        • 3.1.1 LLM Provider 完全共享
        • 3.1.2 MessageBus 共享
        • 3.1.3 Workspace 路径相同
        • 3.1.4 配置参数

      • 3.2 不同的组件

        • 3.2.1 工具集(ToolRegistry)完全不同且受限
        • 3.2.2 System Prompt 完全不同
        • 3.2.3 消息历史完全隔离
        • 3.2.4 最大迭代次数不同
        • 3.2.5 结果处理方式不同
        • 3.2.6 会话管理方式不同

      • 3.3 创建流程对比

        • 3.3.1 Main Agent 创建(在 gateway()):
        • 3.3.2 Subagent 创建(在 SubagentManager.spawn()):


    • 0xFF 参考


0x00 概要

OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。
nanobot 的 Subagent 实现是一个简洁但强大的后台任务执行机制。通过复用主 Agent 的 LLM provider 但限制工具集和迭代次数,实现了任务隔离和资源控制。消息总线机制确保子代理结果能够顺利通知主 Agent,最终传达给用户。这种设计使得主 Agent 可以保持专注的对话交互,同时将复杂任务委派给后台子代理执行。
  1. Parent agent                     Subagent
  2. +------------------+             +------------------+
  3. | messages=[...]   |             | messages=[]      | <-- fresh
  4. |                  |  dispatch   |                  |
  5. | tool: task       | ----------> | while tool_use:  |
  6. |   prompt="..."   |             |   call tools     |
  7. |                  |  summary    |   append results |
  8. |   result = "..." | <---------- | return last text |
  9. +------------------+             +------------------+
  10. Parent context stays clean. Subagent context is discarded.
复制代码
2.3.2 子代理创建步骤


  • 生成唯一标识符
  1. 主Agent接收任务
  2.     ↓
  3. [tool_use] Spawn(分析仓库结构), Spawn(整理竞品资料)
  4.     ↓
  5. 两个Sub-agent并行执行(各自有精简System Prompt)
  6.     ↓
  7. 返回仓库结构、竞品资料
  8.     ↓
  9. 主Agent Context中只有库结构、竞品资料的摘要,没有详尽的信息
  10.     ↓
  11. 主Agent完成比较分析
复制代码

  • 记录原始来源
  1. class SubagentManager:
  2.     """Manages background subagent execution."""
  3.     def __init__(
  4.         self,
  5.         provider: LLMProvider,          # LLM 提供商(复用主 Agent 的)
  6.         workspace: Path,                 # 工作空间路径
  7.         bus: MessageBus,                   # 消息总线(用于通知主 Agent)
  8.         model: str | None = None,        # 模型名称
  9.         temperature: float = 0.7,        # 温度参数
  10.         max_tokens: int = 4096,          # 最大 token 数
  11.         brave_api_key: str | None = None,   # 网络搜索 API 密钥
  12.         exec_config: ExecToolConfig | None = None,  # Shell 执行配置
  13.         restrict_to_workspace: bool = False,  # 是否限制到工作空间
  14.     ):
复制代码
用于后续将结果通知回正确的用户/渠道。

  • 创建并启动后台任务
  1. self._running_tasks: dict[str, asyncio.Task | None] = {}
  2. self._session_tasks: dict[str, set[str]] = {}
复制代码
创建异步任务来运行子代理,并将其注册到 _running_tasks 字典中。

  • 关联到会话
  1. async def spawn(
  2.     self,
  3.     task: str,                        # 子代理要执行的任务描述
  4.     label: str | None = None,          # 显示标签(用于用户识别)
  5.     origin_channel: str = "cli",       # 原始渠道(用于结果通知)
  6.     origin_chat_id: str = "direct",    # 原始聊天 ID
  7.     session_key: str | None = None,      # 会话键(用于会话级取消)
  8. ) -> str:
复制代码
如果提供了 session_key,将 task_id 加入该会话的子代理集合。这使得 /stop 命令可以取消整个会话的所有子代理。

  • 设置清理回调
  1. task_id = str(uuid.uuid4())[:8]  # 生成 8 字符的 UUID4,如 "a1b2c3d4"
  2. display_label = label or task[:30] + ("..." if len(task) > 30 else "")
复制代码
当子代理任务完成(无论成功或失败)时,回调函数执行:

  • 从 _running_tasks 移除 task_id
  • 从会话的 task_id 集合中移除
  • 如果该会话没有剩余的子代理,删除会话集条目

  • 返回用户反馈
  1. origin = {"channel": origin_channel, "chat_id": origin_chat_id}
复制代码
2.4 子代理执行逻辑

_run_subagent() 是子代理的核心执行方法,负责完整的 Agent 循环,其具体逻辑如下:
2.4.1. 构建子代理专用工具集
  1. bg_task = asyncio.create_task(
  2.     self._run_subagent(task_id, task, display_label, origin)
  3. )
  4. self._running_tasks[task_id] = bg_task
复制代码
重要设计:子代理的工具集与主 Agent 不同:

  • 包含:文件读写、目录列表、Shell 执行、网络搜索和获取
  • 排除:MessageTool(不能直接发送消息给用户)
  • 排除:SpawnTool(不能派生更多子代理)
  • 排除:CronTool(不能创建定时任务)
这种设计确保子代理专注于执行任务,不会干扰主对话流程或创建递归任务。
2.4.2. 构建子代理专用提示
  1. if session_key:
  2.     self._session_tasks.setdefault(session_key, set()).add(task_id)
复制代码
系统提示明确子代理的角色和限制:
  1. def _cleanup(_: asyncio.Task) -> None:
  2.     self._running_tasks.pop(task_id, None)
  3.     if session_key and (ids := self._session_tasks.get(session_key)):
  4.         ids.discard(task_id)
  5.         if not ids:
  6.             del self._session_tasks[session_key]
  7. bg_task.add_done_callback(_cleanup)
复制代码
这个提示确保子代理:

  • 专注于分配的任务
  • 不会发起新对话
  • 不会尝试与用户直接交互
  • 知道自己的能力边界
2.4.3. 运行 Agent 循环(限制迭代次数)

子代理使用与主 Agent 相同的 LLM provider,但迭代次数限制为 15 次,避免子代理运行过久。
  1. return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
复制代码
2.4.4. 处理工具调用

工具调用处理逻辑与主 Agent 类似:

  • 将工具调用添加到消息历史
  • 逐个执行工具
  • 将工具结果添加到消息历史
  • 继续循环,等待 LLM 下一轮响应
  1. tools = ToolRegistry()
  2. allowed_dir = self.workspace if self.restrict_to_workspace else None
  3. tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
  4. tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
  5. tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
  6. tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
  7. tools.register(ExecTool(
  8.     working_dir=str(self.workspace),
  9.     timeout=self.exec_config.timeout,
  10.     restrict_to_workspace=self.restrict_to_workspace,
  11.     path_append=self.exec_config.path_append,
  12. ))
  13. tools.register(WebSearchTool(api_key=self.brave_api_key))
  14. tools.register(WebFetchTool())
复制代码
2.4.5. 处理完成条件

当 LLM 返回文本响应而没有工具调用时,视为任务完成,退出循环。
  1. system_prompt = self._build_subagent_prompt(task)
  2. messages = [
  3.     {"role": "system", "content": system_prompt},
  4.     {"role": "user", "content": task},
  5. ]
复制代码
2.4.6. 处理未完成情况

如果达到最大迭代次数仍未产生最终响应,使用默认消息。
  1. # Subagent
  2. ## Current Time
  3. {now} ({tz})
  4. You are a subagent spawned by main agent to complete a specific task.
  5. ## Rules
  6. 1. Stay focused - complete only the assigned task, nothing else
  7. 2. Your final response will be reported back to main agent
  8. 3. Do not initiate conversations or take on side tasks
  9. 4. Be concise but informative in your findings
  10. ## What You Can Do
  11. - Read and write files in workspace
  12. - Execute shell commands
  13. - Search web and fetch web pages
  14. - Complete task thoroughly
  15. ## What You Cannot Do
  16. - Send messages directly to users (no message tool available)
  17. - Spawn other subagents
  18. - Access main agent's conversation history
  19. ## Workspace
  20. Your workspace is at: {workspace}
  21. Skills are available at: {workspace}/skills/ (read SKILL.md files as needed)
  22. When you have completed the task, provide a clear summary of your findings or actions.
复制代码
2.4.7. 通知结果

成功完成时调用 _announce_result() 方法通知主 Agent。
  1. max_iterations = 15  # 子代理的最大迭代次数(主 Agent 是 40)
  2. iteration = 0
  3. final_result: str | None = None
  4. while iteration < max_iterations:
  5.     iteration += 1
  6.    
  7.     response = await self.provider.chat(
  8.         messages=messages,
  9.         tools=tools.get_definitions(),
  10.         model=self.model,
  11.         temperature=self.temperature,
  12.         max_tokens=self.max_tokens,
  13.     )
复制代码
2.4.8. 错误处理

如果执行过程中发生异常,捕获并通知主 Agent 错误信息。
  1. if response.has_tool_calls:
  2.     # 构建工具调用消息
  3.     tool_call_dicts = [
  4.         {
  5.             "id": tc.id,
  6.             "type": "function",
  7.             "function": {
  8.                 "name": tc.name,
  9.                 "arguments": json.dumps(tc.arguments, ensure_ascii=False),
  10.             },
  11.         }
  12.         for tc in response.tool_calls
  13.     ]
  14.     messages.append({
  15.         "role": "assistant",
  16.         "content": response.content or "",
  17.         "tool_calls": tool_call_dicts,
  18.     })
  19.    
  20.     # 执行工具
  21.     for tool_call in response.tool_calls:
  22.         result = await tools.execute(tool_call.name, tool_call.arguments)
  23.         messages.append({
  24.             "role": "tool",
  25.             "tool_call_id": tool_call.id,
  26.             "name": tool_call.name,
  27.             "content": result,
  28.         })
复制代码
2.5 结果通知机制

子代理完成任务后,需要将结果通知给主 Agent,主 Agent 再转发给用户。_announce_result() 方法完成了此功能。
  1. else:
  2.     final_result = response.content
  3.     break
复制代码
2.5.1 通知内容构建
  1. if final_result is None:
  2.     final_result = "Task completed but no final response was generated."
复制代码
通知内容包含:

  • 子代理标签和状态
  • 原始任务描述
  • 执行结果
  • 指示主 Agent 如何处理(简洁地总结给用户)
2.5.2 注入消息总线

InboundMessage 通过 bus.publish_inbound() 将通知发布到入站队列。这会被 AgentLoop 接收并处理,最终将总结转发给用户。
  1. logger.info("Subagent [{}] completed successfully", task_id)
  2. await self._announce_result(task_id, label, task, final_result, origin, "ok")
复制代码
2.6 会话级取消机制

cancel_by_session() 方法实现了会话级取消机制。这个方法被主 Agent 的 /stop 命令处理调用,实现会话级的任务清理。
  1. except Exception as e:
  2.     error_msg = f"Error: {str(e)}"
  3.     logger.error("Subagent [{}] failed: {}", task_id, e)
  4.     await self._announce_result(task_id, label, task, error_msg, origin, "error")
复制代码
取消流程如下:

  • 从 _session_tasks 获取该会话关联的所有 task_id
  • 筛选出未完成的任务
  • 对每个任务调用 cancel() 方法
  • 等待所有任务取消完成
  • 返回取消的任务数量
2.7 状态查询方法

get_running_count() 返回当前运行的子代理数量,可用于监控和状态报告。
  1. async def _announce_result(
  2.     self,
  3.     task_id: str,
  4.     label: str,
  5.     task: str,
  6.     result: str,
  7.     origin: dict[str, str],
  8.     status: str,
  9. ) -> None:
复制代码
0x03 Subagent 与 Main Agent 的关系分析

Subagent 不是 Main Agent 的完全克隆,而是一个共享部分组件但功能受限的独立执行单元。Subagent 的设计模式是"共享基础组件 + 独立执行环境":

  • 共享资源:LLM Provider、MessageBus、Workspace、配置参数
  • 隔离执行:独立的工具集、消息历史、系统提示
  • 受限能力:不能发送消息、不能派生子代理、无对话历史
  • 结果聚合:通过 MessageBus 通知 Main Agent,由 Main Agent 统一输出
这种设计避免了子代理干扰主对话流程,同时确保资源高效利用。
架构关系图如下:
1.png

主Agent和SubAgent的对比如下:
特性主 AgentSubagent消息来源用户通过聊天平台主 Agent 的 spawn 调用目标对话交互执行特定任务迭代次数4015消息发送可用 MessageTool不可用子代理派生可用 SpawnTool不可用定时任务可用 CronTool不可用会话历史完整访问无访问权限结果输出直接发送给用户通知主 Agent运行方式同步(阻塞消息处理)异步(后台执行)3.1 相同的组件

3.1.1 LLM Provider 完全共享

共享原因:避免重复创建 API 连接,节省资源和维护成本。LLM 调用是无状态的,多个 Agent 可以安全地共享同一个 provider。
  1. status_text = "completed successfully" if status == "ok" else "failed"
  2. announce_content = f"""[Subagent '{label}' {status_text}]
  3. Task: {task}
  4. Result:
  5. {result}
  6. Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs."""
复制代码
3.1.2 MessageBus 共享

共享原因:Subagent 需要通过 MessageBus 将结果通知给 Main Agent,不是用来处理用户消息。
  1. msg = InboundMessage(
  2.     channel="system",                                    # 使用 system 渠道标识
  3.     sender_id="subagent",                                # 标识来自子代理
  4.     chat_id=f"{origin['channel']}:{origin['chat_id']}",  # 原始渠道和聊天 ID
  5.     content=announce_content,                            # 通知内容
  6. )
  7. await self.bus.publish_inbound(msg)
复制代码
3.1.3 Workspace 路径相同

共享原因:Subagent 访问相同的文件系统,能够读写主 Agent 工作空间中的文件。
  1. async def cancel_by_session(self, session_key: str) -> int:
  2.     """Cancel all subagents for a given session. Returns count cancelled."""
  3.     tasks = [
  4.         self._running_tasks[tid]
  5.         for tid in self._session_tasks.get(session_key, [])
  6.         if tid in self._running_tasks and not self._running_tasks[tid].done()
  7.     ]
  8.     for t in tasks:
  9.         t.cancel()
  10.     if tasks:
  11.         await asyncio.gather(*tasks, return_exceptions=True)
  12.     return len(tasks)
复制代码
3.1.4 配置参数

主Agent和Subagent的配置参数相同,虽然具体数值会有不同。
参数Main AgentSubagent说明modelconfig.agents.defaults.modelprovider.get_default_model()模型名称temperatureconfig.agents.defaults.temperature0.7温度参数max_tokensconfig.agents.defaults.max_tokens4096最大 token 数workspaceconfig.workspace_pathworkspace工作空间exec_configconfig.tools.exec传入的 exec_configShell 执行配置restrict_to_workspaceconfig.tools.restrict_to_workspace传入的值工作空间限制3.2 不同的组件

3.2.1 工具集(ToolRegistry)完全不同且受限

设计意图:Subagent 是"工具型"执行单元,专注于完成任务,不进行交互式对话或启动更多子任务。
Subagent 使用的工具:
  1. def get_running_count(self) -> int:
  2.     """Return number of currently running subagents."""
  3.     return len(self._running_tasks)
复制代码
Subagent 排除的工具:

  • MessageTool:不能直接发送消息给用户
  • SpawnTool:不能派生更多子代理(避免递归爆炸)
  • CronTool:不能创建定时任务
Main Agent 包含的工具:
  1. # SubagentManager 初始化时
  2. self.provider = provider  # 和 Main Agent 使用同一个实例
  3. # Subagent 执行时
  4. response = await self.provider.chat(
  5.     messages=messages,
  6.     tools=tools.get_definitions(),
  7.     model=self.model,
  8.     temperature=self.temperature,
  9.     max_tokens=self.max_tokens,
  10. )
复制代码
3.2.2 System Prompt 完全不同

区别:Subagent 的提示是"聚焦式"的,强调专注任务、不发起对话;Main Agent 的提示是"对话式"的,包含完整指南和记忆。
Subagent 的系统提示:
  1. self.bus = bus  # 和 Main Agent 使用同一个实例
  2. # Subagent 完成任务后
  3. await self.bus.publish_inbound(msg)  # 通知 Main Agent
复制代码
Main Agent 的系统提示:
[code]# nanobot
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册