找回密码
 立即注册
首页 业界区 安全 我用 SubAgent 做了一个 AI 自动修复闭环:流式修代码、 ...

我用 SubAgent 做了一个 AI 自动修复闭环:流式修代码、自动构建、失败重试

盒礁泅 昨天 20:35
最近用 AI 零代码项目生成前端页面有时候会出现问题,但是直接把原始报错信息返回给前端,再让用户手动点击按钮才能开始修复,用户体验不是很好,那么如何优化呢?我想到了 SubAgent 来进行修复!
如果把这类修复工作一直交给主 Agent 来做,会带来两个明显问题:

  • 修复过程会不断污染主 Agent 的上下文。
  • 错误日志、修复尝试和中间结果会迅速挤占上下文窗口,影响主 Agent 后续继续处理其他任务。
  • 并且越接近上下文窗口的上线,AI 的注意力就会“涣散”生成的效果也就越差。
1.png

源码

欢迎各位大佬 Star ⭐
源码地址:https://github.com/lieeew/leikooo-code-mother
整体流程

完整流程如下:

  • 用户点击“修复错误”。
  • 后端启动 SubAgent 的 SSE 修复流。
  • 查询当前激活版本号,定位当前版本目录。
  • 读取当前版本 metadata.json 中的 errorLog。
  • SubAgent 基于 errorLog 流式修复代码,前端实时展示修复内容。
  • AI 输出结束后,后端执行 build 和构建产物校验。
  • 后端更新当前版本的 metadata.json 和数据库状态。
  • 前端根据 SSE 展示本轮构建结果。
  • 如果构建失败,则进入下一轮修复,最多重试 3 次。
  • 如果最终修复成功,前端刷新版本列表和预览。
  • 修复成功后,前端把 SubAgent 的 [Fix Summary] 回传给主 Agent,让主 Agent 感知这次修复结果。
为什么不用主 Agent 直接修

主 Agent 负责的是整条业务链路,它需要理解页面、任务上下文、用户目标和历史操作。如果让它直接参与一轮又一轮的报错修复,短时间内确实能解决问题,但长期来看成本很高:

  • 大量错误日志和修复过程会污染主上下文。
  • 主 Agent 的注意力会被局部报错吸走,后续做别的任务时上下文质量下降。
  • 当上下文窗口逐渐逼近上限时,模型输出稳定性也会明显变差。
因此更合适的做法是把“局部、重复、面向报错”的工作隔离给 SubAgent,让主 Agent 只消费最终结果,而不是参与整个修复过程。
为什么在 Reactor 里必须切线程

2.png

这里有一个很容易踩的坑:我们使用的是 Reactor 的非阻塞线程模型,并且整个修复过程要通过 SSE 持续向前端推送状态。
在这种模式下,处理请求和推进事件循环的线程数量是有限的。如果把 build、文件读写、等待外部进程结束这类阻塞操作直接放到 Reactor 线程里执行,线程就会被长期占住,其他请求和 SSE 推送也会受到影响,系统吞吐量和响应性都会明显下降,用户体验直接爆炸。
Tomcat 传统上更接近“一请求一线程”模型。某个请求阻塞了,主要影响的是这个请求本身,只要线程池里还有空闲线程,其他请求通常还能继续跑。但 Reactor 是“少量线程处理大量连接”的事件驱动模型,核心线程的职责是分发和推进事件,一旦把阻塞逻辑塞进去,影响的就不只是当前请求,而是整个事件循环。
所以这里所有阻塞操作都要切到 boundedElastic 线程池执行:
  1. Mono.fromCallable(this::doBlockingWork)
  2.     .subscribeOn(Schedulers.boundedElastic());
复制代码
这个原则在本文里贯穿始终。像读取 metadata.json、执行 build、更新状态这类操作,都会放到 boundedElastic 上处理。
关键实现

3.png

1. 对外暴露一个 SSE 修复接口

前端点击“修复错误”后,后端会启动一个 SSE 流,把修复中的阶段信息、AI 输出和构建结果持续推给前端。
  1. @RestController
  2. @RequestMapping("/app")
  3. public class AppController {
  4.     @GetMapping(value = "/sub-agent/fix", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  5.     public Flux<ServerSentEvent<String>> fixAppWithSubAgent(
  6.             @RequestParam(name = "appId") Long appId) {
  7.         ThrowUtils.throwIf(appId == null, ErrorCode.PARAMS_ERROR);
  8.         UserVO user = userService.getUserLogin();
  9.         return subAgentService.executeFixLoop(appId.toString(), user);
  10.     }
  11. }
复制代码
对应的服务接口非常简单:
  1. public interface SubAgentService {
  2.     Flux<ServerSentEvent<String>> executeFixLoop(String appId, UserVO user);
  3. }
复制代码
2. 给 SubAgent 单独的 ChatClient 和会话隔离

为了避免修复流程污染主 Agent 的聊天记录,我单独给 SubAgent 配了一个 ChatClient,并使用独立的内存 ChatMemory。这里有三个关键点:

  • SubAgent 的聊天记录不是核心资产,使用内存存储即可。
  • 每一次修复任务都要生成独立的 conversationId,不能复用默认值,也不能直接复用 appId。
  • 整个修复循环结束后,再统一清理这次会话的聊天记录。
如果不显式指定 conversationId,默认值很容易导致不同修复任务之间出现上下文污染;如果直接用 appId,连续两次修复同一个应用时,历史聊天记录又会串起来,导致上下文重复累积。
因此更合理的做法是为每次修复生成一个唯一会话 ID,例如:
  1. private String generateSubAgentConversationId(String appId) {
  2.     return "subfix:" + appId + ":" +
  3.             UuidV7Generator.bytesToUuid(UuidV7Generator.generate());
  4. }
复制代码
然后把这个 conversationId 显式传给 ChatClient,这里的参数就指定了 CONVERSATION_ID
  1. advisorSpec.param(CONVERSATION_ID, conversationId)
复制代码
完整的代码如下:
  1. .advisors(advisorSpec -> {
  2.     advisorSpec.param(GEN_APP_INFO, ChatContext.of(appId, userId));
  3.     advisorSpec.param(CONVERSATION_ID, conversationId);
  4. })
  5. .toolContext(Map.of(GEN_APP_INFO, ToolsContext.of(appId, userId)))
复制代码
ChatClient 的配置本身并不复杂,重点是把它和主 Agent 的上下文彻底隔离开:
  1. @Configuration
  2. public class ChatClientConfig {
  3.     @Bean("subAgentChatMemoryRepository")
  4.     public ChatMemoryRepository subAgentChatMemoryRepository() {
  5.         return new InMemoryChatMemoryRepository();
  6.     }
  7.     @Bean("fixChatClient")
  8.     public ChatClient fixChatClient(
  9.             ChatModel primaryChatModel,
  10.             ExecuteToolAdvisor executeToolAdvisor,
  11.             SystemMessageFirstAdvisor systemMessageFirstAdvisor,
  12.             FileTools fileTools,
  13.             TodolistTools todolistTools,
  14.             ToolAdvisor toolAdvisor,
  15.             @Qualifier("subAgentChatMemoryRepository")
  16.             ChatMemoryRepository subAgentChatMemoryRepository) {
  17.         TodolistCacheCleanupAdvisor todolistCacheCleanupAdvisor =
  18.                 new TodolistCacheCleanupAdvisor();
  19.         return ChatClient.builder(primaryChatModel)
  20.                 .defaultAdvisors(
  21.                         executeToolAdvisor,
  22.                         systemMessageFirstAdvisor,
  23.                         toolAdvisor,
  24.                         todolistCacheCleanupAdvisor,
  25.                         MessageChatMemoryAdvisor.builder(
  26.                                 MessageWindowChatMemory.builder()
  27.                                         .chatMemoryRepository(subAgentChatMemoryRepository)
  28.                                         .maxMessages(100)
  29.                                         .build()
  30.                         ).build()
  31.                 )
  32.                 .defaultTools(fileTools, todolistTools)
  33.                 .build();
  34.     }
  35. }
复制代码
3. 用 Flux.concat() 串起“修复 -> 构建 -> 决策下一轮”

这里的执行顺序是严格的:

  • 先让 AI 修代码。
  • 再执行 build 和校验。
  • 根据结果决定是结束,还是进入下一轮修复。
所以这里必须使用 Flux.concat(),而不是 Flux.merge()。
Flux.concat() 会按顺序订阅上游,前一个流只有在发送 onComplete 之后,后一个流才会开始执行;这正好符合“修复完成后再构建”的要求。
相反,Flux.merge() 会并发订阅多个流,数据会按到达顺序交错发给下游。它适合并行汇聚多个事件源,但不适合这种必须按阶段推进的修复链路。
核心逻辑如下:
  1. private Flux<ServerSentEvent<String>> doFixLoopReactive(FixLoopContext context, int attempt) {
  2.     if (attempt > MAX_ATTEMPTS) {
  3.         return Flux.just(doneEvent(
  4.                 new DoneEventPayload(false, MAX_ATTEMPTS, "", context.aiContentAsString())));
  5.     }
  6.     return Flux.concat(
  7.             streamAiFixReactive(context, attempt),
  8.             continueAfterAi(context, attempt)
  9.     ).onErrorResume(e -> handleAiFailure(context, attempt, e));
  10. }
复制代码
其中,AI 修复阶段会先读取当前 errorLog,然后把模型输出按 SSE 方式流式推给前端:
  1. private Flux<ServerSentEvent<String>> streamAiFixReactive(FixLoopContext context, int attempt) {
  2.     return Mono.fromCallable(() -> getErrorLogs(context))
  3.             .subscribeOn(Schedulers.boundedElastic())
  4.             .flatMapMany(errorLog -> {
  5.                 GenAppDto dto = new GenAppDto(
  6.                         errorLog, context.appId, context.user, context.conversationId);
  7.                 return Flux.concat(
  8.                         Flux.just(phaseEvent(FixPhaseEnum.FIXING, attempt)),
  9.                         aiChatClient.fixCode(dto)
  10.                                 .timeout(Duration.ofMinutes(AI_TIMEOUT_MINUTES))
  11.                                 .doOnNext(context.aiContent::append)
  12.                                 .map(this::dataEvent)
  13.                 );
  14.             });
  15. }
复制代码
4. 构建失败就继续下一轮,最多 3 次

AI 输出结束后,会立即进入 build 和产物校验阶段。这里同样要把阻塞操作放到 boundedElastic 线程池里执行:
  1. private Flux<ServerSentEvent<String>> continueAfterAi(FixLoopContext context, int attempt) {
  2.     return Mono.fromCallable(() -> {
  3.                 BuildValidationResult buildResult = buildAndValidate(context.appId);
  4.                 updateMetadataFile(context, buildResult);
  5.                 syncVersionStatus(context, buildResult);
  6.                 return buildResult;
  7.             })
  8.             .subscribeOn(Schedulers.boundedElastic())
  9.             .flatMapMany(buildResult -> afterBuild(context, attempt, buildResult));
  10. }
复制代码
如果构建成功,直接发送 done 事件;
如果失败但还没到上限,就继续下一轮;
如果达到最大次数仍然失败,则返回最终失败结果:
  1. private Flux<ServerSentEvent<String>> afterBuild(
  2.         FixLoopContext context, int attempt, BuildValidationResult buildResult) {
  3.     Flux<ServerSentEvent<String>> buildEvents = buildEvents(buildResult, attempt);
  4.     if (buildResult.success()) {
  5.         return Flux.concat(buildEvents, Flux.just(doneEvent(successPayload(context, attempt))));
  6.     }
  7.     if (attempt >= MAX_ATTEMPTS) {
  8.         return Flux.concat(buildEvents, Flux.just(doneEvent(failurePayload(context, attempt))));
  9.     }
  10.     return Flux.concat(buildEvents, doFixLoopReactive(context, attempt + 1));
  11. }
复制代码
5. 修复循环结束后统一清理聊天记录

这里还有一个细节值得单独说一下:聊天记录不是在每一轮重试后立刻清,而是在整个修复循环彻底结束后统一清理。
原因如下:
1、如果每次重试都把上下文清掉,下一轮修复就拿不到上一轮已经产生的修复信息。
2、如果一直不清理,长时间积累又会增加内存压力,甚至带来 OOM 风险。
所以更平衡的做法是:在一次完整修复任务结束后,通过 doFinally 做统一清理。
  1. @Override
  2. public Flux<ServerSentEvent<String>> executeFixLoop(String appId, UserVO user) {
  3.     Long appIdLong = Long.parseLong(appId);
  4.     return Mono.fromCallable(() -> appVersionService.getCurrentVersionNum(appIdLong))
  5.             .subscribeOn(Schedulers.boundedElastic())
  6.             .defaultIfEmpty(0)
  7.             .flatMapMany(currentVersionNum -> {
  8.                 if (currentVersionNum == null || currentVersionNum <= 0) {
  9.                     return missingVersionEvents();
  10.                 }
  11.                 FixLoopContext context = new FixLoopContext(
  12.                         appId,
  13.                         currentVersionNum,
  14.                         user,
  15.                         generateSubAgentConversationId(appId)
  16.                 );
  17.                 return doFixLoopReactive(context, 1)
  18.                         .doFinally(signalType -> cleanupConversationMemory(context));
  19.             });
  20. }
复制代码
清理逻辑本身很直接:
  1. private void cleanupConversationMemory(FixLoopContext context) {
  2.     try {
  3.         subAgentChatMemoryRepository.deleteByConversationId(context.conversationId);
  4.         log.info("[SubAgent] Cleared chat memory, appId: {}, conversationId: {}",
  5.                 context.appId, context.conversationId);
  6.     } catch (Exception e) {
  7.         log.warn("[SubAgent] Failed to clear chat memory, appId: {}, conversationId: {}",
  8.                 context.appId, context.conversationId, e);
  9.     }
  10. }
复制代码
前端效果

前端这部分主要做了三件事:

  • 在用户触发修复后,及时展示当前状态和错误提示。
  • 把 SubAgent 的流式输出实时渲染出来,方便观察修复过程。
  • 在修复成功后,把 [Fix Summary] 回传给主 Agent,完成主从 Agent 之间的信息闭环。
如果出现错误,右上角会展示对应提示:
4.png

修复中的流式输出会展示在对话区域:
5.png

修复完成后,再把摘要通知给主 Agent:
6.png

总结

至此 SubAgent 的实现告一段落,大概有以下几点:

  • SubAgent 负责局部修复,避免污染主 Agent 上下文。
  • SSE 让修复过程可观察,前端能实时感知阶段变化。
  • build 和产物校验把“修没修好”从主观判断变成了可验证结果。
  • 自动重试让整个链路具备了基础的自愈能力。
下一期准备使用使用 Docker 隔离 AI 生成的代码,让 AI 生成的代码先在 Docker 里面跑 build => 成功生成 dist 文件 => 复制出来 => 前端进行展示

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册