
Seata Source Code, Part 9: Transaction Processing in Seata XA Mode

鸳剿 2025-6-5 10:19:16
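Outline
1. A Seata XA distributed transaction example, and how AT and XA differ
2. How each module of the Seata XA example runs
3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
4. Auto-configuration of the global transaction annotation scanner
5. Core fields and initialization of the global transaction annotation scanner
6. How the global transaction annotation scanner creates AOP proxies
7. Source code of the global transaction degrade check: beginning and committing a transaction
8. Degradation handling in the global transaction interceptor when Seata Server fails
9. Source code of global transaction propagation via HTTP request headers
10. Initialization of the XA data source proxy and acquisition of the XA connection proxy
11. Source code of XA branch transaction registration and starting the original XA connection
12. Source code of the two-phase commit flow for XA distributed transactions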
 
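1. A Seata XA distributed transaction example, and how AT and XA differ
(1) Overview of the seata-xa module in the seata-samples project
(2) Steps to start the Seata XA distributed transaction example
(3) Seata XA mode compared with AT mode
 
(1) Overview of the seata-xa module in the seata-samples project
The seata-xa module in the seata-samples project uses Seata XA mode to commit and roll back distributed transactions.
 
The seata-xa module is a Spring Boot project that:
a. uses Feign for remote calls
b. uses Spring JDBC to access the MySQL database
c. uses Seata XA for distributed transactions
 
(2) Steps to start the Seata XA distributed transaction example
a. Run the SQL statements the example needs: sql/all_in_one.sql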
    DROP TABLE IF EXISTS `stock_tbl`;
    CREATE TABLE `stock_tbl` (
        `id`             int(11) NOT NULL AUTO_INCREMENT,
        `commodity_code` varchar(255) DEFAULT NULL,
        `count`          int(11) DEFAULT 0,
        PRIMARY KEY (`id`),
        UNIQUE KEY (`commodity_code`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    DROP TABLE IF EXISTS `order_tbl`;
    CREATE TABLE `order_tbl` (
        `id`             int(11) NOT NULL AUTO_INCREMENT,
        `user_id`        varchar(255) DEFAULT NULL,
        `commodity_code` varchar(255) DEFAULT NULL,
        `count`          int(11) DEFAULT 0,
        `money`          int(11) DEFAULT 0,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    DROP TABLE IF EXISTS `account_tbl`;
    CREATE TABLE `account_tbl` (
        `id`      int(11) NOT NULL AUTO_INCREMENT,
        `user_id` varchar(255) DEFAULT NULL,
        `money`   int(11) DEFAULT 0,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
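b. Download the Seata Server archive
    Visit: https://github.com/seata/seata/releases
c. Unzip the Seata Server archive and start Seata Server
    $ unzip seata-server-xxx.zip
    $ cd distribution
    $ sh ./bin/seata-server.sh 8091 file
d. Start the AccountXA, OrderXA, StockXA, and BusinessXA services in that order
[Figure: 1.png]
e. Test
    # For the actual call parameters, see the BusinessController code
    $ curl http://127.0.0.1:8084/purchase
f. Notes
The data initialization logic is in BusinessService.initData(). With the initial data and the default call logic, purchase succeeds three times. Each call deducts 3000 from the account balance (the default order count of 30 times the unit price of 100), taking it from the initial 10000 down to 1000. The fourth call fails because the balance is insufficient, and the corresponding stock, order, and account changes are all rolled back.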
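The arithmetic behind "three successes, then a failure" can be checked with a small plain-JDK simulation. This is only a sketch: the class PurchaseSimulation and its methods are hypothetical and touch neither Seata nor a database. The figures 10000, 30, and 100 are taken from the notes and sample code in this article, and a purchase that would leave a negative balance is modelled as a rollback that leaves the balance unchanged.

```java
/** Hypothetical in-memory model of the sample's balance arithmetic (no Seata, no database). */
final class PurchaseSimulation {
    static final int UNIT_PRICE = 100;          // unit price hard-coded in OrderService
    static final int DEFAULT_ORDER_COUNT = 30;  // default orderCount in BusinessController

    private int balance;

    PurchaseSimulation(int initialBalance) {
        this.balance = initialBalance;
    }

    /** Returns true if the purchase "commits", false if it is "rolled back". */
    boolean purchase() {
        int orderMoney = DEFAULT_ORDER_COUNT * UNIT_PRICE; // 30 * 100 = 3000
        int after = balance - orderMoney;
        if (after < 0) {
            // Mirrors the "Not Enough Money" failure: nothing is persisted.
            return false;
        }
        balance = after;
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        PurchaseSimulation sim = new PurchaseSimulation(10_000);
        for (int call = 1; call <= 4; call++) {
            boolean ok = sim.purchase();
            System.out.println("call " + call + ": " + (ok ? "SUCCESS" : "FAIL") + ", balance=" + sim.balance());
        }
    }
}
```

In the real sample the rollback is carried out by the XA branches on each database rather than by an in-memory check; the sketch only reproduces the numbers.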
 
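(3) Seata XA mode compared with AT mode
Switching the data source proxy type is all it takes to move the example above between XA mode and AT mode.
 
a. XA mode uses the data source proxy type DataSourceProxyXA
    //The configuration class is named DataSourceConfiguration here so that it does not shadow Seata's DataSourceProxy class
    public class DataSourceConfiguration {
        @Bean("dataSourceProxy")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            //DataSourceProxyXA for XA mode
            return new DataSourceProxyXA(druidDataSource);
        }
    }
b. AT mode uses the data source proxy type DataSourceProxy
    //The configuration class is named DataSourceConfiguration here so that it does not shadow Seata's DataSourceProxy class
    public class DataSourceConfiguration {
        @Bean("dataSourceProxy")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            //DataSourceProxy for AT mode
            return new DataSourceProxy(druidDataSource);
        }
    }
AT mode requires an undo_log table in the database.
XA mode does not require an undo_log table.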
    CREATE TABLE `undo_log` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `branch_id` bigint(20) NOT NULL,
        `xid` varchar(100) NOT NULL,
        `context` varchar(128) NOT NULL,
        `rollback_info` longblob NOT NULL,
        `log_status` int(11) NOT NULL,
        `log_created` datetime NOT NULL,
        `log_modified` datetime NOT NULL,
        PRIMARY KEY (`id`),
        UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
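2. How each module of the Seata XA example runs
(1) Run flow of the account service implemented by the account-xa module
(2) Run flow of the order service implemented by the order-xa module
(3) Run flow of the stock service implemented by the stock-xa module
(4) Run flow of the global transaction entry service implemented by the business-xa module
 
(1) Run flow of the account service implemented by the account-xa module
[Figure: 2.png]
The core code of the account-xa module is as follows: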
    @EnableTransactionManagement
    @SpringBootApplication
    public class AccountXAApplication {
        public static void main(String[] args) {
            //Listens on port 8083
            SpringApplication.run(AccountXAApplication.class, args);
        }
    }

    @Configuration
    public class AccountXADataSourceConfiguration {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource druidDataSource() {
            return new DruidDataSource();
        }

        @Bean("dataSourceProxy")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            //DataSourceProxyXA for XA mode
            return new DataSourceProxyXA(druidDataSource);
        }

        @Bean("jdbcTemplate")
        public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
            return new JdbcTemplate(dataSourceProxy);
        }

        @Bean
        public PlatformTransactionManager txManager(DataSource dataSourceProxy) {
            return new DataSourceTransactionManager(dataSourceProxy);
        }
    }

    @RestController
    public class AccountController {
        @Autowired
        private AccountService accountService;

        @RequestMapping(value = "/reduce", method = RequestMethod.GET, produces = "application/json")
        public String reduce(String userId, int money) {
            try {
                accountService.reduce(userId, money);
            } catch (Exception exx) {
                exx.printStackTrace();
                return FAIL;
            }
            return SUCCESS;
        }
    }

    @Service
    public class AccountService {
        public static final String SUCCESS = "SUCCESS";
        public static final String FAIL = "FAIL";
        private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);

        @Autowired
        private JdbcTemplate jdbcTemplate;

        @Transactional
        public void reduce(String userId, int money) {
            String xid = RootContext.getXID();
            LOGGER.info("reduce account balance in transaction: " + xid);
            jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[]{money, userId});
            int balance = jdbcTemplate.queryForObject("select money from account_tbl where user_id = ?", new Object[]{userId}, Integer.class);
            LOGGER.info("balance after transaction: " + balance);
            if (balance < 0) {
                throw new RuntimeException("Not Enough Money ...");
            }
        }
    }
(2) Run flow of the order service implemented by the order-xa module
[Figure: 3.png]
The core code of the order-xa module is as follows:
    @SpringBootApplication
    @EnableFeignClients
    public class OrderXAApplication {
        public static void main(String[] args) {
            //Listens on port 8082
            SpringApplication.run(OrderXAApplication.class, args);
        }
    }

    @Configuration
    public class OrderXADataSourceConfiguration {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource druidDataSource() {
            return new DruidDataSource();
        }

        @Bean("dataSourceProxy")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            //DataSourceProxyXA for XA mode
            return new DataSourceProxyXA(druidDataSource);
        }

        @Bean("jdbcTemplate")
        public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
            return new JdbcTemplate(dataSourceProxy);
        }
    }

    @RestController
    public class OrderController {
        @Autowired
        private OrderService orderService;

        @RequestMapping(value = "/create", method = RequestMethod.GET, produces = "application/json")
        public String create(String userId, String commodityCode, int orderCount) {
            try {
                orderService.create(userId, commodityCode, orderCount);
            } catch (Exception exx) {
                exx.printStackTrace();
                return FAIL;
            }
            return SUCCESS;
        }
    }

    @Service
    public class OrderService {
        public static final String SUCCESS = "SUCCESS";
        public static final String FAIL = "FAIL";
        private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);

        @Autowired
        private AccountFeignClient accountFeignClient;

        @Autowired
        private JdbcTemplate jdbcTemplate;

        public void create(String userId, String commodityCode, Integer count) {
            String xid = RootContext.getXID();
            LOGGER.info("create order in transaction: " + xid);
            //Order total = order quantity (count) * unit price (100)
            int orderMoney = count * 100;
            //Create the order
            jdbcTemplate.update("insert order_tbl(user_id,commodity_code,count,money) values(?,?,?,?)", new Object[]{userId, commodityCode, count, orderMoney});
            //Call the account service to deduct the balance
            String result = accountFeignClient.reduce(userId, orderMoney);
            if (!SUCCESS.equals(result)) {
                throw new RuntimeException("Failed to call Account Service. ");
            }
        }
    }

    @FeignClient(name = "account-xa", url = "127.0.0.1:8083")
    public interface AccountFeignClient {
        @GetMapping("/reduce")
        String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money);
    }
(3) Run flow of the stock service implemented by the stock-xa module
[Figure: 4.png]
The core code of the stock-xa module is as follows:
    @SpringBootApplication
    public class StockXAApplication {
        public static void main(String[] args) {
            //Listens on port 8081
            SpringApplication.run(StockXAApplication.class, args);
        }
    }

    @Configuration
    public class StockXADataSourceConfiguration {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource druidDataSource() {
            return new DruidDataSource();
        }

        @Bean("dataSourceProxy")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            //DataSourceProxyXA for XA mode
            return new DataSourceProxyXA(druidDataSource);
        }

        @Bean("jdbcTemplate")
        public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
            return new JdbcTemplate(dataSourceProxy);
        }
    }

    @RestController
    public class StockController {
        @Autowired
        private StockService stockService;

        @RequestMapping(value = "/deduct", method = RequestMethod.GET, produces = "application/json")
        public String deduct(String commodityCode, int count) {
            try {
                stockService.deduct(commodityCode, count);
            } catch (Exception exx) {
                exx.printStackTrace();
                return FAIL;
            }
            return SUCCESS;
        }
    }

    @Service
    public class StockService {
        public static final String SUCCESS = "SUCCESS";
        public static final String FAIL = "FAIL";
        private static final Logger LOGGER = LoggerFactory.getLogger(StockService.class);

        @Autowired
        private JdbcTemplate jdbcTemplate;

        public void deduct(String commodityCode, int count) {
            String xid = RootContext.getXID();
            LOGGER.info("deduct stock balance in transaction: " + xid);
            jdbcTemplate.update("update stock_tbl set count = count - ? where commodity_code = ?", new Object[]{count, commodityCode});
        }
    }
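(4) Run flow of the global transaction entry service implemented by the business-xa module
[Figure: 5.png]
The core code of the business-xa module is as follows: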
    @SpringBootApplication
    @EnableFeignClients
    public class BusinessXAApplication {
        public static void main(String[] args) {
            //Listens on port 8084
            SpringApplication.run(BusinessXAApplication.class, args);
        }
    }

    @Configuration
    public class BusinessXADataSourceConfiguration {
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource dataSource() {
            return new DruidDataSource();
        }

        @Bean("jdbcTemplate")
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }
    }

    @RestController
    public class BusinessController {
        @Autowired
        private BusinessService businessService;

        @RequestMapping(value = "/purchase", method = RequestMethod.GET, produces = "application/json")
        public String purchase(Boolean rollback, Integer count) {
            int orderCount = 30;
            if (count != null) {
                orderCount = count;
            }
            try {
                businessService.purchase(TestDatas.USER_ID, TestDatas.COMMODITY_CODE, orderCount, rollback == null ? false : rollback.booleanValue());
            } catch (Exception exx) {
                return "Purchase Failed:" + exx.getMessage();
            }
            return SUCCESS;
        }
    }

    @Service
    public class BusinessService {
        public static final String SUCCESS = "SUCCESS";
        public static final String FAIL = "FAIL";
        private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

        @Autowired
        private StockFeignClient stockFeignClient;

        @Autowired
        private OrderFeignClient orderFeignClient;

        @Autowired
        private JdbcTemplate jdbcTemplate;

        @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {
            String xid = RootContext.getXID();
            LOGGER.info("New Transaction Begins: " + xid);
            String result = stockFeignClient.deduct(commodityCode, orderCount);
            if (!SUCCESS.equals(result)) {
                //Message: "Stock service call failed, rolling back the transaction!"
                throw new RuntimeException("库存服务调用失败,事务回滚!");
            }
            result = orderFeignClient.create(userId, commodityCode, orderCount);
            if (!SUCCESS.equals(result)) {
                //Message: "Order service call failed, rolling back the transaction!"
                throw new RuntimeException("订单服务调用失败,事务回滚!");
            }
            if (rollback) {
                throw new RuntimeException("Force rollback ... ");
            }
        }
        ...
    }

    @FeignClient(name = "stock-xa", url = "127.0.0.1:8081")
    public interface StockFeignClient {
        @GetMapping("/deduct")
        String deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") int count);
    }

    @FeignClient(name = "order-xa", url = "127.0.0.1:8082")
    public interface OrderFeignClient {
        @GetMapping("/create")
        String create(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("orderCount") int orderCount);
    }
 
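3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
(1) Overview of seata-spring-boot-starter
(2) Auto-configuring the GlobalTransactionScanner bean
 
(1) Overview of seata-spring-boot-starter
The seata-spring-boot-starter module in the seata-v1.5.0 source uses Spring Boot auto-configuration to simplify the complex configuration of seata-all. Just as dubbo-spring-boot-starter is the dependency needed to integrate Dubbo with Spring Boot, seata-spring-boot-starter is the dependency needed to integrate Seata with Spring Boot.
 
The seata-spring-boot-starter-samples module in the seata-samples-1.5.0 project integrates Spring Boot + Dubbo + MyBatis + Nacos + Seata to manage distributed transactions for Dubbo. It uses Nacos as the registry and configuration center for both Dubbo and Seata, and uses a MySQL database with MyBatis for data access.
 
Note: seata-spring-boot-starter enables automatic data source proxying by default, so manually configuring a DataSourceProxy as well will cause an exception.
 
(2) Auto-configuring the GlobalTransactionScanner bean
The seata-xa module of the seata-samples-1.5.0 project differs from the seata-samples-dubbo module: the latter wires the GlobalTransactionScanner bean through an XML configuration file.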
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
        ...

        <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
            <constructor-arg value="dubbo-demo-account-service"/>
            <constructor-arg value="my_test_tx_group"/>
        </bean>
    </beans>
The former relies on the SeataAutoConfiguration class in seata-spring-boot-starter to auto-configure the GlobalTransactionScanner bean.
    @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
    public class SeataAutoConfiguration {
        private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

        @Bean(BEAN_NAME_FAILURE_HANDLER)
        @ConditionalOnMissingBean(FailureHandler.class)
        public FailureHandler failureHandler() {
            return new DefaultFailureHandlerImpl();
        }

        @Bean
        @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
        @ConditionalOnMissingBean(GlobalTransactionScanner.class)
        public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Automatically configure Seata");
            }
            //Return the component that scans for the @GlobalTransactional annotation
            return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
        }
    }
 
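4. Auto-configuration of the global transaction annotation scanner
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
[Figure: 6.png]
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
The resources/META-INF/spring.factories configuration file is shown below, so Spring Boot auto-configures the following four classes.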
    # Auto Configure
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,\
    io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
    io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
    io.seata.spring.boot.autoconfigure.HttpAutoConfiguration
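To make the file format concrete, here is a minimal plain-JDK sketch of how a spring.factories entry resolves to a list of class names. It is not Spring Boot's loader: the class FactoriesFileSketch and its method are hypothetical, and the sketch only relies on the fact that the file uses java.util.Properties syntax (backslash line continuation, comma-separated values). Spring Boot goes on to instantiate and condition-check those classes, which the sketch does not attempt.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: lists the class names registered under the EnableAutoConfiguration key. */
final class FactoriesFileSketch {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    static List<String> autoConfigurationNames(String factoriesContent) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end in a backslash and skips '#' comments.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content = "# Auto Configure\n"
            + KEY + "=\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.HttpAutoConfiguration\n";
        autoConfigurationNames(content).forEach(System.out::println);
    }
}
```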
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
The global transaction annotation scanner, GlobalTransactionScanner, scans for the @GlobalTransactional annotation.
    @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
    public class SeataAutoConfiguration {
        private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

        @Bean(BEAN_NAME_FAILURE_HANDLER)
        @ConditionalOnMissingBean(FailureHandler.class)
        public FailureHandler failureHandler() {
            return new DefaultFailureHandlerImpl();
        }

        @Bean
        @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
        @ConditionalOnMissingBean(GlobalTransactionScanner.class)
        public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Automatically configure Seata");
            }
            //Return the component that scans for the @GlobalTransactional annotation
            return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
        }
    }
 
5. Core fields and initialization of the global transaction annotation scanner
    //The type Global transaction scanner.
    //AbstractAutoProxyCreator is Spring's auto-proxy creator; by extending it, every bean in the Spring container is passed to wrapIfNecessary(),
    //so GlobalTransactionScanner.wrapIfNecessary() can scan every method of every bean
    //and check for the @GlobalTransactional annotation; if it finds one, it creates an AOP proxy for that bean
    public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
        ...
        //Method interceptor
        private MethodInterceptor interceptor;
        //Global transaction annotation interceptor
        private MethodInterceptor globalTransactionalInterceptor;
        //Application ID
        private final String applicationId;
        //Global transaction service group
        private final String txServiceGroup;
        //Transaction mode
        private final int mode;
        //Alibaba Cloud related: the access key and the secret key
        private String accessKey;
        private String secretKey;

        //Whether global transactions are disabled; false by default
        private volatile boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        //Whether initialization has completed
        private final AtomicBoolean initialized = new AtomicBoolean(false);

        //Failure handler hook
        private final FailureHandler failureHandlerHook;
        //Spring application context
        private ApplicationContext applicationContext;

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String txServiceGroup) {
            this(txServiceGroup, txServiceGroup, DEFAULT_MODE);
        }

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String txServiceGroup, int mode) {
            this(txServiceGroup, txServiceGroup, mode);
        }

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String applicationId, String txServiceGroup) {
            this(applicationId, txServiceGroup, DEFAULT_MODE);
        }

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode) {
            this(applicationId, txServiceGroup, mode, null);
        }

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String applicationId, String txServiceGroup, FailureHandler failureHandlerHook) {
            this(applicationId, txServiceGroup, DEFAULT_MODE, failureHandlerHook);
        }

        //Instantiates a new Global transaction scanner.
        public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {
            setOrder(ORDER_NUM);
            setProxyTargetClass(true);
            this.applicationId = applicationId;
            this.txServiceGroup = txServiceGroup;
            this.mode = mode;
            this.failureHandlerHook = failureHandlerHook;
        }

        //Spring destroy callback
        @Override
        public void destroy() {
            ShutdownHook.getInstance().destroyAll();
        }

        //Spring initialization callback; initializes this GlobalTransactionScanner
        @Override
        public void afterPropertiesSet() {
            if (disableGlobalTransaction) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global transaction is disabled.");
                }
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
                return;
            }
            if (initialized.compareAndSet(false, true)) {
                initClient();
            }
        }

        //Initializes Seata's two network clients: the TM client and the RM client
        private void initClient() {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Initializing Global Transaction Clients ... ");
            }
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            //init TM
            TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            //init RM
            RMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global Transaction Clients are initialized. ");
            }
            registerSpringShutdownHook();
        }
        ...
    }
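The initialization guard in afterPropertiesSet() above is worth isolating: initialized.compareAndSet(false, true) lets exactly one caller through, so the TM and RM clients are initialized at most once even if the callback fires repeatedly or from several threads. A minimal sketch of that pattern follows; the class OneTimeInitSketch and its counter are hypothetical stand-ins, and the real initClient() opens network clients instead of incrementing a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the scanner's "initialize at most once" guard. */
final class OneTimeInitSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCalls = new AtomicInteger();
    private final boolean disabled;

    OneTimeInitSketch(boolean disabled) {
        this.disabled = disabled;
    }

    void afterPropertiesSet() {
        if (disabled) {
            // Mirrors the early return when global transactions are disabled.
            return;
        }
        // Only the caller that flips false -> true runs the initialization.
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCalls.incrementAndGet(); // placeholder for the TM/RM client setup
    }

    int initCalls() {
        return initCalls.get();
    }

    public static void main(String[] args) {
        OneTimeInitSketch scanner = new OneTimeInitSketch(false);
        scanner.afterPropertiesSet();
        scanner.afterPropertiesSet();
        System.out.println("initClient ran " + scanner.initCalls() + " time(s)");
    }
}
```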
 
6. How the global transaction annotation scanner creates AOP proxies
[Figure: 7.png]
    //The type Global transaction scanner.
    //AbstractAutoProxyCreator is Spring's auto-proxy creator; by extending it, every bean in the Spring container is passed to wrapIfNecessary(),
    //so GlobalTransactionScanner.wrapIfNecessary() can scan every method of every bean
    //and check for the @GlobalTransactional annotation; if it finds one, it creates an AOP proxy for that bean
    public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
        ...
        //The following will be scanned, and added corresponding interceptor:
        //Because this class extends the abstract AbstractAutoProxyCreator, every Spring bean is passed to this method to check for Seata annotations
        //If Seata annotations are present, the matching AOP proxy is created
        @Override
        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            try {
                synchronized (PROXYED_SET) {
                    if (PROXYED_SET.contains(beanName)) {
                        return bean;
                    }
                    interceptor = null;
                    //check TCC proxy
                    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                    } else {
                        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                        if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                            return bean;
                        }
                        if (globalTransactionalInterceptor == null) {
                            //Build a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                    LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                    if (!AopUtils.isAopProxy(bean)) {
                        //Next, Spring's AbstractAutoProxyCreator creates a dynamic proxy for the target bean,
                        //so later calls to the target bean's methods go through GlobalTransactionalInterceptor
                        bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                    } else {
                        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                        Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                        for (Advisor avr : advisor) {
                            advised.addAdvisor(0, avr);
                        }
                    }
                    PROXYED_SET.add(beanName);
                    return bean;
                }
            } catch (Exception exx) {
                throw new RuntimeException(exx);
            }
        }
        ...
    }
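The scan-then-wrap idea in wrapIfNecessary() can be reduced to a few lines of plain JDK code. The sketch below is an analogy, not Seata's implementation: it uses java.lang.reflect.Proxy instead of Spring AOP (the scanner itself calls setProxyTargetClass(true), so Spring would normally build a class-based proxy), and every name in it (AnnotationProxySketch, DemoGlobalTransactional, PurchaseApi, PurchaseImpl) is hypothetical. A bean is returned untouched unless one of its interface methods carries the annotation; otherwise calls to annotated methods are bracketed by "begin" and "commit"/"rollback" log entries.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class AnnotationProxySketch {
    /** Hypothetical marker annotation standing in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTransactional { }

    interface PurchaseApi {
        @DemoGlobalTransactional
        String purchase(String userId);

        String ping();
    }

    static final class PurchaseImpl implements PurchaseApi {
        @Override public String purchase(String userId) { return "purchased:" + userId; }
        @Override public String ping() { return "pong"; }
    }

    /** True if any public method of the type carries the marker annotation. */
    static boolean existsAnnotation(Class<?> type) {
        for (Method method : type.getMethods()) {
            if (method.isAnnotationPresent(DemoGlobalTransactional.class)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the bean itself when nothing is annotated, otherwise a proxy that intercepts annotated methods. */
    static <T> T wrapIfNecessary(T bean, Class<T> iface, List<String> log) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            boolean annotated = method.isAnnotationPresent(DemoGlobalTransactional.class);
            if (annotated) log.add("begin " + method.getName());
            try {
                Object result = method.invoke(bean, args);
                if (annotated) log.add("commit " + method.getName());
                return result;
            } catch (InvocationTargetException e) {
                if (annotated) log.add("rollback " + method.getName());
                throw e.getCause();
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PurchaseApi api = wrapIfNecessary(new PurchaseImpl(), PurchaseApi.class, log);
        System.out.println(api.purchase("U100001"));
        System.out.println(api.ping());
        System.out.println(log);
    }
}
```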
 
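7. Source code of the global transaction degrade check: beginning and committing a transaction
In the constructor of GlobalTransactionalInterceptor, if the global transaction degrade check is enabled (and both the check period and the allowed number of checks are greater than 0), startDegradeCheck() is called to start the scheduled degrade check thread.
 
By default this scheduled thread runs every 2 seconds, and each run tries to begin a global transaction on Seata Server. If that succeeds, it obtains the xid, commits that global transaction, and posts a degrade-check-success event to the event bus. If beginning or committing fails, it posts a degrade-check-failure event to the event bus.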
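Stripped of Seata specifics, one round of that probe is "begin, commit, report the outcome". The sketch below illustrates the shape under stated assumptions: DegradeProbeSketch and DemoTransactionManager are hypothetical names, a Consumer<Boolean> stands in for the event bus, and the transaction name "degradeCheck" and the 60000 ms timeout are taken from the source listing in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class DegradeProbeSketch {
    /** Hypothetical stand-in for the transaction manager client used by the probe. */
    interface DemoTransactionManager {
        String begin(String name, int timeoutMillis) throws Exception;
        void commit(String xid) throws Exception;
    }

    /** One probe round: begin, then commit; report true on success and false on any failure. */
    static void probeOnce(DemoTransactionManager tm, Consumer<Boolean> eventBus) {
        try {
            String xid = tm.begin("degradeCheck", 60000);
            tm.commit(xid);
            eventBus.accept(true);
        } catch (Exception e) {
            eventBus.accept(false);
        }
    }

    /** Schedules the probe at a fixed rate, with the first run delayed by one period. */
    static ScheduledFuture<?> start(ScheduledExecutorService executor, DemoTransactionManager tm,
                                    Consumer<Boolean> eventBus, long periodMillis) {
        return executor.scheduleAtFixedRate(() -> probeOnce(tm, eventBus), periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        List<Boolean> events = new ArrayList<>();
        DemoTransactionManager unreachable = new DemoTransactionManager() {
            @Override public String begin(String name, int timeoutMillis) throws Exception {
                throw new Exception("server unreachable");
            }
            @Override public void commit(String xid) { }
        };
        probeOnce(unreachable, events::add);
        System.out.println(events); // one failure event
    }
}
```

Keeping probeOnce separate from the scheduling call makes the success and failure paths easy to exercise without waiting on a timer.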
    //The type Global transactional interceptor.
    //When a method annotated with @GlobalTransactional is called, AOP intercepts the call and routes it to this interceptor's invoke() method
    public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
        private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
        private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
        //Global transaction execution template
        private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
        //Global lock template
        private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
        //Failure handler
        private final FailureHandler failureHandler;
        //Whether global transactions are disabled
        private volatile boolean disable;
        //Degrade check period
        //Degrade mechanism: if Seata Server goes down and global transactions cannot proceed, the client can degrade to running local transactions
        private static int degradeCheckPeriod;
        //Whether the degrade check is enabled
        private static volatile boolean degradeCheck;
        //Allowed number of degrade checks
        private static int degradeCheckAllowTimes;
        //Degrade count
        private static volatile Integer degradeNum = 0;
        //Reach count
        private static volatile Integer reachNum = 0;
        //Event bus for the degrade check
        private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
        //Scheduled thread pool for the degrade check
        private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
        //region DEFAULT_GLOBAL_TRANSACTION_TIMEOUT
        //Default global transaction timeout
        private static int defaultGlobalTransactionTimeout = 0;
        //endregion

        //Instantiates a new Global transactional interceptor.
        public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
            //Failure handler
            this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
            //Whether global transactions are disabled; false by default
            this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
            //Whether the global transaction degrade check is enabled; false by default
            degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);
            //If the global transaction degrade check is enabled
            if (degradeCheck) {
                //Add a configuration listener
                ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
                //Set the degrade check period; once every 2s by default
                degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
                //Set the allowed number of degrade checks; 10 by default
                degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
                //Register this interceptor on the degrade check event bus as a subscriber
                EVENT_BUS.register(this);
                //If both the check period and the allowed number of checks are greater than 0, start the degrade check thread
                if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                    startDegradeCheck();
                }
            }
            //Initialize the default global transaction timeout
            this.initDefaultGlobalTransactionTimeout();
        }

        private void initDefaultGlobalTransactionTimeout() {
            if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
                int defaultGlobalTransactionTimeout;
                try {
                    defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
                } catch (Exception e) {
                    LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
                    defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
                }
                if (defaultGlobalTransactionTimeout <= 0) {
                    LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
                    defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
                }
                GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
            }
        }
        ...

        //auto upgrade service detection
        //Starts the scheduled degrade check thread; it runs every 2s by default
        private static void startDegradeCheck() {
            executor.scheduleAtFixedRate(() -> {
                if (degradeCheck) {
                    try {
                        //Try to begin a global transaction on Seata Server with a null application ID, a null transaction service group, the transaction name degradeCheck, and a 60s timeout
                        String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                        //If begin succeeds, an xid is returned; commit that global transaction right away
                        TransactionManagerHolder.get().commit(xid);
                        //If the commit succeeds, post a degrade check event with result true to the event bus
                        EVENT_BUS.post(new DegradeCheckEvent(true));
                    } catch (Exception e) {
                        //If beginning the global transaction or committing the xid fails, post an event with result false to signal a failed degrade check
                        EVENT_BUS.post(new DegradeCheckEvent(false));
                    }
                }
            }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
  95.     }
  96.     ...
  97. }
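
The probe performed by startDegradeCheck() can be reduced to a small, dependency-free sketch. The class below is hypothetical and is not Seata code: `begin` and `commit` stand in for the TransactionManager calls, and `listener` stands in for posting a DegradeCheckEvent to the event bus.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical sketch, not Seata code: one round of a degrade check.
final class DegradeProbe {
    private final Callable<String> begin;     // stands in for TransactionManager.begin(...)
    private final Consumer<String> commit;    // stands in for TransactionManager.commit(xid)
    private final Consumer<Boolean> listener; // stands in for EVENT_BUS.post(new DegradeCheckEvent(...))

    DegradeProbe(Callable<String> begin, Consumer<String> commit, Consumer<Boolean> listener) {
        this.begin = begin;
        this.commit = commit;
        this.listener = listener;
    }

    // Runs a single probe. It never throws: any failure while beginning or
    // committing the dummy transaction is reported to the listener as false.
    void runOnce() {
        try {
            String xid = begin.call();
            commit.accept(xid);
            listener.accept(true);
        } catch (Exception e) {
            listener.accept(false);
        }
    }
}
```

In a real client such a probe would typically be handed to a ScheduledExecutorService via scheduleAtFixedRate(probe::runOnce, period, period, TimeUnit.MILLISECONDS), which matches how the listing above schedules its check.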
 
9. Source code for HTTP-header-based global transaction propagation
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
    ...
    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {//The check succeeded, so the TC is available
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {//The check failed: the TC is down and global transactions can be neither begun nor committed
            //If the degrade count is below the allowed number of degrade checks (10 by default)
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++; //Increment the degrade count
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }

    //Calling a method annotated with @GlobalTransactional runs the invoke() method below
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //localDisable is true if global transactions are disabled, or if the degrade check is enabled
            //and the degrade count has reached the allowed number of degrade checks.
            //When localDisable is true, no global transaction may be started.
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            //If global transactions are not disabled
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    //The real entry point for handling a global transaction
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        //Run the target method directly
        return methodInvocation.proceed();
    }
    ...
}
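
The two counters in onDegradeCheck() form a small state machine, which is easier to follow in isolation. The following is a minimal sketch with hypothetical names (`DegradeState`, `onCheck`, `isDegraded`), not Seata code: `allowTimes` plays the role of degradeCheckAllowTimes, and `isDegraded()` corresponds to the `degradeNum >= degradeCheckAllowTimes` test in invoke().

```java
// Hypothetical sketch of the counter logic in onDegradeCheck(); not Seata code.
final class DegradeState {
    private final int allowTimes; // plays the role of degradeCheckAllowTimes
    private int degradeNum;       // failed checks counted so far, capped at allowTimes
    private int reachNum;         // consecutive successful checks while degraded

    DegradeState(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    synchronized void onCheck(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                // Degraded: recover only after allowTimes consecutive successes.
                if (++reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                }
            } else {
                // Not degraded yet: a single success clears the failure count.
                degradeNum = 0;
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++;
            } else {
                // Already degraded: a failure restarts the recovery count.
                reachNum = 0;
            }
        }
    }

    // True once allowTimes failures have accumulated; the interceptor then
    // skips the global transaction and just runs the target method.
    synchronized boolean isDegraded() {
        return degradeNum >= allowTimes;
    }
}
```

The asymmetry is deliberate in this reading of the listing: before degradation one success resets the failure count, while recovery after degradation needs a full run of consecutive successes, so a flapping TC does not toggle the client back and forth.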
 
10. Initializing the XA data source proxy and obtaining the XA connection proxy
@Configuration
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }

    @Override
    public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) {
        exceptionResolvers.add(new HttpHandlerExceptionResolver());
    }
}
//Spring MVC interceptor.
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        //When integrated with Spring Cloud, Feign calls also go through an interception step on the caller side,
        //where the xid can be put into the HTTP request as a request header.
        //The receiver then uses this Spring MVC interceptor to extract the xid from the header and bind it to RootContext.
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        if (xid == null && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
        ModelAndView modelAndView) {
        if (RootContext.inGlobalTransaction()) {
            XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        }
    }
}
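
The listing above only shows the receiving side. As a rough illustration of the whole round trip, here is a self-contained sketch in which a ThreadLocal stands in for RootContext and a plain Map stands in for the HTTP headers. All names are hypothetical, and the header name "TX_XID" is an assumption about the value of RootContext.KEY_XID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of xid propagation over request headers; not Seata code.
final class XidPropagation {
    // Assumed to match RootContext.KEY_XID; treat the literal as an assumption.
    static final String KEY_XID = "TX_XID";

    // Stands in for RootContext: the xid is bound to the current thread.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static void bind(String xid) { CURRENT_XID.set(xid); }
    static String currentXid() { return CURRENT_XID.get(); }
    static void unbind() { CURRENT_XID.remove(); }

    // Caller side: copy the bound xid, if any, into the outgoing headers.
    static Map<String, String> outboundHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = currentXid();
        if (xid != null) {
            headers.put(KEY_XID, xid);
        }
        return headers;
    }

    // Receiver side, mirroring preHandle(): bind the header xid only when
    // the current thread has no xid yet. Returns true if a bind happened.
    static boolean bindFromHeaders(Map<String, String> headers) {
        String rpcXid = headers.get(KEY_XID);
        if (currentXid() == null && rpcXid != null) {
            bind(rpcXid);
            return true;
        }
        return false;
    }
}
```

Because the xid lives in a ThreadLocal, the receiver should unbind it once the request has been handled; otherwise a pooled worker thread could carry a stale xid into the next request.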
 
11. Source code for XA branch transaction registration and starting the underlying XA connection
//DataSource proxy for XA mode.
public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxyXA.class);

    public DataSourceProxyXA(DataSource dataSource) {
        this(dataSource, DEFAULT_RESOURCE_GROUP_ID);
    }

    public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
        if (dataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the data source, because the type is: {}", dataSource.getClass().getName());
            dataSource = ((SeataDataSourceProxy) dataSource).getTargetDataSource();
        }
        this.dataSource = dataSource;
        this.branchType = BranchType.XA;
        JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);
        //Set the default branch type to 'XA' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = dataSource.getConnection();
        return getConnectionProxy(connection);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = dataSource.getConnection(username, password);
        return getConnectionProxy(connection);
    }

    //Outside a global transaction the original connection is returned without a proxy
    protected Connection getConnectionProxy(Connection connection) throws SQLException {
        if (!RootContext.inGlobalTransaction()) {
            return connection;
        }
        return getConnectionProxyXA(connection);
    }

    @Override
    protected Connection getConnectionProxyXA() throws SQLException {
        Connection connection = dataSource.getConnection();
        return getConnectionProxyXA(connection);
    }

    private Connection getConnectionProxyXA(Connection connection) throws SQLException {
        //The physical connection
        Connection physicalConn = connection.unwrap(Connection.class);
        XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
        ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
        connectionProxyXA.init();
        return connectionProxyXA;
    }
}
//Abstract DataSource proxy for XA mode.
public abstract class AbstractDataSourceProxyXA extends BaseDataSourceResource<ConnectionProxyXA> {
    protected static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT_XA";
    //Get a ConnectionProxyXA instance for finishing XA branch(XA commit/XA rollback)
    //@return ConnectionProxyXA instance
    public ConnectionProxyXA getConnectionForXAFinish(XAXid xaXid) throws SQLException {
        ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
        if (connectionProxyXA != null) {
            return connectionProxyXA;
        }
        return (ConnectionProxyXA)getConnectionProxyXA();
    }

    protected abstract Connection getConnectionProxyXA() throws SQLException;
    //Force close the physical connection kept for XA branch of given XAXid.
    //@param xaXid the given XAXid
    public void forceClosePhysicalConnection(XAXid xaXid) throws SQLException {
        ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
        if (connectionProxyXA != null) {
            connectionProxyXA.close();
            Connection physicalConn = connectionProxyXA.getWrappedConnection();
            if (physicalConn instanceof PooledConnection) {
                physicalConn = ((PooledConnection)physicalConn).getConnection();
            }
            //Force close the physical connection
            physicalConn.close();
        }
    }
}
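
getConnectionForXAFinish() follows a "look up the held connection, otherwise create a fresh one" pattern. The sketch below isolates that pattern with hypothetical names (`HeldConnections`, `hold`, `release`, `lookupOrCreate`); it is not the BaseDataSourceResource implementation, only an illustration of the idea under the assumption that held connections are keyed by the string form of the branch xid.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of a holder for connections kept between the two
// phases of an XA branch; not Seata code.
final class HeldConnections<T> {
    // Keyed by the string form of the branch xid (an assumption of this sketch).
    private final ConcurrentHashMap<String, T> keeper = new ConcurrentHashMap<>();

    // Keep a connection so that the second phase can find it again.
    void hold(String key, T connection) {
        keeper.put(key, connection);
    }

    // Stop keeping the connection; returns it, or null if none was held.
    T release(String key) {
        return keeper.remove(key);
    }

    // Prefer the held connection for this branch; fall back to a new one.
    T lookupOrCreate(String key, Supplier<T> factory) {
        T held = keeper.get(key);
        return held != null ? held : factory.get();
    }
}
```

Falling back to a new connection is possible because an XA branch that has been prepared is identified by its xid rather than by the session that prepared it, so another connection to the same database can finish it.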
 
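12. Source code for the two-phase commit flow of an XA distributed transaction
Seata's XA mode is built on the XA support of the underlying database, which is MySQL in this example.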
//Connection proxy for XA mode.
//Through this proxy the local transaction can be begun and committed or rolled back,
//SQL statements can be executed, and the XA two-phase commit can be driven
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
    //Current auto-commit status; true by default
    private boolean currentAutoCommitStatus = true;
    //Xid of the XA branch transaction
    private XAXid xaBranchXid;
    //Whether the XA branch is active; false by default
    private boolean xaActive = false;
    //Whether this connection is being kept (held) for the XA branch; false by default
    private boolean kept = false;
    //Constructor of Connection Proxy for XA mode.
    //@param originalConnection Normal Connection from the original DataSource.
    //@param xaConnection XA Connection based on physical connection of the normal Connection above.
    //@param resource The corresponding Resource(DataSource proxy) from which the connections was created.
    //@param xid Seata global transaction xid.
    public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        super(originalConnection, xaConnection, resource, xid);
    }

    public void init() {
        try {
            this.xaResource = xaConnection.getXAResource();
            this.currentAutoCommitStatus = this.originalConnection.getAutoCommit();
            if (!currentAutoCommitStatus) {
                throw new IllegalStateException("Connection[autocommit=false] as default is NOT supported");
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
    ...
    //Switching auto-commit from true to false triggers the branch transaction registration
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if (currentAutoCommitStatus == autoCommit) {
            return;
        }
        if (autoCommit) {
            //According to JDBC spec:
            //If this method is called during a transaction and the auto-commit mode is changed, the transaction is committed.
            if (xaActive) {
                commit();
            }
        } else {
            if (xaActive) {
                throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
            }
            //Start a XA branch
            long branchId = 0L;
            try {
                //1. register branch to TC then get the branchId
                //Register the branch transaction with type XA, passing the resourceId and the xid
                branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null);
            } catch (TransactionException te) {
                cleanXABranchContext();
                throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
            }
            //2. build XA-Xid with xid and branchId
            this.xaBranchXid = XAXidBuilder.build(xid, branchId);
            try {
                //3. XA Start
                xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
            } catch (XAException e) {
                cleanXABranchContext();
                throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
            }
            //4. XA is active
            this.xaActive = true;
        }
        currentAutoCommitStatus = autoCommit;
    }
    ...
}
//The type Abstract connection proxy on XA mode.
public abstract class AbstractConnectionProxyXA implements Connection {
    public static final String SQLSTATE_XA_NOT_END = "SQLSTATE_XA_NOT_END";
    //The original connection
    protected Connection originalConnection;
    //The XA connection wrapping the physical connection
    protected XAConnection xaConnection;
    //The XA resource
    protected XAResource xaResource;
    //The base data source resource (the data source proxy)
    protected BaseDataSourceResource resource;
    //The global (distributed) transaction xid
    protected String xid;

    public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        this.originalConnection = originalConnection;
        this.xaConnection = xaConnection;
        this.resource = resource;
        this.xid = xid;
    }
    ...
}
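
Step 2 of setAutoCommit(false) combines the global xid and the branchId into an XA Xid, which is the object handed to XAResource.start(). The class below is a hypothetical stand-in for what XAXidBuilder.build(xid, branchId) returns: the byte encoding, the format id and the toString() layout are assumptions of this sketch and may differ from Seata's real implementation. Only the javax.transaction.xa.Xid interface itself comes from the JDK.

```java
import java.nio.charset.StandardCharsets;
import javax.transaction.xa.Xid;

// Hypothetical sketch of an XA Xid built from a global xid and a branchId;
// the encoding is an assumption, not Seata's actual format.
final class SimpleBranchXid implements Xid {
    private static final int FORMAT_ID = 1; // arbitrary value chosen for this sketch

    private final String globalXid;
    private final long branchId;

    SimpleBranchXid(String globalXid, long branchId) {
        this.globalXid = globalXid;
        this.branchId = branchId;
    }

    @Override
    public int getFormatId() {
        return FORMAT_ID;
    }

    // The global transaction id part carries the global xid.
    @Override
    public byte[] getGlobalTransactionId() {
        return globalXid.getBytes(StandardCharsets.UTF_8);
    }

    // The branch qualifier part carries the branchId assigned by the TC.
    @Override
    public byte[] getBranchQualifier() {
        return Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
    }

    // A stable string form, usable as a lookup key for held connections.
    @Override
    public String toString() {
        return globalXid + "-" + branchId;
    }
}
```

Keeping both values recoverable from the Xid is what lets a later commit or rollback request, which only carries the xid and branchId, address the same branch on the database.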
 
