找回密码
 立即注册
首页 业界区 业界 1、SEATA分布式事务——XA模式

1、SEATA分布式事务——XA模式

染罕习 4 小时前
一、传统分布式XA事务的2PC

  2PC 即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2 是指两个阶段,P 是指准备阶段,C 是指提交阶段。常见的关系型数据库如 Oracle、MySQL 都支持两阶段提交协议,如下图:

  • 成功情况
    1.png

  • 失败情况
    2.png

①、准备阶段(Prepare phase):TM(事务管理器)给每个RM(资源管理器,也就是数据库)发送 Prepare 消息,每个RM在本地执行事务,并写本地的 Undo/Redo (Undo 日志是记录修改前的数据,用于数据库回滚,Redo 日志是记录修改后的数据,用于提交事务后写入数据文件)日志,此时事务没有提交。
②、提交阶段(commit phase):如果TM(事务管理器)收到了RM(资源管理器,也就是数据库)的执行失败或者超时消息时,直接给每个RM发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交(Commit)或者回滚(Rollback)操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。
  传统XA事务的详细细节和实现方式,请查看我的另一篇博客:mysql数据库事务的实现和XA事务
二、Seata的XA模式

  seata实现分布式事务的样例程序
  在 Seata 定义的分布式事务框架内,XA模式是利用RM(资源管理器,也就是数据库)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种事务模式,如下所示:
3.png

从上面的图可以看到,seata XA 模式分为以下5个步骤:
①、TM(事务管理器) 开启全局事务;
②、RM 向 TC(事务协调者) 注册分支事务;
③、RM 向 TC 报告分支事务状态;
④、TC 向 RM (资源管理器,也就是数据库)发送commit/rollback 请求;
⑤、TM 结束全局事务Global Commit/Rollback
  负责RM 客户端的类是RmNettyRemotingClient.class,这个类的UML图如下所示:
4.png

RmNettyRemotingClient.class的父类中的内部类AbstractNettyRemotingClient.class::ClientHandler.class来处理 TC 发来的请求并再次委托给父类AbstractNettyRemoting.class::processMessage()函数来处理TC发来的请求

  • AbstractNettyRemotingClient.class::ClientHandler.class的源码如下:
  1. ...省略部分导包代码...
  2. import io.netty.channel.ChannelDuplexHandler;
  3. import io.netty.channel.ChannelHandlerContext;
  4. ...省略部分导包代码...
  5. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  6.     ...省略部分代码...
  7.     @Sharable
  8.     class ClientHandler extends ChannelDuplexHandler {
  9.         ClientHandler() {
  10.         }
  11.    
  12.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  13.             if (msg instanceof RpcMessage) {
  14.                 AbstractNettyRemotingClient.this.processMessage(ctx, (RpcMessage)msg);
  15.             } else {
  16.                 AbstractNettyRemotingClient.LOGGER.error("rpcMessage type error");
  17.             }
  18.    
  19.         }
  20.     }
  21.     ...省略部分代码...
  22. }
复制代码

  • AbstractNettyRemoting.class::processMessage()的源码如下:
  1. public abstract class AbstractNettyRemoting implements Disposable {
  2.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  3.         if (LOGGER.isDebugEnabled()) {
  4.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  5.         }
  6.         Object body = rpcMessage.getBody();
  7.         if (body instanceof MessageTypeAware) {
  8.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  9.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  10.             if (pair != null) {
  11.                 if (pair.getSecond() != null) {
  12.                     try {
  13.                         pair.getSecond().execute(() -> {
  14.                             try {
  15.                                 //最终调用的是RmBranchCommitProcessor.class的process()函数
  16.                                 pair.getFirst().process(ctx, rpcMessage);
  17.                             } catch (Throwable th) {
  18.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  19.                             } finally {
  20.                                 MDC.clear();
  21.                             }
  22.                         });
  23.                     } catch (RejectedExecutionException e) {
  24.                         LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
  25.                             "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
  26.                         if (allowDumpStack) {
  27.                             String name = ManagementFactory.getRuntimeMXBean().getName();
  28.                             String pid = name.split("@")[0];
  29.                             long idx = System.currentTimeMillis();
  30.                             try {
  31.                                 String jstackFile = idx + ".log";
  32.                                 LOGGER.info("jstack command will dump to " + jstackFile);
  33.                                 Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
  34.                             } catch (IOException exx) {
  35.                                 LOGGER.error(exx.getMessage());
  36.                             }
  37.                             allowDumpStack = false;
  38.                         }
  39.                     }
  40.                 } else {
  41.                     try {
  42.                         pair.getFirst().process(ctx, rpcMessage);
  43.                     } catch (Throwable th) {
  44.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  45.                     }
  46.                 }
  47.             } else {
  48.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  49.             }
  50.         } else {
  51.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  52.         }
  53.     }
  54. }
复制代码

  • RmBranchCommitProcessor.class的源码如下:
  1. package org.apache.seata.core.rpc.processor.client;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import org.apache.seata.common.util.NetUtil;
  4. import org.apache.seata.core.protocol.RpcMessage;
  5. import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
  6. import org.apache.seata.core.protocol.transaction.BranchCommitResponse;
  7. import org.apache.seata.core.rpc.RemotingClient;
  8. import org.apache.seata.core.rpc.RpcContext;
  9. import org.apache.seata.core.rpc.TransactionMessageHandler;
  10. import org.apache.seata.core.rpc.processor.RemotingProcessor;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. public class RmBranchCommitProcessor implements RemotingProcessor {
  14.     private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);
  15.     private TransactionMessageHandler handler;
  16.     private RemotingClient remotingClient;
  17.     public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
  18.         this.handler = handler;
  19.         this.remotingClient = remotingClient;
  20.     }
  21.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  22.         String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  23.         Object msg = rpcMessage.getBody();
  24.         if (LOGGER.isInfoEnabled()) {
  25.             LOGGER.info("rm client handle branch commit process:" + msg);
  26.         }
  27.         this.handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest)msg);
  28.     }
  29.     private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
  30.         BranchCommitResponse resultMessage = (BranchCommitResponse)this.handler.onRequest(branchCommitRequest, (RpcContext)null);
  31.         if (LOGGER.isDebugEnabled()) {
  32.             LOGGER.debug("branch commit result:" + resultMessage);
  33.         }
  34.         try {
  35.             this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
  36.         } catch (Throwable var6) {
  37.             LOGGER.error("branch commit error: {}", var6.getMessage(), var6);
  38.         }
  39.     }
  40. }
复制代码
seata 的 xa 模式是两阶段提交:
①、第一阶段先执行 XA Start、执行SQL、XA End三个步骤,之后直接执行XA Prepare。
②、第二阶段执行 XA commit/rollback。
但是oracle数据库不支持,因为 oracle 实现的是标准的 xa 协议,即 xa end 后,TC(事务协调者)向RM (资源管理器,也就是数据库)统一发送 prepare,最后再发送 commit/rollback。这也导致了 seata 的 xa 模式对 oracle 数据库的支持不太好。
2.1、xa模式数据源代理与at模式数据源代理的区别

  seata 中的 XA 模式是使用数据源代理来实现的,需要手动配置数据源代理,代码如下:
  1. import org.apache.seata.rm.datasource.xa.DataSourceProxyXA;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. import org.springframework.context.annotation.Bean;
  4. import javax.sql.DataSource;
  5. @Bean
  6. @ConfigurationProperties(prefix = "spring.datasource")
  7. public DruidDataSource druidDataSource() {
  8.     //也可以根据普通 DataSource 来创建 XAConnection,但是这种方式有兼容性问题(比如 oracle数据库不支持)
  9.     return new DruidDataSource();
  10. }
  11. @Bean("dataSourceProxy")
  12. public DataSource dataSource(DruidDataSource druidDataSource) {
  13.     //所以 seata 使用了开发者自己配置 XADataSource
  14.     //seata 提供的 XA 数据源代理,要求代码框架中必须使用 druid 连接池
  15.     return new DataSourceProxyXA(druidDataSource);
  16. }
复制代码
①、在数据源代理根据普通数据源(DataSource)中获取的普通 JDBC 连接创建出相应的 Connection的数据源代理方式中,XA模式与 AT 模式的数据源代理机制的区别,如下所示:
5.png

DataSourceProxyXA、ConnectionProxyXA、StatementProxyXA的UML关系图,如下所示:
6.png


  • DataSourceProxyXA.class的部分源码
  1. ...省略部分导包代码...
  2. import org.apache.seata.rm.datasource.util.XAUtils;
  3. import javax.sql.DataSource;
  4. import javax.sql.XAConnection;
  5. ...省略部分导包代码...
  6. public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
  7.     ...省略部分代码...
  8.     protected Connection getConnectionProxy(Connection connection) throws SQLException {
  9.         return !RootContext.inGlobalTransaction() ? connection : this.getConnectionProxyXA(connection);
  10.     }
  11.    
  12.     protected Connection getConnectionProxyXA() throws SQLException {
  13.         Connection connection = this.dataSource.getConnection();
  14.         return this.getConnectionProxyXA(connection);
  15.     }
  16.     //创建ConnectionProxyXA和XAConnection
  17.     private Connection getConnectionProxyXA(Connection connection) throws SQLException {
  18.         Connection physicalConn = (Connection)connection.unwrap(Connection.class);
  19.         XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
  20.         ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
  21.         connectionProxyXA.init();
  22.         return connectionProxyXA;
  23.     }
  24.     ...省略部分代码...
  25. }
复制代码

  • ConnectionProxyXA .class的部分源码和AbstractConnectionProxyXA.class的部分源码
  1. ...省略部分导包代码...
  2. public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
  3.     private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
  4.     private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt("client.rm.branchExecutionTimeoutXA", 60000);
  5.     private volatile boolean currentAutoCommitStatus = true;
  6.     private volatile XAXid xaBranchXid;
  7.     private volatile boolean xaActive = false;
  8.     private volatile boolean xaEnded = false;
  9.     private volatile boolean kept = false;
  10.     private volatile boolean rollBacked = false;
  11.     private volatile Long branchRegisterTime = null;
  12.     private volatile Long prepareTime = null;
  13.     private static final Integer TIMEOUT;
  14.     private boolean shouldBeHeld = false;
  15.     public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
  16.         super(originalConnection, xaConnection, resource, xid);
  17.         this.shouldBeHeld = resource.isShouldBeHeld();
  18.     }
  19.    
  20.     ...省略部分代码...
  21. }
复制代码
  1. package org.apache.seata.rm.datasource.xa;
  2. ...省略部分导包代码...
  3. import javax.sql.XAConnection;
  4. import java.sql.Connection;
  5. ...省略部分导包代码...
  6. public abstract class AbstractConnectionProxyXA implements Connection {
  7.      ...省略部分代码...
  8.     protected Connection originalConnection;
  9.     public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
  10.         this.originalConnection = originalConnection;
  11.         this.xaConnection = xaConnection;
  12.         this.resource = resource;
  13.         this.xid = xid;
  14.     }
  15.     //用重载的方式创建不同的StatementProxyXA
  16.     @Override
  17.     public Statement createStatement() throws SQLException {
  18.         Statement targetStatement = originalConnection.createStatement();
  19.         return new StatementProxyXA(this, targetStatement);
  20.     }
  21.    
  22.     @Override
  23.     public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
  24.         Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency);
  25.         return new StatementProxyXA(this, statement);
  26.     }
  27.    
  28.     @Override
  29.     public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
  30.             throws SQLException {
  31.         Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency,
  32.                 resultSetHoldability);
  33.         return new StatementProxyXA(this, statement);
  34.     }
  35.      ...省略部分代码...
  36. }
复制代码

  • StatementProxyXA.class的部分源码
  1. import java.sql.Connection;
  2. import java.sql.ResultSet;
  3. import java.sql.SQLException;
  4. import java.sql.SQLWarning;
  5. import java.sql.Statement;
  6. /**
  7. * Statement proxy for XA mode.
  8. *
  9. */
  10. public class StatementProxyXA implements Statement {
  11.     protected AbstractConnectionProxyXA connectionProxyXA;
  12.     protected Statement targetStatement;
  13.     public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
  14.         this.connectionProxyXA = connectionProxyXA;
  15.         this.targetStatement = targetStatement;
  16.     }
  17.     ...省略部分代码...
  18. }
复制代码
②、在数据源代理是指定的XA 数据源(XADataSource)进行代理方式中获取的普通 JDBC 连接创建出相应的 Connection的数据源代理方式中,XA模式与 AT模式的数据源代理机制的区别,如下所示:
7.png

作者在SEATA的2.3.0版本中没有找到XADataSourceProxy.class、XAConnectionProxy.class、StatementProxyXA.class
2.2、XA 第一阶段的部分源码

  XA第一阶段是指XA Start、执行SQL、XA End、XA Prepare,如下所示:
8.png

当 RM 收到 DML 请求后,seata 会使用 ExecuteTemplateXA.class中的静态函数execute()来执行,execute()中有一个地方很关键,就是把 autocommit 属性改为了 false,而 mysql 默认 autocommit 是 true。事务提交之后,还要把 autocommit 改回默认。如下所示:
  1. import java.sql.Connection;
  2. import java.sql.ResultSet;
  3. import java.sql.SQLException;
  4. import java.sql.SQLWarning;
  5. import java.sql.Statement;
  6. /**
  7. * Statement proxy for XA mode.
  8. *
  9. */
  10. public class StatementProxyXA implements Statement {
  11.     protected AbstractConnectionProxyXA connectionProxyXA;
  12.    
  13.     protected Statement targetStatement;
  14.    
  15.     public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
  16.         this.connectionProxyXA = connectionProxyXA;
  17.         this.targetStatement = targetStatement;
  18.     }
  19.    
  20.     @Override
  21.     public int executeUpdate(String sql) throws SQLException {
  22.         return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -> statement.executeUpdate(
  23.             (String)args[0]), targetStatement, sql);
  24.     }
  25.     ...省略部分代码...
  26. }
复制代码
2.2.1、XA Start环节


  • ExecuteTemplateXA .class的的部分源码
  1. package org.apache.seata.rm.datasource.xa;
  2. import org.apache.seata.rm.datasource.exec.StatementCallback;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. public class ExecuteTemplateXA {
  8.    
  9.     private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);
  10.     public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
  11.                                                      StatementCallback<T, S> statementCallback,
  12.                                                      S targetStatement,
  13.                                                      Object... args) throws SQLException {
  14.         boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
  15.         if (autoCommitStatus) {
  16.             // XA Start环节
  17.             connectionProxyXA.setAutoCommit(false);
  18.         }
  19.         ...省略部分代码...
  20.     }
  21.      ...省略部分代码...
  22. }
复制代码

  • ConnectionProxyXA .class的部分源码——真正开启XA Start环节
  1. package org.apache.seata.rm.datasource.xa;
  2. import java.sql.Connection;
  3. import java.sql.SQLException;
  4. import javax.sql.PooledConnection;
  5. import javax.sql.XAConnection;
  6. import javax.transaction.xa.XAException;
  7. import javax.transaction.xa.XAResource;
  8. import org.apache.seata.common.DefaultValues;
  9. import org.apache.seata.common.util.StringUtils;
  10. import org.apache.seata.config.ConfigurationFactory;
  11. import org.apache.seata.core.exception.TransactionException;
  12. import org.apache.seata.core.model.BranchStatus;
  13. import org.apache.seata.core.model.BranchType;
  14. import org.apache.seata.rm.BaseDataSourceResource;
  15. import org.apache.seata.rm.DefaultResourceManager;
  16. import org.apache.seata.rm.datasource.util.SeataXAResource;
  17. import org.apache.seata.sqlparser.util.JdbcConstants;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
  21.      ...省略部分代码...
  22.     @Override
  23.     public void setAutoCommit(boolean autoCommit) throws SQLException {
  24.         if (currentAutoCommitStatus == autoCommit) {
  25.             return;
  26.         }
  27.         if (isReadOnly()) {
  28.             //If it is a read-only transaction, do nothing
  29.             currentAutoCommitStatus = autoCommit;
  30.             return;
  31.         }
  32.         if (autoCommit) {
  33.             // According to JDBC spec:
  34.             // If this method is called during a transaction and the
  35.             // auto-commit mode is changed, the transaction is committed.
  36.             if (xaActive) {
  37.                 commit();
  38.             }
  39.         } else {
  40.             if (xaActive) {
  41.                 throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
  42.             }
  43.             // Start a XA branch
  44.             long branchId;
  45.             try {
  46.                 // 1. register branch to TC then get the branch message
  47.                 branchRegisterTime = System.currentTimeMillis();
  48.                 branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
  49.                         null);
  50.             } catch (TransactionException te) {
  51.                 cleanXABranchContext();
  52.                 throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
  53.             }
  54.             // 2. build XA-Xid with xid and branchId
  55.             this.xaBranchXid = XAXidBuilder.build(xid, branchId);
  56.             // Keep the Connection if necessary
  57.             keepIfNecessary();
  58.             try {
  59.                 start();//开启XA事务
  60.             } catch (XAException e) {
  61.                 cleanXABranchContext();
  62.                 throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
  63.             }
  64.             // 4. XA is active
  65.             this.xaActive = true;
  66.    
  67.         }
  68.    
  69.         currentAutoCommitStatus = autoCommit;
  70.     }
  71.     //最终调用了start()函数才真正开启了XA Start环节
  72.     private synchronized void start() throws XAException, SQLException {
  73.         // 3. XA Start
  74.         if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
  75.             xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
  76.         } else {
  77.             xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
  78.         }
  79.    
  80.         try {
  81.             termination();
  82.         } catch (SQLException e) {
  83.             // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception
  84.             xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
  85.             xaRollback(xaBranchXid);
  86.             // Branch Report to TC: Failed
  87.             reportStatusToTC(BranchStatus.PhaseOne_Failed);
  88.             throw  e;
  89.         }
  90.     }
  91.      ...省略部分代码...
  92. }
复制代码
2.2.2、执行SQL环节


  • ExecuteTemplateXA .class的部分源码
  1. package org.apache.seata.rm.datasource.xa;
  2. import org.apache.seata.rm.datasource.exec.StatementCallback;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. /**
  8. * The type Execute template.
  9. *
  10. */
  11. public class ExecuteTemplateXA {
  12.     private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);
  13.     public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
  14.                                                      StatementCallback<T, S> statementCallback,
  15.                                                      S targetStatement,
  16.                                                      Object... args) throws SQLException {
  17.         boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
  18.         if (autoCommitStatus) {
  19.             // XA Start
  20.             connectionProxyXA.setAutoCommit(false);
  21.         }
  22.         try {
  23.             T res = null;
  24.             try {
  25.                 //执行SQL环节,最终调用的是StatementProxyXA.class中的匿名内部类执行了SQL
  26.                 // execute SQL
  27.                 res = statementCallback.execute(targetStatement, args);
  28.                         ...省略部分代码...
  29.     }
  30.     ...省略部分代码...
  31. }
复制代码

  • StatementProxyXA .class的部分源代码,最终调用的是StatementProxyXA.class中的匿名内部类执行了SQL
  1. public class StatementProxyXA implements Statement {
  2.     protected AbstractConnectionProxyXA connectionProxyXA;
  3.    
  4.     protected Statement targetStatement;
  5.    
  6.     public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
  7.         this.connectionProxyXA = connectionProxyXA;
  8.         this.targetStatement = targetStatement;
  9.     }
  10.    
  11.     @Override
  12.     public int executeUpdate(String sql) throws SQLException {
  13.         //最终调用了StatementProxyXA.class中的匿名内部类执行了SQL
  14.         return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -> statement.executeUpdate(
  15.             (String)args[0]), targetStatement, sql);
  16.     }
  17.     ...省略部分代码...
  18. }
复制代码
2.2.3、XA End环节和XA Prepare环节


  • ExecuteTemplateXA .class的的部分源码
  1. package org.apache.seata.rm.datasource.xa;
  2. import org.apache.seata.rm.datasource.exec.StatementCallback;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. /**
  8. * The type Execute template.
  9. *
  10. */
  11. public class ExecuteTemplateXA {
  12.     private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);
  13.     public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
  14.                                                      StatementCallback<T, S> statementCallback,
  15.                                                      S targetStatement,
  16.                                                      Object... args) throws SQLException {
  17.         boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
  18.         if (autoCommitStatus) {
  19.             // XA Start
  20.             connectionProxyXA.setAutoCommit(false);
  21.         }
  22.         try {
  23.             T res = null;
  24.             try {
  25.                 // execute SQL
  26.                 res = statementCallback.execute(targetStatement, args);
  27.             } catch (Throwable ex) {
  28.                 if (autoCommitStatus) {
  29.                     // XA End & Rollback
  30.                     try {
  31.                         connectionProxyXA.rollback();
  32.                     } catch (SQLException sqle) {
  33.                         // log and ignore the rollback failure.
  34.                         LOGGER.warn(
  35.                             "Failed to rollback xa branch of " + connectionProxyXA.xid +
  36.                                 "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
  37.                             sqle);
  38.                     }
  39.                 }
  40.                 if (ex instanceof SQLException) {
  41.                     throw ex;
  42.                 } else {
  43.                     throw new SQLException(ex);
  44.                 }
  45.             }
  46.             if (autoCommitStatus) {
  47.                 try {
  48.                     // XA End & Prepare
  49.                     //XA End环节和XA Prepare环节
  50.                     connectionProxyXA.commit();
  51.                     ...省略部分代码...
  52.     }
  53.     ...省略部分代码...
  54. }
复制代码

  • ConnectionProxyXA .class的部分源码——真正执行XA End环节和XA Prepare环节
  1. package org.apache.seata.rm.datasource.xa;
  2. import java.sql.Connection;
  3. import java.sql.SQLException;
  4. import javax.sql.PooledConnection;
  5. import javax.sql.XAConnection;
  6. import javax.transaction.xa.XAException;
  7. import javax.transaction.xa.XAResource;
  8. import org.apache.seata.common.DefaultValues;
  9. import org.apache.seata.common.util.StringUtils;
  10. import org.apache.seata.config.ConfigurationFactory;
  11. import org.apache.seata.core.exception.TransactionException;
  12. import org.apache.seata.core.model.BranchStatus;
  13. import org.apache.seata.core.model.BranchType;
  14. import org.apache.seata.rm.BaseDataSourceResource;
  15. import org.apache.seata.rm.DefaultResourceManager;
  16. import org.apache.seata.rm.datasource.util.SeataXAResource;
  17. import org.apache.seata.sqlparser.util.JdbcConstants;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
  21.      ...省略部分代码...
  22.     @Override
  23.     public synchronized void commit() throws SQLException {
  24.         if (currentAutoCommitStatus || isReadOnly()) {
  25.             // Ignore the committing on an autocommit session and read-only transaction.
  26.             return;
  27.         }
  28.         if (!xaActive || this.xaBranchXid == null) {
  29.             throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
  30.         }
  31.         try {
  32.             // XA End: Success
  33.             try {
  34.                 end(XAResource.TMSUCCESS);
  35.             } catch (SQLException sqle) {
  36.                 // Rollback immediately before the XA Branch Context is deleted.
  37.                 String xaBranchXid = this.xaBranchXid.toString();
  38.                 rollback();
  39.                 throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
  40.             }
  41.             long now = System.currentTimeMillis();
  42.             checkTimeout(now);
  43.             setPrepareTime(now);
  44.             int prepare = xaResource.prepare(xaBranchXid);
  45.             // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
  46.             // only Oracle has read-only optimization; the others do not provide read-only feedback.
  47.             // Therefore, the database type check can be eliminated here.
  48.             if (prepare == XAResource.XA_RDONLY) {
  49.                 // Branch Report to TC: RDONLY
  50.                 reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
  51.             }
  52.         } catch (XAException xe) {
  53.             // Branch Report to TC: Failed
  54.             reportStatusToTC(BranchStatus.PhaseOne_Failed);
  55.             throw new SQLException(
  56.                 "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
  57.                     .getMessage(), xe);
  58.         } finally {
  59.             cleanXABranchContext();
  60.         }
  61.     }
  62.     private void xaEnd(XAXid xaXid, int flags) throws XAException {
  63.         if (!xaEnded) {
  64.             xaResource.end(xaXid, flags);
  65.             xaEnded = true;
  66.         }
  67.     }
  68.    
  69.     private synchronized void end(int flags) throws XAException, SQLException {
  70.         xaEnd(xaBranchXid, flags);
  71.         termination();
  72.     }
  73.    
  74.     private void termination() throws SQLException {
  75.         termination(this.xaBranchXid.toString());
  76.     }
  77.    
  78.     private void termination(String xaBranchXid) throws SQLException {
  79.         // if it is not empty, the resource will hang and need to be terminated early
  80.         BranchStatus branchStatus = BaseDataSourceResource.getBranchStatus(xaBranchXid);
  81.         if (branchStatus != null) {
  82.             releaseIfNecessary();
  83.             throw new SQLException("failed xa branch " + xid
  84.                     + " the global transaction has finish, branch status: " + branchStatus.getCode());
  85.         }
  86.     }
  87.     //给TC汇报XA Commit状态和XA Rollback状态
  88.     private void reportStatusToTC(BranchStatus status) {
  89.         try {
  90.             DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
  91.                     status, null);
  92.         } catch (TransactionException te) {
  93.             LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}",
  94.                     status, xid, xaBranchXid.getBranchId(), te.getCode(), te.getMessage());
  95.         }
  96.     }
  97.      ...省略部分代码...
  98. }
复制代码
2.3、XA 第二阶段的部分源码

  XA第二阶段是指XA Commit或者XA Rollback,如下所示:
9.png

当XA模式的第一阶段执行完成后,便会根据第一阶段的执行结果来执行XA模式的第二阶段,如下图所示:
10.png


  • RmBranchCommitProcessor.class的部分源码
  1. ...省略导包代码...
  2. public class RmBranchCommitProcessor implements RemotingProcessor {
  3.      ...省略部分代码...
  4.     @Override
  5.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  6.         String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  7.         Object msg = rpcMessage.getBody();
  8.         if (LOGGER.isInfoEnabled()) {
  9.             LOGGER.info("rm client handle branch commit process:" + msg);
  10.         }
  11.         handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
  12.     }
  13.    
  14.     private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
  15.         BranchCommitResponse resultMessage;
  16.         resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
  17.         if (LOGGER.isDebugEnabled()) {
  18.             LOGGER.debug("branch commit result:" + resultMessage);
  19.         }
  20.         try {
  21.             this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
  22.         } catch (Throwable throwable) {
  23.             LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
  24.         }
  25.     }
  26.      ...省略部分代码...
  27. }
复制代码

  • AbstractRMHandler.class的部分源码
  1. ...省略导包代码...
  2. public abstract class AbstractRMHandler extends AbstractExceptionHandler
  3.     implements RMInboundHandler, TransactionMessageHandler {
  4.      ...省略部分代码...
  5.     @Override
  6.     public BranchCommitResponse handle(BranchCommitRequest request) {
  7.         BranchCommitResponse response = new BranchCommitResponse();
  8.         exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
  9.             @Override
  10.             public void execute(BranchCommitRequest request, BranchCommitResponse response)
  11.                 throws TransactionException {
  12.                 doBranchCommit(request, response);
  13.             }
  14.         }, request, response);
  15.         return response;
  16.     }
  17.    
  18.     @Override
  19.     public BranchRollbackResponse handle(BranchRollbackRequest request) {
  20.         BranchRollbackResponse response = new BranchRollbackResponse();
  21.         exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
  22.             @Override
  23.             public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
  24.                 throws TransactionException {
  25.                 doBranchRollback(request, response);
  26.             }
  27.         }, request, response);
  28.         return response;
  29.     }  
  30.    
  31.     protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
  32.         throws TransactionException {
  33.         String xid = request.getXid();
  34.         long branchId = request.getBranchId();
  35.         String resourceId = request.getResourceId();
  36.         String applicationData = request.getApplicationData();
  37.         if (LOGGER.isInfoEnabled()) {
  38.             LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
  39.         }
  40.         BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
  41.             applicationData);
  42.         response.setXid(xid);
  43.         response.setBranchId(branchId);
  44.         response.setBranchStatus(status);
  45.         if (LOGGER.isInfoEnabled()) {
  46.             LOGGER.info("Branch commit result: " + status);
  47.         }
  48.    
  49.     }
  50.    
  51.     protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
  52.         throws TransactionException {
  53.         String xid = request.getXid();
  54.         long branchId = request.getBranchId();
  55.         String resourceId = request.getResourceId();
  56.         String applicationData = request.getApplicationData();
  57.         if (LOGGER.isInfoEnabled()) {
  58.             LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
  59.         }
  60.         BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
  61.             applicationData);
  62.         response.setXid(xid);
  63.         response.setBranchId(branchId);
  64.         response.setBranchStatus(status);
  65.         if (LOGGER.isInfoEnabled()) {
  66.             LOGGER.info("Branch Rollbacked result: " + status);
  67.         }
  68.     }
  69.    
  70.     @Override
  71.     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
  72.         if (!(request instanceof AbstractTransactionRequestToRM)) {
  73.             throw new IllegalArgumentException();
  74.         }
  75.         AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
  76.         transactionRequest.setRMInboundMessageHandler(this);
  77.    
  78.         return transactionRequest.handle(context);
  79.     }
  80.      ...省略部分代码...
  81. }
复制代码

  • BranchCommitRequest.class的部分源码
  1. ...省略导包代码...
  2. public class BranchCommitRequest extends AbstractBranchEndRequest {
  3.     @Override
  4.     public short getTypeCode() {
  5.         return MessageType.TYPE_BRANCH_COMMIT;
  6.     }
  7.     @Override
  8.     public AbstractTransactionResponse handle(RpcContext rpcContext) {
  9.         return handler.handle(this);
  10.     }
  11. }
复制代码

  • ResourceManagerXA.class的部分源码
  1. ...省略导包代码...
  2. public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
  3.      ...省略部分代码...
  4.     @Override
  5.     public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
  6.                                      String applicationData) throws TransactionException {
  7.         return finishBranch(true, branchType, xid, branchId, resourceId, applicationData);
  8.     }
  9.    
  10.     @Override
  11.     public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
  12.                                        String applicationData) throws TransactionException {
  13.         return finishBranch(false, branchType, xid, branchId, resourceId, applicationData);
  14.     }
  15.      ...省略部分代码...
  16. }
复制代码
ResourceManagerXA.class的UML关系图,如下所示:
11.png

上述类的调用关系,如下时序图所示:
12.png

ResourceManagerXA.class::finishBranch()函数最终调用了ConnectionProxyXA.class::xaCommit()函数或者ConnectionProxyXA.class::xaRollback()函数,如下所示:
  1. ...省略导包代码...
  2. public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
  3.      ...省略部分代码...
  4.     public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException {
  5.         XAXid xaXid = XAXidBuilder.build(xid, branchId);
  6.         xaResource.commit(xaXid, false);
  7.         releaseIfNecessary();
  8.     }
  9.    
  10.    
  11.     public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException {
  12.         if (this.xaBranchXid != null) {
  13.             xaRollback(xaBranchXid);
  14.         } else {
  15.             XAXid xaXid = XAXidBuilder.build(xid, branchId);
  16.             xaRollback(xaXid);
  17.         }
  18.     }
  19.      ...省略部分代码...
  20. }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册