Spring事务详解
约 10908 字大约 36 分钟
2025-12-13
Spring 事务管理
Spring事务管理的原理
@EnableTransactionManagement注解的原理
在Spring中开启事务需要用到@EnableTransactionManagement注解:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({TransactionManagementConfigurationSelector.class})
public @interface EnableTransactionManagement {
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Integer.MAX_VALUE;
RollbackOn rollbackOn() default RollbackOn.RUNTIME_EXCEPTIONS;
}可以看到这个注解的作用就是导入TransactionManagementConfigurationSelector这个类:
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
public TransactionManagementConfigurationSelector() {
}
protected String[] selectImports(AdviceMode adviceMode) {
String[] var10000;
switch (adviceMode) {
case PROXY -> var10000 = new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ -> var10000 = new String[]{this.determineTransactionAspectClass()};
default -> throw new IncompatibleClassChangeError();
}
return var10000;
}
private String determineTransactionAspectClass() {
return ClassUtils.isPresent("jakarta.transaction.Transactional", this.getClass().getClassLoader()) ? "org.springframework.transaction.aspectj.AspectJJtaTransactionManagementConfiguration" : "org.springframework.transaction.aspectj.AspectJTransactionManagementConfiguration";
}
}在这里可以看到Spring的事务有两种代理的实现方式,一种是ASPECTJ,一种是PROXY两种模式:
- PROXY模式(默认模式):基于JDK的动态代理(针对接口)或CGLIB代理(针对类)实现AOP拦截;
- 适用于通过Spring容器管理的Bean
- 只能拦截外部方法调用,类内部方法调用无法被事务拦截(例如,同一个类中方法A调用方法B,方法B的事务注解不会生效);
@EnableTransactionManagement(proxyTargetClass = false) // 使用 JDK 代理
@EnableTransactionManagement(proxyTargetClass = true) // 使用 CGLIB 代理- ASPECTJ模式:基于AspectJ的编译时或加载时织入(Load-Time Weaving,LTW),直接修改字节码。
- 适用于非Spring管理的对象,如直接
new创建的对象; - 可以拦截所有方法调用,包括内部方法调用;
@EnableTransactionManagement(mode = AdviceMode.ASPECTJ)根据目前的开发环境而言,我们的应用程序都会运行在Spring的环境中,所以更重要的是基于PROXY模式下的代理方式。可以看到在Proxy模式下注入了两个Bean:AutoProxyRegistrar和ProxyTransactionManagementConfiguration。
所以**@EnableTransactionManagement注解的最终作用就是往Spring中注入两个类:AutoProxyRegistrar和ProxyTransactionManagementConfiguration;**
AutoProxyRegistrar
AutoProxyRegistrar是Spring框架中一个重要的基础设施类,主要用于自动注册AOP代理创建器。它根据@EnableTransactionManagement注解中的proxyTargetClass属性的值来决定是否需要强制使用CGLIB代理。
AutoProxyRegistrar 是一个通用的 Spring AOP 基础设施类,不是 Spring 事务专属的。为所有基于代理的 AOP 功能(包括但不限于事务、缓存、异步、安全等)提供统一的自动代理创建能力。
ProxyTransactionManagementConfiguration
ProxyTransactionManagementConfiguration类中主要是向Spring中注入事务需要几个组件:
@Configuration(proxyBeanMethods = false)
@Role(2) // 表示基础设置类 ROLE_INFRASTRUCTURE
@ImportRuntimeHints({TransactionRuntimeHints.class}) // 用于 GraalVM 原生镜像的运行时提示
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
public ProxyTransactionManagementConfiguration() {
}
@Bean(name = {"org.springframework.transaction.config.internalTransactionAdvisor"})
@Role(2)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource); // 设置事务源
advisor.setAdvice(transactionInterceptor); // 设置事务拦截器
if (this.enableTx != null) {
advisor.setOrder((Integer)this.enableTx.getNumber("order")); // 设置执行顺序
}
return advisor;
}
@Bean
@Role(2)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource); // 设置事务属性源
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager); // 设置事务管理器
}
return interceptor;
}
}所以在ProxyTransactionManagementConfiguration类中注入的Bean为:
- BeanFactoryTransactionAttributeSourceAdvisor:作为AOP的切面,将事务拦截器与事务属性源绑定,实现对事务方法的识别和拦截;
- TransactionInteceptor:拦截具体的事务方法
TransactionInteceptor
TransactionInteceptor类实现了MethodInterceptor接口,就表示这个类是一个切面,那么核心就是invoke()方法,因为这是对目标方法增强的核心逻辑。
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 获取要代理的目标类
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new TransactionAspectSupport.CoroutinesInvocationCallback() {
@Nullable
public Object proceedWithInvocation() throws Throwable {
// 执行原始对象中的目标方法
return invocation.proceed();
}
public Object getTarget() {
return invocation.getThis();
}
public Object[] getArguments() {
return invocation.getArguments();
}
});
}在invoke()方法里面有个很核心的方法invokeWithinTransaction()方法(在TransactionAspectSupport类中),它定义事务处理逻辑的模板。
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 获取事务注解上的一些信息
TransactionAttributeSource tas = this.getTransactionAttributeSource();
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
TransactionManager tm = this.determineTransactionManager(txAttr);
// 如果当前IOC容器中TransactionManager的类是ReactiveTransactionManager(一般而言不会的)
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
// 。。。 一般不会是ReactiveTransactionManger,这里不看了
} else {
// 这个很重要,因为一般就是PlatformTransactionManager
PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager cpptm) {
// 这个一般不会去用的,大部分用的都是DatasourceTransactionManager,所以这个分支可以不看了
} else {
// 进入这个分支(PlatformTransactionManager类型就是DataSourceTransactionManager)
// 创建一个事务的执行环境
TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行原始对应的目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// 如果执行失败了的话,执行后续的这个方法
this.completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 最后清楚这个事务的相关缓存信息
this.cleanupTransactionInfo(txInfo);
}
if (retVal != null && txAttr != null) {
// 获取本次事务的执行状态
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null) {
label216: {
if (retVal instanceof Future) {
Future<?> future = (Future)retVal;
if (future.isDone()) {
try {
future.get();
} catch (ExecutionException ex) {
if (txAttr.rollbackOn(ex.getCause())) {
status.setRollbackOnly();
}
} catch (InterruptedException var28) {
Thread.currentThread().interrupt();
}
break label216;
}
}
if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {
retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
}
}
// 在事务提交到数据库后执行
this.commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}可以看到在invokeWithinTransaction()方法中就定义好了事务执行的一个大体框架,它基础的伪代码如下:
构建事务的基础环境
try{
执行原始对象的目标方法
}catch(Throwable ex){
回滚事务
}
提交事务TransactionSynchronizationManage
在了解了TransactionInterceptor这个类的基础框架后,为了方便后面的代码,我们需要先了解一个类TransactionSynchronizationManage,这的类的作用在Spring事务的进程中是非常重要的,它是Spring事务管理中核心的工具类,主要用于管理事务上下文和资源绑定。
- 在当前线程中管理事务相关的资源和同步状态,实现事务上下文与线程的绑定;
- 事务同步管理,支持注册事务同步回调;
- 事务的状态管理,支持检查当前线程是否有活跃的事务、获取当前事务的名称、获取事务是否是只读、获取事务的隔离级别等;
它的工作原理:
Current ThreadTransactionSynchronizationManager as BTransactionManagerCurrent ThreadTransactionSynchronizationManager as BTransactionManagerThreadLocal<Map<Object,Object>> resourcesThreadLocal<Set<TransactionSynchronization>> synchronizations开启事务1绑定事务资源到ThreadLocal上2注册同步器3存储同步器4提交事务5触发beforeCommit()回调函数6触发beforeCompletion()回调函数7实际提交事务8触发afterCommit()回调函数9触发afterCompletion()回调函数10清理ThreadLocal资源11在TransactionSynchronizationManager中有四个核心的ThreadLocal来存储在执行事务期间用到的各种资源信息:
// 事务资源绑定,存储与当前事务绑定的资源,如数据库连接、DataSource等信息
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
// 事务回调绑定,存储当前事务注册的回调函数、afterCommit()、beforCommit()等
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations");
// 事务的名称绑定
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name");
// 当前事务是否是只读事务
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status");
// 当前事务的传播机制
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level");
// 当前事务是否处于激活状态(因为隔离级别的存在,有时候需要挂起当前事务)
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active");大概要知道上面的几个ThreadLocal分别存储的是什么类型的信息,这样在下面创建事务或回滚的时候才能更加清楚。
构建事务基础环境
createTransactionIfNecessary()方法就是为了构建事务基础环境的:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 这里就是给这个事务设置一个名称,默认就是方法名com.xxx.xxx.service.impl.UserServiceImpl.saveUser
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 创建一个事务
status = tm.getTransaction(txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
return this.prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
// 将事务的一些信息绑定到ThreadLocal中
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {
// 构建一个事务
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 创建一个事务
txInfo.newTransactionStatus(status);
} else if (this.logger.isTraceEnabled()) {
this.logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional.");
}
// 将事务绑定到当前线程的ThreadLocal中
txInfo.bindToThread();
return txInfo;
}核心点:
txAttr指的是事务的相关属性:
{
"rollbackRules": [], // 回滚规则
"descriptor": "com.xxx.xxx.service.impl.UserServiceImpl.saveUser",
"timeoutString": "",
"qualifier": "",
"labels": [],
"propagationBehavior": 0,
"isolationLevel": -1,
"timeout": -1,
"readOnly": false
}事务的相关属性是从@Transaction注解中获取对应的信息,如果获取到了对应的信息,表示这个方法的确是需要进行事务增强的,就会调用getTransaction()方法
1️⃣**getTransaction()**方法逻辑的大致梳理:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 获取到事务的一些相关属性
TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();
// 创建一个事务(构建一个DataSourceTransactionObject对象存储了事务要执行的上下文信息)
Object transaction = this.doGetTransaction();
boolean debugEnabled = this.logger.isDebugEnabled();
/**
* 这里的判断是比较重要的,它是为了实现事务隔离级别的关键实现
* 如果当前事务方法是被另外一个事务方法调用的,那么当前线程就是已经开启过事务的,这里得到的就是true,反之就是false
*/
if (this.isExistingTransaction(transaction)) {
// 按照当前已经处于事务环境中进行处理
return this.handleExistingTransaction(def, transaction, debugEnabled);
} else if (def.getTimeout() < -1) {
// 校验下事务的超时时间设置是否符合要求,如果设置的值是小于-1的,就直接抛出异常
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
} else if (def.getPropagationBehavior() == 2) {
// 校验下事务的隔离级别是否符合要求,2表示的使PROPAGATION_MANDATORY传播机制,它必须运行事务环境中,所以这里就直接报错
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
/**
* def.getPropagationBehavior() != 0 :传播机制不是 PROPAGATION_REQUIRED
* def.getPropagationBehavior() != 3 :传播机制不是 PROPAGATION_REQUIRES_NEW
* def.getPropagationBehavior() != 6 :传播机制不是 PROPAGATION_NESTED
* 所以这里的意思就是当前事务的传播机制不是 PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
* 那么就只有:PROPAGATION_SUPPORTS、PROPAGATION_NEVER、PROPAGATION_NOT_SUPPORTED 只有这三种允许或必须运行在无事务环境中
*/
else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {
// 如果隔离级别不是 -1,就表示这个事务被用户设置了自定义的隔离机制,打印对应的信息记录下这个信息
if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
}
/**
* 这里是判断同步器的行为模式
* SYNCHRONIZATION_ALWAYS(默认值:0):总是注册同步器,即使没有实际的事务
* SYNCHRONIZATION_ON_ACTUAL_TRANSACTION(1):仅存在实际事务时注册同步器
* SYNCHRONIZATION_NEVER(2):从不注册事务同步器
* 当前没有事务环境,所以只有在transactionSynchronization的值为0的时候才需要去执行同步器
*/
boolean newSynchronization = this.getTransactionSynchronization() == 0;
// 在当前非事务环境下执行原始目标的方法
return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
}
/**
* 在这里的时候,事务的传播机制为:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
* 这三种都是需要运行在事务环境中的
*/
else {
// 这里是将事务的内容直接挂起(因为当前没有事务,所以挂起的资源为空)
SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
if (debugEnabled) {
Log var10000 = this.logger;
String var10001 = def.getName();
var10000.debug("Creating new transaction with name [" + var10001 + "]: " + def);
}
try {
// 开始事务的执行
return this.startTransaction(def, transaction, false, debugEnabled, suspendedResources);
} catch (Error | RuntimeException ex) {
this.resume((Object)null, suspendedResources);
throw ex;
}
}
}doGetTransaction()
protected Object doGetTransaction() {
// 创建一个事务(事务的信息的类型就是DataSourceTransactionObject)
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 判断当前事务的是否是嵌入式事务(传播机制相关)如果是的话就添加一个savePoint
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
// 从ThreadLocal中获取到当前线程的一个事务资源信息(包含数据源、数据库连接等信息)
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
// 将这个事务要用到的数据源信息添加到事务上下文中
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
// 这里的DataSourcce就是我们在创建TransactionManager的时候设置进来的DataSource
protected DataSource obtainDataSource() {
DataSource dataSource = this.getDataSource();
Assert.state(dataSource != null, "No DataSource set");
return dataSource;
}isExistingTransaction()
判断当前线程是否已经开启过一个事务了。
// 这里的Object是从doGetTransaction()方法返回的事务上下文对象
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
// 判断当前事务对象中是否存在一个连接,并且这个连接是活跃状态
return txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive();
}实际上就是判断当前线程是否已经创建了一个数据库连接,如果是的话,一般而言这里就是已经在事务环境中了(非@Transaction标注的方法不需要进行隔离级别的判断)。
所以在getTransaction()方法中主要任务就是:
- 获取当前线程的事务资源信息(从ThreadLocal中获取到对应的信息),然后得到一个DataSourceTransactionObject类型对象,该类中的ConnectionHolder属性存储就是资源信息;
- 判断当前线程的事务资源信息是否为空,如果为空就表示当前线程处于事务环境中,反之就表示当前线程不处于事务环境中
- 如果事务资源不为空:就执行
handleExistingTransaction()方法,需要按照事务的传播机制进行执行; - 如果事务资源为空:就需要根据事务的传播机制来进行分类
- 如果事务的传播机制为:PROPAGATIONSUPPORTS、PROPAGATIONNEVER、PROPAGATIONNOTSUPPORTED(都可以以非事务的方式执行),需要去执行
prepareTransactionStatus()方法 - 如果事务的传播机制为:PROPAGATIONREQUIRED、PROPAGATIONREQUIRESNEW、PROPAGATIONNESTED(都需要以事务的方式执行),需要去执行
startTransaction()方法;
- 如果事务的传播机制为:PROPAGATIONSUPPORTS、PROPAGATIONNEVER、PROPAGATIONNOTSUPPORTED(都可以以非事务的方式执行),需要去执行
2️⃣当前没有事务环境且事务的传播机制为(PROPAGATIONREQUIRED、PROPAGATIONREQUIRESNEW、PROPAGATIONNESTED)执行startTransaction()方法
在这里无论是哪种传播机制都是需要开启一个新事务的。
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 判断同步器的工作模式,只要不是2,在当前阶段都是需要去注册同步器的
boolean newSynchronization = this.getTransactionSynchronization() != 2;
// 创建一个新的事务
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
// 事件监听器执行对应的方法
this.transactionExecutionListeners.forEach((listener) -> listener.beforeBegin(status));
try {
// 开启一个事务
this.doBegin(transaction, definition);
} catch (Error | RuntimeException ex) {
// 如果抛出异常,利用事件监听机制发布一个事务开启后的事件
this.transactionExecutionListeners.forEach((listener) -> listener.afterBegin(status, ex));
throw ex;
}
// 准备执行事务同步器
this.prepareSynchronization(status, definition);
//事件监听器执行对应的方法
this.transactionExecutionListeners.forEach((listener) -> listener.afterBegin(status, (Throwable)null));
return status;
}newTransactionStatus()
这个方法主要就是创建一个事务,同时计算下当前事务是否需要创建同步器。如果当前事务开启了同步器且当前的同步器实例为空,就表示是需要创建的,反之就是不用创建同步器。
private DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean nested, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(definition.getName(), transaction, newTransaction, actualNewSynchronization, nested, definition.isReadOnly(), debug, suspendedResources);
}doBegin()
事务创建成功后就会有一个DefaultTransactionStatus的对象,然后下一步自然就是开启这个事务(模板方法,由子类实现)。
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 获取当前事务的上下文对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
Connection con = null;
try {
// 当前线程还没有获取到数据库连接
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 当前事务上下文中获取到数据源信息,然后创建一个连接
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 将这个连接的信息存储到事务上下文对象中(方便后续的流程继续使用)并标识当前是一个新创建的连接信息
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 将synchronizedWithTransaction属性设置为true,表示已经获取过连接了
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 如果当前事务是只读事务,就设置它只读事务的标识为true,同时返回它的隔离级别设置到事务上下文对象中
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// 因为需要开启事务(可能包含多条需要执行的SQL语句,所以需要将autoCommit属性设置为false
if (con.getAutoCommit()) {
// 在事务的上下文中记录:当前事务的autoCommit设置工作已经结束
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果当前事务是只读事务,就执行sql:SET TRANSACTION READ ONLY
this.prepareTransactionalConnection(con, definition);
// 在事务上下文中记录:当前事务的状态已经激活
txObject.getConnectionHolder().setTransactionActive(true);
// 获取当前事务的超时时间
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
// 在事务上下文中记录当前事务的超时时间(默认值为-1,表示永不超时)
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 判断当前事务是不是创建一个了新的连接,如果是的话,把这个新的连接绑定到当前线程中(实际上就是存储到ThreadLocal中)
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
// 如果出现了异常,这里就要清楚掉数据源信息(只有在当前连接是新建立的才可以清楚,否则是不行的)因为它用的连接可能是另外一个事务方法创建的
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}所以在Spring中开启一个事务主要的工作就是:
- 将
autoCommit属性的值设置为false,防止自动提交 - 如果当前事务是read-only(只读事务),执行SQL
SET TRANSACTION READ ONLY将当前事务设置为只读事务;
3️⃣当前没有事务环境并且传播机制为PROPAGATIONSUPPORTS、PROPAGATIONNEVER、PROPAGATIONNOTSUPPORTED,就会调用prepareTransactionStatus()方法。
// transaction = null,newTransaction = true
private DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 创建一个事务
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, newTransaction, newSynchronization, false, debug, suspendedResources);
// 准备执行对应的同步器方法
this.prepareSynchronization(status, definition);
return status;
}所以在prepareTransactionStatus()只是简单的构建了一个DefaultTransactionStatus对象返回;
4️⃣当前存在事务环境,这个就比较复杂了(需要再次对传播机制进行分析)
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
// 如果当前传播机制为ROPAGATION_NEVER就直接抛出异常,因为当前传播机制不允许运行在事务环境中
if (definition.getPropagationBehavior() == 5) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
}
// 如果当前传播机制为ROPAGATION_NOT_SUPPORT表示挂起当前事务,然后以非事务的方式执行当前方法
else if (definition.getPropagationBehavior() == 4) {
// 打印挂起事务的日志
if (debugEnabled) {
this.logger.debug("Suspending current transaction");
}
// 挂起当前事务资源
Object suspendedResources = this.suspend(transaction);
// 判断是否需要创建同步器
boolean newSynchronization = this.getTransactionSynchronization() == 0;
// 构建一个DefaultTransactionStatus对象(false表示不需要新建事务)
return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果当前传播机制为ROPAGATION_REQUIRES_NEW,表示需要挂起当前事务,然后创建一个新的事务执行目标方法
else if (definition.getPropagationBehavior() == 3) {
// 打印挂起事务的日志
if (debugEnabled) {
this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
}
// 挂起当前事务的资源
SuspendedResourcesHolder suspendedResources = this.suspend(transaction);
try {
// 开启一个新的事务(false表示是否是嵌套事务)
return this.startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
} catch (Error | RuntimeException beginEx) {
// 如果出现了异常,就执行清楚对应的资源信息,然后将异常传递到调用方
this.resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 如果当前传播机制为PROPAGATION_NESTED表示这是一个嵌套事务
else if (definition.getPropagationBehavior() == 6) {
// 判断全局配置中是否开启了嵌套事务的支持,如果没有开启就直接报错
if (!this.isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
} else {
// 打印嵌套日志相关的信息
if (debugEnabled) {
this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 使用savePoint来开启一个嵌套事务
if (this.useSavepointForNestedTransaction()) {
// 创建一个事务对象并且标记为不需要新开事务(因为使用savePoint来实现的),不需要创建事务同步器
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, false, false, true, debugEnabled, (Object)null);
// 执行事务执行过程监听器
this.transactionExecutionListeners.forEach((listener) -> listener.beforeBegin(status));
try {
// 创建一个savePoint
status.createAndHoldSavepoint();
} catch (Error | RuntimeException ex) {
this.transactionExecutionListeners.forEach((listener) -> listener.afterBegin(status, ex));
throw ex;
}
// 执行对应的事务监听器的回调函数
this.transactionExecutionListeners.forEach((listener) -> listener.afterBegin(status, (Throwable)null));
return status;
} else {
// 如果嵌套事务不是通过savePoint来实现的,就通过新开事务的方式实现
return this.startTransaction(definition, transaction, true, debugEnabled, (SuspendedResourcesHolder)null);
}
}
} else {
// 如果传播机制为:PROPAGATION_REQUIRED、PROPAGATION_SUPPORTS、PROPAGATION_MANDATORY都是加入当前事务中
if (debugEnabled) {
this.logger.debug("Participating in existing transaction");
}
// 判断是否要执行同步器
boolean newSynchronization = this.getTransactionSynchronization() != 2;
return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
}
}所以在有事务环境下:
- 传播机制为ROPAGATION_NEVER,则直接抛出异常;
- 传播机制为ROPAGATIONNOTSUPPORT,则挂起当前事务,以非事务的方式执行当前方法;
- 传播机制为ROPAGATIONREQUIRESNEW,则挂起当前事务,创建一个新的事务;
- 传播机制为PROPAGATION_NESTED,分为两种实现方式:
- 以savepoint的方式实现 ,不创建新的事务;
- 不以savepoint的方式实现,挂起当前事务,创建新的事务;
- 传播机制为PROPAGATIONREQUIRED、PROPAGATIONSUPPORTS、PROPAGATION_MANDATORY,则直接创建一个加入到当前事务中;
所以在构建事务的基础环境中主要是根据事务的传播机制来构建出以个DefaultTransactionStatus对象即可。
执行被代理的方法
当createTransactionIfNecessary()方法执行完毕后,就会回到TransactionAspectSupport#invokeWithinTransaction()方法中:
// 这里执行的是切面传进来的回调函数(实际上就是被代理的目标方法)
retVal = invocation.proceedWithInvocation();这个没有什么讲述的,主要就是理解MethodInterceptor接口的代理机制;
提交事务
被代理的方法执行完之后,就会涉及到两个问题:提交事务或者是回滚事务也就是执行被代理的方法时没有出现异常,也就是最后会去执行commitTransactionAfterReturning()方法:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
// 如果事务信息不为空(根据传播机制,对于当前方法而言,它是不创建事务的)
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 执行对应的提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}然后就会来到默认的提交实现AbstractPlatformTransactionManager#commit()方法:
public final void commit(TransactionStatus status) throws TransactionException {
// 事务已经执行完了(已经调用过commit或rollback后又再次被调用了)
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
// 获取事务的状态
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
// 检查本地回滚状态(例如显式调用 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly() 强制回滚)
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Transactional code has requested rollback");
}
// 执行回滚
this.processRollback(defStatus, false);
}
/**
* 这里的意思这个事务是否被标记为了全局回滚
* 这个全局回滚标记不是当前事务标记的,可能是由传播机制新开的事务执行失败标记的,此时最终在执行上层事务的时候就需要判断这个标记
* 如果 commitGlobalRollbackOnly 参数被设置为true的时候,在全局事务标记为回滚的时候也会被提交掉
* 简单来讲,就是commitGlobalRollbackOnly=true的时候,即使下级事务触发了事务回滚将当前事务也标记为回滚,则仍然会提交当前事务;
* 一般而言,将其设置为false,也不建议去修改这个配置,会导致事务的原子性被破坏掉了
* 不要在生产环境中使用,甚至最好不要使用
*/
else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
this.logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 如果全局回滚标记已设置,且事务管理器不允许强制提交,也会执行回滚
this.processRollback(defStatus, true);
} else {
// 除此之外就直接提交这个事务
this.processCommit(defStatus);
}
}
}在提交事务的时候需要检查事务的全局回滚标记和本地回滚标记:
- 全局回滚标记:由其它事务失败引起当前事务回滚,在其它事务回滚的时候会将当前事务的全局回滚标志置为true;
- 本地回滚标记:由开发人员显式调用
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()强制回滚当前事务;
除此之外,在全局回滚标记为true(表示当前事务中的SQL语句部分成功),还需要根据commitGlobalRollbackOnly的值来判断:
- 如果 commitGlobalRollbackOnly = true,在表示当前事务中的SQL语句部分成功的时候仍然会执行提交;、
- 如果 commitGlobalRollbackOnly = false,在表示当前事务中SQL语句部分成功的时候不会执行提交(符合事务的原子性);
如果当前事务没有回滚标记的时候,就会执行提交,最终会执行processCommit()方法:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
// 同步器中 beforCompletion()方法是否执行了
boolean beforeCompletionInvoked = false;
// 监听器中 commit 事件监听是否执行了
boolean commitListenerInvoked = false;
try {
// 不期待的回滚信息
boolean unexpectedRollback = false;
// 模板方法,允许自定义的TransactionManager类在事务提交前再进行一些额外的操作(例如,日志记录相关信息)
this.prepareForCommit(status);
// 触发同步器中的beforCommit()方法
this.triggerBeforeCommit(status);
// 触发同步器中的beforCompletion()方法
this.triggerBeforeCompletion(status);
// 执行完毕后,将beforeCompletionInvoked设置为true,表示beforCompletion()方法执行完毕
beforeCompletionInvoked = true;
// 判断当前事务中是否存在保存点(嵌套事务的时候会使用savepoint来实现)
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Releasing transaction savepoint");
}
// 判断事务的全局回滚标记
unexpectedRollback = status.isGlobalRollbackOnly();
// 调用所有的TransactionExecutionListener中的beforeCommit()方法
this.transactionExecutionListeners.forEach((listener) -> listener.beforeCommit(status));
// 监听器中 commit 事件监听执行完毕
commitListenerInvoked = true;
// 释放之前的保存点并在当前位置创建新的保存点(多个嵌套事务)
status.releaseHeldSavepoint();
}
// 如果当前事务是一个新的事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction commit");
}
// 获取当前事务的全局回滚标志
unexpectedRollback = status.isGlobalRollbackOnly();
// 执行对应的事件监听方法
this.transactionExecutionListeners.forEach((listener) -> listener.beforeCommit(status));
commitListenerInvoked = true;
// 执行对应的提交方法
this.doCommit(status);
}
// 如果是在SQL语句执行之前就出现了异常(例如连接池异常、连接获取失败等等)
else if (this.isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 判断当前事务的全局回滚标记
if (unexpectedRollback) {
// 直接报错,因为当前事务已被标记以为需要回滚,但是却执行了commit方法
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// 如果在执行提交的视乎出现了异常(执行同步器对应的回调函数);执行监听器对应的方法
this.triggerAfterCompletion(status, 1);
this.transactionExecutionListeners.forEach((listener) -> listener.afterRollback(status, (Throwable)null));
// 将当前异常抛出去
throw ex;
} catch (TransactionException var19) {
// 如果设置了提交失败直接回滚的参数,就去执行回滚的逻辑(这个参数请设置为true,不要去改)
if (this.isRollbackOnCommitFailure()) {
this.doRollbackOnCommitException(status, var19);
} else {
// 否则执行同步器器的钩子函数
this.triggerAfterCompletion(status, 2);
// 如果对应的监听器没有执行就执行下,执行过了就不再执行了
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach((listener) -> listener.afterCommit(status, var19));
}
}
// 将对应的错误信息抛出到上层业务中
throw var19;
} catch (Error | RuntimeException ex) {
// 判断同步器中对应的钩子函数是否执行了,如果没有执行的话,就执行下;
if (!beforeCompletionInvoked) {
this.triggerBeforeCompletion(status);
}
this.doRollbackOnCommitException(status, ex);
throw ex;
}
try {
this.triggerAfterCommit(status);
} finally {
this.triggerAfterCompletion(status, 0);
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach((listener) -> listener.afterCommit(status, (Throwable)null));
}
}
} finally {
// 清除对应的缓存信息
this.cleanupAfterCompletion(status);
}
}如果在执行提交的时候出现了异常,就会去执行doRollbackOnCommitException()方法:
private void doRollbackOnCommitException(DefaultTransactionStatus status, Throwable ex) throws TransactionException {
try {
// 判断当前事务是否是新开启的实物(例如传播机制REQUIRE_NEW)
if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction rollback after commit exception", ex);
}
// 进行回滚
this.doRollback(status);
}
// 如果是加入上层事务的传播机制,这里不执行具体的回滚,而是仅仅设置全局回滚标志为true
else if (status.hasTransaction() && this.isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
this.logger.debug("Marking existing transaction as rollback-only after commit exception", ex);
}
// 不回滚当前事务
this.doSetRollbackOnly(status);
}
} catch (Error | RuntimeException rbex) {
// 如果在回滚的过程出现了一场,就执行对应的同步器回调函数和监听器的回调函数
this.logger.error("Commit exception overridden by rollback exception", ex);
this.triggerAfterCompletion(status, 2);
this.transactionExecutionListeners.forEach((listener) -> listener.afterRollback(status, rbex));
throw rbex;
}
// 如果成功完成回滚,就执行同步器的回调函数和监听器的回调函数
this.triggerAfterCompletion(status, 1);
this.transactionExecutionListeners.forEach((listener) -> listener.afterRollback(status, (Throwable)null));
}所以在提交事务的过程中如果出现异常导致提交失败,默认会回滚事务(可以通过rollbackOnCommitFailure参数来进行设置,一样是不建议去设置这个参数,默认值为true,符合事务的原子性)。
同时在提交事务的时候如果出现了异常,会根据当前事务是否是新开启的事务,还是加入的上层事务。如果仅仅是加入的上层事务,就只需要设置事务的全局回滚标志为true,反之就需要对当前的事务进行回滚。
回滚事务
回滚事务的场景有:
1️⃣目标方法执行失败,进入completeTransactionAfterThrowing()方法
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 这一步是在判断当前的目标是否需要以事务的方式来运行(例如传播机制NOT_SUPPORT)等
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 如果是以事务的方式执行的,就判断@Transaction注解配置的回滚异常是否与当前异常兼容
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 如果现在的异常是小于等于@Transaction注解配置的异常,就执行对应的回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
this.logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (Error | RuntimeException ex2) {
this.logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// 如果当前目标方法不需要以事务的方式执行,就直接提交就好了
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
this.logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (Error | RuntimeException ex2) {
this.logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}如果当前目标方法是以事务的方式执行的,就需要判断此时抛出的异常是否与@Transaction注解中rollback属性是否匹配(当前异常为rollback配置的异常或其子类)。如果匹配的话,就正常执行回滚即可。
如果当前目标方式不是以事务的方式执行的,就直接执行commit()方法。而在commit()方法中只会对当前事务是一个新事物或以savePoint的方式来执行的时候去获取当前线程对应的连接信息,执行对应的提交。
对于rollback()方法而言,它比较简单:
public final void rollback(TransactionStatus status) throws TransactionException {
// 判断当前事务是否已经完成(已经回滚了或提交了),如果是,就直接抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
// 执行的对应的回滚逻辑
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
this.processRollback(defStatus, false);
}
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
// 标志事务的回滚阶段是否成功
boolean unexpectedRollback = unexpected;
// 标志事务回滚时监听器的回调函数是否执行了
boolean rollbackListenerInvoked = false;
try {
// 触发同步器的回调桉树 beforeCompletion()方法
this.triggerBeforeCompletion(status);
// 判断当前是否事务有savePoint,如果有的话,就回滚到savePoint
if (status.hasSavepoint()) {
if (status.isDebug()) {
this.logger.debug("Rolling back transaction to savepoint");
}
// 执行监听器中的回调方法:beforRollback()方法
this.transactionExecutionListeners.forEach((listener) -> listener.beforeRollback(status));
// 将监听器回调函数的执行结果设置为true
rollbackListenerInvoked = true;
// 执行对应的回滚到savePoint的逻辑
status.rollbackToHeldSavepoint();
}
// 如果当前是一个新的事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
this.logger.debug("Initiating transaction rollback");
}
// 执行监听器中的回调方法:beforeRollback()
this.transactionExecutionListeners.forEach((listener) -> listener.beforeRollback(status));
// 将监听器回调函数的执行结果设置为true
rollbackListenerInvoked = true;
// 执行当前事务的回滚
this.doRollback(status);
} else {
// 除此之外只有两种情况:当前事务加入到上层事务中或当前不以事务的方式执行
if (status.hasTransaction()) {
// 如果当前事务是以加入到上层事务的方式执行
if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) {
// 设置事务的全局回滚状态为true,不执行当前事务的回滚;
this.doSetRollbackOnly(status);
}
} else {
this.logger.debug("Should roll back transaction but cannot - no transaction available");
}
// 根据isFailEarlyOnGlobalRollbackOnly属性判断全局事务是否要立即执行回滚
if (!this.isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (Error | RuntimeException var9) {
// 执行同步器的afterCompletion()方法
this.triggerAfterCompletion(status, 2);
// 如果监听器的回调函数 beforeRollback()执行过了,就执行afterRollbacl()
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach((listener) -> listener.afterRollback(status, var9));
}
throw var9;
}
// 执行同步器的afterCompletion()方法
this.triggerAfterCompletion(status, 1);
// 如果监听器的回调函数 beforeRollback()执行过了,就执行afterRollbacl()
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach((listener) -> listener.afterRollback(status, (Throwable)null));
}
// 如果这个值为true,表示立即回滚全局事务,反之会继续执行等到执行commit的时候根据全局回滚标志再来执行回滚
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} finally {
this.cleanupAfterCompletion(status);
}
}2️⃣目标方法的执行没有出现异常,但是在提交事务的时候出现了一场,此时就会执行commitTransactionAfterReturning(),也就是提交部分的代码(直接参考上面的就可以了)。
Spring事务自动注入的流程
Spring事务的自动注入主要是通过@EnableTransactionManagement来开启,它会通过@Import直接导入类TransactionManagementConfigurationSelector。在TransactionManagementConfigurationSelector类中会根据代理的方式选择合适的AOP策略:PROXY策略和ASPECTJ策略,一般而言我们都是使用PROXY策略的代理模式。
在PROXY策略下会通过ProxyTransactionManagementConfiguration类往Spring中注入两个Bean:
- BeanFactoryTransactionAttributeSourceAdvisor:决定哪些方法会被执行事务的增强(被@Transaction注解标注的方法);
- TransactionInterceptor:执行真正的事务增强逻,例如事务提交或事务回滚;

此时关于事务的增强配置就已经完成了,所以后续就可以直接对@Transaction注解标注的方法进行增强。
Spring事务的传播机制
Spring框架从TransactionDefinition接口中定义了7种传播行为,这些行为可以通过@Transaction的propagation属性来设置。
| 传播行为 | 说明 |
|---|---|
| REQUIRED | 如果当前存在事务,则假如该事务;如果当前没有事务,则创建一个新的事务; |
| SUPPORTS | 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行; |
| MANDATORY | 如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常 |
| REQUIRES_NEW | 创建一个新的事务,如果当前存在事务,则把当前事务挂起。内部事务独立于外部事务提交或回滚 |
| NOT_SUPPORTED | 以非事务方式运行,如果当前存在事务,则把当前事务挂起 |
| NEVER | 以非事务的方式运行,如果当前存在事务,则抛出异常 |
| NESTED | 如果当前存在事务,则在一个嵌套事务中运行;如果当前没有事务,则行为与REQUIRED类似 |
了解到上面的传播机制后,就可以根据AbstractPlatformTransactionManager#getTransaction
TransactionSynchronizationManager使用方法
TransactionSynchronizationManager是Spring框架事务模块的一个核心工具类。它不直接管理事务,而是作为一个资源同步管理器,在当前线程ThreadLocal上绑定和暴露事务相关的资源和状态。
它的核心作用是:在事务生命周期内,为当前执行线程提供一个共享的、与事务绑定的上下文环境。
核心功能方法
TransactionSynchronizationManager提供了多种静态方法,主要分为以下几类:
资源绑定
bindResource(Object key,Object value)将资源(如数据库连接DataSourceUtils.getConnection())绑定到当前线程;getResource(Object key):从当前线程获取绑定的资源;unbindResource(Object key):解绑资源;
事务同步
registerSynchronization(TransactionSynchronization synchronization):这是最常用的功能,注册一个回调接口,允许你在事务生命周期的特定时间点(如提交完、提交后)执行自定义的逻辑;getSynchronizations():获取当前注册的所有同步器;
事务状态查询
isActualTransactionActive():判断当前线程是否存在实际的事务(不只是逻辑事务);isSynchronizationActive():判断当前线程是否开启了事务同步;getCurrentTransactionName():获取当前事务的名称(通常是方法全限定名);
事务特征管理
setCurrentTransactionReadOnly(boolean readOnly):设置当前事务是否只读;isCurrentTransactionReadOnly():判断当前事务是否只读;
核心应用场景与代码示例
事务提交后执行操作(最常用)
这是TransactionSynchronizationManager最经典的应用场景,你需要在数据库事务成功提交后,执行一些操作,比如发送消息、刷新缓存、调用外部系统。如果这些操作在事务提交前执行,一旦事务回滚,就会产生数据不一致;
业务场景:用户注册后,需要发送邮件,必须确保邮件只在用户数据确定写入数据库后才发送。如果注册事务回滚,就不应该发送邮件。
@Service
@Slf4j
public class UserRegistrationService {
@Autowired
private UserRepository userRepository;
@Autowired
private EmailService emailService;
@Transactional
public void registerUser(User user) {
// 1. 核心业务逻辑:保存用户到数据库
// 此时用户数据还在事务中,未真正提交
userRepository.save(user);
log.info("用户数据已保存到Session,但事务未提交");
// 2. 注册一个事务同步回调
// 重点:这个回调逻辑会在事务提交"之后"才执行
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 这个方法只会在事务成功提交后被调用
log.info("事务已提交,开始发送欢迎邮件");
try {
// 发送邮件
emailService.sendWelcomeEmail(user.getEmail());
log.info("欢迎邮件发送成功");
} catch (Exception e) {
// 注意:这里的异常不会导致事务回滚,因为事务已经提交了!
log.error("发送邮件失败,但用户已成功注册。需要额外的补偿机制。", e);
}
}
// 还可以重写其他方法,如 afterCompletion(无论提交/回滚都会执行)
@Override
public void afterCompletion(int status) {
if (status == STATUS_COMMITTED) {
log.info("事务完成状态:已提交");
} else if (status == STATUS_ROLLED_BACK) {
log.info("事务完成状态:已回滚。邮件不会发送。");
}
}
}
);
} else {
// 如果没有事务,直接发送(根据业务需求决定)
emailService.sendWelcomeEmail(user.getEmail());
}
}
}在事务内跨方法调用共享资源
需要在事务的多个方法间传递一些与事务名称周期绑定的信息。
业务场景:在一个复杂的事务中,需要记录所有操作的审计日志,并在事务提交时一次性保存,以提高性能并保证审计记录的原子性。
@Service
public class OrderService {
@Autowired
private AuditLogRepository auditLogRepository;
@Transactional
public void createComplexOrder(Order order) {
// 初始化一个审计日志列表,并将其绑定到当前事务
List<AuditLog> auditLogs = new ArrayList<>();
TransactionSynchronizationManager.bindResource("AUDIT_LOGS", auditLogs);
// 执行一系列业务操作,每个操作都会添加审计日志
updateInventory(order);
calculatePrice(order);
// ... 其他操作
// 注册一个同步器,在事务提交后持久化审计日志
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 从事务上下文中获取审计日志列表
@SuppressWarnings("unchecked")
List<AuditLog> logs = (List<AuditLog>)
TransactionSynchronizationManager.getResource("AUDIT_LOGS");
if (logs != null && !logs.isEmpty()) {
// 在事务提交后,批量保存所有审计日志
auditLogRepository.saveAll(logs);
}
// 清理资源
TransactionSynchronizationManager.unbindResource("AUDIT_LOGS");
}
}
);
}
private void updateInventory(Order order) {
// ... 更新库存逻辑
addAuditLog("INVENTORY_UPDATED", "Updated inventory for order: " + order.getId());
}
private void calculatePrice(Order order) {
// ... 计算价格逻辑
addAuditLog("PRICE_CALCULATED", "Calculated final price: " + order.getTotalAmount());
}
private void addAuditLog(String action, String detail) {
// 从当前事务上下文中获取审计日志列表并添加记录
@SuppressWarnings("unchecked")
List<AuditLog> auditLogs = (List<AuditLog>)
TransactionSynchronizationManager.getResource("AUDIT_LOGS");
if (auditLogs != null) {
AuditLog log = new AuditLog(action, detail, new Date());
auditLogs.add(log);
}
}
}TransactionEventListener使用方式
Spring中监听事务执行状态的标准且更现代的方式。@TransactionalEventListener是Spring 4.2+引入的注解,它将Spring的事件机制与事务管理完美集成,比直接使用TransactionSynchronization更加简洁和声明式。
定义自定义事件
// 用户注册成功事件
public class UserRegisteredEvent extends ApplicationEvent {
private final String username;
private final String email;
public UserRegisteredEvent(Object source, String username, String email) {
super(source);
this.username = username;
this.email = email;
}
// getters
public String getUsername() { return username; }
public String getEmail() { return email; }
}
// 订单创建事件
public class OrderCreatedEvent extends ApplicationEvent {
private final Long orderId;
private final BigDecimal amount;
public OrderCreatedEvent(Object source, Long orderId, BigDecimal amount) {
super(source);
this.orderId = orderId;
this.amount = amount;
}
// getters
public Long getOrderId() { return orderId; }
public BigDecimal getAmount() { return amount; }
}发布事件
@Service
@Slf4j
public class UserService {
@Autowired
private ApplicationEventPublisher eventPublisher; // Spring的事件发布器
@Autowired
private UserRepository userRepository;
@Transactional
public User registerUser(String username, String email, String password) {
log.info("开始用户注册流程...");
// 1. 核心业务逻辑:创建并保存用户
User user = new User(username, email, password);
userRepository.save(user);
log.info("用户数据已保存(事务未提交)");
// 2. 发布用户注册事件
// 注意:此时事件不会立即处理,而是根据@TransactionalEventListener的配置决定执行时机
eventPublisher.publishEvent(new UserRegisteredEvent(this, username, email));
log.info("用户注册事件已发布,等待事务处理");
return user;
}
}
@Service
@Slf4j
public class OrderService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private OrderRepository orderRepository;
@Transactional
public Order createOrder(Long userId, List<OrderItem> items) {
log.info("开始创建订单...");
// 业务逻辑
Order order = new Order(userId, items);
orderRepository.save(order);
log.info("订单数据已保存(事务未提交)");
// 发布订单创建事件
eventPublisher.publishEvent(new OrderCreatedEvent(this, order.getId(), order.getTotalAmount()));
log.info("订单创建事件已发布");
return order;
}
}创建事件监听器(核心)
@Component
@Slf4j
public class BusinessEventListener {
/**
* 在事务提交成功后发送欢迎邮件
* phase = TransactionPhase.AFTER_COMMIT:确保只在事务成功提交后执行
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleUserRegisteredAfterCommit(UserRegisteredEvent event) {
log.info("🎉 事务已提交,开始处理用户注册后逻辑 - 用户: {}", event.getUsername());
try {
// 发送欢迎邮件
sendWelcomeEmail(event.getEmail(), event.getUsername());
log.info("✅ 欢迎邮件发送成功");
// 初始化用户积分
initUserPoints(event.getUsername());
log.info("✅ 用户积分初始化成功");
} catch (Exception e) {
// 注意:这里的异常不会回滚事务,因为事务已经提交了!
log.error("❌ 用户注册后处理失败,需要人工干预或重试机制", e);
}
}
/**
* 在事务提交成功后处理订单逻辑
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreatedAfterCommit(OrderCreatedEvent event) {
log.info("🎉 订单事务提交成功,开始后处理 - 订单ID: {}", event.getOrderId());
try {
// 扣减库存
reduceInventory(event.getOrderId());
log.info("✅ 库存扣减成功");
// 发送订单确认通知
sendOrderConfirmation(event.getOrderId());
log.info("✅ 订单确认通知已发送");
} catch (Exception e) {
log.error("❌ 订单后处理失败,订单ID: {}", event.getOrderId(), e);
// 这里应该触发补偿事务,比如恢复库存等
}
}
/**
* 在事务完成后执行(无论提交还是回滚)
* 适合用于资源清理、日志记录等
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void handleAfterCompletion(UserRegisteredEvent event) {
log.info("📋 用户注册事务已完成(可能是提交或回滚)");
// 清理资源、记录审计日志等
cleanupResources();
}
/**
* 在事务回滚后执行
* 适合用于失败补偿、告警通知等
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleAfterRollback(OrderCreatedEvent event) {
log.warn("⚠️ 订单创建事务回滚了!订单ID: {}", event.getOrderId());
// 发送告警通知
sendAlertNotification("订单创建失败,事务已回滚", event.getOrderId());
// 执行补偿逻辑
executeCompensationLogic(event.getOrderId());
}
// 模拟的业务方法
private void sendWelcomeEmail(String email, String username) {
// 模拟邮件发送
log.info("发送欢迎邮件到: {},用户名: {}", email, username);
// 实际实现会调用邮件服务
}
private void initUserPoints(String username) {
log.info("为用户 {} 初始化积分", username);
}
private void reduceInventory(Long orderId) {
log.info("为订单 {} 扣减库存", orderId);
}
private void sendOrderConfirmation(Long orderId) {
log.info("发送订单确认通知,订单ID: {}", orderId);
}
private void cleanupResources() {
log.info("清理资源...");
}
private void sendAlertNotification(String message, Long orderId) {
log.warn("发送告警: {} - 订单ID: {}", message, orderId);
}
private void executeCompensationLogic(Long orderId) {
log.info("执行补偿逻辑,订单ID: {}", orderId);
}
}TransactionPhase 的四种时机
@TransactionalEventListener支持四种事务阶段:
| Phase | 说明 | 适用场景 |
|---|---|---|
| AFTER_COMMIT(默认) | 事务成功提交后 | 发送消息、邮件、刷新缓存等必须成功的操作 |
| AFTER_ROLLBACK | 事务回滚后 | 失败通知、补偿逻辑、告警 |
| AFTER_COMPLETION | 事务完成后(提交或回滚) | 资源清理、审计日志 |
| BEFORE_COMMIT | 事务提交前 | 提交前的最后检查、预处理 |