找回密码
 立即注册
首页 业界区 业界 MAF快速入门(8)条件路由工作流

MAF快速入门(8)条件路由工作流

猷咎 昨天 21:20
大家好,我是Edison。
最近我一直在跟着圣杰的《.NET+AI智能体开发进阶》课程学习MAF的开发技巧,我强烈推荐你也上车跟我一起出发!
上一篇,我们学习了MAF中如何进行工作流的状态共享,相信你一定对WorkflowContext有了更多的了解。本篇,我们来了解下MAF中工作流是如何实现条件路由的。
条件路由的应用场景

在实际业务场景中,一个AI工作流的多个步骤之间往往会传递一些数据,有些步骤只会在数据满足一定条件下才会被触发,而有些步骤只会在数据不满足条件的时候才被触发,总之这种if-else的决策在工作流中很常见。
在MAF中,我们可以使用 Conditional Edge 即条件边 或 待决策函数的边,来实现这种工作流内部if-else决策需求。
实验案例:企业内部邮件检测

今天来实践上面这个企业内部邮件检测的工作流案例:假设我们是一个企业客服团队,我们每天会接收成百上千封用户发来的Email邮件,其中既包含正常咨询 也夹杂很多 垃圾信息。那么,如何降低人工分拣成本(减少人工负担) 又同时保留 风控可观测性(便于扩展更多节点),就是我们需要解决的问题。因此,我们可以构建一个基于LLM的工作流:

  • 首先,通过一个垃圾邮件检测器接收客户发来的邮件,通过LLM判断是否为垃圾邮件,输出一个判断结果(关键数据:IsSpam = true/false)。
  • 其次,系统根据判断结果IsSpam决定交由哪个后续节点处理。

    •   如果判断结果是非垃圾邮件,则转交给发送邮件执行器进行处理,比如发送给对应的客服代表;
    •   否则,则交给垃圾邮件处理执行器进行截断并记录或者上报人工处理等等。

  • 最后,结束工作流。
1.png

这个案例展示了Conditional Edge 条件路由的经典场景,后续它也可以扩展为多级路由。
准备工作

在今天的这个案例中,我们仍然创建了一个.NET控制台应用程序,安装了以下NuGet包:

  • Microsoft.Agents.AI.OpenAI
  • Microsoft.Agents.AI.Workflows
  • Microsoft.Extensions.AI.OpenAI
我们的配置文件中定义了LLM API的信息:
  1. {
  2.   "OpenAI": {
  3.     "EndPoint": "https://api.siliconflow.cn",
  4.     "ApiKey": "******************************",
  5.     "ModelId": "Qwen/Qwen3-30B-A3B-Instruct-2507"
  6.   }
  7. }
复制代码
这里我们使用 SiliconCloud 提供的 Qwen/Qwen3-30B-A3B-Instruct-2507 模型,你可以通过这个URL注册账号:https://cloud.siliconflow.cn/i/DomqCefW 获取大量免费的Token来进行本次实验。
然后,我们将配置文件中的API信息读取出来:
  1. var config = new ConfigurationBuilder()
  2.     .AddJsonFile($"appsettings.json", optional: false, reloadOnChange: true)
  3.     .Build();
  4. var openAIProvider = config.GetSection("OpenAI").Get<OpenAIProvider>();
复制代码
定义数据传输模型

首先,我们定义一下在这个工作流中需要生成传递的数据模型:
(1)DetectionResult :拉件邮件检测结果
  1. public sealed class DetectionResult
  2. {
  3.     [JsonPropertyName("is_spam")]
  4.     public bool IsSpam { get; set; }
  5.     [JsonPropertyName("reason")]
  6.     public string Reason { get; set; } = string.Empty;
  7.     [JsonIgnore]
  8.     public string EmailId { get; set; } = string.Empty;
  9. }
复制代码
(2)EmailStateConstants :常量,类似于Cache Key的作用
  1. internal static class EmailStateConstants
  2. {
  3.     public const string EmailStateScope = "EmailState";
  4. }
复制代码
(3)EmailMessage & EmailResponse :DTO作用
  1. internal sealed class EmailMessage
  2. {
  3.     [JsonPropertyName("email_id")]
  4.     public string EmailId { get; set; } = string.Empty;
  5.     [JsonPropertyName("email_content")]
  6.     public string EmailContent { get; set; } = string.Empty;
  7. }
  8. public sealed class EmailResponse
  9. {
  10.     [JsonPropertyName("response")]
  11.     public string Response { get; set; } = string.Empty;
  12. }
复制代码
入口节点:垃圾邮件检测Executor

这个垃圾邮件检测是本流程的核心节点,它接收用户邮件内容并调用LLM做检测,最后生成该邮件的唯一ID并将邮件原文写入工作流共享状态存储区,返回唯一ID供下游节点使用。
  1. internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
  2. {
  3.     private readonly AIAgent _agent;
  4.     private readonly AgentThread _thread;
  5.     public SpamDetectionExecutor(AIAgent agent) : base("SpamDetectionExecutor")
  6.     {
  7.         // 创建 Agent 和对话线程
  8.         this._agent = agent;
  9.         this._thread = this._agent.GetNewThread();
  10.     }
  11.     public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
  12.     {
  13.         var trackedEmail = new EmailMessage
  14.         {
  15.             EmailId = Guid.NewGuid().ToString("N"),
  16.             EmailContent = message.Text
  17.         };
  18.         await context.QueueStateUpdateAsync(trackedEmail.EmailId, trackedEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
  19.         var agentResponse = await _agent.RunAsync(message, cancellationToken: cancellationToken);
  20.         var detectionResult = JsonSerializer.Deserialize<DetectionResult>(agentResponse.Text)
  21.             ?? throw new InvalidOperationException("无法解析 Spam Detection 响应。");
  22.         detectionResult.EmailId = trackedEmail.EmailId;
  23.         return detectionResult;
  24.     }
  25. }
复制代码
在这个Executor中,它接收我们如下所示定义好的Agent来实现:
  1. var spamDetectionAgent = new ChatClientAgent(
  2.     chatClient,
  3.     new ChatClientAgentOptions(instructions: "You are a spam detection assistant that labels spam emails with reasons.")
  4.     {
  5.         ChatOptions = new()
  6.         {
  7.             ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
  8.         }
  9.     });
复制代码
可以看到,我们在ChatOptions中指定了该Agent返回的消息需要进行序列化到强类型,便于后续通过强类型数据进行决策路由。
下游节点A:正常邮件处理+发送

这里我们针对识别到的正常邮件开发两个执行器,假设其用于邮件处和转发:
(1)邮件处理:读取共享状态区的原文,然后调用Agent输出JSON回复。
  1. internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
  2. {
  3.     private readonly AIAgent _agent;
  4.     private readonly AgentThread _thread;
  5.     public EmailAssistantExecutor(AIAgent agent) : base("EmailAssistantExecutor")
  6.     {
  7.         // 创建 Agent 和对话线程
  8.         this._agent = agent;
  9.         this._thread = this._agent.GetNewThread();
  10.     }
  11.     public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
  12.     {
  13.         if (message.IsSpam)
  14.             throw new InvalidOperationException("Spam 邮件不应进入 EmailAssistantExecutor。");
  15.         var email = await context.ReadStateAsync<EmailMessage>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken)
  16.             ?? throw new InvalidOperationException("找不到对应 Email 内容。");
  17.         var agentResponse = await _agent.RunAsync(email.EmailContent, _thread, cancellationToken: cancellationToken);
  18.         var emailResponse =  JsonSerializer.Deserialize<EmailResponse>(agentResponse.Text)
  19.             ?? throw new InvalidOperationException("无法解析 Email Assistant 响应。");
  20.         return emailResponse;
  21.     }
  22. }
复制代码
这里的Agent定义如下:
  1. var emailAssistantAgent = new ChatClientAgent(
  2.     chatClient,
  3.     new ChatClientAgentOptions(instructions: "You are an enterprise email assistant. You can provide professional Chinese responses to user.")
  4.     {
  5.         ChatOptions = new()
  6.         {
  7.             ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
  8.         }
  9.     });
复制代码
(2)邮件转发:模拟邮件转发到具体的客服,这里仅仅使用YieldOutputAsync完成工作流输出消息内容。
[code]internal sealed class EmailSendingExecutor() : Executor("EmailSendingExecutor"){    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default)    {        await context.YieldOutputAsync($"
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册