SpringBoot 多線程事務(wù)回滾的實(shí)現(xiàn)
我們開發(fā)的時(shí)候常常會(huì)遇到多線程事務(wù)的問題。以為添加了@Transactional注解就行了,其實(shí)你加了注解之后會(huì)發(fā)現(xiàn)事務(wù)失效。
原因:數(shù)據(jù)庫連接spring是放在threadLocal里面,多線程場景下,拿到的數(shù)據(jù)庫連接是不一樣的,即是屬于不同事務(wù)。
本文是基于springboot的@Async注解開啟多線程,并通過自定義注解和AOP實(shí)現(xiàn)的多線程事務(wù),避免繁瑣的手動(dòng)提交/回滾事務(wù) (CV即用、參數(shù)齊全、無需配置)
一、springboot多線程(聲明式)的使用方法?
1、springboot提供了注解@Async來使用線程池,具體使用方法如下:
(1) 在啟動(dòng)類(配置類)添加@EnableAsync來開啟線程池
(2) 在需要開啟子線程的方法上添加注解@Async
注意: 框架默認(rèn) -----> 來一個(gè)請(qǐng)求開啟一個(gè)線程,在高并發(fā)下容易內(nèi)存溢出
所以使用時(shí)需要配置自定義線程池,如下:
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")//自定義線程池名稱
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//線程池創(chuàng)建的核心線程數(shù),線程池維護(hù)線程的最少數(shù)量,即使沒有任務(wù)需要執(zhí)行,也會(huì)一直存活
executor.setCorePoolSize(16);
//如果設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉
//executor.setAllowCoreThreadTimeOut(true);
//阻塞隊(duì)列 當(dāng)核心線程數(shù)達(dá)到最大時(shí),新任務(wù)會(huì)放在隊(duì)列中排隊(duì)等待執(zhí)行
executor.setQueueCapacity(124);
//最大線程池?cái)?shù)量,當(dāng)線程數(shù)>=corePoolSize,且任務(wù)隊(duì)列已滿時(shí)。線程池會(huì)創(chuàng)建新線程來處理任務(wù)
//任務(wù)隊(duì)列已滿時(shí), 且當(dāng)線程數(shù)=maxPoolSize,,線程池會(huì)拒絕處理任務(wù)而拋出異常
executor.setMaxPoolSize(64);
//當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize
//允許線程空閑時(shí)間30秒,當(dāng)maxPoolSize的線程在空閑時(shí)間到達(dá)的時(shí)候銷毀
//如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0
executor.setKeepAliveSeconds(30);
//spring 提供的 ThreadPoolTaskExecutor 線程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 線程池是沒有 setThreadNamePrefix() 方法的
executor.setThreadNamePrefix("自定義線程池-");
// rejection-policy:拒絕策略:當(dāng)線程數(shù)已經(jīng)達(dá)到maxSize的時(shí)候,如何處理新任務(wù)
// CallerRunsPolicy():交由調(diào)用方線程運(yùn)行,比如 main 線程;如果添加到線程池失敗,那么主線程會(huì)自己去執(zhí)行該任務(wù),不會(huì)等待線程池中的線程去執(zhí)行, (個(gè)人推薦)
// AbortPolicy():該策略是線程池的默認(rèn)策略,如果線程池隊(duì)列滿了丟掉這個(gè)任務(wù)并且拋出RejectedExecutionException異常。
// DiscardPolicy():如果線程池隊(duì)列滿了,會(huì)直接丟掉這個(gè)任務(wù)并且不會(huì)有任何異常
// DiscardOldestPolicy():丟棄隊(duì)列中最老的任務(wù),隊(duì)列滿了,會(huì)將最早進(jìn)入隊(duì)列的任務(wù)刪掉騰出空間,再嘗試加入隊(duì)列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//設(shè)置線程池關(guān)閉的時(shí)候等待所有任務(wù)都完成再繼續(xù)銷毀其他的Bean,這樣這些異步任務(wù)的銷毀就會(huì)先于Redis線程池的銷毀
executor.setWaitForTasksToCompleteOnShutdown(true);
//設(shè)置線程池中任務(wù)的等待時(shí)間,如果超過這個(gè)時(shí)候還沒有銷毀就強(qiáng)制銷毀,以確保應(yīng)用最后能夠被關(guān)閉,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
開啟子線程方法: 在需要開啟線程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的參數(shù)為自定義線程池的名稱。
二、自定義注解實(shí)現(xiàn)多線程事務(wù)控制
自定義注解
本文是使用了兩個(gè)注解共同作用實(shí)現(xiàn)的,主線程當(dāng)做協(xié)調(diào)者,各子線程作為參與者
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務(wù)注解: 主事務(wù)
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
int value();//子線程數(shù)量
}
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務(wù)注解: 子事務(wù)
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
String value() default "";
}
解釋:
兩個(gè)注解都是用在方法上的,須配合@Transactional(rollbackFor = Exception.class)一起使用
@MainTransaction注解 用在調(diào)用方,其參數(shù)為必填,參數(shù)值為本方法中調(diào)用的方法開啟的線程數(shù),如:在這個(gè)方法中調(diào)用的方法中有2個(gè)方法用@Async注解開啟了子線程,則參數(shù)為@MainTransaction(2),另外如果未使用@MainTransaction注解,則直接已無多線程事務(wù)執(zhí)行(不影響方法的單線程事務(wù))
@SonTransaction注解 用在被調(diào)用方(開啟線程的方法),無需傳入?yún)?shù)
AOP內(nèi)容
代碼如下:
package com.example.aop;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 多線程事務(wù)
*/
@Aspect
@Component
public class TransactionAop {
//用來存儲(chǔ)各線程計(jì)數(shù)器數(shù)據(jù)(每次執(zhí)行后會(huì)從map中刪除)
private static final Map<String, Object> map = new HashMap<>();
@Resource
private PlatformTransactionManager transactionManager;
@Around("@annotation(mainTransaction)")
public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
//當(dāng)前線程名稱
Thread thread = Thread.currentThread();
String threadName = thread.getName();
//初始化計(jì)數(shù)器
CountDownLatch mainDownLatch = new CountDownLatch(1);
CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的參數(shù), 為子線程的數(shù)量
// 用來記錄子線程的運(yùn)行狀態(tài),只要有一個(gè)失敗就變?yōu)閠rue
AtomicBoolean rollBackFlag = new AtomicBoolean(false);
// 用來存每個(gè)子線程的異常,把每個(gè)線程的自定義異常向vector的首位置插入,其余異常向末位置插入,避免線程不安全,所以使用vector代替list
Vector<Throwable> exceptionVector = new Vector<>();
map.put(threadName + "mainDownLatch", mainDownLatch);
map.put(threadName + "sonDownLatch", sonDownLatch);
map.put(threadName + "rollBackFlag", rollBackFlag);
map.put(threadName + "exceptionVector", exceptionVector);
try {
joinPoint.proceed();//執(zhí)行方法
} catch (Throwable e) {
exceptionVector.add(0, e);
rollBackFlag.set(true);//子線程回滾
mainDownLatch.countDown();//放行所有子線程
}
if (!rollBackFlag.get()) {
try {
// sonDownLatch等待,直到所有子線程執(zhí)行完插入操作,但此時(shí)還沒有提交事務(wù)
sonDownLatch.await();
mainDownLatch.countDown();// 根據(jù)rollBackFlag狀態(tài)放行子線程的await處,告知是回滾還是提交
} catch (Exception e) {
rollBackFlag.set(true);
exceptionVector.add(0, e);
}
}
if (CollectionUtils.isNotEmpty(exceptionVector)) {
map.remove(threadName + "mainDownLatch");
map.remove(threadName + "sonDownLatch");
map.remove(threadName + "rollBackFlag");
map.remove(threadName + "exceptionVector");
throw exceptionVector.get(0);
}
}
@Around("@annotation(com.huigu.common.anno.SonTransaction)")
public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Thread thread = (Thread) args[args.length - 1];
String threadName = thread.getName();
CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
if (mainDownLatch == null) {
//主事務(wù)未加注解時(shí), 直接執(zhí)行子事務(wù)
joinPoint.proceed();//這里最好的方式是:交由上面的thread來調(diào)用此方法,但我沒有找尋到對(duì)應(yīng)api,只能直接放棄事務(wù), 歡迎大神來優(yōu)化, 留言分享
return;
}
CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
//如果這時(shí)有一個(gè)子線程已經(jīng)出錯(cuò),那當(dāng)前線程不需要執(zhí)行
if (rollBackFlag.get()) {
sonDownLatch.countDown();
return;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 開啟事務(wù)
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 設(shè)置事務(wù)隔離級(jí)別
TransactionStatus status = transactionManager.getTransaction(def);
try {
joinPoint.proceed();//執(zhí)行方法
sonDownLatch.countDown();// 對(duì)sonDownLatch-1
mainDownLatch.await();// 如果mainDownLatch不是0,線程會(huì)在此阻塞,直到mainDownLatch變?yōu)?
// 如果能執(zhí)行到這一步說明所有子線程都已經(jīng)執(zhí)行完畢判斷如果atomicBoolean是true就回滾false就提交
if (rollBackFlag.get()) {
transactionManager.rollback(status);
} else {
transactionManager.commit(status);
}
} catch (Throwable e) {
exceptionVector.add(0, e);
// 回滾
transactionManager.rollback(status);
// 并把狀態(tài)設(shè)置為true
rollBackFlag.set(true);
mainDownLatch.countDown();
sonDownLatch.countDown();
}
}
}
擴(kuò)展說明: CountDownLatch是什么?
一個(gè)同步輔助類
創(chuàng)建對(duì)象時(shí): 用給定的數(shù)字初始化
CountDownLatchcountDown()方法: 使計(jì)數(shù)減1await()方法: 阻塞當(dāng)前線程, 直至當(dāng)前計(jì)數(shù)到達(dá)零。
本文中:
用 計(jì)數(shù) 1 初始化的 mainDownLatch 當(dāng)作一個(gè)簡單的開/關(guān)鎖存器,或入口:在通過調(diào)用 countDown() 的線程打開入口前,所有調(diào)用 await 的線程都一直在入口處等待。
用 子線程數(shù)量 初始化的 sonDownLatch 可以使一個(gè)線程在 N 個(gè)線程完成某項(xiàng)操作之前一直等待,或者使其在某項(xiàng)操作完成 N 次之前一直等待。
注解使用Demo
任務(wù)方法:
package com.example.demo.service;
import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 測試子service
*/
@Service
public class SonService {
/**
* 參數(shù)說明: 以下4個(gè)方法參數(shù)和此相同
*
* @param args 業(yè)務(wù)中需要傳遞的參數(shù)
* @param thread 調(diào)用者的線程, 用于aop獲取參數(shù), 不建議以方法重寫的方式簡略此參數(shù),
* 在調(diào)用者方法中可以以此參數(shù)為標(biāo)識(shí)計(jì)算子線程的個(gè)數(shù)作為注解參數(shù),避免線程參數(shù)計(jì)算錯(cuò)誤導(dǎo)致鎖表
* 傳參時(shí)參數(shù)固定為: Thread.currentThread()
*/
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod1(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod2(String args1, String args2, Thread thread) {
System.out.println(args1 + "和" + args2 + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod3(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
//sonMethod4方法沒有使用線程池
@Transactional(rollbackFor = Exception.class)
public void sonMethod4(String args) {
System.out.println(args + "沒有開啟線程");
}
}
調(diào)用方:
package com.example.demo.service;
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* 測試主service
*/
@Service
public class MainService {
@Resource
private SonService sonService;
@MainTransaction(3)//調(diào)用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async開啟了線程, 所以參數(shù)為: 3
@Transactional(rollbackFor = Exception.class)
public void test1() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
/*
* 有的業(yè)務(wù)中存在if的多種可能, 每一種走向調(diào)用的方法(開啟線程的方法)數(shù)量如果不同, 這時(shí)可以選擇放棄使用@MainTransaction注解避免鎖表
* 這時(shí)候如果發(fā)生異常會(huì)導(dǎo)致多線程不能同時(shí)回滾, 可根據(jù)業(yè)務(wù)自己權(quán)衡是否使用
*/
@Transactional(rollbackFor = Exception.class)
public void test2() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
}
有的業(yè)務(wù)中存在比較復(fù)雜的分支, 不同情況都會(huì)調(diào)用不同的方法,開啟不同數(shù)量的線程,這時(shí)可以選擇放棄使用@MainTransaction注解避免鎖表,因此在使用過程中,需要根據(jù)自己的權(quán)衡。
到此這篇關(guān)于SpringBoot 多線程事務(wù)回滾的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot 多線程事務(wù)回滾內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot項(xiàng)目中遇到的BUG問題及解決方法
這篇文章主要介紹了SpringBoot項(xiàng)目中遇到的BUG問題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
IntelliJ IDEA連接MySQL數(shù)據(jù)庫詳細(xì)圖解
今天小編就為大家分享一篇關(guān)于intellij idea連接mysql數(shù)據(jù)庫詳細(xì)圖解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-10-10
springboot啟動(dòng)報(bào)錯(cuò):application?startup?failed問題
這篇文章主要介紹了springboot啟動(dòng)報(bào)錯(cuò):application?startup?failed問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
MyEclipse去除網(wǎng)上復(fù)制下來的代碼帶有的行號(hào)(正則去除行號(hào))
這篇文章主要介紹了MyEclipse去除網(wǎng)上復(fù)制下來的代碼帶有的行號(hào)(正則去除行號(hào))的相關(guān)資料,需要的朋友可以參考下2017-10-10
IDEA 2020.2 +Gradle 6.6.1 + Spring Boot 2.3.4 創(chuàng)建多模塊項(xiàng)目的超詳細(xì)教程
這篇文章主要介紹了IDEA 2020.2 +Gradle 6.6.1 + Spring Boot 2.3.4 創(chuàng)建多模塊項(xiàng)目的教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09
如何解決java.net.BindException:地址已在使用問題
當(dāng)Zookeeper啟動(dòng)報(bào)錯(cuò)“java.net.BindException:地址已在使用”時(shí),通常是因?yàn)橹付ǖ亩丝谝驯黄渌M(jìn)程占用,解決這個(gè)問題需要按照以下步驟操作:首先,使用命令如lsof -i:2181找到占用該端口的進(jìn)程號(hào);其次,使用kill命令終止該進(jìn)程2024-09-09

