找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:Checkpoint机制(下)

Flink源码阅读:Checkpoint机制(下)

抑卞枯 前天 04:10
书接上回,前文我们梳理的 Checkpoint 机制的源码,但是对于如何写入状态数据并没有深入了解。今天就一起来梳理一下这部分代码。
写在前面

前面我们了解到在 StreamOperatorStateHandler.snapshotState 方法中会创建四个 Future,用来支持不同类型的状态写入。
  1. snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
  2. snapshotInProgress.setOperatorStateRawFuture(
  3.         snapshotContext.getOperatorStateStreamFuture());
  4. if (null != operatorStateBackend) {
  5.     snapshotInProgress.setOperatorStateManagedFuture(
  6.             operatorStateBackend.snapshot(
  7.                     checkpointId, timestamp, factory, checkpointOptions));
  8. }
  9. if (useAsyncState && null != asyncKeyedStateBackend) {
  10.     if (isCanonicalSavepoint(checkpointOptions.getCheckpointType())) {
  11.         throw new UnsupportedOperationException("Not supported yet.");
  12.     } else {
  13.         snapshotInProgress.setKeyedStateManagedFuture(
  14.                 asyncKeyedStateBackend.snapshot(
  15.                         checkpointId, timestamp, factory, checkpointOptions));
  16.     }
  17. }
复制代码
我们主要关心 ManagedState,ManagedState 都是调用 Snapshotable.snapshot 方法来写入数据的,下面具体看 KeyedState 和 OperatorState 的具体实现。
KeyedState

KeyedState 我们以 HeapKeyedStateBackend 为例,这里先是创建了一个 SnapshotStrategyRunner 实例,SnapshotStrategyRunner 是一个快照策略的一个执行类,创建完成后就会调用 snapshot 方法。在这个 snapshot 方法中主要做了做了下面几件事:

  • 同步拷贝状态数据的引用。
  • 创建 Checkpoint 输出流 CheckpointStateOutputStream
  • 完成 Checkpoint 持久化
  • 返回元信息结果
状态数据引用拷贝

在 HeapSnapshotStrategy 的 syncPrepareResources 方法中调用了 HeapSnapshotResources.create 方法。这里有一个比较重要的参数是 registeredKVStates,它代表我们在业务代码中注册的状态数据表。
  1. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  2.         new ValueStateDescriptor<>(
  3.                 "average",
  4.                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
复制代码
例如我们这样注册状态数据表,那么 registeredKVStates 的 key 就是 average,value 就是状态表,它通常是一个 CopyOnWriteStateTable。具体的状态数据引用拷贝的逻辑在 processSnapshotMetaInfoForAllStates 方法中。
  1. private static void processSnapshotMetaInfoForAllStates(
  2.         List<StateMetaInfoSnapshot> metaInfoSnapshots,
  3.         Map<StateUID, StateSnapshot> cowStateStableSnapshots,
  4.         Map<StateUID, Integer> stateNamesToId,
  5.         Map<String, ? extends StateSnapshotRestore> registeredStates,
  6.         StateMetaInfoSnapshot.BackendStateType stateType) {
  7.     for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
  8.             registeredStates.entrySet()) {
  9.         final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
  10.         stateNamesToId.put(stateUid, stateNamesToId.size());
  11.         StateSnapshotRestore state = kvState.getValue();
  12.         if (null != state) {
  13.             final StateSnapshot stateSnapshot = state.stateSnapshot();
  14.             metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
  15.             cowStateStableSnapshots.put(stateUid, stateSnapshot);
  16.         }
  17.     }
  18. }
复制代码
针对每个 State,这里都创建一个 CopyOnWriteStateTableSnapshot,然后存在 cowStateStableSnapshots 里。这里 CopyOnWriteStateTableSnapshot 就是拷贝数据的引用,因此可以同步执行。
创建 CheckpointStateOutputStream

创建 CheckpointStateOutputStream 的方法是 CheckpointStreamWithResultProvider.createSimpleStream,生产环境通常使用的是 FsCheckpointStateOutputStream。FsCheckpointStateOutputStream 中的参数如下:
  1. // 状态数据写入缓冲数组,数据先写到内存中,然后 flush 到磁盘
  2. private final byte[] writeBuffer;
  3. // 缓冲数组当前写入位置
  4. private int pos;
  5. // 文件输出流
  6. private volatile FSDataOutputStream outStream;
  7. // 内存中状态大小阈值,超过阈值会 flush 到磁盘,默认20KB,最大1MB
  8. // 目的是为了减少小文件数量
  9. private final int localStateThreshold;
  10. // checkpoint 基础路径
  11. private final Path basePath;
  12. // Flink 自己封装的文件系统
  13. private final FileSystem fs;
  14. // 状态数据完整路径
  15. private volatile Path statePath;
  16. // 相对路径
  17. private String relativeStatePath;
  18. // 是否已关闭
  19. private volatile boolean closed;
  20. // 是否允许使用相对路径
  21. private final boolean allowRelativePaths;
复制代码
Checkpoint 持久化

创建完 CheckpointStateOutputStream 之后,会调用 serializationProxy.write(outView) 写入状态的元数据。元数据包括状态的名称、类型、序列化器等一些配置。
元数据写完之后,就开始分组写入状态数据。在写入时,先写 keyGroupId,然后再写当前分组的状态数据
  1. for (int keyGroupPos = 0;
  2.         keyGroupPos < keyGroupRange.getNumberOfKeyGroups();
  3.         ++keyGroupPos) {
  4.     int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
  5.     keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
  6.     // 写 keyGroupId
  7.     outView.writeInt(keyGroupId);
  8.     for (Map.Entry<StateUID, StateSnapshot> stateSnapshot :
  9.             cowStateStableSnapshots.entrySet()) {
  10.         StateSnapshot.StateKeyGroupWriter partitionedSnapshot =
  11.                 stateSnapshot.getValue().getKeyGroupWriter();
  12.         try (OutputStream kgCompressionOut =
  13.                 keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
  14.             DataOutputViewStreamWrapper kgCompressionView =
  15.                     new DataOutputViewStreamWrapper(kgCompressionOut);
  16.             kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));
  17.             // 写状态数据
  18.             partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);
  19.         } // this will just close the outer compression stream
  20.     }
  21. }
复制代码
状态数据写入的调用链路如下
1.png
  1. public void writeState(
  2.         TypeSerializer<K> keySerializer,
  3.         TypeSerializer<N> namespaceSerializer,
  4.         TypeSerializer<S> stateSerializer,
  5.         @Nonnull DataOutputView dov,
  6.         @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer)
  7.         throws IOException {
  8.     SnapshotIterator<K, N, S> snapshotIterator =
  9.             getIterator(
  10.                     keySerializer,
  11.                     namespaceSerializer,
  12.                     stateSerializer,
  13.                     stateSnapshotTransformer);
  14.     int size = snapshotIterator.size();
  15.     dov.writeInt(size);
  16.     while (snapshotIterator.hasNext()) {
  17.         StateEntry<K, N, S> stateEntry = snapshotIterator.next();
  18.         namespaceSerializer.serialize(stateEntry.getNamespace(), dov);
  19.         keySerializer.serialize(stateEntry.getKey(), dov);
  20.         stateSerializer.serialize(stateEntry.getState(), dov);
  21.     }
  22. }
复制代码
返回结果

最后一步就是封装并返回元信息,这里收集的信息包括了每个 keyGroup 的状态数据在状态文件中的存储位置,状态数据存储的文件路径、文件大小等。
OperatorState

OperatorState 的处理逻辑比 KeyedState 更简单一些,流程上都是先做状态数据的引用快照,然后写入状态数据和返回结果。在写入数据时,没有了分组写入的逻辑。直接处理 operatorState 和 broadcastState。这里就只贴一下调用流程,不做过多赘述了。
2.png

总结

本文我们重点梳理了 KeyedState 数据写入的代码。其主要步骤包括:同步拷贝状态数据的引用,创建 Checkpoint 输出流 CheckpointStateOutputStream 并完成 Checkpoint 持久化,最后返回元信息结果。OperatorState 的处理过程和 KeyedState 的过程类似,只是少了分组的逻辑。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

前天 18:08

举报

您需要登录后才可以回帖 登录 | 立即注册