找回密码
 立即注册
首页 业界区 业界 一个月搞定100+表迁移:我的“偷师”Navicat实战复盘 ...

一个月搞定100+表迁移:我的“偷师”Navicat实战复盘

砂歹汤 2026-1-22 16:20:00
个人声明:本文所有代码示例均已脱敏处理,仅保留核心技术逻辑,不涉及任何敏感业务信息。
前情提要:一个堪称"社死"的工期

还记得那天,老板把我叫到办公室,递过来一份需求文档:"下个月要把项目迁移到新平台,数据这块你来搞定。"
我打开文档,扫了一眼,差点当场石化:
需求清单

  • 100+张数据表要迁移(还要支持后续动态新增)
  • 双链路同步:MySQL到MySQL、MongoDB到PostgreSQL
  • 不能写死配置,要能灵活扩展
  • 工期不到1个月
技术约束

  • 源环境(塔外)和目标环境(塔内)网络完全隔离
  • 塔外只能读源库,无法访问目标库
  • 塔内只能写目标库,无法访问源库
  • 两端唯一的桥梁:阿里云OSS(塔外只能写,塔内可以读写)
  • 塔内不支持MongoDB,必须用PostgreSQL替代
数据规模

  • 单表最大1000万+行数据
  • 单店铺单表50万+行(涉及1000+个店铺)
  • 总计100+张表
那一刻,我脑海里浮现的画面是:在公司地下室疯狂写MyBatis 、语句直到猝死...
但最终,我不仅提前5天完成迁移,还搞出了一套能让后续表秒级上线的"全自动化流水线"。怎么做到的?
答案就藏在Navicat的"导入/导出"功能里——直接构造SQL文件上传OSS,塔内执行,复杂逻辑全都在塔外处理!
一眼望去的七大技术难点

在开始动手前,我先梳理了一下面临的挑战:
难点1:表结构千差万别
100+张表,每张表的字段、类型、主键都不一样。传统MyBatis方式意味着要写100+个Mapper、100+个实体类。后续新增表还得继续写,代码复用度≈0
难点2:同步策略多样化
100+张表需要支持四种同步策略,条件各不相同:

  • 全表同步:基础配置表,数据量小,TRUNCATE后一次性插入全部数据
  • 公司级条件同步:按company_id维度同步,支持条件过滤
  • 店铺级增量同步:有is_deleted和update_time的表,按shop_id+时间条件增量同步
  • 店铺级全量同步:物理删除的表,按shop_id维度全量同步单店铺数据
每张表的策略和条件都不同,需要支持灵活配置
难点3:数据内容包含特殊字符
某些字段的内容包含分号、单引号等SQL特殊字符,如果不处理,生成的SQL文件会在执行时语法报错。
难点4:超大数据量
单表1000万+数据,一次性加载到内存必然OOM。而且生成的SQL文件可能几百MB,网络传输和存储都是问题。
难点5:MongoDB到PostgreSQL的类型鸿沟
MongoDB的ObjectId、BSON对象、数组类型,PostgreSQL都不支持。需要做复杂的类型映射和转换。
难点6:网络隔离架构
塔外和塔内网络完全隔离,传统的ETL工具(DataX)根本用不了。它们都是"读→处理→写"的单机模式,需要同时访问源库和目标库。
解决方案:自己搭建一个类似navicat的导入/导出,能动态执行SQL的功能。
难点7:表间依赖关系导致的顺序问题
部分表之间存在外键依赖关系(如order_items依赖orders),如果并发同步:

  • order_items先执行插入,但orders还未同步 → 外键约束失败
  • 需要识别依赖关系,先同步父表,再同步子表,保证数据完整性
解决方案:塔内扫描SQL文件时,优先处理父表,再并发处理其他表
灵感来源:Navicat是怎么做的?

某天深夜,我打开Navicat准备手动导出第一批测试数据。盯着"导出向导"发呆的时候,突然脑子里闪过一个念头:
Navicat是怎么做到导出任意表的?
我点开导出的.sql文件:
  1. -- 删除旧表
  2. DROP TABLE IF EXISTS `demo_table`;
  3. -- 重建表结构
  4. CREATE TABLE `demo_table` (
  5.   `id` int(11) NOT NULL AUTO_INCREMENT,
  6.   `name` varchar(50) DEFAULT NULL,
  7.   PRIMARY KEY (`id`)
  8. ) ENGINE=InnoDB;
  9. -- 插入数据
  10. INSERT INTO `demo_table` VALUES (1, 'test');
复制代码
豁然开朗!Navicat的核心逻辑就是:

  • 用SHOW CREATE TABLE获取表结构
  • 用SELECT *查询数据
  • 生成标准SQL文件
  • 用户手动在目标库执行
如果我把这套逻辑自动化呢?

  • 塔外:自动查表结构、自动查数据、自动生成SQL、自动上传OSS
  • 塔内:自动扫描OSS、自动读取SQL文件、自动执行
这不就完美契合了"塔外-塔内"的架构约束吗!
核心方案设计

整体架构流程

1.png

技术选型说明

塔外系统技术栈
组件选型使用场景选型理由消息队列RocketMQ触发同步,异步解耦进行SQL文件构造支持TAG过滤(MySQLToMySQL/MongodbToPgSQL)
顺序消费保证数据一致性,支持可后续扩展同步类型例如RedisToMySQL流式处理JDBC Stream
MongoTemplate读取超大表数据避免OOM,setFetchSize(Integer.MIN_VALUE)启用MySQL服务器端游标,Mongo使用流式读取的api,内存占用恒定配置管理MySQL配置表管理同步规则配置驱动,新增表无需改代码,支持占位符动态替换({shopId}/{companyId})文件上传阿里云OSS SDKSQL文件上传唯一能打通塔外塔内的桥梁,可用性99.995%,支持大文件塔内系统技术栈
组件选型使用场景选型理由并发控制CompletableFuture并发处理多个SQL文件JDK8原生,无需引入第三方库,轻量级异步编程文件下载阿里云OSS SDKSQL文件下载和删除流式下载,支持逐行读取,执行成功后立即删除防止重复批量执行JDBC BatchSQL批量执行1000条/批平衡性能和内存,setAutoCommit(true)防止事务过大第一难:100+张表结构各异,怎么动态生成SQL?

传统方案的绝望之路

如果用传统MyBatis写法,画面会是这样:
  1. <select id="queryTable1">
  2.     SELECT id, name, create_time FROM table_1 WHERE shop_id = #{shopId}
  3. </select>
  4. <select id="queryTable2">
  5.     SELECT id, title, status FROM table_2 WHERE company_id = #{companyId}
  6. </select>
复制代码
手写100个Mapper?别说一个月,一年都写不完!而且后续新增表还得继续写,代码复用度约等于0。
灵感来源:SHOW CREATE TABLE

MySQL提供了一个神器:SHOW CREATE TABLE
  1. SHOW CREATE TABLE `user_info`;
复制代码
输出:
  1. CREATE TABLE `user_info` (
  2.   `id` int(11) NOT NULL AUTO_INCREMENT,
  3.   `username` varchar(50) DEFAULT NULL,
  4.   `create_time` datetime DEFAULT NULL,
  5.   PRIMARY KEY (`id`)
  6. ) ENGINE=InnoDB;
复制代码
拿到建表语句 = 拿到了一切表信息(字段名、类型、主键...)
核心实现:动态解析表结构
  1. public TableStructure getTableStructure(DataSource ds, String tableName) {
  2.     String sql = "SHOW CREATE TABLE `" + tableName + "`";
  3.    
  4.     try (Connection conn = ds.getConnection();
  5.          Statement stmt = conn.createStatement();
  6.          ResultSet rs = stmt.executeQuery(sql)) {
  7.         
  8.         if (rs.next()) {
  9.             String ddl = rs.getString(2);  // 第2列是DDL语句
  10.             
  11.             // 核心:正则解析DDL语句
  12.             List<String> columns = parseColumns(ddl);      // 提取字段名
  13.             String primaryKey = parsePrimaryKey(ddl);      // 提取主键
  14.             
  15.             return new TableStructure(columns, primaryKey);
  16.         }
  17.     }
  18.     return null;
  19. }
复制代码
关键亮点

  • 表名转义:防止关键字冲突(如表名叫order、user)
  • 正则解析DDL:一次性获取字段、主键、类型信息
  • 零硬编码:任何表都能自动处理,后续新增表只需加配置
你问怎么知道哪张表要同步?表名从哪来?请继续往下看...(第三难中有解决方案,通过配置表实现)
这里用到JDBC编程,适合当前业务需求(古法编程,不得已而为之)
生成完整SQL文件

拿到表结构后,生成标准SQL文件:
  1. // 1. 先删除目标环境的旧数据(保证幂等性)
  2. String deleteStatement = "DELETE FROM `user_info` WHERE shop_id = 12345;\n";
  3. // 2. 批量插入新数据(每批1000条)
  4. String insertStatement =
  5.     "INSERT INTO `user_info` (`id`, `username`, `create_time`) VALUES\n" +
  6.     "(1, 'Alice', '2025-01-01 12:00:00'),\n" +
  7.     "(2, 'Bob', '2025-01-02 13:00:00');\n";
复制代码
上传到OSS后,塔内直接逐行读取执行,完美!
第二难:数据里有分号,SQL会被切割炸掉!

问题现场

默认SQL语句以;结尾,但数据内容可能包含各种特殊情况:
  1. -- 情况1: 数据中包含分号
  2. INSERT INTO `content` VALUES (1, '教程:Java;Spring;MyBatis');
  3. -- 情况2: 数据以分号结尾
  4. INSERT INTO `config` VALUES (2, 'path=/usr/local/bin;');
  5. -- 情况3: 数据中有换行符,且以;结尾
  6. INSERT INTO `article` VALUES (3, '第一行
  7. 第二行;
  8. 第三行');
复制代码
塔内如果用;判断SQL结束:
  1. String line = reader.readLine();  
  2. // 只读到: INSERT INTO `content` VALUES (1, '教程:Java
  3. // 数据被截断了!
复制代码
导致SQL切割错位、语法报错。
解决方案:特殊符号标记 + 逐行读取

核心思路:每条SQL独占一行,用特殊符号;#END#标记结束
塔外生成SQL时
  1. // 关键:使用特殊符号作为SQL结束标记
  2. String SPECIAL_DELIMITER = ";#END#";
  3. // 构造SQL(数据内容里的分号、换行符都不处理)
  4. String sql = "INSERT INTO `content` VALUES (1, 'Java;Spring')";  
  5. // 写入文件:每条SQL独占一行,以特殊符号结尾
  6. writer.write(sql + SPECIAL_DELIMITER);
  7. writer.write(System.lineSeparator());  // 系统换行符
复制代码
上传到OSS的文件内容:
  1. INSERT INTO `content` VALUES (1, 'Java;Spring');#END#
  2. INSERT INTO `config` VALUES (2, 'path=/usr/bin;');#END#
  3. INSERT INTO `article` VALUES (3, '第一行\n第二行');#END#
复制代码
说明

  • 每条SQL独占一行(以System.lineSeparator()换行)
  • 每条SQL以;#END#结尾(完整的SQL结束标记)
  • 数据内容里的分号;、换行符\n等都保持原样
塔内执行前还原
  1. try (BufferedReader reader = new BufferedReader(
  2.          new InputStreamReader(ossStream))) {
  3.    
  4.     List<String> sqlBatch = new ArrayList<>();
  5.     StringBuilder currentSql = new StringBuilder();
  6.     String line;
  7.    
  8.     while ((line = reader.readLine()) != null) {
  9.         // 拼接当前行
  10.         currentSql.append(line);
  11.         
  12.         // 检查是否是完整的SQL(以;#END#结尾)
  13.         if (currentSql.toString().endsWith(";#END#")) {
  14.             // 还原:特殊符号 → 正常分号
  15.             String realSql = currentSql.toString().replace(";#END#", ";");
  16.             
  17.             // 添加到批次
  18.             sqlBatch.add(realSql);
  19.             currentSql.setLength(0);  // 清空,准备下一条SQL
  20.             
  21.             // 批量执行(每500条一批)
  22.             if (sqlBatch.size() >= 100) {
  23.                 executeBatch(stmt, sqlBatch);
  24.                 sqlBatch.clear();
  25.             }
  26.         }
  27.     }
  28.    
  29.     // 执行剩余SQL
  30.     if (!sqlBatch.isEmpty()) {
  31.         executeBatch(stmt, sqlBatch);
  32.     }
  33. }
复制代码
为什么选;#END#?

  • 足够长,不会和数据内容冲突(实测几千万条数据从未冲突)
  • 标记明确,易于理解
  • 塔内处理简单,一行代码搞定
关键点:为什么塔内要逐行读取?

原因一:SQL文件可能很大
单个SQL文件可能达到几百MB(如50万行数据),如果一次性读取:

  • 内存占用过高:100MB文件加载需要几百MB+内存,而且多线程处理更容易造成OOM
  • GC压力大:大对象频繁创建和回收
原因二:无法按普通分号切割
如果用;切割会出错:
  1. // ❌ 错误做法
  2. String[] sqls = allContent.split(";");  // 会误切数据里的分号!
复制代码
正确做法:逐行拼接,遇到;#END#才算完整
  1. // ✅ 正确做法
  2. StringBuilder currentSql = new StringBuilder();
  3. while ((line = reader.readLine()) != null) {
  4.     currentSql.append(line);
  5.    
  6.     if (currentSql.toString().endsWith(";#END#")) {
  7.         String sql = currentSql.toString().replace(";#END#", ";");
  8.         executeBatch(sql);
  9.         currentSql.setLength(0);  // 清空,准备下一条
  10.     }
  11. }
复制代码
SQL文件格式示例
  1. DELETE FROM `table` WHERE id = 1;#END#
  2. INSERT INTO `table` VALUES (1, 'data;with;semicolons');#END#
  3. INSERT INTO `table` VALUES (2, 'line1\nline2');#END#
复制代码
第三难:同步策略多样化,怎么灵活配置?

背景:四种同步策略

同步策略适用场景SQL操作数据范围全表同步基础配置表(数据量小,千行级)TRUNCATE + INSERT整张表的所有数据公司级条件同步按公司维度管理的表DELETE WHERE company_id=? + INSERT单个公司的所有数据店铺级增量同步有软删除标记和更新时间的表DELETE WHERE shop_id=? AND ... + INSERT单店铺增量数据店铺级全量同步物理删除的表DELETE WHERE shop_id=? + INSERT单店铺全部数据问题:100+张表里,四种策略混杂,查询条件各不相同。需要灵活配置每张表的同步策略和WHERE条件。
解决方案:配置驱动 + 占位符

核心思想:把同步策略、查询条件放到配置表里,每张表单独配置
配置表设计
  1. CREATE TABLE `sync_config` (
  2.     `id` int PRIMARY KEY,
  3.     `table_name` varchar(100),
  4.     `table_level` varchar(20),     -- company/shop
  5.     `sync_type` int,                -- 0:全表, 1:条件同步
  6.     `where_condition` text,         -- WHERE条件模板(支持占位符)
  7.     `delete_strategy` varchar(20)   -- TRUNCATE/DELETE
  8. );
复制代码
配置示例
  1. -- 全表同步
  2. INSERT INTO sync_config VALUES (1, 'sys_config', 'company', 0, NULL, 'TRUNCATE');
  3. -- 公司级条件同步
  4. INSERT INTO sync_config VALUES (2, 'company_settings', 'company', 1,
  5.     'company_id = {companyId} AND status = 1', 'DELETE');
  6. -- 店铺级增量同步
  7. INSERT INTO sync_config VALUES (3, 'user_table', 'shop', 1,
  8.     'shop_id = {shopId} AND update_time > {lastTime}', 'DELETE');
  9. -- 店铺级全量同步
  10. INSERT INTO sync_config VALUES (4, 'order_table', 'shop', 1,
  11.     'shop_id = {shopId}', 'DELETE');
复制代码
占位符替换逻辑
  1. private String buildWhereCondition(String template, SyncContext ctx) {
  2.     if (template == null) return "";  // 全表同步,无WHERE条件
  3.    
  4.     return template
  5.         .replace("{shopId}", String.valueOf(ctx.getShopId()))
  6.         .replace("{companyId}", String.valueOf(ctx.getCompanyId()))
  7.         .replace("{lastTime}", ctx.getLastSyncTime());
  8. }
复制代码
SQL生成过程(以店铺级增量同步为例)

步骤1:构造查询SQL
  1. // 占位符替换后得到WHERE条件
  2. String whereCondition = "shop_id = 123 AND update_time > '2025-01-15 00:00:00'";
  3. // 构造SELECT语句
  4. String selectSql = "SELECT * FROM user_table WHERE " + whereCondition;
复制代码
步骤2:流式读取并生成SQL文件

关键点:从ResultSet元数据动态获取字段,而非写死字段名
  1. try (ResultSet rs = stmt.executeQuery(selectSql)) {
  2.     ResultSetMetaData metadata = rs.getMetaData();
  3.     int columnCount = metadata.getColumnCount();
  4.    
  5.     // 从元数据获取列名列表
  6.     List<String> columnNames = new ArrayList<>();
  7.     for (int i = 1; i <= columnCount; i++) {
  8.         columnNames.add(metadata.getColumnName(i));
  9.     }
  10.    
  11.     // 1. 先写DELETE语句
  12.     writer.write("DELETE FROM user_table WHERE " + whereCondition + ";#END#");
  13.     writer.write(System.lineSeparator());
  14.    
  15.     // 2. 构造INSERT语句头部(字段名从元数据获取)
  16.     String insertHeader = "INSERT INTO `user_table` (" +
  17.         String.join(", ", columnNames) + ") VALUES\n";
  18.    
  19.     StringBuilder values = new StringBuilder();
  20.     int batchCount = 0;
  21.    
  22.     // 3. 流式读取数据并拼接VALUES
  23.     while (rs.next()) {
  24.         values.append("(");
  25.         for (int i = 1; i <= columnCount; i++) {
  26.             if (i > 1) values.append(", ");
  27.             // 根据字段类型格式化值(动态处理)
  28.             values.append(formatValue(rs, i, metadata.getColumnType(i)));
  29.         }
  30.         values.append(")");
  31.         batchCount++;
  32.         
  33.         // 每10行生成一条INSERT
  34.         if (batchCount >= 10) {
  35.             writer.write(insertHeader + values.toString() + ";#END#");
  36.             writer.write(System.lineSeparator());
  37.             values.setLength(0);
  38.             batchCount = 0;
  39.         } else {
  40.             values.append(", ");
  41.         }
  42.     }
  43.    
  44.     // 4. 处理剩余数据
  45.     if (batchCount > 0) {
  46.         writer.write(insertHeader + values.toString() + ";#END#");
  47.     }
  48. }
复制代码
最终生成的SQL文件
  1. DELETE FROM user_table WHERE shop_id = 123 AND update_time > '2025-01-15 00:00:00';#END#
  2. INSERT INTO `user_table` (id, shop_id, username, update_time) VALUES
  3. (1, 123, 'Alice', '2025-01-16 10:00:00'),
  4. (2, 123, 'Bob', '2025-01-16 11:00:00');#END#
复制代码
优势总结

灵活性:四种策略自由配置,满足不同表的需求
可扩展:新增表只需加配置,代码零改动
占位符:支持{shopId}、{companyId}、{lastTime}等动态参数
零硬编码:字段名从元数据动态获取,适配任意表结构
第四难:单表50W+数据,如何防止OOM?

问题:传统方式的内存杀手
  1. // 反面教材:一次性加载全部数据
  2. String sql = "SELECT * FROM huge_table WHERE shop_id = 123";
  3. List<Map<String, Object>> allRows = jdbcTemplate.queryForList(sql);  // 直接OOM
复制代码
单店铺单表可能50W+行,全部加载到内存会导致OutOfMemoryError。
解决方案:流式读取 + 临时文件

MySQL流式读取
  1. private void generateSQL(DataSource ds, String sql) throws SQLException {
  2.     try (Connection conn = ds.getConnection();
  3.          Statement stmt = conn.createStatement(
  4.              ResultSet.TYPE_FORWARD_ONLY,    // 只向前遍历
  5.              ResultSet.CONCUR_READ_ONLY)) {  // 只读模式
  6.         
  7.         // 核心:启用MySQL流式读取
  8.         stmt.setFetchSize(Integer.MIN_VALUE);  // MySQL JDBC特殊约定!
  9.         
  10.         try (ResultSet rs = stmt.executeQuery(sql)) {
  11.             int batchCount = 0;
  12.             StringBuilder sqlValues = new StringBuilder();
  13.             
  14.             while (rs.next()) {  // 逐行处理
  15.                 sqlValues.append("(");
  16.                 for (int i = 1; i <= columnCount; i++) {
  17.                     sqlValues.append(formatValue(rs, i));
  18.                 }
  19.                 sqlValues.append(")");
  20.                 batchCount++;
  21.                
  22.                 // 每10行生成一条INSERT
  23.                 if (batchCount >= 10) {
  24.                     writeInsert(sqlValues.toString());
  25.                     sqlValues.setLength(0);  // 清空缓冲
  26.                     batchCount = 0;
  27.                 }
  28.             }
  29.         }
  30.     }
  31. }
复制代码
核心技巧

  • stmt.setFetchSize(Integer.MIN_VALUE):MySQL JDBC的特殊约定,启用服务器端游标
  • 每次只拉取1行数据到客户端,内存占用恒定
  • 批量拼接VALUES:多行生成一条INSERT,减少SQL数量
MongoDB流式读取
  1. CloseableIterator<Document> iterator =
  2.     mongoTemplate.stream(query, Document.class, collectionName);
  3. try {
  4.     while (iterator.hasNext()) {
  5.         Document doc = iterator.next();  // 逐文档处理
  6.         processDocument(doc);
  7.     }
  8. } finally {
  9.     iterator.close();  // ⚠️ 必须手动关闭,否则连接泄漏!
  10. }
复制代码
塔内执行:流式读取
  1. try (BufferedReader reader = new BufferedReader(
  2.          new InputStreamReader(ossStream))) {
  3.    
  4.     List<String> sqlBatch = new ArrayList<>();
  5.     StringBuilder currentSql = new StringBuilder();
  6.     String line;
  7.    
  8.     while ((line = reader.readLine()) != null) {
  9.         // 拼接当前行
  10.         currentSql.append(line);
  11.         
  12.         // 检查是否是完整的SQL(以;#END#结尾)
  13.         if (currentSql.toString().endsWith(";#END#")) {
  14.             // 还原:特殊符号 → 正常分号
  15.             String realSql = currentSql.toString().replace(";#END#", ";");
  16.             
  17.             // 添加到批次
  18.             sqlBatch.add(realSql);
  19.             currentSql.setLength(0);  // 清空,准备下一条SQL
  20.             
  21.             // 批量执行(每100条一批,塔外10条数据构造成1个insert语句)
  22.             if (sqlBatch.size() >= 100) {
  23.                 executeBatch(stmt, sqlBatch);
  24.                 sqlBatch.clear();
  25.             }
  26.         }
  27.     }
  28.    
  29.     // 执行剩余SQL
  30.     if (!sqlBatch.isEmpty()) {
  31.         executeBatch(stmt, sqlBatch);
  32.     }
  33.    
  34.     // 关键:自动提交,避免事务过大
  35.     conn.setAutoCommit(true);
  36. }
复制代码
为什么setAutoCommit(true)?
单文件可能几千条SQL,如果在一个事务里会导致:

  • 锁表时间过长
  • 回滚日志暴涨
  • 内存占用飙升
自动提交后,每条SQL独立提交,避免以上问题。
效果对比:
方案内存占用风险一次性加载2GB(50W行)必然OOM流式处理50MB(常量级)稳定第五难:MongoDB到PostgreSQL的类型转换

问题

MongoDB和PostgreSQL的数据类型完全不兼容:
MongoDBPostgreSQL问题ObjectId无对应类型主键转换BSON对象JSONB嵌套结构数组Array类型声明解决方案

在配置表的扩展字段定义类型映射:
  1. {
  2.   "mongoCollection": "user_profile",
  3.   "pgTable": "user_profile",
  4.   "fieldMapping": {
  5.     "_id": "id",
  6.     "preferences": "preferences",
  7.     "tags": "tags"
  8.   },
  9.   "typeMapping": {
  10.     "_id": "OBJECTID_TO_VARCHAR",
  11.     "preferences": "JSONB",
  12.     "tags": "INTEGER_ARRAY"
  13.   }
  14. }
复制代码
类型转换代码:
  1. private String convertValue(Object value, String typeRule) {
  2.     if (value == null) return "NULL";
  3.    
  4.     switch (typeRule) {
  5.         case "JSONB":
  6.             // {name: "test"} → '{"name":"test"}'::jsonb
  7.             String json = toJsonString(value);
  8.             return "'" + escapeSql(json) + "'::jsonb";
  9.             
  10.         case "INTEGER_ARRAY":
  11.             // [1,2,3] → ARRAY[1,2,3]::INTEGER[]
  12.             List<Integer> list = (List) value;
  13.             return "ARRAY[" + String.join(",", list) + "]::INTEGER[]";
  14.             
  15.         case "OBJECTID_TO_VARCHAR":
  16.             // ObjectId("507f...") → '507f...'
  17.             return "'" + value.toString() + "'";
  18.             
  19.         default:
  20.             return convertDefault(value);
  21.     }
  22. }
复制代码
复盘:一个月完成迁移的关键

整体架构:塔外-塔内双链路
  1. ┌──────────── 塔外系统 (Outer) ────────────┐
  2. │                                         │
  3. │  ① API触发同步                          │
  4. │  ② 查询配置表 → 拆分公司级/店铺级配置       │
  5. │  ③ 构建MQ消息 → 投递RocketMQ             │
  6. │  ④ MQ Consumer                         │
  7. │     ├─ SHOW CREATE TABLE 获取表结构       │
  8. │     ├─ 流式读取源数据库                    │
  9. │     ├─ 生成 DELETE + INSERT SQL          │
  10. │     ├─ 分号替换为特殊符号                  │
  11. │     └─ 上传到 OSS                        │
  12. └───────────────────────────────────────────┘
  13.                     │
  14.                     │ OSS中转
  15.                     ↓
  16. ┌──────────── 塔内系统 (Inner) ────────────┐
  17. │                                         │
  18. │  ⑤ 定时任务 / 手动触发                    │
  19. │  ⑥ 扫描OSS目录 → 获取待处理SQL文件列表      │
  20. │  ⑦ 流式下载SQL文件 → 逐行读取              │
  21. │     ├─ 特殊符号还原为分号                  │
  22. │     ├─ 批量执行(1000条/批)                │
  23. │     └─ setAutoCommit(true) 防止事务过大   │
  24. │  ⑧ 执行成功 → 立即删除OSS文件              │
  25. └───────────────────────────────────────────┘
复制代码
核心亮点总结

技术点传统方案本方案效果表结构获取手写100个MapperSHOW CREATE TABLE动态解析零硬编码,支持任意表SQL分隔符用;判断结束特殊符号;#END#支持数据含分号、换行符同步策略全量同步or硬编码配置表+占位符灵活配置,4种策略大数据量处理一次性加载(OOM)流式读取+临时文件常量级内存,50W+行稳定扩展性新增表需改代码只需加配置秒级上线新表同步做对的3件事

1. 从工具中偷师学艺
Navicat的导入/导出功能启发了整体方案,SHOW CREATE TABLE是突破口
2. 把复杂逻辑放在塔外
塔内只负责执行SQL,逻辑简单;塔外可以随意调试、优化
3. 配置驱动,而非代码驱动
新增表只需加配置,不改代码。后续维护成本趋近于0
最终效果

指标数据迁移表数量200+张(含后续新增)最大单表数据1000+万行首次全量同步10-30分钟日常增量同步公司级表约30秒,店铺级表约1分钟内存占用稳定在200MB左右OOM次数0(连续运行3个月)工期25天(提前5天完成)写在最后

以上便是我这次迁移实战的全部分享。绝非标准答案,但希望能为你带来一丝灵感。
这次迁移让我深刻体会到:
好的架构不是设计出来的,而是从实际问题中"偷"出来的。
当你面对技术难题时,不妨问自己:

  • 有没有现成的工具已经解决了类似问题?不要重复造轮子!!(Navicat)
  • 数据库/框架本身提供了什么能力?(SHOW CREATE TABLE、setFetchSize)
  • 能否用配置代替硬编码?(配置表+占位符)
感谢那些"默默扛下所有"的技术细节


  • SHOW CREATE TABLE —— 你扛下了表结构解析的苦活
  • stmt.setFetchSize(Integer.MIN_VALUE) —— 你默默守护了内存安全
  • ;#END# —— 你可能是全网最诡异但最实用的分隔符
  • RocketMQ的TAG过滤 —— 你让消息路由变得优雅
  • CompletableFuture —— 你让塔内并发处理成为可能
  • System.lineSeparator() —— 你让SQL文件格式清晰明了
最后送大家一段话

写代码的时候,我们都是站在巨人肩膀上的追梦人。
技术本身没有高低贵贱,能解决问题的就是好技术。不要盲目追求所谓的"最佳实践",在约束下求最优解,才是工程师的智慧。
愿你在技术的道路上,既能仰望星空,也能脚踏实地。
"在技术的世界里,没有完美的方案,只有最合适的选择。
而最合适的选择,往往来自于对问题本质的深刻理解。"
—— 一个在生产环境爬坑的后端开发
文章的最后,想和你多聊两句。
技术之路,常常是热闹与孤独并存。那些深夜的调试、灵光一闪的方案、还有踩坑爬起后的顿悟,如果能有人一起聊聊,该多好。
为此,我建了一个小花园——我的微信公众号「[努力的小郑]」。
这里没有高深莫测的理论堆砌,只有我对后端开发、系统设计和工程实践的持续思考与沉淀。它更像我的数字笔记本,记录着那些值得被记住的解决方案和思维火花。
如果你觉得今天的文章还有一点启发,或者单纯想找一个同行者偶尔聊聊技术、谈谈思考,那么,欢迎你来坐坐。

愿你前行路上,总有代码可写,有梦可追,也有灯火可亲。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2026-1-25 07:57:06

举报

2026-1-26 12:28:02

举报

您需要登录后才可以回帖 登录 | 立即注册