找回密码
 立即注册
首页 业界区 安全 18、Flink CDC监听MySQL-Binlog实现数据监听

18、Flink CDC监听MySQL-Binlog实现数据监听

篁瞑普 9 小时前
一、CDC简介:

CDC(Change Data Capture)是变更数据捕获的简称,其核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新、删除等),将这些变更按发生的顺序完整记录下来,并写入到消息中间件或数据仓库中以供其他服务进行订阅及消费。CDC技术广泛应用于数据同步、数据分发、数据采集等场景,是数据集成领域的重要工具。
1、CDC常用工具:

CDC工具

Debezium

Canal

Maxwell

Flink CDC

核心定位
多数据源 CDC 框架
轻量 MySQL 同步工具
MySQL 专属极简工具
实时处理一体化框架
支持数据源
MySQL、PostgreSQL、Oracle 等
MySQL(最佳)、PostgreSQL 等
仅 MySQL
MySQL、PostgreSQL 等(基于 Debezium)
典型输出目标
Kafka、Flink/Spark
Kafka、RocketMQ、数据库
Kafka、Redis、文件
Kafka、ES、Hive 等
突出优势
支持广泛,全量 + 增量同步
部署简单,国内生态适配好
配置极简,资源占用低
支持实时处理,Exactly-Once 语义
适用场景
多源同步、复杂数据管道
MySQL 为主的轻量同步
简单 MySQL 变更同步
实时数仓、捕获 + 处理一体化
2、相关参考:

Flink-CDC 开源地址
Flink-CDC 中文文档
Canal学习笔记
 
二、Flink CDC工作原理:

Flink CDC(Change Data Capture)的核心工作原理是通过捕获数据库的变更日志(如 binlog、WAL 等),将其转换为结构化事件流,接入 Flink 实时计算引擎进行处理,并最终同步到目标系统。其工作流程可拆解为以下关键步骤:
1、Debezium 捕获解析数据库日志:

Flink CDC 本身不直接解析数据库日志,而是集成 Debezium(开源 CDC 框架)作为底层捕获引擎,支持 MySQL、PostgreSQL、Oracle 等多种数据库,具体逻辑如下:
(1)、模拟从节点获取日志:

  • 对于支持主从复制的数据库(如 MySQL),Debezium 会伪装成数据库的从节点,向主库发送复制请求,获取变更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日志为结构化事件:
(3)数据库日志通常是二进制格式,Debezium 会将其解析为包含详细信息的结构化事件,包括:

  • 操作类型(INSERT/UPDATE/DELETE);
  • 变更数据(UPDATE 时包含旧值和新值,INSERT/DELETE 包含对应数据);
  • 表名、数据库名、操作时间戳等元数据。
2. 全量 + 增量同步(无锁机制):

Flink CDC 支持 “全量数据初始化 + 增量变更同步” 的无缝衔接,且通过无锁机制避免影响源库性能:
(1)、全量快照阶段:

  • 首次同步时,会对数据库表进行一次全量快照(读取当前所有数据),确保初始数据完整。
(2)、增量同步阶段:

  • 快照完成后,自动切换到增量同步模式,通过监控 binlog 等日志获取实时变更,且通过记录日志位置(如 binlog 的文件名和偏移量)保证全量与增量数据的连续性(无重复、无遗漏)。
3、 封装为 Flink Source 流入引擎:

解析后的结构化事件会被封装为 Flink Source 连接器,直接作为 Flink 的输入流:
(1)、变更事件以流的形式进入 Flink 计算引擎,每条事件对应一条数据记录,可通过 Flink 的 DataStream API 或 Table/SQL 进行处理。
4、Flink实时数据处理:

Flink CDC 不仅是 “捕获工具”,更能结合 Flink 的实时计算能力对变更数据进行处理:
(1)、数据清洗与转换:

  • 过滤无效数据、格式转换(如 JSON 转 Avro)、字段映射等。
(2)、关联与聚合:

  • 支持与维度表(如 MySQL 维表、HBase 维表)关联,或进行窗口聚合(如统计分钟级变更量)。
(3)、状态管理:

  • 利用 Flink 的状态后端(如 RocksDB)保存中间结果,支持复杂逻辑(如去重、累计计算)。
5、基于 Checkpoint 保证一致性:

Flink CDC 依赖 Flink 的 Checkpoint 机制 确保数据处理的一致性:
(1)、Checkpoint 触发:

  • 定期将当前处理进度(包括 CDC 捕获的日志位置、算子状态等)持久化到存储(如 HDFS、本地文件)。
(2)、故障恢复:

  • 若 Flink 任务失败,可从最近一次 Checkpoint 恢复状态和日志位置,保证数据不丢失、不重复,实现 Exactly-Once 语义(端到端一致性需下游 Sink 配合支持)。
6. 通过 Sink 同步到目标系统:

处理后的变更数据通过 Flink 的 Sink 连接器 写入目标存储,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多种系统。
 
三、MySQL 配置(开启 Binlog):

1、开启 Binlog(ROW 模式):
  1. # MySQL 配置文件
  2. # Linux:my.cnf配置文件(/etc/mysql/)
  3. # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
  4. # 开启 Binlog
  5. log_bin = mysql-bin
  6. # 选择 ROW 模式(记录行级变更)
  7. binlog-format = ROW
  8. # 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
  9. server-id = 1
复制代码
1.png

2.png

2、重启 MySQL 并验证:
  1. # 打开命令提示符(cmd/services.msc):
  2. # 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
  3. # 停止MySQL服务:
  4. net stop MySQL57
  5. # 启动MySQL服务:
  6. net start MySQL57
  7. # 验证
  8. SHOW VARIABLES LIKE 'log_bin';
  9. SHOW VARIABLES LIKE 'binlog_format';
复制代码
3.png

 
四、SpringBoot整合Flink CDC实现MySQL数据监听:

1、POM配置:
  1.         
  2.         <dependency>
  3.             <groupId>org.apache.flink</groupId>
  4.             flink-java</artifactId>
  5.             <version>1.18.1</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>org.apache.flink</groupId>
  9.             flink-streaming-java</artifactId>
  10.             <version>1.18.1</version>
  11.         </dependency>
  12.         <dependency>
  13.             <groupId>org.apache.flink</groupId>
  14.             flink-clients</artifactId>
  15.             <version>1.18.1</version>
  16.         </dependency>
  17.         
  18.         <dependency>
  19.             <groupId>org.apache.flink</groupId>
  20.             flink-table-api-java-bridge</artifactId>
  21.             <version>1.18.1</version>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.flink</groupId>
  25.             flink-table-planner_2.12</artifactId>
  26.             <version>1.18.1</version>
  27.         </dependency>
  28.         
  29.         <dependency>
  30.             <groupId>org.apache.flink</groupId>
  31.             flink-connector-base</artifactId>
  32.             <version>1.18.1</version>
  33.         </dependency>
  34.         
  35.         <dependency>
  36.             <groupId>com.ververica</groupId>
  37.             flink-connector-mysql-cdc</artifactId>
  38.             <version>3.0.1</version>
  39.         </dependency>
复制代码
2、YML配置:
  1. flink:
  2.   cdc:
  3.     # 是否开启CDC监听
  4.     auto-start: true
  5.     # 自定义一个唯一的id
  6.     server-id: "123456"
  7.     # 数据库配置
  8.     mysql:
  9.       hostname: localhost
  10.       port: 3306
  11.       username: root
  12.       password: 123
复制代码
3、Entity类声明:

DataChangeType.class
  1. /**
  2. * Flink CDC数据变更类型枚举
  3. * 1、"c"表示创建
  4. * 2、"u"表示更新
  5. * 3、"d"表示删除
  6. * 4、"r"表示读取
  7. */
  8. public enum DataChangeType {
  9.     INSERT("c"),
  10.     UPDATE("u"),
  11.     DELETE("d"),
  12.     READ("r");
  13.     private final String code;
  14.     DataChangeType(String code) {
  15.         this.code = code;
  16.     }
  17.     public static DataChangeType getByCode(String code) {
  18.         for (DataChangeType type : values()) {
  19.             if (type.code.equals(code)) {
  20.                 return type;
  21.             }
  22.         }
  23.         return null;
  24.     }
  25. }
复制代码
FlinkCdcProperties.class
  1. import lombok.Data;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. import org.springframework.stereotype.Component;
  4. @Data
  5. @Component
  6. @ConfigurationProperties(prefix = "flink.cdc")
  7. public class FlinkCdcProperties {
  8.     /**
  9.      * 是否自动启动CDC监听
  10.      */
  11.     private boolean autoStart;
  12.     private String serverId;
  13.     /**
  14.      * MySQL配置
  15.      */
  16.     private Mysql mysql = new Mysql();
  17.     @Data
  18.     public static class Mysql {
  19.         private String hostname;
  20.         private int port;
  21.         private String username;
  22.         private String password;
  23.     }
  24. }
复制代码
4、FlinkCdcRunner数据变更监听启动器:
  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.iven.flinkcdcdemoservice.entity.DataChangeType;
  4. import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
  5. import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
  6. import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
  7. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  8. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  9. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  10. import lombok.RequiredArgsConstructor;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  13. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  14. import org.apache.flink.api.common.time.Time;
  15. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  16. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  17. import org.apache.flink.streaming.api.functions.ProcessFunction;
  18. import org.apache.flink.util.Collector;
  19. import org.springframework.boot.CommandLineRunner;
  20. import org.springframework.stereotype.Component;
  21. import java.io.Serializable;
  22. import java.util.*;
  23. @Slf4j
  24. @Component
  25. @RequiredArgsConstructor
  26. public class FlinkCdcRunner implements CommandLineRunner {
  27.     // 配置属性
  28.     private final FlinkCdcProperties properties;
  29.     // 处理器注册中心
  30.     private final FlinkCdcHandlerRegistry handlerRegistry;
  31.     @Override
  32.     public void run(String... args) throws Exception {
  33.         // 总开关关闭则不启动
  34.         if (!properties.isAutoStart()) {
  35.             log.info("Flink CDC 总开关关闭,不启动监听");
  36.             return;
  37.         }
  38.         // 没有需要监听的表则不启动
  39.         List<String> monitoredTables = handlerRegistry.getMonitoredTables();
  40.         if (monitoredTables.isEmpty()) {
  41.             log.warn("未发现需要监听的表(未实现FlinkCdcTableHandler),不启动监听");
  42.             return;
  43.         }
  44.         // 1. 创建Flink执行环境
  45.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  46.         // 设置并行度为 1: server-id 的数量必须 ≥ 并行度
  47.         env.setParallelism(1);
  48.         // 启用检查点(可选)
  49.         // env.enableCheckpointing(5000);
  50.         // 配置检查点存储路径(本地路径或分布式存储如HDFS)
  51.         // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
  52.         // 检查点超时时间(60秒未完成则取消)
  53.         // env.getCheckpointConfig().setCheckpointTimeout(60000);
  54.         // 允许检查点失败次数(默认0,即一次失败则任务失败)
  55.         // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
  56.         // 禁用检查点
  57.         env.getCheckpointConfig().disableCheckpointing();
  58.         // 重试次数/重试间隔
  59.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,  Time.seconds(10)));
  60.         // 2. 配置MySQL CDC源(动态设置需要监听的表)
  61.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  62.                 .serverId(properties.getServerId())
  63.                 .hostname(properties.getMysql().getHostname())
  64.                 .port(properties.getMysql().getPort())
  65.                 .username(properties.getMysql().getUsername())
  66.                 .password(properties.getMysql().getPassword())
  67.                 // 从监听的表中提取数据库列表(去重)
  68.                 .databaseList(extractDatabases(monitoredTables))
  69.                 // 直接使用注册中心收集的表列表
  70.                 .tableList(monitoredTables.toArray(new String[0]))
  71.                 // 反序列化为JSON
  72.                 .deserializer(new JsonDebeziumDeserializationSchema())
  73.                 /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
  74.                  * latest: 只进行增量导入(不读取历史变化)
  75.                  * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
  76.                  */
  77.                 .startupOptions(StartupOptions.latest())
  78.                 .build();
  79.         // 3. 读取CDC数据流并处理
  80.         DataStreamSource<String> dataStream = env.fromSource(
  81.                 mySqlSource,
  82.                 WatermarkStrategy.noWatermarks(),
  83.                 "MySQL-CDC-Source"
  84.         );
  85.         // 4. 解析数据并路由到对应处理器,使用静态内部类代替匿名内部类
  86.         dataStream.process(new CdcDataProcessFunction(handlerRegistry));
  87.         // 5. 启动Flink作业
  88.         env.execute("Flink-CDC-动态监听作业");
  89.     }
  90.     /**
  91.      * 静态内部类实现ProcessFunction,确保可序列化
  92.      */
  93.     private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
  94.         private final FlinkCdcHandlerRegistry handlerRegistry;
  95.         // 通过构造函数传入依赖
  96.         public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
  97.             this.handlerRegistry = handlerRegistry;
  98.         }
  99.         @Override
  100.         public void processElement(String json, Context ctx, Collector<Void> out) {
  101.             try {
  102.                 JSONObject cdcData = JSON.parseObject(json);
  103.                 // 操作类型:c/u/d
  104.                 String op = cdcData.getString("op");
  105.                 JSONObject source = cdcData.getJSONObject("source");
  106.                 String dbName = source.getString("db");
  107.                 String tableName = source.getString("table");
  108.                 // 库名.表名
  109.                 String fullTableName = dbName + "." + tableName;
  110.                 // 找到对应表的处理器
  111.                 FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
  112.                 if (handler == null) {
  113.                     log.warn("表[{}]无处理器,跳过处理", fullTableName);
  114.                     return;
  115.                 }
  116.                 // 按事件类型分发
  117.                 DataChangeType changeType = DataChangeType.getByCode(op);
  118.                 if (changeType == null) {
  119.                     log.warn("未知操作类型:{}", op);
  120.                     return;
  121.                 }
  122.                 switch (changeType) {
  123.                     case INSERT:
  124.                         List<Map<String, Object>> insertData = Collections.singletonList(
  125.                                 cdcData.getJSONObject("after").getInnerMap()
  126.                         );
  127.                         handler.handleInsert(insertData);
  128.                         break;
  129.                     case UPDATE:
  130.                         List<Map<String, Object>> beforeData = Collections.singletonList(
  131.                                 cdcData.getJSONObject("before").getInnerMap()
  132.                         );
  133.                         List<Map<String, Object>> afterData = Collections.singletonList(
  134.                                 cdcData.getJSONObject("after").getInnerMap()
  135.                         );
  136.                         handler.handleUpdate(beforeData, afterData);
  137.                         break;
  138.                     case DELETE:
  139.                         List<Map<String, Object>> deleteData = Collections.singletonList(
  140.                                 cdcData.getJSONObject("before").getInnerMap()
  141.                         );
  142.                         handler.handleDelete(deleteData);
  143.                         break;
  144.                     case READ:
  145.                         // 可以忽略快照阶段的读取操作,或根据需要处理
  146.                         log.debug("处理快照读取操作: {}", fullTableName);
  147.                         break;
  148.                 }
  149.             } catch (Exception e) {
  150.                 log.error("Flink-CDC数据处理发生未预期异常", e);
  151.             }
  152.         }
  153.     }
  154.     /**
  155.      * 从表名(库名.表名)中提取数据库列表(去重)
  156.      *
  157.      * @param tables
  158.      * @return
  159.      */
  160.     private String[] extractDatabases(List<String> tables) {
  161.         // 截取库名(如demo.tb_user → demo)
  162.         return tables.stream()
  163.                 .map(table -> table.split("\\.")[0])
  164.                 .distinct()
  165.                 .toArray(String[]::new);
  166.     }
  167. }
复制代码
5、FlinkCdcHandlerRegistry策略路由:
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.context.ApplicationContext;
  3. import org.springframework.context.ApplicationContextAware;
  4. import org.springframework.stereotype.Component;
  5. import java.io.Serializable;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. /**
  11. * Flink CDC处理器注册中心
  12. * 处理器注册中心(自动扫描监听表)
  13. */
  14. @Slf4j
  15. @Component
  16. public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {
  17.     // 缓存:表名(库名.表名)→ 处理器
  18.     private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();
  19.     // 收集所有需要监听的表(供Flink CDC配置使用)
  20.     private List<String> monitoredTables;
  21.     @Override
  22.     public void setApplicationContext(ApplicationContext applicationContext) {
  23.         // 扫描所有实现类
  24.         Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
  25.         beans.values().forEach(handler -> {
  26.             String tableName = handler.getTableName();
  27.             handlerMap.put(tableName, handler);
  28.             log.info("注册监听表:{} → 处理器:{}", tableName, handler.getClass().getSimpleName());
  29.         });
  30.         // 提取所有需要监听的表
  31.         monitoredTables = new ArrayList<>(handlerMap.keySet());
  32.     }
  33.     /**
  34.      * 获取指定表的处理器
  35.      *
  36.      * @param tableName
  37.      * @return
  38.      */
  39.     public FlinkCdcHandler getHandler(String tableName) {
  40.         return handlerMap.get(tableName);
  41.     }
  42.     /**
  43.      * 获取所有需要监听的表(供Flink CDC配置)
  44.      *
  45.      * @return
  46.      */
  47.     public List<String> getMonitoredTables() {
  48.         return monitoredTables;
  49.     }
  50. }
复制代码
6、FlinkCdcHandler策略模式数据处理:

 FlinkCdcHandler
  1. import java.util.List;
  2. import java.util.Map;
  3. /**
  4. * 表数据处理接口,每个监听的表需实现此接口
  5. */
  6. public interface FlinkCdcHandler {
  7.     /**
  8.      * 获取监听的表名(格式:库名.表名,如demo.tb_user)
  9.      */
  10.     String getTableName();
  11.     /**
  12.      * 处理新增数据
  13.      */
  14.     default void handleInsert(List<Map<String, Object>> dataList) {
  15.         // 默认空实现,子类可重写
  16.     }
  17.     /**
  18.      * 处理更新数据(包含变更前和变更后的数据)
  19.      * @param beforeList 变更前数据
  20.      * @param afterList 变更后数据
  21.      */
  22.     default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
  23.         // 默认空实现,子类可重写
  24.     }
  25.     /**
  26.      * 处理删除数据
  27.      */
  28.     default void handleDelete(List<Map<String, Object>> dataList) {
  29.         // 默认空实现,子类可重写
  30.     }
  31. }
复制代码
TbUserFlinkCdcHandler
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.stereotype.Component;
  3. import java.io.Serializable;
  4. import java.util.List;
  5. import java.util.Map;
  6. @Slf4j
  7. @Component
  8. public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
  9.     @Override
  10.     public String getTableName() {
  11.         return "demo.tb_user";
  12.     }
  13.     @Override
  14.     public void handleInsert(List<Map<String, Object>> dataList) {
  15.         log.info("处理tb_user新增数据,共{}条", dataList.size());
  16.         dataList.forEach(data -> {
  17.             String id = (String)data.get("id");
  18.             String username = (String) data.get("name");
  19.             // 业务逻辑:如同步到ES、缓存等
  20.             log.info("新增用户:id={}, name={}", id, username);
  21.         });
  22.     }
  23.     @Override
  24.     public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
  25.         log.info("处理tb_user更新数据,共{}条", afterList.size());
  26.         for (int i = 0; i < afterList.size(); i++) {
  27.             Map<String, Object> before = beforeList.get(i);
  28.             Map<String, Object> after = afterList.get(i);
  29.             log.info("更新用户:id={}, 旧用户名={}, 新用户名={}",
  30.                     after.get("id"), before.get("name"), after.get("name"));
  31.         }
  32.     }
  33.     @Override
  34.     public void handleDelete(List<Map<String, Object>> dataList) {
  35.         log.info("处理tb_user删除数据,共{}条", dataList.size());
  36.         dataList.forEach(data -> {
  37.             log.info("删除用户:id={}", data.get("id"));
  38.         });
  39.     }
  40. }
复制代码
4.png

项目启动时,FlinkCdcHandlerRegistry 扫描并注册所有 FlinkCdcHandler 实现类,建立表与处理器的映射;FlinkCdcRunner 在 Spring 容器初始化后触发,检查启动条件,初始化 Flink 环境并构建 CDC 数据源,将数据流接入 CdcDataProcessFunction,该函数解析变更事件并路由到对应处理器执行业务逻辑,最后启动 Flink 作业持续监听处理。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册