springboot一個(gè)自定義注解如何搞定多線程事務(wù)
springboot多線程(聲明式)的使用方法?
1、springboot提供了注解@Async來(lái)使用線程池,具體使用方法如下:
(1) 在啟動(dòng)類(lèi)(配置類(lèi))添加@EnableAsync來(lái)開(kāi)啟線程池
(2) 在需要開(kāi)啟子線程的方法上添加注解@Async
所以使用時(shí)需要配置自定義線程池,如下:
@Configuration @EnableAsync public class ThreadPoolTaskConfig { @Bean("threadPoolTaskExecutor")//自定義線程池名稱(chēng) public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //線程池創(chuàng)建的核心線程數(shù),線程池維護(hù)線程的最少數(shù)量,即使沒(méi)有任務(wù)需要執(zhí)行,也會(huì)一直存活 executor.setCorePoolSize(16); //如果設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉 //executor.setAllowCoreThreadTimeOut(true); //阻塞隊(duì)列 當(dāng)核心線程數(shù)達(dá)到最大時(shí),新任務(wù)會(huì)放在隊(duì)列中排隊(duì)等待執(zhí)行 executor.setQueueCapacity(124); //最大線程池?cái)?shù)量,當(dāng)線程數(shù)>=corePoolSize,且任務(wù)隊(duì)列已滿時(shí)。線程池會(huì)創(chuàng)建新線程來(lái)處理任務(wù) //任務(wù)隊(duì)列已滿時(shí), 且當(dāng)線程數(shù)=maxPoolSize,,線程池會(huì)拒絕處理任務(wù)而拋出異常 executor.setMaxPoolSize(64); //當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize //允許線程空閑時(shí)間30秒,當(dāng)maxPoolSize的線程在空閑時(shí)間到達(dá)的時(shí)候銷(xiāo)毀 //如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0 executor.setKeepAliveSeconds(30); //spring 提供的 ThreadPoolTaskExecutor 線程池,是有setThreadNamePrefix() 方法的。 //jdk 提供的ThreadPoolExecutor 線程池是沒(méi)有 setThreadNamePrefix() 方法的 executor.setThreadNamePrefix("自定義線程池-"); // rejection-policy:拒絕策略:當(dāng)線程數(shù)已經(jīng)達(dá)到maxSize的時(shí)候,如何處理新任務(wù) // CallerRunsPolicy():交由調(diào)用方線程運(yùn)行,比如 main 線程;如果添加到線程池失敗,那么主線程會(huì)自己去執(zhí)行該任務(wù),不會(huì)等待線程池中的線程去執(zhí)行, (個(gè)人推薦) // AbortPolicy():該策略是線程池的默認(rèn)策略,如果線程池隊(duì)列滿了丟掉這個(gè)任務(wù)并且拋出RejectedExecutionException異常。 // DiscardPolicy():如果線程池隊(duì)列滿了,會(huì)直接丟掉這個(gè)任務(wù)并且不會(huì)有任何異常 // DiscardOldestPolicy():丟棄隊(duì)列中最老的任務(wù),隊(duì)列滿了,會(huì)將最早進(jìn)入隊(duì)列的任務(wù)刪掉騰出空間,再嘗試加入隊(duì)列 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //設(shè)置線程池關(guān)閉的時(shí)候等待所有任務(wù)都完成再繼續(xù)銷(xiāo)毀其他的Bean,這樣這些異步任務(wù)的銷(xiāo)毀就會(huì)先于Redis線程池的銷(xiāo)毀 executor.setWaitForTasksToCompleteOnShutdown(true); //設(shè)置線程池中任務(wù)的等待時(shí)間,如果超過(guò)這個(gè)時(shí)候還沒(méi)有銷(xiāo)毀就強(qiáng)制銷(xiāo)毀,以確保應(yīng)用最后能夠被關(guān)閉,而不是阻塞住。 executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor; } }
開(kāi)啟子線程方法: 在需要開(kāi)啟線程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的參數(shù)為自定義線程池的名稱(chēng)。
二、自定義注解實(shí)現(xiàn)多線程事務(wù)控制
1.自定義注解
本文是使用了兩個(gè)注解共同作用實(shí)現(xiàn)的,主線程當(dāng)做協(xié)調(diào)者,各子線程作為參與者
package com.example.anno; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 多線程事務(wù)注解: 主事務(wù) * * @author zlj * @since 2022/11/3 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface MainTransaction { int value();//子線程數(shù)量 } package com.example.anno; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 多線程事務(wù)注解: 子事務(wù) * * @author zlj * @since 2022/11/3 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface SonTransaction { String value() default ""; }
解釋?zhuān)?/strong>
兩個(gè)注解都是用在方法上的,須配合@Transactional(rollbackFor = Exception.class)一起使用
- @MainTransaction注解 用在調(diào)用方,其參數(shù)為必填,參數(shù)值為本方法中調(diào)用的方法開(kāi)啟的線程數(shù),如:在這個(gè)方法中調(diào)用的方法中有2個(gè)方法用@Async注解開(kāi)啟了子線程,則參數(shù)為@MainTransaction(2),另外如果未使用@MainTransaction注解,則直接已無(wú)多線程事務(wù)執(zhí)行(不影響方法的單線程事務(wù))
- @SonTransaction注解 用在被調(diào)用方(開(kāi)啟線程的方法),無(wú)需傳入?yún)?shù)
2.AOP內(nèi)容
代碼如下:
package com.example.aop; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.example.anno.MainTransaction; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; /** * 多線程事務(wù) * * @author zlj * @since 2022/11/3 */ @Aspect @Component public class TransactionAop { //用來(lái)存儲(chǔ)各線程計(jì)數(shù)器數(shù)據(jù)(每次執(zhí)行后會(huì)從map中刪除) private static final Map<String, Object> map = new HashMap<>(); @Resource private PlatformTransactionManager transactionManager; @Around("@annotation(mainTransaction)") public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable { //當(dāng)前線程名稱(chēng) Thread thread = Thread.currentThread(); String threadName = thread.getName(); //初始化計(jì)數(shù)器 CountDownLatch mainDownLatch = new CountDownLatch(1); CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的參數(shù), 為子線程的數(shù)量 // 用來(lái)記錄子線程的運(yùn)行狀態(tài),只要有一個(gè)失敗就變?yōu)閠rue AtomicBoolean rollBackFlag = new AtomicBoolean(false); // 用來(lái)存每個(gè)子線程的異常,把每個(gè)線程的自定義異常向vector的首位置插入,其余異常向末位置插入,避免線程不安全,所以使用vector代替list Vector<Throwable> exceptionVector = new Vector<>(); map.put(threadName + "mainDownLatch", mainDownLatch); map.put(threadName + "sonDownLatch", sonDownLatch); map.put(threadName + "rollBackFlag", rollBackFlag); map.put(threadName + "exceptionVector", exceptionVector); try { joinPoint.proceed();//執(zhí)行方法 } catch (Throwable e) { exceptionVector.add(0, e); rollBackFlag.set(true);//子線程回滾 mainDownLatch.countDown();//放行所有子線程 } if (!rollBackFlag.get()) { try { // sonDownLatch等待,直到所有子線程執(zhí)行完插入操作,但此時(shí)還沒(méi)有提交事務(wù) sonDownLatch.await(); mainDownLatch.countDown();// 根據(jù)rollBackFlag狀態(tài)放行子線程的await處,告知是回滾還是提交 } catch (Exception e) { rollBackFlag.set(true); exceptionVector.add(0, e); } } if (CollectionUtils.isNotEmpty(exceptionVector)) { map.remove(threadName + "mainDownLatch"); map.remove(threadName + "sonDownLatch"); map.remove(threadName + "rollBackFlag"); map.remove(threadName + "exceptionVector"); throw exceptionVector.get(0); } } @Around("@annotation(com.huigu.common.anno.SonTransaction)") public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable { Object[] args = joinPoint.getArgs(); Thread thread = (Thread) args[args.length - 1]; String threadName = thread.getName(); CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch"); if (mainDownLatch == null) { //主事務(wù)未加注解時(shí), 直接執(zhí)行子事務(wù) joinPoint.proceed();//這里最好的方式是:交由上面的thread來(lái)調(diào)用此方法,但我沒(méi)有找尋到對(duì)應(yīng)api,只能直接放棄事務(wù), 歡迎大神來(lái)優(yōu)化, 留言分享 return; } CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch"); AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag"); Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector"); //如果這時(shí)有一個(gè)子線程已經(jīng)出錯(cuò),那當(dāng)前線程不需要執(zhí)行 if (rollBackFlag.get()) { sonDownLatch.countDown(); return; } DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 開(kāi)啟事務(wù) def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 設(shè)置事務(wù)隔離級(jí)別 TransactionStatus status = transactionManager.getTransaction(def); try { joinPoint.proceed();//執(zhí)行方法 sonDownLatch.countDown();// 對(duì)sonDownLatch-1 mainDownLatch.await();// 如果mainDownLatch不是0,線程會(huì)在此阻塞,直到mainDownLatch變?yōu)? // 如果能執(zhí)行到這一步說(shuō)明所有子線程都已經(jīng)執(zhí)行完畢判斷如果atomicBoolean是true就回滾false就提交 if (rollBackFlag.get()) { transactionManager.rollback(status); } else { transactionManager.commit(status); } } catch (Throwable e) { exceptionVector.add(0, e); // 回滾 transactionManager.rollback(status); // 并把狀態(tài)設(shè)置為true rollBackFlag.set(true); mainDownLatch.countDown(); sonDownLatch.countDown(); } } }
擴(kuò)展說(shuō)明: CountDownLatch是什么?
一個(gè)同步輔助類(lèi)
- 創(chuàng)建對(duì)象時(shí): 用給定的數(shù)字初始化 CountDownLatch
- countDown() 方法: 使計(jì)數(shù)減1
- await() 方法: 阻塞當(dāng)前線程, 直至當(dāng)前計(jì)數(shù)到達(dá)零。
本文中:
用 計(jì)數(shù) 1 初始化的 mainDownLatch 當(dāng)作一個(gè)簡(jiǎn)單的開(kāi)/關(guān)鎖存器,或入口:在通過(guò)調(diào)用 countDown() 的線程打開(kāi)入口前,所有調(diào)用 await 的線程都一直在入口處等待。
用 子線程數(shù)量 初始化的 sonDownLatch 可以使一個(gè)線程在 N 個(gè)線程完成某項(xiàng)操作之前一直等待,或者使其在某項(xiàng)操作完成 N 次之前一直等待。
3.注解使用Demo
任務(wù)方法:
package com.example.demo.service; import com.example.demo.anno.SonTransaction; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author zlj * @since 2022/11/14 */ @Service public class SonService { /** * 參數(shù)說(shuō)明: 以下4個(gè)方法參數(shù)和此相同 * * @param args 業(yè)務(wù)中需要傳遞的參數(shù) * @param thread 調(diào)用者的線程, 用于aop獲取參數(shù), 不建議以方法重寫(xiě)的方式簡(jiǎn)略此參數(shù), * 在調(diào)用者方法中可以以此參數(shù)為標(biāo)識(shí)計(jì)算子線程的個(gè)數(shù)作為注解參數(shù),避免線程參數(shù)計(jì)算錯(cuò)誤導(dǎo)致鎖表 * 傳參時(shí)參數(shù)固定為: Thread.currentThread() */ @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod1(String args, Thread thread) { System.out.println(args + "開(kāi)啟了線程"); } @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod2(String args1, String args2, Thread thread) { System.out.println(args1 + "和" + args2 + "開(kāi)啟了線程"); } @Transactional(rollbackFor = Exception.class) @Async("threadPoolTaskExecutor") @SonTransaction public void sonMethod3(String args, Thread thread) { System.out.println(args + "開(kāi)啟了線程"); } //sonMethod4方法沒(méi)有使用線程池 @Transactional(rollbackFor = Exception.class) public void sonMethod4(String args) { System.out.println(args + "沒(méi)有開(kāi)啟線程"); } }
調(diào)用方:
package com.example.demo.service; import com.example.demo.anno.MainTransaction; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zlj * @since 2022/11/14 */ @Service public class MainService { @Resource private SonService sonService; @MainTransaction(3)//調(diào)用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async開(kāi)啟了線程, 所以參數(shù)為: 3 @Transactional(rollbackFor = Exception.class) public void test1() { sonService.sonMethod1("路飛", Thread.currentThread()); sonService.sonMethod2("索隆", "山治", Thread.currentThread()); sonService.sonMethod3("娜美", Thread.currentThread()); sonService.sonMethod4("羅賓"); } /* * 有的業(yè)務(wù)中存在if的多種可能, 每一種走向調(diào)用的方法(開(kāi)啟線程的方法)數(shù)量如果不同, 這時(shí)可以選擇放棄使用@MainTransaction注解避免鎖表 * 這時(shí)候如果發(fā)生異常會(huì)導(dǎo)致多線程不能同時(shí)回滾, 可根據(jù)業(yè)務(wù)自己權(quán)衡是否使用 */ @Transactional(rollbackFor = Exception.class) public void test2() { sonService.sonMethod1("路飛", Thread.currentThread()); sonService.sonMethod2("索隆", "山治", Thread.currentThread()); sonService.sonMethod3("娜美", Thread.currentThread()); sonService.sonMethod4("羅賓"); } }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
C語(yǔ)言實(shí)現(xiàn)矩陣運(yùn)算案例詳解
這篇文章主要介紹了C語(yǔ)言實(shí)現(xiàn)矩陣運(yùn)算案例詳解,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08java中的數(shù)學(xué)計(jì)算函數(shù)的總結(jié)
這篇文章主要介紹了java中的數(shù)學(xué)計(jì)算函數(shù)的總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-07-07Java實(shí)現(xiàn)API sign簽名校驗(yàn)的方法詳解
為了防止中間人攻擊,有時(shí)我們需要進(jìn)行API sign 簽名校驗(yàn)。本文將用Java語(yǔ)言實(shí)現(xiàn)API sign 簽名校驗(yàn),感興趣的小伙伴可以嘗試一下2022-07-07Spring實(shí)戰(zhàn)之使用注解實(shí)現(xiàn)聲明式事務(wù)操作示例
這篇文章主要介紹了Spring實(shí)戰(zhàn)之使用注解實(shí)現(xiàn)聲明式事務(wù)操作,結(jié)合實(shí)例形式詳細(xì)分析了spring使用注解實(shí)現(xiàn)聲明式事務(wù)相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01Java?springBoot初步使用websocket的代碼示例
這篇文章主要介紹了Java?springBoot初步使用websocket的相關(guān)資料,WebSocket是一種實(shí)現(xiàn)實(shí)時(shí)雙向通信的協(xié)議,適用于需要實(shí)時(shí)通信的應(yīng)用程序,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-03-03Java8新特性之接口中的默認(rèn)方法和靜態(tài)方法
這篇文章主要介紹了Java8新特性之接口中的默認(rèn)方法和靜態(tài)方法的相關(guān)資料,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07關(guān)于IDEA2020.1新建項(xiàng)目maven PKIX 報(bào)錯(cuò)問(wèn)題解決方法
這篇文章主要介紹了關(guān)于IDEA2020.1新建項(xiàng)目maven PKIX 報(bào)錯(cuò)問(wèn)題解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06