找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:如何生成ExecutionGraph

Flink源码阅读:如何生成ExecutionGraph

唯棉坜 昨天 22:50
今天我们一起来了解 Flink 最后一种执行图,ExecutionGraph 的执行过程。
基本概念

在阅读源码之前,我们先来了解一下 ExecutionGraph 中的一些基本概念。

  • ExecutionJobVertex: ExecutionJobVertex 是 ExecutionGraph 中的节点,对应的是 JobGraph 中的 JobVertex。
  • ExecutionVertex: 每个 ExecutionJobVertex 都包含了一组 ExecutionVertex,ExecutionVertex 的数量就是节点对应的并行度。
  • IntermediateResult: IntermediateResult 表示节点的输出结果,与之对应的是 JobGraph 中的 IntermediateDataSet。
  • IntermediateResultPartition: IntermediateResultPartition 是每个 ExecutionVertex 的输出。
  • EdgeManager: EdgeManager 主要负责存储 ExecutionGraph 中所有之间的连接,包括其并行度。
  • Execution: Execution 可以认为是一次实际的运行尝试。每次执行时,Flink 都会将ExecutionVertex 封装成一个 Execution,并通过一个 ExecutionAttemptID 来做唯一标识。
ExecutionGraph 生成过程

了解了这些基本概念之后,我们一起来看一下 ExecutionGraph 的具体生成过程。生成 ExecutionGraph 的代码入口是 DefaultExecutionGraphBuilder.build 方法。
首先是获取一些基本信息,包括 jobInformation、jobStatusChangedListeners 等。
接下来就是创建一个 DefaultExecutionGraph 和生成执行计划。
  1. // create a new execution graph, if none exists so far
  2. final DefaultExecutionGraph executionGraph =
  3.         new DefaultExecutionGraph(
  4.                 jobInformation,
  5.                 futureExecutor,
  6.                 ioExecutor,
  7.                 rpcTimeout,
  8.                 executionHistorySizeLimit,
  9.                 classLoader,
  10.                 blobWriter,
  11.                 partitionGroupReleaseStrategyFactory,
  12.                 shuffleMaster,
  13.                 partitionTracker,
  14.                 executionDeploymentListener,
  15.                 executionStateUpdateListener,
  16.                 initializationTimestamp,
  17.                 vertexAttemptNumberStore,
  18.                 vertexParallelismStore,
  19.                 isDynamicGraph,
  20.                 executionJobVertexFactory,
  21.                 jobGraph.getJobStatusHooks(),
  22.                 markPartitionFinishedStrategy,
  23.                 taskDeploymentDescriptorFactory,
  24.                 jobStatusChangedListeners,
  25.                 executionPlanSchedulingContext);
  26. try {
  27.     executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));
  28. } catch (Throwable t) {
  29.     log.warn("Cannot create plan for job", t);
  30.     // give the graph an empty plan
  31.     executionGraph.setPlan(new JobPlanInfo.Plan("", "", "", new ArrayList<>()));
  32. }
复制代码
下面就是两个比较核心的方法 getVerticesSortedTopologicallyFromSources 和 attachJobGraph。
  1. // topologically sort the job vertices and attach the graph to the existing one
  2. List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
  3. executionGraph.attachJobGraph(sortedTopology, jobManagerJobMetricGroup);
复制代码
这两个方法是先将 JobVertex 进行排序,然后构建 ExecutionGraph 的拓扑图。
getVerticesSortedTopologicallyFromSources
  1. public List<JobVertex> getVerticesSortedTopologicallyFromSources()
  2.         throws InvalidProgramException {
  3.     // early out on empty lists
  4.     if (this.taskVertices.isEmpty()) {
  5.         return Collections.emptyList();
  6.     }
  7.     List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
  8.     Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
  9.     // start by finding the vertices with no input edges
  10.     // and the ones with disconnected inputs (that refer to some standalone data set)
  11.     {
  12.         Iterator<JobVertex> iter = remaining.iterator();
  13.         while (iter.hasNext()) {
  14.             JobVertex vertex = iter.next();
  15.             if (vertex.isInputVertex()) {
  16.                 sorted.add(vertex);
  17.                 iter.remove();
  18.             }
  19.         }
  20.     }
  21.     int startNodePos = 0;
  22.     // traverse from the nodes that were added until we found all elements
  23.     while (!remaining.isEmpty()) {
  24.         // first check if we have more candidates to start traversing from. if not, then the
  25.         // graph is cyclic, which is not permitted
  26.         if (startNodePos >= sorted.size()) {
  27.             throw new InvalidProgramException("The job graph is cyclic.");
  28.         }
  29.         JobVertex current = sorted.get(startNodePos++);
  30.         addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
  31.     }
  32.     return sorted;
  33. }
复制代码
这段代码是将所有的节点进行排序,先将所有的 Source 节点筛选出来,然后再将剩余节点假如列表。这样就能构建出最终的拓扑图。
attachJobGraph
  1. @Override
  2. public void attachJobGraph(
  3.         List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
  4.         throws JobException {
  5.     assertRunningInJobMasterMainThread();
  6.     LOG.debug(
  7.             "Attaching {} topologically sorted vertices to existing job graph with {} "
  8.                     + "vertices and {} intermediate results.",
  9.             verticesToAttach.size(),
  10.             tasks.size(),
  11.             intermediateResults.size());
  12.     attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
  13.     if (!isDynamic) {
  14.         initializeJobVertices(verticesToAttach);
  15.     }
  16.     // the topology assigning should happen before notifying new vertices to failoverStrategy
  17.     executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
  18.     partitionGroupReleaseStrategy =
  19.             partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
  20. }
复制代码
attachJobGraph 方法主要包含两步逻辑,第一步是调用 attachJobVertices 方法创建 ExecutionJobVertex 实例,第二步是调用 fromExecutionGraph 创建一些其他的核心对象。
attachJobVertices
attachJobVertices 方法中就是遍历所有的 JobVertex,然后利用 JobVertex 生成 ExecutionJobVertex。
  1. /** Attach job vertices without initializing them. */
  2. private void attachJobVertices(
  3.         List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
  4.         throws JobException {
  5.     for (JobVertex jobVertex : topologicallySorted) {
  6.         if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
  7.             this.isStoppable = false;
  8.         }
  9.         VertexParallelismInformation parallelismInfo =
  10.                 parallelismStore.getParallelismInfo(jobVertex.getID());
  11.         // create the execution job vertex and attach it to the graph
  12.         ExecutionJobVertex ejv =
  13.                 executionJobVertexFactory.createExecutionJobVertex(
  14.                         this,
  15.                         jobVertex,
  16.                         parallelismInfo,
  17.                         coordinatorStore,
  18.                         jobManagerJobMetricGroup);
  19.         ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
  20.         if (previousTask != null) {
  21.             throw new JobException(
  22.                     String.format(
  23.                             "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
  24.                             jobVertex.getID(), ejv, previousTask));
  25.         }
  26.         this.verticesInCreationOrder.add(ejv);
  27.         this.numJobVerticesTotal++;
  28.     }
  29. }
复制代码
initializeJobVertices
在 DefaultExecutionGraph.initializeJobVertices 中是遍历了刚刚排好序的 JobVertex,获取了 ExecutionJobVertex 之后调用了 ExecutionGraph.initializeJobVertex 方法。
我们直接来看 ExecutionGraph.initializeJobVertex 的逻辑。
  1. default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
  2.         throws JobException {
  3.     initializeJobVertex(
  4.             ejv,
  5.             createTimestamp,
  6.             VertexInputInfoComputationUtils.computeVertexInputInfos(
  7.                     ejv, getAllIntermediateResults()::get));
  8. }
复制代码
这里先是调用了 VertexInputInfoComputationUtils.computeVertexInputInfos 方法,生成了 Map jobVertexInputInfos。它表示的是每个 ExecutionVertex 消费上游 IntermediateResultPartition 的范围。
这里有两种模式,分别是 POINTWISE (点对点)和 ALL_TO_ALL(全对全)
在 POINTWISE 模式中,会按照尽量均匀分布的方式处理。

  • 例如上游并发度是4,下游并发度是2时,那么前两个 IntermediateResultPartition 就会被第一个 ExecutionVertex 消费,后两个 IntermediateResultPartition 就会被第二个 ExecutionVertex 消费。
  • 如果上游并发度是2,下游是3时,那么下游前两个 IntermediateResultPartition 会被第一个 ExecutionVertex 消费,第三个 IntermediateResultPartition 则会被第二个 ExecutionVertex 消费。
  1. public static JobVertexInputInfo computeVertexInputInfoForPointwise(
  2.         int sourceCount,
  3.         int targetCount,
  4.         Function<Integer, Integer> numOfSubpartitionsRetriever,
  5.         boolean isDynamicGraph) {
  6.     final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
  7.     if (sourceCount >= targetCount) {
  8.         for (int index = 0; index < targetCount; index++) {
  9.             int start = index * sourceCount / targetCount;
  10.             int end = (index + 1) * sourceCount / targetCount;
  11.             IndexRange partitionRange = new IndexRange(start, end - 1);
  12.             IndexRange subpartitionRange =
  13.                     computeConsumedSubpartitionRange(
  14.                             index,
  15.                             1,
  16.                             () -> numOfSubpartitionsRetriever.apply(start),
  17.                             isDynamicGraph,
  18.                             false,
  19.                             false);
  20.             executionVertexInputInfos.add(
  21.                     new ExecutionVertexInputInfo(index, partitionRange, subpartitionRange));
  22.         }
  23.     } else {
  24.         for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
  25.             int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
  26.             int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
  27.             int numConsumers = end - start;
  28.             IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
  29.             // Variable used in lambda expression should be final or effectively final
  30.             final int finalPartitionNum = partitionNum;
  31.             for (int i = start; i < end; i++) {
  32.                 IndexRange subpartitionRange =
  33.                         computeConsumedSubpartitionRange(
  34.                                 i,
  35.                                 numConsumers,
  36.                                 () -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
  37.                                 isDynamicGraph,
  38.                                 false,
  39.                                 false);
  40.                 executionVertexInputInfos.add(
  41.                         new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
  42.             }
  43.         }
  44.     }
  45.     return new JobVertexInputInfo(executionVertexInputInfos);
  46. }
复制代码
在 ALL_TO_ALL 模式中,每个下游都会消费所有上游的数据。
  1. public static JobVertexInputInfo computeVertexInputInfoForAllToAll(
  2.         int sourceCount,
  3.         int targetCount,
  4.         Function<Integer, Integer> numOfSubpartitionsRetriever,
  5.         boolean isDynamicGraph,
  6.         boolean isBroadcast,
  7.         boolean isSingleSubpartitionContainsAllData) {
  8.     final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
  9.     IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
  10.     for (int i = 0; i < targetCount; ++i) {
  11.         IndexRange subpartitionRange =
  12.                 computeConsumedSubpartitionRange(
  13.                         i,
  14.                         targetCount,
  15.                         () -> numOfSubpartitionsRetriever.apply(0),
  16.                         isDynamicGraph,
  17.                         isBroadcast,
  18.                         isSingleSubpartitionContainsAllData);
  19.         executionVertexInputInfos.add(
  20.                 new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
  21.     }
  22.     return new JobVertexInputInfo(executionVertexInputInfos);
  23. }
复制代码
生成好了 jobVertexInputInfos 之后,我们再回到 DefaultExecutionGraph.initializeJobVertex 方法中。
  1. @Override
  2. public void initializeJobVertex(
  3.         ExecutionJobVertex ejv,
  4.         long createTimestamp,
  5.         Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
  6.         throws JobException {
  7.     checkNotNull(ejv);
  8.     checkNotNull(jobVertexInputInfos);
  9.     jobVertexInputInfos.forEach(
  10.             (resultId, info) ->
  11.                     this.vertexInputInfoStore.put(ejv.getJobVertexId(), resultId, info));
  12.     ejv.initialize(
  13.             executionHistorySizeLimit,
  14.             rpcTimeout,
  15.             createTimestamp,
  16.             this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
  17.             executionPlanSchedulingContext);
  18.     ejv.connectToPredecessors(this.intermediateResults);
  19.     for (IntermediateResult res : ejv.getProducedDataSets()) {
  20.         IntermediateResult previousDataSet =
  21.                 this.intermediateResults.putIfAbsent(res.getId(), res);
  22.         if (previousDataSet != null) {
  23.             throw new JobException(
  24.                     String.format(
  25.                             "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
  26.                             res.getId(), res, previousDataSet));
  27.         }
  28.     }
  29.     registerExecutionVerticesAndResultPartitionsFor(ejv);
  30.     // enrich network memory.
  31.     SlotSharingGroup slotSharingGroup = ejv.getSlotSharingGroup();
  32.     if (areJobVerticesAllInitialized(slotSharingGroup)) {
  33.         SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(
  34.                 slotSharingGroup, this::getJobVertex, shuffleMaster);
  35.     }
  36. }
复制代码
首先来看 ExecutionJobVertex.initialize 方法。这个方法主要是生成 IntermediateResult 和 ExecutionVertex。
  1. protected void initialize(
  2.         int executionHistorySizeLimit,
  3.         Duration timeout,
  4.         long createTimestamp,
  5.         SubtaskAttemptNumberStore initialAttemptCounts,
  6.         ExecutionPlanSchedulingContext executionPlanSchedulingContext)
  7.         throws JobException {
  8.     checkState(parallelismInfo.getParallelism() > 0);
  9.     checkState(!isInitialized());
  10.     this.taskVertices = new ExecutionVertex[parallelismInfo.getParallelism()];
  11.     this.inputs = new ArrayList<>(jobVertex.getInputs().size());
  12.     // create the intermediate results
  13.     this.producedDataSets =
  14.             new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
  15.     for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
  16.         final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
  17.         this.producedDataSets[i] =
  18.                 new IntermediateResult(
  19.                         result,
  20.                         this,
  21.                         this.parallelismInfo.getParallelism(),
  22.                         result.getResultType(),
  23.                         executionPlanSchedulingContext);
  24.     }
  25.     // create all task vertices
  26.     for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
  27.         ExecutionVertex vertex =
  28.                 createExecutionVertex(
  29.                         this,
  30.                         i,
  31.                         producedDataSets,
  32.                         timeout,
  33.                         createTimestamp,
  34.                         executionHistorySizeLimit,
  35.                         initialAttemptCounts.getAttemptCount(i));
  36.         this.taskVertices[i] = vertex;
  37.     }
  38.     // sanity check for the double referencing between intermediate result partitions and
  39.     // execution vertices
  40.     for (IntermediateResult ir : this.producedDataSets) {
  41.         if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
  42.             throw new RuntimeException(
  43.                     "The intermediate result's partitions were not correctly assigned.");
  44.         }
  45.     }
  46.     // set up the input splits, if the vertex has any
  47.     try {
  48.         @SuppressWarnings("unchecked")
  49.         InputSplitSource<InputSplit> splitSource =
  50.                 (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
  51.         if (splitSource != null) {
  52.             Thread currentThread = Thread.currentThread();
  53.             ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
  54.             currentThread.setContextClassLoader(graph.getUserClassLoader());
  55.             try {
  56.                 inputSplits =
  57.                         splitSource.createInputSplits(this.parallelismInfo.getParallelism());
  58.                 if (inputSplits != null) {
  59.                     splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
  60.                 }
  61.             } finally {
  62.                 currentThread.setContextClassLoader(oldContextClassLoader);
  63.             }
  64.         } else {
  65.             inputSplits = null;
  66.         }
  67.     } catch (Throwable t) {
  68.         throw new JobException(
  69.                 "Creating the input splits caused an error: " + t.getMessage(), t);
  70.     }
  71. }
复制代码
在创建 ExecutionVertex 时,会创建 IntermediateResultPartition 和 Execution,创建 Execution 时,会设置 attemptNumber,这个值默认是0,如果 ExecutionVertex 是重新调度的,那么 attemptNumber 会自增加1。
ExecutionJobVertex.connectToPredecessors 方法主要是生成 ExecutionVertex 与 IntermediateResultPartition 的关联关系。这里设置关联关系也分成了点对点和全对全两种模式处理,点对点模式需要计算 ExecutionVertex 对应的 IntermediateResultPartition index 的范围。两种模式最终都调用了 connectInternal 方法。
  1. /** Connect all execution vertices to all partitions. */
  2. private static void connectInternal(
  3.         List<ExecutionVertex> taskVertices,
  4.         List<IntermediateResultPartition> partitions,
  5.         ResultPartitionType resultPartitionType,
  6.         EdgeManager edgeManager) {
  7.     checkState(!taskVertices.isEmpty());
  8.     checkState(!partitions.isEmpty());
  9.     ConsumedPartitionGroup consumedPartitionGroup =
  10.             createAndRegisterConsumedPartitionGroupToEdgeManager(
  11.                     taskVertices.size(), partitions, resultPartitionType, edgeManager);
  12.     for (ExecutionVertex ev : taskVertices) {
  13.         ev.addConsumedPartitionGroup(consumedPartitionGroup);
  14.     }
  15.     List<ExecutionVertexID> consumerVertices =
  16.             taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());
  17.     ConsumerVertexGroup consumerVertexGroup =
  18.             ConsumerVertexGroup.fromMultipleVertices(consumerVertices, resultPartitionType);
  19.     for (IntermediateResultPartition partition : partitions) {
  20.         partition.addConsumers(consumerVertexGroup);
  21.     }
  22.     consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);
  23.     consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);
  24. }
复制代码
这个方法中 ev.addConsumedPartitionGroup(consumedPartitionGroup); 负责将 ExecutionVertex 到 IntermediateResultPartition 的关联关系保存在 EdgeManager.vertexConsumedPartitions 中。
而 partition.addConsumers(consumerVertexGroup); 则负责将 IntermediateResultPartition 到 ExecutionVertex 的关系保存在 EdgeManager.partitionConsumers 中。
总结

通过本文,我们了解了 Flink 是如何将 JobGraph 转换成 ExecutionGraph 的。其中涉及到的一些核心概念名称比较类似,建议认真学习和理解透彻之后再研究其生成方法和对应关系,也可以借助前文中 ExecutionGraph 示意图辅助学习。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册