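Outline
1. A Seata XA distributed transaction example, and how AT differs from XA
2. How each module of the Seata XA example runs
3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
4. Auto-configuration of the global transaction annotation scanner
5. Core fields and initialization of the global transaction annotation scanner
6. How the global transaction annotation scanner creates AOP proxies
7. Source code of the degrade check that begins and commits a global transaction
8. How the global transaction interceptor degrades when Seata Server fails
9. Source code of global transaction propagation via HTTP request headers
10. Initialization of the XA data source proxy and obtaining the XA connection proxy
11. Source code of XA branch registration and starting the original XA connection
12. Source code of the two-phase commit flow of an XA distributed transaction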
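1. A Seata XA distributed transaction example, and how AT differs from XA
(1) Overview of the seata-xa module in the seata-samples project
(2) Steps to run the Seata XA distributed transaction example
(3) Seata XA mode compared with AT mode
(1) Overview of the seata-xa module in the seata-samples project
The seata-xa module in the seata-samples project uses Seata XA mode to commit and roll back distributed transactions.
The seata-xa module is a Spring Boot project that:
I. Uses Feign for remote calls
II. Uses Spring JDBC to access the MySQL database
III. Uses Seata XA for distributed transactions
(2) Steps to run the Seata XA distributed transaction example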
I. Run the SQL statements the example needs: sql/all_in_one.sql
- DROP TABLE IF EXISTS `stock_tbl`;
- CREATE TABLE `stock_tbl` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `commodity_code` varchar(255) DEFAULT NULL,
- `count` int(11) DEFAULT 0,
- PRIMARY KEY (`id`),
- UNIQUE KEY (`commodity_code`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- DROP TABLE IF EXISTS `order_tbl`;
- CREATE TABLE `order_tbl` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `user_id` varchar(255) DEFAULT NULL,
- `commodity_code` varchar(255) DEFAULT NULL,
- `count` int(11) DEFAULT 0,
- `money` int(11) DEFAULT 0,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- DROP TABLE IF EXISTS `account_tbl`;
- CREATE TABLE `account_tbl` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `user_id` varchar(255) DEFAULT NULL,
- `money` int(11) DEFAULT 0,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
II. Download the Seata Server archive
- Visit: https://github.com/seata/seata/releases
III. Unzip the Seata Server archive and start Seata Server
- $ unzip seata-server-xxx.zip
- $ cd distribution
- $ sh ./bin/seata-server.sh 8091 file
IV. Start the AccountXA, OrderXA, StockXA, and BusinessXA services in that order
V. Test
- # See the BusinessController code for the exact request parameters
- $ curl http://127.0.0.1:8084/purchase
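VI. Notes
For the data initialization logic, see BusinessService.initData(). With the initial data and the default call logic, purchase succeeds three times. Each call deducts 3000 from the account balance, taking it from the initial 10000 down to 1000. The fourth call fails because the balance is insufficient, and the corresponding stock, order, and account changes are all rolled back.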
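The arithmetic behind "three successes, then one failure" can be checked with a small standalone sketch. It does not use Seata at all: the class name PurchaseArithmetic is hypothetical, the unit price (100) and default order count (30) are taken from the sample code, and restoring the old balance merely stands in for the global rollback.

```java
// Illustrative only: reproduces the balance arithmetic of the sample, not Seata itself.
class PurchaseArithmetic {
    static final int UNIT_PRICE = 100;    // unit price used by OrderService in the sample
    static final int DEFAULT_COUNT = 30;  // default orderCount in BusinessController

    private int balance;

    PurchaseArithmetic(int initialBalance) {
        this.balance = initialBalance;
    }

    // Tries one purchase; returns false and restores the balance if it would go negative.
    boolean purchase(int count) {
        int before = balance;
        balance -= count * UNIT_PRICE;    // same deduction as AccountService.reduce
        if (balance < 0) {
            balance = before;             // stands in for the global rollback
            return false;
        }
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        PurchaseArithmetic account = new PurchaseArithmetic(10000);
        for (int call = 1; call <= 4; call++) {
            boolean ok = account.purchase(DEFAULT_COUNT);
            System.out.println("call " + call + ": " + (ok ? "SUCCESS" : "FAIL")
                    + ", balance=" + account.balance());
        }
    }
}
```

In the real example the rollback is coordinated by Seata Server across three databases; here a single field is restored, which is enough to show why the fourth call is the first one to fail.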
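(3) Seata XA mode compared with AT mode
The example switches between XA mode and AT mode simply by changing the data source proxy type.
I. XA mode uses DataSourceProxyXA as the data source proxy type
- public class DataSourceProxy {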
- @Bean("dataSourceProxy")
- public DataSource dataSource(DruidDataSource druidDataSource) {
- //DataSourceProxyXA for XA mode
- return new DataSourceProxyXA(druidDataSource);
- }
- }
II. AT mode uses DataSourceProxy as the data source proxy type
- public class DataSourceProxy {
- @Bean("dataSourceProxy")
- public DataSource dataSource(DruidDataSource druidDataSource) {
- return new DataSourceProxy(druidDataSource);
- }
- }
AT mode requires an undo_log table in the database; XA mode does not.
- CREATE TABLE `undo_log` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT,
- `branch_id` bigint(20) NOT NULL,
- `xid` varchar(100) NOT NULL,
- `context` varchar(128) NOT NULL,
- `rollback_info` longblob NOT NULL,
- `log_status` int(11) NOT NULL,
- `log_created` datetime NOT NULL,
- `log_modified` datetime NOT NULL,
- PRIMARY KEY (`id`),
- UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
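2. How each module of the Seata XA example runs
(1) Flow of the account service implemented by the account-xa module
(2) Flow of the order service implemented by the order-xa module
(3) Flow of the stock service implemented by the stock-xa module
(4) Flow of the global transaction entry service implemented by the business-xa module
(1) Flow of the account service implemented by the account-xa module
The core code of the account-xa module is as follows:
- @EnableTransactionManagement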
- @SpringBootApplication
- public class AccountXAApplication {
- public static void main(String[] args) {
- //Listens on port 8083
- SpringApplication.run(AccountXAApplication.class, args);
- }
- }
- @Configuration
- public class AccountXADataSourceConfiguration {
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DruidDataSource druidDataSource() {
- return new DruidDataSource();
- }
-
- @Bean("dataSourceProxy")
- public DataSource dataSource(DruidDataSource druidDataSource) {
- //DataSourceProxyXA for XA mode
- return new DataSourceProxyXA(druidDataSource);
- }
-
- @Bean("jdbcTemplate")
- public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
- return new JdbcTemplate(dataSourceProxy);
- }
-
- @Bean
- public PlatformTransactionManager txManager(DataSource dataSourceProxy) {
- return new DataSourceTransactionManager(dataSourceProxy);
- }
- }
- @RestController
- public class AccountController {
- @Autowired
- private AccountService accountService;
-
- @RequestMapping(value = "/reduce", method = RequestMethod.GET, produces = "application/json")
- public String reduce(String userId, int money) {
- try {
- accountService.reduce(userId, money);
- } catch (Exception exx) {
- exx.printStackTrace();
- return FAIL;
- }
- return SUCCESS;
- }
- }
- @Service
- public class AccountService {
- public static final String SUCCESS = "SUCCESS";
- public static final String FAIL = "FAIL";
- private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
-
- @Autowired
- private JdbcTemplate jdbcTemplate;
- @Transactional
- public void reduce(String userId, int money) {
- String xid = RootContext.getXID();
- LOGGER.info("reduce account balance in transaction: " + xid);
- jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[]{money, userId});
- int balance = jdbcTemplate.queryForObject("select money from account_tbl where user_id = ?", new Object[]{userId}, Integer.class);
- LOGGER.info("balance after transaction: " + balance);
- if (balance < 0) {
- throw new RuntimeException("Not Enough Money ...");
- }
- }
- }
(2) Flow of the order service implemented by the order-xa module
The core code of the order-xa module is as follows:
- @SpringBootApplication
- @EnableFeignClients
- public class OrderXAApplication {
- public static void main(String[] args) {
- //Listens on port 8082
- SpringApplication.run(OrderXAApplication.class, args);
- }
- }
- @Configuration
- public class OrderXADataSourceConfiguration {
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DruidDataSource druidDataSource() {
- return new DruidDataSource();
- }
-
- @Bean("dataSourceProxy")
- public DataSource dataSource(DruidDataSource druidDataSource) {
- //DataSourceProxyXA for XA mode
- return new DataSourceProxyXA(druidDataSource);
- }
-
- @Bean("jdbcTemplate")
- public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
- return new JdbcTemplate(dataSourceProxy);
- }
- }
- @RestController
- public class OrderController {
- @Autowired
- private OrderService orderService;
- @RequestMapping(value = "/create", method = RequestMethod.GET, produces = "application/json")
- public String create(String userId, String commodityCode, int orderCount) {
- try {
- orderService.create(userId, commodityCode, orderCount);
- } catch (Exception exx) {
- exx.printStackTrace();
- return FAIL;
- }
- return SUCCESS;
- }
- }
- @Service
- public class OrderService {
- public static final String SUCCESS = "SUCCESS";
- public static final String FAIL = "FAIL";
- private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);
- @Autowired
- private AccountFeignClient accountFeignClient;
- @Autowired
- private JdbcTemplate jdbcTemplate;
- public void create(String userId, String commodityCode, Integer count) {
- String xid = RootContext.getXID();
- LOGGER.info("create order in transaction: " + xid);
- //Order total = quantity ordered (count) * unit price (100)
- int orderMoney = count * 100;
- //Create the order
- jdbcTemplate.update("insert order_tbl(user_id,commodity_code,count,money) values(?,?,?,?)", new Object[]{userId, commodityCode, count, orderMoney});
- //Call the account service to deduct the balance
- String result = accountFeignClient.reduce(userId, orderMoney);
- if (!SUCCESS.equals(result)) {
- throw new RuntimeException("Failed to call Account Service. ");
- }
- }
- }
- @FeignClient(name = "account-xa", url = "127.0.0.1:8083")
- public interface AccountFeignClient {
- @GetMapping("/reduce")
- String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money);
- }
(3) Flow of the stock service implemented by the stock-xa module
The core code of the stock-xa module is as follows:
- @SpringBootApplication
- public class StockXAApplication {
- public static void main(String[] args) {
- //Listens on port 8081
- SpringApplication.run(StockXAApplication.class, args);
- }
- }
- @Configuration
- public class StockXADataSourceConfiguration {
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DruidDataSource druidDataSource() {
- return new DruidDataSource();
- }
-
- @Bean("dataSourceProxy")
- public DataSource dataSource(DruidDataSource druidDataSource) {
- //DataSourceProxyXA for XA mode
- return new DataSourceProxyXA(druidDataSource);
- }
-
- @Bean("jdbcTemplate")
- public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
- return new JdbcTemplate(dataSourceProxy);
- }
- }
- @RestController
- public class StockController {
- @Autowired
- private StockService stockService;
- @RequestMapping(value = "/deduct", method = RequestMethod.GET, produces = "application/json")
- public String deduct(String commodityCode, int count) {
- try {
- stockService.deduct(commodityCode, count);
- } catch (Exception exx) {
- exx.printStackTrace();
- return FAIL;
- }
- return SUCCESS;
- }
- }
- @Service
- public class StockService {
- public static final String SUCCESS = "SUCCESS";
- public static final String FAIL = "FAIL";
- private static final Logger LOGGER = LoggerFactory.getLogger(StockService.class);
-
- @Autowired
- private JdbcTemplate jdbcTemplate;
- public void deduct(String commodityCode, int count) {
- String xid = RootContext.getXID();
- LOGGER.info("deduct stock balance in transaction: " + xid);
- jdbcTemplate.update("update stock_tbl set count = count - ? where commodity_code = ?", new Object[]{count, commodityCode});
- }
- }
(4) Flow of the global transaction entry service implemented by the business-xa module
The core code of the business-xa module is as follows:
- @SpringBootApplication
- @EnableFeignClients
- public class BusinessXAApplication {
- public static void main(String[] args) {
- //Listens on port 8084
- SpringApplication.run(BusinessXAApplication.class, args);
- }
- }
- @Configuration
- public class BusinessXADataSourceConfiguration {
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DruidDataSource dataSource() {
- return new DruidDataSource();
- }
-
- @Bean("jdbcTemplate")
- public JdbcTemplate jdbcTemplate(DataSource dataSource) {
- return new JdbcTemplate(dataSource);
- }
- }
- @RestController
- public class BusinessController {
- @Autowired
- private BusinessService businessService;
- @RequestMapping(value = "/purchase", method = RequestMethod.GET, produces = "application/json")
- public String purchase(Boolean rollback, Integer count) {
- int orderCount = 30;
- if (count != null) {
- orderCount = count;
- }
- try {
- businessService.purchase(TestDatas.USER_ID, TestDatas.COMMODITY_CODE, orderCount, rollback == null ? false : rollback.booleanValue());
- } catch (Exception exx) {
- return "Purchase Failed:" + exx.getMessage();
- }
- return SUCCESS;
- }
- }
- @Service
- public class BusinessService {
- public static final String SUCCESS = "SUCCESS";
- public static final String FAIL = "FAIL";
- private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);
-
- @Autowired
- private StockFeignClient stockFeignClient;
-
- @Autowired
- private OrderFeignClient orderFeignClient;
-
- @Autowired
- private JdbcTemplate jdbcTemplate;
- @GlobalTransactional
- public void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {
- String xid = RootContext.getXID();
- LOGGER.info("New Transaction Begins: " + xid);
- String result = stockFeignClient.deduct(commodityCode, orderCount);
- if (!SUCCESS.equals(result)) {
- throw new RuntimeException("Stock service call failed, rolling back the transaction!");
- }
- result = orderFeignClient.create(userId, commodityCode, orderCount);
- if (!SUCCESS.equals(result)) {
- throw new RuntimeException("Order service call failed, rolling back the transaction!");
- }
- if (rollback) {
- throw new RuntimeException("Force rollback ... ");
- }
- }
- ...
- }
- @FeignClient(name = "stock-xa", url = "127.0.0.1:8081")
- public interface StockFeignClient {
- @GetMapping("/deduct")
- String deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") int count);
- }
- @FeignClient(name = "order-xa", url = "127.0.0.1:8082")
- public interface OrderFeignClient {
- @GetMapping("/create")
- String create(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("orderCount") int orderCount);
- }
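3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
(1) Overview of seata-spring-boot-starter
(2) Auto-configuring the GlobalTransactionScanner bean
(1) Overview of seata-spring-boot-starter
The seata-spring-boot-starter module in the seata-v1.5.0 source uses Spring Boot auto-configuration to simplify the complex configuration of seata-all. Just as dubbo-spring-boot-starter is the dependency for integrating Dubbo with Spring Boot, seata-spring-boot-starter is the dependency for integrating Seata with Spring Boot.
The seata-spring-boot-starter-samples module in the seata-samples-1.5.0 project combines Spring Boot + Dubbo + MyBatis + Nacos + Seata to manage distributed transactions over Dubbo. It uses Nacos as the registry and configuration center for both Dubbo and Seata, and uses MySQL with MyBatis for data access.
Note: seata-spring-boot-starter enables automatic data source proxying by default, so manually configuring a DataSourceProxy as well causes an exception.
(2) Auto-configuring the GlobalTransactionScanner bean
The seata-xa module in the seata-samples-1.5.0 project differs from the seata-samples-dubbo module: the latter wires the GlobalTransactionScanner bean through an XML configuration file.
- <?xml version="1.0" encoding="UTF-8"?>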
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
- ...
-
- <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
- <constructor-arg value="dubbo-demo-account-service"/>
- <constructor-arg value="my_test_tx_group"/>
- </bean>
- </beans>
The former relies on the SeataAutoConfiguration class in seata-spring-boot-starter to auto-configure the GlobalTransactionScanner bean.
- @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
- public class SeataAutoConfiguration {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
- @Bean(BEAN_NAME_FAILURE_HANDLER)
- @ConditionalOnMissingBean(FailureHandler.class)
- public FailureHandler failureHandler() {
- return new DefaultFailureHandlerImpl();
- }
- @Bean
- @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
- @ConditionalOnMissingBean(GlobalTransactionScanner.class)
- public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Automatically configure Seata");
- }
- //Return the scanner component for the @GlobalTransactional annotation
- return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
- }
- }
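4. Auto-configuration of the global transaction annotation scanner
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
The resources/META-INF/spring.factories file is shown below. Because it registers these four classes under EnableAutoConfiguration, Spring Boot auto-configures all of them.
- # Auto Configure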
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
- io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,\
- io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
- io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
- io.seata.spring.boot.autoconfigure.HttpAutoConfiguration
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
The global transaction annotation scanner, GlobalTransactionScanner, scans for the @GlobalTransactional annotation.
- @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
- public class SeataAutoConfiguration {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
- @Bean(BEAN_NAME_FAILURE_HANDLER)
- @ConditionalOnMissingBean(FailureHandler.class)
- public FailureHandler failureHandler() {
- return new DefaultFailureHandlerImpl();
- }
- @Bean
- @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
- @ConditionalOnMissingBean(GlobalTransactionScanner.class)
- public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Automatically configure Seata");
- }
- //Return the scanner component for the @GlobalTransactional annotation
- return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
- }
- }
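The effect of matchIfMissing = true on the enabled switch can be illustrated without Spring. The sketch below is a simplified re-implementation of the property check, not Spring Boot's actual code; the class EnabledFlagSketch is hypothetical, and the key seata.enabled assumes that SEATA_PREFIX resolves to "seata".

```java
import java.util.Properties;

// Simplified stand-in for the check implied by
// ConditionalOnProperty(prefix, name, havingValue, matchIfMissing); not Spring's real code.
class EnabledFlagSketch {

    static boolean matches(Properties props, String prefix, String name,
                           String havingValue, boolean matchIfMissing) {
        String value = props.getProperty(prefix + "." + name);
        if (value == null) {
            // Property is absent: the outcome is decided by matchIfMissing alone.
            return matchIfMissing;
        }
        // Property is present: it must equal havingValue, ignoring case.
        return havingValue.equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // "seata" is an assumed value for SEATA_PREFIX.
        System.out.println("no property set -> " + matches(props, "seata", "enabled", "true", true));
        props.setProperty("seata.enabled", "false");
        System.out.println("seata.enabled=false -> " + matches(props, "seata", "enabled", "true", true));
    }
}
```

Under these assumptions an application that never mentions the property still gets the scanner, and only an explicit false switches the auto-configuration off.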
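5. Core fields and initialization of the global transaction annotation scanner
- //The type Global transaction scanner.
- //AbstractAutoProxyCreator is Spring's auto-proxy creator. Because the scanner extends it, every bean in the Spring container is passed to wrapIfNecessary(),
- //so GlobalTransactionScanner.wrapIfNecessary() can scan every method of every bean
- //and check for the @GlobalTransactional annotation; when it finds one, it creates an AOP proxy for that bean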
- public class GlobalTransactionScanner extends AbstractAutoProxyCreator
- implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
- ...
- //Method interceptor
- private MethodInterceptor interceptor;
- //Interceptor for the global transaction annotation
- private MethodInterceptor globalTransactionalInterceptor;
- //Application ID
- private final String applicationId;
- //Global transaction service group
- private final String txServiceGroup;
- //Transaction mode
- private final int mode;
- //Alibaba Cloud credentials: the access key and the secret key
- private String accessKey;
- private String secretKey;
-
- //Whether global transactions are disabled by configuration; defaults to false
- private volatile boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
- //Whether initialization has completed
- private final AtomicBoolean initialized = new AtomicBoolean(false);
-
- //Failure handler hook
- private final FailureHandler failureHandlerHook;
- //Spring application context
- private ApplicationContext applicationContext;
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String txServiceGroup) {
- this(txServiceGroup, txServiceGroup, DEFAULT_MODE);
- }
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String txServiceGroup, int mode) {
- this(txServiceGroup, txServiceGroup, mode);
- }
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String applicationId, String txServiceGroup) {
- this(applicationId, txServiceGroup, DEFAULT_MODE);
- }
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode) {
- this(applicationId, txServiceGroup, mode, null);
- }
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String applicationId, String txServiceGroup, FailureHandler failureHandlerHook) {
- this(applicationId, txServiceGroup, DEFAULT_MODE, failureHandlerHook);
- }
- //Instantiates a new Global transaction scanner.
- public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {
- setOrder(ORDER_NUM);
- setProxyTargetClass(true);
- this.applicationId = applicationId;
- this.txServiceGroup = txServiceGroup;
- this.mode = mode;
- this.failureHandlerHook = failureHandlerHook;
- }
-
- //Callback invoked when Spring destroys the bean
- @Override
- public void destroy() {
- ShutdownHook.getInstance().destroyAll();
- }
- //Spring initialization callback; it initializes this GlobalTransactionScanner
- @Override
- public void afterPropertiesSet() {
- if (disableGlobalTransaction) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Global transaction is disabled.");
- }
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
- return;
- }
- if (initialized.compareAndSet(false, true)) {
- initClient();
- }
- }
- //Initializes Seata's two network clients: the TM client and the RM client
- private void initClient() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Initializing Global Transaction Clients ... ");
- }
- if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
- throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
- }
- //init TM
- TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
- }
- //init RM
- RMClient.init(applicationId, txServiceGroup);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
- }
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Global Transaction Clients are initialized. ");
- }
- registerSpringShutdownHook();
- }
- ...
- }
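The guard in afterPropertiesSet() is a common run-once idiom built on AtomicBoolean.compareAndSet. The sketch below isolates that idiom; InitOnceSketch and its counter are hypothetical names used only for illustration, and initClient() here just counts calls instead of starting any network client.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class isolating the run-once guard used by the scanner's initialization.
class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();
    private final boolean disabled;

    InitOnceSketch(boolean disabled) {
        this.disabled = disabled;
    }

    void afterPropertiesSet() {
        if (disabled) {
            return;                       // feature switched off: skip initialization entirely
        }
        // Only the first caller flips false -> true, so initClient() runs at most once.
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCount.incrementAndGet();      // placeholder for the real client start-up
    }

    int initCount() {
        return initCount.get();
    }

    public static void main(String[] args) {
        InitOnceSketch scanner = new InitOnceSketch(false);
        scanner.afterPropertiesSet();
        scanner.afterPropertiesSet();
        System.out.println("initClient calls: " + scanner.initCount());
    }
}
```

Because compareAndSet is atomic, the guard also holds when several threads reach afterPropertiesSet() at the same time.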
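6. How the global transaction annotation scanner creates AOP proxies
- //The type Global transaction scanner.
- //AbstractAutoProxyCreator is Spring's auto-proxy creator. Because the scanner extends it, every bean in the Spring container is passed to wrapIfNecessary(),
- //so GlobalTransactionScanner.wrapIfNecessary() can scan every method of every bean
- //and check for the @GlobalTransactional annotation; when it finds one, it creates an AOP proxy for that bean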
- public class GlobalTransactionScanner extends AbstractAutoProxyCreator
- implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
- ...
- //The following will be scanned, and added corresponding interceptor:
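- //Because this class extends the abstract AbstractAutoProxyCreator, every Spring bean is passed to this method to check for Seata annotations
- //When Seata annotations are present, a matching AOP proxy is created for them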
- @Override
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- try {
- synchronized (PROXYED_SET) {
- if (PROXYED_SET.contains(beanName)) {
- return bean;
- }
- interceptor = null;
- //check TCC proxy
- if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
- //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
- interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
- } else {
- Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
- Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
- if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
- return bean;
- }
- if (globalTransactionalInterceptor == null) {
- //Build a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
- globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
- }
- interceptor = globalTransactionalInterceptor;
- }
- LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
- if (!AopUtils.isAopProxy(bean)) {
- //Next, Spring's AbstractAutoProxyCreator creates a dynamic proxy for the target bean,
- //so later calls to the target bean's methods go through the GlobalTransactionalInterceptor
- bean = super.wrapIfNecessary(bean, beanName, cacheKey);
- } else {
- AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
- Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
- for (Advisor avr : advisor) {
- advised.addAdvisor(0, avr);
- }
- }
- PROXYED_SET.add(beanName);
- return bean;
- }
- } catch (Exception exx) {
- throw new RuntimeException(exx);
- }
- }
- ...
- }
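The scan-then-proxy idea can be tried out with plain JDK reflection. The sketch below is not how Spring or Seata build proxies (Spring uses its own AOP infrastructure); it only shows the two steps in miniature: check whether any method carries a marker annotation, and if so return a proxy whose handler sees every call. DemoGlobalTx, PurchaseApi, and the other names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AnnotationScanSketch {
    // Hypothetical marker annotation standing in for a global transaction annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTx {}

    interface PurchaseApi {
        @DemoGlobalTx
        String purchase(String userId);
        String ping();
    }

    interface PlainApi {
        String ping();
    }

    static class PurchaseImpl implements PurchaseApi {
        public String purchase(String userId) { return "purchased:" + userId; }
        public String ping() { return "pong"; }
    }

    // Names of annotated methods that went through the proxy handler.
    static final List<String> INTERCEPTED = new ArrayList<>();

    // Returns true when any public method of any given type carries the marker annotation.
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method method : type.getMethods()) {
                if (method.isAnnotationPresent(DemoGlobalTx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Leaves the bean untouched unless its interface has an annotated method.
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> api) {
        if (!existsAnnotation(api)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.isAnnotationPresent(DemoGlobalTx.class)) {
                INTERCEPTED.add(method.getName());   // where transaction logic would go
            }
            return method.invoke(bean, args);        // delegate to the real bean
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[]{api}, handler);
    }

    public static void main(String[] args) {
        PurchaseApi api = wrapIfNecessary(new PurchaseImpl(), PurchaseApi.class);
        System.out.println(api.purchase("U100001"));
        System.out.println(api.ping());
        System.out.println("intercepted: " + INTERCEPTED);
    }
}
```

As in the scanner, a bean without the annotation is returned as-is, so unrelated beans pay no proxy cost.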
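7. Source code of the degrade check that begins and commits a global transaction
In the constructor of GlobalTransactionalInterceptor, if the global transaction degrade check is enabled and both the check period and the allowed count are greater than 0, startDegradeCheck() is called to start the scheduled degrade check thread.
By default this thread runs every 2 seconds, and each run tries to begin a global transaction on Seata Server. If that succeeds, it obtains the xid, commits that global transaction, and posts a degrade-check-succeeded event to the event bus. If beginning or committing fails, it posts a degrade-check-failed event to the event bus.
- //The type Global transactional interceptor.
- //When a method annotated with @GlobalTransactional is called, AOP intercepts the call and routes it into this interceptor's invoke() method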
- public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
- private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
- private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
- //Global transaction execution template
- private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
- //Global lock template
- private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
- //Failure handler
- private final FailureHandler failureHandler;
- //Whether global transactions are disabled
- private volatile boolean disable;
- //Degrade check period
- //Degrade mechanism: if Seata Server goes down and global transactions cannot make progress, the application can degrade to running local transactions
- private static int degradeCheckPeriod;
- //Whether the degrade check is enabled
- private static volatile boolean degradeCheck;
- //Allowed number of degrade checks
- private static int degradeCheckAllowTimes;
- //Degrade count
- private static volatile Integer degradeNum = 0;
- //Count of successful checks while degraded
- private static volatile Integer reachNum = 0;
- //Event bus for the degrade check
- private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
- //Scheduled thread pool for the degrade check
- private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
- //region DEFAULT_GLOBAL_TRANSACTION_TIMEOUT
- //Default global transaction timeout
- private static int defaultGlobalTransactionTimeout = 0;
- //endregion
- //Instantiates a new Global transactional interceptor.
- public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
- //Failure handler
- this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
- //Whether global transactions are disabled; defaults to false
- this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
- //Whether the global transaction degrade check is enabled; defaults to false
- degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);
- //If the global transaction degrade check is enabled
- if (degradeCheck) {
- //Register a configuration listener
- ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
- //Set the degrade check period; defaults to once every 2s
- degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
- //Set the allowed number of degrade checks; defaults to 10
- degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
- //Register this interceptor on the degrade check event bus as a subscriber
- EVENT_BUS.register(this);
- //If both the check period and the allowed count are greater than 0, start the degrade check thread
- if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
- startDegradeCheck();
- }
- }
- //Initialize the default global transaction timeout
- this.initDefaultGlobalTransactionTimeout();
- }
- private void initDefaultGlobalTransactionTimeout() {
- if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
- int defaultGlobalTransactionTimeout;
- try {
- defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
- } catch (Exception e) {
- LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
- defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
- }
- if (defaultGlobalTransactionTimeout <= 0) {
- LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
- defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
- }
- GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
- }
- }
- ...
-
- //auto upgrade service detection
- //Starts the scheduled degrade check thread, which runs every 2s by default
- private static void startDegradeCheck() {
- executor.scheduleAtFixedRate(() -> {
- if (degradeCheck) {
- try {
- //Try to begin a global transaction on Seata Server with a null application id, a null transaction service group, the transaction name degradeCheck, and a 60s timeout
- String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
- //If begin succeeds, an xid is returned; commit that global transaction right away
- TransactionManagerHolder.get().commit(xid);
- //If the commit succeeds, post a degrade check event with result true to the event bus
- EVENT_BUS.post(new DegradeCheckEvent(true));
- } catch (Exception e) {
- //If beginning the global transaction or committing the xid fails, post an event with result false to signal a failed degrade check
- EVENT_BUS.post(new DegradeCheckEvent(false));
- }
- }
- }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
- }
- ...
- }
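The probe described above (begin, commit, then report true or false) can be sketched without Seata. Everything here is a stand-in: TxManager is a hypothetical two-method interface, a Consumer replaces the event bus, and the 100 ms period in main is chosen only to keep the demo short.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DegradeProbeSketch {
    // Hypothetical stand-in for the transaction manager calls used by the probe.
    interface TxManager {
        String begin(String name, int timeoutMillis) throws Exception;
        void commit(String xid) throws Exception;
    }

    // Builds the task that a scheduler would run once per check period.
    static Runnable probe(TxManager tm, Consumer<Boolean> eventBus) {
        return () -> {
            try {
                String xid = tm.begin("degradeCheck", 60000); // try to open a transaction
                tm.commit(xid);                               // and commit it right away
                eventBus.accept(true);                        // server answered: check passed
            } catch (Exception e) {
                eventBus.accept(false);                       // begin or commit failed
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        TxManager healthy = new TxManager() {
            public String begin(String name, int timeoutMillis) { return "demo-xid"; }
            public void commit(String xid) { }
        };
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Demo period of 100 ms; the text above describes a 2 s default.
        executor.scheduleAtFixedRate(
                probe(healthy, ok -> System.out.println("check passed: " + ok)),
                0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        executor.shutdownNow();
    }
}
```

Catching the exception inside the task matters with scheduleAtFixedRate: an exception that escapes the task suppresses all later runs.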
8. How the global transaction interceptor degrades when Seata Server fails
- public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
- ...
- @Subscribe
- public static void onDegradeCheck(DegradeCheckEvent event) {
- if (event.isRequestSuccess()) {//The check succeeded, so the TC is available
- if (degradeNum >= degradeCheckAllowTimes) {
- reachNum++;
- if (reachNum >= degradeCheckAllowTimes) {
- reachNum = 0;
- degradeNum = 0;
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("the current global transaction has been restored");
- }
- }
- } else if (degradeNum != 0) {
- degradeNum = 0;
- }
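- } else {//The check failed: the TC is down, so global transactions cannot be begun or committed
- //If the degrade count is below the allowed number of degrade checks (10 by default)
- if (degradeNum < degradeCheckAllowTimes) {
- degradeNum++; //Increment the degrade count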
- if (degradeNum >= degradeCheckAllowTimes) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("the current global transaction has been automatically downgraded");
- }
- }
- } else if (reachNum != 0) {
- reachNum = 0;
- }
- }
- }
-
- //When a method annotated with @GlobalTransactional is called, the invoke() method below runs
- @Override
- public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
- Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
- Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
- if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
- final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
- final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
- final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
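- //localDisable is true when global transactions are disabled, or when the degrade check is enabled and the degrade count has reached the allowed number of checks
- //When localDisable is true, global transactions are off and no new global transaction may be begun
- boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
- //If global transactions are not disabled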
- if (!localDisable) {
- if (globalTransactionalAnnotation != null) {
- //The real entry point for handling the global transaction
- return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
- } else if (globalLockAnnotation != null) {
- return handleGlobalLock(methodInvocation, globalLockAnnotation);
- }
- }
- }
- //Run the target method directly
- return methodInvocation.proceed();
- }
- ...
- }
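The two counters in onDegradeCheck() form a small state machine that is easier to follow in isolation. The sketch below copies the branching of that method into a hypothetical DegradeTrackerSketch class with instance fields instead of static ones; the threshold of 10 used in main matches the default mentioned in the comments above.

```java
// Hypothetical class mirroring the counter logic of onDegradeCheck() with instance fields.
class DegradeTrackerSketch {
    private final int allowTimes;
    private int degradeNum = 0;   // consecutive failed checks, capped at allowTimes
    private int reachNum = 0;     // consecutive passed checks while degraded

    DegradeTrackerSketch(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    synchronized void onCheck(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                reachNum++;                       // degraded: count towards recovery
                if (reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0;               // enough passes: restore global transactions
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;                   // not degraded yet: one pass clears failures
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++;                     // one more failure towards degrading
            } else if (reachNum != 0) {
                reachNum = 0;                     // failure while degraded: recovery restarts
            }
        }
    }

    // Same condition the interceptor uses to skip global transactions.
    synchronized boolean isDegraded() {
        return degradeNum >= allowTimes;
    }

    public static void main(String[] args) {
        DegradeTrackerSketch tracker = new DegradeTrackerSketch(10);
        for (int i = 0; i < 10; i++) {
            tracker.onCheck(false);
        }
        System.out.println("after 10 failures, degraded = " + tracker.isDegraded());
        for (int i = 0; i < 10; i++) {
            tracker.onCheck(true);
        }
        System.out.println("after 10 passes, degraded = " + tracker.isDegraded());
    }
}
```

The asymmetry is deliberate in the original logic: before degrading, a single passed check wipes the failure count, while recovery requires an unbroken run of passed checks.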
9. Source code of global transaction propagation via HTTP request headers
- @Configuration
- @ConditionalOnWebApplication
- public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new TransactionPropagationInterceptor());
- }
-
- @Override
- public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) {
- exceptionResolvers.add(new HttpHandlerExceptionResolver());
- }
- }
- //Springmvc Intercepter.
- public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
- private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);
-
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
- String xid = RootContext.getXID();
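- //With Spring Cloud integration, Feign runs its own interception step at call time,
- //which can put the xid into the HTTP request as a header.
- //The receiving side then uses this Spring MVC interceptor to read the xid from the header and bind it to RootContext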
- String rpcXid = request.getHeader(RootContext.KEY_XID);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
- }
- if (xid == null && rpcXid != null) {
- RootContext.bind(rpcXid);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("bind[{}] to RootContext", rpcXid);
- }
- }
- return true;
- }
-
- @Override
- public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
- ModelAndView modelAndView) {
- if (RootContext.inGlobalTransaction()) {
- XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
- }
- }
- }
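The header-based hand-off can be mimicked with a map and a ThreadLocal. This is only a sketch of the idea: Context is a hypothetical stand-in for RootContext, a Map stands in for HTTP headers, the header name "TX_XID" is an assumed value for RootContext.KEY_XID, and the cleanup step is simplified compared with the XidResource.cleanXid call in the interceptor above.

```java
import java.util.HashMap;
import java.util.Map;

class XidPropagationSketch {
    // Assumed header name; the real code reads it from RootContext.KEY_XID.
    static final String KEY_XID = "TX_XID";

    // Hypothetical thread-bound holder standing in for RootContext.
    static final class Context {
        private static final ThreadLocal<String> XID = new ThreadLocal<>();
        static String getXid() { return XID.get(); }
        static void bind(String xid) { XID.set(xid); }
        static void unbind() { XID.remove(); }
    }

    // Caller side: copy the current xid, if any, into the outgoing headers.
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = Context.getXid();
        if (xid != null) {
            headers.put(KEY_XID, xid);
        }
        return headers;
    }

    // Callee side, before the handler: bind the header xid when nothing is bound yet.
    static boolean preHandle(Map<String, String> headers) {
        String xid = Context.getXid();
        String rpcXid = headers.get(KEY_XID);
        if (xid == null && rpcXid != null) {
            Context.bind(rpcXid);
            return true;
        }
        return false;
    }

    // Callee side, after the handler: drop the xid that came in with the request.
    static void afterHandle(Map<String, String> headers) {
        String rpcXid = headers.get(KEY_XID);
        if (rpcXid != null && rpcXid.equals(Context.getXid())) {
            Context.unbind();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Context.bind("demo-xid-1");                   // the caller is inside a global transaction
        Map<String, String> headers = outgoingHeaders();
        Thread callee = new Thread(() -> {            // a separate thread plays the remote service
            preHandle(headers);
            System.out.println("callee sees xid: " + Context.getXid());
            afterHandle(headers);
        });
        callee.start();
        callee.join();
        Context.unbind();
    }
}
```

Cleaning up after the request matters because servlet containers reuse worker threads; a leftover xid would leak into the next unrelated request on the same thread.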
10. Initialization of the XA data source proxy and obtaining the XA connection proxy
- //DataSource proxy for XA mode.
- public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
- private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxyXA.class);
-
- public DataSourceProxyXA(DataSource dataSource) {
- this(dataSource, DEFAULT_RESOURCE_GROUP_ID);
- }
-
- public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
- if (dataSource instanceof SeataDataSourceProxy) {
- LOGGER.info("Unwrap the data source, because the type is: {}", dataSource.getClass().getName());
- dataSource = ((SeataDataSourceProxy) dataSource).getTargetDataSource();
- }
- this.dataSource = dataSource;
- this.branchType = BranchType.XA;
- JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);
- //Set the default branch type to 'XA' in the RootContext.
- RootContext.setDefaultBranchType(this.getBranchType());
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- Connection connection = dataSource.getConnection();
- return getConnectionProxy(connection);
- }
-
- @Override
- public Connection getConnection(String username, String password) throws SQLException {
- Connection connection = dataSource.getConnection(username, password);
- return getConnectionProxy(connection);
- }
-
- protected Connection getConnectionProxy(Connection connection) throws SQLException {
- if (!RootContext.inGlobalTransaction()) {
- return connection;
- }
- return getConnectionProxyXA(connection);
- }
-
- @Override
- protected Connection getConnectionProxyXA() throws SQLException {
- Connection connection = dataSource.getConnection();
- return getConnectionProxyXA(connection);
- }
-
- private Connection getConnectionProxyXA(Connection connection) throws SQLException {
- //Physical connection
- Connection physicalConn = connection.unwrap(Connection.class);
- XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
- ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
- connectionProxyXA.init();
- return connectionProxyXA;
- }
- }
- //Abstract DataSource proxy for XA mode.
- public abstract class AbstractDataSourceProxyXA extends BaseDataSourceResource<ConnectionProxyXA> {
- protected static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT_XA";
- //Get a ConnectionProxyXA instance for finishing XA branch(XA commit/XA rollback)
- //@return ConnectionProxyXA instance
- public ConnectionProxyXA getConnectionForXAFinish(XAXid xaXid) throws SQLException {
- ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
- if (connectionProxyXA != null) {
- return connectionProxyXA;
- }
- return (ConnectionProxyXA)getConnectionProxyXA();
- }
-
- protected abstract Connection getConnectionProxyXA() throws SQLException;
- //Force close the physical connection kept for XA branch of given XAXid.
- //@param xaXid the given XAXid
- public void forceClosePhysicalConnection(XAXid xaXid) throws SQLException {
- ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
- if (connectionProxyXA != null) {
- connectionProxyXA.close();
- Connection physicalConn = connectionProxyXA.getWrappedConnection();
- if (physicalConn instanceof PooledConnection) {
- physicalConn = ((PooledConnection)physicalConn).getConnection();
- }
- //Force close the physical connection
- physicalConn.close();
- }
- }
- }
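Two decisions in the listing above are easy to miss: `getConnectionProxy` wraps the connection only when the thread is inside a global transaction, and `getConnectionForXAFinish` prefers a proxy that was kept for the branch over creating a new one. The sketch below illustrates just those two decisions with hypothetical, simplified types (`Conn`, `RawConn`, `XaConnProxy`, `hold`); real code works with `java.sql.Connection` and `javax.sql.XAConnection`, and the way Seata stores kept connections is not shown in the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified types used only for illustration.
class XaDataSourceSketch {
    interface Conn { String describe(); }

    static final class RawConn implements Conn {
        public String describe() { return "raw"; }
    }

    static final class XaConnProxy implements Conn {
        final Conn target;
        final String xid;
        XaConnProxy(Conn target, String xid) { this.target = target; this.xid = xid; }
        public String describe() { return "xa-proxy(" + xid + ")"; }
    }

    // Plays the role of RootContext: null means "not in a global transaction".
    static final ThreadLocal<String> XID = new ThreadLocal<>();

    // Proxies kept between phase one and phase two, keyed by a branch xid string.
    private final Map<String, XaConnProxy> kept = new ConcurrentHashMap<>();

    // Like getConnectionProxy: wrap only inside a global transaction.
    Conn getConnection() {
        Conn raw = new RawConn();
        String xid = XID.get();
        return xid == null ? raw : new XaConnProxy(raw, xid);
    }

    // Hypothetical helper: remember a proxy so phase two can find it again.
    void hold(String branchKey, XaConnProxy proxy) { kept.put(branchKey, proxy); }

    // Like getConnectionForXAFinish: reuse the kept proxy, else fall back to a fresh one.
    XaConnProxy connectionForFinish(String branchKey) {
        XaConnProxy held = kept.get(branchKey);
        return held != null ? held : new XaConnProxy(new RawConn(), XID.get());
    }

    public static void main(String[] args) {
        XaDataSourceSketch ds = new XaDataSourceSketch();
        System.out.println(ds.getConnection().describe()); // outside a global transaction
        XID.set("192.168.0.1:8091:1001");                   // made-up xid
        Conn c = ds.getConnection();
        System.out.println(c.describe());
        ds.hold("branch-1", (XaConnProxy) c);
        System.out.println(ds.connectionForFinish("branch-1") == c);
        XID.remove();
    }
}
```

Returning the raw connection outside a global transaction keeps ordinary local transactions free of any XA overhead.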
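12.XA distributed transaction two-phase commit flow: source code
Seata's XA mode is built on the XA support of the underlying database (MySQL in this example), which it drives through the JDBC XAConnection and XAResource interfaces.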
- //Connection proxy for XA mode.
- //Through this proxy you can begin and commit/roll back the local transaction, execute SQL statements, and drive the XA two-phase commit.
- public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
- //Current auto-commit status; defaults to true
- private boolean currentAutoCommitStatus = true;
- //Xid of the XA branch transaction
- private XAXid xaBranchXid;
- //Whether the XA branch is active; defaults to false
- private boolean xaActive = false;
- //Whether the XA connection is being kept (held); defaults to false
- private boolean kept = false;
- //Constructor of Connection Proxy for XA mode.
- //@param originalConnection Normal Connection from the original DataSource.
- //@param xaConnection XA Connection based on physical connection of the normal Connection above.
- //@param resource The corresponding Resource(DataSource proxy) from which the connections was created.
- //@param xid Seata global transaction xid.
- public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
- super(originalConnection, xaConnection, resource, xid);
- }
-
- public void init() {
- try {
- this.xaResource = xaConnection.getXAResource();
- this.currentAutoCommitStatus = this.originalConnection.getAutoCommit();
- if (!currentAutoCommitStatus) {
- throw new IllegalStateException("Connection[autocommit=false] as default is NOT supported");
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
- ...
- //Switching the auto-commit status from true to false is what triggers branch transaction registration
- @Override
- public void setAutoCommit(boolean autoCommit) throws SQLException {
- if (currentAutoCommitStatus == autoCommit) {
- return;
- }
- if (autoCommit) {
- //According to JDBC spec:
- //If this method is called during a transaction and the auto-commit mode is changed, the transaction is committed.
- if (xaActive) {
- commit();
- }
- } else {
- if (xaActive) {
- throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
- }
- //Start a XA branch
- long branchId = 0L;
- try {
- //1. register branch to TC then get the branchId
- //Register the branch transaction with type XA, passing in the resourceId and the xid
- branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null);
- } catch (TransactionException te) {
- cleanXABranchContext();
- throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
- }
- //2. build XA-Xid with xid and branchId
- this.xaBranchXid = XAXidBuilder.build(xid, branchId);
- try {
- //3. XA Start
- xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
- } catch (XAException e) {
- cleanXABranchContext();
- throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
- }
- //4. XA is active
- this.xaActive = true;
- }
- currentAutoCommitStatus = autoCommit;
- }
- ...
- }
- //The type Abstract connection proxy on XA mode.
- public abstract class AbstractConnectionProxyXA implements Connection {
- public static final String SQLSTATE_XA_NOT_END = "SQLSTATE_XA_NOT_END";
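- //Original connection
- protected Connection originalConnection;
- //XA connection wrapping the physical connection
- protected XAConnection xaConnection;
- //XA resource
- protected XAResource xaResource;
- //The resource (data source proxy) this connection was created from
- protected BaseDataSourceResource resource;
- //Global transaction xid
- protected String xid;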
-
- public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
- this.originalConnection = originalConnection;
- this.xaConnection = xaConnection;
- this.resource = resource;
- this.xid = xid;
- }
- ...
- }
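The listing above shows only branch registration and `XA start`; the rest of the connection proxy is elided. As a rough orientation, the standard XA call order that a branch goes through can be sketched with the JDK's `javax.transaction.xa` interfaces alone. `RecordingXaResource` is a hypothetical in-memory stub that only records calls, and `SimpleXid` uses a made-up encoding of xid and branchId; neither reflects how `XAXidBuilder` or the MySQL driver actually work, and where exactly Seata issues each call is not covered by the excerpt.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaTwoPhaseSketch {
    // Hypothetical Xid built from a global xid and a branch id; the encoding is an assumption.
    static final class SimpleXid implements Xid {
        private final byte[] gtrid;
        private final byte[] bqual;
        SimpleXid(String xid, long branchId) {
            this.gtrid = xid.getBytes(StandardCharsets.UTF_8);
            this.bqual = Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
        }
        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
        @Override public byte[] getBranchQualifier() { return bqual.clone(); }
    }

    // In-memory XAResource that only records which calls it received.
    static final class RecordingXaResource implements XAResource {
        final List<String> calls = new ArrayList<>();
        @Override public void start(Xid xid, int flags) { calls.add("start"); }
        @Override public void end(Xid xid, int flags) { calls.add("end"); }
        @Override public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
        @Override public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
        @Override public void rollback(Xid xid) { calls.add("rollback"); }
        @Override public void forget(Xid xid) { }
        @Override public Xid[] recover(int flag) { return new Xid[0]; }
        @Override public boolean isSameRM(XAResource other) { return other == this; }
        @Override public int getTransactionTimeout() { return 0; }
        @Override public boolean setTransactionTimeout(int seconds) { return false; }
    }

    // Phase one: open the branch, run the work, then end and prepare the branch.
    static void phaseOne(XAResource xa, Xid branch, Runnable work) {
        try {
            xa.start(branch, XAResource.TMNOFLAGS);
            work.run();
            xa.end(branch, XAResource.TMSUCCESS);
            xa.prepare(branch);
        } catch (XAException e) {
            throw new IllegalStateException("phase one failed", e);
        }
    }

    // Phase two: apply the coordinator's decision to the prepared branch.
    static void phaseTwo(XAResource xa, Xid branch, boolean commit) {
        try {
            if (commit) {
                xa.commit(branch, false); // false: two-phase, the branch is already prepared
            } else {
                xa.rollback(branch);
            }
        } catch (XAException e) {
            throw new IllegalStateException("phase two failed", e);
        }
    }

    public static void main(String[] args) {
        RecordingXaResource xa = new RecordingXaResource();
        Xid branch = new SimpleXid("192.168.0.1:8091:1001", 2002L); // made-up values
        phaseOne(xa, branch, () -> xa.calls.add("sql"));
        phaseTwo(xa, branch, true);
        System.out.println(xa.calls);
    }
}
```

In this sketch the `start` call corresponds to step 3 of `setAutoCommit(false)` in the listing, with the SQL statements then running inside the open branch.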