欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring在多線程環(huán)境下如何確保事務(wù)一致性問題詳解

 更新時(shí)間:2023年11月09日 09:55:24   作者:莫輕言舞  
這篇文章主要介紹了Spring在多線程環(huán)境下如何確保事務(wù)一致性問題詳解,說到異步執(zhí)行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的異步執(zhí)行任務(wù)能力并不足以解決我們當(dāng)前的需求,需要的朋友可以參考下

問題

我先把問題拋出來,大家就明白本文目的在于解決什么樣的業(yè)務(wù)痛點(diǎn)了:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    //1.查詢出當(dāng)前資源模塊下所有資源,查詢出來后進(jìn)行刪除
    deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //2.查詢出當(dāng)前資源模塊下所有子模塊,遞歸查詢,當(dāng)刪除完所有子模塊下的資源后,再刪除所有子模塊,最終刪除當(dāng)前資源模塊
    deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //3.刪除當(dāng)前資源模塊
    removeById(authorityModuleId);
}

如果我希望將步驟1和步驟2并行執(zhí)行,然后確保步驟1和步驟2執(zhí)行成功后,再執(zhí)行步驟3,等到步驟3執(zhí)行完畢后,再提交全部事務(wù),這個(gè)需求該如何實(shí)現(xiàn)呢?

如何解決異步執(zhí)行

上面需求第一點(diǎn)是: 如何讓任務(wù)異步并行執(zhí)行,如何實(shí)現(xiàn)二元依賴呢?

說到異步執(zhí)行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的異步執(zhí)行任務(wù)能力并不足以解決我們當(dāng)前的需求。

@Async注解原理簡(jiǎn)單來說,就是掃描IOC中的bean,給方法上標(biāo)注有@Async注解的bean進(jìn)行代理,代理的核心是添加一個(gè)MethodInterceptor即AsyncExecutionInterceptor,該方法攔截器負(fù)責(zé)將方法真正的執(zhí)行包裝為任務(wù),放入線程池中執(zhí)行。

下面我們先使用CompletableFuture來完成我們第一步需求:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //兩個(gè)并行執(zhí)行的任務(wù)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); 
    },executor);
}

多線程環(huán)境下如何確保事務(wù)一致性

我們已經(jīng)完成了任務(wù)的異步執(zhí)行化,那么又如何確保多線程環(huán)境下的事務(wù)一致性問題呢?

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //兩個(gè)并行執(zhí)行的任務(wù)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
    },executor);
}

在Spring環(huán)境下說到事務(wù)控制,大家第一反應(yīng)就想到使用@Transactional注解解決問題,但是這里顯然行不通,為什么行不通呢?

我還是簡(jiǎn)單的對(duì)Spring事務(wù)實(shí)現(xiàn)原理進(jìn)行一番概括:

事務(wù)王國(guó)回顧

事務(wù)管理大體分為三個(gè)流程:事務(wù)創(chuàng)建 ,事務(wù)執(zhí)行,事務(wù)結(jié)束

事務(wù)創(chuàng)建涉及到一些屬性的配置,如:

  • 事務(wù)的隔離級(jí)別
  • 事務(wù)的傳播行為
  • 事務(wù)的超時(shí)時(shí)間
  • 是否為只讀事務(wù)

由于涉及屬性頗多,并且后期還有可能進(jìn)行擴(kuò)展,因此必須通過一個(gè)類來封裝這些屬性,在Spring中對(duì)應(yīng)TransactionDefinition。

有了事務(wù)相關(guān)屬性定義后,我們就可以利用TransactionDefinition來創(chuàng)建一個(gè)事務(wù)了,在Spring中局部事務(wù)由PlatformTransactionManager負(fù)責(zé)管理,創(chuàng)建事務(wù)也是由PlatformTransactionManager負(fù)責(zé)提供:

 TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
   throws TransactionException;

如果我們希望追蹤事務(wù)的狀態(tài),例如: 事務(wù)已完成,事務(wù)回滾等,那么就需要一個(gè)事務(wù)狀態(tài)類貫穿當(dāng)前事務(wù)的執(zhí)行流程,在Spring中由TransactionStatus負(fù)責(zé)完成。

對(duì)于常見的數(shù)據(jù)源而言,通常需要記錄的事務(wù)狀態(tài)有如下幾點(diǎn):

  • 當(dāng)前事務(wù)是否是新事務(wù)
  • 當(dāng)前事務(wù)是否結(jié)束
  • 當(dāng)前事務(wù)是否需要回滾(通過標(biāo)記來判斷,因此我也可以在業(yè)務(wù)流程中手動(dòng)設(shè)置標(biāo)記為true,來讓事務(wù)在沒有發(fā)生異常的情況下進(jìn)行回滾)
  • 當(dāng)前事務(wù)是否設(shè)置了回滾點(diǎn)(savePoint)

事務(wù)的執(zhí)行過程就是具體業(yè)務(wù)代碼的執(zhí)行流程,這里就不多說了。

事務(wù)的結(jié)束分為兩種情況: 需要進(jìn)行事務(wù)回滾或者事務(wù)正常提交,如果是事務(wù)回滾,還需要判斷TransactionStatus 中的savePoint是否被設(shè)置了。

事務(wù)實(shí)現(xiàn)方式回顧

Spring中常見的事務(wù)實(shí)現(xiàn)方式有兩種: 編程式和聲明式。

編程式事務(wù)使用是本文重點(diǎn),因此這里按下不表,我們先來復(fù)習(xí)一下聲明式事務(wù)的使用

聲明式事務(wù)就是使用我們常見的@Transactional注解完成的,聲明式事務(wù)優(yōu)點(diǎn)就在于讓事務(wù)代碼與業(yè)務(wù)代碼解耦,通過Spring中提供的聲明式事務(wù)使用,我們也可以發(fā)覺我們只需要編寫業(yè)務(wù)代碼即可,而事務(wù)的管理基本不需要我們操心,Spring就像使用了魔法一樣,幫我們自動(dòng)完成了。

之所以那么神奇,本質(zhì)還是依靠Spring框架提供的Bean生命周期相關(guān)回調(diào)接口和AOP結(jié)合完成的,簡(jiǎn)述如下:

  1. 通過自動(dòng)代理創(chuàng)建器依次嘗試為每個(gè)放入容器中的bean嘗試進(jìn)行代理
  2. 嘗試進(jìn)行代理的過程對(duì)于事務(wù)管理來說,就是利用事務(wù)管理涉及到的增強(qiáng)器advisor,即TransactionAttributeSourceAdvisor
  3. 判斷當(dāng)前增強(qiáng)器是否能夠應(yīng)用與當(dāng)前bean上,怎么判斷呢?—> advisor內(nèi)部的pointCut嘍 !
  4. 如果能夠應(yīng)用,那么好,為當(dāng)前bean創(chuàng)建代理對(duì)象返回,并且往代理對(duì)象內(nèi)部添加一個(gè)TransactionInterceptor攔截器。
  5. 此時(shí)我們?cè)購(gòu)娜萜髦蝎@取,拿到的就是代理對(duì)象了,當(dāng)我們調(diào)用代理對(duì)象的方法時(shí),首先要經(jīng)過代理對(duì)象內(nèi)部攔截器鏈的處理,處理完后,最終才會(huì)調(diào)用被代理對(duì)象的方法。(這里其實(shí)就是責(zé)任鏈模式的應(yīng)用)

對(duì)于被事務(wù)增強(qiáng)器TransactionAttributeSourceAdvisor代理的bean而言,代理對(duì)象內(nèi)部會(huì)存在一個(gè)TransactionInterceptor,該攔截器內(nèi)部構(gòu)造了一個(gè)事務(wù)執(zhí)行的模板流程:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
   final InvocationCallback invocation) throws Throwable {
  //TransactionAttributeSource內(nèi)部保存著當(dāng)前類某個(gè)方法對(duì)應(yīng)的TransactionAttribute---事務(wù)屬性源
  //可以看做是一個(gè)存放TransactionAttribute與method方法映射的池子
  TransactionAttributeSource tas = getTransactionAttributeSource();
  //獲取當(dāng)前事務(wù)方法對(duì)應(yīng)的TransactionAttribute
  final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  //定位TransactionManager
  final TransactionManager tm = determineTransactionManager(txAttr);
        .....
        //類型轉(zhuǎn)換為局部事務(wù)管理器
  PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
 
  if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
   //TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務(wù)后返回
   //TransactionInfo封裝了當(dāng)前事務(wù)的信息--包括TransactionStatus
   TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
 
   Object retVal;
   try {
    //繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會(huì)調(diào)用目標(biāo)方法
    //因此可以理解為這里是調(diào)用目標(biāo)方法
    retVal = invocation.proceedWithInvocation();
   }
   catch (Throwable ex) {
    //目標(biāo)方法拋出異常則進(jìn)行判斷是否需要回滾
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
   }
   finally {
       //清除當(dāng)前事務(wù)信息
    cleanupTransactionInfo(txInfo);
   }
            ...
            //正常返回,那么就正常提交事務(wù)唄(當(dāng)然還是需要判斷TransactionStatus狀態(tài)先)
   commitTransactionAfterReturning(txInfo);
   return retVal;
  }
  ...

編程式事務(wù)

還記得本文一開始提出的業(yè)務(wù)需求嗎?

不清楚,可以回看一下,在上文,我們已經(jīng)解決了任務(wù)異步并行執(zhí)行的難題,下面我們需要解決的就是如何確保Spring在多線程環(huán)境下也能保持事務(wù)一致性。

通過上文對(duì)Spring事務(wù)基礎(chǔ)和聲明式事務(wù)的原理回顧,相信大家也發(fā)現(xiàn)了,聲明式事務(wù)并不能解決我們當(dāng)前的問題,那么就只能求助于編程式事務(wù)了。

那么編程式事務(wù)是什么樣子呢?

其實(shí)上面TransactionInterceptor給出的那套模板流程,就是編程式事務(wù)使用的模范案例,我們可以簡(jiǎn)化上面的模板流程,簡(jiǎn)單使用如下:

public class TransactionMain {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        test();
    }
 
    private static void test() {
        DataSource dataSource = getDS();
        JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
        //包括隔離級(jí)別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
        TransactionStatus ts = jtm.getTransaction(transactionDef);
        //進(jìn)行業(yè)務(wù)邏輯操作
        try {
            update(dataSource);
            jtm.commit(ts);
        }catch (Exception e){
            jtm.rollback(ts);
            System.out.println("發(fā)生異常,我已回滾");
        }
    }
 
    private static void update(DataSource dataSource) throws Exception {
        JdbcTemplate jt = new JdbcTemplate();
        jt.setDataSource(dataSource);
        jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
        throw new Exception("我是來搗亂的");
    }
}

利用編程式事務(wù)解決問題

我們明白了編程式事務(wù)的使用,相信大家也都知道問題如何解決了,下面我給出一份看似正確的解決方案:

package com.user.util;
 
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
 
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 多線程事務(wù)一致性管理 <br>
 * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */
@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
     */
    private final DataSource dataSource;
 
    /**
     * 執(zhí)行的是無返回值的任務(wù)
     * @param tasks 異步執(zhí)行的任務(wù)列表
     * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳
     */
    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("線程池不能為空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否發(fā)生了異常
        AtomicBoolean ex=new AtomicBoolean();
 
        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
 
        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.開啟新事務(wù)
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.異步任務(wù)執(zhí)行
                            task.run();
                        }catch (Throwable throwable){
                            //打印異常
                            throwable.printStackTrace();
                            //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
                            ex.set(Boolean.TRUE);
                            //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });
 
        try {
            //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
 
        //發(fā)生了異常則進(jìn)行回滾操作,否則提交
        if(ex.get()){
            System.out.println("發(fā)生異常,全部事務(wù)回滾");
            transactionStatusList.forEach(transactionManager::rollback);
        }else {
            System.out.println("全部事務(wù)正常提交");
            transactionStatusList.forEach(transactionManager::commit);
        }
    }
 
    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
        //包括隔離級(jí)別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
        return transactionManager.getTransaction(transactionDef);
    }
 
    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

大家思考上面的代碼存在問題嗎?

測(cè)試:

public void test(){
    List<Runnable> tasks=new ArrayList<>();
 
    tasks.add(()->{
       userMapper.deleteById(26);
    });
 
    tasks.add(()->{
        signMapper.deleteById(10);
    });
 
    multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}

任務(wù)正常都執(zhí)行完畢,事務(wù)進(jìn)行提交,但是會(huì)拋出異常,導(dǎo)致事務(wù)回滾:

圖片

抓關(guān)鍵字:

No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解釋: 無法在當(dāng)前線程綁定的threadLocal中尋找到HikariDataSource作為key,對(duì)應(yīng)關(guān)聯(lián)的資源對(duì)象ConnectionHolder

這里需要再次回顧一下Spring事務(wù)實(shí)現(xiàn)的小細(xì)節(jié):

一次事務(wù)的完成通常都是默認(rèn)在當(dāng)前線程內(nèi)完成的,又因?yàn)橐淮问聞?wù)的執(zhí)行過程中,涉及到對(duì)當(dāng)前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當(dāng)前事務(wù)執(zhí)行線程對(duì)應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進(jìn)行保存,在Spring中,負(fù)責(zé)保存這些ThreadLocal屬性的實(shí)現(xiàn)類由TransactionSynchronizationManager承擔(dān)。

TransactionSynchronizationManager類內(nèi)部默認(rèn)提供了下面六個(gè)ThreadLocal屬性,分別保存當(dāng)前線程對(duì)應(yīng)的不同事務(wù)資源:

   //保存當(dāng)前事務(wù)關(guān)聯(lián)的資源--默認(rèn)只會(huì)在新建事務(wù)的時(shí)候保存當(dāng)前獲取到的DataSource和當(dāng)前事務(wù)對(duì)應(yīng)Connection的映射關(guān)系--當(dāng)然這里Connection被包裝為了ConnectionHolder
 private static final ThreadLocal<Map<Object, Object>> resources =
   new NamedThreadLocal<>("Transactional resources");
    //事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個(gè)階段的過程中,會(huì)去回調(diào)監(jiān)聽者對(duì)應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認(rèn)為空集合
 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");
   //見名知意: 存放當(dāng)前事務(wù)名字
 private static final ThreadLocal<String> currentTransactionName =
   new NamedThreadLocal<>("Current transaction name");
   //見名知意: 存放當(dāng)前事務(wù)是否是只讀事務(wù)
 private static final ThreadLocal<Boolean> currentTransactionReadOnly =
   new NamedThreadLocal<>("Current transaction read-only status");
   //見名知意: 存放當(dāng)前事務(wù)的隔離級(jí)別
 private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
   new NamedThreadLocal<>("Current transaction isolation level");
   //見名知意: 存放當(dāng)前事務(wù)是否處于激活狀態(tài)
 private static final ThreadLocal<Boolean> actualTransactionActive =
   new NamedThreadLocal<>("Actual transaction active");

那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當(dāng)前事務(wù)對(duì)應(yīng)的資源,原因如下:

圖片

開啟新事務(wù)時(shí),事務(wù)相關(guān)資源都被綁定到了thread-cache-pool-1線程對(duì)應(yīng)的threadLocalMap內(nèi)部,而當(dāng)執(zhí)行事務(wù)提交代碼時(shí),commit內(nèi)部需要從TransactionSynchronizationManager中獲取當(dāng)前事務(wù)的資源,顯然我們無法從main線程對(duì)應(yīng)的threadLocalMap中獲取到對(duì)應(yīng)的事務(wù)資源,這也就是異常拋出的原因。

問題分析完了,那么如何解決問題呢?

這里給出一個(gè)我首先想到的簡(jiǎn)單粗暴的方法—CopyTransactionResource—將事務(wù)資源在兩個(gè)線程間來回復(fù)制

這里給出解決后問題后的代碼示例:

package com.user.util;
 
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 多線程事務(wù)一致性管理 <br>
 * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */
@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
     */
    private final DataSource dataSource;
 
    /**
     * 執(zhí)行的是無返回值的任務(wù)
     * @param tasks 異步執(zhí)行的任務(wù)列表
     * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳
     */
    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("線程池不能為空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否發(fā)生了異常
        AtomicBoolean ex=new AtomicBoolean();
 
        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
        List<TransactionResource> transactionResources=new ArrayList<>(tasks.size());
 
        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.開啟新事務(wù)
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.copy事務(wù)資源
                         transactionResources.add(TransactionResource.copyTransactionResource());
                            //3.異步任務(wù)執(zhí)行
                            task.run();
                        }catch (Throwable throwable){
                            //打印異常
                            throwable.printStackTrace();
                            //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
                            ex.set(Boolean.TRUE);
                            //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });
 
        try {
            //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
 
        //發(fā)生了異常則進(jìn)行回滾操作,否則提交
        if(ex.get()){
            System.out.println("發(fā)生異常,全部事務(wù)回滾");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.rollback(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }else {
            System.out.println("全部事務(wù)正常提交");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.commit(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }
    }
 
    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
        //包括隔離級(jí)別和傳播行為等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
        return transactionManager.getTransaction(transactionDef);
    }
 
    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
 
    /**
     * 保存當(dāng)前事務(wù)資源,用于線程間的事務(wù)資源COPY操作
     */
    @Builder
    private static class TransactionResource{
        //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
        private  Map<Object, Object> resources = new HashMap<>();
 
        //下面五個(gè)屬性會(huì)在事務(wù)結(jié)束后被自動(dòng)清理,無需我們手動(dòng)清理
        private  Set<TransactionSynchronization> synchronizations =new HashSet<>();
 
        private  String currentTransactionName;
 
        private Boolean currentTransactionReadOnly;
 
        private Integer currentTransactionIsolationLevel;
 
        private Boolean actualTransactionActive;
 
        public static TransactionResource copyTransactionResource(){
            return TransactionResource.builder()
                    //返回的是不可變集合
                    .resources(TransactionSynchronizationManager.getResourceMap())
                    //如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
                    .synchronizations(new LinkedHashSet<>())
                    .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                    .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                    .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                    .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                    .build();
        }
 
        public void autoWiredTransactionResource(){
             resources.forEach(TransactionSynchronizationManager::bindResource);
             //如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
             TransactionSynchronizationManager.initSynchronization();
             TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
             TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
             TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
             TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
        }
 
        public void removeTransactionResource() {
            //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
            //DataSource如果重復(fù)移除,unbindResource時(shí)會(huì)因?yàn)椴淮嬖诖薻ey關(guān)聯(lián)的事務(wù)資源而報(bào)錯(cuò)
            resources.keySet().forEach(key->{
                if(!(key instanceof  DataSource)){
                    TransactionSynchronizationManager.unbindResource(key);
                }
            });
        }
    }
}

增加異常拋出,測(cè)試是否能夠保證多線程間的事務(wù)一致性:

@SpringBootTest(classes = UserMain.class)
public class Test {
    @Resource
    private UserMapper userMapper;
    @Resource
    private SignMapper signMapper;
    @Resource
    private MultiplyThreadTransactionManager multiplyThreadTransactionManager;
 
    @SneakyThrows
    @org.junit.jupiter.api.Test
    public void test(){
        List<Runnable> tasks=new ArrayList<>();
 
        tasks.add(()->{
                userMapper.deleteById(26);
                throw new RuntimeException("我就要拋出異常!");
        });
 
        tasks.add(()->{
            signMapper.deleteById(10);
        });
 
        multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
    }
 
}

圖片

事務(wù)都進(jìn)行了回滾,數(shù)據(jù)庫數(shù)據(jù)沒變

到此這篇關(guān)于Spring在多線程環(huán)境下如何確保事務(wù)一致性問題詳解的文章就介紹到這了,更多相關(guān)Spring多線程確保事務(wù)一致性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論