找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:如何生成JobGraph

Flink源码阅读:如何生成JobGraph

钨哄魁 昨天 23:50
前文我们介绍了 Flink 的四种执行图,并且通过源码了解了 Flink 的 StreamGraph 是怎么生成的,本文我们就一起来看下 Flink 的另一种执行图——JobGraph 是如何生成的。
StreamGraph 和 JobGraph 的区别

在正式开始之前,我们再来回顾一下 StreamGraph 和 JobGraph 的区别。假设我们的任务是建造一座大楼,StreamGraph 就像是设计蓝图,它描述了每个窗户、每根水管的位置和规格,而 JobGraph 像是给到施工队的施工流程图,它描述了每个任务模块,例如先把地基浇筑好,再铺设管线等。总的来说,JobGraph 更偏向执行层面,它是由 StreamGraph 优化而来。
回到 Flink 本身,我们通过一个表格来了解两个图的区别。
   StreamGraph JobGraph     生成阶段 客户端,执行 execute() 时 客户端,提交前由 StreamGraph 转换生成   抽象层级 高层逻辑图,直接对应 API 优化后的执行图,为调度做准备   核心优化 无 主要是算子链优化   节点 StreamNode JobVertex   边 StreamEdge JobEdge   提交对象 无 提交给 JobManager   包含资源 无 包含执行作业所需的 Jar 包、依赖库和资源文件  JobVertex

JobGraph 中的节点是 JobVertex,在 StreamGraph 转换成 JobGraph 的过程中,会将多个节点串联起来,最终生成 JobVertex。
JobVertex包含以下成员变量:
1.png

我们分别看一下这些成员变量及其作用。
1、标识符相关

  1. // JobVertex的id,在作业执行过程中的唯一标识。监控、调度和故障恢复都会使用
  2. private final JobVertexID id;
  3. // operator id列表,按照深度优先顺序存储。operator 的管理、状态分配都会用到
  4. private final List<OperatorIDPair> operatorIDs;
复制代码
2、输入输出相关

  1. // 定义所有的输入边
  2. private final List<JobEdge> inputs = new ArrayList<>();
  3. // 定义所有的输出数据集
  4. private final Map<IntermediateDataSetID, IntermediateDataSet> results = new LinkedHashMap<>();
  5. // 输入分片源,主要用于批处理作业,定义如何将数据分成多个片
  6. private InputSplitSource<?> inputSplitSource;
复制代码
3、执行配置相关

  1. // 并行度,即运行时拆分子任务数量,默认使用全局配置
  2. private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
  3. // 最大并行度
  4. private int maxParallelism = MAX_PARALLELISM_DEFAULT;
  5. // 存储运行时实际执行的类,使 Flink 可以灵活处理不同类型的操作符
  6. // 流任务可以是"org.apache.flink.streaming.runtime.tasks.StreamTask"
  7. // 批任务可以是"org.apache.flink.runtime.operators.BatchTask"
  8. private String invokableClassName;
  9. // 自定义配置
  10. private Configuration configuration;
  11. // 是否是动态设置并发度
  12. private boolean dynamicParallelism = false;
  13. // 是否支持优雅停止
  14. private boolean isStoppable = false;
复制代码
4、资源管理相关

  1. // JobVertex 最小资源需求
  2. private ResourceSpec minResources = ResourceSpec.DEFAULT;
  3. // JobVertex 推荐资源需求
  4. private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
  5. // 用于资源优化,运行不同的 JobVertex 的子任务运行在同一个 slot
  6. @Nullable private SlotSharingGroup slotSharingGroup;
  7. // 需要严格共址的 JobVertex 组,每个 JobVertex 的第 n 个子任务运行在同一个 TaskManager
  8. @Nullable private CoLocationGroupImpl coLocationGroup;
复制代码
5、协调器

  1. // 操作符协调器,用于处理全局协调逻辑
  2. private final List<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators =
  3.             new ArrayList<>();
复制代码
6、显示和描述信息

  1. // JobVertex 的名称
  2. private String name;
  3. // 操作符名称,比如 'Flat Map' 或 'Join'
  4. private String operatorName;
  5. // 操作符的描述,比如 'Hash Join' 或 'Sorted Group Reduce'
  6. private String operatorDescription;
  7. // 提供比 name 更友好的描述信息
  8. private String operatorPrettyName;
复制代码
7、状态和行为标志

  1. // 是否支持同一个子任务并发多次执行
  2. private boolean supportsConcurrentExecutionAttempts = true;
  3. // 标记并发度是否被显式设置
  4. private boolean parallelismConfigured = false;
  5. // 是否有阻塞型输出
  6. private boolean anyOutputBlocking = false;
复制代码
8、缓存数据集

  1. // 存储该 JobVertex 需要消费的缓存中间数据集的 ID,可提高作业执行效率
  2. private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();
复制代码
JobEdge

在 StreamGraph 中,StreamEdge 是连接 StreamNode 的桥梁。在 JobGraph 中,与之对应的是 JobEdge,不同点在于 JobEdge 中保存的是输入节点和输出结果。
1、连接关系成员

  1. // 定义数据流向哪个 JobVertex
  2. private final JobVertex target;
  3. // 定义这条边的源数据
  4. private final IntermediateDataSet source;
  5. // 输入类型的编号
  6. private final int typeNumber;
  7. // 多个输入间的键是否相关,如果为 true,相同键的数据在一个输入被分割时,在其他数据对应的记录也会发送到相同的下游节点
  8. private final boolean interInputsKeysCorrelated;
  9. // 同一输入内相同的键是否必须发送到同一下游任务
  10. private final boolean intraInputKeyCorrelated;
复制代码
2、数据分发模式

  1. // 定义数据在并行任务期间的分发模式
  2. // 可能值:
  3. // ALL_TO_ALL:全连接,每个上游子任务连接所有下游任务
  4. // POINTWISE:点对点连接,一对一或一对多的本地连接
  5. private final DistributionPattern distributionPattern;
复制代码
3、数据传输策略

  1. // 是否为广播连接
  2. private final boolean isBroadcast;
  3. // 是否为 forward 连接,forward 连接最高效,直接转发,无需序列化网络传输
  4. private final boolean isForward;
  5. // 数据传输策略名称,用于显示
  6. private String shipStrategyName;
复制代码
4、状态重分布映射器

  1. // 下游状态重分布映射器,当作业扩容时,决定是否重新分配下游算子的持久化状态
  2. private SubtaskStateMapper downstreamSubtaskStateMapper = SubtaskStateMapper.ROUND_ROBIN;
  3. // 上游状态重分布映射器,当作业扩容时,决定是否重新分配上游算子的持久化状态
  4. private SubtaskStateMapper upstreamSubtaskStateMapper = SubtaskStateMapper.ROUND_ROBIN;
复制代码
5、描述和缓存信息

  1. // 预处理操作的名称
  2. private String preProcessingOperationName;
  3. // 操作符级别缓存的描述
  4. private String operatorLevelCachingDescription;
复制代码
StreamGraph 转换成 JobGraph

现在我们再来看一下 StreamGraph 是如何转换成 JobGraph 的。转换逻辑的入口是 StreamGraph.getJobGraph 方法。它只是调用了 StreamingJobGraphGenerator.createJobGraph,核心逻辑在 createJobGraph 方法中。
  1. private JobGraph createJobGraph() {
  2.     // 预验证,检查 StreamGraph 配置正确性
  3.     preValidate(streamGraph, userClassloader);
  4.     // 【核心】链化操作符
  5.     setChaining();
  6.     if (jobGraph.isDynamic()) {
  7.         // 支持动态扩缩容场景,为动态图设置并行度
  8.         setVertexParallelismsForDynamicGraphIfNecessary();
  9.     }
  10.     // Note that we set all the non-chainable outputs configuration here because the
  11.     // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
  12.     // vertices and partition-reuse
  13.     final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
  14.             new HashMap<>();
  15.     // 设置不能链化的输出边
  16.     setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext);
  17.     setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
  18.     // 设置物理边连接
  19.     setPhysicalEdges(jobVertexBuildContext);
  20.     // 设置支持并发执行的 JobVertex
  21.     markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
  22.     // 验证混合 shuffle 模式只在批处理模式下使用
  23.     validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
  24.     // 设置 Slot 共享和协同定位
  25.     setSlotSharingAndCoLocation(jobVertexBuildContext);
  26.     // 设置托管内存比例
  27.     setManagedMemoryFraction(jobVertexBuildContext);
  28.     // 为 JobVertex 名称添加前缀
  29.     addVertexIndexPrefixInVertexName(jobVertexBuildContext, new AtomicInteger(0));
  30.     // 设置操作符描述信息
  31.     setVertexDescription(jobVertexBuildContext);
  32.     // Wait for the serialization of operator coordinators and stream config.
  33.     // 序列化操作符协调器和流配置
  34.     serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, jobVertexBuildContext);
  35.     return jobGraph;
  36. }
复制代码
可以看到,在 createJobGraph 方法中,调用了 setChaining 方法,即进行链化操作。这也是 JobGraph 最核心的优化之一。下面我们来看一下具体怎么做链化。
  1. private void setChaining() {
  2.     // we separate out the sources that run as inputs to another operator (chained inputs)
  3.     // from the sources that needs to run as the main (head) operator.
  4.     final Map<Integer, OperatorChainInfo> chainEntryPoints =
  5.             buildChainedInputsAndGetHeadInputs();
  6.     final Collection<OperatorChainInfo> initialEntryPoints =
  7.             chainEntryPoints.entrySet().stream()
  8.                     .sorted(Comparator.comparing(Map.Entry::getKey))
  9.                     .map(Map.Entry::getValue)
  10.                     .collect(Collectors.toList());
  11.     // iterate over a copy of the values, because this map gets concurrently modified
  12.     for (OperatorChainInfo info : initialEntryPoints) {
  13.         createChain(
  14.                 info.getStartNodeId(),
  15.                 1, // operators start at position 1 because 0 is for chained source inputs
  16.                 info,
  17.                 chainEntryPoints,
  18.                 true,
  19.                 serializationExecutor,
  20.                 jobVertexBuildContext,
  21.                 null);
  22.     }
  23. }
复制代码
setChaining 方法中主要分为两步,第一步是处理 Source 节点,将可以链化的 Source 和不能链化的 Source 节点分开。先来看如何判断一个 Source 是否可被链化。
  1. public static boolean isChainableSource(StreamNode streamNode, StreamGraph streamGraph) {
  2.     // 最基本的一些判空,输出边数量为1
  3.     if (streamNode.getOperatorFactory() == null
  4.             || !(streamNode.getOperatorFactory() instanceof SourceOperatorFactory)
  5.             || streamNode.getOutEdges().size() != 1) {
  6.         return false;
  7.     }
  8.     final StreamEdge sourceOutEdge = streamNode.getOutEdges().get(0);
  9.     final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
  10.     final ChainingStrategy targetChainingStrategy =
  11.             Preconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy();
  12.     // 链化策略必须 HEAD_WITH_SOURCES,输出边是可链化的
  13.     return targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
  14.             && isChainableInput(sourceOutEdge, streamGraph, false);
  15. }
  16. private static boolean isChainableInput(
  17.         StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) {
  18.     StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
  19.     StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
  20.     if (!(streamGraph.isChainingEnabled()
  21.             // 上下游节点是否在同一个 slot 共享组
  22.             && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
  23.             // 操作符是否可以链化,主要做并行度检查
  24.             && areOperatorsChainable(
  25.                     upStreamVertex,
  26.                     downStreamVertex,
  27.                     streamGraph,
  28.                     allowChainWithDefaultParallelism)
  29.             // 分区器和交换模式是否支持链化
  30.             && arePartitionerAndExchangeModeChainable(
  31.                     edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {
  32.         return false;
  33.     }
  34.     // check that we do not have a union operation, because unions currently only work
  35.     // through the network/byte-channel stack.
  36.     // we check that by testing that each "type" (which means input position) is used only once
  37.     // 检查是否为 Union 操作,Union 操作不能链化
  38.     for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
  39.         if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
  40.             return false;
  41.         }
  42.     }
  43.     return true;
  44. }
复制代码
Source 的链化条件主要就是这些,我们结合一些例子来看一下。
  1. Source(并行度=4) -> Map(并行度=4) -> Filter(并行度=4)
  2. Source -> Map 边:
  3. 1. isChainingEnabled() = true
  4. 2. isSameSlotSharingGroup() = true (都在默认组)
  5. 3. areOperatorsChainable() = true (Source可链化,Map是HEAD_WITH_SOURCES)
  6. 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner)
  7. 5. Union检查通过
  8. 结果:可链化
  9. Map -> Filter 边:
  10. 1. isChainingEnabled() = true
  11. 2. isSameSlotSharingGroup() = true
  12. 3. areOperatorsChainable() = true (Map和Filter都是ALWAYS)
  13. 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner)
  14. 5. Union检查通过
  15. 结果:可链化
  16. 最终:Source -> Map -> Filter 三者链化到一个JobVertex中
  17. Source(并行度=2) -> Map(并行度=4)  // 并行度不匹配
  18. Source -> Map 边:
  19. 1. isChainingEnabled() = true
  20. 2. isSameSlotSharingGroup() = true
  21. 3. areOperatorsChainable() = false (并行度不匹配)
  22. 结果:不可链化,需要网络传输
  23. Source1 --\
  24.           Union -> Map
  25. Source2 --/
  26. Source1 -> Union 边:
  27. 虽然满足前4个条件,但Union节点有两个输入边,typeNumber相同
  28. Union检查失败,不可链化
复制代码
得到了所有入口之后,就可以进行后续节点的链化操作了,它的逻辑在 createChain 方法中。这里主要是一个递归过程,先将节点的输出边分为可链化和不可链化两个 list,之后对可链化的边进行递归调用链化。对不可链化的边,需要创建出新的链。由于篇幅原因,这里只贴一部分核心的代码
  1. public static List<StreamEdge> createChain(
  2.         final Integer currentNodeId,
  3.         final int chainIndex,
  4.         final OperatorChainInfo chainInfo,
  5.         final Map<Integer, OperatorChainInfo> chainEntryPoints,
  6.         final boolean canCreateNewChain,
  7.         final Executor serializationExecutor,
  8.         final JobVertexBuildContext jobVertexBuildContext,
  9.         final @Nullable Consumer<Integer> visitedStreamNodeConsumer) {
  10.     ......
  11.         // 拆分可链化边和不可链化边
  12.         for (StreamEdge outEdge : currentNode.getOutEdges()) {
  13.             if (isChainable(outEdge, streamGraph)) {
  14.                 chainableOutputs.add(outEdge);
  15.             } else {
  16.                 nonChainableOutputs.add(outEdge);
  17.             }
  18.         }
  19.         // 处理可链化边
  20.         for (StreamEdge chainable : chainableOutputs) {
  21.             StreamNode targetNode = streamGraph.getStreamNode(chainable.getTargetId());
  22.             Attribute targetNodeAttribute = targetNode.getAttribute();
  23.             if (isNoOutputUntilEndOfInput) {
  24.                 if (targetNodeAttribute != null) {
  25.                     targetNodeAttribute.setNoOutputUntilEndOfInput(true);
  26.                 }
  27.             }
  28.             transitiveOutEdges.addAll(
  29.                     createChain(
  30.                             chainable.getTargetId(),
  31.                             chainIndex + 1,
  32.                             chainInfo,
  33.                             chainEntryPoints,
  34.                             canCreateNewChain,
  35.                             serializationExecutor,
  36.                             jobVertexBuildContext,
  37.                             visitedStreamNodeConsumer));
  38.             // Mark upstream nodes in the same chain as outputBlocking
  39.             if (targetNodeAttribute != null
  40.                     && targetNodeAttribute.isNoOutputUntilEndOfInput()) {
  41.                 currentNodeAttribute.setNoOutputUntilEndOfInput(true);
  42.             }
  43.         }
  44.         // 处理不可链化边
  45.         for (StreamEdge nonChainable : nonChainableOutputs) {
  46.             transitiveOutEdges.add(nonChainable);
  47.             // Used to control whether a new chain can be created, this value is true in the
  48.             // full graph generation algorithm and false in the progressive generation
  49.             // algorithm. In the future, this variable can be a boolean type function to adapt
  50.             // to more adaptive scenarios.
  51.             if (canCreateNewChain) {
  52.                 createChain(
  53.                         nonChainable.getTargetId(),
  54.                         1, // operators start at position 1 because 0 is for chained source
  55.                         // inputs
  56.                         chainEntryPoints.computeIfAbsent(
  57.                                 nonChainable.getTargetId(),
  58.                                 (k) -> chainInfo.newChain(nonChainable.getTargetId())),
  59.                         chainEntryPoints,
  60.                         canCreateNewChain,
  61.                         serializationExecutor,
  62.                         jobVertexBuildContext,
  63.                         visitedStreamNodeConsumer);
  64.             }
  65.         }
  66.         // 创建 JobVertex
  67.         StreamConfig config;
  68.         if (currentNodeId.equals(startNodeId)) {
  69.             JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
  70.             if (jobVertex == null) {
  71.                 jobVertex =
  72.                         createJobVertex(
  73.                                 chainInfo, serializationExecutor, jobVertexBuildContext);
  74.             }
  75.             config = new StreamConfig(jobVertex.getConfiguration());
  76.         } else {
  77.             config = new StreamConfig(new Configuration());
  78.         }
  79.         // 判断是否为起始节点,如果不是,将对应的配置信息存到链化起始节点的 key 中
  80.         if (currentNodeId.equals(startNodeId)) {
  81.             chainInfo.setTransitiveOutEdges(transitiveOutEdges);
  82.             jobVertexBuildContext.addChainInfo(startNodeId, chainInfo);
  83.             config.setChainStart();
  84.             config.setChainIndex(chainIndex);
  85.             config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
  86.             config.setTransitiveChainedTaskConfigs(
  87.                     jobVertexBuildContext.getChainedConfigs().get(startNodeId));
  88.         } else {
  89.             config.setChainIndex(chainIndex);
  90.             StreamNode node = streamGraph.getStreamNode(currentNodeId);
  91.             config.setOperatorName(node.getOperatorName());
  92.             jobVertexBuildContext
  93.                     .getOrCreateChainedConfig(startNodeId)
  94.                     .put(currentNodeId, config);
  95.         }
  96.     ......
  97. }
复制代码
是否可链化依赖于 isChainable 方法的结果。它主要判断了下游的输入边数量是否为1,然后调用了 isChainableInput,这个方法我们刚刚已经看过了。
  1. public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
  2.     return isChainable(edge, streamGraph, false);
  3. }
  4. public static boolean isChainable(
  5.         StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) {
  6.     StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
  7.     return downStreamVertex.getInEdges().size() == 1
  8.             && isChainableInput(edge, streamGraph, allowChainWithDefaultParallelism);
  9. }
复制代码
总结

本文我们主要介绍了生成 JobGraph 的相关代码。首先了解了 JobGraph 中的节点和边对应的类,以及它们和 StreamGraph 中的类的映射关系。然后又看了生成 JobGraph 的核心代码,其中重点学习了链化相关的代码。
最后补充一个生成 JobGraph 的调用链路,感兴趣的同学可以看下。
  1. clusterClient.submitJob() → MiniCluster.submitJob() → Dispatcher.submitJob() → JobMasterServiceLeadershipRunnerFactory → DefaultJobMasterServiceFactory → JobMaster → DefaultSchedulerFactory.createInstance()→ StreamGraph.getJobGraph()
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册