找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:Watermark机制

Flink源码阅读:Watermark机制

纪音悦 6 天前
前面我们已经梳理了 Flink 状态和 Checkpoint 相关的源码。从本文开始,我们再来关注另外几个核心概念,即时间、Watermark 和窗口。
写在前面

在 Flink 中 Watermark 是用来解决数据乱序问题的,它也是窗口关闭的触发条件。对于 Watermark 的概念和用法还不熟悉的同学可以先阅读Flink学习笔记:时间与Watermark一文。下面我们进入正题,开始梳理 Watermark 相关的源码。
Watermark 定义

Watermark 的定义非常简单,它继承了 StreamElement 类,内部只有一个 timestamp 变量。
  1. @PublicEvolving
  2. public class Watermark extends StreamElement {
  3.     /** The watermark that signifies end-of-event-time. */
  4.     public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  5.     /** The watermark that signifies is used before any actual watermark has been generated. */
  6.     public static final Watermark UNINITIALIZED = new Watermark(Long.MIN_VALUE);
  7.     // ------------------------------------------------------------------------
  8.     /** The timestamp of the watermark in milliseconds. */
  9.     protected final long timestamp;
  10.     /** Creates a new watermark with the given timestamp in milliseconds. */
  11.     public Watermark(long timestamp) {
  12.         this.timestamp = timestamp;
  13.     }
  14.     /** Returns the timestamp associated with this {@link Watermark} in milliseconds. */
  15.     public long getTimestamp() {
  16.         return timestamp;
  17.     }
  18.     // ------------------------------------------------------------------------
  19.     @Override
  20.     public boolean equals(Object o) {
  21.         return this == o
  22.                 || o != null
  23.                         && o.getClass() == this.getClass()
  24.                         && ((Watermark) o).timestamp == timestamp;
  25.     }
  26.     @Override
  27.     public int hashCode() {
  28.         return (int) (timestamp ^ (timestamp >>> 32));
  29.     }
  30.     @Override
  31.     public String toString() {
  32.         return "Watermark @ " + timestamp;
  33.     }
  34. }
复制代码
Watermark 处理过程

我们先来回顾一下 Watermark 的生成方法。
  1. SingleOutputStreamOperator<Event> withTimestampsAndWatermarks = source
  2.         .assignTimestampsAndWatermarks(
  3.                 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
  4.         );
复制代码
初始化

在定义 Watermark 的时候,我们调用 assignTimestampsAndWatermarks 方法。
  1. public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
  2.         WatermarkStrategy<T> watermarkStrategy) {
  3.     final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
  4.     // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
  5.     // and chain
  6.     final int inputParallelism = getTransformation().getParallelism();
  7.     final TimestampsAndWatermarksTransformation<T> transformation =
  8.             new TimestampsAndWatermarksTransformation<>(
  9.                     "Timestamps/Watermarks",
  10.                     inputParallelism,
  11.                     getTransformation(),
  12.                     cleanedStrategy,
  13.                     false);
  14.     getExecutionEnvironment().addOperator(transformation);
  15.     return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
  16. }
复制代码
这个方法接收了一个 WatermarkStrategy 参数,把它封装到 TimestampsAndWatermarksTransformation 中之后,就添加到 transformations 列表中了。在生成 StreamGraph 的过程中,会调用每个 transformation 的 transform 方法。
1.png

通过这个调用链路,创建出了 TimestampsAndWatermarksOperatorFactory,在初始化 StreamTask 时,会调用 TimestampsAndWatermarksOperatorFactory.createStreamOperator 方法来创建 TimestampsAndWatermarksOperator,并调用它的 open 方法。
在这个 open 方法中,主要是生成 timestampAssigner 和 watermarkGenerator。timestampAssigner 是用于提取时间戳,watermarkGenerator 是用于生成 Watermark。
生成完成之后注册了一个定时器,到指定时间后会调用 onProcessingTime 方法。
  1. public void onProcessingTime(long timestamp) throws Exception {
  2.     watermarkGenerator.onPeriodicEmit(wmOutput);
  3.     final long now = getProcessingTimeService().getCurrentProcessingTime();
  4.     getProcessingTimeService().registerTimer(now + watermarkInterval, this);
  5. }
复制代码
这个方法的逻辑也很简单,先发送创建并发送 Watermark,然后再注册一个定时器。
发送 Watermark

2.png

我们以 BoundedOutOfOrdernessWatermarks 为例,它向下游发送了一个 Watermark,时间戳为 maxTimestamp - outOfOrdernessMillis - 1(maxTimestamp 是当前最大的事件时间戳,outOfOrdernessMillis 是我们定义的周期时间毫秒值)。随后在 WatermarkEmitter.emitWatermark 方法中,更新了当前 Watermark 的值。最后 RecordWriterOutput.emitWatermark 则是向下游广播当前的 Watermark。
下游处理

下游处理方法我们从 StreamOneInputProcessor.processInput 入手,先来看具体的调用链路。
3.png

在 inputWatermark 方法中,先是对 alignedSubpartitionStatuses 进行调整,alignedSubpartitionStatuses 这个变量主要是用来获取最小的 Watermark。最后调用了 findAndOutputNewMinWatermarkAcrossAlignedSubpartitions 方法。这个方法中,会获取到所有上游最小的 Watermark,如果它大于最近发送的一个 Watermark,就会向下游发送。
  1. public void emitWatermark(Watermark watermark) throws Exception {
  2.     watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
  3.     operator.processWatermark(watermark);
  4. }
复制代码
这个发送方法中,调用了 operator.processWatermark,我们接着看这个处理方法。
4.png

在 tryAdvanceWatermark 方法中如果 Watermark 的时间大于 eventTimeTimersQueue 队列中头节点的时间,那么对 eventTimeTimersQueue 这个队列进行出队操作,这个操作意味着触发了窗口计算。
[code]public boolean tryAdvanceWatermark(        long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn)        throws Exception {    currentWatermark = time;    InternalTimer timer;    boolean interrupted = false;    while ((timer = eventTimeTimersQueue.peek()) != null            && timer.getTimestamp()

相关推荐

您需要登录后才可以回帖 登录 | 立即注册