Spring在多線程環(huán)境下如何確保事務(wù)一致性問題詳解
問題
我先把問題拋出來,大家就明白本文目的在于解決什么樣的業(yè)務(wù)痛點(diǎn)了:
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { //1.查詢出當(dāng)前資源模塊下所有資源,查詢出來后進(jìn)行刪除 deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService); //2.查詢出當(dāng)前資源模塊下所有子模塊,遞歸查詢,當(dāng)刪除完所有子模塊下的資源后,再刪除所有子模塊,最終刪除當(dāng)前資源模塊 deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService); //3.刪除當(dāng)前資源模塊 removeById(authorityModuleId); }
如果我希望將步驟1和步驟2并行執(zhí)行,然后確保步驟1和步驟2執(zhí)行成功后,再執(zhí)行步驟3,等到步驟3執(zhí)行完畢后,再提交全部事務(wù),這個(gè)需求該如何實(shí)現(xiàn)呢?
如何解決異步執(zhí)行
上面需求第一點(diǎn)是: 如何讓任務(wù)異步并行執(zhí)行,如何實(shí)現(xiàn)二元依賴呢?
說到異步執(zhí)行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的異步執(zhí)行任務(wù)能力并不足以解決我們當(dāng)前的需求。
@Async注解原理簡(jiǎn)單來說,就是掃描IOC中的bean,給方法上標(biāo)注有@Async注解的bean進(jìn)行代理,代理的核心是添加一個(gè)MethodInterceptor即AsyncExecutionInterceptor,該方法攔截器負(fù)責(zé)將方法真正的執(zhí)行包裝為任務(wù),放入線程池中執(zhí)行。
下面我們先使用CompletableFuture來完成我們第一步需求:
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { CompletableFuture.runAsync(()->{ //兩個(gè)并行執(zhí)行的任務(wù) CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor); //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟 CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); },executor); }
多線程環(huán)境下如何確保事務(wù)一致性
我們已經(jīng)完成了任務(wù)的異步執(zhí)行化,那么又如何確保多線程環(huán)境下的事務(wù)一致性問題呢?
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) { CompletableFuture.runAsync(()->{ //兩個(gè)并行執(zhí)行的任務(wù) CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor); //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟 CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); },executor); }
在Spring環(huán)境下說到事務(wù)控制,大家第一反應(yīng)就想到使用@Transactional注解解決問題,但是這里顯然行不通,為什么行不通呢?
我還是簡(jiǎn)單的對(duì)Spring事務(wù)實(shí)現(xiàn)原理進(jìn)行一番概括:
事務(wù)王國(guó)回顧
事務(wù)管理大體分為三個(gè)流程:事務(wù)創(chuàng)建 ,事務(wù)執(zhí)行,事務(wù)結(jié)束
事務(wù)創(chuàng)建涉及到一些屬性的配置,如:
- 事務(wù)的隔離級(jí)別
- 事務(wù)的傳播行為
- 事務(wù)的超時(shí)時(shí)間
- 是否為只讀事務(wù)
- …
由于涉及屬性頗多,并且后期還有可能進(jìn)行擴(kuò)展,因此必須通過一個(gè)類來封裝這些屬性,在Spring中對(duì)應(yīng)TransactionDefinition。
有了事務(wù)相關(guān)屬性定義后,我們就可以利用TransactionDefinition來創(chuàng)建一個(gè)事務(wù)了,在Spring中局部事務(wù)由PlatformTransactionManager負(fù)責(zé)管理,創(chuàng)建事務(wù)也是由PlatformTransactionManager負(fù)責(zé)提供:
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
如果我們希望追蹤事務(wù)的狀態(tài),例如: 事務(wù)已完成,事務(wù)回滾等,那么就需要一個(gè)事務(wù)狀態(tài)類貫穿當(dāng)前事務(wù)的執(zhí)行流程,在Spring中由TransactionStatus負(fù)責(zé)完成。
對(duì)于常見的數(shù)據(jù)源而言,通常需要記錄的事務(wù)狀態(tài)有如下幾點(diǎn):
- 當(dāng)前事務(wù)是否是新事務(wù)
- 當(dāng)前事務(wù)是否結(jié)束
- 當(dāng)前事務(wù)是否需要回滾(通過標(biāo)記來判斷,因此我也可以在業(yè)務(wù)流程中手動(dòng)設(shè)置標(biāo)記為true,來讓事務(wù)在沒有發(fā)生異常的情況下進(jìn)行回滾)
- 當(dāng)前事務(wù)是否設(shè)置了回滾點(diǎn)(savePoint)
事務(wù)的執(zhí)行過程就是具體業(yè)務(wù)代碼的執(zhí)行流程,這里就不多說了。
事務(wù)的結(jié)束分為兩種情況: 需要進(jìn)行事務(wù)回滾或者事務(wù)正常提交,如果是事務(wù)回滾,還需要判斷TransactionStatus 中的savePoint是否被設(shè)置了。
事務(wù)實(shí)現(xiàn)方式回顧
Spring中常見的事務(wù)實(shí)現(xiàn)方式有兩種: 編程式和聲明式。
編程式事務(wù)使用是本文重點(diǎn),因此這里按下不表,我們先來復(fù)習(xí)一下聲明式事務(wù)的使用
聲明式事務(wù)就是使用我們常見的@Transactional注解完成的,聲明式事務(wù)優(yōu)點(diǎn)就在于讓事務(wù)代碼與業(yè)務(wù)代碼解耦,通過Spring中提供的聲明式事務(wù)使用,我們也可以發(fā)覺我們只需要編寫業(yè)務(wù)代碼即可,而事務(wù)的管理基本不需要我們操心,Spring就像使用了魔法一樣,幫我們自動(dòng)完成了。
之所以那么神奇,本質(zhì)還是依靠Spring框架提供的Bean生命周期相關(guān)回調(diào)接口和AOP結(jié)合完成的,簡(jiǎn)述如下:
- 通過自動(dòng)代理創(chuàng)建器依次嘗試為每個(gè)放入容器中的bean嘗試進(jìn)行代理
- 嘗試進(jìn)行代理的過程對(duì)于事務(wù)管理來說,就是利用事務(wù)管理涉及到的增強(qiáng)器advisor,即TransactionAttributeSourceAdvisor
- 判斷當(dāng)前增強(qiáng)器是否能夠應(yīng)用與當(dāng)前bean上,怎么判斷呢?—> advisor內(nèi)部的pointCut嘍 !
- 如果能夠應(yīng)用,那么好,為當(dāng)前bean創(chuàng)建代理對(duì)象返回,并且往代理對(duì)象內(nèi)部添加一個(gè)TransactionInterceptor攔截器。
- 此時(shí)我們?cè)購(gòu)娜萜髦蝎@取,拿到的就是代理對(duì)象了,當(dāng)我們調(diào)用代理對(duì)象的方法時(shí),首先要經(jīng)過代理對(duì)象內(nèi)部攔截器鏈的處理,處理完后,最終才會(huì)調(diào)用被代理對(duì)象的方法。(這里其實(shí)就是責(zé)任鏈模式的應(yīng)用)
對(duì)于被事務(wù)增強(qiáng)器TransactionAttributeSourceAdvisor代理的bean而言,代理對(duì)象內(nèi)部會(huì)存在一個(gè)TransactionInterceptor,該攔截器內(nèi)部構(gòu)造了一個(gè)事務(wù)執(zhí)行的模板流程:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //TransactionAttributeSource內(nèi)部保存著當(dāng)前類某個(gè)方法對(duì)應(yīng)的TransactionAttribute---事務(wù)屬性源 //可以看做是一個(gè)存放TransactionAttribute與method方法映射的池子 TransactionAttributeSource tas = getTransactionAttributeSource(); //獲取當(dāng)前事務(wù)方法對(duì)應(yīng)的TransactionAttribute final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); //定位TransactionManager final TransactionManager tm = determineTransactionManager(txAttr); ..... //類型轉(zhuǎn)換為局部事務(wù)管理器 PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { //TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務(wù)后返回 //TransactionInfo封裝了當(dāng)前事務(wù)的信息--包括TransactionStatus TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { //繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會(huì)調(diào)用目標(biāo)方法 //因此可以理解為這里是調(diào)用目標(biāo)方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //目標(biāo)方法拋出異常則進(jìn)行判斷是否需要回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除當(dāng)前事務(wù)信息 cleanupTransactionInfo(txInfo); } ... //正常返回,那么就正常提交事務(wù)唄(當(dāng)然還是需要判斷TransactionStatus狀態(tài)先) commitTransactionAfterReturning(txInfo); return retVal; } ...
編程式事務(wù)
還記得本文一開始提出的業(yè)務(wù)需求嗎?
不清楚,可以回看一下,在上文,我們已經(jīng)解決了任務(wù)異步并行執(zhí)行的難題,下面我們需要解決的就是如何確保Spring在多線程環(huán)境下也能保持事務(wù)一致性。
通過上文對(duì)Spring事務(wù)基礎(chǔ)和聲明式事務(wù)的原理回顧,相信大家也發(fā)現(xiàn)了,聲明式事務(wù)并不能解決我們當(dāng)前的問題,那么就只能求助于編程式事務(wù)了。
那么編程式事務(wù)是什么樣子呢?
其實(shí)上面TransactionInterceptor給出的那套模板流程,就是編程式事務(wù)使用的模范案例,我們可以簡(jiǎn)化上面的模板流程,簡(jiǎn)單使用如下:
public class TransactionMain { public static void main(String[] args) throws ClassNotFoundException, SQLException { test(); } private static void test() { DataSource dataSource = getDS(); JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource); //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置 //包括隔離級(jí)別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù) TransactionStatus ts = jtm.getTransaction(transactionDef); //進(jìn)行業(yè)務(wù)邏輯操作 try { update(dataSource); jtm.commit(ts); }catch (Exception e){ jtm.rollback(ts); System.out.println("發(fā)生異常,我已回滾"); } } private static void update(DataSource dataSource) throws Exception { JdbcTemplate jt = new JdbcTemplate(); jt.setDataSource(dataSource); jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6"); throw new Exception("我是來搗亂的"); } }
利用編程式事務(wù)解決問題
我們明白了編程式事務(wù)的使用,相信大家也都知道問題如何解決了,下面我給出一份看似正確的解決方案:
package com.user.util; import lombok.RequiredArgsConstructor; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.sql.DataSource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * 多線程事務(wù)一致性管理 <br> * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行 * @author 大忽悠 * @create 2022/10/19 21:34 */ @Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源 */ private final DataSource dataSource; /** * 執(zhí)行的是無返回值的任務(wù) * @param tasks 異步執(zhí)行的任務(wù)列表 * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳 */ public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) { if(executor==null){ throw new IllegalArgumentException("線程池不能為空"); } DataSourceTransactionManager transactionManager = getTransactionManager(); //是否發(fā)生了異常 AtomicBoolean ex=new AtomicBoolean(); List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size()); List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size()); tasks.forEach(task->{ taskFutureList.add(CompletableFuture.runAsync( () -> { try{ //1.開啟新事務(wù) transactionStatusList.add(openNewTransaction(transactionManager)); //2.異步任務(wù)執(zhí)行 task.run(); }catch (Throwable throwable){ //打印異常 throwable.printStackTrace(); //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記 ex.set(Boolean.TRUE); //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了 taskFutureList.forEach(completableFuture -> completableFuture.cancel(true)); } } , executor) ); }); try { //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲 CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //發(fā)生了異常則進(jìn)行回滾操作,否則提交 if(ex.get()){ System.out.println("發(fā)生異常,全部事務(wù)回滾"); transactionStatusList.forEach(transactionManager::rollback); }else { System.out.println("全部事務(wù)正常提交"); transactionStatusList.forEach(transactionManager::commit); } } private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置 //包括隔離級(jí)別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù) return transactionManager.getTransaction(transactionDef); } private DataSourceTransactionManager getTransactionManager() { return new DataSourceTransactionManager(dataSource); } }
大家思考上面的代碼存在問題嗎?
測(cè)試:
public void test(){ List<Runnable> tasks=new ArrayList<>(); tasks.add(()->{ userMapper.deleteById(26); }); tasks.add(()->{ signMapper.deleteById(10); }); multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool()); }
任務(wù)正常都執(zhí)行完畢,事務(wù)進(jìn)行提交,但是會(huì)拋出異常,導(dǎo)致事務(wù)回滾:
抓關(guān)鍵字:
No value for key [HikariDataSource (HikariPool-1)] bound to thread [main] 解釋: 無法在當(dāng)前線程綁定的threadLocal中尋找到HikariDataSource作為key,對(duì)應(yīng)關(guān)聯(lián)的資源對(duì)象ConnectionHolder
這里需要再次回顧一下Spring事務(wù)實(shí)現(xiàn)的小細(xì)節(jié):
一次事務(wù)的完成通常都是默認(rèn)在當(dāng)前線程內(nèi)完成的,又因?yàn)橐淮问聞?wù)的執(zhí)行過程中,涉及到對(duì)當(dāng)前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當(dāng)前事務(wù)執(zhí)行線程對(duì)應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進(jìn)行保存,在Spring中,負(fù)責(zé)保存這些ThreadLocal屬性的實(shí)現(xiàn)類由TransactionSynchronizationManager
承擔(dān)。
TransactionSynchronizationManager
類內(nèi)部默認(rèn)提供了下面六個(gè)ThreadLocal屬性,分別保存當(dāng)前線程對(duì)應(yīng)的不同事務(wù)資源:
//保存當(dāng)前事務(wù)關(guān)聯(lián)的資源--默認(rèn)只會(huì)在新建事務(wù)的時(shí)候保存當(dāng)前獲取到的DataSource和當(dāng)前事務(wù)對(duì)應(yīng)Connection的映射關(guān)系--當(dāng)然這里Connection被包裝為了ConnectionHolder private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); //事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個(gè)階段的過程中,會(huì)去回調(diào)監(jiān)聽者對(duì)應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認(rèn)為空集合 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations"); //見名知意: 存放當(dāng)前事務(wù)名字 private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name"); //見名知意: 存放當(dāng)前事務(wù)是否是只讀事務(wù) private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status"); //見名知意: 存放當(dāng)前事務(wù)的隔離級(jí)別 private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level"); //見名知意: 存放當(dāng)前事務(wù)是否處于激活狀態(tài) private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當(dāng)前事務(wù)對(duì)應(yīng)的資源,原因如下:
開啟新事務(wù)時(shí),事務(wù)相關(guān)資源都被綁定到了thread-cache-pool-1
線程對(duì)應(yīng)的threadLocalMap內(nèi)部,而當(dāng)執(zhí)行事務(wù)提交代碼時(shí),commit內(nèi)部需要從TransactionSynchronizationManager
中獲取當(dāng)前事務(wù)的資源,顯然我們無法從main線程對(duì)應(yīng)的threadLocalMap
中獲取到對(duì)應(yīng)的事務(wù)資源,這也就是異常拋出的原因。
問題分析完了,那么如何解決問題呢?
這里給出一個(gè)我首先想到的簡(jiǎn)單粗暴的方法—CopyTransactionResource
—將事務(wù)資源在兩個(gè)線程間來回復(fù)制
這里給出解決后問題后的代碼示例:
package com.user.util; import lombok.Builder; import lombok.RequiredArgsConstructor; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import javax.sql.DataSource; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * 多線程事務(wù)一致性管理 <br> * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行 * @author 大忽悠 * @create 2022/10/19 21:34 */ @Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源 */ private final DataSource dataSource; /** * 執(zhí)行的是無返回值的任務(wù) * @param tasks 異步執(zhí)行的任務(wù)列表 * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳 */ public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) { if(executor==null){ throw new IllegalArgumentException("線程池不能為空"); } DataSourceTransactionManager transactionManager = getTransactionManager(); //是否發(fā)生了異常 AtomicBoolean ex=new AtomicBoolean(); List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size()); List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size()); List<TransactionResource> transactionResources=new ArrayList<>(tasks.size()); tasks.forEach(task->{ taskFutureList.add(CompletableFuture.runAsync( () -> { try{ //1.開啟新事務(wù) transactionStatusList.add(openNewTransaction(transactionManager)); //2.copy事務(wù)資源 transactionResources.add(TransactionResource.copyTransactionResource()); //3.異步任務(wù)執(zhí)行 task.run(); }catch (Throwable throwable){ //打印異常 throwable.printStackTrace(); //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記 ex.set(Boolean.TRUE); //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了 taskFutureList.forEach(completableFuture -> completableFuture.cancel(true)); } } , executor) ); }); try { //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲 CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //發(fā)生了異常則進(jìn)行回滾操作,否則提交 if(ex.get()){ System.out.println("發(fā)生異常,全部事務(wù)回滾"); for (int i = 0; i < tasks.size(); i++) { transactionResources.get(i).autoWiredTransactionResource(); transactionManager.rollback(transactionStatusList.get(i)); transactionResources.get(i).removeTransactionResource(); } }else { System.out.println("全部事務(wù)正常提交"); for (int i = 0; i < tasks.size(); i++) { transactionResources.get(i).autoWiredTransactionResource(); transactionManager.commit(transactionStatusList.get(i)); transactionResources.get(i).removeTransactionResource(); } } } private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置 //包括隔離級(jí)別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù) return transactionManager.getTransaction(transactionDef); } private DataSourceTransactionManager getTransactionManager() { return new DataSourceTransactionManager(dataSource); } /** * 保存當(dāng)前事務(wù)資源,用于線程間的事務(wù)資源COPY操作 */ @Builder private static class TransactionResource{ //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄 private Map<Object, Object> resources = new HashMap<>(); //下面五個(gè)屬性會(huì)在事務(wù)結(jié)束后被自動(dòng)清理,無需我們手動(dòng)清理 private Set<TransactionSynchronization> synchronizations =new HashSet<>(); private String currentTransactionName; private Boolean currentTransactionReadOnly; private Integer currentTransactionIsolationLevel; private Boolean actualTransactionActive; public static TransactionResource copyTransactionResource(){ return TransactionResource.builder() //返回的是不可變集合 .resources(TransactionSynchronizationManager.getResourceMap()) //如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值 .synchronizations(new LinkedHashSet<>()) .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()) .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()) .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()) .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()) .build(); } public void autoWiredTransactionResource(){ resources.forEach(TransactionSynchronizationManager::bindResource); //如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值 TransactionSynchronizationManager.initSynchronization(); TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive); TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly); } public void removeTransactionResource() { //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄 //DataSource如果重復(fù)移除,unbindResource時(shí)會(huì)因?yàn)椴淮嬖诖薻ey關(guān)聯(lián)的事務(wù)資源而報(bào)錯(cuò) resources.keySet().forEach(key->{ if(!(key instanceof DataSource)){ TransactionSynchronizationManager.unbindResource(key); } }); } } }
增加異常拋出,測(cè)試是否能夠保證多線程間的事務(wù)一致性:
@SpringBootTest(classes = UserMain.class) public class Test { @Resource private UserMapper userMapper; @Resource private SignMapper signMapper; @Resource private MultiplyThreadTransactionManager multiplyThreadTransactionManager; @SneakyThrows @org.junit.jupiter.api.Test public void test(){ List<Runnable> tasks=new ArrayList<>(); tasks.add(()->{ userMapper.deleteById(26); throw new RuntimeException("我就要拋出異常!"); }); tasks.add(()->{ signMapper.deleteById(10); }); multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool()); } }
事務(wù)都進(jìn)行了回滾,數(shù)據(jù)庫數(shù)據(jù)沒變
到此這篇關(guān)于Spring在多線程環(huán)境下如何確保事務(wù)一致性問題詳解的文章就介紹到這了,更多相關(guān)Spring多線程確保事務(wù)一致性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Intellij IDEA中如何查看maven項(xiàng)目中所有jar包的依賴關(guān)系圖
這篇文章主要介紹了Intellij IDEA中如何查看maven項(xiàng)目中所有jar包的依賴關(guān)系圖,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05一文詳解SpringBoot如何優(yōu)雅地實(shí)現(xiàn)異步調(diào)用
SpringBoot想必大家都用過,但是大家平時(shí)使用發(fā)布的接口大都是同步的,那么你知道如何優(yōu)雅的實(shí)現(xiàn)異步呢?這篇文章就來和大家詳細(xì)聊聊2023-03-03Java實(shí)現(xiàn)ATM機(jī)操作系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)ATM機(jī)操作系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05Java將CSV的數(shù)據(jù)發(fā)送到kafka的示例
這篇文章主要介紹了Java將CSV的數(shù)據(jù)發(fā)送到kafka得示例,幫助大家更好得理解和使用Java,感興趣的朋友可以了解下2020-11-11J2ee 高并發(fā)情況下監(jiān)聽器實(shí)例詳解
這篇文章主要介紹了J2ee 高并發(fā)情況下監(jiān)聽器實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-02-02Springboot使用sharedingjdbc實(shí)現(xiàn)分庫分表
這篇文章主要介紹了Springboot使用sharedingjdbc實(shí)現(xiàn)分庫分表,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07Java 和 Javascript 的 Date 與 .Net 的 DateTime 之間的相互轉(zhuǎn)換
這篇文章主要介紹了Java 和 Javascript 的 Date 與 .Net 的 DateTime 之間的相互轉(zhuǎn)換的相關(guān)資料,非常不錯(cuò)具有參考借鑒價(jià)值,需要的朋友可以參考下2016-06-06Java使用HttpUtils實(shí)現(xiàn)發(fā)送HTTP請(qǐng)求
這篇文章主要介紹了Java使用HttpUtils實(shí)現(xiàn)發(fā)送HTTP請(qǐng)求,HTTP請(qǐng)求,在日常開發(fā)中,還是比較常見的,今天給大家分享HttpUtils如何使用,需要的朋友可以參考下2023-05-05java.lang.ExceptionInInitializerError異常的解決方法
這篇文章主要為大家詳細(xì)介紹了java.lang.ExceptionInInitializerError異常的解決方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10