這一次搞懂Spring事務(wù)是如何傳播的
前言
上一篇分析了事務(wù)注解的解析過(guò)程,本質(zhì)上是將事務(wù)封裝為切面加入到AOP的執(zhí)行鏈中,因此會(huì)調(diào)用到MethodInceptor的實(shí)現(xiàn)類的invoke方法,而事務(wù)切面的Interceptor就是TransactionInterceptor,所以本篇直接從該類開(kāi)始。
正文
事務(wù)切面的調(diào)用過(guò)程
public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
這個(gè)方法本身沒(méi)做什么事,主要是調(diào)用了父類的invokeWithinTransaction方法,注意最后一個(gè)參數(shù),傳入的是一個(gè)lambda表達(dá)式,而這個(gè)表達(dá)式中的調(diào)用的方法應(yīng)該不陌生,在分析AOP調(diào)用鏈時(shí),就是通過(guò)這個(gè)方法傳遞到下一個(gè)切面或是調(diào)用被代理實(shí)例的方法,忘記了的可以回去看看。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. //獲取事務(wù)屬性類 AnnotationTransactionAttributeSource TransactionAttributeSource tas = getTransactionAttributeSource(); //獲取方法上面有@Transactional注解的屬性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); //獲取事務(wù)管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 調(diào)用proceed方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } //事務(wù)提交 commitTransactionAfterReturning(txInfo); return retVal; } // 省略了else }
這個(gè)方法邏輯很清晰,一目了然,if里面就是對(duì)聲明式事務(wù)的處理,先調(diào)用createTransactionIfNecessary方法開(kāi)啟事務(wù),然后通過(guò)invocation.proceedWithInvocation調(diào)用下一個(gè)切面,如果沒(méi)有其它切面了,就是調(diào)用被代理類的方法,出現(xiàn)異常就回滾,否則提交事務(wù),這就是Spring事務(wù)切面的執(zhí)行過(guò)程。但是,我們主要要搞懂的就是在這些方法中是如何管理事務(wù)以及事務(wù)在多個(gè)方法之間是如何傳播的。
事務(wù)的傳播性概念
傳播性是Spring自己搞出來(lái)的,數(shù)據(jù)庫(kù)是沒(méi)有的,因?yàn)樯婕暗椒椒ㄩg的調(diào)用,那么必然就需要考慮事務(wù)在這些方法之間如何流轉(zhuǎn),所以Spring提供了7個(gè)傳播屬性供選擇,可以將其看成兩大類,即是否支持當(dāng)前事務(wù):
支持當(dāng)前事務(wù)(在同一個(gè)事務(wù)中):
PROPAGATION_REQUIRED:支持當(dāng)前事務(wù),如果不存在,就新建一個(gè)事務(wù)。
PROPAGATION_MANDATORY:支持當(dāng)前事務(wù),如果不存在,就拋出異常。
PROPAGATION_SUPPORTS:支持當(dāng)前事務(wù),如果不存在,就不使用事務(wù)。
不支持當(dāng)前事務(wù)(不在同一個(gè)事務(wù)中):
PROPAGATION_NEVER:以非事務(wù)的方式運(yùn)行,如果有事務(wù),則拋出異常。
PROPAGATION_NOT_SUPPORTED:以非事務(wù)的方式運(yùn)行,如果有事務(wù),則掛起當(dāng)前事務(wù)。
PROPAGATION_REQUIRES_NEW:新建事務(wù),如果有事務(wù),掛起當(dāng)前事務(wù)(兩個(gè)事務(wù)相互獨(dú)立,父事務(wù)回滾不影響子事務(wù))。
PROPAGATION_NESTED:如果當(dāng)前事務(wù)存在,則嵌套事務(wù)執(zhí)行(指必須依存父事務(wù),子事務(wù)不能單獨(dú)提交且父事務(wù)回滾則子事務(wù)也必須回滾,而子事務(wù)若回滾,父事務(wù)可以回滾也可以捕獲異常)。如果當(dāng)前沒(méi)有事務(wù),則進(jìn)行與PROPAGATION_REQUIRED類似的操作。
別看屬性這么多,實(shí)際上我們主要用的是PROPAGATION_REQUIRED默認(rèn)屬性,一些特殊業(yè)務(wù)下可能會(huì)用到PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED。
下面我會(huì)假設(shè)一個(gè)場(chǎng)景,并主要分析這三個(gè)屬性。
public class A { @Autowired private B b; @Transactional public void addA() { b.addB(); } } public class B { @Transactional public void addB() { // doSomething... } }
上面我創(chuàng)建了A、B兩個(gè)類,每個(gè)類中有一個(gè)事務(wù)方法,使用了聲明式事務(wù)并采用的默認(rèn)傳播屬性,在A中調(diào)用了B的方法。
當(dāng)請(qǐng)求來(lái)了調(diào)用addA時(shí),首先調(diào)用的是代理對(duì)象的方法,因此會(huì)進(jìn)入createTransactionIfNecessary方法開(kāi)啟事務(wù):
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //開(kāi)啟事務(wù),這里重點(diǎn)看 status = tm.getTransaction(txAttr); } else { } } //創(chuàng)建事務(wù)信息對(duì)象,記錄新老事務(wù)信息對(duì)象 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
實(shí)際上開(kāi)啟事務(wù)是通過(guò)AbstractPlatformTransactionManager做的,而這個(gè)類是一個(gè)抽象類,具體實(shí)例化的對(duì)象就是我們?cè)陧?xiàng)目里常配置的DataSourceTransactionManager對(duì)象。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { //這里重點(diǎn)看,.DataSourceTransactionObject拿到對(duì)象 Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //第一次進(jìn)來(lái)connectionHolder為空的,所以不存在事務(wù) if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //第一次進(jìn)來(lái)大部分會(huì)走這里 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //先掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //創(chuàng)建事務(wù)狀態(tài)對(duì)象,其實(shí)就是封裝了事務(wù)對(duì)象的一些信息,記錄事務(wù)狀態(tài)的 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //開(kāi)啟事務(wù),重點(diǎn)看看 DataSourceTransactionObject doBegin(transaction, definition); //開(kāi)啟事務(wù)后,改變事務(wù)狀態(tài) prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
這個(gè)方法流程比較長(zhǎng),一步步來(lái)看,先調(diào)用doGetTransaction方法獲取一個(gè)DataSourceTransactionObject對(duì)象,這個(gè)類是JdbcTransactionObjectSupport的子類,在父類中持有了一個(gè)ConnectionHolder對(duì)象,見(jiàn)名知意,這個(gè)對(duì)象保存了當(dāng)前的連接。
protected Object doGetTransaction() { //管理connection對(duì)象,創(chuàng)建回滾點(diǎn),按照回滾點(diǎn)回滾,釋放回滾點(diǎn) DataSourceTransactionObject txObject = new DataSourceTransactionObject(); //DataSourceTransactionManager默認(rèn)是允許嵌套事務(wù)的 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //obtainDataSource() 獲取數(shù)據(jù)源對(duì)象,其實(shí)就是數(shù)據(jù)庫(kù)連接塊對(duì)象 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
追溯getResource方法可以看到ConnectionHolder 是從ThreadLocal里獲取的,也就是當(dāng)前線程,key是DataSource對(duì)象;但是仔細(xì)思考下我們是第一次進(jìn)來(lái),所以這里肯定獲取不到的,反之,要從這里獲取到值,那必然是同一個(gè)線程第二次及以后進(jìn)入到這里,也就是在addA調(diào)用addB時(shí),另外需要注意這里保存ConnectionHolder到DataSourceTransactionObject對(duì)象時(shí)是將newConnectionHolder屬性設(shè)置為false了的。
繼續(xù)往后,創(chuàng)建完transaction對(duì)象后,會(huì)調(diào)用isExistingTransaction判斷是否已經(jīng)存在一個(gè)事務(wù),如果存在就會(huì)調(diào)用handleExistingTransaction方法,這個(gè)方法就是處理事務(wù)傳播的核心方法,因?yàn)槲覀兪堑谝淮芜M(jìn)來(lái),肯定不存在事務(wù),所以先跳過(guò)。
再往后,可以看到就是處理不同的傳播屬性,主要看到下面這個(gè)部分:
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //先掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //創(chuàng)建事務(wù)狀態(tài)對(duì)象,其實(shí)就是封裝了事務(wù)對(duì)象的一些信息,記錄事務(wù)狀態(tài)的 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //開(kāi)啟事務(wù),重點(diǎn)看看 DataSourceTransactionObject doBegin(transaction, definition); //開(kāi)啟事務(wù)后,改變事務(wù)狀態(tài) prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } }
第一次進(jìn)來(lái)時(shí),PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED都會(huì)進(jìn)入到這里,首先會(huì)調(diào)用suspend掛起當(dāng)前存在的事務(wù),在這里沒(méi)啥作用。接下來(lái)通過(guò)newTransactionStatus創(chuàng)建了DefaultTransactionStatus對(duì)象,這個(gè)對(duì)象主要就是存儲(chǔ)當(dāng)前事務(wù)的一些狀態(tài)信息,需要特別注意newTransaction屬性設(shè)置為了true,表示是一個(gè)新事務(wù)。
狀態(tài)對(duì)象創(chuàng)建好之后就是通過(guò)doBegin開(kāi)啟事務(wù),這是一個(gè)模板方法:
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { //如果沒(méi)有數(shù)據(jù)庫(kù)連接 if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { //從連接池里面獲取連接 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //把連接包裝成ConnectionHolder,然后設(shè)置到事務(wù)對(duì)象中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //從數(shù)據(jù)庫(kù)連接中獲取隔離級(jí)別 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } //關(guān)閉連接的自動(dòng)提交,其實(shí)這步就是開(kāi)啟了事務(wù) con.setAutoCommit(false); } //設(shè)置只讀事務(wù) 從這一點(diǎn)設(shè)置的時(shí)間點(diǎn)開(kāi)始(時(shí)間點(diǎn)a)到這個(gè)事務(wù)結(jié)束的過(guò)程中,其他事務(wù)所提交的數(shù)據(jù),該事務(wù)將看不見(jiàn)! //設(shè)置只讀事務(wù)就是告訴數(shù)據(jù)庫(kù),我這個(gè)事務(wù)內(nèi)沒(méi)有新增,修改,刪除操作只有查詢操作,不需要數(shù)據(jù)庫(kù)鎖等操作,減少數(shù)據(jù)庫(kù)壓力 prepareTransactionalConnection(con, definition); //自動(dòng)提交關(guān)閉了,就說(shuō)明已經(jīng)開(kāi)啟事務(wù)了,事務(wù)是活動(dòng)的 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { //如果是新創(chuàng)建的事務(wù),則建立當(dāng)前線程和數(shù)據(jù)庫(kù)連接的關(guān)系 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
這個(gè)方法里面主要做了六件事:
首先從連接池獲取連接并保存到DataSourceTransactionObject對(duì)象中。
關(guān)閉數(shù)據(jù)庫(kù)的自動(dòng)提交,也就是開(kāi)啟事務(wù)。
獲取數(shù)據(jù)庫(kù)的隔離級(jí)別。
根據(jù)屬性設(shè)置該事務(wù)是否為只讀事務(wù)。
將該事務(wù)標(biāo)識(shí)為活動(dòng)事務(wù)(transactionActive=true)。
將ConnectionHolder對(duì)象與當(dāng)前線程綁定。
完成之后通過(guò)prepareSynchronization將事務(wù)的屬性和狀態(tài)設(shè)置到TransactionSynchronizationManager對(duì)象中進(jìn)行管理。最后返回到createTransactionIfNecessary方法中創(chuàng)建TransactionInfo對(duì)象與當(dāng)前線程綁定并返回。
通過(guò)以上的步驟就開(kāi)啟了事務(wù),接下來(lái)就是通過(guò)proceedWithInvocation調(diào)用其它切面,這里我們先假設(shè)沒(méi)有其它切面了,那么就是直接調(diào)用到A類的addA方法,在這個(gè)方法中又調(diào)用了B類的addB方法,那么肯定也是調(diào)用到代理類的方法,因此又會(huì)進(jìn)入到createTransactionIfNecessary方法中。但這次進(jìn)來(lái)通過(guò)isExistingTransaction判斷是存在事務(wù)的,因此會(huì)進(jìn)入到handleExistingTransaction方法:
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //不允許有事務(wù),直接異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } //掛起當(dāng)前事務(wù) Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //修改事務(wù)狀態(tài)信息,把事務(wù)的一些信息存儲(chǔ)到當(dāng)前線程中,ThreadLocal中 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } //掛起 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } //默認(rèn)是可以嵌套事務(wù)的 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); //創(chuàng)建回滾點(diǎn) status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // 省略 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
這里面也是對(duì)每個(gè)傳播屬性的判斷,先看PROPAGATION_REQUIRES_NEW的處理,因?yàn)樵搶傩砸竺看握{(diào)用都開(kāi)啟一個(gè)新的事務(wù),所以首先會(huì)將當(dāng)前事務(wù)掛起,怎么掛起呢?
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; //第一次進(jìn)來(lái),肯定為null的 if (transaction != null) { //吧connectionHolder設(shè)置為空 suspendedResources = doSuspend(transaction); } //做數(shù)據(jù)還原操作 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } } protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); //解除綁定關(guān)系, return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
這里明顯是進(jìn)入第一個(gè)if并且會(huì)調(diào)用到doSuspend方法,整體來(lái)說(shuō)掛起事務(wù)很簡(jiǎn)單:首先將DataSourceTransactionObject的ConnectionHolder設(shè)置為空并解除與當(dāng)前線程的綁定,之后將解除綁定的ConnectionHolder和其它屬性(事務(wù)名稱、隔離級(jí)別、只讀屬性)通通封裝到SuspendedResourcesHolder對(duì)象,并將當(dāng)前事務(wù)的活動(dòng)狀態(tài)設(shè)置為false。掛起事務(wù)之后又通過(guò)newTransactionStatus創(chuàng)建了一個(gè)新的事務(wù)狀態(tài)并調(diào)用doBegin開(kāi)啟事務(wù),這里不再重復(fù)分析。
接著來(lái)看PROPAGATION_NESTED:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } //默認(rèn)是可以嵌套事務(wù)的 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); //創(chuàng)建回滾點(diǎn) status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } }
這里面可以看到如果允許嵌套事務(wù),就會(huì)創(chuàng)建一個(gè)DefaultTransactionStatus對(duì)象(注意newTransaction是false,表明不是一個(gè)新事務(wù))和回滾點(diǎn);如果不允許嵌套,就會(huì)創(chuàng)建新事務(wù)并開(kāi)啟。
當(dāng)上面的判斷都不滿足時(shí),也就是傳播屬性為默認(rèn)PROPAGATION_REQUIRED時(shí),則只是創(chuàng)建了一個(gè)newTransaction為false的DefaultTransactionStatus返回。
完成之后又是調(diào)用proceedWithInvocation,那么就是執(zhí)行B類的addB方法,假如沒(méi)有發(fā)生異常,那么就會(huì)回到切面調(diào)用commitTransactionAfterReturning提交addB的事務(wù):
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } public final void commit(TransactionStatus status) throws TransactionException { processCommit(defStatus); } private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } // 如果是nested,沒(méi)有提交,只是將savepoint清除掉了 unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } //如果都是PROPAGATION_REQUIRED,最外層的才會(huì)走進(jìn)來(lái)統(tǒng)一提交,如果是PROPAGATION_REQUIRES_NEW,每一個(gè)事務(wù)都會(huì)進(jìn)來(lái) else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } } } finally { cleanupAfterCompletion(status); } }
主要邏輯在processCommit方法中。如果存在回滾點(diǎn),可以看到并沒(méi)有提交事務(wù),只是將當(dāng)前事務(wù)的回滾點(diǎn)清除了;而如果是新事務(wù),就會(huì)調(diào)用doCommit提交事務(wù),也就是只有PROPAGATION_REQUIRED屬性下的最外層事務(wù)和PROPAGATION_REQUIRES_NEW屬性下的事務(wù)能提交。事務(wù)提交完成后會(huì)調(diào)用cleanupAfterCompletion清除當(dāng)前事務(wù)的狀態(tài),如果有掛起的事務(wù)還會(huì)通過(guò)resume恢復(fù)掛起的事務(wù)(將解綁的連接和當(dāng)前線程綁定以及將之前保存的事務(wù)狀態(tài)重新設(shè)置回去)。當(dāng)前事務(wù)正常提交后,那么就會(huì)輪到addA方法提交,處理邏輯同上,不再贅述。
如果調(diào)用addB發(fā)生異常,就會(huì)通過(guò)completeTransactionAfterThrowing進(jìn)行回滾:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } } } } public final void rollback(TransactionStatus status) throws TransactionException { DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false); } private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); //按照嵌套事務(wù)按照回滾點(diǎn)回滾 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } //都為PROPAGATION_REQUIRED最外層事務(wù)統(tǒng)一回滾 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } doRollback(status); } else { // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status); } }
流程和提交是一樣的,先是判斷有沒(méi)有回滾點(diǎn),如果有就回到到回滾點(diǎn)并清除該回滾點(diǎn);如果沒(méi)有則判斷是不是新事務(wù)(PROPAGATION_REQUIRED屬性下的最外層事務(wù)和PROPAGATION_REQUIRES_NEW屬性下的事務(wù)),滿足則直接回滾當(dāng)前事務(wù)?;貪L完成后同樣需要清除掉當(dāng)前的事務(wù)狀態(tài)并恢復(fù)掛起的連接。另外需要特別注意的是在catch里面調(diào)用完回滾邏輯后,還通過(guò)throw拋出了異常,這意味著什么?意味著即使是嵌套事務(wù),內(nèi)層事務(wù)的回滾也會(huì)導(dǎo)致外層事務(wù)的回滾,也就是addA的事務(wù)也會(huì)跟著回滾。
至此,事務(wù)的傳播原理分析完畢,深入看每個(gè)方法的實(shí)現(xiàn)是很復(fù)雜的,但如果僅僅是分析各個(gè)傳播屬性對(duì)事務(wù)的影響,則有一個(gè)簡(jiǎn)單的方法。我們可以將內(nèi)層事務(wù)切面等效替換掉invocation.proceedWithInvocation方法,比如上面兩個(gè)類的調(diào)用可以看作是下面這樣:
// addA的事務(wù) TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // addB的事務(wù) TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } //事務(wù)提交 commitTransactionAfterReturning(txInfo); } catch (Throwable ex) { //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } //事務(wù)提交 commitTransactionAfterReturning(txInfo);
這樣看是不是很容易就能分析出事務(wù)之間的影響以及是提交還是回滾了?下面來(lái)看幾個(gè)實(shí)例分析。
實(shí)例分析
我再添加一個(gè)C類,和addC的方法,然后在addA里面調(diào)用這個(gè)方法。
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // addB的事務(wù) TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { b.addB(); } catch (Throwable ex) { // target invocation exception //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } //事務(wù)提交 commitTransactionAfterReturning(txInfo); // addC的事務(wù) TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { c.addC(); } catch (Throwable ex) { // target invocation exception //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } //事務(wù)提交 commitTransactionAfterReturning(txInfo); } catch (Throwable ex) { //事務(wù)回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } //事務(wù)提交 commitTransactionAfterReturning(txInfo);
等效替換后就是上面這個(gè)代碼,我們分別來(lái)分析。
都是PROPAGATION_REQUIRED屬性:通過(guò)上面的分析,我們知道三個(gè)方法都是同一個(gè)連接和事務(wù),那么任何一個(gè)出現(xiàn)異常則都會(huì)回滾。
addB為PROPAGATION_REQUIRES_NEW:
如果B中拋出異常,那么B中肯定會(huì)回滾,接著異常向上拋,導(dǎo)致A事務(wù)整體回滾;
如果C中拋出異常,不難看出C和A都會(huì)回滾,但B已經(jīng)提交了,因此不會(huì)受影響。
addC為PROPAGATION_NESTED,addB為PROPAGATION_REQUIRES_NEW:
如果B中拋出異常,那么B回滾并拋出異常,A也回滾,C不會(huì)執(zhí)行;
如果C中拋出異常,先是回滾到回滾點(diǎn)并拋出異常,所以A也回滾,但B此時(shí)已經(jīng)提交,不受影響。
都是PROPAGATION_NESTED:雖然創(chuàng)建了回滾點(diǎn),但是仍然是同一個(gè)連接,任何一個(gè)發(fā)生異常都會(huì)回滾,如果不想影響彼此,可以try-catch生吞子事務(wù)的異常實(shí)現(xiàn)。
還有其它很多情況,這里就不一一列舉了,只要使用上面的分析方法都能夠很輕松的分析出來(lái)。
總結(jié)
本篇詳細(xì)分析了事務(wù)的傳播原理,另外還有隔離級(jí)別,這在Spring中沒(méi)有體現(xiàn),需要我們自己結(jié)合數(shù)據(jù)庫(kù)的知識(shí)進(jìn)行分析設(shè)置。最后我們還需要考慮聲明式事務(wù)和編程式事務(wù)的優(yōu)缺點(diǎn),聲明式事務(wù)雖然簡(jiǎn)單,但不適合用在長(zhǎng)事務(wù)中,會(huì)占用大量連接資源,這時(shí)就需要考慮利用編程式事務(wù)的靈活性了。
總而言之,事務(wù)的使用并不是一律默認(rèn)就好,接口的一致性和吞吐量與事務(wù)有著直接關(guān)系,嚴(yán)重情況下可能會(huì)導(dǎo)致系統(tǒng)崩潰。
以上這篇這一次搞懂Spring事務(wù)是如何傳播的就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java將List轉(zhuǎn)換為String的幾種常見(jiàn)方式
在實(shí)際開(kāi)發(fā)中經(jīng)常遇到List轉(zhuǎn)為String字符串的情況,下面這篇文章主要給大家介紹了關(guān)于Java將List轉(zhuǎn)換為String的幾種常見(jiàn)方式,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-03-03Java設(shè)計(jì)模式初識(shí)之備忘錄模式詳解
備忘錄設(shè)計(jì)模式(Memento Design Pattern)也叫作快照(Snapshot)模式,主要用于實(shí)現(xiàn)防丟失、撤銷(xiāo)、恢復(fù)等功能。本文將通過(guò)示例為大家介紹一些備忘錄模式的定義與使用,需要的可以參考一下2022-11-11Java實(shí)戰(zhàn)之簡(jiǎn)單的文件管理器
這篇文章主要介紹了Java實(shí)戰(zhàn)之簡(jiǎn)單的文件管理器,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04SpringBoot啟動(dòng)后立即執(zhí)行的幾種方法小結(jié)
在項(xiàng)目開(kāi)發(fā)中某些場(chǎng)景必須要用到啟動(dòng)項(xiàng)目后立即執(zhí)行方式的功能,本文主要介紹了SpringBoot啟動(dòng)后立即執(zhí)行的幾種方法小結(jié),具有一定的參考價(jià)值,感興趣的可以了解一下2023-05-05mybatis同一張表多次連接查詢相同列賦值問(wèn)題小結(jié)
這篇文章主要介紹了mybatis同一張表多次連接查詢相同列賦值問(wèn)題,非常不錯(cuò),具有參考借鑒價(jià)值,需要的的朋友參考下2017-01-01java評(píng)論、回復(fù)功能設(shè)計(jì)與實(shí)現(xiàn)方法
很多項(xiàng)目或者系統(tǒng)都有評(píng)論或者回復(fù)的需求,但評(píng)論回復(fù)的實(shí)現(xiàn)往往都比較復(fù)雜,也不好實(shí)現(xiàn),下面這篇文章主要給大家介紹了關(guān)于java評(píng)論、回復(fù)功能設(shè)計(jì)與實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2022-06-06Java BigDecimal使用及基本運(yùn)算(推薦)
Java在java.math包中提供的API類BigDecimal,用來(lái)對(duì)超過(guò)16位有效位的數(shù)進(jìn)行精確的運(yùn)算。這篇文章主要介紹了Java BigDecimal使用指南針(推薦),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08