找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:如何生成StreamGraph

Flink源码阅读:如何生成StreamGraph

梁丘艷蕙 2 小时前
Flink 中有四种执行图,分别是 StreamGraph、JobGraph、ExecutionGraph 和 Physical Graph。今天我们来看下我们编写的 Flink 程序代码是如何生成 StreamGraph 的。
在开始读代码之前,我们先来简单介绍一下四种图之间的关系和区别。
1.png

StreamGraph 是根据用户用 Stream API 编写的代码生成的图,用来表示整个程序的拓扑结构。
JobGraph 是由 StreamGraph 生成的,它在 StreamGraph 的基础上,对链化了部分算子,将其合并成为一个节点,减少数据在节点之间传输时序列化和反序列化这些消耗。
ExecutionGraph 是由 JobGraph 生成的,它的主要特点是并行,将多并发的节点拆分。
PhysicalGraph 是 ExecutionGraph 实际部署后的图,它并不是一种数据结构。
StreamExecutionEnvironment

OK,了解了 Flink 四种执行图之后,我们就正式开始源码探索了。首先从 StreamExecutionEnvironment 入手,在编写 Flink 程序时,它是必不可少的一个类。它提供了一系列方法来配置流处理程序的执行环境(如并行度、Checkpoint 配置、时间属性等)。
本文我们主要关注 StreamGraph 的生成,首先是数据流的入口,即 Source 节点。在 StreamExecutionEnvironment 中有 addSource 和 fromSource 等方法,它们用来定义从哪个数据源读取数据,然后返回一个 DataStreamSource (继承自 DataStream),得到 DataStream 之后,它会在各个算子之间流转,最终到 Sink 端输出。
我们从 addSource 方法入手,addSource 方法中主要做了三件事:
1、处理数据类型,优先使用用户执行的数据类型,也可以自动推断
2、闭包清理,使用户传入的 function 能被序列化并发布到分布式环境执行
3、创建 DataStreamSource 并返回
  1. private <OUT> DataStreamSource<OUT> addSource(
  2.         final SourceFunction<OUT> function,
  3.         final String sourceName,
  4.         @Nullable final TypeInformation<OUT> typeInfo,
  5.         final Boundedness boundedness) {
  6.     checkNotNull(function);
  7.     checkNotNull(sourceName);
  8.     checkNotNull(boundedness);
  9.     TypeInformation<OUT> resolvedTypeInfo =
  10.             getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
  11.     boolean isParallel = function instanceof ParallelSourceFunction;
  12.     clean(function);
  13.     final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
  14.     return new DataStreamSource<>(
  15.             this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
  16. }
复制代码
现在我们有了 DataStream 了,那如何知道后续要进行哪些转换逻辑呢?答案在 transformations 这个变量中,它保存了后续所有的转换。
[code]protected final List

相关推荐

您需要登录后才可以回帖 登录 | 立即注册