分布式事务
约 2150 字大约 7 分钟
2025-10-12
分布式事务
在使用分库分表的时候不可避免的会涉及到多数据源的情况,多数据源可能是单数据库实例,也可能是多数据库实例。在多数据库实例的场景下,就不可避免的出现了分布式事务的场景。
例如:我们的订单表(ds_1.t_order_0和ds_2.t_order_0)和商品表(ds_1.t_product_0和ds_2.t_product_0)都进行了分库分表。此时,用户下单,我们需要做的操作是创建订单 + 扣减商品库存,根据分片的的规则是非常容易出现,要添加的订单需要添加到ds_1.t_order_0表中,要扣减库存的商品在ds_2.t_product_0表中。所以如果要保证创建订单与扣减库存的原子性,就不可避免涉及到了分布式事务的解决方案。

ShardingSphere的分布式事务的支持
ShardingSphere提供了多种方式来支持和管理分布式事务,以确保在分库分表场景下事务执行的正确性。
XA协议事务(强一致性)
基于XA规范(两阶段提交,2PC),通过协调器(TM)管理多个资源管理器(RM)的事务。XA事务的特点是:
- 强一致性
- 支持回滚
- 性能相对较低(因为需要两阶段提交,有同步阻塞)
- 适用于对一致性要求高的业务场景
支持XA的实现有:Atomikos、Narayana和Bitronix三种。
但是在使用的过程中发现,。因为从Spring Boot 3.x 开始就全面迁移到了Jakarta EE 9+,所有的Javax.*的类都迁移到了jakarta.*命名空间,而sharding-sphere 5.5中的XA事务主要依赖与Atomikos和Narayana等第三方分布式事务管理器,这些库目前大部分都还是基于javax.transaction.*的API。
降低ShardingShpere-JDBC的版本到5.2.1版本,来实现基于XA协议的分布式事务的一致性。
1️⃣引入XA相关的依赖
<!-- 默认是Atomikos实现 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-xa-core</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- 使用 XA 的 Narayana模式时,需要引入此模块 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-xa-narayana</artifactId>
<version>${project.version}</version>
</dependency>
<!-- 使用 BASE 事务时,需要引入此模块 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-base-seata-at</artifactId>
<version>${shardingsphere.version}</version>
</dependency>2️⃣进行Transaction相关的配置
rules:
- !TRANSACTION
defaultType: XA
providerType: Atomikos3️⃣建表语句以及CRUD的代码构建
create table t_order_0(
order_id bigint,
gmt_create datetime not null default current_timestamp,
gmt_update DATETIME not null default current_timestamp on UPDATE CURRENT_TIMESTAMP,
price DECIMAL(8,2) not null default 0.00,
user_id bigint,
primary key (order_id)
)engine=InnoDB DEFAULT charset=utf8mb4;
CREATE table t_product_0(
product_id bigint primary key,
gmt_create datetime not null default current_timestamp,
gmt_update datetime not null default current_timestamp on update current_timestamp,
product_name varchar(50) not null,
store_count int8 not null default 0
)engine=InnoDB default charset=utf8mb4;注意:这里只贴出了建表语句,实际上还需要在两个数据库ds_1和ds_2中都建立这两张表。
4️⃣测试代码
在要进行分布式事务控制的代码上添加@ShardingSphereTransaction注解进行标注,让它可以被ShardingSphere代理到,然后通过Atomikos进行分布式事务的控制。
@Override
@ShardingSphereTransactionType(TransactionType.XA)
public void create() {
// 插入订单
OrderEntity orderEntity = new OrderEntity();
int orderId = RandomUtil.randomInt();
orderEntity.setOrderId(orderId % 2 == 0 ? orderId : orderId + 1L);
orderEntity.setUserId(1L);
orderEntity.setPrice(new BigDecimal("100.00"));
orderMapper.insert(orderEntity);
// 模拟出现异常
int a = 1 / 0;
// 扣减库存
long productId = 1L;
productMapper.update(new LambdaUpdateWrapper<ProductEntity>()
.eq(ProductEntity::getProductId, productId)
.setSql("store_count = store_count - 1"));
}最后出现的结果就是订单创建失败,库存也不会进行扣减,这就是我们需要的场景实现。
BASE事务(柔性事务)
实现了ACID特定的事务称之为刚性事务,基于BASE事务要素的称之为柔性事务。
刚性事务对隔离性的要求很高,在事务执行的过程中,必须锁定所有的资源。柔性事务是通过业务逻辑将互斥锁操作从资源层面上移到业务层面。通过降低对一致性的要求,来增加系统的吞吐量。
ShardingSphere提供了一套接入分布式事务的SPI,设计的目标是保证数据分片后事务的ACID语义。分布式事务的实现目前主要包含两阶段的XA和BASE 柔性事务。SEATA的AT事务作为Base柔性事务的一种实现,可以无缝接入到ShardingShpere中。
ShardingSphere集成了两套柔性事务的框架:ServiceComb Saga和Seata AT。ServiceComb Saga是通过一阶段提交 + 补偿的方式来提高整体事务的性能,其中补偿的方式同Seata大致相同(根据执行的SQL反向生成补偿的SQL)。
我们这里仍然使用的5.2.1版本的shardingsphere-jdbc-core依赖来实现,步骤跟上面其实都差不多的。
1️⃣引入依赖
<!-- 引入ShardingSphere依赖 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- 引入ShardingSphere 与 Seata集成时的依赖 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-base-seata-at</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- 引入Seata的依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2023.0.1.2</version>
</dependency>2️⃣添加配置文件
在application.yml中添加Seata相关的依赖,其中额外注意的在于不要让Seata来代理这个数据源,因为ShardingSphere也需要代理这个数据源;
seata:
enable: true,
application-id: ${spring.application.name}
enable-auto-data-source-proxy: false # 关闭自动代理数据源
tx-service-group: default_tx_group
data-source-proxy-mode: AT # 数据源使用XA模式
service:
vgroup-mapping:
default_tx_group: default
group-list:
default: 124.222.91.208:8091
registry:
type: file # 表示采用file作为注册中心
config:
type: file # 表示采用file作为配置中心Seata不需要创建自己的DatasourceProxy,数据源的管理可以完全的交给ShardingSphere。ShardingSphere会自行集成Seata的事务能力(通过内置的SeataATShardingSphereTransactionManager来实现)。
也正因为使用了AT模式,所以需要再每个数据源中都建立一张undo_log的表。
CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`branch_id` bigint NOT NULL COMMENT '分支事务ID',
`xid` varchar(100) CHARACTER SET utf8mb3 COLLATE utf8_general_ci NOT NULL COMMENT '全局事务唯一标识',
`context` varchar(128) CHARACTER SET utf8mb3 COLLATE utf8_general_ci NOT NULL COMMENT '上下文',
`rollback_info` longblob NOT NULL COMMENT '回滚信息',
`log_status` int NOT NULL COMMENT '状态,0正常,1全局已完成(防悬挂)',
`log_created` datetime NOT NULL COMMENT '创建时间',
`log_modified` datetime NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='AT模式回滚日志表';在sharding.yml中需要修改Transaction的相关配置:
rules:
- !TRANSACTION
defaultType: BASE
providerType: Seata因为我这里使用的是shardingsphere-jdbc-core依赖,不是sharding-jdbc-spring-boot-starter依赖,所以需要在resources文件夹下面再创建一个seata.conf来给ShardingShpere使用。
client {
application.id = work-shardingsphere ## 应用唯一主键
transaction.service.group = default_tx_group ## 所属事务组
}3️⃣创建代码
这需要注意的是TransactionType需要配置为BASE了,表示柔性事务了。。
@Override
@Transactional
@ShardingSphereTransactionType(TransactionType.BASE)
public void create() {
// 插入订单
OrderEntity orderEntity = new OrderEntity();
int orderId = RandomUtil.randomInt();
orderEntity.setOrderId(orderId % 2 == 0 ? orderId : orderId + 1L);
orderEntity.setUserId(1L);
orderEntity.setPrice(new BigDecimal("100.00"));
orderMapper.insert(orderEntity);
// 扣减库存
long productId = 1L;
productMapper.update(new LambdaUpdateWrapper<ProductEntity>()
.eq(ProductEntity::getProductId, productId)
.setSql("store_count = store_count - 1"));
// 模拟出现异常
int a = 1 / 0;
}ShardingShpere与Seata集成过程中的问题
在ShardingSphere与SpringBoot3.x集成的过程中,我首先是尝试了使用shardingshpere-jdbc:5.5.2版本。但是在5.5.2版本中,都会出现SPI加载不到对应的类。这个主要是SpringBoot 3.x的命名空间发生了变化。
将ShardingSphere版本降低到5.2.1版本的时候,其实有两个选择:
- 使用shardingsphere-jdbc-core依赖来进行集成
- 使用shardingsphere-jdbc-core-spring-boot-starter来进行集成(内核使用的也是shardingsphere-jdbc-core:5.2.1)
但是因为之前在前面使用的都是直接用ShardingSphere的核心依赖来集成的,所以后续没有使用。如果使用的是5.2.1版本的ShardingShpere的话,还是推荐使用shardingsphere-jdbc-core-spring-boot-starter来进行集成,主要原因是后续使用Nacos和Seata进行进一步集成的时候会更更加的方便。
将ShardingSphere版本降低到5.2.1版本的时候,它与SpringBoot3.x中的snakeyml:2.2.0版本冲突了,少了两个类:
org.yaml.snakeyaml.inspector.TagInspectororg.yaml.snakeyaml.inspector.UnTrustedTagInspector
处理的办法就是直接下载2.2.0版本的依赖,然后找到这两个类,直接复制到我们的代码中,主要复制过来的类的全限定类型必须与上面保持一致。