I. Introduction to Canal:
Canal is an open-source middleware from Alibaba that parses database incremental logs; it is mainly used for real-time synchronization of database change data.
Canal source code
II. How It Works:
1. MySQL master-slave replication:
(1) The MySQL master writes data changes to the binary log (binlog; its records are called binary log events and can be inspected with SHOW BINLOG EVENTS).
(2) The MySQL slave copies the master's binary log events into its relay log.
(3) The MySQL slave replays the events in the relay log, applying the changes to its own data.
2. How canal works:
(1) canal emulates the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends the dump protocol to the MySQL master.
(2) The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal).
(3) canal parses the binary log objects (originally a byte stream).
III. MySQL Configuration (Enabling Binlog):
1. Enable Binlog (ROW mode):

# MySQL configuration file
# Linux: my.cnf (/etc/mysql/)
# Windows: my.ini (C:\ProgramData\MySQL\MySQL Server 5.7\)

# Enable binlog
log_bin = mysql-bin
# Use ROW mode (records row-level changes)
binlog-format = ROW
# Unique server ID (must differ from the slaveId configured on the Canal server)
server-id = 1
2. Restart MySQL and verify:

# Open a command prompt:
# Press Win + R, type cmd, then press Enter.
# Stop the MySQL service:
net stop MySQL57
# Start the MySQL service:
net start MySQL57

-- Verify
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
3. Create a dedicated Canal account (least privilege):

-- 1. Create a user that can connect remotely (% means any IP)
-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- Grant privileges
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 2. Create a user for local connections only (localhost)
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
-- Grant the same privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
-- Flush privileges so the changes take effect
FLUSH PRIVILEGES;
IV. Canal Server Configuration:
1. Download and extract the Canal server:
canal releases on GitHub
2. Configure the Canal instance:
(1) instance.properties:

# MySQL master address (the MySQL instance Canal connects to)
canal.instance.master.address=127.0.0.1:3306
# MySQL account and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
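The stock instance.properties exposes more settings than the three shown above; one that often matters alongside them is the server-side table filter (the value shown is canal's shipped default, which matches every table):

```properties
# MySQL master address
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Optional: restrict which tables the server parses (default matches everything;
# the client-side subscribe() call can narrow this further, as done in section V)
canal.instance.filter.regex=.*\\..*
```

Narrowing the regex on the server side (e.g. `demo\\.tb_user`) reduces traffic before it ever reaches the client.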
(2) Start the Canal server on Windows:
1) Double-click bin/startup.bat.
2) If the console window flashes and closes immediately, edit bin/startup.bat and restart.
3) Check the startup logs.
V. Integrating Canal with Spring Boot to Monitor MySQL Changes:
1. POM configuration:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.8</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.8</version>
</dependency>
2. YML configuration:

canal:
  # Whether to start synchronization automatically
  auto-sync: true
  instances:
    # First instance
    instance1:
      host: 127.0.0.1
      port: 11111
      # Instance name configured on the canal server (canal.destinations = example)
      name: example
      # Number of entries pulled per batch
      batch-size: 100
      # Sleep time when there is no data (ms)
      sleep-time: 1000
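Because instances is bound to a Map, further Canal destinations can be registered just by adding entries; a hypothetical second instance (host, port, and name below are illustrative, not from the original setup) would look like:

```yaml
canal:
  auto-sync: true
  instances:
    instance1:
      host: 127.0.0.1
      port: 11111
      name: example
      batch-size: 100
      sleep-time: 1000
    # Hypothetical second destination; each entry gets its own CanalRunner
    instance2:
      host: 127.0.0.1
      port: 11111
      name: example2
      batch-size: 200
      sleep-time: 2000
```

Each key under instances becomes one InstanceConfig in CanalProperties, and the auto-configuration starts one runner per entry.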
3. Entity classes:
CanalProperties.java

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Canal configuration properties (mapped from the YAML configuration).
 */
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {

    // Whether to start synchronization automatically
    private boolean autoSync = true;

    // Multi-instance configuration
    private Map<String, InstanceConfig> instances = new HashMap<>();

    @Data
    public static class InstanceConfig {
        private String host;
        private Integer port;
        private String name;
        private Integer batchSize = 100;
        private Integer sleepTime = 1000;
    }
}
DataEventTypeEnum.java

import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum DataEventTypeEnum {

    INSERT("INSERT"),
    UPDATE("UPDATE"),
    DELETE("DELETE");

    private final String name;

    DataEventTypeEnum(String name) {
        this.name = name;
    }

    public String NAME() {
        return name;
    }

    // Name -> enum lookup map, built once at class-load time
    private static final Map<String, DataEventTypeEnum> NAME_MAP =
            Arrays.stream(DataEventTypeEnum.values())
                    .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));

    public static DataEventTypeEnum getEnum(String name) {
        if (!StringUtils.hasText(name)) {
            return null;
        }
        return NAME_MAP.get(name);
    }
}
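The NAME_MAP above trades a linear scan over values() for a one-time map build. The same pattern in a self-contained form (EventType here is a local stand-in for DataEventTypeEnum, which additionally uses Spring's StringUtils for the blank check):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class EnumLookupDemo {

    enum EventType {
        INSERT, UPDATE, DELETE;

        // Build the name -> enum map once, at class-load time
        private static final Map<String, EventType> NAME_MAP =
                Arrays.stream(values())
                      .collect(Collectors.toMap(Enum::name, Function.identity()));

        static EventType fromName(String name) {
            return name == null ? null : NAME_MAP.get(name);
        }
    }

    public static void main(String[] args) {
        System.out.println(EventType.fromName("INSERT")); // INSERT
        System.out.println(EventType.fromName("TRUNCATE")); // null (unknown name)
    }
}
```

Unknown event names (e.g. TRUNCATE, which canal can also emit) resolve to null, which is why the handler in section V.8 should guard before switching on the result.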
JsonMessageType.java

import lombok.Data;

@Data
public class JsonMessageType {

    /**
     * Schema (database) name
     */
    private String schemaName;

    /**
     * Table name
     */
    private String tableName;

    /**
     * Event type (INSERT/UPDATE/DELETE)
     */
    private String eventType;

    /**
     * Row data as a JSON string
     */
    private String data;
}
4. CanalRunnerAutoConfig: starting Canal:

import com.iven.canal.entity.CanalProperties;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Canal auto-configuration.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class CanalRunnerAutoConfig {

    private final CanalProperties canalProperties;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    @Bean
    public ApplicationRunner canalApplicationRunner() {
        return args -> {
            if (!canalProperties.isAutoSync()) {
                log.info("Canal auto-sync is disabled");
                return;
            }
            // Do not start Canal if no Work is registered
            if (!workRegistry.hasWork()) {
                log.info("No table sync handlers registered; Canal will not start");
                return;
            }
            // Start every configured Canal instance
            canalProperties.getInstances().forEach((instanceKey, config) -> {
                CanalRunner runner = new CanalRunner(
                        config.getHost(),
                        config.getPort(),
                        config.getName(),
                        config.getBatchSize(),
                        config.getSleepTime(),
                        jsonMessageParser,
                        workRegistry
                );
                runner.start();
            });
        };
    }
}
5. CanalRunner: pulling data:

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWork;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Canal runner; its lifecycle is managed manually.
 *
 * 1. Starts a Canal instance.
 * 2. Processes the parsed data.
 */
@Slf4j
public class CanalRunner {

    private Thread thread;
    private final String canalIp;
    private final Integer canalPort;
    private final String canalInstance;
    private final Integer batchSize;
    private final Integer sleepTime;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,
                       Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
        this.canalIp = canalIp;
        this.canalPort = canalPort;
        this.canalInstance = canalInstance;
        this.batchSize = batchSize;
        this.sleepTime = sleepTime;
        this.jsonMessageParser = jsonMessageParser;
        this.workRegistry = workRegistry;
    }

    /**
     * Starts the Canal instance.
     */
    public void start() {
        if (thread == null || !thread.isAlive()) {
            thread = new Thread(this::run, "canal-runner-" + canalInstance);
            thread.start();
            log.info("Canal instance [{}] started", canalInstance);
        }
    }

    /**
     * Stops the Canal instance.
     */
    public void stop() {
        if (thread != null && !thread.isInterrupted()) {
            thread.interrupt();
        }
    }

    private void run() {
        log.info("Canal instance [{}] starting...", canalInstance);
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
        try {
            connector.connect();
            // Subscribe to all tables (filtering happens later via Works)
            connector.subscribe();
            connector.rollback();
            while (!thread.isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(sleepTime);
                } else {
                    // Parse and process the data
                    Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
                    processParsedData(parsedData);
                    // Acknowledge successful processing
                    connector.ack(batchId);
                }
            }
        } catch (InterruptedException e) {
            log.info("Canal instance [{}] interrupted", canalInstance);
        } catch (Exception e) {
            log.error("Canal instance [{}] failed", canalInstance, e);
            // Roll back unacknowledged batches on failure
            connector.rollback();
        } finally {
            connector.disconnect();
            log.info("Canal instance [{}] stopped", canalInstance);
        }
    }

    /**
     * Dispatches parsed data to the registered Works.
     *
     * @param parsedData table key -> parsed row messages
     */
    private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
        parsedData.forEach((tableKey, dataList) -> {
            // All Works registered for this table
            List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
            if (!works.isEmpty() && !dataList.isEmpty()) {
                // Convert each row (JSON string -> Map)
                List<Map<String, Object>> dataMaps = dataList.stream()
                        .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
                        .collect(Collectors.toList());
                String schemaName = dataList.get(0).getSchemaName();
                // Invoke each Work's handler.
                // Note: this assumes every row in the batch shares the first row's event type;
                // a mixed batch (e.g. INSERT then DELETE on the same table) would be mislabeled.
                works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));
            }
        });
    }
}
6. JsonMessageParser: parsing data:
MessageParser.java

import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.List;

/**
 * Message parser interface.
 */
public interface MessageParser<T> {
    T parse(List<CanalEntry.Entry> canalEntryList);
}
JsonMessageParser.java

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.iven.canal.entity.DataEventTypeEnum;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWorkRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.*;

/**
 * JSON message parser.
 *
 * 1. Iterates over the raw entry list.
 * 2. Parses row-level change data.
 * 3. Wraps each row as a JsonMessageType.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {

    private final CanalWorkRegistry workRegistry;

    @Override
    public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
        Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
        for (CanalEntry.Entry entry : canalEntryList) {
            if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                continue;
            }
            // 1. Schema name, table name, and the schema-qualified table key
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            String fullTableName = schemaName + "." + tableName;
            // 2. Check whether a handler exists (both key formats are supported)
            boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
            boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
            if (!hasFullTableWork && !hasSimpleTableWork) {
                log.debug("No sync handler for table [{}] or [{}]; skipping", fullTableName, tableName);
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                rowChange.getRowDatasList().forEach(rowData -> {
                    JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
                    if (jsonMessageType != null) {
                        // 3. Add to the map under whichever key has a handler
                        if (hasFullTableWork) {
                            dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                        if (hasSimpleTableWork) {
                            dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                    }
                });
            } catch (Exception e) {
                log.error("Failed to parse entry", e);
            }
        }
        return dataMap;
    }

    private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
                                         CanalEntry.RowData rowData) {
        // Schema name
        String schemaName = header.getSchemaName();
        // Table name
        String tableName = header.getTableName();
        if (eventType == CanalEntry.EventType.DELETE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
        } else if (eventType == CanalEntry.EventType.INSERT) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
        } else if (eventType == CanalEntry.EventType.UPDATE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
        }
        return null;
    }

    private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
                                        List<CanalEntry.Column> columns) {
        Map<String, String> data = new HashMap<>();
        columns.forEach(column -> data.put(column.getName(), column.getValue()));
        JsonMessageType result = new JsonMessageType();
        result.setSchemaName(schemaName);
        result.setTableName(tableName);
        result.setEventType(eventType);
        result.setData(JSON.toJSONString(data));
        return result;
    }
}
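The dataWrapper step above flattens canal's column list into a field-name→value map before serializing it to JSON. That transformation in isolation, with a plain record standing in for CanalEntry.Column:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnFlattenDemo {

    // Stand-in for CanalEntry.Column: just a name/value pair
    record Column(String name, String value) {}

    // Flatten a column list into an insertion-ordered map,
    // mirroring what dataWrapper does before JSON serialization
    static Map<String, String> toMap(List<Column> columns) {
        Map<String, String> data = new LinkedHashMap<>();
        columns.forEach(c -> data.put(c.name(), c.value()));
        return data;
    }

    public static void main(String[] args) {
        List<Column> row = List.of(new Column("id", "1"), new Column("name", "iven"));
        System.out.println(toMap(row)); // {id=1, name=iven}
    }
}
```

Note that the original uses HashMap, so field order in the JSON is not guaranteed; a LinkedHashMap (as here) would preserve column order if that matters downstream.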
7. CanalWorkRegistry: matching handlers:
CanalWork.java

import java.util.List;
import java.util.Map;

/**
 * Canal Work handler.
 */
public interface CanalWork {

    /**
     * Returns the table this Work handles (e.g. tb_user, or schema-qualified demo.tb_user).
     */
    String getTableName();

    /**
     * Handles table data.
     *
     * @param dataList   row data (each row is a field-name -> value map)
     * @param eventType  event type (INSERT/UPDATE/DELETE)
     * @param schemaName schema name (distinguishes the same table across databases)
     */
    void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);
}
CanalWorkRegistry.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Handler registry.
 * Scans and caches all CanalWork implementations, groups them by table name,
 * and exposes lookups for the handlers of a given table.
 */
@Slf4j
@Component
public class CanalWorkRegistry implements ApplicationContextAware {

    /**
     * Table name -> Work list (a table may have multiple Works)
     */
    private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Scan all CanalWork implementations
        Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
        // Group by table name
        tableWorkMap.putAll(workMap.values().stream()
                .collect(Collectors.groupingBy(CanalWork::getTableName)));
        log.info("Registered table sync handlers: {}", tableWorkMap.keySet());
    }

    /**
     * Returns the Works registered for the given table.
     *
     * @param tableName table name (simple or schema-qualified)
     * @return the matching Works, or an empty list
     */
    public List<CanalWork> getWorksByTable(String tableName) {
        return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
    }

    /**
     * Whether any table has a registered handler.
     *
     * @return true if at least one Work is registered
     */
    public boolean hasWork() {
        return !tableWorkMap.isEmpty();
    }
}
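setApplicationContext above relies on Collectors.groupingBy to allow several handlers to share one table. The grouping behavior in isolation (Work is a minimal stand-in for CanalWork):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WorkGroupingDemo {

    // Minimal stand-in for the CanalWork interface
    interface Work { String table(); }

    // Group handlers by table name, as setApplicationContext does with the
    // beans returned by getBeansOfType(CanalWork.class)
    static Map<String, List<Work>> group(Stream<Work> works) {
        return works.collect(Collectors.groupingBy(Work::table));
    }

    public static void main(String[] args) {
        Work a = () -> "demo.tb_user";
        Work b = () -> "demo.tb_user";
        Work c = () -> "demo.tb_order";
        Map<String, List<Work>> map = group(Stream.of(a, b, c));
        // Two handlers registered for the same table coexist
        System.out.println(map.get("demo.tb_user").size()); // 2
        // Unregistered tables fall back to an empty list, as in getWorksByTable
        System.out.println(map.getOrDefault("demo.tb_x", Collections.emptyList()).size()); // 0
    }
}
```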
8. A CanalWork implementation that processes the data:

import com.iven.canal.entity.DataEventTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Handles tb_user table data.
 *
 * Canal server -> change data -> pulled by CanalRunner -> parsed by JsonMessageParser ->
 * tb_user rows filtered out -> CanalWorkRegistry resolves TbUserCanalWorkHandle ->
 * handle is invoked -> the logic for the event type (INSERT/UPDATE/DELETE) runs
 */
@Slf4j
@Component
public class TbUserCanalWorkHandle implements CanalWork {

    @Override
    public String getTableName() {
        return "demo.tb_user";
    }

    @Override
    public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
        log.info("Processing tb_user data from schema [{}], event type: {}, rows: {}",
                schemaName, eventType, dataList.size());
        DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
        if (dataEventTypeEnum == null) {
            // Guard against unknown event names; switching on null would throw an NPE
            log.warn("Unknown event type: {}", eventType);
            return;
        }

        // Dispatch by event type
        switch (dataEventTypeEnum) {
            case INSERT:
                handleInsert(dataList, schemaName);
                break;
            case UPDATE:
                handleUpdate(dataList, schemaName);
                break;
            case DELETE:
                handleDelete(dataList, schemaName);
                break;
            default:
                log.warn("Unhandled event type: {}", eventType);
        }
    }

    /**
     * Handles inserted rows.
     */
    private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Handling {} inserted tb_user rows from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object username = data.get("name");
            // Insert logic: e.g. sync to ES, warm up caches
            log.info("Inserted user - ID: {}, username: {}", userId, username);
        });
    }

    /**
     * Handles updated rows.
     */
    private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Handling {} updated tb_user rows from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            Object newPhone = data.get("phone"); // assuming the phone number changed
            // Update logic: e.g. update the ES document, refresh caches
            log.info("Updated user - ID: {}, new phone: {}", userId, newPhone);
        });
    }

    /**
     * Handles deleted rows.
     */
    private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
        log.info("Handling {} deleted tb_user rows from schema [{}]", dataList.size(), schemaName);
        dataList.forEach(data -> {
            Object userId = data.get("id");
            // Delete logic: e.g. remove from ES, evict caches
            log.info("Deleted user - ID: {}", userId);
        });
    }
}
Dispatch flow:
The overall flow uses the registry to manage handlers, the parser to convert data formats, and the runner to control the Canal client lifecycle; database change events are ultimately dispatched to the handlers registered for each table, decoupling change-data listening from business processing. To add custom change handling for any table, a user only needs to implement the CanalWork interface.
(1) Initialization
1) When the Spring container starts, CanalWorkRegistry scans all CanalWork implementations (such as TbUserCanalWorkHandle) and caches them in tableWorkMap, grouped by table name.
2) CanalRunnerAutoConfig checks the configuration (CanalProperties); if auto-sync is enabled and at least one CanalWork exists, it creates and starts a CanalRunner for each configured Canal instance.
(2) Runtime
1) CanalRunner connects to the Canal server and subscribes to database change events.
2) It pulls change data (Message) in a loop and uses JsonMessageParser to parse it into a map of table name to data list.
3) processParsedData looks up the matching CanalWork list in CanalWorkRegistry by table name and invokes each handler's handle method.
(3) Shutdown
When the application stops, CanalRunner interrupts its thread and disconnects from the Canal server.
Source: submitted by a 程序园 user.