Flink 中有四种执行图,分别是 StreamGraph、JobGraph、ExecutionGraph 和 Physical Graph。今天我们来看下我们编写的 Flink 程序代码是如何生成 StreamGraph 的。
在开始读代码之前,我们先来简单介绍一下四种图之间的关系和区别。
StreamGraph 是根据用户用 Stream API 编写的代码生成的图,用来表示整个程序的拓扑结构。
JobGraph 是由 StreamGraph 生成的,它在 StreamGraph 的基础上,对链化了部分算子,将其合并成为一个节点,减少数据在节点之间传输时序列化和反序列化这些消耗。
ExecutionGraph 是由 JobGraph 生成的,它的主要特点是并行,将多并发的节点拆分。
PhysicalGraph 是 ExecutionGraph 实际部署后的图,它并不是一种数据结构。
StreamExecutionEnvironment
OK,了解了 Flink 四种执行图之后,我们就正式开始源码探索了。首先从 StreamExecutionEnvironment 入手,在编写 Flink 程序时,它是必不可少的一个类。它提供了一系列方法来配置流处理程序的执行环境(如并行度、Checkpoint 配置、时间属性等)。
本文我们主要关注 StreamGraph 的生成,首先是数据流的入口,即 Source 节点。在 StreamExecutionEnvironment 中有 addSource 和 fromSource 等方法,它们用来定义从哪个数据源读取数据,然后返回一个 DataStreamSource (继承自 DataStream),得到 DataStream 之后,它会在各个算子之间流转,最终到 Sink 端输出。
我们从 addSource 方法入手,addSource 方法中主要做了三件事:
1、处理数据类型,优先使用用户执行的数据类型,也可以自动推断
2、闭包清理,使用户传入的 function 能被序列化并发布到分布式环境执行
3、创建 DataStreamSource 并返回- private <OUT> DataStreamSource<OUT> addSource(
- final SourceFunction<OUT> function,
- final String sourceName,
- @Nullable final TypeInformation<OUT> typeInfo,
- final Boundedness boundedness) {
- checkNotNull(function);
- checkNotNull(sourceName);
- checkNotNull(boundedness);
- TypeInformation<OUT> resolvedTypeInfo =
- getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
- boolean isParallel = function instanceof ParallelSourceFunction;
- clean(function);
- final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
- return new DataStreamSource<>(
- this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
- }
复制代码 现在我们有了 DataStream 了,那如何知道后续要进行哪些转换逻辑呢?答案在 transformations 这个变量中,它保存了后续所有的转换。
[code]protected final List |