大家好,我是Edison。
上一篇,我们学习了MAF中如何持久化聊天记录到关系型数据库。这一篇,我们来学习一下工作流编排。不知大家是否记得,我们在之前用Semantic Kernel学习过多Agent编排的一些知识,例如顺序,并发,移交等模式仍然历历在目。那么,今天,我们用MAF的工作流编排来实现一下,看看有什么不一样。
多Agent编排
传统的单代理系统在处理复杂多面任务的能力方面受到限制。 通过协调多个代理,每个代理都有专门的技能或角色,我们可以创建更可靠、更自适应且能够协作解决实际问题的系统。
目前,MAF支持以下编排模式,和SK几乎一致:
今天,我们用MAF来实践一下顺序编排 和 移交编排 两个最常用的模式。
顺序编排的典型案例如下图所示:
移交编排的典型案例如下图所示:
准备工作
在今天的这个案例中,我们创建了一个.NET控制台应用程序,安装了以下NuGet包:
- Microsoft.Agents.AI.OpenAI
- Microsoft.Agents.AI.Workflows
- Microsoft.Extensions.AI.OpenAI
我们的配置文件中定义了LLM API的信息:- {
- "OpenAI": {
- "EndPoint": "https://api.siliconflow.cn",
- "ApiKey": "******************************",
- "ModelId": "Qwen/Qwen2.5-32B-Instruct"
- }
- }
复制代码 这里我们使用 SiliconCloud 提供的 Qwen2.5-32B-Instruct 模型,你可以通过这个URL注册账号:https://cloud.siliconflow.cn/i/DomqCefW 获取大量免费的Token来进行本次实验。
然后,我们将配置文件中的API信息读取出来:- var config = new ConfigurationBuilder()
- .AddJsonFile($"appsettings.json", optional: false, reloadOnChange: true)
- .Build();
- var openAIProvider = config.GetSection("OpenAI").Get<OpenAIProvider>();
复制代码 实现移交编排
首先,我们创建一个ChatClient供后续使用:- // Step1. Create one ChatClient
- var chatClient = new OpenAIClient(
- new ApiKeyCredential(openAIProvider.ApiKey),
- new OpenAIClientOptions { Endpoint = new Uri(openAIProvider.Endpoint) })
- .GetChatClient(openAIProvider.ModelId);
复制代码 然后,我们定义一个FunctionAgentFactory,封装我们需要编排的几个Agent:- public class FunctionAgentFactory
- {
- public static ChatClientAgent CreateTriageAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- "You determine which agent to use based on the user's homework question. ALWAYS handoff to another agent.",
- "triage_agent",
- "Routes messages to the appropriate specialist agent");
- }
- public static ChatClientAgent CreateHistoryTutorAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- "You provide assistance with historical queries. Explain important events and context clearly. Please only respond about history.",
- "history_tutor",
- "Specialist agent for historical questions");
- }
- public static ChatClientAgent CreateMathTutorAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- "You provide help with math problems. Explain your reasoning at each step and include examples. Please only respond about math.",
- "math_tutor",
- "Specialist agent for mathematical questions");
- }
- }
复制代码 基于AgentFactory依次创建编排Agent,历史Agent 和 数学Agent:- var triageAgent = FunctionAgentFactory.CreateTriageAgent(chatClient);
- var historyTutor = FunctionAgentFactory.CreateHistoryTutorAgent(chatClient);
- var mathTutor = FunctionAgentFactory.CreateMathTutorAgent(chatClient);
复制代码 基于上面的3个Agent可以快速创建一个移交编排Workflow:- var workflow = AgentWorkflowBuilder.CreateHandoffBuilderWith(triageAgent)
- .WithHandoffs(triageAgent, [mathTutor, historyTutor]) // Triage can route to either specialist
- .WithHandoffs([mathTutor, historyTutor], triageAgent) // Math or History tutor can return to triage
- .Build();
复制代码 怎么样,是不是很简单?
下面,我们进行一个多轮对话测试:- List<ChatMessage> messages = new();
- while (true)
- {
- Console.Write("User: ");
- string userInput = Console.ReadLine()!;
- messages.Add(new(ChatRole.User, userInput));
- // Execute workflow and process events
- var response = await workflow.AsAgent().RunAsync(messages);
- Console.WriteLine($"Agent: {response}\n");
- // Add new messages to conversation history
- messages.AddRange(response.Messages);
- }
复制代码 这里我们使用了Workflow as Agent的方式,即将工作流转换为一个独立的Agent来调用。
测试结果如下图所示:
可以看到,编排Agent根据用户的输入将请求转给了对应的数学话题Agent 和 历史话题Agent,进而使用户获得正确的回答。
实现顺序编排
首先,创建一个ChatClient,同上。
然后,封装一个AgentFactory来定义我们用到的3个Agent:
- Analyst 需求分析
- Writer 文案写手
- Editor 文案审稿人/编辑
- public class FunctionAgentFactory
- {
- public static ChatClientAgent CreateAnalystAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- """
- You are a marketing analyst. Given a product description, identify:
- - Key features
- - Target audience
- - Unique selling points
- """,
- "Analyst",
- "An agent that extracts key concepts from a product description.");
- }
- public static ChatClientAgent CreatWriterAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- """
- You are a marketing copywriter. Given a block of text describing features, audience, and USPs,
- compose a compelling marketing copy (like a newsletter section) that highlights these points.
- Output should be short (around 150 words), output just the copy as a single text block.
- """,
- "CopyWriter",
- "An agent that writes a marketing copy based on the extracted concepts.");
- }
- public static ChatClientAgent CreateEditorAgent(ChatClient client)
- {
- return client.CreateAIAgent(
- """
- You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone,
- give format and make it polished. Output the final improved copy as a single text block.
- """,
- "Editor",
- "An agent that formats and proofreads the marketing copy.");
- }
- }
复制代码 基于这几个Agent我们快速定一个顺序工作流:- var analyst = FunctionAgentFactory.CreateAnalystAgent(chatClient);
- var writer = FunctionAgentFactory.CreatWriterAgent(chatClient);
- var editor = FunctionAgentFactory.CreateEditorAgent(chatClient);
- // Workflow: Analyst -> Writer -> Editor
- var workflow = AgentWorkflowBuilder.BuildSequential(
- "content-team-workflow",
- [analyst, writer, editor]);
复制代码 快速运行,直接得到答案:- var userMessage = "Please help to introduce our new product: An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours.";
- Console.Write($"User: {userMessage}\n");
- // Execute the workflow via RunAsync
- var response = await workflow.AsAgent().RunAsync(userMessage);
- Console.WriteLine($"Agent: {response}");
复制代码 这里用户给的任务是:" lease help to introduce our new product: An eco-friendly stainless steel water bottle that keeps drinks cold for 24 hours."
假设客户公司有一个新产品:一个环保的不锈钢水瓶,可以让饮料保持24小时的低温,需要帮忙创作一个广告文案。
我们仍然将其转换为Agent来执行,获取结果下图所示:
可以看到,它将最后审核校对的最终版内容输出给了我们。借助事件机制监控
假设我们想要获取每个步骤对应Agent的输出内容,进而实现一定程度的监控。那么,同步执行就不合适了,这时我们就需要用到工作流的事件机制 结合 流式执行获取事件流 来实现。
MAF的工作流事件机制内置了一些系统事件,例如:
- AgentRunUpdatedEvent
- ExecutorCompletedEvent 执行器完成事件,例如Workflow中的某个Agent完成了任务,准备转到下个Agent执行。
- WorkflowOutputEvent 整个工作流已完成任务处理准备进行最终输出。
这里我们使用MAF中的StreamAsync 和 WatchStreamAsync,其主要流程如下图所示:
下面我们直接看代码:- await using (StreamingRun run = await InProcessExecution.StreamAsync(workflow, userMessage))
- {
- await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); // Enable event emitting
- Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
- Console.WriteLine("Process Tracking");
- Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
- Console.WriteLine();
- var result = new List<ChatMessage>();
- var stageOutput = new StringBuilder();
- int stepNumber = 1;
- await foreach (WorkflowEvent evt in run.WatchStreamAsync())
- {
- if (evt is AgentRunUpdateEvent updatedEvent)
- {
- stageOutput.Append($"{updatedEvent.Data} ");
- }
- else if (evt is ExecutorCompletedEvent completedEvent)
- {
- if (stageOutput.Length > 0)
- {
- Console.WriteLine($"Step {stepNumber}: {completedEvent.ExecutorId}");
- Console.WriteLine($"Output: {stageOutput.ToString()}\n");
- stepNumber++;
- stageOutput.Clear();
- }
- }
- else if (evt is WorkflowOutputEvent endEvent)
- {
- result = (List<ChatMessage>)endEvent.Data!;
- break;
- }
- }
- // Display final result and skip user message
- foreach (var message in result.Skip(1))
- Console.WriteLine($"Agent: {message.Text}");
- }
- Console.WriteLine("\nTask Finished!");
复制代码 最终,输出的结果如下所示:
Step1: Analyst
Step2 ~3: Writer & Editor
最终输出的内容:
由上图可以看出,如果我们需要对于Workflow进行进度监控,使用流式执行 并 监听内置事件 完全可以实现这个目的。
其他更多的内置工作流事件如下代码示例所示:
[code]// 执行工作流并监听所有事件Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");Console.WriteLine("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |