找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:Task数据交互

Flink源码阅读:Task数据交互

鲫疹 昨天 20:50
经过前面的学习,Flink 的几个核心概念相关的源码实现我们已经了解了。本文我们来梳理 Task 的数据交互相关的源码。
数据输出

话不多说,我们直接进入正题。首先来看 Task 的数据输出,在进入流程之前,我们先介绍几个基本概念。
基本概念


  • RecordWriterOutput:它是 Output 接口的一个具体实现类,底层使用 RecordWriter 来发送数据。
  • RecordWriter:数据写入的执行者,负责将数据写到 ResultPartition。
  • ResultPartition 和 ResultSubpartition:ResultPartition 是 ExecutionGraph 中一个节点的输出结果,下游的每个需要从当前 ResultPartition 消费数据的 Task 都会有一个 ResultSubpartition。
  • ChannelSelector:用来决定一个 Record 要被写到哪个 Subpartition 中。
  • LocalBufferPool:用来管理 Buffer 的缓冲池。在介绍反压的原理时,我们提到过。
对这些基本概念有了一定的了解之后,我们来看数据输出的具体流程。
执行流程

我们以 map 为例,看一下数据的输出过程。
1.png

在 StreamMap.processElement 方法中,调用完 map 方法之后,就会调用 output.collect 方法将数据输出,这里的 output 就是 RecordWriterOutput。在 RecordWriterOutput 中,会调用 RecordWriter 的 emit 方法。
  1. private <X> void pushToRecordWriter(StreamRecord<X> record) {
  2.     serializationDelegate.setInstance(record);
  3.     try {
  4.         recordWriter.emit(serializationDelegate);
  5.     } catch (IOException e) {
  6.         throw new UncheckedIOException(e.getMessage(), e);
  7.     }
  8. }
复制代码
这里的 serializationDelegate 是用来对 record 进行序列化的。RecordWriter 有两个实现类,一个是 ChannelSelectorRecordWriter,另一个是 BroadcastRecordWriter。ChannelSelectorRecordWriter 需要先调用 ChannelSelector 选择对应的 subparition,然后进行写入。BroadcastRecordWriter 则是写到所有的 subparition。
接下来就是调用 BufferWritingResultPartition.emitRecord 来写入数据。
  1. public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
  2.     totalWrittenBytes += record.remaining();
  3.     BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
  4.     while (record.hasRemaining()) {
  5.         // full buffer, partial record
  6.         finishUnicastBufferBuilder(targetSubpartition);
  7.         buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
  8.     }
  9.     if (buffer.isFull()) {
  10.         // full buffer, full record
  11.         finishUnicastBufferBuilder(targetSubpartition);
  12.     }
  13.     // partial buffer, full record
  14. }
复制代码
这里把 record 写入到 buffer 中,如果 buffer 不够,则会从 LocalBufferPool 中申请新的 buffer,申请到之后就会继续写入。下面是具体的申请过程。
  1. private MemorySegment requestMemorySegment(int targetChannel) {
  2.     MemorySegment segment = null;
  3.     synchronized (availableMemorySegments) {
  4.         checkDestroyed();
  5.         if (!availableMemorySegments.isEmpty()) {
  6.             segment = availableMemorySegments.poll();
  7.         } else if (isRequestedSizeReached()) {
  8.             // Only when the buffer request reaches the upper limit(i.e. current pool size),
  9.             // requests an overdraft buffer.
  10.             segment = requestOverdraftMemorySegmentFromGlobal();
  11.         }
  12.         if (segment == null) {
  13.             return null;
  14.         }
  15.         if (targetChannel != UNKNOWN_CHANNEL) {
  16.             if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
  17.                 unavailableSubpartitionsCount++;
  18.             }
  19.         }
  20.         checkAndUpdateAvailability();
  21.     }
  22.     return segment;
  23. }
复制代码
如果有可用内存,就直接从队列中出队。如果达到了本地 BufferPool 的上限,就从全局的 NetworkBufferPool 中申请,申请不到就会阻塞写入过程,等待申请。最后还会检查并更新可用内存状态。
有了可用的 buffer 之后,就会调用 addToSubpartition,最终数据存储在 PipelinedSubpartition 的 buffers 队列中。
  1. private void addToSubpartition(
  2.         BufferBuilder buffer,
  3.         int targetSubpartition,
  4.         int partialRecordLength,
  5.         int minDesirableBufferSize)
  6.         throws IOException {
  7.     int desirableBufferSize =
  8.             subpartitions[targetSubpartition].add(
  9.                     buffer.createBufferConsumerFromBeginning(), partialRecordLength);
  10.     resizeBuffer(buffer, desirableBufferSize, minDesirableBufferSize);
  11. }
复制代码
数据输入

看完了数据输出的过程之后,我们再来看一下数据输入的过程。首先还是了解几个基本概念。
基本概念


  • InputGate:InputGate 是对输入的封装,与 JobGraph 中的 JobEdge 一一对应,每个 InputGate 消费上游一个或多个 Resultpartition。
  • InputChannel:InputChannel 是和 ExecutionGraph 中的 ExecutionEdge 一一对应的。每个 InputChannel 接收一个 ResultSubpartition 的输出,InputChannel 主要关注 LocalInputChannel 和 RemoteInputChannel 两种实现。
执行流程

了解了具体概念之后,我们再看数据输入的具体流程。
2.png

数据输入的入口是 StreamTask.processInput 方法,这个方法中主要是调用 inputProcessor.processInput 方法,我们以 StreamOneInputProcessor 为例。这个方法就是调用 input.emitNext 方法。
  1. public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
  2.     while (true) {
  3.         // get the stream element from the deserializer
  4.         if (currentRecordDeserializer != null) {
  5.             RecordDeserializer.DeserializationResult result;
  6.             try {
  7.                 result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
  8.             } catch (IOException e) {
  9.                 throw new IOException(
  10.                         String.format("Can't get next record for channel %s", lastChannel), e);
  11.             }
  12.             if (result.isBufferConsumed()) {
  13.                 currentRecordDeserializer = null;
  14.             }
  15.             if (result.isFullRecord()) {
  16.                 final boolean breakBatchEmitting =
  17.                         processElement(deserializationDelegate.getInstance(), output);
  18.                 if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
  19.                     continue;
  20.                 }
  21.                 return DataInputStatus.MORE_AVAILABLE;
  22.             }
  23.         }
  24.         Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
  25.         if (bufferOrEvent.isPresent()) {
  26.             // return to the mailbox after receiving a checkpoint barrier to avoid processing of
  27.             // data after the barrier before checkpoint is performed for unaligned checkpoint
  28.             // mode
  29.             if (bufferOrEvent.get().isBuffer()) {
  30.                 processBuffer(bufferOrEvent.get());
  31.             } else {
  32.                 DataInputStatus status = processEvent(bufferOrEvent.get(), output);
  33.                 if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
  34.                     continue;
  35.                 }
  36.                 return status;
  37.             }
  38.         } else {
  39.             if (checkpointedInputGate.isFinished()) {
  40.                 checkState(
  41.                         checkpointedInputGate.getAvailableFuture().isDone(),
  42.                         "Finished BarrierHandler should be available");
  43.                 return DataInputStatus.END_OF_INPUT;
  44.             }
  45.             return DataInputStatus.NOTHING_AVAILABLE;
  46.         }
  47.     }
  48. }
复制代码
这里是调用 checkpointedInputGate.pollNext 来获取输入的数据。它的内部就是调用 InputGate 的 pollNext 方法来获取数据。当获取到完整数据之后,就会调用 processElement 来处理数据。
我们以 SingleInputGate 为例看 InputGate 的 pollNext 方法。它的内部调用链路可用一直追踪到 readBufferFromInputChannel 方法,这个方法内会调用 inputChannel.getNextBuffer,这里交给 InputChannel 来具体执行数据读取。
  1. public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
  2.     checkError();
  3.     if (!toBeConsumedBuffers.isEmpty()) {
  4.         return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
  5.     }
  6.     ResultSubpartitionView subpartitionView = this.subpartitionView;
  7.     if (subpartitionView == null) {
  8.         // There is a possible race condition between writing a EndOfPartitionEvent (1) and
  9.         // flushing (3) the Local
  10.         // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush
  11.         // notification (4). When
  12.         // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue
  13.         // LocalInputChannel after (or
  14.         // during) it was released during reading the EndOfPartitionEvent (2).
  15.         if (isReleased) {
  16.             return Optional.empty();
  17.         }
  18.         // this can happen if the request for the partition was triggered asynchronously
  19.         // by the time trigger
  20.         // would be good to avoid that, by guaranteeing that the requestPartition() and
  21.         // getNextBuffer() always come from the same thread
  22.         // we could do that by letting the timer insert a special "requesting channel" into the
  23.         // input gate's queue
  24.         subpartitionView = checkAndWaitForSubpartitionView();
  25.     }
  26.     BufferAndBacklog next = subpartitionView.getNextBuffer();
  27.     // ignore the empty buffer directly
  28.     while (next != null && next.buffer().readableBytes() == 0) {
  29.         next.buffer().recycleBuffer();
  30.         next = subpartitionView.getNextBuffer();
  31.         numBuffersIn.inc();
  32.     }
  33.     if (next == null) {
  34.         if (subpartitionView.isReleased()) {
  35.             throw new CancelTaskException(
  36.                     "Consumed partition " + subpartitionView + " has been released.");
  37.         } else {
  38.             return Optional.empty();
  39.         }
  40.     }
  41.     Buffer buffer = next.buffer();
  42.     if (buffer instanceof FullyFilledBuffer) {
  43.         List<Buffer> partialBuffers = ((FullyFilledBuffer) buffer).getPartialBuffers();
  44.         int seq = next.getSequenceNumber();
  45.         for (Buffer partialBuffer : partialBuffers) {
  46.             toBeConsumedBuffers.add(
  47.                     new BufferAndBacklog(
  48.                             partialBuffer,
  49.                             next.buffersInBacklog(),
  50.                             buffer.getDataType(),
  51.                             seq++));
  52.         }
  53.         return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
  54.     }
  55.     return getBufferAndAvailability(next);
  56. }
复制代码
我们先来看 LocalInputChannel,先获取到了 subpartitionView,并调用 getNextBuffer,这里其实就是从 PipelinedSubpartition 的 buffers 队列中读取数据。
RemoteInputChannel 则需要从 receivedBuffers 中读取数据,这个队列的数据就是消费上游数据后保存的。
至此,Flink 中 Task 的数据输入和输出过程的源码就梳理完了,更加底层的 Netty 相关代码我们在后面继续梳理。
总结

最后简单总结一下,本文我们梳理了 Task 的数据输出和输入的过程。输出过程主要是利用 RecordWriter 将数据写入到 Buffer 中,输入过程则是利用 InputChannel 从 Buffer 消费的过程。如果你的 Flink 任务数据量特别大,并且没什么复杂的逻辑,可以考虑适当调整 localBufferPool 的大小来调优任务的吞吐。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册