找回密码
 立即注册
首页 业界区 业界 openclaw平替之nanobot源码解析(七):Gateway与多渠道 ...

openclaw平替之nanobot源码解析(七):Gateway与多渠道集成

胁冉右 昨天 21:15
在之前的文章中,我们深入研究了 nanobot 的大脑(AgentLoop)、心脏(MessageBus)和记忆(Memory)。今天,我们要聊聊它的”触角”——Gateway(网关)与 Channels(渠道)
正是这套系统,让 nanobot 不仅仅是一个终端工具,而是一个能 24/7 在线、随时随地为你服务的全能助手。
1. Gateway:Agent 的”生产环境”

当你运行 nanobot gateway 时,你实际上启动了一个长期运行的服务。它的核心逻辑在 nanobot/cli/commands.py 的 gateway 函数中,通过 asyncio.gather 同时拉起了五个核心服务:

  • MessageBus:消息总线,连接所有组件。
  • AgentLoop:大脑,负责对话逻辑。
  • ChannelManager:连接 Telegram、飞书、Discord 等渠道。
  • CronService:精准的”闹钟”,执行预设的定时任务。
  • HeartbeatService:AI 的”脉搏”,驱动主动思考。
2. ChannelManager:消息的”交通警察”

ChannelManager(nanobot/channels/manager.py)是所有渠道的管理者。它有两个核心任务:
任务 A:启动所有渠道
它会遍历你的 config.json,如果发现 telegram.enabled: true,就会去实例化 TelegramChannel 并调用它的 start() 方法。目前支持的渠道包括:
渠道特点TelegramBot API + Webhook/轮询DiscordGateway Bot + IntentsWhatsAppNode.js Bridge + WebSocket飞书WebSocket 长连接SlackSocket ModeDingTalkStream ModeQQbotpy SDK + WebSocketEmailIMAP/SMTP 轮询MochatSocket.IO WebSocketMatrixMatrix Protocol任务 B:分发下行消息(Outbound Dispatcher)
这是一个非常关键的异步任务。它持续监听 MessageBus.outbound 队列,一旦 Agent 输出了回复,就会根据消息中的 channel 字段找到对应的渠道实例并调用 send() 方法。
  1. # nanobot/channels/manager.py 完整逻辑
  2. async def _dispatch_outbound(self) -> None:
  3.     """Dispatch outbound messages to the appropriate channel."""
  4.     logger.info("Outbound dispatcher started")
  5.     while True:
  6.         try:
  7.             msg = await asyncio.wait_for(
  8.                 self.bus.consume_outbound(),
  9.                 timeout=1.0
  10.             )
  11.             # 过滤 progress 消息(可配置开关)
  12.             if msg.metadata.get("_progress"):
  13.                 if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints:
  14.                     continue
  15.                 if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress:
  16.                     continue
  17.             channel = self.channels.get(msg.channel)
  18.             if channel:
  19.                 try:
  20.                     await channel.send(msg)
  21.                 except Exception as e:
  22.                     logger.error("Error sending to{}:{}", msg.channel, e)
  23.             else:
  24.                 logger.warning("Unknown channel:{}", msg.channel)
  25.         except asyncio.TimeoutError:
  26.             continue
  27.         except asyncio.CancelledError:
  28.             break
复制代码
这里有一个可配置的消息过滤机制:Agent 在工具执行过程中会发送 progress 消息(用于显示执行进度),你可以通过 config.json 中的 send_progress 和 send_tool_hints 开关来控制是否要在渠道中显示这些中间状态。
3. BaseChannel:统一的”翻译官”

nanobot 支持十几种渠道,但它们的底层协议各不相同。为了让 Agent 核心逻辑保持简洁,nanobot 定义了一个抽象基类 BaseChannel(nanobot/channels/base.py)。
每个具体的渠道只需要实现三个抽象方法:

  • start():连接到平台并开始监听消息。
  • stop():停止渠道并清理资源。
  • send():收到总线的回复后,将其”翻译”成平台协议发出去。
而 _handle_message() 则是 BaseChannel 提供的一个具体实现方法——它负责权限检查(is_allowed())和消息格式标准化:
  1. # nanobot/channels/base.py
  2. async def _handle_message(
  3.     self,
  4.     sender_id: str,
  5.     chat_id: str,
  6.     content: str,
  7.     media: list[str] | None = None,
  8.     metadata: dict[str, Any] | None = None,
  9.     session_key: str | None = None,
  10. ) -> None:
  11.     # 第一步:权限检查
  12.     if not self.is_allowed(sender_id):
  13.         logger.warning(
  14.             "Access denied for sender{} on channel{}. "
  15.             "Add them to allowFrom list in config to grant access.",
  16.             sender_id, self.name,
  17.         )
  18.         return
  19.     # 第二步:封装成统一的 InboundMessage 丢进总线
  20.     msg = InboundMessage(
  21.         channel=self.name,
  22.         sender_id=str(sender_id),
  23.         chat_id=str(chat_id),
  24.         content=content,
  25.         media=media or [],
  26.         metadata=metadata or {},
  27.         session_key_override=session_key,
  28.     )
  29.     await self.bus.publish_inbound(msg)
复制代码
每个渠道的具体实现只需要解析平台特有的消息格式,然后调用 _handle_message() 即可——权限检查和总线发布这些通用逻辑不需要重复写。
4. 安全第一:allow_from 机制

让 AI 暴露在公网上是危险的。nanobot 在 BaseChannel 中内置了权限检查:
  1. def is_allowed(self, sender_id: str) -> bool:
  2.     """Check if *sender_id* is permitted.  Empty list → deny all; ``"*"`` → allow all."""
  3.     allow_list = getattr(self.config, "allow_from", [])
  4.     if not allow_list:
  5.         logger.warning("{}: allow_from is empty — all access denied", self.name)
  6.         return False
  7.     if "*" in allow_list:
  8.         return True
  9.     return str(sender_id) in allow_list
复制代码
这段逻辑有三个要点:

  • allow_from 为空:默认拒绝所有访问(return False)。
  • "*" 在白名单中:允许所有用户访问。
  • 其他情况:只有在白名单中的 sender_id 才会被放行。
这种默认拒绝的策略,保证了你的 API 额度不会被陌生人耗尽。
5. 跨语言的艺术:WhatsApp Bridge

值得一提的是 nanobot 处理 WhatsApp 的方式。由于 WhatsApp 的协议比较复杂且主要由 Node.js 社区维护,nanobot 并没有强行用 Python 重写,而是采用了一个 Bridge(桥接)模式

  • Python 端(nanobot/channels/whatsapp.py):负责逻辑、消息路由、权限检查。
  • Node.js 端(bridge/):使用 @whiskeysockets/baileys 处理 WhatsApp Web 协议。
  • 通信:两者通过 WebSocket 进行跨语言对话。
  1. # Python 端连接到 Bridge
  2. async with websockets.connect(bridge_url) as ws:
  3.     self._ws = ws
  4.     if self.config.bridge_token:
  5.         await ws.send(json.dumps({"type": "auth", "token": self.config.bridge_token}))
  6.     # 监听来自 Bridge 的消息
  7.     async for message in ws:
  8.         await self._handle_bridge_message(message)
复制代码
这种”专业的事交给专业的语言去做”的思想,非常值得借鉴。
总结

nanobot 的 Gateway 架构展示了如何构建一个高内聚、低耦合的多渠道系统。通过统一的消息模型和抽象的渠道接口,它成功地屏蔽了不同社交平台之间的复杂性。
本文是这个系列的第六篇。在下一篇中,我们将深入探讨 Gateway 的进阶功能——定时任务与心跳机制,看看 nanobot 如何让 AI 从”被动响应”转变为”主动服务”。
懂原理,比会调包更重要。 希望这个系列能帮你拆掉 AI Agent 的”黑盒”,让你在构建自己的智能体时更加游刃有余。

感谢阅读!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册