找回密码
 立即注册
首页 业界区 业界 从零开始学Flink:TopN 榜单

从零开始学Flink:TopN 榜单

路逸思 昨天 22:54
在上一篇 《从零开始学Flink:Flink SQL四大Join解析》结尾提到过,下一篇要把 窗口聚合(Window Aggregation)与 TopN 讲清楚。窗口负责把无界流切成可统计的时间片,TopN 负责把“统计结果”变成榜单输出;两者组合起来,PV/UV、订单量、热销榜、实时大屏基本都能覆盖。
但这块也最容易踩坑:SQL 明明在跑却一直没输出、TopN 结果频繁更新/撤回下游写不进去、滑动窗口一上来状态就撑爆。本文直接用可复现的 Kafka 数据流把这些问题跑出来,并给出对应的处理方式。
本文基于 Flink 1.20+,用 SQL Client 直接在本地 standalone 集群验证。你可以把文中的 SQL 原样复制过去跑通,再按自己的业务把窗口粒度、乱序容忍、下游写入方式替换掉。
0. 环境准备:用 SQL Client 直接跑起来

为了把注意力放在 SQL 本身,本文用 Kafka 做数据源:手动往 Topic 推送点击行为数据,用 print 在 TaskManager Stdout 里观察结果。
使用前请确认 Flink 已加载 Kafka SQL Connector(把 flink-sql-connector-kafka-*.jar 放到 $FLINK_HOME/lib 并重启集群/SQL Client)。
先把下面几个参数设好,后面跑窗口/TopN 时更容易看到输出:
  1. -- 1) 避免 source 空闲导致 watermark 不推进,从而窗口一直不触发
  2. SET 'table.exec.source.idle-timeout' = '5s';
  3. -- 2) 让窗口/TopN 的结果更“及时”(更快看到输出)
  4. SET 'execution.checkpointing.interval' = '10s';
  5. -- 3) 以流模式运行(源是无界),持续刷到 SQL Client
  6. SET 'execution.runtime-mode' = 'streaming';
  7. -- 4) 开启 changelog 模式,使窗口/TopN 的结果更“及时”(更快看到输出)
  8. SET 'sql-client.execution.result-mode' = 'changelog';
复制代码
接着创建一张点击行为表(事件时间 + Watermark):
  1. CREATE TABLE dwd_click_log (
  2.   user_id     STRING,
  3.   item_id     STRING,
  4.   category_id STRING,
  5.   ts          BIGINT,
  6.   event_time  AS TO_TIMESTAMP_LTZ(ts, 3),
  7.   WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND
  8. ) WITH (
  9.   'connector' = 'kafka',
  10.   'topic' = 'dwd_click_log',
  11.   'properties.bootstrap.servers' = 'localhost:9092',
  12.   'properties.group.id' = 'flink-sql-dwd-click-log',
  13.   'scan.startup.mode' = 'earliest-offset',
  14.   'format' = 'json',
  15.   'json.ignore-parse-errors' = 'true'
  16. );
复制代码
先准备 Kafka Topic:
  1. $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_click_log --partitions 1 --replication-factor 1
复制代码
再创建几个 print sink 用来观察输出:
  1. CREATE TABLE ads_window_metrics_print (
  2.   window_start TIMESTAMP_LTZ(3),
  3.   window_end   TIMESTAMP_LTZ(3),
  4.   pv           BIGINT,
  5.   uv           BIGINT
  6. ) WITH ('connector' = 'print',
  7.   'print-identifier' = 'ads_window_metrics_print'
  8. );
  9. CREATE TABLE ads_session_metrics_print (
  10.   window_start TIMESTAMP_LTZ(3),
  11.   window_end   TIMESTAMP_LTZ(3),
  12.   user_id      STRING,
  13.   click_cnt    BIGINT
  14. ) WITH ('connector' = 'print',
  15.   'print-identifier' = 'ads_session_metrics_print'
  16. );
  17. CREATE TABLE ads_topn_print (
  18.   window_start TIMESTAMP_LTZ(3),
  19.   window_end   TIMESTAMP_LTZ(3),
  20.   category_id  STRING,
  21.   item_id      STRING,
  22.   cnt          BIGINT,
  23.   rn           BIGINT
  24. ) WITH ('connector' = 'print',
  25.   'print-identifier' = 'ads_topn_print'
  26. );
复制代码
1. 窗口聚合基础:你到底在对“哪段时间”做统计

在 Flink SQL 里,窗口的本质是:把无界流切成一个个“有限集合”,再在集合上做 GROUP BY 聚合。
窗口统计能否输出,核心取决于两件事:

  • 你选的是 Processing Time 还是 Event Time
  • Event Time 场景下,Watermark 是否在推进(决定窗口是否“关窗”)
本文以 Event Time 为主,因为绝大多数实时数仓指标都需要“按业务发生时间统计”,而不是“按处理到达时间统计”。
2. Window TVF:Flink SQL 窗口的主流写法

Flink 早期有 GROUP BY TUMBLE(...) 这类 Group Window 语法,新版本更推荐 Window TVF(Table Valued Function),它的输出会直接带上 window_start/window_end/window_time 字段,更清晰,也更容易与 TopN/Join 组合。
2.1 滚动窗口(TUMBLE):每条数据只属于一个窗口

典型场景:按分钟/小时统计 PV、UV、GMV。
  1. INSERT INTO ads_window_metrics_print
  2. SELECT
  3.   window_start,
  4.   window_end,
  5.   COUNT(*) AS pv,
  6.   COUNT(DISTINCT user_id) AS uv
  7. FROM TABLE(
  8.   TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '1' SECOND)
  9. )
  10. GROUP BY window_start, window_end;
复制代码
推送数据(最简单:控制台直接粘贴 JSON,一行一条)
  1. $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log
复制代码
粘贴下面数据(ts 用毫秒时间戳;为了让事件时间窗口及时“关窗”,建议 ts 单调递增,或者最后补一条明显更大的 ts 用来推进 Watermark):
  1. {"user_id":"u01","item_id":"i01","category_id":"c01","ts":1774454400000}
  2. {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1774454402000}
  3. {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1774454403000}
  4. {"user_id":"u03","item_id":"i04","category_id":"c02","ts":1774454404000}
  5. {"user_id":"u04","item_id":"i01","category_id":"c01","ts":1774454405000}
复制代码
到 Flink Web UI → TaskManagers → Stdout 查看输出:
1.png

要点:

  • TUMBLE 适合“报表型”指标,窗口不重叠,状态相对可控
  • COUNT(DISTINCT ...) 会引入去重状态,用户数大时要关注状态体积(生产中可考虑用近似去重或分层聚合)
2.2 滑动窗口(HOP):一条数据会被“复制”到多个窗口

典型场景:最近 5 分钟滚动 UV、最近 1 小时成交额每 5 分钟刷新一次。
示例:窗口长度 30s,每 10s 滑动一次。
  1. INSERT INTO ads_window_metrics_print
  2. SELECT
  3.   window_start,
  4.   window_end,
  5.   COUNT(*) AS pv,
  6.   COUNT(DISTINCT user_id) AS uv
  7. FROM TABLE(
  8.   HOP(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND, INTERVAL '30' SECOND)
  9. )
  10. GROUP BY window_start, window_end;
复制代码
到 Flink Web UI → TaskManagers → Stdout 查看输出:
2.png

要点:

  • HOP 的状态压力通常显著高于 TUMBLE,因为数据会进入多个窗口
  • 业务上能用 TUMBLE 不用 HOP;必须用 HOP 时,尽量降低窗口长度或放大 slide(减少并行窗口数)
2.3 会话窗口(SESSION):按“事件间隔”自动切窗

典型场景:统计用户一次访问会话内的点击数/停留时长、按会话做转化漏斗。
示例:同一用户 10s 内没有新事件就认为会话结束。
  1. INSERT INTO ads_session_metrics_print
  2. SELECT
  3.   window_start,
  4.   window_end,
  5.   user_id,
  6.   COUNT(*) AS click_cnt
  7. FROM TABLE(
  8.   SESSION(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND)
  9. )
  10. GROUP BY window_start, window_end, user_id;
复制代码
推送数据(最简单:控制台直接粘贴 JSON,一行一条)
  1. $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log
复制代码
往 topic dwd_click_log 推送数据(控制台直接粘贴 JSON,一行一条)
  1. {"user_id":"u01","item_id":"i01","category_id":"c01","ts":1775143687285}
  2. {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1775143688285}
  3. {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1775143689285}
复制代码
到 Flink Web UI → TaskManagers → Stdout 查看输出:
3.png

要点:

  • SESSION 窗口边界不固定,会因为迟到数据发生“合并”,下游会看到更新/撤回更频繁
  • 如果你的下游只接受 Append(只插入不更新),SESSION 往往不合适,除非你引入可更新的 sink(Upsert)
4. TopN:把窗口聚合变成实时榜单

TopN 的正确打开方式是“两段式”:

  • 先做窗口聚合得到每个候选项的指标(比如每个商品的点击数)
  • 再在聚合结果上做排序,取前 N
4.1 窗口内 TopN:每个窗口的热榜 Top3

需求:每 10 秒统计一次“各品类内点击 Top3 商品”。
[code]INSERT INTO ads_topn_printWITH item_cnt AS (  SELECT    window_start,    window_end,    category_id,    item_id,    COUNT(*) AS cnt  FROM TABLE(    TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND)  )  GROUP BY window_start, window_end, category_id, item_id),ranked AS (  SELECT    window_start,    window_end,    category_id,    item_id,    cnt,    ROW_NUMBER() OVER (      PARTITION BY window_start, window_end, category_id      ORDER BY cnt DESC, item_id    ) AS rn  FROM item_cnt)SELECT  window_start,  window_end,  category_id,  item_id,  cnt,  rnFROM rankedWHERE rn

相关推荐

您需要登录后才可以回帖 登录 | 立即注册