大家好,我是Edison。
最近我一直在跟着圣杰的《.NET+AI智能体开发进阶》课程学习MAF的开发技巧,我强烈推荐你也上车跟我一起出发!
上一篇,我们学习了MAF中如何进行循环(loop)路由。本篇,我们来了解下在MAF中如何并行执行(fan-out/fan-in)的工作流。
1 并行执行模式
在实际业务场景中,往往需要在工作流中让多个Agent同时运行再通过聚合结果做做一些数据分析或决策呈现,这时就需要并行执行机制。
在MAF中,我们可以使用 Fan-Out/Fan-In 模式来实现这个目的,如下代码片段所示:- var workflow = new WorkflowBuilder(startExecutor)
- .AddFanOutEdge(startExecutor, [amazonExecutor, ebayExecutor, shopeeExecutor])
- .AddFanInEdge([amazonExecutor, ebayExecutor, shopeeExecutor], strategyExecutor)
- .WithOutputFrom(strategyExecutor)
- .Build();
复制代码 可以看到,我们通过MAF的 AddFanOutEdge 和 AddFanInEdge 来实现了并行执行的目的,最后通过一个自定义的执行器来做聚合。
核心概念补充:
- Fan-Out Edge => 并发执行边
- Fan-In Edge => 汇聚边
2 并行工作流实验案例
假设我们是一个跨境电商团队,想要实时监控同一个商品在多个电商平台(如亚马逊、eBay、Shopee等)的定价策略,在检测到竞争对手降价时快速做出响应决策。
因此,我们的目标是:配置一个 Fan-Out + Fan-In 工作流,实现一次查询、并行抓取、智能决策的企业级模式。
2.1 关键依赖包引入
在今天的这个案例中,我们仍然创建了一个.NET控制台应用程序,安装了以下NuGet包:
- Microsoft.Agents.AI.OpenAI
- Microsoft.Agents.AI.Workflows
- Microsoft.Extensions.AI.OpenAI
2.2 定义数据传输模型
首先,我们定义一下在这个工作流中需要生成传递的数据模型:
PriceQueryDto :价格查询模型DTO- internal class PriceQueryDto
- {
- public string ProductId { get; private set; }
- public string ProductName { get; private set; }
- public string TargetRegion { get; private set; }
- public PriceQueryDto(string productId, string productName, string targetRegion)
- {
- ProductId = productId;
- ProductName = productName;
- TargetRegion = targetRegion;
- }
- }
复制代码 2.3 定义Agents&Executors
(1)价格查询:封装各大电商平台的价格查询逻辑,模拟其API响应,仅仅做演示用无实际逻辑:- internal sealed class PlatformPriceExecutor : Executor<ChatMessage>
- {
- private readonly string _instructions;
- private readonly IChatClient _chatClient;
- public PlatformPriceExecutor(string id, IChatClient chatClient, string platformInstructions)
- : base(id)
- {
- _chatClient = chatClient;
- _instructions = platformInstructions;
- }
- public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
- {
- var messages = new List<ChatMessage>
- {
- new(ChatRole.System, _instructions),
- message
- };
- var response = await _chatClient.GetResponseAsync(messages, cancellationToken: cancellationToken);
- var replyMessage = new ChatMessage(ChatRole.Assistant, response.Text ?? string.Empty)
- {
- AuthorName = this.Id
- };
- await context.SendMessageAsync(replyMessage, cancellationToken: cancellationToken);
- Console.WriteLine($"✅ {this.Id} 完成查询");
- }
- }
复制代码 (2)广播查询请求执行器:负责广播价格查询请求到各大电商平台并发放TurnToken。
NOTE:只有发放了TurnToken才能真正开启执行后续LLM节点!
[code]internal sealed class PriceQueryStartExecutor() : Executor(nameof(PriceQueryStartExecutor)){ public override async ValueTask HandleAsync(PriceQueryDto query, IWorkflowContext context, CancellationToken cancellationToken = default) { var userPrompt = $@"商品ID: {query.ProductId}商品名称: {query.ProductName}目标区域: {query.TargetRegion}请查询该商品在你的平台上的当前价格、库存状态和配送信息。"; await context.SendMessageAsync(new ChatMessage(ChatRole.User, userPrompt), cancellationToken: cancellationToken); await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken); Console.WriteLine("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |