找回密码
 立即注册
首页 业界区 业界 从零开始学Flink:Flink SQL 元数据持久化实战 ...

从零开始学Flink:Flink SQL 元数据持久化实战

喜及眩 前天 16:40
在上一篇 《从零开始学Flink:实时数仓与维表时态Join实战》 中,我们通过「订单事实流 + 用户维表」构建了一条基础的实时数仓链路。
但在实际操作 Flink SQL Client 时,你可能已经痛感到了一个问题:
痛点:会话窗口一旦关闭,或者 Flink 集群重启,辛辛苦苦编写的 CREATE TABLE、CREATE VIEW 等 DDL 语句瞬间“归零”。每次调试都需要从头再来,重复建表。
本文将带你彻底解决这个“元数据无法持久化”的顽疾,实现:

  • DDL 元数据持久化:告别重复建表,重启无忧。
  • 全局共享:多个 Flink 作业、多个 SQL Client 会话共享同一套表结构、视图和函数
  • 全 Connector 支持:无论是 KafkaJDBC 还是 FileSystem,其元数据均可统一管理。
实现这一目标的核心利器,正是:Hive Catalog
一、为什么要实现 Flink SQL 的 DDL 持久化?

在生产实践中,缺乏元数据持久化会带来两个典型的痛点:

  • 重复劳动:SQL Client 每次重启都需要重新执行 CREATE TABLE 语句。
  • 维护困难:同一个业务表被多个作业使用时,每个人都需复制一份 DDL。一旦表结构变更,所有作业的 DDL 都需要手动同步,极易导致不一致。
问题的根源在于:默认情况下,Flink 将表结构、视图等元数据存储在 内存 Catalog (GenericInMemoryCatalog) 中。

  • 生命周期短:仅与当前 Session 会话绑定。
  • 易丢失:作业停止或 SQL Client 退出后,元数据即被销毁。
在生产环境中,我们需要构建一套稳定的“元数据中心”,以实现:

  • 持久化存储:长久保存数据库、表、视图、函数等 DDL 信息。
  • 复用性:支持跨作业、跨会话复用同一套元数据。
  • 统一治理:支持元数据的统一变更与管理。
Hive Catalog 便是 Flink 生态中目前最成熟、最通用的解决方案:

  • 生产标准:无缝对接大多数大数据平台已有的 Hive Metastore。
  • 通用性强:不仅支持 Hive 表,还能存储 Kafka、MySQL、HBase 等任意 Flink 表的元数据。
  • 生态兼容:便于与 Spark、Hive 等其他计算引擎共享元数据,打通数据孤岛。
二、深入理解 Catalog 与 Hive Catalog

在开始实战前,我们需要厘清几个核心概念。

  • Catalog (目录):Flink 的“元数据命名空间”。

    • 它负责管理 数据库 (Database)表 (Table)视图 (View)函数 (Function)
    • 一个 Flink 作业可以注册多个 Catalog,例如:default_catalog、my_hive、my_jdbc,并根据需要进行切换。

  • Catalog 的分类

    • 内存 Catalog (GenericInMemoryCatalog):Flink 默认使用,元数据保存在内存中,重启即失。
    • 持久化 Catalog:将元数据持久化到外部系统(如 Hive Metastore、MySQL 等)。

Hive Catalog 的核心特性

  • 存储后端:直接复用 Hive Metastore (HMS) 作为元数据存储。
  • 自动映射:Flink 会将 DDL 解析后的元数据(表名、字段类型、属性配置)自动写入 Hive Metastore。
  • 透明使用:对开发者而言,仅需通过 USE CATALOG 切换上下文,即可享受持久化服务。
一句话总结
只要配置了 Hive Catalog,Flink 创建的 Kafka、MySQL 等任意类型的表结构都能被自动保存到 Hive Metastore 中。下次启动时直接读取,无需重建。
三、环境准备:部署 Hive Metastore

使用 Hive Catalog 的前提是拥有一个可用的 Hive Metastore (HMS) 服务。
1. 实战:基于 Docker 快速部署 HMS (连接宿主机 MySQL)

本节将演示如何在 WSL2/Linux 环境下,通过 Docker 部署 Hive Metastore,并使其连接宿主机 (Windows/WSL2) 的 MySQL 来存储元数据。
1)MySQL 准备工作
首先,请在宿主机 MySQL 中执行以下 SQL,完成数据库初始化及权限配置:
  1. DROP DATABASE IF EXISTS metastore;
  2. CREATE DATABASE metastore CHARACTER SET latin1;
  3. CREATE USER 'hive'@'%' IDENTIFIED BY 'hive_123';
  4. GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%';
  5. FLUSH PRIVILEGES;
复制代码
然后,创建必要的目录并下载 MySQL 驱动。
关键步骤:提取容器默认配置与依赖
为了便于后续配置修改与 Jar 包管理,我们需要将容器内的 conf 和 lib 目录挂载到本地。由于本地新建的目录是空的,直接挂载会导致容器内原有文件不可见,因此必须先将镜像内的文件复制到本地
  1. # 1. 创建本地工作目录
  2. mkdir -p flink-hive-metastore/data
  3. mkdir -p flink-hive-metastore/lib
  4. mkdir -p flink-hive-metastore/conf
  5. cd flink-hive-metastore
  6. # 2. 启动一个临时容器
  7. docker run -d --name temp-hive apache/hive:3.1.3
  8. # 3. 把容器里的 conf 和 lib 拷贝到本地(这一步很重要!)
  9. docker cp temp-hive:/opt/hive/data/. ./data
  10. docker cp temp-hive:/opt/hive/conf/. ./conf
  11. docker cp temp-hive:/opt/hive/lib/. ./lib
  12. # 4. 删除临时容器
  13. docker rm -f temp-hive
  14. # 5. 下载 MySQL 驱动到本地 lib 目录
  15. # (请自行下载 mysql-connector-java-8.0.28.jar 放入 ./lib)
复制代码
2)编写配置文件 conf/hive-site.xml
此时,本地 ./conf 目录下应包含 log4j.properties 等默认文件。我们需要在此目录下新建(或覆盖)hive-site.xml,配置 Metastore 连接 MySQL 的参数:
  1. <configuration>
  2.     <property>
  3.         <name>javax.jdo.option.ConnectionURL</name>
  4.         <value>jdbc:mysql://host.docker.internal:3306/metastore?useSSL=false&allowPublicKeyRetrieval=true</value>
  5.     </property>
  6.     <property>
  7.         <name>javax.jdo.option.ConnectionDriverName</name>
  8.         <value>com.mysql.cj.jdbc.Driver</value>
  9.     </property>
  10.     <property>
  11.         <name>javax.jdo.option.ConnectionUserName</name>
  12.         <value>hive</value>
  13.     </property>
  14.     <property>
  15.         <name>javax.jdo.option.ConnectionPassword</name>
  16.         <value>hive_123</value>
  17.     </property>
  18.     <property>
  19.         <name>hive.metastore.uris</name>
  20.         <value>thrift://0.0.0.0:9083</value>
  21.     </property>
  22.     <property>
  23.         <name>hive.metastore.schema.verification</name>
  24.         <value>false</value>
  25.     </property>
  26.     <property>
  27.         <name>datanucleus.schema.autoCreateAll</name>
  28.         <value>true</value>
  29.     </property>
  30. </configuration>
复制代码
3)编写 docker-compose.yml
关键点:挂载配置文件,并配置 extra_hosts 以支持容器连接宿主机。
  1. version: '3'
  2. services:
  3.   metastore:
  4.     image: apache/hive:3.1.3
  5.     container_name: metastore
  6.     ports:
  7.       - "9083:9083"
  8.     environment:
  9.       SERVICE_NAME: metastore
  10.       DB_DRIVER: mysql
  11.     volumes:
  12.       - ./data/warehouse:/opt/hive/data/warehouse
  13.       - ./lib:/opt/hive/lib   
  14.       - ./conf:/opt/hive/conf
  15.    
  16.     # 重点配置:让 Linux/WSL2 下的容器能解析 host.docker.internal
  17.     extra_hosts:
  18.       - "host.docker.internal:host-gateway"
复制代码
配置详解:

  • extra_hosts:Docker Compose 的标准网络配置项,用于在 /etc/hosts 中添加记录。
  • host-gateway:Docker 的特殊关键字,自动解析为宿主机的网关 IP(通常是 172.17.0.1)。
  • 通信原理:通过该配置,容器内部访问 host.docker.internal 时,流量会被正确转发到 WSL2 宿主机的网关,进而访问到监听在 0.0.0.0 的 MySQL 服务。
4)启动服务与故障排查
步骤 1:清理旧容器(强烈建议)
由于涉及配置文件挂载与网络变更,如果之前运行过同名容器(无论成功与否),建议先彻底清理:
  1. docker-compose down
复制代码
步骤 2:启动并实时查看日志
启动后请立刻查看日志,这是发现问题的唯一办法:
  1. docker-compose up -d
  2. docker logs -f --tail 100 metastore
复制代码
常见启动报错排查:

  • 关于重启报错
    该镜像默认每次启动都会尝试初始化 Schema。如果 MySQL 里已有表,重建容器时会报错。
    解决方案

    • 如果只是重启服务,请使用 docker restart metastore(不要 down)。
    • 如果要重建容器(如修改配置),请确保 MySQL 里的 metastore 库是干净的(Drop 后重建)。

  • MySQL 驱动没找到 (ClassNotFoundException)

    • 现象:日志报错 java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver。
    • 原因:宿主机 lib 目录下没有 jar 包,或者 Docker 把它挂载成了空目录。
    • 解决:检查文件是否存在,执行 docker-compose down 后重试。

等待日志无报错且显示 Starting Hive Metastore Server 后,说明启动成功。
四、实战:在 Flink SQL Client 中配置 Hive Catalog

接下来,我们将通过 Flink SQL Client 体验元数据持久化的全过程。
1. 准备依赖 Jar 包

集成 Hive Catalog 需要两类核心依赖:Flink Hive ConnectorHadoop 基础库
1)Flink Hive Connector
请根据你的 Flink 和 Hive 版本选择对应的 Jar 包。以 Flink 1.20.1 和 Hive 3.1.3 为例:
  1. cd $FLINK_HOME/lib
  2. wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.20.1/flink-sql-connector-hive-3.1.3_2.12-1.20.1.jar
复制代码
2)Hadoop 依赖(必选)
Flink 发行包默认不包含 Hadoop 依赖。注意:即使仅连接 Hive Metastore 而不读写 HDFS,底层通信仍依赖 Hadoop 基础库。

  • 方案 A(推荐):如果你机器上装了 Hadoop,配置环境变量即可:
    1. export HADOOP_CLASSPATH=`hadoop classpath`
    复制代码
  • 方案 B(简单):如果没有 Hadoop 环境,直接去 Maven 下载 hadoop-client-api 和 hadoop-client-runtime(Hadoop 3.3.4):
    1. wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar
    2. wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar
    3. wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar
    4. wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
    复制代码
注意:版本兼容性非常重要。请务必根据你的 Flink 版本和 Hive 版本去官网查看对应的依赖列表。
2. 准备客户端配置文件

在启动 SQL Client 之前,我们需要准备一个专门给 Flink 客户端用的 hive-site.xml。
为什么不能复用容器的配置?
因为容器内的配置是给 Metastore 服务端用的,通常没有配置 Thrift 地址(或者配的是 0.0.0.0),而客户端需要明确知道去连接 localhost:9083。此外,Windows 本地还需要指定一个合法的 Warehouse 路径。

  • 新建目录:在项目根目录下新建一个 conf-client 文件夹(不要混用容器的 conf 目录)。
  • 创建文件:在里面新建 hive-site.xml,内容如下:
  1. <configuration>
  2.     <property>
  3.         <name>hive.metastore.uris</name>
  4.         <value>thrift://localhost:9083</value>
  5.     </property>
  6.     <property>
  7.         <name>hive.metastore.warehouse.dir</name>
  8.         
  9.         <value>file:///opt/flink-hive-metastore/data/warehouse</value>
  10.     </property>
  11. </configuration>
复制代码
(注意:请确保该目录真实存在且当前用户有写权限。)
3. 启动 SQL Client 并注册 Catalog

依赖与配置准备就绪后,启动 Flink SQL Client:
  1. bin/sql-client.sh embedded
复制代码
进入交互式命令行后,执行以下 SQL 语句注册 Hive Catalog:
  1. CREATE CATALOG myhive WITH (
  2.     'type' = 'hive',
  3.     'default-database' = 'default',
  4.     'hive-conf-dir' = '/opt/flink-hive-metastore/conf-client'
  5. );
复制代码
验证注册状态:
  1. SHOW CATALOGS;
复制代码
如果返回结果中包含 myhive,说明注册成功。
4. 切换当前 Catalog

关键步骤:切换上下文
在执行建表语句前,务必确认当前 Session 已切换至 myhive Catalog。否则,表结构仍会被创建在默认的内存 Catalog 中,无法持久化。
  1. USE CATALOG myhive;
  2. SHOW CURRENT CATALOG;
复制代码
此时,你已经进入了 Hive Catalog 的世界。所有创建的表都会持久化保存。
五、实战示例:在 Hive Catalog 中创建 Kafka 源表

Hive Catalog 的强大之处在于:它不仅仅支持 Hive 表,还能存储任意 Connector(如 Kafka、MySQL)的元数据!
下面我们尝试创建之前的 Kafka orders 和 payments 表:
  1. CREATE DATABASE IF NOT EXISTS ods ;
  2. USE ods;
  3. CREATE TABLE orders (
  4.   order_id     STRING,
  5.   user_id      STRING,
  6.   order_amount DECIMAL(10, 2),
  7.   order_time   TIMESTAMP_LTZ(3),
  8.   WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
  9. ) WITH (
  10.   'connector' = 'kafka',
  11.   'topic' = 'orders',
  12.   'properties.bootstrap.servers' = '127.0.0.1:9092',
  13.   'properties.group.id' = 'flink-orders',
  14.   'scan.startup.mode' = 'earliest-offset',
  15.   'format' = 'json',
  16.   'json.timestamp-format.standard' = 'ISO-8601'
  17. );
  18. CREATE TABLE payments (
  19.   pay_id     STRING,
  20.   order_id   STRING,
  21.   pay_amount DECIMAL(10, 2),
  22.   pay_time   TIMESTAMP_LTZ(3),
  23.   WATERMARK FOR pay_time AS pay_time - INTERVAL '5' SECOND
  24. ) WITH (
  25.   'connector' = 'kafka',
  26.   'topic' = 'payments',
  27.   'properties.bootstrap.servers' = '127.0.0.1:9092',
  28.   'properties.group.id' = 'flink-payments',
  29.   'scan.startup.mode' = 'earliest-offset',
  30.   'format' = 'json',
  31.   'json.timestamp-format.standard' = 'ISO-8601'
  32. );
复制代码
执行成功!
尽管这是 Kafka 表,但 Flink 会将其 DDL 信息(包含 connector=kafka、topic 等属性)序列化后,存储到 Hive Metastore 的 TBLS 和 TABLE_PARAMS 等系统表中。
六、验证:DDL 持久化效果

1. 重启 SQL Client 验证

让我们模拟一次会话失效的场景:

  • 退出当前 Flink SQL Client:
  1. QUIT;
复制代码

  • 重启 Flink SQL Client :
  1. bin/sql-client.sh embedded
复制代码

  • 注册 Hive Catalog :
  1. CREATE CATALOG myhive WITH (
  2.     'type' = 'hive',
  3.     'default-database' = 'default',
  4.     'hive-conf-dir' = '/opt/flink-hive-metastore/conf-client'
  5. );
  6. USE CATALOG myhive;
  7. USE ods;
  8. SHOW TABLES;
  9. DESCRIBE orders;
  10. DESCRIBE payments;
复制代码
结果验证:你会发现 orders 和 payments 表依然存在!

2. 在 Hive CLI 中验证(可选)

如果你通过 Hive CLI 连接 Metastore,执行 use ods; show tables;,同样可以看到 orders 和 payments 表。
注意:在 Hive 视图中,这些表通常被标记为 MANAGED_TABLE 或 EXTERNAL_TABLE。虽然元数据可见,但 Hive 引擎本身无法直接查询这些 Kafka 表的数据(除非额外配置了 Hive-Kafka Handler)。对 Flink 而言,它们就是标准的、持久化的 Kafka 表。
七、辨析:Hive Catalog vs Hive Connector

这是初学者最容易混淆的两个概念:

  • Hive Catalog:侧重于 元数据管理。它的作用是让 Flink 能够访问和存储 Hive Metastore 中的表结构信息。无论表的底层是 Hive、Kafka 还是 MySQL,只要元数据存在 HMS 里,就需要用到 Hive Catalog。
  • Hive Connector:侧重于 数据读写。它的作用是让 Flink 能够读写 Hive 表(即 HDFS 上的文件数据)。
结合本文示例

  • 我们使用了 Hive Catalog 来存储 orders 和 payments 的 DDL 元数据。
  • 但这两张表的 connector 属性是 kafka,表示它们的数据流向 Kafka。
  • 只有当我们创建一个 connector 属性为 hive 的表时,才是真正利用 Hive Connector 读写 Hive 数据。
八、常见问题与避坑指南

1)依赖冲突问题
Flink 与 Hive 集成时,Jar 包冲突是常见痛点。

  • 建议:优先使用 Flink 官方提供的 flink-sql-connector-hive(Shaded 包),它封装了大部分依赖,减少冲突。
  • 检查:确保 Hadoop 基础依赖(hadoop-client-runtime, hadoop-client-api)已正确加载。
2)Hive 版本匹配
版本兼容性至关重要。Flink 的 Hive Connector 版本必须严格匹配 Hive Metastore 的服务端版本(1.x, 2.x, 3.x)。
3)为什么不能用 JDBC Catalog 存储 Kafka 表元数据?
这是一个常见的误区。

  • JDBC Catalog:用于映射物理表。它要求远程数据库(如 MySQL)中必须存在一张真实的物理表。标准的关系型数据库表结构无法直接存储 topic、bootstrap.servers 等 Flink 特有的配置参数。
  • Hive Catalog:Hive Metastore 的设计包含 TABLE_PARAMS 键值对结构,天然适合存储任意扩展属性。Flink 正是利用这一特性,将 Kafka 等 Connector 的配置序列化存储,从而实现了通用的元数据管理。
九、总结

通过引入 Hive Catalog,我们成功实现了:

  • DDL 持久化:彻底告别“重启即丢”的尴尬,保障元数据安全。
  • 全能元数据支持:统一管理 Kafka、JDBC、HBase 等各类 Connector 的表结构。
  • 跨作业共享:实现 SQL Client 建表、Java/Scala 作业复用,打破作业间的元数据壁垒。
Hive Catalog 是构建 实时数仓 (Real-time Data Warehouse) 不可或缺的基础设施。掌握这一步,你的 Flink 开发效率与生产级实践能力将迈上一个新的台阶!
原文来自:http://blog.daimajiangxin.com.cn

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册