前言
咱们星球中的商城系统中使用了动态数据源的功能,实现了分库分表的订单库的读库和写库的自动切换。
有球友反馈说,对动态数据源不太熟悉。
今天这篇文章就专门跟大家一起聊聊动态数据源,希望对你会有所帮助。
一、为什么需要动态数据源?
有些小伙伴在开发中可能会遇到这样的场景:一个系统需要同时访问多个数据库,或者需要根据业务参数动态选择数据源。这
时候,传统的单数据源配置就显得力不从心了。
1.1 传统多数据源的问题
传统方式的多个数据源配置,硬编码,不灵活。
例如下面这样:- @Configuration
- public class TraditionalDataSourceConfig {
-
- @Bean
- @Primary
- public DataSource primaryDataSource() {
- HikariDataSource dataSource = new HikariDataSource();
- dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db1");
- dataSource.setUsername("user1");
- dataSource.setPassword("pass1");
- return dataSource;
- }
-
- @Bean
- public DataSource secondaryDataSource() {
- HikariDataSource dataSource = new HikariDataSource();
- dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db2");
- dataSource.setUsername("user2");
- dataSource.setPassword("pass2");
- return dataSource;
- }
- }
复制代码 使用时需要手动管理数据源。- @Repository
- public class TraditionalUserDao {
-
- @Autowired
- @Qualifier("primaryDataSource")
- private DataSource primaryDataSource;
-
- @Autowired
- @Qualifier("secondaryDataSource")
- private DataSource secondaryDataSource;
-
- public User findUserFromPrimary(Long id) {
- // 需要手动获取连接、处理异常、关闭连接
- try (Connection conn = primaryDataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {
- stmt.setLong(1, id);
- ResultSet rs = stmt.executeQuery();
- // 处理结果集...
- } catch (SQLException e) {
- throw new RuntimeException("查询失败", e);
- }
- }
-
- }
复制代码 每个方法都要重复这样的模板代码,需要手动指定数据源,很麻烦。
那么,如何做优化呢?
1.2 动态数据源的优势
接下来,我们一起看看使用动态数据源后的优雅代码。- @Service
- public class UserService {
-
- @Autowired
- private UserMapper userMapper;
-
- // 根据租户ID自动选择数据源
- public User findUserByTenant(Long userId, String tenantId) {
- // 设置数据源上下文
- DataSourceContextHolder.setDataSource(tenantId);
- try {
- return userMapper.selectById(userId);
- } finally {
- // 清理上下文
- DataSourceContextHolder.clear();
- }
- }
-
- // 多租户数据聚合查询
- public UserAggregateInfo getUserAggregateInfo(Long userId) {
- UserAggregateInfo result = new UserAggregateInfo();
-
- // 查询主库
- DataSourceContextHolder.setDataSource("master");
- result.setBaseInfo(userMapper.selectById(userId));
-
- // 查询归档库
- DataSourceContextHolder.setDataSource("archive");
- result.setHistory(userMapper.selectHistory(userId));
-
- // 查询统计库
- DataSourceContextHolder.setDataSource("stats");
- result.setStatistics(userMapper.selectStats(userId));
-
- return result;
- }
- }
复制代码 代码中能根据租户ID自动选择数据源。
代码一下子变得更优雅了。
二、动态数据源的原理
有些小伙伴在使用动态数据源时,可能只是简单配置使用,并不清楚其底层工作原理。
理解核心原理对于排查问题和性能优化至关重要。
下面跟大家一起聊聊动态数据源的核心原理,希望对你会有所帮助。
数据源路由的核心机制
动态数据源的核心在于AbstractRoutingDataSource,它是Spring框架提供的抽象类:- // Spring AbstractRoutingDataSource 源码分析
- public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
-
- // 目标数据源映射表
- private Map<Object, Object> targetDataSources;
-
- // 默认数据源
- private Object defaultTargetDataSource;
-
- // 解析后的数据源映射
- private Map<Object, DataSource> resolvedDataSources;
-
- // 解析后的默认数据源
- private DataSource resolvedDefaultDataSource;
-
- // 关键方法:确定当前查找键
- protected abstract Object determineCurrentLookupKey();
-
- // 获取连接时选择数据源
- @Override
- public Connection getConnection() throws SQLException {
- return determineTargetDataSource().getConnection();
- }
-
- @Override
- public Connection getConnection(String username, String password) throws SQLException {
- return determineTargetDataSource().getConnection(username, password);
- }
-
- // 确定目标数据源
- protected DataSource determineTargetDataSource() {
- // 获取查找键
- Object lookupKey = determineCurrentLookupKey();
-
- // 根据查找键获取数据源
- DataSource dataSource = this.resolvedDataSources.get(lookupKey);
- if (dataSource == null && (this.resolvedDefaultDataSource != null || lookupKey == null)) {
- dataSource = this.resolvedDefaultDataSource;
- }
- if (dataSource == null) {
- throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
- }
- return dataSource;
- }
- }
复制代码 线程安全的数据源上下文管理
动态数据源执行流程
三、基于Spring Boot的完整实现
有些小伙伴在配置动态数据源时可能会遇到各种问题,下面我提供一个生产级别的完整实现。
完整配置实现
应用配置文件
- # application.yml
- spring:
- datasource:
- # 动态数据源配置
- dynamic:
- primary: master
- strict: true
- health-check-interval: 30
-
- # 主数据源
- master:
- jdbc-url: jdbc:mysql://localhost:3306/master_db?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
- username: root
- password: master_password
- driver-class-name: com.mysql.cj.jdbc.Driver
- maximum-pool-size: 20
- minimum-idle: 5
- connection-timeout: 30000
- idle-timeout: 600000
- max-lifetime: 1800000
- pool-name: MasterHikariPool
-
- # 从数据源1
- slave1:
- jdbc-url: jdbc:mysql://slave1:3306/slave_db?useUnicode=true&characterEncoding=utf8
- username: root
- password: slave1_password
- driver-class-name: com.mysql.cj.jdbc.Driver
- maximum-pool-size: 15
- minimum-idle: 3
- connection-timeout: 30000
- idle-timeout: 600000
- max-lifetime: 1800000
- pool-name: Slave1HikariPool
-
- # 从数据源2
- slave2:
- jdbc-url: jdbc:mysql://slave2:3306/slave_db?useUnicode=true&characterEncoding=utf8
- username: root
- password: slave2_password
- driver-class-name: com.mysql.cj.jdbc.Driver
- maximum-pool-size: 15
- minimum-idle: 3
- connection-timeout: 30000
- idle-timeout: 600000
- max-lifetime: 1800000
- pool-name: Slave2HikariPool
- # MyBatis配置
- mybatis:
- configuration:
- map-underscore-to-camel-case: true
- cache-enabled: true
- lazy-loading-enabled: false
- aggressive-lazy-loading: false
复制代码 注解式数据源切换
- /**
- * 数据源注解
- */
- @Target({ElementType.METHOD, ElementType.TYPE})
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface DataSource {
-
- /**
- * 数据源名称
- */
- String value() default "master";
-
- /**
- * 是否在方法执行后清除数据源(默认清除)
- */
- boolean clear() default true;
- }
- /**
- * 数据源切面
- */
- @Aspect
- @Component
- @Slf4j
- public class DataSourceAspect {
-
- /**
- * 定义切点:所有标注@DataSource注解的方法
- */
- @Pointcut("@annotation(com.example.annotation.DataSource)")
- public void dataSourcePointCut() {}
-
- /**
- * 环绕通知:在方法执行前后切换数据源
- */
- @Around("dataSourcePointCut()")
- public Object around(ProceedingJoinPoint point) throws Throwable {
- MethodSignature signature = (MethodSignature) point.getSignature();
- Method method = signature.getMethod();
-
- DataSource dataSourceAnnotation = method.getAnnotation(DataSource.class);
- if (dataSourceAnnotation == null) {
- // 类级别注解
- dataSourceAnnotation = point.getTarget().getClass().getAnnotation(DataSource.class);
- }
-
- if (dataSourceAnnotation != null) {
- String dataSourceKey = dataSourceAnnotation.value();
- boolean clearAfter = dataSourceAnnotation.clear();
-
- try {
- log.debug("切换数据源到: {}", dataSourceKey);
- DataSourceContextHolder.setDataSource(dataSourceKey);
-
- // 执行原方法
- return point.proceed();
-
- } finally {
- if (clearAfter) {
- DataSourceContextHolder.clear();
- log.debug("清除数据源上下文");
- }
- }
- }
-
- // 没有注解,使用默认数据源
- return point.proceed();
- }
- }
复制代码 四、高级特性
有些小伙伴在基础功能实现后,可能会遇到一些高级场景的需求。
下面介绍几个生产环境中常用的高级特性。
读写分离自动路由
- /**
- * 读写分离数据源路由器
- */
- @Component
- @Slf4j
- public class ReadWriteDataSourceRouter {
-
- // 读数据源列表
- private final List<String> readDataSources = Arrays.asList("slave1", "slave2");
-
- // 轮询计数器
- private final AtomicInteger counter = new AtomicInteger(0);
-
- /**
- * 根据SQL自动选择数据源
- */
- public String determineDataSource(boolean isReadOperation) {
- if (isReadOperation && !readDataSources.isEmpty()) {
- // 读操作:轮询选择从库
- int index = counter.getAndIncrement() % readDataSources.size();
- if (counter.get() > 9999) {
- counter.set(0); // 防止溢出
- }
- String readDataSource = readDataSources.get(index);
- log.debug("读操作选择数据源: {}", readDataSource);
- return readDataSource;
- } else {
- // 写操作:选择主库
- log.debug("写操作选择数据源: master");
- return "master";
- }
- }
-
- /**
- * 根据SQL语句判断是否为读操作
- */
- public boolean isReadOperation(String sql) {
- if (sql == null) {
- return true; // 默认为读操作
- }
-
- String trimmedSql = sql.trim().toLowerCase();
- return trimmedSql.startsWith("select") ||
- trimmedSql.startsWith("show") ||
- trimmedSql.startsWith("explain");
- }
- }
- /**
- * MyBatis拦截器 - 自动读写分离
- */
- @Intercepts({
- @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
- @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
- })
- @Component
- @Slf4j
- public class ReadWriteInterceptor implements Interceptor {
-
- @Autowired
- private ReadWriteDataSourceRouter dataSourceRouter;
-
- @Override
- public Object intercept(Invocation invocation) throws Throwable {
- String methodName = invocation.getMethod().getName();
- MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
-
- boolean isReadOperation = "query".equals(methodName);
- String sql = getSql(ms, invocation.getArgs()[1]);
-
- // 如果当前没有手动设置数据源,则自动选择
- if (!DataSourceContextHolder.hasDataSource()) {
- String dataSource = dataSourceRouter.determineDataSource(isReadOperation);
- DataSourceContextHolder.setDataSource(dataSource);
-
- try {
- return invocation.proceed();
- } finally {
- DataSourceContextHolder.clear();
- }
- }
-
- return invocation.proceed();
- }
-
- private String getSql(MappedStatement ms, Object parameter) {
- BoundSql boundSql = ms.getBoundSql(parameter);
- return boundSql.getSql();
- }
- }
复制代码 多租户数据源管理
- /**
- * 多租户数据源管理器
- */
- @Component
- @Slf4j
- public class TenantDataSourceManager {
-
- @Autowired
- private DynamicRoutingDataSource dynamicRoutingDataSource;
-
- @Autowired
- private DataSourceProperties dataSourceProperties;
-
- // 租户数据源配置缓存
- private final Map<String, TenantDataSourceConfig> tenantConfigCache = new ConcurrentHashMap<>();
-
- /**
- * 根据租户ID获取数据源
- */
- public DataSource getDataSourceForTenant(String tenantId) {
- String dataSourceKey = "tenant_" + tenantId;
-
- // 检查是否已存在数据源
- if (dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {
- return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);
- }
-
- // 动态创建数据源
- synchronized (this) {
- if (!dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {
- DataSource dataSource = createTenantDataSource(tenantId);
- dynamicRoutingDataSource.addDataSource(dataSourceKey, dataSource);
- log.info("为租户 {} 创建数据源: {}", tenantId, dataSourceKey);
- }
- }
-
- return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);
- }
-
- /**
- * 动态创建租户数据源
- */
- private DataSource createTenantDataSource(String tenantId) {
- TenantDataSourceConfig config = getTenantConfig(tenantId);
-
- HikariDataSource dataSource = new HikariDataSource();
- dataSource.setJdbcUrl(buildJdbcUrl(config));
- dataSource.setUsername(config.getUsername());
- dataSource.setPassword(config.getPassword());
- dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
- dataSource.setMaximumPoolSize(10);
- dataSource.setMinimumIdle(2);
- dataSource.setConnectionTimeout(30000);
- dataSource.setIdleTimeout(600000);
- dataSource.setMaxLifetime(1800000);
- dataSource.setPoolName("TenantPool_" + tenantId);
-
- return dataSource;
- }
-
- /**
- * 获取租户数据源配置(可从配置中心或数据库获取)
- */
- private TenantDataSourceConfig getTenantConfig(String tenantId) {
- return tenantConfigCache.computeIfAbsent(tenantId, key -> {
- // 这里可以从配置中心、数据库或缓存中获取租户配置
- // 简化实现,实际项目中需要完善
- TenantDataSourceConfig config = new TenantDataSourceConfig();
- config.setHost("tenant-" + tenantId + ".db.example.com");
- config.setPort(3306);
- config.setDatabase("tenant_" + tenantId);
- config.setUsername("tenant_" + tenantId);
- config.setPassword("password_" + tenantId);
- return config;
- });
- }
-
- private String buildJdbcUrl(TenantDataSourceConfig config) {
- return String.format("jdbc:mysql://%s:%d/%s?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true",
- config.getHost(), config.getPort(), config.getDatabase());
- }
-
- @Data
- public static class TenantDataSourceConfig {
- private String host;
- private int port;
- private String database;
- private String username;
- private String password;
- }
- }
复制代码 数据源健康监控
- /**
- * 数据源健康监控器
- */
- @Component
- @Slf4j
- public class DataSourceHealthMonitor {
-
- @Autowired
- private DynamicRoutingDataSource dynamicRoutingDataSource;
-
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- // 健康状态缓存
- private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();
-
- @PostConstruct
- public void init() {
- // 启动健康检查任务
- scheduler.scheduleAtFixedRate(this::checkAllDataSources, 0, 30, TimeUnit.SECONDS);
- }
-
- /**
- * 检查所有数据源的健康状态
- */
- public void checkAllDataSources() {
- Map<Object, Object> dataSources = dynamicRoutingDataSource.getTargetDataSources();
-
- for (Map.Entry<Object, Object> entry : dataSources.entrySet()) {
- String dataSourceKey = (String) entry.getKey();
- DataSource dataSource = (DataSource) entry.getValue();
-
- boolean isHealthy = checkDataSourceHealth(dataSource);
- healthStatus.put(dataSourceKey, isHealthy);
-
- if (!isHealthy) {
- log.warn("数据源 {} 健康检查失败", dataSourceKey);
- // 可以发送告警通知
- }
- }
- }
-
- /**
- * 检查单个数据源健康状态
- */
- private boolean checkDataSourceHealth(DataSource dataSource) {
- try (Connection conn = dataSource.getConnection();
- Statement stmt = conn.createStatement()) {
-
- ResultSet rs = stmt.executeQuery("SELECT 1");
- return rs.next() && rs.getInt(1) == 1;
-
- } catch (SQLException e) {
- log.error("数据源健康检查异常", e);
- return false;
- }
- }
-
- /**
- * 获取数据源健康状态
- */
- public boolean isDataSourceHealthy(String dataSourceKey) {
- return healthStatus.getOrDefault(dataSourceKey, true);
- }
-
- /**
- * 获取健康的数据源列表
- */
- public List<String> getHealthyDataSources() {
- return healthStatus.entrySet().stream()
- .filter(Map.Entry::getValue)
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
- }
-
- @PreDestroy
- public void destroy() {
- scheduler.shutdown();
- }
- }
复制代码 五、动态数据源的应用场景
让我们通过架构图来理解动态数据源的典型应用场景:
六、优缺点
优点
- 灵活性高:支持运行时动态添加、移除数据源
- 解耦性好:业务代码与具体数据源解耦
- 扩展性强:易于实现读写分离、多租户等复杂场景
- 维护方便:数据源配置集中管理,便于维护
缺点
- 复杂度增加:系统架构变得更加复杂
- 事务管理复杂:跨数据源事务需要特殊处理
- 连接池开销:每个数据源都需要独立的连接池
- 调试困难:数据源切换增加了调试复杂度
七、生产环境注意事项
事务管理策略
- /**
- * 多数据源事务管理器
- */
- @Component
- @Slf4j
- public class MultiDataSourceTransactionManager {
-
- /**
- * 在多个数据源上执行事务性操作
- */
- @Transactional(rollbackFor = Exception.class)
- public void executeInTransaction(Runnable task, String... dataSources) {
- if (dataSources.length == 1) {
- // 单数据源事务
- DataSourceContextHolder.setDataSource(dataSources[0]);
- try {
- task.run();
- } finally {
- DataSourceContextHolder.clear();
- }
- } else {
- // 多数据源伪事务(最终一致性)
- executeWithCompensation(task, dataSources);
- }
- }
-
- /**
- * 使用补偿机制实现多数据源"事务"
- */
- private void executeWithCompensation(Runnable task, String[] dataSources) {
- List<Runnable> compensationTasks = new ArrayList<>();
-
- try {
- // 按顺序执行各个数据源的操作
- for (String dataSource : dataSources) {
- DataSourceContextHolder.setDataSource(dataSource);
- try {
- // 执行实际业务操作
- task.run();
-
- // 记录补偿操作
- compensationTasks.add(0, createCompensationTask(dataSource));
-
- } finally {
- DataSourceContextHolder.clear();
- }
- }
- } catch (Exception e) {
- // 执行补偿操作
- log.error("多数据源操作失败,执行补偿操作", e);
- executeCompensation(compensationTasks);
- throw e;
- }
- }
-
- private void executeCompensation(List<Runnable> compensationTasks) {
- for (Runnable compensation : compensationTasks) {
- try {
- compensation.run();
- } catch (Exception ex) {
- log.error("补偿操作执行失败", ex);
- // 记录补偿失败,需要人工介入
- }
- }
- }
- }
复制代码 性能优化建议
- 连接池优化:根据业务特点调整各数据源连接池参数
- 数据源预热:应用启动时预热常用数据源
- 缓存策略:缓存数据源配置和路由信息
- 监控告警:建立完善的数据源监控体系
总结
动态数据源是一个强大的技术方案,能够很好地解决多数据源管理的复杂性。
通过本文的详细解析,我们可以看到:
- 核心原理:基于AbstractRoutingDataSource和ThreadLocal的上下文管理
- 实现方式:注解+AOP的声明式数据源切换
- 高级特性:读写分离、多租户、健康监控等生产级功能
- 适用场景:多租户、读写分离、分库分表等复杂数据架构
在实际项目中,建议根据具体业务需求选择合适的实现方案,不要过度设计。
同时,要建立完善的监控和运维体系,确保动态数据源的稳定运行。
最后说一句(求关注,别白嫖我)
如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。
更多项目实战在我的技术网站:http://www.susan.net.cn/project
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |