找回密码
 立即注册
首页 业界区 业界 【大数据 & AI】Flink Agents 源码解读 --- (5) --- Ac ...

【大数据 & AI】Flink Agents 源码解读 --- (5) --- ActionExecutionOperator

寇油 昨天 22:45
【大数据 & AI】Flink Agents 源码解读 --- (5) ---  ActionExecutionOperator


目录

  • 【大数据 & AI】Flink Agents 源码解读 --- (5) ---  ActionExecutionOperator

    • 0x00 摘要
    • 0x01 基础知识

      • 1.1 总体架构
      • 1.2 定义
      • 1.3 关键设计
      • 1.4 流程
      • 1.5 组件 & 关系
      • 1.6 核心依赖组件
      • 1.7 主要功能
      • 1.8 运行ActionTask流程

        • 创建与初始化
        • ActionTask 的创建与调度
        • ActionTask 的执行
        • 状态管理与容错

      • 1.8 Flink MailboxExecutor

        • 为什么需要 MailboxExecutor
        • 使用场景速览
        • 工作模型(用邮筒来隐喻)


    • 0x02 ActionExecutionOperator 生成机制和策略

      • 2.1 生成位置
      • 2.2 生成机制

        • 创建流程
        • 工厂模式实现

      • 2.3 关键生成策略

        • 策略 1:基于计划的生成
        • 策略 2:类型特定处理
        • 策略 3:资源管理

      • 2.4 操作符初始化过程
      • 2.5 与执行环境的集成
      • 2.6 总结

    • 0x03 任务拆分机制详解

      • 3.1 任务拆分的基本原理

        • 为什么需要拆分任务
        • 拆分粒度

      • 3.2 任务创建过程

        • 不同类型 ActionTask 的创建
        • 动态创建 ActionTask


    • 0x04 任务队列管理

      • 4.1 任务状态存储
      • 4.2 任务调度机制
      • 4.3 任务处理循环

    • 0x05 实际应用示例

      • 5.1 异步 HTTP 请求处理
      • 5.2 复杂业务流程处理

    • 0xFF 参考


0x00 摘要

ActionExecutionOperator 是整个Flink Agent 系统的执行引擎,它连接了 Flink 流处理框架和 Agent 逻辑,协调各种组件完成了 Agent 定义的动作执行。ActionExecutionOperator 主要职责:

  • 事件处理:接收来自上游的数据,包装成InputEvent
  • 动作执行:根据Agent定义的动作规则,触发相应的处理逻辑
  • 状态管理:维护短期记忆,检查点状态等
  • 异步支持:处理需要异步执行的任务
  • Python/Java交互操作:协调组件间的交互
  • 输出产生:将最终结果作为OutputEvent发送到下游
这样,整个Agent的逻辑就被集成到了标准的Flink流处理管道中,能够利用Flink的分布式计算能力和容错机制。
本篇主要是为了学习 ActionExecutionOperator 的架构,生成机制和策略,也会学习 ActionExecutionOperator 如何使用 AgentPlan,如何生成 ActionTask。
0x01 基础知识

ActionExecutionOperator 算子是整个 Flink Agent 框架运行时的核心组件,负责将用户定义的代理逻辑转换为实际的流处理操作。
1.1 总体架构

可以把 Flink Agents 的整个执行流程比作 “做一道菜”,我们借此进行分析。
Flink Agents 是基于原生 Flink 分布式流处理能力封装的上层框架。其中四个主要组件代表了 Flink Agents 框架中的四个层次:

  • Agent(顶层设计,定义了“做什么”):用户定义的智能实体,类似 “餐厅菜单 + 规则手册”,包含业务逻辑、动作(Action)和资源(工具、模型等)定义,明确 “做什么”。
  • AgentPlan(中间编译层,确定了“怎么做”):将 Agent 编译后的可执行计划,类似 “详细操作流程图”,明确动作触发规则、资源映射关系,确定 “怎么做”。
  • ActionExecutionOperator(运行时执行层,是执行环境,负责“协调调度”):Flink 集群中的执行核心,在 Flink 流处理环境中实际执行操作,类似 “餐厅首席大厨”,负责接收数据、调度任务、管理状态,协调整体执行流程。
  • ActionTask(最小执行单元,负责“具体实施”):具体的执行任务,类似 “员工的单个服务步骤”,分为 JavaActionTask 和 PythonActionTask,处理单个事件并返回结果。
具体可以参见下图。
  1. [Agent] 菜单手册
  2.     ↓(编译)
  3. [AgentPlan] 详细流程图
  4.     ↓(运行时实例化)
  5. [ActionExecutionOperator] 餐厅首席大厨
  6.     ↓(分配任务)
  7. [ActionTask] 员工具体任务
复制代码
这样的设计使得系统既灵活又高效,能够处理复杂的AI代理任务,同时保证了良好的扩展性和维护性。
1.2 定义

ActionExecutionOperator 的定义如下。
  1. /**
  2. * An operator that executes the actions defined in the agent. Upon receiving data from the
  3. * upstream, it first wraps the data into an {@link InputEvent}. It then invokes the corresponding
  4. * action that is interested in the {@link InputEvent}, and collects the output event produced by
  5. * the action.
  6. *
  7. * <p>For events of type {@link OutputEvent}, the data contained in the event is sent downstream.
  8. * For all other event types, the process is repeated: the event triggers the corresponding action,
  9. * and the resulting output event is collected for further processing.
  10. */
  11. public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
  12.         implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
  13.     private static final long serialVersionUID = 1L;
  14.     private static final Logger LOG = LoggerFactory.getLogger(ActionExecutionOperator.class);
  15.     private static final String RECOVERY_MARKER_STATE_NAME = "recoveryMarker";
  16.     private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME = "messageSequenceNumber";
  17.     private static final String PENDING_INPUT_EVENT_STATE_NAME = "pendingInputEvents";
  18.     private final AgentPlan agentPlan;
  19.     private final Boolean inputIsJava;
  20.     private transient StreamRecord<OUT> reusedStreamRecord;
  21.     private transient MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState;
  22.     // PythonActionExecutor for Python actions
  23.     private transient PythonActionExecutor pythonActionExecutor;
  24.     private transient FlinkAgentsMetricGroupImpl metricGroup;
  25.     private transient BuiltInMetrics builtInMetrics;
  26.     private transient SegmentedQueue keySegmentQueue;
  27.     private final transient MailboxExecutor mailboxExecutor;
  28.     // We need to check whether the current thread is the mailbox thread using the mailbox
  29.     // processor.
  30.     // TODO: This is a temporary workaround. In the future, we should add an interface in
  31.     // MailboxExecutor to check whether a thread is a mailbox thread, rather than using reflection
  32.     // to obtain the MailboxProcessor instance and make the determination.
  33.     private transient MailboxProcessor mailboxProcessor;
  34.     // An action will be split into one or more ActionTask objects. We use a state to store the
  35.     // pending ActionTasks that are waiting to be executed.
  36.     private transient ListState actionTasksKState;
  37.     // To avoid processing different InputEvents with the same key, we use a state to store pending
  38.     // InputEvents that are waiting to be processed.
  39.     private transient ListState<Event> pendingInputEventsKState;
  40.     // An operator state is used to track the currently processing keys. This is useful when
  41.     // receiving an EndOfInput signal, as we need to wait until all related events are fully
  42.     // processed.
  43.     private transient ListState<Object> currentProcessingKeysOpState;
  44.     private final transient EventLogger eventLogger;
  45.     private final transient List<EventListener> eventListeners;
  46.     private transient ActionStateStore actionStateStore;
  47.     private transient ValueState<Long> sequenceNumberKState;
  48.     private transient ListState<Object> recoveryMarkerOpState;
  49.     private transient Map<Long, Map<Object, Long>> checkpointIdToSeqNums;
  50.     // This in memory map keep track of the runner context for the async action task that having
  51.     // been finished
  52.     private final transient Map actionTaskRunnerContexts;
复制代码
1.3 关键设计

ActionExecutionOperator 关键设计要点如下:

  • 状态管理:使用Flink的状态后端管理短期记忆和处理状态
  • 异步支持:通过邮箱线程机制支持异步操作执行
  • 容错恢复:通过checkpoint机制实现状态持久化和恢复
  • 资源隔离:每个key拥有独立的处理上下文和状态
  • 多语言支持:同时支持Java和Python实现的动作
  • 这种设计使得复杂的Agent逻辑能够在Flink的分布式流处理环境中高效、可靠地执行
1.4 流程

ActionExecutionOperator 的工作流程:

  • 接收数据:从上游接收数据并包装成 InputEvent
  • 动作触发:依据事件类型查找并触发相应的动作
  • 动作执行:执行动作逻辑,可能产生新的事件
  • 事件处理:处理动作产生的事件,如果是输入事件则发送给下游
  • 状态更新:更新内存状态和执行状态
  • 循环处理:继续处理新产生的事件,直到没有待处理的事件。
关键特性:

  • 事件驱动:基于事件的处理模型,事件触发动作执行
  • 状态一致性:通过Flink的状态管理来保证处理一致性
  • 容错恢复:支持从检查点恢复执行状态
  • 混合执行:同时支持 java 和 python 动作的执行
  • 内存管理:管理Agent的短期内存状态
使用流程图如下
  1. graph TD
  2.         A[AgentsExecutionEnvironment] --> B[RemoteExecutionEnvironment]
  3.         B --> C[ActionExecutionOperator]
  4.         C --> D[AgentPlan]
  5.         C --> D[Flink State Backend]
  6.         C --> D[Python Components]
  7.         C --> D[Event Processing Components]
复制代码
1.png

1.5 组件 & 关系

ActionExecutionOperator 是 Flink Agent 系统中的核心执行组件,位于 Flink 流处理管道中,负责执行 Agent 定义的各种动作。其主要交互组件如下:
上游组件

  • DataStream API:作为 Flink 流处理管道中的操作符,接收来自上游 DataStream 的数据
  • Input 数据源:接收各种类型的输入数据,将其包装为 InputEvent
  • KeyedStream:处理按键分组的数据流
下游组件

  • Output 数据接收器:将OutputEvent 中的数据发送到下游操作符
  • DataStream API:输出处理结果到下游 DataStream 操作符
核心依赖组件
工作流程中的交互
  1. graph TD
  2.         A[DataStream Source] --> B[ActionExecutionOperator]
  3.         B --> C[DataStream Sink]
  4.         B --> D[AgentPlan]
  5.         B --> E[ActionTask]
  6.         B --> F[RunnerContext]
  7.         B --> G[Flink State Backend]
  8.         B --> H[PythonActionExecutor]
  9.         B --> I[ActionStateStore]
复制代码
2.png

1.6 核心依赖组件


  • AgentPlan:AgentPlan会被 ActionExecutionOperator  使用,因为操作符需要访问AgentPlan中的动作定义和资源配置来执行相应逻辑

    • 包含Agent的所有动作定义和资源配置
    • 提供动作触发逻辑和资源配置信息

  • ActionTask 及其子类:

    • JavaActionTask :执行 Java 动作
    • PythonActionTask :执行Python动作

  • Environment

    • Flink  Streaming Runtime Environment:ActionExecutionOperator 是一个Flink 流处理操作符,会被 Flink 流处理运行时环境使用,作为流处理作业 DAG 图中的一个节点。
    • RemoteExecutionEnvironment:在远程执行环境中,RemoteExecutionEnvironment 会创建和使用 ActionExecutionOperator 来执行Agent 逻辑。

  • RunnerContext 及其实现:

    • RunnerContextImpl:Java 动作执行上下文
    • PythonRunnerContextImpl:Python 动作执行上下文

  • 状态管理组件。以下状态存储组件会被 ActionExecutionOperator  使用。

    • Flink 状态后端,用于短期内存、待处理任务等。
    • ActionStateStore:KafkaActionStateStore:可选的外部状态存储,用于动作状态持久化和恢复。

  • Python相关组件。当代理包含Python动作时,以下组件会与 ActionExecutionOperator  协同工作。

    • PythonActionExecutor:管理Python环境和执行Python动作
    • PythonActionTask:表示Python动作任务
    • PythonRunnerContextImpl :Python动作执行上下文。

  • 辅助组件

    • EventLogger:事件日志记录器
    • EventListner:事件监听器
    • BultInMetrics:内置指标收集
    • FlinkAgentsMetricGroupImpl:指标组实现

  • 事件处理相关组件

    • 各种Event类型(InputEvent、OutputEvent、PythonEvent等)
    • SegmentedQueue:管理按键分段的事件队列和水印处理

1.7 主要功能

ActionExecutionOperator 主要功能如下:

  • 动作执行核心

    • 执行Agent中定义的所有动作(actions)
    • 处理事件驱动的执行流程,根据事件类型触发的相应的动作

  • 事件处理流程

    • 接收上游数据并将其包装成 InputEvent
    • 根据事件类型触发对应的 action 执行
    • 对应 OutputEvent 类型的事件,将数据发送到下游
    • 对于其他类型的事件,继续触发相应的动作来执行

  • 状态管理

    • 管理短期内存状态(short-term memory)
    • 维护动作执行状态和序列号
    • 支持检查点和故障恢复
    • 使用 Flink 状态后端持久化关键状态信息

  • 混合语言支持

    • 支持 Java 和 Python 两种语言编写的动作
    • 通过 PythonActionExecutor 处理 Python 动作的执行
    • 在同一个算子中协调 Java 和 Python 动作的执行

  • 异步处理支持

    • 支持异步动作执行
    • 通过 MailboxExector 管理异步任务
    • 处理长时间运行的动作任务

  • 监控和日志

    • 集成指标收集和监控
    • 支持事件日志记录
    • 提供内置指标跟踪执行情况

1.8 运行ActionTask流程

ActionTask 在 Flink 中的运行机制分三步完成:创建、调度、执行,全程由 ActionExecutionOperator 托管,并利用 Flink 的邮箱线程保证并发安全,具体如下。
创建与初始化


  • 作业启动时,ActionExecutionOperator 完成实例化,内部持有 AgentPlan(含全部 Action 定义)。
  • 上游数据到达 processElement() 后,先被封装成 InputEvent;随后调用 processEvent() 开始处理。
ActionTask 的创建与调度


  • processEvent() 通过 getActionsTriggeredBy() 找出响应该事件的所有 Action。
  • 对每个 Action,调用 createActionTask() 生成对应的 ActionTask 实例,并存入 actionTasksState 状态。
  • 使用 Flink MailboxExecutor 提交异步任务:
    1. mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
    复制代码
    保证后续逻辑在 mailbox 线程中顺序执行,避免并发冲突。这是Agent与Flink进行结合的关键之一,是Agent代码运行的关键
ActionTask 的执行


  • tryProcessActionTaskForKey() → processActionTaskForKey() 从 actionTasksState 取出待处理任务。
  • createAndSetRunnerContext() 为任务绑定运行时上下文(内存、指标、邮箱检查等)。
  • 调用 ActionTask.invoke() 执行具体动作逻辑;若任务返回新生成器(异步场景),则循环创建后续 ActionTask 并再次提交邮箱,实现“拆分-继续”流程。

    • ActionTask.invoke() 返回 ActionTaskResult;若任务已完成(isFinished() 为 true),则通过 processEvent() 向下游发送输出事件或触发新的 ActionTask,并持久化内存状态。
    • 若任务未完成(异步操作),则保存 RunnerContext 供后续使用,并将新生成的 ActionTask 加入 actionTasksState,再次提交到 MailboxExecutor,形成循环直到所有任务完成。

状态管理与容错


  • 使用 Flink 状态后端持久化 actionTasksState、pendingInputEventsState 等;在 checkpoint 时保存快照,故障恢复后可继续执行。
  • 实现 snapshotState() 和 initializeState() 方法,完成状态的快照与恢复。
整个流程依托 Flink 核心特性:

  • 事件驱动:基于事件触发动作执行;
  • 状态管理:使用 Flink 状态后端管理中间状态;
  • 容错机制:通过 checkpoint 和状态恢复保证 exactly-once 语义;
  • 异步处理:通过 MailboxExecutor 实现高效的异步任务调度;
  • 键控处理:支持按键分区处理,保证同一键的数据顺序处理。
通过“状态存储 + 邮箱调度”双机制,ActionTask 既能顺序处理单键事件,又能优雅拆分异步步骤,全程与 Flink 的并发模型无缝衔接。
1.8 Flink MailboxExecutor

MailboxExecutor 是 Flink 内部的一个“单线程邮筒”调度器,作用可以一句话概括:“外部线程只管投信,内部让同一个算子/同一个键的所有任务按顺序一条一条执行,避免并发竞争,同时不阻塞整个 TaskManager”
为什么需要 MailboxExecutor


  • Flink 算子可能被多线程同时调用(网络线程、异步 I/O、用户回调)。
  • 很多算子状态不是线程安全的(例如 keyed state、Python 解释器)。
  • 不能把整个算子锁起来,否则背压会拖垮整个 TaskManager。
→ 解决方案:把“真正干活”的代码投递到同一个“邮箱”里,由一条专用线程按顺序取出执行;其他线程只负责“投信”,瞬间返回,不会阻塞。
使用场景速览

场景投递到邮箱的任务好处ActionTask 拆分tryProcessActionTaskForKey(key)同一键的任务顺序执行,状态无锁Python UDF 异步PythonGeneratorActionTask解释器单线程,避免 GIL 竞争网络线程回调onNext(record)网络线程立即返回,不阻塞 TCP 接收工作模型(用邮筒来隐喻)
  1. 任意线程(网络、异步、用户)  
  2.    ├─→ mailbox.put(mail)   // 非阻塞,瞬间返回  
  3.    └─→ 立即继续干别的  
  4. 专用单线程(Mailbox Thread)  
  5.    ├─→ while(true) mail = mailbox.take()  
  6.    ├─→ 顺序执行 mail.run()  
  7.    └─→ 更新状态、发下游、写 checkpoint
复制代码

  • 邮箱使用 无锁队列(Disruptor),put/take 都是 O(1) 且线程安全。
  • 单线程内部仍可异步(例如生成器返回新的 ActionTask),但状态读写无并发
对应示例如下:
  1. // 投递任务
  2. mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
  3. // 邮箱线程循环(简化)
  4. while (isRunning) {
  5.     Runnable task = mailbox.take();   // 阻塞直到有信
  6.     task.run();                       // 顺序执行
  7. }
复制代码
0x02 ActionExecutionOperator 生成机制和策略

2.1 生成位置

ActionExecutionOperator 在 ActionExecutionOperatorFactory 类中创建。从代码结构可以看出,这个工厂类被用在 CompileUtils.connectToAgent() 方法中。
2.2 生成机制

创建流程
  1. // 在 CompileUtils.java 中
  2. private static <K, IN, OUT> DataStream<OUT> connectToAgent(
  3.     KeyedStream<IN, K> keyedInputStream,
  4.     AgentPlan agentPlan,
  5.     TypeInformation<OUT> outTypeInformation,
  6.     boolean inputIsJava) {
  7. return (DataStream<OUT>)
  8.     keyedInputStream
  9.         .transform(
  10.             "action-execute-operator",
  11.             outTypeInformation,
  12.             new ActionExecutionOperatorFactory(agentPlan, inputIsJava)
  13.         )
  14.         .setParallelism(keyedInputStream.getParallelism());
  15. }
复制代码
操作符通过 Flink 的 transform() 方法创建,该方法接收一个 ActionExecutionOperatorFactory。
工厂模式实现

ActionExecutionOperatorFactory 实现了 Flink 的 StreamOperatorFactory 接口:
  1. public class ActionExecutionOperatorFactory implements StreamOperatorFactory<Object> {
  2.     private final AgentPlan agentPlan;
  3.     private final Boolean inputIsJava;
  4. @Override
  5. public <T extends StreamOperator<Object>> T createStreamOperator(
  6.         StreamOperatorParameters<Object> parameters) {
  7.     ProcessingTimeService processingTimeService = parameters.getProcessingTimeService();
  8.     MailboxExecutor mailboxExecutor = parameters.getMailboxExecutor();
  9.     return (T) new ActionExecutionOperator<>(
  10.             agentPlan,
  11.             inputIsJava,
  12.             processingTimeService,
  13.             mailboxExecutor,
  14.             null); // actionStateStore
  15. }
复制代码
2.3 关键生成策略

策略 1:基于计划的生成


  • AgentPlan 包含关于动作、资源和配置的所有必要信息
  • 这个计划是从用户定义的 Python 或 Java Agent 生成的
  • 操作符的所有行为都在创建时由这个计划决定
策略 2:类型特定处理

inputIsJava 参数决定了如何处理输入数据:

  • 如果为 true:输入和输出是 Java 对象
  • 如果为 false:输入和输出是字节数组(用于 Python 互操作)
策略 3:资源管理

资源在初始化期间注入到操作符中:
  1. // 在 ActionExecutionOperator 构造函数中
  2. public ActionExecutionOperator(
  3.         AgentPlan agentPlan,
  4.         Boolean inputIsJava,
  5.         ProcessingTimeService processingTimeService,
  6.         MailboxExecutor mailboxExecutor,
  7.         ActionStateStore actionStateStore) {
  8.     ...
  9. }
  10. @Override
  11. public <T extends StreamOperator<Object>> T createStreamOperator(
  12.         StreamOperatorParameters<Object> parameters) {
  13.     ProcessingTimeService processingTimeService = parameters.getProcessingTimeService();
  14.     MailboxExecutor mailboxExecutor = parameters.getMailboxExecutor();
  15.     return (T) new ActionExecutionOperator<>(
  16.             agentPlan,
  17.             inputIsJava,
  18.             processingTimeService,
  19.             mailboxExecutor,
  20.             null); // actionStateStore
  21. }
复制代码
2.4 操作符初始化过程

当操作符打开时。
状态初始化:

  • 短期记忆状态(shortTermMemState)
  • 动作任务状态(actionTasksState)
  • 待处理事件状态(pendingInputEventsState)
  • 处理中的键跟踪状态(currentProcessingKeysOpState)
组件设置:

  • Python 执行器(如果需要)
  • 指标收集
  • 事件日志记录
  • 动作状态存储(可选)
恢复逻辑:
  1. // 故障/重启后恢复处理
  2. private void tryResumeProcessActionTasks() throws Exception {
  3.     Iterable<Object> keys = currentProcessingKeysOpState.get();
  4.     if (keys != null) {
  5.         for (Object key : keys) {
  6.             keySegmentQueue.addKeyToLastSegment(key);
  7.             mailboxExecutor.submit(
  8.                 () -> tryProcessActionTaskForKey(key), "process action task");
  9.         }
  10.     }
  11. }
复制代码
2.5 与执行环境的集成

本地环境和远程环境的生成过程不同:
远程环境(Flink 集群)
  1. // 在 RemoteExecutionEnvironment.java 中
  2. @Override
  3. public DataStream<Object> toDataStream() {
  4.     if (agentPlan == null) {
  5.         throw new IllegalStateException("Must apply agent before calling toDataStream");
  6.     }
  7.     if (outputDataStream == null) {
  8.         if (keySelector != null) {
  9.             outputDataStream =
  10.                 CompileUtils.connectToAgent(inputDataStream, keySelector, agentPlan);
  11.         } else {
  12.             // 如果没有提供键选择器,则使用简单的直键选择器
  13.             outputDataStream =
  14.                 CompileUtils.connectToAgent(inputDataStream, x -> x, agentPlan);
  15.         }
  16.     }
  17.     return outputDataStream;
  18. }
复制代码
本地环境
对于本地执行,使用完全不同的 LocalRunner 而不是基于操作符的方法。
2.6 总结

ActionExecutionOperator 通过 Flink 的转换 API 使用工厂模式生成。生成策略重点关注:

  • 声明式方法:使用 AgentPlan 定义行为
  • 类型灵活性:通过序列化支持 Java 和 Python
  • 状态管理:正确管理 Flink 状态以实现容错
  • 资源注入:提供对已配置资源的访问
  • 可恢复处理:通过状态快照支持故障恢复
这种设计允许操作符基于代理定义动态创建,同时保持 Flink 流处理的保证。
0x03 任务拆分机制详解

3.1 任务拆分的基本原理

为什么需要拆分任务

ActionExecutionOperator 将复杂的 Action 拆分成多个 ActionTask 的主要原因是为了支持异步操作和状态管理。
一个完整的 Action 可能包含以下复杂场景:

  • 需要等待外部服务响应的异步调用
  • 需要分步骤执行的长时间运行任务
  • 需要在不同时间点产生多个输出事件的任务
拆分粒度

每个 ActionTask 代表 Action 中的一个执行单元,通常对应以下情况之一:

  • Action 的初始执行步骤
  • 异步操作的触发和回调处理
  • 长时间运行任务的阶段性执行
3.2 任务创建过程

不同类型 ActionTask 的创建

根据 Action 的执行类型(Java 或 Python),创建相应的 ActionTask 实现:

  • JavaActionTask:处理 Java 实现的 Action
  • PythonActionTask:处理 Python 实例的 Action
动态创建 ActionTask

在某些情况下,ActionTask.invoke() 可能会产生新的 ActionTask 来继续处理:

  • 异步操作完成后需要进一步处理结果
  • 长时间运行的任务需要分阶段执行
  • 需要等待某些条件满足后再继续执行
具体对应代码中,就是当 ActionTask 被执行时,其 invoke() 方法可能会返回一个新的 ActionTask 来继续执行剩余的工作:
  1. // PythonActionTask.invoke() 示例
  2. public ActionTaskResult invoke() throws Exception {
  3.     // 执行 Python 函数
  4.     String pythonGeneratorRef = pythonActionExecutor.executePythonFunction(...);
  5.     if (pythonGeneratorRef != null) {
  6.         // 如果返回了生成器引用,创建新的 ActionTask 来处理后续步骤
  7.         ActionTask tempGeneratedActionTask =
  8.             new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
  9.         tempGeneratedActionTask.setRunnerContext(runnerContext);
  10.         return tempGeneratedActionTask.invoke();
  11.     }
  12.     // 如果没有更多步骤,标记任务完成
  13.     return new ActionTaskResult(
  14.         true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
  15. }
复制代码
也会在 processEvent 中启动新的task。
  1.     private void processEvent(Object key, Event event) throws Exception {
  2.         notifyEventProcessed(event);
  3.         boolean isInputEvent = EventUtil.isInputEvent(event);
  4.         if (EventUtil.isOutputEvent(event)) {
  5.             // If the event is an OutputEvent, we send it downstream.
  6.             OUT outputData = getOutputFromOutputEvent(event);
  7.             if (event.hasSourceTimestamp()) {
  8.                 output.collect(reusedStreamRecord.replace(outputData, event.getSourceTimestamp()));
  9.             } else {
  10.                 reusedStreamRecord.eraseTimestamp();
  11.                 output.collect(reusedStreamRecord.replace(outputData));
  12.             }
  13.         } else {
  14.             if (isInputEvent) {
  15.                 // If the event is an InputEvent, we mark that the key is currently being processed.
  16.                 currentProcessingKeysOpState.add(key);
  17.                 initOrIncSequenceNumber();
  18.             }
  19.             // We then obtain the triggered action and add ActionTasks to the waiting processing
  20.             // queue.
  21.             List triggerActions = getActionsTriggeredBy(event);
  22.             if (triggerActions != null && !triggerActions.isEmpty()) {
  23.                 for (Action triggerAction : triggerActions) {
  24.                     actionTasksKState.add(createActionTask(key, triggerAction, event));
  25.                 }
  26.             }
  27.         }
  28.         if (isInputEvent) {
  29.             // If the event is an InputEvent, we submit a new mail to try processing the actions.
  30.             mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
  31.         }
  32.     }
复制代码
createActionTask 的代码如下。
  1. private ActionTask createActionTask(Object key, Action action, Event event) {
  2.     if (action.getExec() instanceof JavaFunction) {
  3.         return new JavaActionTask(
  4.             key, event, action, getRuntimeContext().getUserCodeClassLoader());
  5.     } else if (action.getExec() instanceof PythonFunction) {
  6.         return new PythonActionTask(key, event, action);
  7.     } else {
  8.         throw new IllegalStateException(
  9.             "Unsupported action type: " + action.getExec().getClass());
  10.     }
  11. }
复制代码
0x04 任务队列管理

4.1 任务状态存储

ActionExecutionOperator 使用 Flink 的状态管理来存储待处理的 ActionTask:
  1. // 在 operator 中定义的状态
  2. private transient ListState actionTasksKState;
  3. // 初始化状态
  4. actionTasksKState = getRuntimeContext().getListState(
  5.     new ListStateDescriptor<>("actionTasks", TypeInformation.of(ActionTask.class)));
复制代码
4.2 任务调度机制

ActionExecutionOperator 使用 Flink 的 Mailbox 机制来调度 ActionTask 的执行:
  1. private void processEvent(Object key, Event event) throws Exception {
  2.     // ... 处理事件逻辑 ...
  3.     if (isInputEvent) {
  4.         // 提交邮件任务来处理 ActionTask
  5.         mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
  6.     }
  7. }
复制代码
4.3 任务处理循环

tryProcessActionTaskForKey 方法实现了持续处理同一个 key 下的多个 ActionTask 的机制:
  1. private void tryProcessActionTaskForKey(Object key) {
  2.     try {
  3.         processActionTaskForKey(key);
  4.     } catch (Exception e) {
  5.         // 错误处理
  6.     }
  7. }
  8. private void processActionTaskForKey(Object key) throws Exception {
  9.     // 1. 获取待处理的 ActionTask
  10.     ActionTask actionTask = pollFromListState(actionTasksKState);
  11.     if (actionTask == null) {
  12.         // 没有更多任务,清状态
  13.         return;
  14.     }
  15.     // 2. 执行 ActionTask
  16.     ActionTaskResult result = actionTask.invoke();
  17.     // 3. 处理结果
  18.     if (!result.isFinished()) {
  19.         // 如果未完成,将生成的新任务加入队列
  20.         actionTasksKState.add(result.getGeneratedActionTask().get());
  21.     }
  22.     // 4. 处理输出事件
  23.     for (Event outputEvent : result.getOutputEvents()) {
  24.         processEvent(key, outputEvent);
  25.     }
  26.     // 5. 如果还有任务,继续调度
  27.     if (currentKeyHasMoreActionTask()) {
  28.         mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
  29.     }
  30. }        
复制代码
0x05 实际应用示例

5.1 异步 HTTP 请求处理

考虑一个需要调用外部 API 的 Action:
  1. @action(SomeEvent)
  2. def fetch_external_data(event, ctx):
  3.     # 步骤1: 发起异步HTTP请求
  4.     future = asyncio.create_task(http_client.get(f"https://api.example.com/data/{event.id}"))
  5.     # 第一次执行到这里会暂停,返回一个生成器
  6.     response = await future
  7.     # 步骤2: 处理响应数据
  8.     processed_data = process_response(response)
  9.     # 步骤3: 发送结果事件
  10.     ctx.send_event(ResultEvent(processed_data))
复制代码
这个 Action 会被拆分为多个 ActionTask:

  • 初始任务:发起 HTTP 请求并等待
  • 后续任务:处理响应并发送结果
5.2 复杂业务流程处理

一个多步骤的业务流程也可能被拆分:
  1. @action(OrderEvent)
  2. def process_order(event, ctx):
  3.     # 步骤1:验证订单
  4.     validate_order(event.order)
  5.     # 步骤2:检查库存(可能需要异步调用)
  6.     inventory_status = check_inventory(event.order.items)
  7.     # 步骤3:处理支付(可能需要异步调用)
  8.     payment_result = process_payment(event.order)
  9.     # 步骤4:更新订单状态
  10.     update_order_status(event.order.id, "completed")
  11.     # 步骤5:发送通知
  12.     send_notification(event.customer_email, "Order completed")
复制代码
每个需要等待的步骤都会生成一个新的 ActionTask,确保整个流程正确执行。
通过这种方式,ActionExecutionOperator 能够有效地管理和执行复杂的、可能涉及异步操作的 Action,同时保证状态的一致性和容错能力。
0xFF 参考


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册