【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask
目录
- 【大数据 & AI】Flink Agents 源码解读 --- (6) --- ActionTask
- 0x00 概要
- 0x01 基础知识
- 1.1 相关组件
- 1.2 ActionTask
- 1.3 PythonActionTask
- 1.3.1 定义
- 1.3.2 PythonActionTask 与 Function 的关系
- 1.3.3 与其他组件的关系
- 1.3.4 调用流程
- 1.4 PythonGeneratorActionTask
- 1.5 JavaActionTask
- 1.6 ActionTaskResult 结构
- 0x02 ActionTask 切分机制
- 2.1 切分方式
- 2.2 实现细节
- 2.3 关键点
0x00 概要
ActionTask 是 Action 执行的基本单位,代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下:- Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime
复制代码 0x01 基础知识
ActionTask 是 Action 执行过程中的一个片段,用于支持复杂的执行逻辑(如异步处理),对应关系如下:
在 AgentPlan 中,每个 Action 包含一个执行函数 (exec),通常是 PythonFunction 或 JavaFunction,例如在 tool_call_action.py 中:- TOOL_CALL_ACTION = Action (
- name="tool_call_action",
- exec=PythonFunction.from_callable (process_tool_request), // 一个函数
- listen_event_types=[...]
- )
复制代码
- 一个 Action 可能产生多个 ActionTask
- ActionTask 是 Action 的执行时表示,可以看作是 Action 的 “执行片段”
- 一个 Action 可能在执行过程中被拆分为多个 ActionTask,特别是在处理异步操作时
1.1 相关组件
ActionTask 概念的相关组件如下
组件核心功能JavaActionTask执行 Java 函数PythonActionTask执行 Python 函数,支持异步 / 生成器模式,桥接 Java 与 Python 生态LocalRunnerContext本地执行上下文,模拟 Flink 分布式状态,管理事件队列、key 隔离状态、资源访问ActionTaskResult动作执行结果载体,包含是否完成、输出事件、下一个待执行任务(若有)PythonGeneratorActionTask处理 Python 生成器的异步任务,持续执行直到完成所有异步操作Tool 相关机制支持装饰器(@tool)、add_resource 等方式注册工具,通过 TOOL_CALL_ACTION 触发执行在系统中的架构如下
1.2 ActionTask
我们接下来看看 ActionTask 的具体实现。
ActionTask 是基类。- /**
- * This class represents a task related to the execution of an action in {@link
- * ActionExecutionOperator}.
- *
- * <p>An action is split into multiple code blocks, and each code block is represented by an {@code
- * ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result
- * {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next
- * {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing
- * it.
- */
- public abstract class ActionTask {
- protected final Object key;
- protected final Event event;
- protected final Action action;
- /**
- * Since RunnerContextImpl contains references to the Operator and state, it should not be
- * serialized and included in the state with ActionTask. Instead, we should check if a valid
- * RunnerContext exists before each ActionTask invocation and create a new one if necessary.
- */
- protected transient RunnerContextImpl runnerContext;
- public ActionTask(Object key, Event event, Action action) {
- this.key = key;
- this.event = event;
- this.action = action;
- }
- public RunnerContextImpl getRunnerContext() {
- return runnerContext;
- }
- public void setRunnerContext(RunnerContextImpl runnerContext) {
- this.runnerContext = runnerContext;
- }
- public Object getKey() {
- return key;
- }
- /** Invokes the action task. */
- public abstract ActionTaskResult invoke() throws Exception;
- public class ActionTaskResult {
- private final boolean finished;
- private final List<Event> outputEvents;
- private final Optional generatedActionTaskOpt;
- public ActionTaskResult(
- boolean finished,
- List<Event> outputEvents,
- @Nullable ActionTask generatedActionTask) {
- this.finished = finished;
- this.outputEvents = outputEvents;
- this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTask);
- }
- public boolean isFinished() {
- return finished;
- }
- public List<Event> getOutputEvents() {
- return outputEvents;
- }
- public Optional getGeneratedActionTask() {
- return generatedActionTaskOpt;
- }
- }
- }
复制代码 1.3 PythonActionTask
PythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括:
- 执行 Python 函数:调用 Python 函数来处理事件
- 处理异步操作:支持 Python 中的异步操作,通过生成器机制实现
- 桥接 Java 和 Python:作为 Java 端和 Python 端之间的桥梁,协调两者间的交互
1.3.1 定义
PythonActionTask 对应一个 Python 函数(更准确地说是一个 PythonFunction 对象),这个函数是在创建 Action 时定义的,存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装,而是使其能够在 Flink Agents 框架中正确执行,并支持框架所需的高级特性。它提供了以下附加价值:
- 复杂逻辑:PythonActionTask 不仅仅是执行函数,还负责处理复杂的交互逻辑
- 执行环境管理:为函数提供合适的执行上下文
- 异步支持:通过生成器机制支持长时间运行的操作
- 事件处理:管理和传递执行过程中产生的事件
- 状态维护:在整个执行过程中维护必要的状态信息
PythonActionTask 在系统架构中的位置和交互关系如下:
代码如下:- public class PythonActionTask extends ActionTask {
- public ActionTaskResult invoke() throws Exception {
- PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();
- // 这里执行实际的 Python 函数
- String pythonGeneratorRef =
- pythonActionExecutor.executePythonFunction(
- (PythonFunction) action.getExec(), // <-- 这就是对应的函数
- (PythonEvent) event,
- runnerContext);
- // 处理异步情况
- if (pythonGeneratorRef != null) {
- // 如果函数返回了生成器,则创建新的任务继续执行
- ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
- tempGeneratedActionTask.setRunnerContext(runnerContext);
- if (pythonGeneratorRef != null) {
- // 如果函数返回了生成器,则创建新的任务继续执行
- ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
- tempGeneratedActionTask.setRunnerContext(runnerContext);
- return tempGeneratedActionTask.invoke();
- }
- // 否则表示函数已执行完毕
- return new ActionTaskResult(
- true,
- runnerContext.drainEvents(event.getSourceTimestamp()),
- null);
复制代码 这种设计允许将带有异步操作的复杂动作分解为可管理的单元,同时保持执行语义和状态一致性。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |