在上一篇 《从零开始学Flink:Flink SQL四大Join解析》结尾提到过,下一篇要把 窗口聚合(Window Aggregation)与 TopN 讲清楚。窗口负责把无界流切成可统计的时间片,TopN 负责把“统计结果”变成榜单输出;两者组合起来,PV/UV、订单量、热销榜、实时大屏基本都能覆盖。
但这块也最容易踩坑:SQL 明明在跑却一直没输出、TopN 结果频繁更新/撤回下游写不进去、滑动窗口一上来状态就撑爆。本文直接用可复现的 Kafka 数据流把这些问题跑出来,并给出对应的处理方式。
本文基于 Flink 1.20+,用 SQL Client 直接在本地 standalone 集群验证。你可以把文中的 SQL 原样复制过去跑通,再按自己的业务把窗口粒度、乱序容忍、下游写入方式替换掉。
0. 环境准备:用 SQL Client 直接跑起来
为了把注意力放在 SQL 本身,本文用 Kafka 做数据源:手动往 Topic 推送点击行为数据,用 print 在 TaskManager Stdout 里观察结果。
使用前请确认 Flink 已加载 Kafka SQL Connector(把 flink-sql-connector-kafka-*.jar 放到 $FLINK_HOME/lib 并重启集群/SQL Client)。
先把下面几个参数设好,后面跑窗口/TopN 时更容易看到输出:- -- 1) 避免 source 空闲导致 watermark 不推进,从而窗口一直不触发
- SET 'table.exec.source.idle-timeout' = '5s';
- -- 2) 让窗口/TopN 的结果更“及时”(更快看到输出)
- SET 'execution.checkpointing.interval' = '10s';
- -- 3) 以流模式运行(源是无界),持续刷到 SQL Client
- SET 'execution.runtime-mode' = 'streaming';
- -- 4) 开启 changelog 模式,使窗口/TopN 的结果更“及时”(更快看到输出)
- SET 'sql-client.execution.result-mode' = 'changelog';
复制代码 接着创建一张点击行为表(事件时间 + Watermark):- CREATE TABLE dwd_click_log (
- user_id STRING,
- item_id STRING,
- category_id STRING,
- ts BIGINT,
- event_time AS TO_TIMESTAMP_LTZ(ts, 3),
- WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'dwd_click_log',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'properties.group.id' = 'flink-sql-dwd-click-log',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'json',
- 'json.ignore-parse-errors' = 'true'
- );
复制代码 先准备 Kafka Topic:- $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_click_log --partitions 1 --replication-factor 1
复制代码 再创建几个 print sink 用来观察输出:- CREATE TABLE ads_window_metrics_print (
- window_start TIMESTAMP_LTZ(3),
- window_end TIMESTAMP_LTZ(3),
- pv BIGINT,
- uv BIGINT
- ) WITH ('connector' = 'print',
- 'print-identifier' = 'ads_window_metrics_print'
- );
- CREATE TABLE ads_session_metrics_print (
- window_start TIMESTAMP_LTZ(3),
- window_end TIMESTAMP_LTZ(3),
- user_id STRING,
- click_cnt BIGINT
- ) WITH ('connector' = 'print',
- 'print-identifier' = 'ads_session_metrics_print'
- );
- CREATE TABLE ads_topn_print (
- window_start TIMESTAMP_LTZ(3),
- window_end TIMESTAMP_LTZ(3),
- category_id STRING,
- item_id STRING,
- cnt BIGINT,
- rn BIGINT
- ) WITH ('connector' = 'print',
- 'print-identifier' = 'ads_topn_print'
- );
复制代码 1. 窗口聚合基础:你到底在对“哪段时间”做统计
在 Flink SQL 里,窗口的本质是:把无界流切成一个个“有限集合”,再在集合上做 GROUP BY 聚合。
窗口统计能否输出,核心取决于两件事:
- 你选的是 Processing Time 还是 Event Time
- Event Time 场景下,Watermark 是否在推进(决定窗口是否“关窗”)
本文以 Event Time 为主,因为绝大多数实时数仓指标都需要“按业务发生时间统计”,而不是“按处理到达时间统计”。
2. Window TVF:Flink SQL 窗口的主流写法
Flink 早期有 GROUP BY TUMBLE(...) 这类 Group Window 语法,新版本更推荐 Window TVF(Table Valued Function),它的输出会直接带上 window_start/window_end/window_time 字段,更清晰,也更容易与 TopN/Join 组合。
2.1 滚动窗口(TUMBLE):每条数据只属于一个窗口
典型场景:按分钟/小时统计 PV、UV、GMV。- INSERT INTO ads_window_metrics_print
- SELECT
- window_start,
- window_end,
- COUNT(*) AS pv,
- COUNT(DISTINCT user_id) AS uv
- FROM TABLE(
- TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '1' SECOND)
- )
- GROUP BY window_start, window_end;
复制代码 推送数据(最简单:控制台直接粘贴 JSON,一行一条)- $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log
复制代码 粘贴下面数据(ts 用毫秒时间戳;为了让事件时间窗口及时“关窗”,建议 ts 单调递增,或者最后补一条明显更大的 ts 用来推进 Watermark):- {"user_id":"u01","item_id":"i01","category_id":"c01","ts":1774454400000}
- {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1774454402000}
- {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1774454403000}
- {"user_id":"u03","item_id":"i04","category_id":"c02","ts":1774454404000}
- {"user_id":"u04","item_id":"i01","category_id":"c01","ts":1774454405000}
复制代码 到 Flink Web UI → TaskManagers → Stdout 查看输出:
要点:
- TUMBLE 适合“报表型”指标,窗口不重叠,状态相对可控
- COUNT(DISTINCT ...) 会引入去重状态,用户数大时要关注状态体积(生产中可考虑用近似去重或分层聚合)
2.2 滑动窗口(HOP):一条数据会被“复制”到多个窗口
典型场景:最近 5 分钟滚动 UV、最近 1 小时成交额每 5 分钟刷新一次。
示例:窗口长度 30s,每 10s 滑动一次。- INSERT INTO ads_window_metrics_print
- SELECT
- window_start,
- window_end,
- COUNT(*) AS pv,
- COUNT(DISTINCT user_id) AS uv
- FROM TABLE(
- HOP(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND, INTERVAL '30' SECOND)
- )
- GROUP BY window_start, window_end;
复制代码 到 Flink Web UI → TaskManagers → Stdout 查看输出:
要点:
- HOP 的状态压力通常显著高于 TUMBLE,因为数据会进入多个窗口
- 业务上能用 TUMBLE 不用 HOP;必须用 HOP 时,尽量降低窗口长度或放大 slide(减少并行窗口数)
2.3 会话窗口(SESSION):按“事件间隔”自动切窗
典型场景:统计用户一次访问会话内的点击数/停留时长、按会话做转化漏斗。
示例:同一用户 10s 内没有新事件就认为会话结束。- INSERT INTO ads_session_metrics_print
- SELECT
- window_start,
- window_end,
- user_id,
- COUNT(*) AS click_cnt
- FROM TABLE(
- SESSION(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND)
- )
- GROUP BY window_start, window_end, user_id;
复制代码 推送数据(最简单:控制台直接粘贴 JSON,一行一条)- $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log
复制代码 往 topic dwd_click_log 推送数据(控制台直接粘贴 JSON,一行一条)- {"user_id":"u01","item_id":"i01","category_id":"c01","ts":1775143687285}
- {"user_id":"u02","item_id":"i02","category_id":"c01","ts":1775143688285}
- {"user_id":"u01","item_id":"i03","category_id":"c02","ts":1775143689285}
复制代码 到 Flink Web UI → TaskManagers → Stdout 查看输出:
要点:
- SESSION 窗口边界不固定,会因为迟到数据发生“合并”,下游会看到更新/撤回更频繁
- 如果你的下游只接受 Append(只插入不更新),SESSION 往往不合适,除非你引入可更新的 sink(Upsert)
4. TopN:把窗口聚合变成实时榜单
TopN 的正确打开方式是“两段式”:
- 先做窗口聚合得到每个候选项的指标(比如每个商品的点击数)
- 再在聚合结果上做排序,取前 N
4.1 窗口内 TopN:每个窗口的热榜 Top3
需求:每 10 秒统计一次“各品类内点击 Top3 商品”。
[code]INSERT INTO ads_topn_printWITH item_cnt AS ( SELECT window_start, window_end, category_id, item_id, COUNT(*) AS cnt FROM TABLE( TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND) ) GROUP BY window_start, window_end, category_id, item_id),ranked AS ( SELECT window_start, window_end, category_id, item_id, cnt, ROW_NUMBER() OVER ( PARTITION BY window_start, window_end, category_id ORDER BY cnt DESC, item_id ) AS rn FROM item_cnt)SELECT window_start, window_end, category_id, item_id, cnt, rnFROM rankedWHERE rn |