找回密码
 立即注册
首页 业界区 业界 DTS按业务场景批量迁移阿里云MySQL表实战(下):迁移管 ...

DTS按业务场景批量迁移阿里云MySQL表实战(下):迁移管理平台设计与实现

剽达崖 昨天 23:05
本文是 DTS按业务场景批量迁移阿里云MySQL表实战(上):技术选型和API对接  的后续,应用状态模式,完成业务系统中的迁移模块。DTS的对接方式可参考前文。
迁移管理平台设计与实现

完成DTS API对接后,就需要考虑如何将DTS和业务系统有机结合实现整套的迁移流程。
出于信息安全角度的考虑,本文删除了大量涉及实际业务的实现代码。
业务约束

从业务出发,最好的体验肯定是用户无感的,即迁移完成后,确认新旧表数据一致,直接切换到新表查询。
显然过于理想化了,如果迁移期间,用户对旧表进行了写入,新表可能会少数据,不能贸然切换,要做数据的对比。如果用户一直在写入,就要一直反复的对比、确认,有增量数据就要删除新表重新迁移,流程复杂。
和业务方沟通,得知对方可以禁止写入正在迁移的公司的表,等迁移完成后再恢复使用。这样流程就简单多了,开始迁移时,将旧表重命名增加特殊的后缀,就能防止用户操作,并确保旧表数据不发生变更。
技术校验

如何判断一个公司是否迁移成功?最严谨的方式是逐表逐行数据对比,但是在使用DTS的情况下并无必要。我采取的比较策略是,在迁移前后:

  • 源表目标表数量相同
  • 对应表数据量相同、数据最后更新时间(如有此列)相同
只要满足以上要求,就认为数据是一致的。可以通过SELECT COUNT(*), MAX(updateTime) FROM table_name一次性获取。
迁移状态机

经过分析和简化,迁移状态机如下:
1.png

可以发现,每个状态都可以进行“推进”和“回滚”两个动作,很适合使用状态模式来实现。状态模式的实现先放一放,看看几个基本数据结构:
迁移任务
  1. @Data
  2. public class TableMigrateTask {
  3.   // 主键
  4.   private Long id;
  5.   // 公司id
  6.   private Long companyId;
  7.   // 原始分库位
  8.   private Long oldSchemaId;
  9.   // 新分库位
  10.   private Long newSchemaId;
  11.   // 任务状态
  12.   private Integer state;
  13.   // DTS任务状态,和阿里云定义一致
  14.   private String dtsStatus;
  15.   // DTS任务已删除(释放)
  16.   private Boolean dtsDeleted;
  17.   // DTS实例id
  18.   private String dtsInstanceId;
  19.   // DTS任务id
  20.   private String dtsJobId;
  21.   // 失败原因
  22.   private String failedReason;
  23.   // 状态跳转时的信息,辅助排查问题
  24.   private String transitInfo;
  25.   // 迁移前表数据统计,json格式,表名、数据量、最后更新时间
  26.   private String tableStatisticsSrc;
  27.   // 迁移表结果统计,json格式kv结构,表名-数据量
  28.   private String tableStatisticsDest;
  29. }
复制代码
迁移上下文

迁移状态机实际处理的对象,封装了一些服务,可以视为领域对象(充血模型)。
  1. @Setter
  2. public class TableMigrateExecuteContext {
  3.   // 持有的任务对象
  4.   @Getter
  5.   private TableMigrateTask tableMigrateTask;
  6.   // 当前的状态
  7.   @Getter
  8.   private TableMigrateState currentState;
  9.   private TableMigrateTaskRepository tableMigrateTaskRepository;
  10.   private TableMigrateQueryService tableMigrateQueryService;
  11.   private TableMigrateService tableMigrateService;
  12.   private TableArchiveService tableArchiveService;
  13.   private DataSourceHolder dataSourceHolder;
  14.   public void createInstanceAndConfigureDtsJob() {
  15.       // 调用DTS API创建任务
  16.   }
  17.   public DescribeDtsJobDetailResponse queryDtsMigJob() {
  18.       // 调用DTS API查询
  19.   }
  20.   public void switchRoute(long oldSchemaId, long newSchemaId) {
  21.       // 将分表以外的单表update为newSchemaId
  22.   }
  23.   public void stopDtsMigJob() {
  24.       // 调用DTS API停止
  25.   }
  26.   public void updateTableMigrateTask(TableMigrateTask modifiedTask) {
  27.        // 更新持有的任务(持久化)
  28.   }
  29.   /**
  30.    * 状态推进
  31.    *
  32.    * @return 返回信息,不成功时非空
  33.    */
  34.   public String forward() {
  35.     return currentState.forward(this);
  36.   }
  37.   /** 状态回滚 */
  38.   public String rollback() {
  39.     return currentState.rollback(this);
  40.   }
  41.   /**
  42.    * 重命名旧表
  43.    *
  44.    * @param forward true-迁移场景,加——migold,反之则不加
  45.    * @param ignoreExited 是否忽略已存在的表,仅在初始态的回滚场景可用
  46.    */
  47.   public void renameOldTableNames(boolean forward, boolean ignoreExited) {
  48.       // 注意要考虑源库中是否存在和旧表相同的同名表
  49.   }
  50.   public void updateDestTableInfo() {
  51.      // 更新目标表的统计信息
  52.   }
  53.   public void archiveNewTables(List<String> needArchiveTables) {
  54.       // 归档新表  
  55.   }
  56.   /**
  57.    * 删除表
  58.    *
  59.    * @param newTable true-新表,false-旧表
  60.    */
  61.   public void deleteTables(boolean newTable) {
  62.       // 批量执行DROP TABLE
  63.   }
  64.   public void deleteDtsInstanceAndJob() {
  65.       // 调用DTS API释放实例
  66.   }
  67. }
复制代码
工厂类
  1. @Component
  2. public class TableMigrateContextFactory {
  3.   @Resource private TableMigrateTaskRepository tableMigrateTaskRepository;
  4.   @Resource private TableMigrateQueryService tableMigrateQueryService;
  5.   @Resource private TableMigrateService tableMigrateService;
  6.   @Resource private TableArchiveService tableArchiveService;
  7.   @Resource private DataSourceHolder dataSourceHolder;
  8.   public TableMigrateExecuteContext buildContext(long taskId) {
  9.     TableMigrateTask task = tableMigrateTaskRepository.getById(taskId);
  10.     if (task == null || task.getStatus() == 0) {
  11.       throw new BizException("表迁移任务不存在或已被删除");
  12.     }
  13.     TableMigrateExecuteContext context = new TableMigrateExecuteContext();
  14.     context.setTableMigrateTask(task);
  15.     context.setCurrentState(buildState(TableMigrateStateEnum.getByValue(task.getState())));
  16.     // 服务注入
  17.     context.setTableMigrateTaskRepository(tableMigrateTaskRepository);
  18.     context.setTableMigrateQueryService(tableMigrateQueryService);
  19.     context.setTableMigrateService(tableMigrateService);
  20.     context.setTableArchiveService(tableArchiveService);
  21.     context.setDataSourceHolder(dataSourceHolder);
  22.     return context;
  23.   }
  24.   private TableMigrateState buildState(TableMigrateStateEnum stateEnum) {
  25.     switch (stateEnum) {
  26.       case INIT:
  27.         return new MigrateInitState();
  28.       case FAILED:
  29.         return new MigrateFailedState();
  30.       case PROCESSING:
  31.         return new MigrateProcessingState();
  32.       case NEED_SWITCH:
  33.         return new MigrateNeedSwitchState();
  34.       case SWITCHED:
  35.         return new MigrateSwitchedState();
  36.       case FINISH:
  37.         return new MigrateFinishState();
  38.       default:
  39.         throw new BizException("迁移状态非法");
  40.     }
  41.   }
  42. }
复制代码
迁移状态

我在做本次的系统设计时,对状态模式做了一些回顾和参考。迁移状态是状态模式的核心,从设计模式的角度来看,状态模式“允许对象在其内部状态改变时动态调整自身行为,使得对象的表现形态如同修改了其所属类。”
以下是各个类的继承关系:
2.png

对应的状态如下:
类名含义说明TableMigrateState接口定义AbstractMigrateState状态抽象类AbstractFinalState终态抽象类终态很多操作都是不支持的,和AbstractMigrateState分开更简洁MigrateInitState初始记录要迁移的统计数据和配置MigrateProcessingState迁移中DTS进行迁移动作的状态MigrateNeedSwitchState迁移完成待切换分库位数据已同步在新表,但还不可以通过业务功能直接访问MigrateSwitchedState分库位已切换待删除旧表数据已同步在新表,且能通过业务功能直接访问MigrateFinishState迁移完成数据已同步在新表且能访问,旧表已删除MigrateFailedState迁移失败回滚,旧表恢复访问,新表如果有则删除状态接口
  1. public interface TableMigrateState {
  2.   /**
  3.    * 前进到下一状态
  4.    *
  5.    * @param context
  6.    * @return 失败的提示信息
  7.    */
  8.   String forward(TableMigrateExecuteContext context);
  9.   /**
  10.    * 回滚操作
  11.    *
  12.    * @param context
  13.    * @return 失败的提示信息
  14.    */
  15.   String rollback(TableMigrateExecuteContext context);
  16.   /**
  17.    * 获取当前的状态对应枚举
  18.    */
  19.   TableMigrateStateEnum getState();
  20.   /**
  21.    * 获取下一个状态
  22.    */
  23.   TableMigrateState getNextState();
  24.   /**
  25.    * 获取回滚的状态
  26.    */
  27.   TableMigrateState getRollbackState();
  28. }
复制代码
状态抽象类
  1. public abstract class AbstractMigrateState implements TableMigrateState {
  2.   @Override
  3.   public String forward(TableMigrateExecuteContext context) {
  4.     TableMigrateTask task = context.getTableMigrateTask();
  5.     // 1. 前置校验
  6.     // 根据校验结果,判断是留在当前状态,还是直接回滚到迁移失败状态
  7.     // 2. 实际动作,由实现类完成
  8.    // 简单起见,在实际动作里的异常都自动回滚
  9.     // 3. 状态跳转
  10.     transit(context, getNextState());
  11.     return null;
  12.   }
  13.   @Override
  14.   public String rollback(TableMigrateExecuteContext context) {
  15.     TableMigrateTask task = context.getTableMigrateTask();
  16.     // 1. 当前状态校验
  17.     checkCurrentState(context);
  18.     // 2. 回滚操作,如果发生异常,保持在当前状态
  19.     // 3. 状态跳转
  20.     transit(context, getRollbackState());
  21.     return null;
  22.   }
  23.   /**
  24.    * 前置校验
  25.    *
  26.    * @param context
  27.    */
  28.   protected PreCheckResult preCheck(TableMigrateExecuteContext context) {
  29.     return checkCurrentState(context);
  30.   }
  31.   protected PreCheckResult checkCurrentState(TableMigrateExecuteContext context) {
  32.       // 检查当前状态是否符合预期,构造检查结果
  33.   }
  34.   /**
  35.    * 改变当前执行上下文状态, 不做其他的业务操作
  36.    *
  37.    * @param context
  38.    * @param nextState
  39.    */
  40.   private void transit(TableMigrateExecuteContext context, TableMigrateState nextState) {
  41.     TableMigrateTask task = context.getTableMigrateTask();
  42.     task.setState(nextState.getState().getValue());
  43.     context.updateTableMigrateTask(task);
  44.     context.setCurrentState(nextState);
  45.   }
  46.   
  47.   protected abstract void doForward(TableMigrateExecuteContext context);
  48.   /**
  49.    * 回滚操作
  50.    *
  51.    * <p>需要保证幂等,如果单个回滚操作失败,可以重复执行
  52.    *
  53.    * @param context
  54.    */
  55.   protected abstract void doRollback(TableMigrateExecuteContext context);
  56.   /**
  57.    * 旧表更名
  58.    *
  59.    * @param context
  60.    * @param forward true-迁移场景,旧表加后缀; false-回滚场景,旧表删除后缀
  61.    */
  62.   protected void renameOldTableNames(
  63.       TableMigrateExecuteContext context, boolean forward, boolean ignoreExited) {
  64.     context.renameOldTableNames(forward, ignoreExited);
  65.   }
  66.   /**
  67.    * 删除新表
  68.    *
  69.    * <p>新表的删除,最好不要共用这个方法
  70.    *
  71.    * @param context
  72.    */
  73.   protected void deleteNewTables(TableMigrateExecuteContext context) {
  74.     context.deleteTables(true);
  75.   }
  76.   /** 前置校验结果 */
  77.   @Data
  78.   @AllArgsConstructor
  79.   public static class PreCheckResult {
  80.     /** 中断,需要回滚 */
  81.     public static final int ABORT = -1;
  82.     /** 校验通过 */
  83.     public static final int PASS = 0;
  84.     /** 校验不通过,保持原有状态 */
  85.     public static final int NOT_PASS = 1;
  86.     private int code;
  87.     private String msg;
  88.     public static PreCheckResult buildPass() {
  89.       return new PreCheckResult(PASS, null);
  90.     }
  91.     public boolean isPass() {
  92.       return this.code == PASS;
  93.     }
  94.   }
  95. }
复制代码
终态抽象类
  1. public abstract class AbstractFinalState implements TableMigrateState {
  2.   @Override
  3.   public String forward(TableMigrateExecuteContext context) {
  4.     return "当前状态【" + getState().getValue() + " " + getState().getDes() + "】已是终态,不能进行下一步操作";
  5.   }
  6.   @Override
  7.   public String rollback(TableMigrateExecuteContext context) {
  8.     return "当前状态【" + getState().getValue() + " " + getState().getDes() + "】已是终态,不能进行撤销操作";
  9.   }
  10.   @Override
  11.   public TableMigrateState getNextState() {
  12.     throw new BizException("当前状态" + getState().getValue() + "已是终态,没有后续状态可跳转");
  13.   }
  14.   @Override
  15.   public TableMigrateState getRollbackState() {
  16.     throw new BizException("当前状态" + getState().getValue() + "已是终态,没有后续撤销态可跳转");
  17.   }
  18. }
复制代码
初始
  1. public class MigrateInitState extends AbstractMigrateState {
  2.   @Override
  3.   protected void doForward(TableMigrateExecuteContext context) {
  4.     TableMigrateTask task = context.getTableMigrateTask();
  5.     if (task.getOldSchemaId().equals(task.getNewSchemaId())) {
  6.       throw new BizException("迁移前后的分库位id相同");
  7.     }
  8.     // 1. 旧表更名, 直接阻止后续的变更
  9.     // 归档不影响RENAME
  10.     renameOldTableNames(context, true, false);
  11.     // 2. 创建DTS任务并回写到task字段
  12.     // 创建失败则直接抛异常,回滚
  13.     // 此处DTS任务是直接提交执行的,并不能确定当前实际是哪个状态,因此状态留空
  14.     context.createInstanceAndConfigureDtsJob();
  15.   }
  16.   
  17.   @Override
  18.   protected void doRollback(TableMigrateExecuteContext context) {
  19.     // 初始态回滚时,旧表可能还没有更名
  20.     renameOldTableNames(context, false, true);
  21.   }
  22.   @Override
  23.   public TableMigrateStateEnum getState() {
  24.     return TableMigrateStateEnum.INIT;
  25.   }
  26.   @Override
  27.   public TableMigrateState getNextState() {
  28.     return new MigrateProcessingState();
  29.   }
  30.   @Override
  31.   public TableMigrateState getRollbackState() {
  32.     return new MigrateFailedState();
  33.   }
  34. }
复制代码
迁移中
  1. public class MigrateProcessingState extends AbstractMigrateState {
  2.   /** DTS-未初始化的状态 */
  3.   private static final Set<String> DTS_NOT_INIT =
  4.       Sets.newHashSet(
  5.           DtsJobStatusEnum.NOT_STARTED.getCode(), DtsJobStatusEnum.NOT_CONFIGURED.getCode());
  6.   /** DTS-处理中的状态 */
  7.   private static final Set<String> DTS_PROCESSING =
  8.       Sets.newHashSet(
  9.           DtsJobStatusEnum.PRECHECKING.getCode(),
  10.           DtsJobStatusEnum.PRECHECK_PASS.getCode(),
  11.           DtsJobStatusEnum.INITIALIZING.getCode(),
  12.           DtsJobStatusEnum.SYNCHRONIZING.getCode(),
  13.           DtsJobStatusEnum.MIGRATING.getCode(),
  14.           DtsJobStatusEnum.SUSPENDING.getCode(),
  15.           DtsJobStatusEnum.MODIFYING.getCode(),
  16.           DtsJobStatusEnum.RETRYING.getCode(),
  17.           DtsJobStatusEnum.UPGRADING.getCode(),
  18.           DtsJobStatusEnum.DOWNGRADING.getCode(),
  19.           DtsJobStatusEnum.LOCKED.getCode());
  20.   /** DTS-失败的状态 */
  21.   private static final Set<String> DTS_FAILED =
  22.       Sets.newHashSet(
  23.           DtsJobStatusEnum.PRECHECK_FAILED.getCode(),
  24.           DtsJobStatusEnum.INITIALIZE_FAILED.getCode(),
  25.           DtsJobStatusEnum.FAILED.getCode(),
  26.           DtsJobStatusEnum.MIGRATION_FAILED.getCode());
  27.   @Override
  28.   public PreCheckResult preCheck(TableMigrateExecuteContext context) {
  29.     checkCurrentState(context);
  30.     // 校验DTS任务已完成
  31.     TableMigrateTask task = context.getTableMigrateTask();
  32.     DescribeDtsJobDetailResponse dtsJobDetailResponse = context.queryDtsMigJob();
  33.     if (dtsJobDetailResponse == null || dtsJobDetailResponse.getBody() == null) {
  34.       return new PreCheckResult(
  35.           PreCheckResult.NOT_PASS,
  36.           String.format(
  37.               "DTS任务结果查询失败, 返回结果为空。instanceId=%s, jobId=%s",
  38.               task.getDtsInstanceId(), task.getDtsJobId()));
  39.     }
  40.     DescribeDtsJobDetailResponseBody responseBody = dtsJobDetailResponse.getBody();
  41.     if (BooleanUtils.isNotTrue(responseBody.getSuccess())) {
  42.       return new PreCheckResult(
  43.           PreCheckResult.NOT_PASS,
  44.           String.format(
  45.               "DTS任务结果查询失败, 接口调用结果为失败。instanceId=%s, jobId=%s",
  46.               task.getDtsInstanceId(), task.getDtsJobId()));
  47.     }
  48.     DtsJobStatusEnum dtsJobStatusEnum = DtsJobStatusEnum.getByCode(responseBody.getStatus());
  49.     if (dtsJobStatusEnum == null) {
  50.       return new PreCheckResult(
  51.           PreCheckResult.NOT_PASS,
  52.           String.format(
  53.               "DTS任务状态非法,请稍后重试。instanceId=%s, jobId=%s, 阿里云状态=%s",
  54.               task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  55.     }
  56.     task.setDtsStatus(dtsJobStatusEnum.getCode());
  57.     if (DTS_NOT_INIT.contains(dtsJobStatusEnum.getCode())) {
  58.       return new PreCheckResult(
  59.           PreCheckResult.NOT_PASS,
  60.           String.format(
  61.               "DTS任务状态异常,尚未初始化。instanceId=%s, jobId=%s, 阿里云状态=%s",
  62.               task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  63.     }
  64.     if (DTS_PROCESSING.contains(dtsJobStatusEnum.getCode())) {
  65.       return new PreCheckResult(
  66.           PreCheckResult.NOT_PASS,
  67.           String.format(
  68.               "DTS任务仍在处理中。instanceId=%s, jobId=%s, 阿里云状态=%s",
  69.               task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  70.     }
  71.     if (DTS_FAILED.contains(dtsJobStatusEnum.getCode())) {
  72.       return new PreCheckResult(
  73.           PreCheckResult.ABORT,
  74.           String.format(
  75.               "DTS任务执行失败。instanceId=%s, jobId=%s, 阿里云状态=%s",
  76.               task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  77.     }
  78.     if (StringUtils.equals(DtsJobStatusEnum.FINISHED.getCode(), dtsJobStatusEnum.getCode())) {
  79.       return PreCheckResult.buildPass();
  80.     }
  81.     return new PreCheckResult(
  82.         PreCheckResult.NOT_PASS,
  83.         String.format(
  84.             "DTS任务状态非法。instanceId=%s, jobId=%s, 阿里云状态=%s",
  85.             task.getDtsInstanceId(), task.getDtsJobId(), responseBody.getStatus()));
  86.   }
  87.   @Override
  88.   protected void doForward(TableMigrateExecuteContext context) {
  89.     // 已在校验时更新任务状态,并根据DTS执行状态判断要不要回滚
  90.     // 1. 将迁移后的表,按照原表的状态进行归档
  91.     // 2. 写入迁移后的表统计信息
  92.     context.updateDestTableInfo();
  93.   }
  94.   @Override
  95.   protected void doRollback(TableMigrateExecuteContext context) {
  96.     // 1. 尝试中止DTS任务,对已完成的DTS任务调用不会抛异常
  97.     // 2. 新表删除
  98.     deleteNewTables(context);
  99.     // 3. 旧表名称还原
  100.     renameOldTableNames(context, false, false);
  101.   }
  102.   @Override
  103.   public TableMigrateStateEnum getState() {
  104.     return TableMigrateStateEnum.PROCESSING;
  105.   }
  106.   @Override
  107.   public TableMigrateState getNextState() {
  108.     return new MigrateNeedSwitchState();
  109.   }
  110.   @Override
  111.   public TableMigrateState getRollbackState() {
  112.     return new MigrateFailedState();
  113.   }
  114. }
复制代码
迁移完成待切换分库位
  1. public class MigrateNeedSwitchState extends AbstractMigrateState {
  2.   @Override
  3.   protected void doForward(TableMigrateExecuteContext context) {
  4.     // 单表路由切换
  5.     context.switchRoute(
  6.         context.getTableMigrateTask().getOldSchemaId(),
  7.         context.getTableMigrateTask().getNewSchemaId());
  8.   }
  9.   @Override
  10.   protected void doRollback(TableMigrateExecuteContext context) {
  11.     // 1. 新表删除
  12.     deleteNewTables(context);
  13.     // 2. 旧表名称还原
  14.     renameOldTableNames(context, false, false);
  15.     // 3. 单表和路由恢复
  16.     // 可能已经在forward时做过,因此也做复原
  17.     context.switchRoute(
  18.         context.getTableMigrateTask().getNewSchemaId(),
  19.         context.getTableMigrateTask().getOldSchemaId());
  20.   }
  21.   @Override
  22.   public TableMigrateStateEnum getState() {
  23.     return TableMigrateStateEnum.NEED_SWITCH;
  24.   }
  25.   @Override
  26.   public TableMigrateState getNextState() {
  27.     return new MigrateSwitchedState();
  28.   }
  29.   @Override
  30.   public TableMigrateState getRollbackState() {
  31.     return new MigrateFailedState();
  32.   }
  33. }
复制代码
分库位已切换待删除旧表
  1. public class MigrateSwitchedState extends AbstractMigrateState {
  2.   @Override
  3.   protected void doForward(TableMigrateExecuteContext context) {
  4.     // 删除旧表
  5.     context.deleteTables(false);
  6.   }
  7.   @Override
  8.   protected void doRollback(TableMigrateExecuteContext context) {
  9.     // 1. 新表删除
  10.     deleteNewTables(context);
  11.     // 2. 旧表名称还原
  12.     renameOldTableNames(context, false, false);
  13.     // 3. 单表和路由恢复
  14.     context.switchRoute(
  15.         context.getTableMigrateTask().getNewSchemaId(),
  16.         context.getTableMigrateTask().getOldSchemaId());
  17.   }
  18.   @Override
  19.   public TableMigrateStateEnum getState() {
  20.     return TableMigrateStateEnum.SWITCHED;
  21.   }
  22.   @Override
  23.   public TableMigrateState getNextState() {
  24.     return new MigrateFinishState();
  25.   }
  26.   @Override
  27.   public TableMigrateState getRollbackState() {
  28.     return new MigrateFailedState();
  29.   }
  30. }
复制代码
迁移完成
  1. public class MigrateFinishState extends AbstractFinalState {
  2.   @Override
  3.   public TableMigrateStateEnum getState() {
  4.     return TableMigrateStateEnum.FINISH;
  5.   }
  6. }
复制代码
迁移失败
  1. public class MigrateFailedState extends AbstractFinalState {
  2.   @Override
  3.   public TableMigrateStateEnum getState() {
  4.     return TableMigrateStateEnum.FAILED;
  5.   }
  6. }
复制代码
操作流程


  • 创建迁移任务TableMigrateTask
  • 通过工厂类,使用任务id创建包含迁移任务TableMigrateTask的迁移上下文TableMigrateExecuteContext
  • 调用TableMigrateExecuteContext的forward()推进状态、rollback()回滚状态
查询相关功能实现

DTS任务列表查询

因为已经把DTS的相关字段持久化了,可以通过业务系统相关的迁移任务表实现分页查询。
不过在“迁移中”跳转到“迁移完成待切换分库位”的过程中,DTS也会经历多个状态,典型的有Prechecking、Migrating、Finished等(见DescribeDtsJobDetail_数据传输_API文档的Status字段说明),可以通过接口获取最新的状态并写入迁移任务表。
如何查询待迁移的表?

回顾一下,要迁移的表分以下三种形式:

  • 后缀是公司id,如table_companyId
  • 后缀是 公司_年份,如table_company_year
  • 后缀是业务id,如table_bizId
对于同一个前缀,以companyId=123为例,第一、三种表都可以精确匹配:

  • 第一种表每个公司只有一张,比如table_a_123、table_b_123;
  • 第三种表每个bizId同一个公司只有一张,比如bizId可以取1、2,那么会存在table_c_1、table_c_2、table_d_1、table_d_2,并且,bizId是有限的,可以从一张bizId_table表获取所有可选值。
  • 对于第二种表year的取值范围,虽然可以类似bizId一样去找,但是并没有直接的关系表。
我想到了两种方案,最后选择了第二种。
SHOW TABLES LIKE查询指定前缀

这是最先考虑到的方案,比较直接,而且在开发、测试环境中运行良好。但是在线上就不行了,将所有表查询一次要数分钟,调用早已超时。我想这应该和线上环境表数量过多导致元数据获取变慢有关,每次查询需要上百ms,累计耗时长达数分钟。
DatabaseMetaData一次性获取所有表后过滤

可以通过DataSource的元数据,一次性获取数据源对应库的所有的表,再将表名进行过滤。经过测试,10W级数据获取全部表的时间在3~7S之间,和方案一相比快很多。
以下代码片段展示了如何获取所有表名,忽略异常处理。
  1. String physicalSchemaName = dataSourceHolder.getPhysicalSchemaName(logicalSchemaName);
  2. HikariDataSource dataSource = dataSourceHolder.getDataSourceByPhysicalSchemaName(physicalSchemaName);
  3. try (Connection connection = dataSource.getConnection()) {
  4.     DatabaseMetaData metaData = connection.getMetaData();
  5.     ResultSet catalogs = metaData.getCatalogs();
  6.     while (catalogs.next()) {
  7.         String tableCat = catalogs.getString("TABLE_CAT");
  8.         if (!StringUtils.equals(tableCat, physicalSchemaName)) {
  9.             // 判断库名是否一致。如果多个库实际上在同一个RDS实例,元数据实际上是这些库的,而非单个库的
  10.             continue;
  11.         }
  12.         // 获取指定数据库中的所有表名
  13.         ResultSet tableResultSet =
  14.             metaData.getTables(physicalSchemaName, null, "%", new String[] {"TABLE"});
  15.         int count = 0;
  16.         while (tableResultSet.next()) {
  17.           count++;
  18.           String physicalTableName = tableResultSet.getString("TABLE_NAME");
  19.           // 确定是否是要查的表,判断逻辑此处省略【注1】
  20.          }
  21.         LoggerUtils.info("本次共查询了" + count + "个表的元数据");
  22.       }
  23.       LoggerUtils.info(
  24.           "获取" + physicalSchemaName + "库的所有表元数据总耗时:" + (System.currentTimeMillis() - t1) + "ms");
  25.     } catch (SQLException e) {
  26.       LoggerUtils.error("获取分库元数据失败, 发生SQL异常", e);
  27.       throw new BizException("获取分库元数据失败, 发生SQL异常", e);
  28.     }
  29. }
复制代码
对数据库中每个表判定它是否是当前公司的表,对于第一、三种表,可以将后者放到一个HashSet中,每次循环时对比;对于第二种表,字符串前缀匹配无疑要花大量的时间。
为了加速前缀匹配,可以使用经典的数据结构——前缀匹配树。前缀匹配树的代码如下:
  1. public class StringPrefixTrie {
  2.   private final Node root = new Node();
  3.   static class Node {
  4.     boolean isEnd;
  5.     Map<Character, Node> children = new HashMap<>();
  6.   }
  7.   /**
  8.    * 增加一个待匹配的模式
  9.    *
  10.    * @param pattern
  11.    */
  12.   public void addPattern(String pattern) {
  13.     Node current = root;
  14.     for (char c : pattern.toCharArray()) {
  15.       current = current.children.computeIfAbsent(c, k -> new Node());
  16.     }
  17.     current.isEnd = true;
  18.   }
  19.   /**
  20.    * 是否满足前缀匹配
  21.    *
  22.    * @param str
  23.    * @return
  24.    */
  25.   public String match(String str) {
  26.     Node current = root;
  27.     for (int i = 0; i < str.length(); i++) {
  28.       current = current.children.get(str.charAt(i));
  29.       if (current == null) {
  30.         // 没有匹配到任何前缀
  31.         return null;
  32.       }
  33.       if (current.isEnd) {
  34.         // 返回匹配到的任意一个前缀
  35.         return str.substring(0, i + 1);
  36.       }
  37.     }
  38.     return null;
  39.   }
  40. }
复制代码
对所有需要前缀匹配的表的前缀调用addPattern(),在循环中先判断是否满足前缀匹配,再判断精准匹配即可。
后续规划


  • 批量迁移功能,将迁移批量化、自动化:

    • 批量捞取公司,判断是否需要迁移、迁移成本。理想情况下,数据量少但表多的公司,是迁移到其他库的最佳候选,大大降低源库的表量又节约了复制数据的时间
    • 多个公司id提交、创建任务、状态流转

  • 自动校验迁移是否成功

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册