【大数据 & AI】Flink Agents 源码解读 --- (7) --- AgentsExecutionEnvironment
目录
- 【大数据 & AI】Flink Agents 源码解读 --- (7) --- AgentsExecutionEnvironment
- 0x00 概要
- 0x01 基础知识
- 1.1 定义
- 1.2 功能
- 1.3 与原生 Flink Environment 的区别
- 0x02 LocalExecutionEnvironment
- 2.1 定义
- 2.1.1 主要功能
- 2.1.2 代码
- 2.1.3 执行流程
- 2.1.4 组件关系
- 2.2 LocalRunner
- 2.2.1 LocalRunner 的主要功能
- 2.2.2 事件驱动执行模型
- 2.2.3 AgentPlan 和 LocalRunner 的关系
- 2.2.4 为什么要为每种 key 维护独立的上下文
- 2.2.5 实际应用场景示例
- 2.3 LocalRunnerContext
- 2.3.1 设计目的
- 2.3.2 主要功能
- 2.3.3 定义
- 0x03 RemoteExecutionEnvironment
- 3.1 核心特色
- 3.2 与原生 Flink RemoteEnvironment 的对比
- 3.3 如何使用 ActionExecutionOperator
- 3.4 典型流程
- 3.5 交互逻辑
- 3.6 实现
- 0xFF 参考
0x00 概要
AgentsExecutionEnvironment 是在Flink基础上构建的一个更高层次的执行环境,专门为Agent而设计,同时保留了与原生 Flink API 的兼容性。
0x01 基础知识
可以把 Flink 原生的 StreamExecutionEnvironment / TableEnvironment 理解成“Agent 话题里所说的执行环境(Execution Environment)”,但是其无法直接被 Agent 使用。因此需要在其上做一些封装和适配,这就是 AgentsExecutionEnvironment。
- Flink Environment = 纯粹的计算资源与运行时容器
- 负责申请 Slot、管理 Checkpoint、调度算子链、网络 Shuffle
- 对“业务语义”一无所知,也不管用户写的是 ETL、CEP 还是 AI 推理
- 对应 Agent 词汇表里的“Runtime / Cluster / Engine”这一层
- Agent Environment = 在 Flink Runtime 之上包了一层语义抽象
- 替用户注册 Chat Model、Tools、Memory、Actions、Event Schema
- 把“用户消息”或“传感器事件”封装成 Event,按 AgentPlan 去调大模型、执行业务动作
- 内部仍然用 StreamExecutionEnvironment 去提交算子,只是看不到显式的 keyBy() 和 process()——框架帮用户生成了 ActionExecutionOperator
类比关系如下:- 传统 Flink 程序├─ StreamExecutionEnvironment ← 纯运行时├─ your ProcessFunction ← 用户的业务逻辑└─ DataStreamFlink Agents 程序├─ AgentsExecutionEnvironment ← Agent 语义环境,内部仍持有 StreamExecutionEnvironment├─ AgentPlan(your business) ← 定义“遇到啥事件该干啥”└─ ActionExecutionOperator ← 框架替用户生成的算子,负责调大模型、更新记忆
复制代码 1.1 定义
AgentsExecutionEnvironment 的代码如下。- /** * Base class for agent execution environment. * *
- This class provides the main entry point for integrating Flink Agents with different types of * Flink data sources (DataStream, Table, or simple lists). */public abstract class AgentsExecutionEnvironment { protected final Map resources; protected AgentsExecutionEnvironment() { this.resources = new HashMap(); for (ResourceType type : ResourceType.values()) { this.resources.put(type, new HashMap()); } } /** * Get agents execution environment. * *
- Factory method that creates an appropriate execution environment based on the provided * StreamExecutionEnvironment. If no environment is provided, a local execution environment is * returned for testing and development. * *
- When integrating with Flink DataStream/Table APIs, users should pass the Flink * StreamExecutionEnvironment to enable remote execution capabilities. * * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local * execution environment will be created. * @param tEnv Optional StreamTableEnvironment for table-to-stream conversion. * @return AgentsExecutionEnvironment appropriate for the execution context. */ public static AgentsExecutionEnvironment getExecutionEnvironment( StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { if (env == null) { // Return local execution environment for testing/development try { Class localEnvClass = Class.forName( "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment"); return (AgentsExecutionEnvironment) localEnvClass.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new RuntimeException("Failed to create LocalExecutionEnvironment", e); } } else { // Return remote execution environment for Flink integration try { Class remoteEnvClass = Class.forName( "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment"); return (AgentsExecutionEnvironment) remoteEnvClass .getDeclaredConstructor( StreamExecutionEnvironment.class, StreamTableEnvironment.class) .newInstance(env, tEnv); } catch (Exception e) { throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e); } } }
复制代码 1.2 功能
AgentsExecutionEnvironment 的功能如下:
- 统一入口:
- 提供getExecutionEnvironment()系列静态工厂方法,可以依据是否传入Flink的 StreamExecutionEnvironment来创建本地或远程执行环境。
- 支持从不同数据源构建构建Agent管道,包括List,DataStream和Table。
- 资源配置管理:
- 内置 resources 映射结构,支持按照资源类型管理各种资源。
- 提供 addResource() 方法注册可序列化的资源或者资源描述符。
- 多输入源支持:
- fromList():支持从简单列表创建本地执行环境。
- fromDataStream():集成Flink DataStream API
- fromTable():集成Flink Table API
- 配置管理:
- 提供getConfig()抽象方法获取可写的配置对象。
1.3 与原生 Flink Environment 的区别
- 抽象层级更高:AgentExecutionEnvrionment是对Flink原生环境的封装,在其之上提供了面向Agent的编程模型。主要用于运行Agent,而非 直接处理数据流。
- 执行模式:AgentExecutionEnvrionment 通过 LocalExecutionEnvironment 和 RemoteExecutionEnvironment 分别支持本地测试和远程集群执行。原生 Flink 主要通过配置参数控制执行模式。
- 资源管理机制:AgentExecutionEnvrionment 内置了专门的资源注册和管理机制,支持按类型分类管理支援。原生 Flink 没有这种结构化的资源管理方式。
- API 设计目标:面向 Agent 编程范式,强调事件驱动的自主行为体模型。原生Flink 更关注数据流处理和批处理操作。
0x02 LocalExecutionEnvironment
LocalExecutionEnvironment 是 AgentsExecutionEnvironment 的一种实现形式。- Class localEnvClass = Class.forName( "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment"); return (AgentsExecutionEnvironment) localEnvClass.getDeclaredConstructor().newInstance();
复制代码 LocalExecutionEnvironment 主要用于
- 开发阶段的快速测试和调试
- 无需 Flink 集群即可验证代理逻辑
- 简化Agent应用的本地开发流程
- 它与远程执行环境形成对比,后者支持完整的 Flink DataStream 和 Table API 集成。
2.1 定义
2.1.1 主要功能
LocalExecutionEnvironment 的主要功能如下:
- 本地执行环境实现
- 集成自 AgentExecutionEnvrionment,为本地测试和开发提供执行环境
- 不依赖 Flink 集群,可以在本地环境中运行和调试代理
- 数据源支持
- 通过from_list方法支持从列表数据源读取输入数据
- 不支持 Flink 的DataStream 和 Table API(这些在远程执行环境中使用)
- 资源配置和管理
- 存储和管理通过 add_resource 方法注册的资源
- 在构建Agent时,将环境中的资源注入到Agent实例中
- 代理执行管理
- 通过 set_agent 方法设置待执行的Agent、输入和输出
- 使用 LocalRunner 在本地执行代理逻辑
- 通过 execute 方法触发代理执行
- 结果收集
- 收集Agent执行的输出结果
- 通过 to_list 方法返回执行结果
根据代码分析, execute 函数在以下情况下被调用:
- 用户手动调用。当用户完成代理配置和输入数据设置后,需要显式调用execute() 方法来启动Agent 执行流程,这通常式配置完所有必要组件后的最后一步。
2.1.2 代码
LocalExecutionEnvironment 的定义如下,其中 LocalRunner 是核心。- class LocalExecutionEnvironment(AgentsExecutionEnvironment): """Implementation of AgentsExecutionEnvironment for local execution environment.""" __input: List[Dict[str, Any]] = None __output: List[Any] = None __runner: LocalRunner = None __executed: bool = False __config: AgentConfiguration = AgentConfiguration() def set_agent(self, input: list, output: list, runner: LocalRunner) -> None: """Set agent input, output and runner.""" self.__input = input self.__runner = runner self.__output = output def execute(self) -> None: """Execute agent individually.""" if self.__executed: err_msg = ( "LocalExecutionEnvironment doesn't support execute multiple times." ) raise RuntimeError(err_msg) self.__executed = True for input in self.__input: self.__runner.run(**input) outputs = self.__runner.get_outputs() for output in outputs: self.__output.append(output)
复制代码 2.1.3 执行流程
在 LocalExecutionEnvironment 中,execute 方法会:
- 遍历所有输入数据项
- 对每个输入调用 LocalRunner 的run方法进行处理
- 收集所有输出结果
典型的调用序列如下:- # 1. 获取执行环境env = StreamExecutionEnvironment.get_execution_environment()# 2. 添加所需资源env.add_resource(...)# 3. 设置输入数据output_data = env.from_list(input_data) # 4. 应用代理builder.apply(agent) # 5. 执行代理env.execute()# 6. 获取结果results = builder.to_list()
复制代码 关键特点:
- 手动触发: execute() 不会自动调用,必须由用户显式调用
- 一次性执行:只能调用一次,多次调用会抛出异常
- 阻塞操作:在本地环境中,这是个同步阻塞操作,会等待所有输入处理完成
2.1.4 组件关系
组件关系图如下:- LocalExecutionEnvironmet ↓ ↓ createsLocalAgentBuilder ↓ ↓ createsLocalRunner ↓ ↓ creates & run(data)LocalRunnerContext
复制代码 关系说明:
- LocalExecutionEnvironmet 是入口点,管理整个执行环境。通过 from_list() 创建LocalAgentBuilder
- LocalAgentBuilder 负责构建Agent 执行管道,通过 apply() 创建 LocalRunner
- LocalRunner 是实际的执行器
- LocalRunner 为每个处理的记录创建 LocalRunnerContext
2.2 LocalRunner
LocalRunner 是本地执行环境中的核心执行器,负责实际运行Agent逻辑。LocalRunner 提供了一个完整的本地执行环境,模拟了Flink 流处理的行为,使得Agent可以在本地进行开发和测试。
2.2.1 LocalRunner 的主要功能
LocalRunner 的主要功能:
- 代理执行:将Agent转换为可执行的计划并执行
- 上下文管理:为每个处理单元创建和管理 LocalRunnerContext
- 事件处理:管理事件队列并驱动Agent的执行流程
- 结果收集:收集和存储Agent执行的输出结果
run函数是LocalRunner的核心方法,负责处理单个输入记录:
- 从输入数据中提取键值,用于上下文管理
- 为每个键创建或复用 LocalRunnerContext
- 将输入数据包装成 InputEvent 并发送到事件队列
- 事件循环处理,LocalRunner 的 run 函数使用 while 循环是为了处理事件驱动的代理执行模型。这种设计反映了代理执行的核心机制
- 持续处理事件队列直到为空
- 如果是输出事件、收集结果
- 根据事件类型查找对应的动作
- 执行动作
- 处理异步处理结果
- @override def run(self, **data: Dict[str, Any]) -> Any: """Execute the agent to process the given data. Parameters ---------- **data : dict[str, Any] input record from upstream. Returns: ------- key The key of the input that was processed. """ if "key" in data: key = data["key"] elif "k" in data: key = data["k"] else: key = uuid.uuid4() if key not in self.__keyed_contexts: self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan, key, self.__config) context = self.__keyed_contexts[key] if "value" in data: input_event = InputEvent(input=data["value"]) elif "v" in data: input_event = InputEvent(input=data["v"]) else: msg = "Input data must be dict has 'v' or 'value' field" raise RuntimeError(msg) context.send_event(input_event) while len(context.events) > 0: event = context.events.popleft() if isinstance(event, OutputEvent): self.__outputs.append({key: event.output}) continue event_type = f"{event.__class__.__module__}.{event.__class__.__name__}" for action in self.__agent_plan.get_actions(event_type): context.action_name = action.name func_result = action.exec(event, context) if isinstance(func_result, Generator): try: for _ in func_result: pass except Exception: logger.exception("Error in async execution") raise return key
复制代码 2.2.2 事件驱动执行模型
在 Flink Agents 框架中,代理的执行是基于事件的。每个动作 (action)处理一个事件并可能产生新的事件,这些新事件又会触发其他动作的执行。因此需要一个循环来持续处理事件队列中的事件,直到没有更多事件需要处理。
这种设计使得代理可以处理复杂的、多步骤的交互过程,而不需要预先确定执行步骤的数量。while 循环确保所有相关的事件都被处理完毕,形成了一个完整的事件处理链:
- 持续从事件队列 context.events 中取出事件进行处理
- 每个事件可能触发一个或多个动作的执行
- 动作执行可能产生新的事件,这些事件被添加回队列
递归事件处理:
- 初始时只有一个 InputEvent
- 动作处理 InputEvent 可能产生 ChatRequestEvent 等中间事件
- 中间事件可能触发其他动作,产生更多事件
- 最终产生 OutputEvent 完成处理
完整处理链:- InputEvent → → start_action → → ChatRequestEvent → → LLM处理 → → ChatResponseEvent → → stop_action → → OutputEvent
复制代码 具体执行流程
- 初始化:
- 将输入数据包装成 InputEvent 并添加到事件队列
- 循环处理:
- 从队列取出事件
- 根据事件类型找到对应的 actions
- 执行所有监听该事件类型的动作
- 动作可能通过 send_event 发送新事件到队列
- 终止条件:
2.2.3 AgentPlan 和 LocalRunner 的关系
- LocalRunner 在初始化时候使用 AgentPlan
- 在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作
- 在事件处理中查找对应的动作
具体为:LocalRunner 在初始化时候使用 AgentPlan- class LocalRunner(AgentRunner): """Agent runner implementation for local execution, which is convenient for debugging. Attributes: ---------- __agent_plan : AgentPlan Internal agent plan. __keyed_contexts : dict[Any, LocalRunnerContext] Dictionary of active contexts indexed by key. __outputs: Outputs generated by agent execution. __config: Internal configration. """ __agent_plan: AgentPlan __keyed_contexts: Dict[Any, LocalRunnerContext] __outputs: List[Dict[str, Any]] __config: AgentConfiguration def __init__(self, agent: Agent, config: AgentConfiguration) -> None: """Initialize the runner with the provided agent. Parameters ---------- agent : Agent The agent class to convert and run. """ self.__agent_plan = AgentPlan.from_agent(agent, config) self.__keyed_contexts = {} self.__outputs = [] self.__config = config
复制代码 具体为:在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作- class LocalRunnerContext(RunnerContext): """Implementation of RunnerContext for local agent execution. Attributes: ---------- __agent_plan : AgentPlan Internal agent plan for this context. __key : Any Unique identifier for the context, correspond to the key in flink KeyedStream. events : deque[Event] Queue of events to be processed in this context. action_name: str Name of the action being executed. """ __agent_plan: AgentPlan __key: Any events: deque[Event] action_name: str _store: dict[str, Any] _short_term_memory: MemoryObject _config: AgentConfiguration def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None: """Initialize a new context with the given agent and key. Parameters ---------- agent_plan : AgentPlan Agent plan used for this context. key : Any Unique context identifier, which is corresponding to the key in flink KeyedStream. """ self.__agent_plan = agent_plan self.__key = key self.events = deque() self._store = {} self._short_term_memory = LocalMemoryObject( self._store, LocalMemoryObject.ROOT_KEY ) self._config = config @override def get_resource(self, name: str, type: ResourceType) -> Resource: return self.__agent_plan.get_resource(name, type)
复制代码 具体为:在事件处理中查找对应的动作- for action in self.__agent_plan.get_actions(event_type): context.action_name = action.name func_result = action.exec(event, context) # 执行 if isinstance(func_result, Generator): try: for _ in func_result: pass except Exception: logger.exception("Error in async execution") raise
复制代码 2.2.4 为什么要为每种 key 维护独立的上下文
在 LocalRunner 中,每种 key 都有自己独立的上下文是为了模拟 Flink 流处理环境中 keyed state 的行为,并支持状态隔离和并发处理。
- 状态隔离:每个 key 对应的数据流需要维护自己的状态,避免不同 key 的数据相互干扰:
- # 不同 key 的状态完全隔离key1_context.short_term_memory.set("user_name", "Alice")key2_context.short_term_memory.set("user_name", "Bob")# 两个 key 有不同的状态值,互不影响
复制代码
- 模拟 Flink 的 KeyedStream 行为:在 Flink 中,当使用 keyBy() 操作时,每个 key 会有独立的状态管理。LocalRunner 通过这种方式模拟了相同的行为:
- 相同 key 的事件会在同一个上下文中处理
- 不同 key 的事件拥有各自独立的内存和事件队列
这样保证了本地调试环境与实际 Flink 环境的一致性
- 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:
- 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:
- # 可以同时处理多个 key 的数据for input_record in inputs: runner.run(**input_record) # 每个 key 有独立上下文
复制代码
- 事件队列隔离:每个 key 都有自己的事件队列(events),这样可以保证:
- 同一 key 的事件按顺序处理
- 不同 key 的事件不会互相影响处理顺序
- 每个 key 可以独立地维护待处理事件队列
- 内存状态管理:每个 LocalRunnerContext 都有自己的短期记忆对象(_short_term_memory),允许:
- # 每个 key 可以存储和检索自己的状态context.short_term_memory.set("step_count", 1)context.short_term_memory.get("step_count") # 获取该 key 特定的状态
复制代码 2.2.5 实际应用场景示例
考虑一个客户服务聊天机器人应用:- # 用户 Alice 和 Bob 的对话分别用不同的 key 处理runner.run(key="user_alice", value={"message": "Hello"})runner.run(key="user_bob", value={"message": "Hi there"})# 每个用户的对话历史和状态都独立保存# Alice 的上下文不会影响 Bob 的上下文,反之亦然
复制代码 这种设计使 LocalRunner 能够准确模拟真实分布式环境中的行为,方便开发者在本地测试和调试复杂的状态依赖型代理应用。
2.3 LocalRunnerContext
LocalRunnerContext 是 Flink Agents 框架中用于本地执行环境的运行上下文实现。它的主要功能是为每个 key 提供独立的执行环境,模拟 Flink 分布式环境中 keyed state 的行为,确保了在本地环境中能够准确模拟基于 key 的状态管理和事件处理流程。
2.3.1 设计目的
LocalRunnerContext 的设计主要是为了:
- 本地调试:在本地环境中模拟 Flink 分布式执行的行为
- 状态隔离:确保不同 key 的处理状态完全隔离,避免相互干扰
- 行为一致性:保证本地测试环境与实际 Flink 执行环境的行为一致
- 简化开发:提供与生产环境相同的 API 接口,方便开发者测试和调试
2.3.2 主要功能
- 状态管理
- 为每个 key 维护独立的短期内存状态(_short_term_memory)
- 提供内存对象的读写操作,确保不同 key 的状态隔离
- 事件处理
- 维护每个 key 的事件队列(events)
- 提供 send_event 方法将事件添加到处理队列中
- 记录事件处理日志,便于调试
- 资源访问
- 提供 get_resource 方法访问 agent 所需的资源(如模型、工具等)
- 根据资源名称和类型获取相应的资源实例
- 配置管理
- 提供对 action 配置的访问(action_config, get_action_config_value)
- 提供全局配置信息(config)
- 度量和监控
- 提供度量组访问接口(agent_metric_group, action_metric_group)
- 目前在本地环境中尚未完全实现
- 异步执行支持
- 提供 execute_async 方法支持异步函数执行
- 在本地环境中降级为同步执行并给出警告
2.3.3 定义
- class LocalRunnerContext(RunnerContext): """Implementation of RunnerContext for local agent execution. Attributes: ---------- __agent_plan : AgentPlan Internal agent plan for this context. __key : Any Unique identifier for the context, correspond to the key in flink KeyedStream. events : deque[Event] Queue of events to be processed in this context. action_name: str Name of the action being executed. """ __agent_plan: AgentPlan __key: Any events: deque[Event] action_name: str _store: dict[str, Any] _short_term_memory: MemoryObject _config: AgentConfiguration def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None: """Initialize a new context with the given agent and key. Parameters ---------- agent_plan : AgentPlan Agent plan used for this context. key : Any Unique context identifier, which is corresponding to the key in flink KeyedStream. """ self.__agent_plan = agent_plan self.__key = key self.events = deque() self._store = {} self._short_term_memory = LocalMemoryObject( self._store, LocalMemoryObject.ROOT_KEY ) self._config = config @property def key(self) -> Any: """Get the unique identifier for this context. Returns: ------- Any The unique identifier for this context. """ return self.__key @override def send_event(self, event: Event) -> None: """Send an event to the context's event queue and log it. Parameters ---------- event : Event The event to be added to the queue. """ logger.info("key: %s, send_event: %s", self.__key, event) self.events.append(event) @override def get_resource(self, name: str, type: ResourceType) -> Resource: return self.__agent_plan.get_resource(name, type) @property @override def action_config(self) -> Dict[str, Any]: """Get config of the action.""" return self.__agent_plan.get_action_config(action_name=self.action_name) @override def get_action_config_value(self, key: str) -> Any: """Get config option value of the key.""" return self.__agent_plan.get_action_config_value( action_name=self.action_name, key=key ) @property @override def short_term_memory(self) -> MemoryObject: """Get the short-term memory object associated with this context. Returns: ------- MemoryObject The root object of the short-term memory. """ return self._short_term_memory @property @override def agent_metric_group(self) -> MetricGroup: # TODO: Support metric mechanism for local agent execution. err_msg = "Metric mechanism is not supported for local agent execution yet." raise NotImplementedError(err_msg) @property @override def action_metric_group(self) -> MetricGroup: # TODO: Support metric mechanism for local agent execution. err_msg = "Metric mechanism is not supported for local agent execution yet." raise NotImplementedError(err_msg) def execute_async( self, func: Callable[[Any], Any], *args: Tuple[Any, ...], **kwargs: Dict[str, Any], ) -> Any: """Asynchronously execute the provided function. Access to memory is prohibited within the function. """ logger.warning( "Local runner does not support asynchronous execution; falling back to synchronous execution." ) func_result = func(*args, **kwargs) yield return func_result @property @override def config(self) -> AgentConfiguration: return self._config
复制代码 0x03 RemoteExecutionEnvironment
RemoteExecutionEnvironment 是 Flink Agents 对原生 Flink RemoteEnvironment 的封装(适配 Agent 语义),是 Agent(智能体)与 Flink 集群融合的核心载体,核心目标是让 Agent 能够在 Flink 集群中处理 DataStream 和 Table 类型的流式数据,具体承担以下关键职责:
- RemoteExecutionEnvironment 是 Flink Agents 实现 Agent 远程执行的核心环境组件,封装了远程集群连接、作业提交、资源管理等能力;
- 核心价值是屏蔽 Flink 远程执行的底层细节,让用户聚焦 Agent 逻辑定义,而非集群操作;
- 关键特性是兼容本地调试与远程执行,同时适配 Agent 特有的状态、事件、资源需求,是 Flink Agents 从 “本地单机” 走向 “分布式集群” 的核心支撑。
- 桥接 Agent 框架与 Flink 运行时:将 Agent 的执行逻辑嵌入 Flink 的 StreamExecutionEnvironment/StreamTableEnvironment,使 Agent 能利用 Flink 的分布式计算能力处理流式数据;
- 标准化 Agent 执行流程:提供统一的入口(fromDataStream/fromTable)、处理(apply Agent)、输出(toDataStream/toTable)接口,规范 Agent 在 Flink 中的数据处理链路;
- 配置管理:加载 Flink 集群中 Agent 的专属配置文件(config.yaml),为 Agent 执行提供环境配置支撑;
- 执行调度:最终触发 Flink 作业的执行(env.execute ()),完成 Agent 处理逻辑的分布式运行。
3.1 核心特色
特色维度具体说明环境适配性专门面向 Flink 集群的远程执行场景设计,依赖 Flink 的流执行环境(StreamExecutionEnvironment)和表环境(StreamTableEnvironment),而非本地执行;数据类型聚焦仅支持 Flink 原生的 DataStream/Table 作为输入输出,明确禁用本地场景的 List 类型(fromList/toList),贴合流式计算场景;分层设计采用 “环境类(RemoteExecutionEnvironment)+ 构建器类(RemoteAgentBuilder)” 分层模式:环境类管控全局配置和 Flink 环境,构建器类聚焦单个 Agent 的执行链路;灵活的 Key 选择支持自定义 KeySelector 对输入数据分片,无自定义 Key 时默认使用数据自身作为 Key,适配不同的分布式处理需求;配置解耦从 Flink 配置目录加载 Agent 专属配置,配置文件与代码解耦,便于集群环境下的配置管理;容错与校验包含关键流程校验(如调用 toDataStream 前必须先 apply Agent)、空值处理(TableEnvironment 懒加载)、异常封装(配置加载 / Agent 执行异常统一抛出 RuntimeException);表与流互通支持 Table 与 DataStream 的双向转换(Table 转 DataStream 作为 Agent 输入、DataStream 转 Table 作为输出),适配 Flink SQL / 流处理双场景;资源关联为 Agent 绑定运行时资源(resources),保障 Agent 执行所需的资源依赖;3.2 与原生 Flink RemoteEnvironment 的对比
特性Flink 原生 RemoteEnvironmentFlink Agents RemoteExecutionEnvironment核心定位通用 Flink Job 的远程执行环境Agent 语义专属的远程执行环境封装层级底层 Flink Job 提交上层 Agent/AgentPlan 提交核心适配无 Agent 语义,仅处理通用 JobGraph适配 AgentPlan → JobGraph 编译、Agent 状态管理资源管理通用 Slot / 资源分配按 Agent 实例隔离资源,适配工具 / 动作资源需求事件 / 状态处理无内置事件语义封装 Agent 事件(Event)的跨集群传输、状态序列化3.3 如何使用 ActionExecutionOperator
RemoteExecutionEnvironment 通过 Python 层的 RemoteAgentBuilder.to_datastream() 方法间接使用ActionExecutionOperator,过程为:
- 用户定义了一个Agent并应用到执行环境中
- 调用 to_datastream() 方法触发实际的操作符创建
- 通过 JNI 调用 Java 层的 CompileUtils.connectToAgent() 方法
- 最终在 Flink 作业图中创建并配置 ActionExecutionOperator
connectToAgent()中会:
- 接收输入的 Java DATa Stream 对象
- 接收序列化的 AgentPlan(包含所有动作和资源配置)
- 创建并连接 ActionExecutionOperator 到数据流处理图中
- // ============================ basic ==================================== /** * Connects the given KeyedStream to the Flink Agents agent. * *
- This method accepts a keyed DataStream and applies the specified agent plan to it. The * source of the input stream determines the data format: Java streams provide Objects, while * Python streams use serialized byte arrays. * * @param keyedInputStream The input keyed DataStream. * @param agentPlan The agent plan to be executed. * @param inputIsJava A flag indicating whether the input stream originates from Java. - If * true, input and output types are Java Objects. - If false, input and output types are * byte[]. * @param The type of the key used in the keyed DataStream. * @param The type of the input data (Object or byte[]). * @param The type of the output data (Object or byte[]). * @return The processed DataStream as the result of the agent. */ private static DataStream connectToAgent( KeyedStream keyedInputStream, AgentPlan agentPlan, TypeInformation outTypeInformation, boolean inputIsJava) { return (DataStream) keyedInputStream .transform( "action-execute-operator", outTypeInformation, new ActionExecutionOperatorFactory(agentPlan, inputIsJava)) .setParallelism(keyedInputStream.getParallelism()); }
复制代码 3.4 典型流程
当用户编写基于 RemoteExecutionEnvironment 的应用程序时,典型流程如下:- # 获取Flink执行环境env = StreamExecutionEnvironment.get_execution_environment()# 创建 Agent 执行环境agent_env = AgentsExecutionEnvironment.get_execution_environment(env)# 添加所需资源agent_env.add_resource(...)# 设置输入数据流到Agentinput_stream = agent_env.from_Collection(input_data) builder = agent_env.from_datastream(input_stream)# 应用代理逻辑agent = MyCustomAgent()builder.apply(agent) # 执行代理output_stream = builder.to_datastream()env.execute()
复制代码 3.5 交互逻辑
RemoteExecutionEnvironment、ActionExecutionOperator 和 ActionTask 之间的交互逻辑如下。
这三个组件在 Flink Agents 框架中扮演不同的角色,协同工作来执行 Agent 逻辑:
- RemoteExecutionEnvironment:提供远程执行环境,负责构建和连接 Agent 到 Flink 数据流
- ActionExecutionOperator:Flink 流处理操作符,实际执行 Agent 的动作逻辑
- ActionTask:表示单个动作执行任务的抽象概念
交互流程图解
详细交互流程如下:
3.6 实现
RemoteAgentBuilder 的代码如下- class RemoteAgentBuilder(AgentBuilder): """RemoteAgentBuilder for integrating datastream/table and agent.""" __input: DataStream __agent_plan: AgentPlan = None __output: DataStream = None __t_env: StreamTableEnvironment __config: AgentConfiguration __resources: Dict[ResourceType, Dict[str, Any]] = None def __init__( self, input: DataStream, config: AgentConfiguration, t_env: StreamTableEnvironment | None = None, resources: Dict[ResourceType, Dict[str, Any]] | None = None, ) -> None: """Init method of RemoteAgentBuilder.""" self.__input = input self.__t_env = t_env self.__config = config self.__resources = resources @property def t_env(self) -> StreamTableEnvironment: """Get or crate table environment.""" if self.__t_env is None: self.__t_env = StreamTableEnvironment.create( stream_execution_environment=self.__env ) return self.__t_env def apply(self, agent: Agent) -> "AgentBuilder": """Set agent of execution environment. Parameters ---------- agent : Agent The agent user defined to run in execution environment. """ if self.__agent_plan is not None: err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet." raise RuntimeError(err_msg) # inspect refer actions and resources from env to agent. for type, name_to_resource in self.__resources.items(): agent.resources[type] = name_to_resource | agent.resources[type] self.__agent_plan = AgentPlan.from_agent(agent, self.__config) return self def to_datastream(self, output_type: TypeInformation | None = None) -> DataStream: """Get output datastream of agent execution. Returns: ------- DataStream Output datastream of agent execution. """ if self.__agent_plan is None: err_msg = "Must apply agent before call to_datastream/to_table." raise RuntimeError(err_msg) # return the same output datastream when call to_datastream multiple. if self.__output is None: j_data_stream_output = invoke_method( None, "org.apache.flink.agents.runtime.CompileUtils", "connectToAgent", [ self.__input._j_data_stream, self.__agent_plan.model_dump_json(serialize_as_any=True), ], [ "org.apache.flink.streaming.api.datastream.KeyedStream", "java.lang.String", ], ) output_stream = DataStream(j_data_stream_output) self.__output = output_stream.map( lambda x: cloudpickle.loads(x), output_type=output_type ) return self.__output def to_table(self, schema: Schema, output_type: TypeInformation) -> Table: """Get output Table of agent execution. Parameters ---------- schema : Schema Indicate schema of the output table. output_type : TypeInformation Indicate schema corresponding type information. Returns: ------- Table Output Table of agent execution. """ return self.t_env.from_data_stream(self.to_datastream(output_type), schema) def to_list(self) -> List[Dict[str, Any]]: """Get output list of agent execution. This method is not supported for remote execution environments. """ msg = "RemoteAgentBuilder does not support to_list." raise NotImplementedError(msg)
复制代码 RemoteExecutionEnvironment 的代码如下。- class RemoteExecutionEnvironment(AgentsExecutionEnvironment): """Implementation of AgentsExecutionEnvironment for execution with DataStream.""" __env: StreamExecutionEnvironment __t_env: StreamTableEnvironment __config: AgentConfiguration def __init__( self, env: StreamExecutionEnvironment, t_env: StreamTableEnvironment | None = None, ) -> None: """Init method of RemoteExecutionEnvironment.""" super().__init__() self.__env = env self.__t_env = t_env self.__config = AgentConfiguration() self.__load_config_from_flink_conf_dir() @property def t_env(self) -> StreamTableEnvironment: """Get or crate table environment.""" if self.__t_env is None: self.__t_env = StreamTableEnvironment.create( stream_execution_environment=self.__env ) return self.__t_env def get_config(self, path: str | None = None) -> AgentConfiguration: """Get the writable configuration for flink agents. Returns: ------- LocalConfiguration The configuration for flink agents. """ return self.__config @staticmethod def __process_input_datastream( input: DataStream, key_selector: KeySelector | Callable | None = None ) -> KeyedStream: if isinstance(input, KeyedStream): return input else: if key_selector is None: msg = "KeySelector must be provided." raise RuntimeError(msg) input = input.key_by(key_selector) return input def from_datastream( self, input: DataStream, key_selector: KeySelector | Callable | None = None ) -> RemoteAgentBuilder: """Set input datastream of agent. Parameters ---------- input : DataStream Receive a DataStream as input. key_selector : KeySelector Extract key from each input record, must not be None when input is not KeyedStream. """ input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( input=input, config=self.__config, t_env=self.__t_env, resources=self.resources, ) def from_table( self, input: Table, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input Table of agent. Parameters ---------- input : Table Receive a Table as input. key_selector : KeySelector Extract key from each input record. """ input = self.t_env.to_data_stream(table=input) input = input.map(lambda x: x, output_type=PickledBytesTypeInfo()) input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( input=input, config=self.__config, t_env=self.t_env, resources=self.resources, ) def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment": """Set input list of agent execution. This method is not supported for remote execution environments. """ msg = "RemoteExecutionEnvironment does not support from_list." raise NotImplementedError(msg) def execute(self) -> None: """Execute agent.""" self.__env.execute() def __load_config_from_flink_conf_dir(self) -> None: """Load agent configuration from FLINK_CONF_DIR if available.""" flink_conf_dir = os.environ.get("FLINK_CONF_DIR") if flink_conf_dir is None: return # Try to find config file, with fallback to legacy name config_path = self.__find_config_file(flink_conf_dir) if config_path is None: logging.error(f"Config file not found in {flink_conf_dir}") else: self.__config.load_from_file(str(config_path)) def __find_config_file(self, flink_conf_dir: str) -> Path | None: """Find config file in the given directory, checking both new and legacy names. Parameters ---------- flink_conf_dir : str Directory to search for config files. Returns: ------- Path | None Path to the config file if found, None otherwise. """ # Try legacy config file name first legacy_config_path = Path(flink_conf_dir).joinpath(_LEGACY_CONFIG_FILE_NAME) if legacy_config_path.exists(): logging.warning( f"Using legacy config file {_LEGACY_CONFIG_FILE_NAME}" ) return legacy_config_path # Try new config file name as fallback primary_config_path = Path(flink_conf_dir).joinpath(_CONFIG_FILE_NAME) if primary_config_path.exists(): return primary_config_path return None
复制代码 0xFF 参考
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |