【大数据 & AI】Flink Agents 源码解读 --- (5) --- ActionExecutionOperator
目录
- 【大数据 & AI】Flink Agents 源码解读 --- (5) --- ActionExecutionOperator
- 0x00 摘要
- 0x01 基础知识
- 1.1 总体架构
- 1.2 定义
- 1.3 关键设计
- 1.4 流程
- 1.5 组件 & 关系
- 1.6 核心依赖组件
- 1.7 主要功能
- 1.8 运行ActionTask流程
- 创建与初始化
- ActionTask 的创建与调度
- ActionTask 的执行
- 状态管理与容错
- 1.8 Flink MailboxExecutor
- 为什么需要 MailboxExecutor
- 使用场景速览
- 工作模型(用邮筒来隐喻)
- 0x02 ActionExecutionOperator 生成机制和策略
- 2.1 生成位置
- 2.2 生成机制
- 2.3 关键生成策略
- 策略 1:基于计划的生成
- 策略 2:类型特定处理
- 策略 3:资源管理
- 2.4 操作符初始化过程
- 2.5 与执行环境的集成
- 2.6 总结
- 0x03 任务拆分机制详解
- 3.1 任务拆分的基本原理
- 3.2 任务创建过程
- 不同类型 ActionTask 的创建
- 动态创建 ActionTask
- 0x04 任务队列管理
- 4.1 任务状态存储
- 4.2 任务调度机制
- 4.3 任务处理循环
- 0x05 实际应用示例
- 5.1 异步 HTTP 请求处理
- 5.2 复杂业务流程处理
- 0xFF 参考
0x00 摘要
ActionExecutionOperator 是整个Flink Agent 系统的执行引擎,它连接了 Flink 流处理框架和 Agent 逻辑,协调各种组件完成了 Agent 定义的动作执行。ActionExecutionOperator 主要职责:
- 事件处理:接收来自上游的数据,包装成InputEvent
- 动作执行:根据Agent定义的动作规则,触发相应的处理逻辑
- 状态管理:维护短期记忆,检查点状态等
- 异步支持:处理需要异步执行的任务
- Python/Java交互操作:协调组件间的交互
- 输出产生:将最终结果作为OutputEvent发送到下游
这样,整个Agent的逻辑就被集成到了标准的Flink流处理管道中,能够利用Flink的分布式计算能力和容错机制。
本篇主要是为了学习 ActionExecutionOperator 的架构,生成机制和策略,也会学习 ActionExecutionOperator 如何使用 AgentPlan,如何生成 ActionTask。
0x01 基础知识
ActionExecutionOperator 算子是整个 Flink Agent 框架运行时的核心组件,负责将用户定义的代理逻辑转换为实际的流处理操作。
1.1 总体架构
可以把 Flink Agents 的整个执行流程比作 “做一道菜”,我们借此进行分析。
Flink Agents 是基于原生 Flink 分布式流处理能力封装的上层框架。其中四个主要组件代表了 Flink Agents 框架中的四个层次:
- Agent(顶层设计,定义了“做什么”):用户定义的智能实体,类似 “餐厅菜单 + 规则手册”,包含业务逻辑、动作(Action)和资源(工具、模型等)定义,明确 “做什么”。
- AgentPlan(中间编译层,确定了“怎么做”):将 Agent 编译后的可执行计划,类似 “详细操作流程图”,明确动作触发规则、资源映射关系,确定 “怎么做”。
- ActionExecutionOperator(运行时执行层,是执行环境,负责“协调调度”):Flink 集群中的执行核心,在 Flink 流处理环境中实际执行操作,类似 “餐厅首席大厨”,负责接收数据、调度任务、管理状态,协调整体执行流程。
- ActionTask(最小执行单元,负责“具体实施”):具体的执行任务,类似 “员工的单个服务步骤”,分为 JavaActionTask 和 PythonActionTask,处理单个事件并返回结果。
具体可以参见下图。- [Agent] 菜单手册
- ↓(编译)
- [AgentPlan] 详细流程图
- ↓(运行时实例化)
- [ActionExecutionOperator] 餐厅首席大厨
- ↓(分配任务)
- [ActionTask] 员工具体任务
复制代码 这样的设计使得系统既灵活又高效,能够处理复杂的AI代理任务,同时保证了良好的扩展性和维护性。
1.2 定义
ActionExecutionOperator 的定义如下。- /**
- * An operator that executes the actions defined in the agent. Upon receiving data from the
- * upstream, it first wraps the data into an {@link InputEvent}. It then invokes the corresponding
- * action that is interested in the {@link InputEvent}, and collects the output event produced by
- * the action.
- *
- * <p>For events of type {@link OutputEvent}, the data contained in the event is sent downstream.
- * For all other event types, the process is repeated: the event triggers the corresponding action,
- * and the resulting output event is collected for further processing.
- */
- public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(ActionExecutionOperator.class);
- private static final String RECOVERY_MARKER_STATE_NAME = "recoveryMarker";
- private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME = "messageSequenceNumber";
- private static final String PENDING_INPUT_EVENT_STATE_NAME = "pendingInputEvents";
- private final AgentPlan agentPlan;
- private final Boolean inputIsJava;
- private transient StreamRecord<OUT> reusedStreamRecord;
- private transient MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState;
- // PythonActionExecutor for Python actions
- private transient PythonActionExecutor pythonActionExecutor;
- private transient FlinkAgentsMetricGroupImpl metricGroup;
- private transient BuiltInMetrics builtInMetrics;
- private transient SegmentedQueue keySegmentQueue;
- private final transient MailboxExecutor mailboxExecutor;
- // We need to check whether the current thread is the mailbox thread using the mailbox
- // processor.
- // TODO: This is a temporary workaround. In the future, we should add an interface in
- // MailboxExecutor to check whether a thread is a mailbox thread, rather than using reflection
- // to obtain the MailboxProcessor instance and make the determination.
- private transient MailboxProcessor mailboxProcessor;
- // An action will be split into one or more ActionTask objects. We use a state to store the
- // pending ActionTasks that are waiting to be executed.
- private transient ListState actionTasksKState;
- // To avoid processing different InputEvents with the same key, we use a state to store pending
- // InputEvents that are waiting to be processed.
- private transient ListState<Event> pendingInputEventsKState;
- // An operator state is used to track the currently processing keys. This is useful when
- // receiving an EndOfInput signal, as we need to wait until all related events are fully
- // processed.
- private transient ListState<Object> currentProcessingKeysOpState;
- private final transient EventLogger eventLogger;
- private final transient List<EventListener> eventListeners;
- private transient ActionStateStore actionStateStore;
- private transient ValueState<Long> sequenceNumberKState;
- private transient ListState<Object> recoveryMarkerOpState;
- private transient Map<Long, Map<Object, Long>> checkpointIdToSeqNums;
- // This in memory map keep track of the runner context for the async action task that having
- // been finished
- private final transient Map actionTaskRunnerContexts;
复制代码 1.3 关键设计
ActionExecutionOperator 关键设计要点如下:
- 状态管理:使用Flink的状态后端管理短期记忆和处理状态
- 异步支持:通过邮箱线程机制支持异步操作执行
- 容错恢复:通过checkpoint机制实现状态持久化和恢复
- 资源隔离:每个key拥有独立的处理上下文和状态
- 多语言支持:同时支持Java和Python实现的动作
- 这种设计使得复杂的Agent逻辑能够在Flink的分布式流处理环境中高效、可靠地执行
1.4 流程
ActionExecutionOperator 的工作流程:
- 接收数据:从上游接收数据并包装成 InputEvent
- 动作触发:依据事件类型查找并触发相应的动作
- 动作执行:执行动作逻辑,可能产生新的事件
- 事件处理:处理动作产生的事件,如果是输入事件则发送给下游
- 状态更新:更新内存状态和执行状态
- 循环处理:继续处理新产生的事件,直到没有待处理的事件。
关键特性:
- 事件驱动:基于事件的处理模型,事件触发动作执行
- 状态一致性:通过Flink的状态管理来保证处理一致性
- 容错恢复:支持从检查点恢复执行状态
- 混合执行:同时支持 java 和 python 动作的执行
- 内存管理:管理Agent的短期内存状态
使用流程图如下- graph TD
- A[AgentsExecutionEnvironment] --> B[RemoteExecutionEnvironment]
- B --> C[ActionExecutionOperator]
- C --> D[AgentPlan]
- C --> D[Flink State Backend]
- C --> D[Python Components]
- C --> D[Event Processing Components]
复制代码
1.5 组件 & 关系
ActionExecutionOperator 是 Flink Agent 系统中的核心执行组件,位于 Flink 流处理管道中,负责执行 Agent 定义的各种动作。其主要交互组件如下:
上游组件
- DataStream API:作为 Flink 流处理管道中的操作符,接收来自上游 DataStream 的数据
- Input 数据源:接收各种类型的输入数据,将其包装为 InputEvent
- KeyedStream:处理按键分组的数据流
下游组件
- Output 数据接收器:将OutputEvent 中的数据发送到下游操作符
- DataStream API:输出处理结果到下游 DataStream 操作符
核心依赖组件
工作流程中的交互- graph TD
- A[DataStream Source] --> B[ActionExecutionOperator]
- B --> C[DataStream Sink]
- B --> D[AgentPlan]
- B --> E[ActionTask]
- B --> F[RunnerContext]
- B --> G[Flink State Backend]
- B --> H[PythonActionExecutor]
- B --> I[ActionStateStore]
复制代码
1.6 核心依赖组件
- AgentPlan:AgentPlan会被 ActionExecutionOperator 使用,因为操作符需要访问AgentPlan中的动作定义和资源配置来执行相应逻辑
- 包含Agent的所有动作定义和资源配置
- 提供动作触发逻辑和资源配置信息
- ActionTask 及其子类:
- JavaActionTask :执行 Java 动作
- PythonActionTask :执行Python动作
- Environment
- Flink Streaming Runtime Environment:ActionExecutionOperator 是一个Flink 流处理操作符,会被 Flink 流处理运行时环境使用,作为流处理作业 DAG 图中的一个节点。
- RemoteExecutionEnvironment:在远程执行环境中,RemoteExecutionEnvironment 会创建和使用 ActionExecutionOperator 来执行Agent 逻辑。
- RunnerContext 及其实现:
- RunnerContextImpl:Java 动作执行上下文
- PythonRunnerContextImpl:Python 动作执行上下文
- 状态管理组件。以下状态存储组件会被 ActionExecutionOperator 使用。
- Flink 状态后端,用于短期内存、待处理任务等。
- ActionStateStore:KafkaActionStateStore:可选的外部状态存储,用于动作状态持久化和恢复。
- Python相关组件。当代理包含Python动作时,以下组件会与 ActionExecutionOperator 协同工作。
- PythonActionExecutor:管理Python环境和执行Python动作
- PythonActionTask:表示Python动作任务
- PythonRunnerContextImpl :Python动作执行上下文。
- 辅助组件
- EventLogger:事件日志记录器
- EventListner:事件监听器
- BultInMetrics:内置指标收集
- FlinkAgentsMetricGroupImpl:指标组实现
- 事件处理相关组件
- 各种Event类型(InputEvent、OutputEvent、PythonEvent等)
- SegmentedQueue:管理按键分段的事件队列和水印处理
1.7 主要功能
ActionExecutionOperator 主要功能如下:
- 动作执行核心
- 执行Agent中定义的所有动作(actions)
- 处理事件驱动的执行流程,根据事件类型触发的相应的动作
- 事件处理流程
- 接收上游数据并将其包装成 InputEvent
- 根据事件类型触发对应的 action 执行
- 对应 OutputEvent 类型的事件,将数据发送到下游
- 对于其他类型的事件,继续触发相应的动作来执行
- 状态管理
- 管理短期内存状态(short-term memory)
- 维护动作执行状态和序列号
- 支持检查点和故障恢复
- 使用 Flink 状态后端持久化关键状态信息
- 混合语言支持
- 支持 Java 和 Python 两种语言编写的动作
- 通过 PythonActionExecutor 处理 Python 动作的执行
- 在同一个算子中协调 Java 和 Python 动作的执行
- 异步处理支持
- 支持异步动作执行
- 通过 MailboxExector 管理异步任务
- 处理长时间运行的动作任务
- 监控和日志
- 集成指标收集和监控
- 支持事件日志记录
- 提供内置指标跟踪执行情况
1.8 运行ActionTask流程
ActionTask 在 Flink 中的运行机制分三步完成:创建、调度、执行,全程由 ActionExecutionOperator 托管,并利用 Flink 的邮箱线程保证并发安全,具体如下。
创建与初始化
- 作业启动时,ActionExecutionOperator 完成实例化,内部持有 AgentPlan(含全部 Action 定义)。
- 上游数据到达 processElement() 后,先被封装成 InputEvent;随后调用 processEvent() 开始处理。
ActionTask 的创建与调度
- processEvent() 通过 getActionsTriggeredBy() 找出响应该事件的所有 Action。
- 对每个 Action,调用 createActionTask() 生成对应的 ActionTask 实例,并存入 actionTasksState 状态。
- 使用 Flink MailboxExecutor 提交异步任务:
- mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
复制代码 保证后续逻辑在 mailbox 线程中顺序执行,避免并发冲突。这是Agent与Flink进行结合的关键之一,是Agent代码运行的关键。
ActionTask 的执行
- tryProcessActionTaskForKey() → processActionTaskForKey() 从 actionTasksState 取出待处理任务。
- createAndSetRunnerContext() 为任务绑定运行时上下文(内存、指标、邮箱检查等)。
- 调用 ActionTask.invoke() 执行具体动作逻辑;若任务返回新生成器(异步场景),则循环创建后续 ActionTask 并再次提交邮箱,实现“拆分-继续”流程。
- ActionTask.invoke() 返回 ActionTaskResult;若任务已完成(isFinished() 为 true),则通过 processEvent() 向下游发送输出事件或触发新的 ActionTask,并持久化内存状态。
- 若任务未完成(异步操作),则保存 RunnerContext 供后续使用,并将新生成的 ActionTask 加入 actionTasksState,再次提交到 MailboxExecutor,形成循环直到所有任务完成。
状态管理与容错
- 使用 Flink 状态后端持久化 actionTasksState、pendingInputEventsState 等;在 checkpoint 时保存快照,故障恢复后可继续执行。
- 实现 snapshotState() 和 initializeState() 方法,完成状态的快照与恢复。
整个流程依托 Flink 核心特性:
- 事件驱动:基于事件触发动作执行;
- 状态管理:使用 Flink 状态后端管理中间状态;
- 容错机制:通过 checkpoint 和状态恢复保证 exactly-once 语义;
- 异步处理:通过 MailboxExecutor 实现高效的异步任务调度;
- 键控处理:支持按键分区处理,保证同一键的数据顺序处理。
通过“状态存储 + 邮箱调度”双机制,ActionTask 既能顺序处理单键事件,又能优雅拆分异步步骤,全程与 Flink 的并发模型无缝衔接。
1.8 Flink MailboxExecutor
MailboxExecutor 是 Flink 内部的一个“单线程邮筒”调度器,作用可以一句话概括:“外部线程只管投信,内部让同一个算子/同一个键的所有任务按顺序一条一条执行,避免并发竞争,同时不阻塞整个 TaskManager”。
为什么需要 MailboxExecutor
- Flink 算子可能被多线程同时调用(网络线程、异步 I/O、用户回调)。
- 很多算子状态不是线程安全的(例如 keyed state、Python 解释器)。
- 不能把整个算子锁起来,否则背压会拖垮整个 TaskManager。
→ 解决方案:把“真正干活”的代码投递到同一个“邮箱”里,由一条专用线程按顺序取出执行;其他线程只负责“投信”,瞬间返回,不会阻塞。
使用场景速览
场景投递到邮箱的任务好处ActionTask 拆分tryProcessActionTaskForKey(key)同一键的任务顺序执行,状态无锁Python UDF 异步PythonGeneratorActionTask解释器单线程,避免 GIL 竞争网络线程回调onNext(record)网络线程立即返回,不阻塞 TCP 接收工作模型(用邮筒来隐喻)
- 任意线程(网络、异步、用户)
- ├─→ mailbox.put(mail) // 非阻塞,瞬间返回
- └─→ 立即继续干别的
- 专用单线程(Mailbox Thread)
- ├─→ while(true) mail = mailbox.take()
- ├─→ 顺序执行 mail.run()
- └─→ 更新状态、发下游、写 checkpoint
复制代码
- 邮箱使用 无锁队列(Disruptor),put/take 都是 O(1) 且线程安全。
- 单线程内部仍可异步(例如生成器返回新的 ActionTask),但状态读写无并发。
对应示例如下:- // 投递任务
- mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
- // 邮箱线程循环(简化)
- while (isRunning) {
- Runnable task = mailbox.take(); // 阻塞直到有信
- task.run(); // 顺序执行
- }
复制代码 0x02 ActionExecutionOperator 生成机制和策略
2.1 生成位置
ActionExecutionOperator 在 ActionExecutionOperatorFactory 类中创建。从代码结构可以看出,这个工厂类被用在 CompileUtils.connectToAgent() 方法中。
2.2 生成机制
创建流程
- // 在 CompileUtils.java 中
- private static <K, IN, OUT> DataStream<OUT> connectToAgent(
- KeyedStream<IN, K> keyedInputStream,
- AgentPlan agentPlan,
- TypeInformation<OUT> outTypeInformation,
- boolean inputIsJava) {
- return (DataStream<OUT>)
- keyedInputStream
- .transform(
- "action-execute-operator",
- outTypeInformation,
- new ActionExecutionOperatorFactory(agentPlan, inputIsJava)
- )
- .setParallelism(keyedInputStream.getParallelism());
- }
复制代码 操作符通过 Flink 的 transform() 方法创建,该方法接收一个 ActionExecutionOperatorFactory。
工厂模式实现
ActionExecutionOperatorFactory 实现了 Flink 的 StreamOperatorFactory 接口:- public class ActionExecutionOperatorFactory implements StreamOperatorFactory<Object> {
- private final AgentPlan agentPlan;
- private final Boolean inputIsJava;
- @Override
- public <T extends StreamOperator<Object>> T createStreamOperator(
- StreamOperatorParameters<Object> parameters) {
- ProcessingTimeService processingTimeService = parameters.getProcessingTimeService();
- MailboxExecutor mailboxExecutor = parameters.getMailboxExecutor();
- return (T) new ActionExecutionOperator<>(
- agentPlan,
- inputIsJava,
- processingTimeService,
- mailboxExecutor,
- null); // actionStateStore
- }
复制代码 2.3 关键生成策略
策略 1:基于计划的生成
- AgentPlan 包含关于动作、资源和配置的所有必要信息
- 这个计划是从用户定义的 Python 或 Java Agent 生成的
- 操作符的所有行为都在创建时由这个计划决定
策略 2:类型特定处理
inputIsJava 参数决定了如何处理输入数据:
- 如果为 true:输入和输出是 Java 对象
- 如果为 false:输入和输出是字节数组(用于 Python 互操作)
策略 3:资源管理
资源在初始化期间注入到操作符中:- // 在 ActionExecutionOperator 构造函数中
- public ActionExecutionOperator(
- AgentPlan agentPlan,
- Boolean inputIsJava,
- ProcessingTimeService processingTimeService,
- MailboxExecutor mailboxExecutor,
- ActionStateStore actionStateStore) {
- ...
- }
- @Override
- public <T extends StreamOperator<Object>> T createStreamOperator(
- StreamOperatorParameters<Object> parameters) {
- ProcessingTimeService processingTimeService = parameters.getProcessingTimeService();
- MailboxExecutor mailboxExecutor = parameters.getMailboxExecutor();
- return (T) new ActionExecutionOperator<>(
- agentPlan,
- inputIsJava,
- processingTimeService,
- mailboxExecutor,
- null); // actionStateStore
- }
复制代码 2.4 操作符初始化过程
当操作符打开时。
状态初始化:
- 短期记忆状态(shortTermMemState)
- 动作任务状态(actionTasksState)
- 待处理事件状态(pendingInputEventsState)
- 处理中的键跟踪状态(currentProcessingKeysOpState)
组件设置:
- Python 执行器(如果需要)
- 指标收集
- 事件日志记录
- 动作状态存储(可选)
恢复逻辑:- // 故障/重启后恢复处理
- private void tryResumeProcessActionTasks() throws Exception {
- Iterable<Object> keys = currentProcessingKeysOpState.get();
- if (keys != null) {
- for (Object key : keys) {
- keySegmentQueue.addKeyToLastSegment(key);
- mailboxExecutor.submit(
- () -> tryProcessActionTaskForKey(key), "process action task");
- }
- }
- }
复制代码 2.5 与执行环境的集成
本地环境和远程环境的生成过程不同:
远程环境(Flink 集群)- // 在 RemoteExecutionEnvironment.java 中
- @Override
- public DataStream<Object> toDataStream() {
- if (agentPlan == null) {
- throw new IllegalStateException("Must apply agent before calling toDataStream");
- }
- if (outputDataStream == null) {
- if (keySelector != null) {
- outputDataStream =
- CompileUtils.connectToAgent(inputDataStream, keySelector, agentPlan);
- } else {
- // 如果没有提供键选择器,则使用简单的直键选择器
- outputDataStream =
- CompileUtils.connectToAgent(inputDataStream, x -> x, agentPlan);
- }
- }
- return outputDataStream;
- }
复制代码 本地环境
对于本地执行,使用完全不同的 LocalRunner 而不是基于操作符的方法。
2.6 总结
ActionExecutionOperator 通过 Flink 的转换 API 使用工厂模式生成。生成策略重点关注:
- 声明式方法:使用 AgentPlan 定义行为
- 类型灵活性:通过序列化支持 Java 和 Python
- 状态管理:正确管理 Flink 状态以实现容错
- 资源注入:提供对已配置资源的访问
- 可恢复处理:通过状态快照支持故障恢复
这种设计允许操作符基于代理定义动态创建,同时保持 Flink 流处理的保证。
0x03 任务拆分机制详解
3.1 任务拆分的基本原理
为什么需要拆分任务
ActionExecutionOperator 将复杂的 Action 拆分成多个 ActionTask 的主要原因是为了支持异步操作和状态管理。
一个完整的 Action 可能包含以下复杂场景:
- 需要等待外部服务响应的异步调用
- 需要分步骤执行的长时间运行任务
- 需要在不同时间点产生多个输出事件的任务
拆分粒度
每个 ActionTask 代表 Action 中的一个执行单元,通常对应以下情况之一:
- Action 的初始执行步骤
- 异步操作的触发和回调处理
- 长时间运行任务的阶段性执行
3.2 任务创建过程
不同类型 ActionTask 的创建
根据 Action 的执行类型(Java 或 Python),创建相应的 ActionTask 实现:
- JavaActionTask:处理 Java 实现的 Action
- PythonActionTask:处理 Python 实例的 Action
动态创建 ActionTask
在某些情况下,ActionTask.invoke() 可能会产生新的 ActionTask 来继续处理:
- 异步操作完成后需要进一步处理结果
- 长时间运行的任务需要分阶段执行
- 需要等待某些条件满足后再继续执行
具体对应代码中,就是当 ActionTask 被执行时,其 invoke() 方法可能会返回一个新的 ActionTask 来继续执行剩余的工作:- // PythonActionTask.invoke() 示例
- public ActionTaskResult invoke() throws Exception {
- // 执行 Python 函数
- String pythonGeneratorRef = pythonActionExecutor.executePythonFunction(...);
- if (pythonGeneratorRef != null) {
- // 如果返回了生成器引用,创建新的 ActionTask 来处理后续步骤
- ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
- tempGeneratedActionTask.setRunnerContext(runnerContext);
- return tempGeneratedActionTask.invoke();
- }
- // 如果没有更多步骤,标记任务完成
- return new ActionTaskResult(
- true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
- }
复制代码 也会在 processEvent 中启动新的task。- private void processEvent(Object key, Event event) throws Exception {
- notifyEventProcessed(event);
- boolean isInputEvent = EventUtil.isInputEvent(event);
- if (EventUtil.isOutputEvent(event)) {
- // If the event is an OutputEvent, we send it downstream.
- OUT outputData = getOutputFromOutputEvent(event);
- if (event.hasSourceTimestamp()) {
- output.collect(reusedStreamRecord.replace(outputData, event.getSourceTimestamp()));
- } else {
- reusedStreamRecord.eraseTimestamp();
- output.collect(reusedStreamRecord.replace(outputData));
- }
- } else {
- if (isInputEvent) {
- // If the event is an InputEvent, we mark that the key is currently being processed.
- currentProcessingKeysOpState.add(key);
- initOrIncSequenceNumber();
- }
- // We then obtain the triggered action and add ActionTasks to the waiting processing
- // queue.
- List triggerActions = getActionsTriggeredBy(event);
- if (triggerActions != null && !triggerActions.isEmpty()) {
- for (Action triggerAction : triggerActions) {
- actionTasksKState.add(createActionTask(key, triggerAction, event));
- }
- }
- }
- if (isInputEvent) {
- // If the event is an InputEvent, we submit a new mail to try processing the actions.
- mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
- }
- }
复制代码 createActionTask 的代码如下。- private ActionTask createActionTask(Object key, Action action, Event event) {
- if (action.getExec() instanceof JavaFunction) {
- return new JavaActionTask(
- key, event, action, getRuntimeContext().getUserCodeClassLoader());
- } else if (action.getExec() instanceof PythonFunction) {
- return new PythonActionTask(key, event, action);
- } else {
- throw new IllegalStateException(
- "Unsupported action type: " + action.getExec().getClass());
- }
- }
复制代码 0x04 任务队列管理
4.1 任务状态存储
ActionExecutionOperator 使用 Flink 的状态管理来存储待处理的 ActionTask:- // 在 operator 中定义的状态
- private transient ListState actionTasksKState;
- // 初始化状态
- actionTasksKState = getRuntimeContext().getListState(
- new ListStateDescriptor<>("actionTasks", TypeInformation.of(ActionTask.class)));
复制代码 4.2 任务调度机制
ActionExecutionOperator 使用 Flink 的 Mailbox 机制来调度 ActionTask 的执行:- private void processEvent(Object key, Event event) throws Exception {
- // ... 处理事件逻辑 ...
- if (isInputEvent) {
- // 提交邮件任务来处理 ActionTask
- mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
- }
- }
复制代码 4.3 任务处理循环
tryProcessActionTaskForKey 方法实现了持续处理同一个 key 下的多个 ActionTask 的机制:- private void tryProcessActionTaskForKey(Object key) {
- try {
- processActionTaskForKey(key);
- } catch (Exception e) {
- // 错误处理
- }
- }
- private void processActionTaskForKey(Object key) throws Exception {
- // 1. 获取待处理的 ActionTask
- ActionTask actionTask = pollFromListState(actionTasksKState);
- if (actionTask == null) {
- // 没有更多任务,清状态
- return;
- }
- // 2. 执行 ActionTask
- ActionTaskResult result = actionTask.invoke();
- // 3. 处理结果
- if (!result.isFinished()) {
- // 如果未完成,将生成的新任务加入队列
- actionTasksKState.add(result.getGeneratedActionTask().get());
- }
- // 4. 处理输出事件
- for (Event outputEvent : result.getOutputEvents()) {
- processEvent(key, outputEvent);
- }
- // 5. 如果还有任务,继续调度
- if (currentKeyHasMoreActionTask()) {
- mailboxExecutor.submit(() -> tryProcessActionTaskForKey(key), "process action task");
- }
- }
复制代码 0x05 实际应用示例
5.1 异步 HTTP 请求处理
考虑一个需要调用外部 API 的 Action:- @action(SomeEvent)
- def fetch_external_data(event, ctx):
- # 步骤1: 发起异步HTTP请求
- future = asyncio.create_task(http_client.get(f"https://api.example.com/data/{event.id}"))
- # 第一次执行到这里会暂停,返回一个生成器
- response = await future
- # 步骤2: 处理响应数据
- processed_data = process_response(response)
- # 步骤3: 发送结果事件
- ctx.send_event(ResultEvent(processed_data))
复制代码 这个 Action 会被拆分为多个 ActionTask:
- 初始任务:发起 HTTP 请求并等待
- 后续任务:处理响应并发送结果
5.2 复杂业务流程处理
一个多步骤的业务流程也可能被拆分:- @action(OrderEvent)
- def process_order(event, ctx):
- # 步骤1:验证订单
- validate_order(event.order)
- # 步骤2:检查库存(可能需要异步调用)
- inventory_status = check_inventory(event.order.items)
- # 步骤3:处理支付(可能需要异步调用)
- payment_result = process_payment(event.order)
- # 步骤4:更新订单状态
- update_order_status(event.order.id, "completed")
- # 步骤5:发送通知
- send_notification(event.customer_email, "Order completed")
复制代码 每个需要等待的步骤都会生成一个新的 ActionTask,确保整个流程正确执行。
通过这种方式,ActionExecutionOperator 能够有效地管理和执行复杂的、可能涉及异步操作的 Action,同时保证状态的一致性和容错能力。
0xFF 参考
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |