找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:JobManager的HA机制

Flink源码阅读:JobManager的HA机制

揉幽递 昨天 01:05
JobManager 在 Flink 集群中发挥着重要的作用,包括任务调度和资源管理等工作。如果 JobManager 宕机,那么整个集群的任务都将失败。为了解决 JobManager 的单点问题,Flink 也设计了 HA 机制来保障整个集群的稳定性。
基本概念

在 JobManager 启动时,调用 HighAvailabilityServicesUtils.createHighAvailabilityServices 来创建 HA 服务,HA 依赖的服务都被封装在 HighAvailabilityServices 中。当前 Flink 内部支持两种高可用模式,分别是 ZooKeeper 和 KUBERNETES。
  1. case ZOOKEEPER:
  2.     return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
  3. case KUBERNETES:
  4.     return createCustomHAServices(
  5.             "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
  6.             configuration,
  7.             executor);
复制代码
HighAvailabilityServices 中提供的关键组件包括:

  • LeaderRetrievalService:服务发现,用于获取当前 leader 的地址。目前用到服务发现的组件有 ResourceManager、Dispatcher、JobManager、ClusterRestEndpoint。
  • LeaderElection:选举服务,从多个候选者中选出一个作为 leader。用到选举服务的同样是 ResourceManager、Dispatcher、JobManager、ClusterRestEndpoint 这四个。
  • CheckpointRecoveryFactory:Checkpoint 恢复组件的工厂类,提供了创建 CompletedCheckpointStore 和 CheckpointIDCounter 的方法。CompletedCheckpointStore 是用于存储已完成的 checkpoint 的元信息,CheckpointIDCounter 是用于生成 checkpoint ID。
  • ExecutionPlanStore:用于存储执行计划。
  • JobResultStore:用于存储作业结果,这里有两种状态,一种是 dirty,表示作业没有被完全清理,另一种是 clean,表示作业清理工作已经执行完成了。
  • BlobStore:存储作业运行期间的一些二进制文件。
选举服务

Flink 的选举是依靠 LeaderElection 和 LeaderContender 配合完成的。LeaderElection 是 LeaderElectionService 的代理接口,提供了注册候选者、确认 leader 和 判断候选者是否是 leader 三个接口。LeaderContender 则是用来表示候选者对象。当一个 LeaderContender 当选 leader 后,LeaderElectionService 会为其生成一个 leaderSessionId,LeaderContender 会调用 confirmLeadershipAsync 发布自己的地址。选举服务的具体实现在 LeaderElectionDriver 接口中。
服务发现

服务发现的作用是获取各组件的 leader 地址。服务发现依赖 LeaderRetrievalService 和 LeaderRetrievalListener。LeaderRetrievalService 可以启动一个监听,当有新的 leader 当选时,会调用 LeaderRetrievalListener 的 notifyLeaderAddress 方法。
信息保存

当 leader 发生切换时,新的 leader 需要获取到旧 leader 存储的信息,这就需要旧 leader 把这些信息存在一个公共的存储上。它可以是 ZooKeeper 或 Kubernetes 的存储,也可以是分布式文件系统的存储。
基于 ZooKeeper 的 HA

选举服务

前面我们提到了选举服务主要依赖 LeaderElection 和 LeaderContender 配合完成。我们就以 JobManager 为例,看一下机遇 ZooKeeper 的选举流程的具体实现。
1.png

图中 JobMasterServiceLeadershipRunner 是 LeaderContender 的实现类。在启动服务时,会向 LeaderElection 注册自己的信息,实际执行者是 DefaultLeaderElectionService。它先创建了 LeaderElectionDriver,然后将 LeaderContender 保存在 leaderContenderRegistry 中。选举的核心逻辑封装在 LeaderElectionDriver 中。
在创建 LeaderElectionDriver 时,会创建 LeaderLatch 对象和 TreeCache 对象, LeaderLatch 封装了与 ZooKeeper 关联的回调,会接收一个 LeaderElectionDriver 作为监听。TreeCache 主要用于监听 ZooKeeper 中 leader 节点的变更。
  1. public ZooKeeperLeaderElectionDriver(
  2.         CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
  3.         throws Exception {
  4.     ...
  5.     this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
  6.     this.treeCache =
  7.             ZooKeeperUtils.createTreeCache(
  8.                     curatorFramework,
  9.                     "/",
  10.                     new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());
  11.     treeCache
  12.             .getListenable()
  13.             .addListener(
  14.                     (client, event) -> {
  15.                         switch (event.getType()) {
  16.                             case NODE_ADDED:
  17.                             case NODE_UPDATED:
  18.                                 Preconditions.checkNotNull(
  19.                                         event.getData(),
  20.                                         "The ZooKeeper event data must not be null.");
  21.                                 handleChangedLeaderInformation(event.getData());
  22.                                 break;
  23.                             case NODE_REMOVED:
  24.                                 Preconditions.checkNotNull(
  25.                                         event.getData(),
  26.                                         "The ZooKeeper event data must not be null.");
  27.                                 handleRemovedLeaderInformation(event.getData().getPath());
  28.                                 break;
  29.                         }
  30.                     });
  31.     leaderLatch.addListener(this);
  32.     ...
  33.     leaderLatch.start();
  34.     treeCache.start();
  35. }
复制代码
我们进入到 LeaderLatch 的 start 方法。它的内部是在 ZooKeeper 上创建 latch-xxx 节点。xxx 是当前 LeaderLatch 的 ID,它由 ZooKeeper 生成,ID 最小的当选 Leader。
  1. private void checkLeadership(List<String> children) throws Exception {
  2.     if (this.debugCheckLeaderShipLatch != null) {
  3.         this.debugCheckLeaderShipLatch.await();
  4.     }
  5.     String localOurPath = (String)this.ourPath.get();
  6.     List<String> sortedChildren = LockInternals.getSortedChildren("latch-", sorter, children);
  7.     int ourIndex = localOurPath != null ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
  8.     this.log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", new Object[]{this.id, localOurPath, sortedChildren});
  9.     if (ourIndex < 0) {
  10.         this.log.error("Can't find our node. Resetting. Index: " + ourIndex);
  11.         this.reset();
  12.     } else if (ourIndex == 0) {
  13.         this.lastPathIsLeader.set(localOurPath);
  14.         this.setLeadership(true);
  15.     } else {
  16.         this.setLeadership(false);
  17.         String watchPath = (String)sortedChildren.get(ourIndex - 1);
  18.         Watcher watcher = new Watcher() {
  19.             public void process(WatchedEvent event) {
  20.                 if (LeaderLatch.this.state.get() == LeaderLatch.State.STARTED && event.getType() == EventType.NodeDeleted) {
  21.                     try {
  22.                         LeaderLatch.this.getChildren();
  23.                     } catch (Exception ex) {
  24.                         ThreadUtils.checkInterrupted(ex);
  25.                         LeaderLatch.this.log.error("An error occurred checking the leadership.", ex);
  26.                     }
  27.                 }
  28.             }
  29.         };
  30.         BackgroundCallback callback = new BackgroundCallback() {
  31.             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  32.                 if (event.getResultCode() == Code.NONODE.intValue()) {
  33.                     LeaderLatch.this.getChildren();
  34.                 }
  35.             }
  36.         };
  37.         ((ErrorListenerPathable)((BackgroundPathable)this.client.getData().usingWatcher(watcher)).inBackground(callback)).forPath(ZKPaths.makePath(this.latchPath, watchPath));
  38.     }
  39. }
复制代码
当选 Leader 后,会回调 LeaderElectionDriver 的 isLeader 方法,如果未当选,则继续监听 latch 节点的变更。isLeader 会继续回调 LeaderElection 的 onGrantLeadership 方法,接着调用 LeaderContender 的 grantLeadership。这时会启动 JobMaster 服务,然后调用 LeaderElection 的 confirmLeadershipAsync 来确认当选成功。确认的过程是由 LeaderElectionDriver 来执行的。主要作用是把当前 leader 的信息写回到 ZooKeeper 的 connection_info 节点。
  1. public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
  2.     Preconditions.checkState(running.get());
  3.     if (!leaderLatch.hasLeadership()) {
  4.         return;
  5.     }
  6.     final String connectionInformationPath =
  7.             ZooKeeperUtils.generateConnectionInformationPath(componentId);
  8.     LOG.debug(
  9.             "Write leader information {} for component '{}' to {}.",
  10.             leaderInformation,
  11.             componentId,
  12.             ZooKeeperUtils.generateZookeeperPath(
  13.                     curatorFramework.getNamespace(), connectionInformationPath));
  14.     try {
  15.         ZooKeeperUtils.writeLeaderInformationToZooKeeper(
  16.                 leaderInformation,
  17.                 curatorFramework,
  18.                 leaderLatch::hasLeadership,
  19.                 connectionInformationPath);
  20.     } catch (Exception e) {
  21.         leaderElectionListener.onError(e);
  22.     }
  23. }
复制代码
服务发现

梳理完选举服务的源码后,我们再来看一下服务发现的过程。我们以 TaskManager 获取 JobManager 的 leader 为例。
2.png

当我们往 TaskManager 添加任务时,会调用 JobLeaderService 的 addJob 方法。这里会先获取 LeaderRetrieval,然后调用 start 方法注册 LeaderRetrievalListener 监听,并创建 LeaderRetrievalDriver。在 LeaderRetrievalDriver 中主要是向 ZooKeeper 注册 connection_info 节点的变更。
如果发生变更,ZooKeeper 会回调 LeaderRetrievalDriver.retrieveLeaderInformationFromZooKeeper 方法。我们从 ZooKeeper 获取到 leader 的地址和 sessionId 后,就回调 LeaderRetrievalService.notifyLeaderAddress 方法。最终调用到 JobLeaderService 的 notifyLeaderAddress 方法,这个方法中就是断开与旧 leader 的连接,增加与新 leader 的连接。
信息保存

最后我们再来看信息保存相关的源码。在 JobManager 完成一次 Checkpoint 时,会执行 CheckpointCoordinator.completePendingCheckpoint 方法,跟随调用链路可以找到 ZooKeeperStateHandleStore.addAndLock 方法,这里会把状态写入到文件系统中,然后把文件路径保存在 ZooKeeper 中。
  1. public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state)
  2.         throws PossibleInconsistentStateException, Exception {
  3.     checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
  4.     checkNotNull(state, "State");
  5.     final String path = normalizePath(pathInZooKeeper);
  6.     final Optional<Stat> maybeStat = getStat(path);
  7.     if (maybeStat.isPresent()) {
  8.         if (isNotMarkedForDeletion(maybeStat.get())) {
  9.             throw new AlreadyExistException(
  10.                     String.format("ZooKeeper node %s already exists.", path));
  11.         }
  12.         Preconditions.checkState(
  13.                 releaseAndTryRemove(path),
  14.                 "The state is marked for deletion and, therefore, should be deletable.");
  15.     }
  16.     final RetrievableStateHandle<T> storeHandle = storage.store(state);
  17.     final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
  18.     try {
  19.         writeStoreHandleTransactionally(path, serializedStoreHandle);
  20.         return storeHandle;
  21.     } catch (KeeperException.NodeExistsException e) {
  22.         // Transactions are not idempotent in the curator version we're currently using, so it
  23.         // is actually possible that we've re-tried a transaction that has already succeeded.
  24.         // We've ensured that the node hasn't been present prior executing the transaction, so
  25.         // we can assume that this is a result of the retry mechanism.
  26.         return storeHandle;
  27.     } catch (Exception e) {
  28.         if (indicatesPossiblyInconsistentState(e)) {
  29.             throw new PossibleInconsistentStateException(e);
  30.         }
  31.         // In case of any other failure, discard the state and rethrow the exception.
  32.         storeHandle.discardState();
  33.         throw e;
  34.     }
  35. }
复制代码
至此,基于 ZooKeeper 的 HA 逻辑我们就梳理完了。从 1.12 版本开始,Flink 还支持了 Kubernetes 高可用,下面我们再来一下它是如何实现的。
基于 Kubernetes 的 HA

选举服务

通过前面的学习,我们已经了解到,选举的主要逻辑是在 LeaderElectionDriver 中,因此,我们直接来看 KubernetesLeaderElectionDriver 的逻辑即可。创建 KubernetesLeaderElectionDriver 时,创建并启动了 KubernetesLeaderElector。这个类似于 ZooKeeper 逻辑中 LeaderLatch,会跟 Kubernetes 底层的选举逻辑交互,同时注册监听。
  1. public KubernetesLeaderElector(
  2.         NamespacedKubernetesClient kubernetesClient,
  3.         KubernetesLeaderElectionConfiguration leaderConfig,
  4.         LeaderCallbackHandler leaderCallbackHandler,
  5.         ExecutorService executorService) {
  6.     this.kubernetesClient = kubernetesClient;
  7.     this.leaderElectionConfig =
  8.             new LeaderElectionConfigBuilder()
  9.                     .withName(leaderConfig.getConfigMapName())
  10.                     .withLeaseDuration(leaderConfig.getLeaseDuration())
  11.                     .withLock(
  12.                             new ConfigMapLock(
  13.                                     new ObjectMetaBuilder()
  14.                                             .withNamespace(kubernetesClient.getNamespace())
  15.                                             .withName(leaderConfig.getConfigMapName())
  16.                                             // Labels will be used to clean up the ha related
  17.                                             // ConfigMaps.
  18.                                             .withLabels(
  19.                                                     KubernetesUtils.getConfigMapLabels(
  20.                                                             leaderConfig.getClusterId()))
  21.                                             .build(),
  22.                                     leaderConfig.getLockIdentity()))
  23.                     .withRenewDeadline(leaderConfig.getRenewDeadline())
  24.                     .withRetryPeriod(leaderConfig.getRetryPeriod())
  25.                     .withReleaseOnCancel(true)
  26.                     .withLeaderCallbacks(
  27.                             new LeaderCallbacks(
  28.                                     leaderCallbackHandler::isLeader,
  29.                                     leaderCallbackHandler::notLeader,
  30.                                     newLeader ->
  31.                                             LOG.info(
  32.                                                     "New leader elected {} for {}.",
  33.                                                     newLeader,
  34.                                                     leaderConfig.getConfigMapName())))
  35.                     .build();
  36.     this.executorService = executorService;
  37.     LOG.info(
  38.             "Create KubernetesLeaderElector on lock {}.",
  39.             leaderElectionConfig.getLock().describe());
  40. }
复制代码
选举成功后,会回调 LeaderElectionListener.onGrantLeadership 方法。后续的调用链路还是会调用到 KubernetesLeaderElectionDriver.publishLeaderInformation 方法。这个方法是把 leader 信息写到 Kubernetes 的 configMap 中。
  1. public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
  2.     Preconditions.checkState(running.get());
  3.     try {
  4.         kubeClient
  5.                 .checkAndUpdateConfigMap(
  6.                         configMapName,
  7.                         updateConfigMapWithLeaderInformation(componentId, leaderInformation))
  8.                 .get();
  9.     } catch (InterruptedException | ExecutionException e) {
  10.         leaderElectionListener.onError(e);
  11.     }
  12.     LOG.debug(
  13.             "Successfully wrote leader information {} for leader {} into the config map {}.",
  14.             leaderInformation,
  15.             componentId,
  16.             configMapName);
  17. }
复制代码
服务发现

服务发现的逻辑在 KubernetesLeaderRetrievalDriver 类中,在创建时,会将内部类 ConfigMapCallbackHandlerImpl 注册为监听回调类。
当 configMap 有新增或变更后,会回调 LeaderRetrievalService.notifyLeaderAddress 方法。
  1. private class ConfigMapCallbackHandlerImpl
  2.         implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
  3.     @Override
  4.     public void onAdded(List<KubernetesConfigMap> configMaps) {
  5.         // The ConfigMap is created by KubernetesLeaderElectionDriver with
  6.         // empty data. We don't really need to process anything unless the retriever was started
  7.         // after the leader election has already succeeded.
  8.         final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
  9.         final LeaderInformation leaderInformation = leaderInformationExtractor.apply(configMap);
  10.         if (!leaderInformation.isEmpty()) {
  11.             leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation);
  12.         }
  13.     }
  14.     @Override
  15.     public void onModified(List<KubernetesConfigMap> configMaps) {
  16.         final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
  17.         leaderRetrievalEventHandler.notifyLeaderAddress(
  18.                 leaderInformationExtractor.apply(configMap));
  19.     }
  20.     ...
  21. }
复制代码
信息保存

信息保存的逻辑和 ZooKeeper 也非常类似。即先把 state 保存在文件系统,然后把存储路径写到 Kubernetes 写到 configMap 中。具体可以看 KubernetesStateHandleStore.addAndLock 方法。
总结

本文我们一起梳理了 Flink 中 JobManager 的 HA 机制相关源码。目前 Flink 支持 ZooKeeper 和 Kubernetes 两种实现。在梳理过程中,我们以 JobManager 为例,其他几个用到高可用的服务的选举逻辑也是一样的。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册