找回密码
 立即注册
首页 业界区 业界 【大数据 & AI】Flink Agents 源码解读 --- (6) --- Ac ...

【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask

仁夹篇 昨天 22:10
【大数据 & AI】Flink Agents 源码解读 --- (6) ---  ActionTask


目录

  • 【大数据 & AI】Flink Agents 源码解读 --- (6) ---  ActionTask

    • 0x00 概要
    • 0x01 基础知识

      • 1.1 相关组件
      • 1.2 ActionTask
      • 1.3 PythonActionTask

        • 1.3.1 定义
        • 1.3.2 PythonActionTask 与 Function 的关系
        • 1.3.3 与其他组件的关系
        • 1.3.4 调用流程

      • 1.4 PythonGeneratorActionTask
      • 1.5 JavaActionTask
      • 1.6 ActionTaskResult 结构

    • 0x02 ActionTask 切分机制

      • 2.1 切分方式
      • 2.2 实现细节
      • 2.3 关键点



0x00 概要

ActionTask 是 Action 执行的基本单位,代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下:
  1. Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime
复制代码
0x01 基础知识

ActionTask 是 Action 执行过程中的一个片段,用于支持复杂的执行逻辑(如异步处理),对应关系如下:

  • 一个 Action 对应一个函数
在 AgentPlan 中,每个 Action 包含一个执行函数 (exec),通常是 PythonFunction 或 JavaFunction,例如在 tool_call_action.py 中:
  1. TOOL_CALL_ACTION = Action (
  2.     name="tool_call_action",
  3.     exec=PythonFunction.from_callable (process_tool_request), // 一个函数
  4.     listen_event_types=[...]
  5. )
复制代码

  • 一个 Action 可能产生多个 ActionTask

    • ActionTask 是 Action 的执行时表示,可以看作是 Action 的 “执行片段”
    • 一个 Action 可能在执行过程中被拆分为多个 ActionTask,特别是在处理异步操作时

1.1 相关组件

ActionTask 概念的相关组件如下
组件核心功能JavaActionTask执行 Java 函数PythonActionTask执行 Python 函数,支持异步 / 生成器模式,桥接 Java 与 Python 生态LocalRunnerContext本地执行上下文,模拟 Flink 分布式状态,管理事件队列、key 隔离状态、资源访问ActionTaskResult动作执行结果载体,包含是否完成、输出事件、下一个待执行任务(若有)PythonGeneratorActionTask处理 Python 生成器的异步任务,持续执行直到完成所有异步操作Tool 相关机制支持装饰器(@tool)、add_resource 等方式注册工具,通过 TOOL_CALL_ACTION 触发执行在系统中的架构如下
1.png

1.2 ActionTask

我们接下来看看 ActionTask 的具体实现。
ActionTask 是基类。
  1. /**
  2. * This class represents a task related to the execution of an action in {@link
  3. * ActionExecutionOperator}.
  4. *
  5. * <p>An action is split into multiple code blocks, and each code block is represented by an {@code
  6. * ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result
  7. * {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next
  8. * {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing
  9. * it.
  10. */
  11. public abstract class ActionTask {
  12.     protected final Object key;
  13.     protected final Event event;
  14.     protected final Action action;
  15.     /**
  16.      * Since RunnerContextImpl contains references to the Operator and state, it should not be
  17.      * serialized and included in the state with ActionTask. Instead, we should check if a valid
  18.      * RunnerContext exists before each ActionTask invocation and create a new one if necessary.
  19.      */
  20.     protected transient RunnerContextImpl runnerContext;
  21.     public ActionTask(Object key, Event event, Action action) {
  22.         this.key = key;
  23.         this.event = event;
  24.         this.action = action;
  25.     }
  26.     public RunnerContextImpl getRunnerContext() {
  27.         return runnerContext;
  28.     }
  29.     public void setRunnerContext(RunnerContextImpl runnerContext) {
  30.         this.runnerContext = runnerContext;
  31.     }
  32.     public Object getKey() {
  33.         return key;
  34.     }
  35.     /** Invokes the action task. */
  36.     public abstract ActionTaskResult invoke() throws Exception;
  37.     public class ActionTaskResult {
  38.         private final boolean finished;
  39.         private final List<Event> outputEvents;
  40.         private final Optional generatedActionTaskOpt;
  41.         public ActionTaskResult(
  42.                 boolean finished,
  43.                 List<Event> outputEvents,
  44.                 @Nullable ActionTask generatedActionTask) {
  45.             this.finished = finished;
  46.             this.outputEvents = outputEvents;
  47.             this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTask);
  48.         }
  49.         public boolean isFinished() {
  50.             return finished;
  51.         }
  52.         public List<Event> getOutputEvents() {
  53.             return outputEvents;
  54.         }
  55.         public Optional getGeneratedActionTask() {
  56.             return generatedActionTaskOpt;
  57.         }
  58.     }
  59. }
复制代码
1.3 PythonActionTask

PythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括:

  • 执行 Python 函数:调用 Python 函数来处理事件
  • 处理异步操作:支持 Python 中的异步操作,通过生成器机制实现
  • 桥接 Java 和 Python:作为 Java 端和 Python 端之间的桥梁,协调两者间的交互
1.3.1 定义

PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。它提供了以下附加价值:

  • 复杂逻辑:PythonActionTask 不仅仅是执行函数,还负责处理复杂的交互逻辑
  • 执行环境管理:为函数提供合适的执行上下文
  • 异步支持:通过生成器机制支持长时间运行的操作
  • 事件处理:管理和传递执行过程中产生的事件
  • 状态维护:在整个执行过程中维护必要的状态信息
PythonActionTask 在系统架构中的位置和交互关系如下:
2.png

代码如下:
  1. public class PythonActionTask extends ActionTask {
  2.     public ActionTaskResult invoke() throws Exception {
  3.         PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();
  4.         // 这里执行实际的 Python 函数
  5.         String pythonGeneratorRef =
  6.             pythonActionExecutor.executePythonFunction(
  7.                 (PythonFunction) action.getExec(), // <-- 这就是对应的函数
  8.                 (PythonEvent) event,
  9.                 runnerContext);
  10.         // 处理异步情况
  11.         if (pythonGeneratorRef != null) {
  12.             // 如果函数返回了生成器,则创建新的任务继续执行
  13.             ActionTask tempGeneratedActionTask =
  14.                 new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
  15.             tempGeneratedActionTask.setRunnerContext(runnerContext);
  16.         if (pythonGeneratorRef != null) {
  17.             // 如果函数返回了生成器,则创建新的任务继续执行
  18.             ActionTask tempGeneratedActionTask =
  19.                 new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
  20.             tempGeneratedActionTask.setRunnerContext(runnerContext);
  21.             return tempGeneratedActionTask.invoke();
  22.         }
  23.         // 否则表示函数已执行完毕
  24.         return new ActionTaskResult(
  25.             true,
  26.             runnerContext.drainEvents(event.getSourceTimestamp()),
  27.             null);            
复制代码
这种设计允许将带有异步操作的复杂动作分解为可管理的单元,同时保持执行语义和状态一致性。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册