找回密码
 立即注册
首页 业界区 业界 MAF快速入门(11)并行工作流

MAF快速入门(11)并行工作流

频鹏凶 昨天 19:00
大家好,我是Edison。
最近我一直在跟着圣杰的《.NET+AI智能体开发进阶》课程学习MAF的开发技巧,我强烈推荐你也上车跟我一起出发!
上一篇,我们学习了MAF中如何进行循环(loop)路由。本篇,我们来了解下在MAF中如何并行执行(fan-out/fan-in)的工作流。
1 并行执行模式

在实际业务场景中,往往需要在工作流中让多个Agent同时运行再通过聚合结果做做一些数据分析或决策呈现,这时就需要并行执行机制。
在MAF中,我们可以使用 Fan-Out/Fan-In 模式来实现这个目的,如下代码片段所示:
  1. var workflow = new WorkflowBuilder(startExecutor)
  2.     .AddFanOutEdge(startExecutor, [amazonExecutor, ebayExecutor, shopeeExecutor])
  3.     .AddFanInEdge([amazonExecutor, ebayExecutor, shopeeExecutor], strategyExecutor)
  4.     .WithOutputFrom(strategyExecutor)
  5.     .Build();
复制代码
可以看到,我们通过MAF的 AddFanOutEdge 和 AddFanInEdge 来实现了并行执行的目的,最后通过一个自定义的执行器来做聚合。
核心概念补充:

  • Fan-Out Edge => 并发执行边
  • Fan-In Edge => 汇聚边
2 并行工作流实验案例

假设我们是一个跨境电商团队,想要实时监控同一个商品在多个电商平台(如亚马逊、eBay、Shopee等)的定价策略,在检测到竞争对手降价时快速做出响应决策。
因此,我们的目标是:配置一个 Fan-Out + Fan-In 工作流,实现一次查询、并行抓取、智能决策的企业级模式。
2.1 关键依赖包引入

在今天的这个案例中,我们仍然创建了一个.NET控制台应用程序,安装了以下NuGet包:

  • Microsoft.Agents.AI.OpenAI
  • Microsoft.Agents.AI.Workflows
  • Microsoft.Extensions.AI.OpenAI
2.2 定义数据传输模型

首先,我们定义一下在这个工作流中需要生成传递的数据模型:
PriceQueryDto :价格查询模型DTO
  1. internal class PriceQueryDto
  2. {
  3.     public string ProductId { get; private set; }
  4.     public string ProductName { get; private set; }
  5.     public string TargetRegion { get; private set; }
  6.     public PriceQueryDto(string productId, string productName, string targetRegion)
  7.     {
  8.         ProductId = productId;
  9.         ProductName = productName;
  10.         TargetRegion = targetRegion;
  11.     }
  12. }
复制代码
2.3 定义Agents&Executors

(1)价格查询:封装各大电商平台的价格查询逻辑,模拟其API响应,仅仅做演示用无实际逻辑:
  1. internal sealed class PlatformPriceExecutor : Executor<ChatMessage>
  2. {
  3.     private readonly string _instructions;
  4.     private readonly IChatClient _chatClient;
  5.     public PlatformPriceExecutor(string id, IChatClient chatClient, string platformInstructions)
  6.         : base(id)
  7.     {
  8.         _chatClient = chatClient;
  9.         _instructions = platformInstructions;
  10.     }
  11.     public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
  12.     {
  13.         var messages = new List<ChatMessage>
  14.         {
  15.             new(ChatRole.System, _instructions),
  16.             message
  17.         };
  18.         var response = await _chatClient.GetResponseAsync(messages, cancellationToken: cancellationToken);
  19.         var replyMessage = new ChatMessage(ChatRole.Assistant, response.Text ?? string.Empty)
  20.         {
  21.             AuthorName = this.Id
  22.         };
  23.         await context.SendMessageAsync(replyMessage, cancellationToken: cancellationToken);
  24.         Console.WriteLine($"✅ {this.Id} 完成查询");
  25.     }
  26. }
复制代码
(2)广播查询请求执行器:负责广播价格查询请求到各大电商平台并发放TurnToken。
NOTE:只有发放了TurnToken才能真正开启执行后续LLM节点!
[code]internal sealed class PriceQueryStartExecutor() : Executor(nameof(PriceQueryStartExecutor)){    public override async ValueTask HandleAsync(PriceQueryDto query, IWorkflowContext context, CancellationToken cancellationToken = default)    {        var userPrompt = $@"商品ID: {query.ProductId}商品名称: {query.ProductName}目标区域: {query.TargetRegion}请查询该商品在你的平台上的当前价格、库存状态和配送信息。";        await context.SendMessageAsync(new ChatMessage(ChatRole.User, userPrompt), cancellationToken: cancellationToken);        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);        Console.WriteLine("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册