找回密码
 立即注册
首页 业界区 业界 【大数据 & AI】Flink Agents 源码解读 --- (7) --- Ag ...

【大数据 & AI】Flink Agents 源码解读 --- (7) --- AgentsExecutionEnvironment

讲怔 2026-1-15 22:15:01
【大数据 & AI】Flink Agents 源码解读 --- (7) ---  AgentsExecutionEnvironment


目录

  • 【大数据 & AI】Flink Agents 源码解读 --- (7) ---  AgentsExecutionEnvironment

    • 0x00 概要
    • 0x01 基础知识

      • 1.1 定义
      • 1.2 功能
      • 1.3 与原生 Flink Environment 的区别

    • 0x02 LocalExecutionEnvironment

      • 2.1 定义

        • 2.1.1 主要功能
        • 2.1.2 代码
        • 2.1.3 执行流程
        • 2.1.4 组件关系

      • 2.2 LocalRunner

        • 2.2.1 LocalRunner 的主要功能
        • 2.2.2 事件驱动执行模型
        • 2.2.3 AgentPlan 和 LocalRunner 的关系
        • 2.2.4 为什么要为每种 key 维护独立的上下文
        • 2.2.5 实际应用场景示例

      • 2.3 LocalRunnerContext

        • 2.3.1 设计目的
        • 2.3.2 主要功能
        • 2.3.3 定义


    • 0x03 RemoteExecutionEnvironment

      • 3.1 核心特色
      • 3.2 与原生 Flink RemoteEnvironment 的对比
      • 3.3 如何使用 ActionExecutionOperator
      • 3.4 典型流程
      • 3.5 交互逻辑
      • 3.6 实现

    • 0xFF 参考


0x00 概要

AgentsExecutionEnvironment 是在Flink基础上构建的一个更高层次的执行环境,专门为Agent而设计,同时保留了与原生 Flink API 的兼容性。
0x01 基础知识

可以把 Flink 原生的 StreamExecutionEnvironment / TableEnvironment 理解成“Agent 话题里所说的执行环境(Execution Environment)”,但是其无法直接被 Agent 使用。因此需要在其上做一些封装和适配,这就是 AgentsExecutionEnvironment。

  • Flink Environment = 纯粹的计算资源与运行时容器

    • 负责申请 Slot、管理 Checkpoint、调度算子链、网络 Shuffle
    • 对“业务语义”一无所知,也不管用户写的是 ETL、CEP 还是 AI 推理
    • 对应 Agent 词汇表里的“Runtime / Cluster / Engine”这一层

  • Agent Environment = 在 Flink Runtime 之上包了一层语义抽象

    • 替用户注册 Chat Model、Tools、Memory、Actions、Event Schema
    • 把“用户消息”或“传感器事件”封装成 Event,按 AgentPlan 去调大模型、执行业务动作
    • 内部仍然用 StreamExecutionEnvironment 去提交算子,只是看不到显式的 keyBy() 和 process()——框架帮用户生成了 ActionExecutionOperator

类比关系如下:
  1. 传统 Flink 程序├─ StreamExecutionEnvironment   ← 纯运行时├─ your ProcessFunction         ← 用户的业务逻辑└─ DataStreamFlink Agents 程序├─ AgentsExecutionEnvironment   ← Agent 语义环境,内部仍持有 StreamExecutionEnvironment├─ AgentPlan(your business)   ← 定义“遇到啥事件该干啥”└─ ActionExecutionOperator     ← 框架替用户生成的算子,负责调大模型、更新记忆
复制代码
1.1 定义

AgentsExecutionEnvironment 的代码如下。
  1. /** * Base class for agent execution environment. * *
  2. This class provides the main entry point for integrating Flink Agents with different types of * Flink data sources (DataStream, Table, or simple lists). */public abstract class AgentsExecutionEnvironment {    protected final Map resources;    protected AgentsExecutionEnvironment() {        this.resources = new HashMap();        for (ResourceType type : ResourceType.values()) {            this.resources.put(type, new HashMap());        }    }        /**     * Get agents execution environment.     *     *
  3. Factory method that creates an appropriate execution environment based on the provided     * StreamExecutionEnvironment. If no environment is provided, a local execution environment is     * returned for testing and development.     *     *
  4. When integrating with Flink DataStream/Table APIs, users should pass the Flink     * StreamExecutionEnvironment to enable remote execution capabilities.     *     * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local     *     execution environment will be created.     * @param tEnv Optional StreamTableEnvironment for table-to-stream conversion.     * @return AgentsExecutionEnvironment appropriate for the execution context.     */    public static AgentsExecutionEnvironment getExecutionEnvironment(            StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) {        if (env == null) {            // Return local execution environment for testing/development            try {                Class localEnvClass =                        Class.forName(                                "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment");                return (AgentsExecutionEnvironment)                        localEnvClass.getDeclaredConstructor().newInstance();            } catch (Exception e) {                throw new RuntimeException("Failed to create LocalExecutionEnvironment", e);            }        } else {            // Return remote execution environment for Flink integration            try {                Class remoteEnvClass =                        Class.forName(                                "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment");                return (AgentsExecutionEnvironment)                        remoteEnvClass                                .getDeclaredConstructor(                                        StreamExecutionEnvironment.class,                                        StreamTableEnvironment.class)                                .newInstance(env, tEnv);            } catch (Exception e) {                throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e);            }        }    }   
复制代码
1.2 功能

AgentsExecutionEnvironment 的功能如下:

  • 统一入口:

    • 提供getExecutionEnvironment()系列静态工厂方法,可以依据是否传入Flink的 StreamExecutionEnvironment来创建本地或远程执行环境。
    • 支持从不同数据源构建构建Agent管道,包括List,DataStream和Table。

  • 资源配置管理:

    • 内置 resources 映射结构,支持按照资源类型管理各种资源。
    • 提供 addResource() 方法注册可序列化的资源或者资源描述符。

  • 多输入源支持:

    • fromList():支持从简单列表创建本地执行环境。
    • fromDataStream():集成Flink DataStream API
    • fromTable():集成Flink Table API

  • 配置管理:

    • 提供getConfig()抽象方法获取可写的配置对象。

1.3 与原生 Flink Environment 的区别


  • 抽象层级更高:AgentExecutionEnvrionment是对Flink原生环境的封装,在其之上提供了面向Agent的编程模型。主要用于运行Agent,而非 直接处理数据流。
  • 执行模式:AgentExecutionEnvrionment 通过 LocalExecutionEnvironment 和 RemoteExecutionEnvironment 分别支持本地测试和远程集群执行。原生 Flink 主要通过配置参数控制执行模式。
  • 资源管理机制:AgentExecutionEnvrionment 内置了专门的资源注册和管理机制,支持按类型分类管理支援。原生 Flink 没有这种结构化的资源管理方式。
  • API 设计目标:面向 Agent 编程范式,强调事件驱动的自主行为体模型。原生Flink 更关注数据流处理和批处理操作。
0x02 LocalExecutionEnvironment

LocalExecutionEnvironment 是 AgentsExecutionEnvironment 的一种实现形式。
  1. Class localEnvClass = Class.forName(                                "org.apache.flink.agents.runtime.env.LocalExecutionEnvironment");                return (AgentsExecutionEnvironment)                        localEnvClass.getDeclaredConstructor().newInstance();
复制代码
LocalExecutionEnvironment  主要用于

  • 开发阶段的快速测试和调试
  • 无需 Flink 集群即可验证代理逻辑
  • 简化Agent应用的本地开发流程
  • 它与远程执行环境形成对比,后者支持完整的 Flink DataStream 和 Table API 集成。
2.1 定义

2.1.1 主要功能

LocalExecutionEnvironment  的主要功能如下:

  • 本地执行环境实现

    • 集成自 AgentExecutionEnvrionment,为本地测试和开发提供执行环境
    • 不依赖 Flink 集群,可以在本地环境中运行和调试代理

  • 数据源支持

    • 通过from_list方法支持从列表数据源读取输入数据
    • 不支持 Flink 的DataStream 和 Table API(这些在远程执行环境中使用)

  • 资源配置和管理

    • 存储和管理通过 add_resource 方法注册的资源
    • 在构建Agent时,将环境中的资源注入到Agent实例中

  • 代理执行管理

    • 通过  set_agent 方法设置待执行的Agent、输入和输出
    • 使用 LocalRunner  在本地执行代理逻辑
    • 通过 execute 方法触发代理执行

  • 结果收集

    • 收集Agent执行的输出结果
    • 通过 to_list  方法返回执行结果

根据代码分析, execute 函数在以下情况下被调用:

  • 用户手动调用。当用户完成代理配置和输入数据设置后,需要显式调用execute()  方法来启动Agent 执行流程,这通常式配置完所有必要组件后的最后一步。
2.1.2 代码

LocalExecutionEnvironment 的定义如下,其中 LocalRunner 是核心。
  1. class LocalExecutionEnvironment(AgentsExecutionEnvironment):    """Implementation of AgentsExecutionEnvironment for local execution environment."""    __input: List[Dict[str, Any]] = None    __output: List[Any] = None    __runner: LocalRunner = None    __executed: bool = False    __config: AgentConfiguration = AgentConfiguration()            def set_agent(self, input: list, output: list, runner: LocalRunner) -> None:        """Set agent input, output and runner."""        self.__input = input        self.__runner = runner        self.__output = output    def execute(self) -> None:        """Execute agent individually."""        if self.__executed:            err_msg = (                "LocalExecutionEnvironment doesn't support execute multiple times."            )            raise RuntimeError(err_msg)        self.__executed = True        for input in self.__input:            self.__runner.run(**input)        outputs = self.__runner.get_outputs()        for output in outputs:            self.__output.append(output)        
复制代码
2.1.3 执行流程

在 LocalExecutionEnvironment 中,execute 方法会:

  • 遍历所有输入数据项
  • 对每个输入调用 LocalRunner  的run方法进行处理
  • 收集所有输出结果
典型的调用序列如下:
  1. # 1. 获取执行环境env = StreamExecutionEnvironment.get_execution_environment()# 2. 添加所需资源env.add_resource(...)# 3. 设置输入数据output_data = env.from_list(input_data) # 4. 应用代理builder.apply(agent) # 5. 执行代理env.execute()# 6. 获取结果results = builder.to_list()
复制代码
关键特点:

  • 手动触发: execute() 不会自动调用,必须由用户显式调用
  • 一次性执行:只能调用一次,多次调用会抛出异常
  • 阻塞操作:在本地环境中,这是个同步阻塞操作,会等待所有输入处理完成
2.1.4 组件关系

组件关系图如下:
  1. LocalExecutionEnvironmet    ↓    ↓ createsLocalAgentBuilder    ↓    ↓ createsLocalRunner    ↓    ↓ creates & run(data)LocalRunnerContext
复制代码
关系说明:

  • LocalExecutionEnvironmet 是入口点,管理整个执行环境。通过 from_list() 创建LocalAgentBuilder
  • LocalAgentBuilder 负责构建Agent 执行管道,通过 apply() 创建 LocalRunner
  • LocalRunner 是实际的执行器
  • LocalRunner 为每个处理的记录创建 LocalRunnerContext
2.2 LocalRunner

LocalRunner 是本地执行环境中的核心执行器,负责实际运行Agent逻辑。LocalRunner 提供了一个完整的本地执行环境,模拟了Flink 流处理的行为,使得Agent可以在本地进行开发和测试。
2.2.1 LocalRunner 的主要功能

LocalRunner 的主要功能:

  • 代理执行:将Agent转换为可执行的计划并执行
  • 上下文管理:为每个处理单元创建和管理 LocalRunnerContext
  • 事件处理:管理事件队列并驱动Agent的执行流程
  • 结果收集:收集和存储Agent执行的输出结果
run函数是LocalRunner的核心方法,负责处理单个输入记录:

  • 从输入数据中提取键值,用于上下文管理
  • 为每个键创建或复用 LocalRunnerContext
  • 将输入数据包装成 InputEvent 并发送到事件队列
  • 事件循环处理,LocalRunner 的 run 函数使用 while 循环是为了处理事件驱动的代理执行模型。这种设计反映了代理执行的核心机制

    • 持续处理事件队列直到为空
    • 如果是输出事件、收集结果
    • 根据事件类型查找对应的动作
    • 执行动作
    • 处理异步处理结果

  1.     @override    def run(self, **data: Dict[str, Any]) -> Any:        """Execute the agent to process the given data.        Parameters        ----------        **data : dict[str, Any]            input record from upstream.        Returns:        -------        key            The key of the input that was processed.        """        if "key" in data:            key = data["key"]        elif "k" in data:            key = data["k"]        else:            key = uuid.uuid4()        if key not in self.__keyed_contexts:            self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan, key, self.__config)        context = self.__keyed_contexts[key]        if "value" in data:            input_event = InputEvent(input=data["value"])        elif "v" in data:            input_event = InputEvent(input=data["v"])        else:            msg = "Input data must be dict has 'v' or 'value' field"            raise RuntimeError(msg)        context.send_event(input_event)        while len(context.events) > 0:            event = context.events.popleft()            if isinstance(event, OutputEvent):                self.__outputs.append({key: event.output})                continue            event_type = f"{event.__class__.__module__}.{event.__class__.__name__}"            for action in self.__agent_plan.get_actions(event_type):                context.action_name = action.name                func_result = action.exec(event, context)                if isinstance(func_result, Generator):                    try:                        for _ in func_result:                            pass                    except Exception:                        logger.exception("Error in async execution")                        raise        return key
复制代码
2.2.2 事件驱动执行模型

在 Flink Agents 框架中,代理的执行是基于事件的。每个动作 (action)处理一个事件并可能产生新的事件,这些新事件又会触发其他动作的执行。因此需要一个循环来持续处理事件队列中的事件,直到没有更多事件需要处理。
这种设计使得代理可以处理复杂的、多步骤的交互过程,而不需要预先确定执行步骤的数量。while 循环确保所有相关的事件都被处理完毕,形成了一个完整的事件处理链:

  • 持续从事件队列 context.events 中取出事件进行处理
  • 每个事件可能触发一个或多个动作的执行
  • 动作执行可能产生新的事件,这些事件被添加回队列
递归事件处理:

  • 初始时只有一个 InputEvent
  • 动作处理 InputEvent 可能产生 ChatRequestEvent 等中间事件
  • 中间事件可能触发其他动作,产生更多事件
  • 最终产生 OutputEvent 完成处理
完整处理链:
  1. InputEvent  → →   start_action  → →   ChatRequestEvent  →  →  LLM处理  → →  ChatResponseEvent  → →   stop_action  → →   OutputEvent
复制代码
具体执行流程

  • 初始化:

    • 将输入数据包装成 InputEvent 并添加到事件队列

  • 循环处理:

    • 从队列取出事件
    • 根据事件类型找到对应的 actions
    • 执行所有监听该事件类型的动作
    • 动作可能通过 send_event 发送新事件到队列

  • 终止条件:

    • 当事件队列为空时,处理完成

2.2.3 AgentPlan 和 LocalRunner 的关系


  • LocalRunner 在初始化时候使用 AgentPlan
  • 在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作
  • 在事件处理中查找对应的动作
具体为:LocalRunner 在初始化时候使用 AgentPlan
  1. class LocalRunner(AgentRunner):    """Agent runner implementation for local execution, which is    convenient for debugging.    Attributes:    ----------    __agent_plan : AgentPlan        Internal agent plan.    __keyed_contexts : dict[Any, LocalRunnerContext]        Dictionary of active contexts indexed by key.    __outputs:        Outputs generated by agent execution.    __config:        Internal configration.    """    __agent_plan: AgentPlan    __keyed_contexts: Dict[Any, LocalRunnerContext]    __outputs: List[Dict[str, Any]]    __config: AgentConfiguration    def __init__(self, agent: Agent, config: AgentConfiguration) -> None:        """Initialize the runner with the provided agent.        Parameters        ----------        agent : Agent            The agent class to convert and run.        """        self.__agent_plan = AgentPlan.from_agent(agent, config)        self.__keyed_contexts = {}        self.__outputs = []        self.__config = config
复制代码
具体为:在执行过程中,LocalRunnerContext 通过 AgentPlan 获取资源和动作
  1. class LocalRunnerContext(RunnerContext):    """Implementation of RunnerContext for local agent execution.    Attributes:    ----------    __agent_plan : AgentPlan        Internal agent plan for this context.    __key : Any        Unique identifier for the context, correspond to the key in flink KeyedStream.    events : deque[Event]        Queue of events to be processed in this context.    action_name: str        Name of the action being executed.    """    __agent_plan: AgentPlan    __key: Any    events: deque[Event]    action_name: str    _store: dict[str, Any]    _short_term_memory: MemoryObject    _config: AgentConfiguration    def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None:        """Initialize a new context with the given agent and key.        Parameters        ----------        agent_plan : AgentPlan            Agent plan used for this context.        key : Any            Unique context identifier, which is corresponding to the key in flink            KeyedStream.        """        self.__agent_plan = agent_plan        self.__key = key        self.events = deque()        self._store = {}        self._short_term_memory = LocalMemoryObject(            self._store, LocalMemoryObject.ROOT_KEY        )        self._config = config            @override    def get_resource(self, name: str, type: ResourceType) -> Resource:        return self.__agent_plan.get_resource(name, type)        
复制代码
具体为:在事件处理中查找对应的动作
  1.             for action in self.__agent_plan.get_actions(event_type):                context.action_name = action.name                func_result = action.exec(event, context) # 执行                if isinstance(func_result, Generator):                    try:                        for _ in func_result:                            pass                    except Exception:                        logger.exception("Error in async execution")                        raise
复制代码
2.2.4 为什么要为每种 key 维护独立的上下文

在 LocalRunner 中,每种 key 都有自己独立的上下文是为了模拟 Flink 流处理环境中 keyed state 的行为,并支持状态隔离和并发处理。

  • 状态隔离:每个 key 对应的数据流需要维护自己的状态,避免不同 key 的数据相互干扰:
  1. # 不同 key 的状态完全隔离key1_context.short_term_memory.set("user_name", "Alice")key2_context.short_term_memory.set("user_name", "Bob")# 两个 key 有不同的状态值,互不影响
复制代码

  • 模拟 Flink 的 KeyedStream 行为:在 Flink 中,当使用 keyBy() 操作时,每个 key 会有独立的状态管理。LocalRunner 通过这种方式模拟了相同的行为:

    • 相同 key 的事件会在同一个上下文中处理
    • 不同 key 的事件拥有各自独立的内存和事件队列
    这样保证了本地调试环境与实际 Flink 环境的一致性

  • 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:
  • 支持并发处理:虽然 LocalRunner 是单线程运行的,但它模拟了多 key 并发处理的情况:
  1. # 可以同时处理多个 key 的数据for input_record in inputs:    runner.run(**input_record)  # 每个 key 有独立上下文
复制代码

  • 事件队列隔离:每个 key 都有自己的事件队列(events),这样可以保证:

    • 同一 key 的事件按顺序处理
    • 不同 key 的事件不会互相影响处理顺序
    • 每个 key 可以独立地维护待处理事件队列

  • 内存状态管理:每个 LocalRunnerContext 都有自己的短期记忆对象(_short_term_memory),允许:
  1. # 每个 key 可以存储和检索自己的状态context.short_term_memory.set("step_count", 1)context.short_term_memory.get("step_count")  # 获取该 key 特定的状态
复制代码
2.2.5 实际应用场景示例

考虑一个客户服务聊天机器人应用:
  1. # 用户 Alice 和 Bob 的对话分别用不同的 key 处理runner.run(key="user_alice", value={"message": "Hello"})runner.run(key="user_bob", value={"message": "Hi there"})# 每个用户的对话历史和状态都独立保存# Alice 的上下文不会影响 Bob 的上下文,反之亦然
复制代码
这种设计使 LocalRunner 能够准确模拟真实分布式环境中的行为,方便开发者在本地测试和调试复杂的状态依赖型代理应用。
2.3 LocalRunnerContext

LocalRunnerContext 是 Flink Agents 框架中用于本地执行环境的运行上下文实现。它的主要功能是为每个 key 提供独立的执行环境,模拟 Flink 分布式环境中 keyed state 的行为,确保了在本地环境中能够准确模拟基于 key 的状态管理和事件处理流程。
2.3.1 设计目的

LocalRunnerContext 的设计主要是为了:

  • 本地调试:在本地环境中模拟 Flink 分布式执行的行为
  • 状态隔离:确保不同 key 的处理状态完全隔离,避免相互干扰
  • 行为一致性:保证本地测试环境与实际 Flink 执行环境的行为一致
  • 简化开发:提供与生产环境相同的 API 接口,方便开发者测试和调试
2.3.2 主要功能


  • 状态管理

    • 为每个 key 维护独立的短期内存状态(_short_term_memory)
    • 提供内存对象的读写操作,确保不同 key 的状态隔离

  • 事件处理

    • 维护每个 key 的事件队列(events)
    • 提供 send_event 方法将事件添加到处理队列中
    • 记录事件处理日志,便于调试

  • 资源访问

    • 提供 get_resource 方法访问 agent 所需的资源(如模型、工具等)
    • 根据资源名称和类型获取相应的资源实例

  • 配置管理

    • 提供对 action 配置的访问(action_config, get_action_config_value)
    • 提供全局配置信息(config)

  • 度量和监控

    • 提供度量组访问接口(agent_metric_group, action_metric_group)
    • 目前在本地环境中尚未完全实现

  • 异步执行支持

    • 提供 execute_async 方法支持异步函数执行
    • 在本地环境中降级为同步执行并给出警告

2.3.3 定义
  1. class LocalRunnerContext(RunnerContext):    """Implementation of RunnerContext for local agent execution.    Attributes:    ----------    __agent_plan : AgentPlan        Internal agent plan for this context.    __key : Any        Unique identifier for the context, correspond to the key in flink KeyedStream.    events : deque[Event]        Queue of events to be processed in this context.    action_name: str        Name of the action being executed.    """    __agent_plan: AgentPlan    __key: Any    events: deque[Event]    action_name: str    _store: dict[str, Any]    _short_term_memory: MemoryObject    _config: AgentConfiguration    def __init__(self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration) -> None:        """Initialize a new context with the given agent and key.        Parameters        ----------        agent_plan : AgentPlan            Agent plan used for this context.        key : Any            Unique context identifier, which is corresponding to the key in flink            KeyedStream.        """        self.__agent_plan = agent_plan        self.__key = key        self.events = deque()        self._store = {}        self._short_term_memory = LocalMemoryObject(            self._store, LocalMemoryObject.ROOT_KEY        )        self._config = config    @property    def key(self) -> Any:        """Get the unique identifier for this context.        Returns:        -------        Any            The unique identifier for this context.        """        return self.__key    @override    def send_event(self, event: Event) -> None:        """Send an event to the context's event queue and log it.        Parameters        ----------        event : Event            The event to be added to the queue.        """        logger.info("key: %s, send_event: %s", self.__key, event)        self.events.append(event)    @override    def get_resource(self, name: str, type: ResourceType) -> Resource:        return self.__agent_plan.get_resource(name, type)    @property    @override    def action_config(self) -> Dict[str, Any]:        """Get config of the action."""        return self.__agent_plan.get_action_config(action_name=self.action_name)    @override    def get_action_config_value(self, key: str) -> Any:        """Get config option value of the key."""        return self.__agent_plan.get_action_config_value(            action_name=self.action_name, key=key        )    @property    @override    def short_term_memory(self) -> MemoryObject:        """Get the short-term memory object associated with this context.        Returns:        -------        MemoryObject            The root object of the short-term memory.        """        return self._short_term_memory    @property    @override    def agent_metric_group(self) -> MetricGroup:        # TODO: Support metric mechanism for local agent execution.        err_msg = "Metric mechanism is not supported for local agent execution yet."        raise NotImplementedError(err_msg)    @property    @override    def action_metric_group(self) -> MetricGroup:        # TODO: Support metric mechanism for local agent execution.        err_msg = "Metric mechanism is not supported for local agent execution yet."        raise NotImplementedError(err_msg)    def execute_async(        self,        func: Callable[[Any], Any],        *args: Tuple[Any, ...],        **kwargs: Dict[str, Any],    ) -> Any:        """Asynchronously execute the provided function. Access to memory        is prohibited within the function.        """        logger.warning(            "Local runner does not support asynchronous execution; falling back to synchronous execution."        )        func_result = func(*args, **kwargs)        yield        return func_result    @property    @override    def config(self) -> AgentConfiguration:        return self._config
复制代码
0x03 RemoteExecutionEnvironment

RemoteExecutionEnvironment 是 Flink Agents 对原生 Flink RemoteEnvironment 的封装(适配 Agent 语义),是 Agent(智能体)与 Flink 集群融合的核心载体,核心目标是让 Agent 能够在 Flink 集群中处理 DataStream 和 Table 类型的流式数据,具体承担以下关键职责:

  • RemoteExecutionEnvironment 是 Flink Agents 实现 Agent 远程执行的核心环境组件,封装了远程集群连接、作业提交、资源管理等能力;
  • 核心价值是屏蔽 Flink 远程执行的底层细节,让用户聚焦 Agent 逻辑定义,而非集群操作;
  • 关键特性是兼容本地调试与远程执行,同时适配 Agent 特有的状态、事件、资源需求,是 Flink Agents 从 “本地单机” 走向 “分布式集群” 的核心支撑。
  • 桥接 Agent 框架与 Flink 运行时:将 Agent 的执行逻辑嵌入 Flink 的 StreamExecutionEnvironment/StreamTableEnvironment,使 Agent 能利用 Flink 的分布式计算能力处理流式数据;
  • 标准化 Agent 执行流程:提供统一的入口(fromDataStream/fromTable)、处理(apply Agent)、输出(toDataStream/toTable)接口,规范 Agent 在 Flink 中的数据处理链路;
  • 配置管理:加载 Flink 集群中 Agent 的专属配置文件(config.yaml),为 Agent 执行提供环境配置支撑;
  • 执行调度:最终触发 Flink 作业的执行(env.execute ()),完成 Agent 处理逻辑的分布式运行。
3.1 核心特色

特色维度具体说明环境适配性专门面向 Flink 集群的远程执行场景设计,依赖 Flink 的流执行环境(StreamExecutionEnvironment)和表环境(StreamTableEnvironment),而非本地执行;数据类型聚焦仅支持 Flink 原生的 DataStream/Table 作为输入输出,明确禁用本地场景的 List 类型(fromList/toList),贴合流式计算场景;分层设计采用 “环境类(RemoteExecutionEnvironment)+ 构建器类(RemoteAgentBuilder)” 分层模式:环境类管控全局配置和 Flink 环境,构建器类聚焦单个 Agent 的执行链路;灵活的 Key 选择支持自定义 KeySelector 对输入数据分片,无自定义 Key 时默认使用数据自身作为 Key,适配不同的分布式处理需求;配置解耦从 Flink 配置目录加载 Agent 专属配置,配置文件与代码解耦,便于集群环境下的配置管理;容错与校验包含关键流程校验(如调用 toDataStream 前必须先 apply Agent)、空值处理(TableEnvironment 懒加载)、异常封装(配置加载 / Agent 执行异常统一抛出 RuntimeException);表与流互通支持 Table 与 DataStream 的双向转换(Table 转 DataStream 作为 Agent 输入、DataStream 转 Table 作为输出),适配 Flink SQL / 流处理双场景;资源关联为 Agent 绑定运行时资源(resources),保障 Agent 执行所需的资源依赖;3.2 与原生 Flink RemoteEnvironment 的对比

特性Flink 原生 RemoteEnvironmentFlink Agents RemoteExecutionEnvironment核心定位通用 Flink Job 的远程执行环境Agent 语义专属的远程执行环境封装层级底层 Flink Job 提交上层 Agent/AgentPlan 提交核心适配无 Agent 语义,仅处理通用 JobGraph适配 AgentPlan → JobGraph 编译、Agent 状态管理资源管理通用 Slot / 资源分配按 Agent 实例隔离资源,适配工具 / 动作资源需求事件 / 状态处理无内置事件语义封装 Agent 事件(Event)的跨集群传输、状态序列化3.3 如何使用 ActionExecutionOperator

RemoteExecutionEnvironment 通过 Python 层的 RemoteAgentBuilder.to_datastream() 方法间接使用ActionExecutionOperator,过程为:

  • 用户定义了一个Agent并应用到执行环境中
  • 调用 to_datastream() 方法触发实际的操作符创建
  • 通过 JNI 调用 Java 层的 CompileUtils.connectToAgent() 方法
  • 最终在 Flink 作业图中创建并配置  ActionExecutionOperator
connectToAgent()中会:

  • 接收输入的 Java DATa Stream 对象
  • 接收序列化的 AgentPlan(包含所有动作和资源配置)
  • 创建并连接 ActionExecutionOperator 到数据流处理图中
  1.     // ============================ basic ====================================    /**     * Connects the given KeyedStream to the Flink Agents agent.     *     *
  2. This method accepts a keyed DataStream and applies the specified agent plan to it. The     * source of the input stream determines the data format: Java streams provide Objects, while     * Python streams use serialized byte arrays.     *     * @param keyedInputStream The input keyed DataStream.     * @param agentPlan The agent plan to be executed.     * @param inputIsJava A flag indicating whether the input stream originates from Java. - If     *     true, input and output types are Java Objects. - If false, input and output types are     *     byte[].     * @param  The type of the key used in the keyed DataStream.     * @param  The type of the input data (Object or byte[]).     * @param  The type of the output data (Object or byte[]).     * @return The processed DataStream as the result of the agent.     */    private static  DataStream connectToAgent(            KeyedStream keyedInputStream,            AgentPlan agentPlan,            TypeInformation outTypeInformation,            boolean inputIsJava) {        return (DataStream)                keyedInputStream                        .transform(                                "action-execute-operator",                                outTypeInformation,                                new ActionExecutionOperatorFactory(agentPlan, inputIsJava))                        .setParallelism(keyedInputStream.getParallelism());    }
复制代码
3.4 典型流程

当用户编写基于 RemoteExecutionEnvironment  的应用程序时,典型流程如下:
  1. # 获取Flink执行环境env = StreamExecutionEnvironment.get_execution_environment()# 创建 Agent 执行环境agent_env = AgentsExecutionEnvironment.get_execution_environment(env)# 添加所需资源agent_env.add_resource(...)# 设置输入数据流到Agentinput_stream = agent_env.from_Collection(input_data) builder = agent_env.from_datastream(input_stream)# 应用代理逻辑agent = MyCustomAgent()builder.apply(agent) # 执行代理output_stream = builder.to_datastream()env.execute()
复制代码
3.5 交互逻辑

RemoteExecutionEnvironment、ActionExecutionOperator 和 ActionTask 之间的交互逻辑如下。
这三个组件在 Flink Agents 框架中扮演不同的角色,协同工作来执行 Agent 逻辑:

  • RemoteExecutionEnvironment:提供远程执行环境,负责构建和连接 Agent 到 Flink 数据流
  • ActionExecutionOperator:Flink 流处理操作符,实际执行 Agent 的动作逻辑
  • ActionTask:表示单个动作执行任务的抽象概念
交互流程图解

详细交互流程如下:

  • 初始化阶段


  • 运行时事件处理流程

3.6 实现

RemoteAgentBuilder 的代码如下
  1. class RemoteAgentBuilder(AgentBuilder):    """RemoteAgentBuilder for integrating datastream/table and agent."""    __input: DataStream    __agent_plan: AgentPlan = None    __output: DataStream = None    __t_env: StreamTableEnvironment    __config: AgentConfiguration    __resources: Dict[ResourceType, Dict[str, Any]] = None    def __init__(        self,        input: DataStream,        config: AgentConfiguration,        t_env: StreamTableEnvironment | None = None,        resources: Dict[ResourceType, Dict[str, Any]] | None = None,    ) -> None:        """Init method of RemoteAgentBuilder."""        self.__input = input        self.__t_env = t_env        self.__config = config        self.__resources = resources    @property    def t_env(self) -> StreamTableEnvironment:        """Get or crate table environment."""        if self.__t_env is None:            self.__t_env = StreamTableEnvironment.create(                stream_execution_environment=self.__env            )        return self.__t_env    def apply(self, agent: Agent) -> "AgentBuilder":        """Set agent of execution environment.        Parameters        ----------        agent : Agent            The agent user defined to run in execution environment.        """        if self.__agent_plan is not None:            err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet."            raise RuntimeError(err_msg)        # inspect refer actions and resources from env to agent.        for type, name_to_resource in self.__resources.items():            agent.resources[type] = name_to_resource | agent.resources[type]        self.__agent_plan = AgentPlan.from_agent(agent, self.__config)        return self    def to_datastream(self, output_type: TypeInformation | None = None) -> DataStream:        """Get output datastream of agent execution.        Returns:        -------        DataStream            Output datastream of agent execution.        """        if self.__agent_plan is None:            err_msg = "Must apply agent before call to_datastream/to_table."            raise RuntimeError(err_msg)        # return the same output datastream when call to_datastream multiple.        if self.__output is None:            j_data_stream_output = invoke_method(                None,                "org.apache.flink.agents.runtime.CompileUtils",                "connectToAgent",                [                    self.__input._j_data_stream,                    self.__agent_plan.model_dump_json(serialize_as_any=True),                ],                [                    "org.apache.flink.streaming.api.datastream.KeyedStream",                    "java.lang.String",                ],            )            output_stream = DataStream(j_data_stream_output)            self.__output = output_stream.map(                lambda x: cloudpickle.loads(x), output_type=output_type            )        return self.__output    def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:        """Get output Table of agent execution.        Parameters        ----------        schema : Schema            Indicate schema of the output table.        output_type : TypeInformation            Indicate schema corresponding type information.        Returns:        -------        Table            Output Table of agent execution.        """        return self.t_env.from_data_stream(self.to_datastream(output_type), schema)    def to_list(self) -> List[Dict[str, Any]]:        """Get output list of agent execution.        This method is not supported for remote execution environments.        """        msg = "RemoteAgentBuilder does not support to_list."        raise NotImplementedError(msg)
复制代码
RemoteExecutionEnvironment 的代码如下。
  1. class RemoteExecutionEnvironment(AgentsExecutionEnvironment):    """Implementation of AgentsExecutionEnvironment for execution with DataStream."""    __env: StreamExecutionEnvironment    __t_env: StreamTableEnvironment    __config: AgentConfiguration    def __init__(        self,        env: StreamExecutionEnvironment,        t_env: StreamTableEnvironment | None = None,    ) -> None:        """Init method of RemoteExecutionEnvironment."""        super().__init__()        self.__env = env        self.__t_env = t_env        self.__config = AgentConfiguration()        self.__load_config_from_flink_conf_dir()    @property    def t_env(self) -> StreamTableEnvironment:        """Get or crate table environment."""        if self.__t_env is None:            self.__t_env = StreamTableEnvironment.create(                stream_execution_environment=self.__env            )        return self.__t_env    def get_config(self, path: str | None = None) -> AgentConfiguration:        """Get the writable configuration for flink agents.        Returns:        -------        LocalConfiguration            The configuration for flink agents.        """        return self.__config    @staticmethod    def __process_input_datastream(        input: DataStream, key_selector: KeySelector | Callable | None = None    ) -> KeyedStream:        if isinstance(input, KeyedStream):            return input        else:            if key_selector is None:                msg = "KeySelector must be provided."                raise RuntimeError(msg)            input = input.key_by(key_selector)            return input    def from_datastream(        self, input: DataStream, key_selector: KeySelector | Callable | None = None    ) -> RemoteAgentBuilder:        """Set input datastream of agent.        Parameters        ----------        input : DataStream            Receive a DataStream as input.        key_selector : KeySelector            Extract key from each input record, must not be None when input is            not KeyedStream.        """        input = self.__process_input_datastream(input, key_selector)        return RemoteAgentBuilder(            input=input,            config=self.__config,            t_env=self.__t_env,            resources=self.resources,        )    def from_table(        self,        input: Table,        key_selector: KeySelector | Callable | None = None,    ) -> AgentBuilder:        """Set input Table of agent.        Parameters        ----------        input : Table            Receive a Table as input.        key_selector : KeySelector            Extract key from each input record.        """        input = self.t_env.to_data_stream(table=input)        input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())        input = self.__process_input_datastream(input, key_selector)        return RemoteAgentBuilder(            input=input,            config=self.__config,            t_env=self.t_env,            resources=self.resources,        )    def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment":        """Set input list of agent execution.        This method is not supported for remote execution environments.        """        msg = "RemoteExecutionEnvironment does not support from_list."        raise NotImplementedError(msg)    def execute(self) -> None:        """Execute agent."""        self.__env.execute()    def __load_config_from_flink_conf_dir(self) -> None:        """Load agent configuration from FLINK_CONF_DIR if available."""        flink_conf_dir = os.environ.get("FLINK_CONF_DIR")        if flink_conf_dir is None:            return        # Try to find config file, with fallback to legacy name        config_path = self.__find_config_file(flink_conf_dir)        if config_path is None:            logging.error(f"Config file not found in {flink_conf_dir}")        else:            self.__config.load_from_file(str(config_path))    def __find_config_file(self, flink_conf_dir: str) -> Path | None:        """Find config file in the given directory, checking both new and legacy names.        Parameters        ----------        flink_conf_dir : str            Directory to search for config files.        Returns:        -------        Path | None            Path to the config file if found, None otherwise.        """        # Try legacy config file name first        legacy_config_path = Path(flink_conf_dir).joinpath(_LEGACY_CONFIG_FILE_NAME)        if legacy_config_path.exists():            logging.warning(                f"Using legacy config file {_LEGACY_CONFIG_FILE_NAME}"            )            return legacy_config_path        # Try new config file name as fallback        primary_config_path = Path(flink_conf_dir).joinpath(_CONFIG_FILE_NAME)        if primary_config_path.exists():            return primary_config_path        return None
复制代码
0xFF 参考


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册