欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot用多線程批量導(dǎo)入數(shù)據(jù)庫實現(xiàn)方法

 更新時間:2023年02月03日 10:24:29   作者:愿做無知一猿  
這篇文章主要介紹了SpringBoot用多線程批量導(dǎo)入數(shù)據(jù)庫實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧

環(huán)境

springboot、mybatisPlus、mysql8

mysql8(部署在1核2G的服務(wù)器上,很卡,所以下面的數(shù)據(jù)條數(shù)用5000,太大怕不是要等到花兒都謝了 0.0)

原始的for循環(huán)入庫

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
            //在循環(huán)中入庫
            baseMapper.insert(entity);
        }
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

共耗時:180121 ms

批量保存操作

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
      	//mybatisPlus提供的批量保存方法,數(shù)字代表每幾條數(shù)據(jù)提交一次事務(wù),默認1000
        saveBatch(entityList, 1000);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

耗時時間:87217ms

在批量插入的基礎(chǔ)上使用多線程

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() throws InterruptedException {
        long start = System.currentTimeMillis();
        //手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作
                new NamedThreadFactory("執(zhí)行線程", false),
                (r, executor) -> System.out.println("拒絕" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
        //使用CountDownLatch保證所有線程都執(zhí)行完成
        CountDownLatch latch = new CountDownLatch(5);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
            });
        });
        latch.await();
        // 也可以這么寫,設(shè)定超時時間
        //latch.await(100,TimeUnit.SECONDS);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //關(guān)閉線程池
        poolExecutor.shutdown();
        return end - start;
    }
}

耗時時間: 28235

可見時間從180秒,縮短到了28秒,但是@Transactional對于多線程是控制不了所有的事務(wù)的。

Spring實現(xiàn)事務(wù)的原理是通過ThreadLocal把數(shù)據(jù)庫連接綁定到當前線程中,同一個事務(wù)中數(shù)據(jù)庫操作使用同一個jdbc connection,新開啟的線程獲取不到當前jdbc connection。

如下代碼:

partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
                //讓每個都報錯
                int i = 1/0;
            });
        });

控制臺打印:

Exception in thread "執(zhí)行線程5" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程2" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程4" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程1" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程3" 30179
java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

可見5個線程都報錯了,但是去查詢數(shù)據(jù)庫,卻可以查詢到5000條數(shù)據(jù),這是不應(yīng)該出現(xiàn)的情況。

處理多線程入庫的事務(wù)問題

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Resource
    private TransactionDefinition transactionDefinition;
    @Override
    //此處手動管理事務(wù)的提交后,這個注解就可以去掉了
    //    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        //手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作
                new NamedThreadFactory("執(zhí)行線程", false),
                (r, executor) -> System.out.println("拒絕" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
        //使用CountDownLatch保證所有線程都執(zhí)行完成
        CountDownLatch sonLatch = new CountDownLatch(5);
        //主線程的 肯定為1
        CountDownLatch mainLatch = new CountDownLatch(1);
        AtomicBoolean hasError = new AtomicBoolean(false);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                doSave(item, sonLatch, hasError, mainLatch);
            });
        });
        try {
            //此處應(yīng)該是用try catch 包裹著主線程的所有業(yè)務(wù)代碼,以此保證主線程中任何一處報錯都可以通知子線程
            //這里加一個是為了調(diào)試主線程中的數(shù)據(jù)入庫操作
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) 99999);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            save(entity);
            //主線程報錯
            int i = 10 / 0;
            sonLatch.await();
        } catch (InterruptedException e) {
            hasError.set(true);
            e.printStackTrace();
        }
        mainLatch.countDown();
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //關(guān)閉線程池
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
        return end - start;
    }
    /**
     * 包裝后的子線程的保存代碼
     *
     * @param entityList 要保存的集合
     * @param sonLatch   子線程 CountDownLatch
     * @param hasError   是否發(fā)生錯誤
     * @param mainLatch  主線程 CountDownLatch
     */
    private void doSave(List<MoreTestEntity> entityList,
                        CountDownLatch sonLatch,
                        AtomicBoolean hasError,
                        CountDownLatch mainLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            //            //子線程報錯
            //            int i = 10 / 0;
            saveBatch(entityList);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            hasError.set(true);
        } finally {
            //這是必須的,每個子線程走完,要讓主線程繼續(xù)走,然后再回到子線程的每個任務(wù),決定是提交還是回滾
            sonLatch.countDown();
        }
        try {
            //等待主線程的執(zhí)行結(jié)束
            mainLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            hasError.set(true);
        }
        //事務(wù)操作
        if (hasError.get()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
    }
}

分別放開子線程報錯和主線程報錯,會發(fā)現(xiàn)事務(wù)都可以正?;貪L,達到了預(yù)期的效果。

主要思路就是通過子線程CountDownLatch和主線程CountDownLatch,控制線程好代碼的執(zhí)行順序即可。

最后補充幾點:

  • 上述代碼中的countDown()一旦出現(xiàn)不執(zhí)行的情況那會導(dǎo)致線程堵塞堆積,所以建議給await()增加超時時間
  • 這樣操作可能還會出現(xiàn)問題,比如主線程通知子線程可以進行實務(wù)操作了,但是各個子線程之間非透明,所以還是有幾率存在某個子線程事務(wù)回滾失敗的情況。

到此這篇關(guān)于SpringBoot用多線程批量導(dǎo)入數(shù)據(jù)庫實現(xiàn)方法的文章就介紹到這了,更多相關(guān)SpringBoot多線程導(dǎo)入數(shù)據(jù)庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java實現(xiàn)猜拳游戲

    java實現(xiàn)猜拳游戲

    這篇文章主要為大家詳細介紹了java實現(xiàn)猜拳游戲,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-08-08
  • Java中finally關(guān)鍵字對返回值的影響詳解

    Java中finally關(guān)鍵字對返回值的影響詳解

    這篇文章主要介紹了Java中finally關(guān)鍵字對返回值的影響詳解,執(zhí)行完try catch里面內(nèi)容準備return時,如果還有finally需要執(zhí)行這是編譯器會為我們增加一個全局變量去暫存return 的值,等到finally執(zhí)行完成去return這個全局變量,需要的朋友可以參考下
    2024-01-01
  • SpringBoot數(shù)據(jù)層處理方案精講

    SpringBoot數(shù)據(jù)層處理方案精講

    這篇文章主要介紹了SpringBoot數(shù)據(jù)層技術(shù)的解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧
    2022-10-10
  • 詳解Java中wait和sleep的區(qū)別

    詳解Java中wait和sleep的區(qū)別

    這篇文章主要介紹了Java中wait和sleep的區(qū)別,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • 使用JSONObject.toJSONString 過濾掉值為空的key

    使用JSONObject.toJSONString 過濾掉值為空的key

    這篇文章主要介紹了使用JSONObject.toJSONString 過濾掉值為空的key,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java實現(xiàn)發(fā)送短信驗證碼功能

    Java實現(xiàn)發(fā)送短信驗證碼功能

    這篇文章主要為大家詳細介紹了Java實現(xiàn)發(fā)送短信驗證碼功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • java hashtable實現(xiàn)代碼

    java hashtable實現(xiàn)代碼

    這篇文章介紹了java hashtable實現(xiàn)代碼,有需要的朋友可以參考一下
    2013-10-10
  • 詳述IntelliJ IDEA遠程調(diào)試Tomcat的方法(圖文)

    詳述IntelliJ IDEA遠程調(diào)試Tomcat的方法(圖文)

    本篇文章主要介紹了詳述IntelliJ IDEA遠程調(diào)試Tomcat的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-12-12
  • Java面試題沖刺第十二天--數(shù)據(jù)庫(2)

    Java面試題沖刺第十二天--數(shù)據(jù)庫(2)

    這篇文章主要為大家分享了最有價值的三道數(shù)據(jù)庫面試題,涵蓋內(nèi)容全面,包括數(shù)據(jù)結(jié)構(gòu)和算法相關(guān)的題目、經(jīng)典面試編程題等,感興趣的小伙伴們可以參考一下
    2021-07-07
  • java實現(xiàn)二叉樹的創(chuàng)建及5種遍歷方法(總結(jié))

    java實現(xiàn)二叉樹的創(chuàng)建及5種遍歷方法(總結(jié))

    下面小編就為大家?guī)硪黄猨ava實現(xiàn)二叉樹的創(chuàng)建及5種遍歷方法(總結(jié))。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-04-04

最新評論