SpringBoot用多線程批量導(dǎo)入數(shù)據(jù)庫實現(xiàn)方法
環(huán)境
springboot、mybatisPlus、mysql8
mysql8(部署在1核2G的服務(wù)器上,很卡,所以下面的數(shù)據(jù)條數(shù)用5000,太大怕不是要等到花兒都謝了 0.0)
原始的for循環(huán)入庫
@Service @Slf4j public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService { @Override @Transactional(rollbackFor = Exception.class) public Object doTest() { long start = System.currentTimeMillis(); List<MoreTestEntity> entityList = new ArrayList<>(); for (int i = 0; i < 5000; i++) { MoreTestEntity entity = new MoreTestEntity(); entity.setId((long) i); entity.setA(UUID.randomUUID().toString()); entity.setB(UUID.randomUUID().toString()); entity.setC(UUID.randomUUID().toString()); entity.setD(UUID.randomUUID().toString()); entity.setE(UUID.randomUUID().toString()); entity.setF(UUID.randomUUID().toString()); entity.setG(UUID.randomUUID().toString()); entity.setH(UUID.randomUUID().toString()); entity.setI(UUID.randomUUID().toString()); entity.setJ(UUID.randomUUID().toString()); entity.setK(UUID.randomUUID().toString()); entityList.add(entity); //在循環(huán)中入庫 baseMapper.insert(entity); } long end = System.currentTimeMillis(); System.err.println(end - start); return end - start; } }
共耗時:180121 ms
批量保存操作
@Service @Slf4j public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService { @Override @Transactional(rollbackFor = Exception.class) public Object doTest() { long start = System.currentTimeMillis(); List<MoreTestEntity> entityList = new ArrayList<>(); for (int i = 0; i < 5000; i++) { MoreTestEntity entity = new MoreTestEntity(); entity.setId((long) i); entity.setA(UUID.randomUUID().toString()); entity.setB(UUID.randomUUID().toString()); entity.setC(UUID.randomUUID().toString()); entity.setD(UUID.randomUUID().toString()); entity.setE(UUID.randomUUID().toString()); entity.setF(UUID.randomUUID().toString()); entity.setG(UUID.randomUUID().toString()); entity.setH(UUID.randomUUID().toString()); entity.setI(UUID.randomUUID().toString()); entity.setJ(UUID.randomUUID().toString()); entity.setK(UUID.randomUUID().toString()); entityList.add(entity); } //mybatisPlus提供的批量保存方法,數(shù)字代表每幾條數(shù)據(jù)提交一次事務(wù),默認1000 saveBatch(entityList, 1000); long end = System.currentTimeMillis(); System.err.println(end - start); return end - start; } }
耗時時間:87217ms
在批量插入的基礎(chǔ)上使用多線程
@Service @Slf4j public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService { @Override @Transactional(rollbackFor = Exception.class) public Object doTest() throws InterruptedException { long start = System.currentTimeMillis(); //手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 5, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), //isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作 new NamedThreadFactory("執(zhí)行線程", false), (r, executor) -> System.out.println("拒絕" + r)); List<MoreTestEntity> entityList = new ArrayList<>(); for (int i = 0; i < 5000; i++) { MoreTestEntity entity = new MoreTestEntity(); entity.setId((long) i); entity.setA(UUID.randomUUID().toString()); entity.setB(UUID.randomUUID().toString()); entity.setC(UUID.randomUUID().toString()); entity.setD(UUID.randomUUID().toString()); entity.setE(UUID.randomUUID().toString()); entity.setF(UUID.randomUUID().toString()); entity.setG(UUID.randomUUID().toString()); entity.setH(UUID.randomUUID().toString()); entity.setI(UUID.randomUUID().toString()); entity.setJ(UUID.randomUUID().toString()); entity.setK(UUID.randomUUID().toString()); entityList.add(entity); } //拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行 List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000); //使用CountDownLatch保證所有線程都執(zhí)行完成 CountDownLatch latch = new CountDownLatch(5); partition.forEach(item -> { poolExecutor.execute(() -> { saveBatch(item, 1000); latch.countDown(); }); }); latch.await(); // 也可以這么寫,設(shè)定超時時間 //latch.await(100,TimeUnit.SECONDS); long end = System.currentTimeMillis(); System.err.println(end - start); //關(guān)閉線程池 poolExecutor.shutdown(); return end - start; } }
耗時時間: 28235
可見時間從180秒,縮短到了28秒,但是@Transactional對于多線程是控制不了所有的事務(wù)的。
Spring實現(xiàn)事務(wù)的原理是通過ThreadLocal把數(shù)據(jù)庫連接綁定到當前線程中,同一個事務(wù)中數(shù)據(jù)庫操作使用同一個jdbc connection,新開啟的線程獲取不到當前jdbc connection。
如下代碼:
partition.forEach(item -> { poolExecutor.execute(() -> { saveBatch(item, 1000); latch.countDown(); //讓每個都報錯 int i = 1/0; }); });
控制臺打印:
Exception in thread "執(zhí)行線程5" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程2" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程4" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程1" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程3" 30179
java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
可見5個線程都報錯了,但是去查詢數(shù)據(jù)庫,卻可以查詢到5000條數(shù)據(jù),這是不應(yīng)該出現(xiàn)的情況。
處理多線程入庫的事務(wù)問題
@Service @Slf4j public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService { @Resource private DataSourceTransactionManager dataSourceTransactionManager; @Resource private TransactionDefinition transactionDefinition; @Override //此處手動管理事務(wù)的提交后,這個注解就可以去掉了 // @Transactional(rollbackFor = Exception.class) public Object doTest() { long start = System.currentTimeMillis(); //手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 5, 5, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), //isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作 new NamedThreadFactory("執(zhí)行線程", false), (r, executor) -> System.out.println("拒絕" + r)); List<MoreTestEntity> entityList = new ArrayList<>(); for (int i = 0; i < 50; i++) { MoreTestEntity entity = new MoreTestEntity(); entity.setId((long) i); entity.setA(UUID.randomUUID().toString()); entity.setB(UUID.randomUUID().toString()); entity.setC(UUID.randomUUID().toString()); entity.setD(UUID.randomUUID().toString()); entity.setE(UUID.randomUUID().toString()); entity.setF(UUID.randomUUID().toString()); entity.setG(UUID.randomUUID().toString()); entity.setH(UUID.randomUUID().toString()); entity.setI(UUID.randomUUID().toString()); entity.setJ(UUID.randomUUID().toString()); entity.setK(UUID.randomUUID().toString()); entityList.add(entity); } //拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行 List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10); //使用CountDownLatch保證所有線程都執(zhí)行完成 CountDownLatch sonLatch = new CountDownLatch(5); //主線程的 肯定為1 CountDownLatch mainLatch = new CountDownLatch(1); AtomicBoolean hasError = new AtomicBoolean(false); partition.forEach(item -> { poolExecutor.execute(() -> { doSave(item, sonLatch, hasError, mainLatch); }); }); try { //此處應(yīng)該是用try catch 包裹著主線程的所有業(yè)務(wù)代碼,以此保證主線程中任何一處報錯都可以通知子線程 //這里加一個是為了調(diào)試主線程中的數(shù)據(jù)入庫操作 MoreTestEntity entity = new MoreTestEntity(); entity.setId((long) 99999); entity.setA(UUID.randomUUID().toString()); entity.setB(UUID.randomUUID().toString()); entity.setC(UUID.randomUUID().toString()); entity.setD(UUID.randomUUID().toString()); entity.setE(UUID.randomUUID().toString()); entity.setF(UUID.randomUUID().toString()); entity.setG(UUID.randomUUID().toString()); entity.setH(UUID.randomUUID().toString()); entity.setI(UUID.randomUUID().toString()); entity.setJ(UUID.randomUUID().toString()); entity.setK(UUID.randomUUID().toString()); save(entity); //主線程報錯 int i = 10 / 0; sonLatch.await(); } catch (InterruptedException e) { hasError.set(true); e.printStackTrace(); } mainLatch.countDown(); long end = System.currentTimeMillis(); System.err.println(end - start); //關(guān)閉線程池 if (!poolExecutor.isShutdown()) { poolExecutor.shutdown(); } return end - start; } /** * 包裝后的子線程的保存代碼 * * @param entityList 要保存的集合 * @param sonLatch 子線程 CountDownLatch * @param hasError 是否發(fā)生錯誤 * @param mainLatch 主線程 CountDownLatch */ private void doSave(List<MoreTestEntity> entityList, CountDownLatch sonLatch, AtomicBoolean hasError, CountDownLatch mainLatch) { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { // //子線程報錯 // int i = 10 / 0; saveBatch(entityList); } catch (Throwable throwable) { throwable.printStackTrace(); hasError.set(true); } finally { //這是必須的,每個子線程走完,要讓主線程繼續(xù)走,然后再回到子線程的每個任務(wù),決定是提交還是回滾 sonLatch.countDown(); } try { //等待主線程的執(zhí)行結(jié)束 mainLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); hasError.set(true); } //事務(wù)操作 if (hasError.get()) { dataSourceTransactionManager.rollback(transactionStatus); } else { dataSourceTransactionManager.commit(transactionStatus); } } }
分別放開子線程報錯和主線程報錯,會發(fā)現(xiàn)事務(wù)都可以正?;貪L,達到了預(yù)期的效果。
主要思路就是通過子線程CountDownLatch和主線程CountDownLatch,控制線程好代碼的執(zhí)行順序即可。
最后補充幾點:
- 上述代碼中的countDown()一旦出現(xiàn)不執(zhí)行的情況那會導(dǎo)致線程堵塞堆積,所以建議給await()增加超時時間
- 這樣操作可能還會出現(xiàn)問題,比如主線程通知子線程可以進行實務(wù)操作了,但是各個子線程之間非透明,所以還是有幾率存在某個子線程事務(wù)回滾失敗的情況。
到此這篇關(guān)于SpringBoot用多線程批量導(dǎo)入數(shù)據(jù)庫實現(xiàn)方法的文章就介紹到這了,更多相關(guān)SpringBoot多線程導(dǎo)入數(shù)據(jù)庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中finally關(guān)鍵字對返回值的影響詳解
這篇文章主要介紹了Java中finally關(guān)鍵字對返回值的影響詳解,執(zhí)行完try catch里面內(nèi)容準備return時,如果還有finally需要執(zhí)行這是編譯器會為我們增加一個全局變量去暫存return 的值,等到finally執(zhí)行完成去return這個全局變量,需要的朋友可以參考下2024-01-01使用JSONObject.toJSONString 過濾掉值為空的key
這篇文章主要介紹了使用JSONObject.toJSONString 過濾掉值為空的key,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03詳述IntelliJ IDEA遠程調(diào)試Tomcat的方法(圖文)
本篇文章主要介紹了詳述IntelliJ IDEA遠程調(diào)試Tomcat的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12Java面試題沖刺第十二天--數(shù)據(jù)庫(2)
這篇文章主要為大家分享了最有價值的三道數(shù)據(jù)庫面試題,涵蓋內(nèi)容全面,包括數(shù)據(jù)結(jié)構(gòu)和算法相關(guān)的題目、經(jīng)典面試編程題等,感興趣的小伙伴們可以參考一下2021-07-07java實現(xiàn)二叉樹的創(chuàng)建及5種遍歷方法(總結(jié))
下面小編就為大家?guī)硪黄猨ava實現(xiàn)二叉樹的創(chuàng)建及5種遍歷方法(總結(jié))。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04