如何在Spring事务提交后进行异步操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Spring事务提交后进行异步操作相关的知识,希望对你有一定的参考价值。
实现方案使用TransactionSynchronizationManager在事务提交之后操作
public void insert(TechBook techBook)
bookMapper.insert(techBook);
// send after tx commit but is async
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter()
@Override
public void afterCommit()
System.out.println("send email after transaction commit...");
);
ThreadLocalRandom random = ThreadLocalRandom.current();
if(random.nextInt() % 2 ==0)
throw new RuntimeException("test email transaction");
System.out.println("service end");
该方法就可以实现在事务提交之后进行操作
操作异步化
使用mq或线程池来进行异步,比如使用线程池:
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
public void insert(TechBook techBook)
bookMapper.insert(techBook);
// send after tx commit but is async
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter()
@Override
public void afterCommit()
executorService.submit(new Runnable()
@Override
public void run()
System.out.println("send email after transaction commit...");
try
Thread.sleep(10*1000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("complete send email after transaction commit...");
);
);
// async work but tx not work, execute even when tx is rollback
// asyncService.executeAfterTxComplete();
ThreadLocalRandom random = ThreadLocalRandom.current();
if(random.nextInt() % 2 ==0)
throw new RuntimeException("test email transaction");
System.out.println("service end");
参考技术A import java.util.HashSet;
import java.util.Set;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.SessionTrackingMode;
import javax.servlet.annotation.WebListener;
@WebListener
public class SetSessionTrackingModeListener implements ServletContextListener
public void contextInitialized(ServletContextEvent sce)
Set<SessionTrackingMode> modes = new HashSet<SessionTrackingMode>();
modes.add(SessionTrackingMode.URL); // thats the default behaviour!
modes.add(SessionTrackingMode.COOKIE);
sce.getServletContext().setSessionTrackingModes(modes);
public void contextDestroyed(ServletContextEvent sce)
Spring 事务提交回滚源码解析
前言
在上篇文章 Spring 事务初始化源码分析 中分析了 Spring 事务初始化的一个过程,当初始化完成后,Spring 是如何去获取事务,当目标方法异常后,又是如何进行回滚的,又或是目标方法执行成功后,又是怎么提交的呢?此外,事务的提交和回滚由底层数据库进行控制,而 Spring 事务行为可以传播,这个传播方式由 Spring 来进行控制,它是怎么控制的呢?这篇文章就来分析下 Spring 事务提交回滚的源码。
TransactionInterceptor
还记得在 Spring 事务初始化源码分析 中注册了一个 bean,名字为 TransactionInterceptor
吗?,它就是用来执行事务功能的,它是一个方法拦截器,如下所示:
它实现了 MethodInterceptor
接口,而该接口只有一个 invoke
方法,用来执行目标方法:
public Object invoke(MethodInvocation invocation) throws Throwable
Class<?> targetClass = (invocation.getThis() != null ?AopUtils.getTargetClass(invocation.getThis()) : null);
// 调用父类的方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
父类的 invokeWithinTransaction
方法定义了一个事务方法执行的框架,而每一步再细分为方法进行实现,代码如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
// 1. 获取事务属性
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 2. 获取事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 3. 获取需要事务的方法名称:类目.方法名
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 4. 声明式事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager))
// 5. 获取该方法上事务的信息
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try
// 6. 目标方法执行,它是一个拦截器链
retVal = invocation.proceedWithInvocation();
catch (Throwable ex)
// 7. 事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
finally
// 8. 清除事务信息
cleanupTransactionInfo(txInfo);
// 9. 事务提交
commitTransactionAfterReturning(txInfo);
return retVal;
else
// 10. 编程式事务,流程和声明式事务一致
一个事务方法执行流程大概有以下几个步骤:
-
获取事务属性
-
获取事务管理器
-
获取需要事务的方法名称
-
获取该方法上事务的信息
-
目标方法执行
-
事务回滚
-
清除事务信息
-
事务提交
获取事务属性
首先去获取方法上面 @Translational
注解的属性,在 Spring 事务初始化源码分析 已经分析过了,即在AnnotationTransactionAttributeSource.computeTransactionAttribute
中进行获取。
获取事务管理器
每个事务都有对应的事务管理器,所以在事务开始前需要获取对应的事务管理器
protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr)
if (txAttr == null || this.beanFactory == null)
return getTransactionManager();
// 事务管理器名称
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier))
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
else if (StringUtils.hasText(this.transactionManagerBeanName))
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
else
// 默认事务管理器
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
// .....
return defaultTransactionManager;
获取需要事务的方法名称
这里主要去获取名称的名称,为 全限定类名+方法名的方式:
method.getDeclaringClass().getName() + '.' + method.getName()
获取方法上事务的信息
该部分是 Spring 事务最复杂的部分,比如说去创建一个事务,设置事务的隔离级别,超时时间,对事务传播方式的处理,事务的挂起和恢复等;事务信息 TransactionInfo
包含了目标方法执行前的所有状态信息,如果方法执行失败,则会根据该信息来进行回滚。
对应方法为:
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
创建事务
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
TransactionAttribute txAttr, final String joinpointIdentification)
// 设置事务的名称,为方法全限定名joinpointIdentification
if (txAttr != null && txAttr.getName() == null)
txAttr = new DelegatingTransactionAttribute(txAttr)
public String getName()
return joinpointIdentification;
;
TransactionStatus status = null;
if (txAttr != null)
if (tm != null)
// 获取事务
status = tm.getTransaction(txAttr);
// 创建事务信息
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
获取事务
在方法 getTransaction
中获取事务,是最为复杂的逻辑,在其中处理隔离级别,超时时间和传播方式等。
public final TransactionStatus getTransaction(TransactionDefinition definition)
// 获取事务
Object transaction = doGetTransaction();
// ...
// 如果已经存在事务了,则处理事务的传播方式,如挂起存在的事务,新建事务等
if (isExistingTransaction(transaction))
return handleExistingTransaction(definition, transaction, debugEnabled);
// .....
// 如果不存在事务,且事务的传播方式为 mandatory, 则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY)
throw new IllegalTransactionStateException("....");
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED)
SuspendedResourcesHolder suspendedResources = suspend(null);
// 如果事务的传播方式为 requested, requestes_new,nested,则会新建一个事务
try
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 第三个参数为true表示新建事务
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 构造 transaction,包括隔离级别,timeout,如果是新连接,则绑定到当前线程
doBegin(transaction, definition);
// 同步新事务
prepareSynchronization(status, definition);
return status;
catch (RuntimeException | Error ex)
resume(null, suspendedResources);
throw ex;
else
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
获取事务 doGetTransaction()
,在该方法中,会根据 DataSource
获取一个连接,如下:
protected Object doGetTransaction()
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
//如果设置了允许嵌套事务,则开启保存点;只有嵌套事务才有保存点
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 根据 DataSource 获取连接,ConnectionHolder为一个数据库连接
ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
之后,判断当前线程是否存在事务,如果存在事务,则根据事务的传播方式来处理已存在的事务,这里先不看。
如果不存在事务且事务的传播方式为 requested
, requestes_new
,nested
,则会新建一个事务:
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//definition事务属性
//transaction事务
//newTransaction是否事务新事务
//suspendedResources需要挂起的事务
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, Object suspendedResources)
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
当获取到一个新的事务后,需要设置事务的一些信息,比如隔离级别,timeout 等,这些功能不是由 Spring 来控制,而是由底层的数据库来控制的,数据库连接的设置是在 doBegin
方法中进行处理:
protected void doBegin(Object transaction, TransactionDefinition definition)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 数据库连接
Connection con = null;
//如果当前事务不存在数据库连接,或者,当前连接的事务同步设置为 true,则需要获取新的数据库连接
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction())
// 获取新连接
Connection newCon = obtainDataSource().getConnection();
// 事务绑定新连接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 获取和设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 由 Spring 来控制提交方式
if (con.getAutoCommit())
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
prepareTransactionalConnection(con, definition);
// 设置当前线程存在事务的标志
txObject.getConnectionHolder().setTransactionActive(true);
// 获取和设置超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT)
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
//如果是新连接,则绑定到当前线程
if (txObject.isNewConnectionHolder())
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
//其他代码......
// ====获取隔离级别
public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition)
// 设置只读标识
if (definition != null && definition.isReadOnly())
con.setReadOnly(true);
//....
// 获取隔离级别
Integer previousIsolationLevel = null;
if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT)
// 从数据库连接获取隔离级别
int currentIsolation = con.getTransactionIsolation();
if (currentIsolation != definition.getIsolationLevel())
previousIsolationLevel = currentIsolation;
con.setTransactionIsolation(definition.getIsolationLevel());
return previousIsolationLevel;
当设置完事务的信息后,需要把事务信息记录在当前线程中:
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition)
if (status.isNewSynchronization())
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
现在来处理已经存在事务的情况,
if (isExistingTransaction(transaction))
return handleExistingTransaction(definition, transaction, debugEnabled);
判断是否存在事务,依据是事务中有连接,且 transactionActive
为 true
protected boolean isExistingTransaction(Object transaction)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
如果已经存在事务,则会根据事务的传播方式来进行处理,比如 requires_new
, nested
等是如何处理:
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction)
// 如果传播方式为 never, 则抛异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER)
throw new IllegalTransactionStateException("...");
// 如果传播方式为 not_supported, 则把当前存在的事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED)
// 挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
// 如果传播方式为 requires_new, 则挂起当前事务,新建一个新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW)
// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
// 如果还没有激活事务,则新建事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
newSynchronization, debugEnabled, suspendedResources);
// 设置数据库的隔离级别,timeout等
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
//....
// 如果传播方式为 nested,则新建事务,但是不会把存在的事务挂起,它是一个子事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED)
// 如果不支持嵌套事务,抛异常
if (!isNestedTransactionAllowed())
throw new NestedTransactionNotSupportedException("");
// 如果支持保存点,则创建保存点
if (useSavepointForNestedTransaction())
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction,
false, false, debugEnabled, null);
// 创建保存点
status.createAndHoldSavepoint();
return status;
else
// 如果不支持保存点,则和 requires_new 是一样的
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
// 如果传播方式为 supports和required
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
挂起事务,就是把当前事务的状态记录下来,后续在对该事务进行恢复。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException
if (TransactionSynchronizationManager.isSynchronizationActive())
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
Object suspendedResources = null;
if (transaction != null)
suspendedResources = doSuspend(transaction);
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
name, readOnly, isolationLevel, wasActive);
//.....
// 挂起事务doSuspend
protected Object doSuspend(Object transaction)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 把事务的连接置空
txObject.setConnectionHolder(null);
// 从当前线程中移除
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
当经过上面一系列操作获取到事务信息后,再根据事务信息来封装到 TransactionInfo
中:
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification,
TransactionStatus status)
// 封装事务信息
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null)
// 设置事务状态
txInfo.newTransactionStatus(status);
事务回滚
到这里,目标方法执行之前的事务准备工作都已做好了,之后,会调用 InvocationCallback.proceedWithInvocation
来执行目标方法,如果执行失败,则会进行事务的回滚操作:
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex)
if (txInfo != null && txInfo.getTransactionStatus() != null)
// 判断异常是不是 RunntimeException 和 Error
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex))
// 回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
// .........
else
// 如果是其他类型的异常,则正常提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
// .......
//判断是否回滚的异常,当前可以通过rolbackFor属性来修改
public boolean rollbackOn(Throwable ex)
return (ex instanceof RuntimeException || ex instanceof Error);
回滚事务
public final void rollback(TransactionStatus status)
// 如果事务已完成,则回滚会抛异常
if (status.isCompleted())
throw new IllegalTransactionStateException("....");
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
// 回滚事务
private void processRollback(DefaultTransactionStatus status, boolean unexpected)
try
boolean unexpectedRollback = unexpected;
// 自定义触发器的调用,不知道干嘛用???
triggerBeforeCompletion(status);
// 如果有保存点,则回滚到保存点
if (status.hasSavepoint())
status.rollbackToHeldSavepoint();
else if (status.isNewTransaction())
// 如果当前事务为独立的事务,则回滚
doRollback(status);
else
// 如果一个事务中又有事务,如 required,该事务可以看作一个事务链,
//那么当其中的一个事务需要回滚的时候,并不是立马进行回滚,
//而是只是设置回滚状态,到最后再统一回滚
if (status.hasTransaction())
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure())
// 只是设置回滚状态
doSetRollbackOnly(status);
//.......
//..........
finally
// 清空记录并恢复被挂起的事务
cleanupAfterCompletion(status);
事务的回滚操作,如果是嵌套事务,且有保存点的话,直接回滚到保存点,嵌套事务的回滚不会影响到外部事务,也就是说,外部事务不会回滚。回滚到保存点是根据底层数据库来操作的:
public void rollbackToHeldSavepoint() throws TransactionException
Object savepoint = getSavepoint();
// 回滚到保存点
getSavepointManager().rollbackToSavepoint(savepoint);
// 释放保存点
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
// 回滚到保存点
public void rollbackToSavepoint(Object savepoint) throws TransactionException
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
// ......
// 释放保存点
public void releaseSavepoint(Object savepoint) throws TransactionException
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
如果没有保存点,则直接回滚,也是使用数据库的API 来操作的:
protected void doRollback(DefaultTransactionStatus status)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
con.rollback();
还有一种情况, 如果一个事务中又有事务,如 required, 该事务可以看作一个事务链,那么当其中的一个事务需要回滚的时候,并不是立马进行回滚,而是只是设置回滚状态,到最后再统一回滚。
事务回滚后需要对事务信息进行清除:
private void cleanupAfterCompletion(DefaultTransactionStatus status)
// 设置完成状态
status.setCompleted();
if (status.isNewSynchronization())
TransactionSynchronizationManager.clear();
if (status.isNewTransaction())
// 清除事务信息
doCleanupAfterCompletion(status.getTransaction());
if (status.getSuspendedResources() != null)
// 恢复被挂起的事务
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
清除事务信息:
protected void doCleanupAfterCompletion(Object transaction)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 从当前线程中移除数据库连接
if (txObject.isNewConnectionHolder())
TransactionSynchronizationManager.unbindResource(obtainDataSource());
//重置数据库连接
Connection con = txObject.getConnectionHolder().getConnection();
if (txObject.isMustRestoreAutoCommit())
con.setAutoCommit(true);
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
// 如果是新连接,则释放连接
if (txObject.isNewConnectionHolder())
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.getConnectionHolder().clear();
恢复被挂起的事务:
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
if (resourcesHolder != null)
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null)
doResume(transaction, suspendedResources);
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null)
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
// 恢复事务,把事务和当前线程绑定
protected void doResume(Object transaction, Object suspendedResources)
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
事务提交
当目标方法执行成功,没有抛出异常,则事务可以正常提交了;但是再上面分析事务回滚的时候,还有一种情况没有分析,就是如果一个事务嵌套再一个事务里面,是一个事务链,如果其中的某个事务需要回滚,它并不会真正的立马进行回滚,而是设置一个回滚标识,由最外层的事务来统一进行回滚;所以再提交事务之前,还需要进行判断。
public final void commit(TransactionStatus status) throws TransactionException
// 如果事务已完成,则不能提交
if (status.isCompleted())
throw new IllegalTransactionStateException("...");
// 判断嵌套事务是否设置了回滚标识,如果嵌套事务设置了回滚标识,则整个事务链都不会提交
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly())
processRollback(defStatus, false);
return;
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly())
processRollback(defStatus, true);
return;
// 提交事务
processCommit(defStatus);
提交事务:
private void processCommit(DefaultTransactionStatus status) throws TransactionException
try
//.....
// 如果由保存点则释放保存点
if (status.hasSavepoint())
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
else if (status.isNewTransaction())
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交
doCommit(status);
catch (RuntimeException | Error ex)
// 如果提交过程中出现异常,则还是会回滚
doRollbackOnCommitException(status, ex);
throw ex;
// .........
// 数据库连接进行回滚
protected void doCommit(DefaultTransactionStatus status)
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
con.commit();
到这里,Spring 事务的获取,提交,回滚去分析完毕了,流程还是比较清楚的。
以上是关于如何在Spring事务提交后进行异步操作的主要内容,如果未能解决你的问题,请参考以下文章