基于JTA的XA模式
约 1399 字大约 5 分钟
2025-09-05
基于JTA的XA模式
JTA全称Java Transaction API,是Open Group规范中分布式事务XA规范在Java中的映射,是Java中使用事务的标准API,同时支持单机事务与分布式事务。

JTA定义了分布式事务的5种角色,不同角色关注的内容不同:
事务管理器
事务管理器Transaction Manager是分布式事务的核心,在JTA中使用TransactionManager接口表示,提供了事务操作、资源管理、同步、事务传播等功能。
- 事务管理:包括将事务对象存至线程上下文,开始事务,提交事务,回滚事务等操作;
- 资源管理:将资源与事务对象建立关系,以便通过两阶段提交事务;
- 同步:同步原文为Synchronization,其实是事务完成前后一个回调接口;
- 事务传播:指在一个事务中开启子事务时,子事务的行为,与Spring事务传播行为类似。
资源管理器
资源管理器提供了对资源的访问能力,典型的代表是关系型数据库、消息队列,在JTA中使用XAResource表示,通常通过一个资源适配器来实现,例如JDBC中数据库驱动。
通信资源管理器
通信资源管理器用于支持跨应用分布式事务管理器之间的通信,JTA规定它实现了JTS规范定义的接口即可,通常用户不用关心。
应用服务器
应用的运行环境,JTA规定事务管理器应该由应用服务器来实现,如jboss、weblogic、websphere,不过并非所有的应用服务器都实现了事务管理器,如Tomcat。如果想在标准环境使用JTA,可以使用支持JTA的第三方类库,如Atomikos、Bitronix等。
JTA Atomikos实战
1️⃣引入依赖
<dependencies>
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>2️⃣创建测试方法
private static DataSource getDataSource() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://localhost:3306/mytest?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true");
properties.put("user", "root");
properties.put("password", "RootCC123@");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
atomikosDataSourceBean.setXaProperties(properties);
atomikosDataSourceBean.setUniqueResourceName("resourceName");
atomikosDataSourceBean.setPoolSize(10);
atomikosDataSourceBean.setBorrowConnectionTimeout(60);
return atomikosDataSourceBean;
}
private static DataSource getDataSource2() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://localhost:3306/mall?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true");
properties.put("user", "root");
properties.put("password", "RootCC123@");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
atomikosDataSourceBean.setXaProperties(properties);
atomikosDataSourceBean.setUniqueResourceName("resourceName2");
atomikosDataSourceBean.setPoolSize(10);
atomikosDataSourceBean.setBorrowConnectionTimeout(60);
return atomikosDataSourceBean;
}
private static void test2() {
TransactionManager transactionManager = new UserTransactionManager();
Connection connection = null;
Connection connection2 = null;
PreparedStatement ps = null;
PreparedStatement ps2 = null;
try {
// 开启一个事务
transactionManager.begin();
// 第一个事务
connection = getDataSource().getConnection();
ps = connection.prepareStatement("insert into user(id,username,name) values(" + RandomUtil.randomInt() + ",'admin','管理员')");
ps.executeUpdate();
// 第二个事务
connection2 = getDataSource2().getConnection();
ps2 = connection2.prepareStatement("insert into user(id,username,name) values(" + RandomUtil.randomInt() + ",'admin','管理员')");
ps2.executeUpdate();
if (Math.random() > 0) {
throw new RuntimeException("模拟错误发生了");
}
// 提交事务
transactionManager.commit();
} catch (NotSupportedException | SystemException | SQLException | RollbackException | HeuristicMixedException |
HeuristicRollbackException | RuntimeException e) {
try {
transactionManager.rollback();
} catch (SystemException ex) {
ex.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (ps != null) ps.close();
if (ps2 != null) ps2.close();
if (connection != null) connection.close();
if (connection2 != null) connection2.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}3️⃣编写测试方法
public static void main(String[] args) throws Exception {
test2();
}4️⃣实际执行结果
在MySQL中存在两个数据库,这里为了方便就支持就建立了一样表。在向第一个数据源插入数据的时候没有发生异常,然后继续向第二个数据源插入数据,插入完成后发生了异常,我们通过TransactionManager的rollback()方法将两个数据源中事务都进行了回滚。
【异常信息】XAER_RMERR: Fatal error occurred in the transaction branch - check your data for consistency
出现这种异常的时候是因为数据库权限的问题:
GRANT XA_RECOVER_ADMIN ON *.* TO root@'%' ; flush PRIVILEGES;
SpringBoot集成JTA
基于SpringBoot集成JTA可以使用Spring的@Transaction注解,让Spring自动管理事务的开启、提交、回滚,而无需手动调用UserTransactionManager。
1️⃣引入依赖(与上一小节中的依赖保持一致,注意它与Spring之间兼容的版本即可)
2️⃣配置XA数据源
@Bean(name = "orderDataSource")
public DataSource orderDataSource() {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setUniqueResourceName("orderDB");
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
Properties xaProps = new Properties();
xaProps.setProperty("URL", "jdbc:mysql://localhost:3306/order_db?useSSL=false");
xaProps.setProperty("user", "root");
xaProps.setProperty("password", "123456");
ds.setXaProperties(xaProps);
ds.setPoolSize(5);
return ds;
}
// ======================
// 2. 配置 XA 数据源(数据库2:inventory_db)
// ======================
@Bean(name = "inventoryDataSource")
public DataSource inventoryDataSource() {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setUniqueResourceName("inventoryDB");
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
Properties xaProps = new Properties();
xaProps.setProperty("URL", "jdbc:mysql://localhost:3306/inventory_db?useSSL=false");
xaProps.setProperty("user", "root");
xaProps.setProperty("password", "123456");
ds.setXaProperties(xaProps);
ds.setPoolSize(5);
return ds;
}
// ======================
// 3. 配置 JTA TransactionManager & UserTransaction
// ======================
@Bean
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(300);
return userTransactionImp;
}
@Bean
public TransactionManager transactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
// ======================
// 4. 配置 Spring 的 JtaTransactionManager(让 @Transactional 生效)
// ======================
@Bean
public PlatformTransactionManager jtaTransactionManager() throws Throwable {
return new JtaTransactionManager(userTransaction(), transactionManager());
}3️⃣使用@Transaction注解来管理分布式事务
@Service
public class OrderService {
@Resource(name = "orderDataSource")
private DataSource orderDataSource;
@Resource(name = "inventoryDataSource")
private DataSource inventoryDataSource;
/**
* 使用 @Transactional 注解,Spring 会自动使用 JTA 管理分布式事务
*/
@Transactional
public void placeOrder(int orderId, int productId, int quantity) throws SQLException {
// 操作订单库
try (Connection conn = orderDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(
"INSERT INTO orders (id, product_id, quantity) VALUES (?, ?, ?)")) {
ps.setInt(1, orderId);
ps.setInt(2, productId);
ps.setInt(3, quantity);
ps.executeUpdate();
}
// 操作库存库
try (Connection conn = inventoryDataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(
"UPDATE inventory SET stock = stock - ? WHERE product_id = ?")) {
ps.setInt(1, quantity);
ps.setInt(2, productId);
int rows = ps.executeUpdate();
if (rows == 0) {
throw new RuntimeException("库存不足,商品ID: " + productId);
}
}
// TODO: 如果集成了 JMS,这里可以发送事务性消息(如订单创建成功通知)
// 该消息发送也会纳入当前 JTA 全局事务中(如果使用 XAConnectionFactory)
System.out.println("✅ 订单与库存操作成功(事务自动提交)");
}
}4️⃣观察数据库中的结果可以看到当库存扣减失败的时候,订单也是无法扣减成功的。