并行Stream與Spring事務(wù)相遇會發(fā)生什么?
前言:
事情是這樣的:運營人員反饋,通過Excel導(dǎo)入數(shù)據(jù)時,有一部分成功了,有一部分未導(dǎo)入。初步猜測,是事務(wù)未生效導(dǎo)致的。查看代碼,發(fā)現(xiàn)導(dǎo)入部分已經(jīng)通過@Transcational注解進(jìn)行事務(wù)控制了,為什么還會出現(xiàn)事務(wù)不生效的問題呢?下面我們就進(jìn)行具體的案例分析,Let's go!
事務(wù)不生效的代碼
這里寫一段簡單的偽代碼來演示展示一下事務(wù)不生效的代碼:
@Transactional(rollbackFor = Exception.class) public void batchInsert(List<Order> list) { list.parallelStream().forEach(order -> orderMapper.save(order)); }
邏輯很簡單,遍歷list,然后批量插入Order數(shù)據(jù)到數(shù)據(jù)庫。在該方法上使用@Transactional來聲明出現(xiàn)異常時進(jìn)行回滾。
但事實情況是,其中某一條數(shù)據(jù)執(zhí)行異常時,事務(wù)并沒有進(jìn)行回滾。這到底是為什么呢?
下面一探究竟。
JDK 8 的Stream
上面代碼中涉及到了兩個知識點:parallelStream和@Transactional,我們先來鋪墊一下parallelStream相關(guān)知識。
在JDK8 中引入了Stream API的概念和實現(xiàn),這里的Stream有別于 InputStream 和OutputStream,Stream API 是處理對象流而不是字節(jié)流。
比如,我們可以通過如下方式來基于Stream進(jìn)行實現(xiàn):
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.stream().forEach(num->System.out.println(num));
輸出:1 2 3 4 5 6 7 8 9
代碼看起來方便清爽多了。
關(guān)于Stream的基本處理流程如下:
在這些Stream API中,還提供了一個并行處理的API,也就是parallelStream。它可以將任務(wù)拆分子任務(wù),分發(fā)給多個處理器同時處理,之后合并。這樣做的目的很明顯是為了提升處理效率。
parallelStream的基本使用方式如下:
// 并行執(zhí)行流 list.stream().parallel().filter(e -> e > 10).count()
針對上述代碼,對應(yīng)的流程如下:
而parallelStream會將流劃分成多個子流,分散到不同的CPU并行處理,然后合并處理結(jié)果。其中,parallelStream默認(rèn)是基于ForkJoinPool.commonPool()線程池來實現(xiàn)并行處理的。
通常情況下,我們可以認(rèn)為并行會比串行快,但還是有前提條件的:
- 處理器核心數(shù)量:并行處理核心數(shù)越多,處理效率越高;
- 處理數(shù)據(jù)量:處理數(shù)據(jù)量越大優(yōu)勢越明顯;
但并行處理也面臨著一系列的問題,比如:資源競爭、死鎖、線程切換、事務(wù)、可見性、線程安全等問題。
@Transactional事務(wù)處理
上面了解了parallelStream的基本原理及特性之后,再來看看@Transactional的事務(wù)處理特性。
@Transactional是Spring提供的基于注解的一種聲明式事務(wù)方式,該注解只能運用到public的方法上。
基本原理:當(dāng)一個方法被@Transactional注解之后,Spring會基于AOP在方法執(zhí)行之前開啟一個事務(wù)。當(dāng)方法執(zhí)行完畢之后,根據(jù)方法是否報錯,來決定回滾或提交事務(wù)。
在默認(rèn)代理模式下,只有目標(biāo)方法由外部方法調(diào)用時,才能被Spring的事務(wù)攔截器攔截。所以,在同一個類中的兩個方法直接調(diào)用,不會被Spring的事務(wù)攔截器攔截。這是事務(wù)不生效的一個場景,但在上述案例中,并不存在這種情況。
Spring在處理事務(wù)時,會從連接池中獲得一個jdbc connection,將連接綁定到線程上(基于ThreadLocal),那么同一個線程中用到的就是同一個connection了。具體實現(xiàn)在DataSourceTransactionManager#doBegin方法中。
Bug綜合分析
在了解了parallelStream和@Transactional的相關(guān)知識之后,我們會發(fā)現(xiàn):parallelStream處理時開啟了多線程,而@Transactional在處理事務(wù)時會(基于ThreadLocal)將連接綁定到當(dāng)前線程,由于@Transactional綁定管理的是主線程的事務(wù),而parallelStream開啟的新的線程與主線程無關(guān)。因此,事務(wù)也就無效了。
此時,將parallelStream改為普通的stream,事務(wù)可正?;貪L。這就提示我們,在使用基于@Transactional方式管理事務(wù)時,慎重使用多線程處理。
問題拓展
雖然parallelStream帶來了更高的性能,但也要區(qū)分場景進(jìn)行使用。即便是在不需要事務(wù)管理的情況下,如果parallelStream使用不當(dāng),也會造成同一時間對數(shù)據(jù)庫發(fā)起大量請求等問題。
因此,在stream與parallelStream之間進(jìn)行選擇時,還要考慮幾個問題:
- 是否需要并行?數(shù)據(jù)量比較大,處理器核心數(shù)比較多的情況下才會有性能提升。
- 任務(wù)之間是否是獨立的,是否會引起任何競態(tài)條件?比如:是否共享變量。
- 執(zhí)行結(jié)果是否取決于任務(wù)的調(diào)用順序?并行執(zhí)行的順序是不確定的。
小結(jié)
文章講述的Bug雖然簡單,但如果不了解parallelStream與@Transactional注解的特性,還是很難排查的。而且也讓我們意識到,雖然Spring通過@Transactional將事務(wù)管理進(jìn)行了簡化處理,但作為開發(fā)者,還是需要深入了解一下它的基本運作原理。不然,在排查bug時,很容易踩坑。
到此這篇關(guān)于并行Stream與Spring事務(wù)相遇會發(fā)生什么?的文章就介紹到這了,更多相關(guān) Stream與Spring 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis使用JSONObject接收數(shù)據(jù)庫查詢的方法
這篇文章主要介紹了Mybatis使用JSONObject接收數(shù)據(jù)庫查詢,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12SpringBoot使用@valid進(jìn)行參數(shù)校驗的流程步驟
SpringBoot 提供了一種方便的方式來進(jìn)行參數(shù)校驗:使用 Hibernate Validator,Spring Boot 提供了一種方便的方式來進(jìn)行參數(shù)校驗:使用 Hibernate Validator,所以本文給大家介紹了SpringBoot使用@valid進(jìn)行參數(shù)校驗的流程步驟,需要的朋友可以參考下2023-09-09Java語言實現(xiàn)簡單FTP軟件 FTP軟件效果圖預(yù)覽之上傳功能(3)
這篇文章主要為大家詳細(xì)介紹了Java語言實現(xiàn)簡單FTP軟件,F(xiàn)TP軟件效果圖預(yù)覽之上傳功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03ConcurrentHashMap線程安全及實現(xiàn)原理實例解析
這篇文章主要介紹了ConcurrentHashMap線程安全及實現(xiàn)原理實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11自己動手用Springboot實現(xiàn)仿百度網(wǎng)盤的實踐
本項目基于Springboot開發(fā)實現(xiàn),前端采用BootStrap開發(fā)實現(xiàn),模仿百度網(wǎng)盤實現(xiàn)相關(guān)功能,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12