一、CDC简介:
CDC(Change Data Capture)是变更数据捕获的简称,其核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新、删除等),将这些变更按发生的顺序完整记录下来,并写入到消息中间件或数据仓库中以供其他服务进行订阅及消费。CDC技术广泛应用于数据同步、数据分发、数据采集等场景,是数据集成领域的重要工具。
1、CDC常用工具:
CDC工具
| Debezium
| Canal
| Maxwell
| Flink CDC
| 核心定位
| 多数据源 CDC 框架
| 轻量 MySQL 同步工具
| MySQL 专属极简工具
| 实时处理一体化框架
| 支持数据源
| MySQL、PostgreSQL、Oracle 等
| MySQL(最佳)、PostgreSQL 等
| 仅 MySQL
| MySQL、PostgreSQL 等(基于 Debezium)
| 典型输出目标
| Kafka、Flink/Spark
| Kafka、RocketMQ、数据库
| Kafka、Redis、文件
| Kafka、ES、Hive 等
| 突出优势
| 支持广泛,全量 + 增量同步
| 部署简单,国内生态适配好
| 配置极简,资源占用低
| 支持实时处理,Exactly-Once 语义
| 适用场景
| 多源同步、复杂数据管道
| MySQL 为主的轻量同步
| 简单 MySQL 变更同步
| 实时数仓、捕获 + 处理一体化
| 2、相关参考:
Flink-CDC 开源地址
Flink-CDC 中文文档
Canal学习笔记
二、Flink CDC工作原理:
Flink CDC(Change Data Capture)的核心工作原理是通过捕获数据库的变更日志(如 binlog、WAL 等),将其转换为结构化事件流,接入 Flink 实时计算引擎进行处理,并最终同步到目标系统。其工作流程可拆解为以下关键步骤:
1、Debezium 捕获解析数据库日志:
Flink CDC 本身不直接解析数据库日志,而是集成 Debezium(开源 CDC 框架)作为底层捕获引擎,支持 MySQL、PostgreSQL、Oracle 等多种数据库,具体逻辑如下:
(1)、模拟从节点获取日志:
- 对于支持主从复制的数据库(如 MySQL),Debezium 会伪装成数据库的从节点,向主库发送复制请求,获取变更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日志为结构化事件:
(3)数据库日志通常是二进制格式,Debezium 会将其解析为包含详细信息的结构化事件,包括:
- 操作类型(INSERT/UPDATE/DELETE);
- 变更数据(UPDATE 时包含旧值和新值,INSERT/DELETE 包含对应数据);
- 表名、数据库名、操作时间戳等元数据。
2. 全量 + 增量同步(无锁机制):
Flink CDC 支持 “全量数据初始化 + 增量变更同步” 的无缝衔接,且通过无锁机制避免影响源库性能:
(1)、全量快照阶段:
- 首次同步时,会对数据库表进行一次全量快照(读取当前所有数据),确保初始数据完整。
(2)、增量同步阶段:
- 快照完成后,自动切换到增量同步模式,通过监控 binlog 等日志获取实时变更,且通过记录日志位置(如 binlog 的文件名和偏移量)保证全量与增量数据的连续性(无重复、无遗漏)。
3、 封装为 Flink Source 流入引擎:
解析后的结构化事件会被封装为 Flink Source 连接器,直接作为 Flink 的输入流:
(1)、变更事件以流的形式进入 Flink 计算引擎,每条事件对应一条数据记录,可通过 Flink 的 DataStream API 或 Table/SQL 进行处理。
4、Flink实时数据处理:
Flink CDC 不仅是 “捕获工具”,更能结合 Flink 的实时计算能力对变更数据进行处理:
(1)、数据清洗与转换:
- 过滤无效数据、格式转换(如 JSON 转 Avro)、字段映射等。
(2)、关联与聚合:
- 支持与维度表(如 MySQL 维表、HBase 维表)关联,或进行窗口聚合(如统计分钟级变更量)。
(3)、状态管理:
- 利用 Flink 的状态后端(如 RocksDB)保存中间结果,支持复杂逻辑(如去重、累计计算)。
5、基于 Checkpoint 保证一致性:
Flink CDC 依赖 Flink 的 Checkpoint 机制 确保数据处理的一致性:
(1)、Checkpoint 触发:
- 定期将当前处理进度(包括 CDC 捕获的日志位置、算子状态等)持久化到存储(如 HDFS、本地文件)。
(2)、故障恢复:
- 若 Flink 任务失败,可从最近一次 Checkpoint 恢复状态和日志位置,保证数据不丢失、不重复,实现 Exactly-Once 语义(端到端一致性需下游 Sink 配合支持)。
6. 通过 Sink 同步到目标系统:
处理后的变更数据通过 Flink 的 Sink 连接器 写入目标存储,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多种系统。
三、MySQL 配置(开启 Binlog):
1、开启 Binlog(ROW 模式):
- # MySQL 配置文件
- # Linux:my.cnf配置文件(/etc/mysql/)
- # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
- # 开启 Binlog
- log_bin = mysql-bin
- # 选择 ROW 模式(记录行级变更)
- binlog-format = ROW
- # 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
- server-id = 1
复制代码
2、重启 MySQL 并验证:
- # 打开命令提示符(cmd/services.msc):
- # 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
- # 停止MySQL服务:
- net stop MySQL57
- # 启动MySQL服务:
- net start MySQL57
- # 验证
- SHOW VARIABLES LIKE 'log_bin';
- SHOW VARIABLES LIKE 'binlog_format';
复制代码
四、SpringBoot整合Flink CDC实现MySQL数据监听:
1、POM配置:
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-java</artifactId>
- <version>1.18.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-streaming-java</artifactId>
- <version>1.18.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-clients</artifactId>
- <version>1.18.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-table-api-java-bridge</artifactId>
- <version>1.18.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-table-planner_2.12</artifactId>
- <version>1.18.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- flink-connector-base</artifactId>
- <version>1.18.1</version>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- flink-connector-mysql-cdc</artifactId>
- <version>3.0.1</version>
- </dependency>
复制代码 2、YML配置:
- flink:
- cdc:
- # 是否开启CDC监听
- auto-start: true
- # 自定义一个唯一的id
- server-id: "123456"
- # 数据库配置
- mysql:
- hostname: localhost
- port: 3306
- username: root
- password: 123
复制代码 3、Entity类声明:
DataChangeType.class- /**
- * Flink CDC数据变更类型枚举
- * 1、"c"表示创建
- * 2、"u"表示更新
- * 3、"d"表示删除
- * 4、"r"表示读取
- */
- public enum DataChangeType {
- INSERT("c"),
- UPDATE("u"),
- DELETE("d"),
- READ("r");
- private final String code;
- DataChangeType(String code) {
- this.code = code;
- }
- public static DataChangeType getByCode(String code) {
- for (DataChangeType type : values()) {
- if (type.code.equals(code)) {
- return type;
- }
- }
- return null;
- }
- }
复制代码 FlinkCdcProperties.class- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.stereotype.Component;
- @Data
- @Component
- @ConfigurationProperties(prefix = "flink.cdc")
- public class FlinkCdcProperties {
- /**
- * 是否自动启动CDC监听
- */
- private boolean autoStart;
- private String serverId;
- /**
- * MySQL配置
- */
- private Mysql mysql = new Mysql();
- @Data
- public static class Mysql {
- private String hostname;
- private int port;
- private String username;
- private String password;
- }
- }
复制代码 4、FlinkCdcRunner数据变更监听启动器:
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.iven.flinkcdcdemoservice.entity.DataChangeType;
- import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
- import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
- import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.util.Collector;
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.stereotype.Component;
- import java.io.Serializable;
- import java.util.*;
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class FlinkCdcRunner implements CommandLineRunner {
- // 配置属性
- private final FlinkCdcProperties properties;
- // 处理器注册中心
- private final FlinkCdcHandlerRegistry handlerRegistry;
- @Override
- public void run(String... args) throws Exception {
- // 总开关关闭则不启动
- if (!properties.isAutoStart()) {
- log.info("Flink CDC 总开关关闭,不启动监听");
- return;
- }
- // 没有需要监听的表则不启动
- List<String> monitoredTables = handlerRegistry.getMonitoredTables();
- if (monitoredTables.isEmpty()) {
- log.warn("未发现需要监听的表(未实现FlinkCdcTableHandler),不启动监听");
- return;
- }
- // 1. 创建Flink执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 设置并行度为 1: server-id 的数量必须 ≥ 并行度
- env.setParallelism(1);
- // 启用检查点(可选)
- // env.enableCheckpointing(5000);
- // 配置检查点存储路径(本地路径或分布式存储如HDFS)
- // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
- // 检查点超时时间(60秒未完成则取消)
- // env.getCheckpointConfig().setCheckpointTimeout(60000);
- // 允许检查点失败次数(默认0,即一次失败则任务失败)
- // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
- // 禁用检查点
- env.getCheckpointConfig().disableCheckpointing();
- // 重试次数/重试间隔
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
- // 2. 配置MySQL CDC源(动态设置需要监听的表)
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .serverId(properties.getServerId())
- .hostname(properties.getMysql().getHostname())
- .port(properties.getMysql().getPort())
- .username(properties.getMysql().getUsername())
- .password(properties.getMysql().getPassword())
- // 从监听的表中提取数据库列表(去重)
- .databaseList(extractDatabases(monitoredTables))
- // 直接使用注册中心收集的表列表
- .tableList(monitoredTables.toArray(new String[0]))
- // 反序列化为JSON
- .deserializer(new JsonDebeziumDeserializationSchema())
- /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
- * latest: 只进行增量导入(不读取历史变化)
- * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
- */
- .startupOptions(StartupOptions.latest())
- .build();
- // 3. 读取CDC数据流并处理
- DataStreamSource<String> dataStream = env.fromSource(
- mySqlSource,
- WatermarkStrategy.noWatermarks(),
- "MySQL-CDC-Source"
- );
- // 4. 解析数据并路由到对应处理器,使用静态内部类代替匿名内部类
- dataStream.process(new CdcDataProcessFunction(handlerRegistry));
- // 5. 启动Flink作业
- env.execute("Flink-CDC-动态监听作业");
- }
- /**
- * 静态内部类实现ProcessFunction,确保可序列化
- */
- private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
- private final FlinkCdcHandlerRegistry handlerRegistry;
- // 通过构造函数传入依赖
- public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
- this.handlerRegistry = handlerRegistry;
- }
- @Override
- public void processElement(String json, Context ctx, Collector<Void> out) {
- try {
- JSONObject cdcData = JSON.parseObject(json);
- // 操作类型:c/u/d
- String op = cdcData.getString("op");
- JSONObject source = cdcData.getJSONObject("source");
- String dbName = source.getString("db");
- String tableName = source.getString("table");
- // 库名.表名
- String fullTableName = dbName + "." + tableName;
- // 找到对应表的处理器
- FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
- if (handler == null) {
- log.warn("表[{}]无处理器,跳过处理", fullTableName);
- return;
- }
- // 按事件类型分发
- DataChangeType changeType = DataChangeType.getByCode(op);
- if (changeType == null) {
- log.warn("未知操作类型:{}", op);
- return;
- }
- switch (changeType) {
- case INSERT:
- List<Map<String, Object>> insertData = Collections.singletonList(
- cdcData.getJSONObject("after").getInnerMap()
- );
- handler.handleInsert(insertData);
- break;
- case UPDATE:
- List<Map<String, Object>> beforeData = Collections.singletonList(
- cdcData.getJSONObject("before").getInnerMap()
- );
- List<Map<String, Object>> afterData = Collections.singletonList(
- cdcData.getJSONObject("after").getInnerMap()
- );
- handler.handleUpdate(beforeData, afterData);
- break;
- case DELETE:
- List<Map<String, Object>> deleteData = Collections.singletonList(
- cdcData.getJSONObject("before").getInnerMap()
- );
- handler.handleDelete(deleteData);
- break;
- case READ:
- // 可以忽略快照阶段的读取操作,或根据需要处理
- log.debug("处理快照读取操作: {}", fullTableName);
- break;
- }
- } catch (Exception e) {
- log.error("Flink-CDC数据处理发生未预期异常", e);
- }
- }
- }
- /**
- * 从表名(库名.表名)中提取数据库列表(去重)
- *
- * @param tables
- * @return
- */
- private String[] extractDatabases(List<String> tables) {
- // 截取库名(如demo.tb_user → demo)
- return tables.stream()
- .map(table -> table.split("\\.")[0])
- .distinct()
- .toArray(String[]::new);
- }
- }
复制代码 5、FlinkCdcHandlerRegistry策略路由:
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
- import java.io.Serializable;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * Flink CDC处理器注册中心
- * 处理器注册中心(自动扫描监听表)
- */
- @Slf4j
- @Component
- public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {
- // 缓存:表名(库名.表名)→ 处理器
- private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();
- // 收集所有需要监听的表(供Flink CDC配置使用)
- private List<String> monitoredTables;
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) {
- // 扫描所有实现类
- Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
- beans.values().forEach(handler -> {
- String tableName = handler.getTableName();
- handlerMap.put(tableName, handler);
- log.info("注册监听表:{} → 处理器:{}", tableName, handler.getClass().getSimpleName());
- });
- // 提取所有需要监听的表
- monitoredTables = new ArrayList<>(handlerMap.keySet());
- }
- /**
- * 获取指定表的处理器
- *
- * @param tableName
- * @return
- */
- public FlinkCdcHandler getHandler(String tableName) {
- return handlerMap.get(tableName);
- }
- /**
- * 获取所有需要监听的表(供Flink CDC配置)
- *
- * @return
- */
- public List<String> getMonitoredTables() {
- return monitoredTables;
- }
- }
复制代码 6、FlinkCdcHandler策略模式数据处理:
FlinkCdcHandler- import java.util.List;
- import java.util.Map;
- /**
- * 表数据处理接口,每个监听的表需实现此接口
- */
- public interface FlinkCdcHandler {
- /**
- * 获取监听的表名(格式:库名.表名,如demo.tb_user)
- */
- String getTableName();
- /**
- * 处理新增数据
- */
- default void handleInsert(List<Map<String, Object>> dataList) {
- // 默认空实现,子类可重写
- }
- /**
- * 处理更新数据(包含变更前和变更后的数据)
- * @param beforeList 变更前数据
- * @param afterList 变更后数据
- */
- default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
- // 默认空实现,子类可重写
- }
- /**
- * 处理删除数据
- */
- default void handleDelete(List<Map<String, Object>> dataList) {
- // 默认空实现,子类可重写
- }
- }
复制代码 TbUserFlinkCdcHandler- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import java.io.Serializable;
- import java.util.List;
- import java.util.Map;
- @Slf4j
- @Component
- public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
- @Override
- public String getTableName() {
- return "demo.tb_user";
- }
- @Override
- public void handleInsert(List<Map<String, Object>> dataList) {
- log.info("处理tb_user新增数据,共{}条", dataList.size());
- dataList.forEach(data -> {
- String id = (String)data.get("id");
- String username = (String) data.get("name");
- // 业务逻辑:如同步到ES、缓存等
- log.info("新增用户:id={}, name={}", id, username);
- });
- }
- @Override
- public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
- log.info("处理tb_user更新数据,共{}条", afterList.size());
- for (int i = 0; i < afterList.size(); i++) {
- Map<String, Object> before = beforeList.get(i);
- Map<String, Object> after = afterList.get(i);
- log.info("更新用户:id={}, 旧用户名={}, 新用户名={}",
- after.get("id"), before.get("name"), after.get("name"));
- }
- }
- @Override
- public void handleDelete(List<Map<String, Object>> dataList) {
- log.info("处理tb_user删除数据,共{}条", dataList.size());
- dataList.forEach(data -> {
- log.info("删除用户:id={}", data.get("id"));
- });
- }
- }
复制代码
项目启动时,FlinkCdcHandlerRegistry 扫描并注册所有 FlinkCdcHandler 实现类,建立表与处理器的映射;FlinkCdcRunner 在 Spring 容器初始化后触发,检查启动条件,初始化 Flink 环境并构建 CDC 数据源,将数据流接入 CdcDataProcessFunction,该函数解析变更事件并路由到对应处理器执行业务逻辑,最后启动 Flink 作业持续监听处理。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |