mybatisPlus批量插入優(yōu)化加快性能
1.背景
? 由于之前一個同事問我mybatisPlus的IService層的saveBatch(Collection entityList)方法批量插入速度實在是太慢了,能否優(yōu)化下?我過去跟他看了下具體的需求是這樣的,一張業(yè)務表中有30多萬的業(yè)務數(shù)據(jù),表里的一個字段是一個json的數(shù)據(jù)字段,要把30多萬的數(shù)據(jù)查出來針對這個json字段解析之后存入另外一張表中,后面他使用了mybatisPlus的IService層的saveBatch發(fā)現(xiàn)好慢好慢,,去重是把分頁查詢出來插入數(shù)據(jù)的最后一條數(shù)據(jù)的id存入redis中,每次分頁查詢數(shù)據(jù)排序之后id比上一次處理的最后一條數(shù)據(jù)的id大的數(shù)據(jù),如果比該條數(shù)據(jù)的id小則忽略,后面我給他看了下使用saveBatch插入一批數(shù)據(jù)1000條大概要好幾十分鐘的,之前他使用的for循環(huán)里面insert每一條處理好的數(shù)據(jù),這種是真的非常的慢,慢到你懷疑人生,這個insert方法我在下面的demo中準備數(shù)據(jù),往一個表里插入30萬條數(shù)據(jù)的時候使用了下實在是太慢了,后面我使用了一個mybatisPlus的sql注入器的一個批量的方法:
Integer insertBatchSomeColumn(Collection<UserEntity> entityList);
? 使用了該方法之后,在一個測試方法中使用主線程插入模擬的30萬數(shù)據(jù)到一個user的表中只用了:23.209105秒,然后使用多線程異步插入用了:12.1030808秒,這個時間跟機器的性能和網(wǎng)路有關,所以每一次執(zhí)行這個時間會有所不同的,由于我們使用的數(shù)據(jù)庫是云數(shù)據(jù)庫,所以插入需要走網(wǎng)絡,沒有使用本地的安裝的mysql數(shù)據(jù)庫,使用這個sql批量插入的注入器給那個同事優(yōu)化了一波之后,他原來使用insertr處理要10個小時后面使用saveBatch也要在2個小時以上,后面優(yōu)化之后大概估計只要20分鐘不到就完了,但是我后面使用這個demo用30萬數(shù)據(jù),解析一個json字段入庫到一張表中是非??斓?,就花了10多秒~30多秒的時間就干完了,所以說他那個需求還有優(yōu)化的空間,后面數(shù)據(jù)全部跑完之后就不用了,不是我搞所以就能用就行,比原來的效率也是提升了一大截。
2.方案
? 查詢可以使用mybatis的原生sql查詢,需要寫一個批量插入的方法insert標簽中使用foreach遍歷一個list然后插入逐條插入數(shù)據(jù)即可,但是這種需要寫大量的代碼,還要處理每個插入的字段為空的判斷處理,不然遇到插入字段為null就會報錯,讓人很頭疼,所以還是使用mybatisPlus的sql批量注入器基本不需要寫啥代碼即可實現(xiàn)批量插入操作,而且性能和效率還是杠桿的。
? mybatisPlus的IService層的saveBatch為啥慢?看了下它的源碼發(fā)現(xiàn)底層封裝還是比較深,本質上底層還是一條一條的去插入數(shù)據(jù),所以才會慢。
? mybatisPlus的sql注入器有一下幾個可以給我們擴展使用的類:
AlwaysUpdateSomeColumnById
根據(jù)Id更新每一個字段,全量更新不忽略null字段,解決mybatis-plus中updateById默認會自動忽略實體中null值字段不去更新的問題。InsertBatchSomeColumn
真實批量插入,通過單SQL的insert語句實現(xiàn)批量插入DeleteByIdWithFill
帶自動填充的邏輯刪除,比如自動填充更新時間、操作人Upsert
更新or插入,根據(jù)唯一約束判斷是執(zhí)行更新還是刪除,相當于提供insert on duplicate key update支持
? 項目中批量sql注入器的使用如下:
? 新增一個 MySqlInjector類:
package com.zlf.sb.demo.sqlinjector; import com.baomidou.mybatisplus.core.injector.AbstractMethod; import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector; import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn; import java.util.List; public class MySqlInjector extends DefaultSqlInjector { @Override public List<AbstractMethod> getMethodList(Class<?> mapperClass) { List<AbstractMethod> methodList = super.getMethodList(mapperClass); methodList.add(new InsertBatchSomeColumn());//添加批量插入方法 return methodList; } }
? 將MySqlInjector交給Spring容器管理:新增一個MybatisPlusConfig類如下:
package com.zlf.sb.demo.config; import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor; import com.baomidou.mybatisplus.extension.plugins.pagination.optimize.JsqlParserCountOptimize; import com.zlf.sb.demo.sqlinjector.MySqlInjector; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration public class MybatisPlusConfig { @Bean public PaginationInterceptor paginationInterceptor() { PaginationInterceptor paginationInterceptor = new PaginationInterceptor(); // 設置請求的頁面大于最大頁后操作, true調回到首頁,false 繼續(xù)請求 默認false // paginationInterceptor.setOverflow(false); // 設置最大單頁限制數(shù)量,默認 500 條,-1 不受限制 paginationInterceptor.setLimit(-1); // 這里也是一個大坑,需要注意的 // 開啟 count 的 join 優(yōu)化,只針對部分 left join paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true)); return paginationInterceptor; } @Bean @Primary//批量插入配置 public MySqlInjector mySqlInjector() { return new MySqlInjector(); } }
? UserMapper接口中加入insertBatchSomeColumn方法:
package com.zlf.sb.demo.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.zlf.sb.demo.entity.UserEntity; import org.apache.ibatis.annotations.Options; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.cursor.Cursor; import org.apache.ibatis.mapping.ResultSetType; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Repository; import java.util.Collection; @Scope("prototype") @Repository public interface UserMapper extends BaseMapper<UserEntity> { /** * 批量插入 僅適用于mysql * * @param entityList 實體列表 * @return 影響行數(shù) */ Integer insertBatchSomeColumn(Collection<UserEntity> entityList); /** * 統(tǒng)計總數(shù) * * @return */ @Select("SELECT count(*) FROM user") Integer countTotal(); /** * 分頁游標查詢 * * @return */ @Options(resultSetType = ResultSetType.FORWARD_ONLY) @Select("SELECT * FROM user ORDER BY create_time DESC LIMIT #{start}, #{pageSize} ") Cursor<UserEntity> getCursorPageData(@Param("start") Integer start, @Param("pageSize") Integer pageSize); }
? 準備了兩張表:user,user_json_data,user表中有個json_data的字段是一個存json格式數(shù)據(jù)的字段,然后準備30萬的數(shù)據(jù),向user表中插入30萬條構造的數(shù)據(jù),SpringBootDemoApplicationTests中代碼如下:
//@Test public void addUserDataTest() { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); int totalData = 300000; int splitSize = 10000; int splitPages = (totalData + splitSize - 1) / splitSize; /*for (int i = 1; i <= splitPages; i++) { //起始條數(shù) int firstIndex = (i - 1) * splitSize + 1; //截止條數(shù) int lastIndex = i * splitSize; List<UserEntity> userEntityList = new ArrayList<>(); for (int j = firstIndex; j <= lastIndex; j++) { UserEntity user = new UserEntity(); user.setName("zlf" + j); user.setAge(j); JSONObject jo = new JSONObject(); try { jo.put("name", "zlf" + j); jo.put("age", j); } catch (JSONException e) { throw new RuntimeException(e); } //log.info("json:{}",j.toString()); user.setJsonData(jo.toString()); //userMapper.insert(user); userEntityList.add(user); } //userService.saveBatch(userEntityList); userMapper.insertBatchSomeColumn(userEntityList); } // 結束時間 stopWatch.stop(); log.info("執(zhí)行時長:{}秒", stopWatch.getTotalTimeSeconds());*/ //執(zhí)行時長:23.209105秒 // 異步 ThreadPoolExecutor executor = ThreadPoolService.getInstance(); AtomicInteger count = new AtomicInteger(0); CountDownLatch countDownLatch = new CountDownLatch(splitPages); for (int i = 1; i <= splitPages; i++) { int finalI = i; CompletableFuture.runAsync(() -> { //起始條數(shù) int firstIndex = (finalI - 1) * splitSize + 1; //截止條數(shù) int lastIndex = finalI * splitSize; List<UserEntity> userEntityList = new ArrayList<>(); for (int j = firstIndex; j <= lastIndex; j++) { UserEntity user = new UserEntity(); user.setName("zlf" + j); user.setAge(j); JSONObject jo = new JSONObject(); try { jo.put("name", "zlf" + j); jo.put("age", j); } catch (JSONException e) { throw new RuntimeException(e); } //log.info("json:{}",j.toString()); user.setJsonData(jo.toString()); //userMapper.insert(user); userEntityList.add(user); } count.getAndAdd(userEntityList.size()); userMapper.insertBatchSomeColumn(userEntityList); //userService.saveBatch(userEntityList); countDownLatch.countDown(); }, executor); } try { countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會走下面的; } catch (InterruptedException e) { throw new RuntimeException(e); } // 結束時間 stopWatch.stop(); log.info("total:{},執(zhí)行時長:{}秒", count.get(), stopWatch.getTotalTimeSeconds()); // total:300000,執(zhí)行時長:12.1030808秒 }
2.1 多線程分頁查詢 、 生產(chǎn)者消費者模型、多線程sql注入器批量插入
? 測試用例的方法:
@Test public void queryInsertDataTest() { userService.queryInsertData(); }
? UserServiceImpl類中的方法:
/** * 多線程分頁查詢 * 生產(chǎn)者+消費者模型 * 多線程插入 */ @Override public void queryInsertData() { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); Page<UserEntity> page = new Page<>(1, 10); QueryWrapper<UserEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().orderByDesc(UserEntity::getCreateTime); Page<UserEntity> pageResult = this.getBaseMapper().selectPage(page, queryWrapper); int pageSize = 10000; if (Objects.nonNull(pageResult)) { long total = pageResult.getTotal(); int pages = (int) ((total + pageSize - 1) / pageSize); log.info("pages:{}", pages); ThreadPoolExecutor executor = ThreadPoolService.getInstance(); CountDownLatch countDownLatch1 = new CountDownLatch(pages); CountDownLatch countDownLatch2 = new CountDownLatch(pages); LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue = userJsonDataService.getLinkedBlockingQueue(); for (int i = 1; i <= pages; i++) { try { QueryDoBizProducer queryDoBizProducer = new QueryDoBizProducer(linkedBlockingQueue, i, queryCount, countDownLatch1, pageSize); CompletableFuture.runAsync(queryDoBizProducer, executor); } catch (Exception e) { log.info("異常1:{}", e.getMessage()); } } try { countDownLatch1.await(); } catch (InterruptedException e) { log.info("異常2:{}", e.getMessage()); } for (int i = 1; i <= pages; i++) { try { InsertDataConsumer insertDataConsumer = new InsertDataConsumer(linkedBlockingQueue, insertCount, countDownLatch2); CompletableFuture.runAsync(insertDataConsumer, executor); } catch (Exception e) { log.info("異常3:{}", e.getMessage()); } } try { countDownLatch2.await(); } catch (InterruptedException e) { log.info("異常4:{}", e.getMessage()); } log.info("queryCount:{}", queryCount); log.info("insertCount:{}", insertCount); } // 結束時間 stopWatch.stop(); log.info("執(zhí)行時長:{}秒", stopWatch.getTotalTimeSeconds()); // 執(zhí)行時長:18.903922秒 }
? 自定義單例線程池ThreadPoolService:
package com.zlf.sb.demo.service; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author zlf * @description: * @time: 2022/7/13 10:18 */ public class ThreadPoolService { /** * CPU 密集型:核心線程數(shù) = CPU核數(shù) + 1 * <p> * IO 密集型:核心線程數(shù) = CPU核數(shù) * 2 * <p> * 注意:IO密集型 (某大廠實戰(zhàn)經(jīng)驗) * 核心線程數(shù) = CPU核數(shù) / (1 - 阻塞系數(shù)) * 例如阻塞系數(shù)為0.8 ,CPU核數(shù)為 4 ,則核心線程數(shù)為 20 */ private static final int DEFAULT_CORE_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final int MAX_QUEUE_SIZE = 100; private static final int QUEUE_INIT_MAX_SIZE = 200; private volatile static ThreadPoolExecutor executor; private ThreadPoolService() { } // 獲取單例的線程池對象 public static ThreadPoolExecutor getInstance() { if (executor == null) { synchronized (ThreadPoolService.class) { if (executor == null) { executor = new ThreadPoolExecutor(DEFAULT_CORE_SIZE,// 核心線程數(shù) MAX_QUEUE_SIZE, // 最大線程數(shù) Integer.MAX_VALUE, // 閑置線程存活時間 TimeUnit.MILLISECONDS,// 時間單位 new LinkedBlockingDeque<Runnable>(QUEUE_INIT_MAX_SIZE),// 線程隊列 Executors.defaultThreadFactory()// 線程工廠 ); } } } return executor; } public void execute(Runnable runnable) { if (runnable == null) { return; } executor.execute(runnable); } // 從線程隊列中移除對象 public void cancel(Runnable runnable) { if (executor != null) { executor.getQueue().remove(runnable); } } }
? 生產(chǎn)者代碼:
package com.zlf.sb.demo.producer; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.zlf.sb.demo.entity.UserEntity; import com.zlf.sb.demo.entity.UserJsonDataEntity; import com.zlf.sb.demo.mapper.UserMapper; import com.zlf.sb.demo.util.SpringUtils; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class QueryDoBizProducer implements Runnable { private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue; private AtomicInteger count; private UserMapper mapper; private long current; private int pageSize; private CountDownLatch countDownLatch; public QueryDoBizProducer(LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue, long current, AtomicInteger count, CountDownLatch countDownLatch, int pageSize) throws Exception { this.linkedBlockingQueue = linkedBlockingQueue; this.mapper = SpringUtils.getBean(UserMapper.class); this.current = current; this.count = count; this.countDownLatch = countDownLatch; this.pageSize = pageSize; } @Override public void run() { try { Page<UserEntity> page2 = new Page<>(current, pageSize); QueryWrapper<UserEntity> queryWrapper2 = new QueryWrapper<>(); queryWrapper2.lambda().orderByDesc(UserEntity::getCreateTime); Page<UserEntity> pageData = mapper.selectPage(page2, queryWrapper2); if (Objects.nonNull(pageData)) { List<UserEntity> records = pageData.getRecords(); List<UserJsonDataEntity> list = new ArrayList<>(); for (UserEntity rs : records) { UserJsonDataEntity jd = JSONObject.parseObject(rs.getJsonData(), new TypeReference<UserJsonDataEntity>() { }); list.add(jd); } linkedBlockingQueue.put(list); count.getAndAdd(list.size()); log.info("生產(chǎn)者查詢數(shù)據(jù)放入隊列完成---->ThreadName:{}", Thread.currentThread().getName()); } countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); log.error("加入隊列異常:{}", e.getMessage()); } } }
? 消費者代碼:
package com.zlf.sb.demo.consumer; import cn.hutool.core.collection.CollectionUtil; import com.alibaba.fastjson.JSON; import com.zlf.sb.demo.entity.UserJsonDataEntity; import com.zlf.sb.demo.mapper.UserJsonDataMapper; import com.zlf.sb.demo.util.SpringUtils; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class InsertDataConsumer implements Runnable { private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue; private AtomicInteger count; private UserJsonDataMapper mapper; private CountDownLatch countDownLatch; public InsertDataConsumer(LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue, AtomicInteger count, CountDownLatch countDownLatch) { this.linkedBlockingQueue = linkedBlockingQueue; this.count = count; this.mapper = SpringUtils.getBean(UserJsonDataMapper.class); this.countDownLatch = countDownLatch; } @Override public void run() { if (CollectionUtil.isNotEmpty(linkedBlockingQueue)) { try { List<UserJsonDataEntity> dataList = linkedBlockingQueue.take(); if (CollectionUtil.isNotEmpty(dataList)) { log.info("dataList:{}", JSON.toJSONString(dataList)); mapper.insertBatchSomeColumn(dataList); log.info("消費者插入數(shù)據(jù)完成---->ThreadName:{}", Thread.currentThread().getName()); count.getAndAdd(dataList.size()); } countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); log.error("消費者插入數(shù)據(jù):{}", e.getMessage()); } } } }
? 阻塞隊列在UserJsonDataServiceImpl類中:
package com.zlf.sb.demo.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.zlf.sb.demo.entity.UserJsonDataEntity; import com.zlf.sb.demo.mapper.UserJsonDataMapper; import com.zlf.sb.demo.service.IUserJsonDataService; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @Data @Service @Slf4j public class UserJsonDataServiceImpl extends ServiceImpl<UserJsonDataMapper, UserJsonDataEntity> implements IUserJsonDataService { private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue = new LinkedBlockingQueue<>(); }
? 這種方式如果數(shù)據(jù)量太大了阻塞隊列中的存儲了太多的數(shù)據(jù),有可能OOM,30萬數(shù)據(jù)本地跑一點壓力都沒有,輕輕松松就搞定了,但是數(shù)據(jù)量太大就需要JVM調下優(yōu)了,這種方式不用管是spring的事務管理,如果使用mysql數(shù)據(jù)spring的事務傳播級別是:ISOLATION_REPEATABLE_READ,可重復讀:可以防止臟讀和不可重復讀,但是幻讀仍可能發(fā)生,如果使用oracle數(shù)據(jù)庫spring的事務傳播級別是:ISOLATION_READ_COMMITTED,讀已提交,可防止臟讀,幻讀和不可重復讀仍可能發(fā)生,所以事務默認即可。
2.2 游標查詢sql注入器批量插入
? 測試用例:
@Test public void cursorQueryInsertData() { userService.cursorQueryInsertData(); }
? UserServiceImpl類中的方法:
/** * 游標查詢 */ @Override public void cursorQueryInsertData() { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); int total = userMapper.countTotal(); if (total > 0) { int pageSize = 10000; int pages = (total + pageSize - 1) / pageSize; for (int i = 1; i <= pages; i++) { List<UserJsonDataEntity> userJsonDataEntities = new ArrayList<>(); int start = (i - 1) * pageSize; try (SqlSession sqlSession = sqlSessionFactory.openSession(); Cursor<UserEntity> pageCursor = sqlSession.getMapper(UserMapper.class).getCursorPageData(start, pageSize)) { int currentIndex = pageCursor.getCurrentIndex(); log.info("currentIndex1:{}", currentIndex); Iterator<UserEntity> iterator = pageCursor.iterator(); while (iterator.hasNext()) { UserEntity userEntity = iterator.next(); // log.info("userEntity:{}", JSON.toJSONString(userEntity)); UserJsonDataEntity jd = com.alibaba.fastjson.JSONObject.parseObject(userEntity.getJsonData(), new TypeReference<UserJsonDataEntity>() { }); userJsonDataEntities.add(jd); } } catch (Exception e) { log.error("游標分頁查詢異常:{}", e.getMessage()); } log.info("userJsonDataEntities:{}", userJsonDataEntities.size()); userJsonDataMapper.insertBatchSomeColumn(userJsonDataEntities); } } // 結束時間 stopWatch.stop(); log.info("執(zhí)行時長:{}秒", stopWatch.getTotalTimeSeconds()); //執(zhí)行時長:39.5655331秒 }
2.3 多線程分頁查詢 、 生產(chǎn)者消費者模型、多線程往ES中按時間維度劃分的索引中寫入數(shù)據(jù)
? 查詢user表中的數(shù)據(jù)可以按照時間段查詢,可以根據(jù)年、月查詢,然后多線程查詢出一批數(shù)據(jù)之后根據(jù)原始數(shù)據(jù)創(chuàng)建一個ES的索引,將各個時間段的數(shù)據(jù)存放在各自的索引中,這樣根據(jù)時間段劃分數(shù)據(jù)到對應的索引上,在多線程往ES各自時間段的索引中寫入數(shù)據(jù)的時候就可以并發(fā)并行的去操作各自的索引,而不是去并發(fā)的去操作一個索引,這是需要特別注意的一點,使用ES要特別注意:有事務場景下不要使用ES,要控制好并發(fā),不要并發(fā)的去操作一個索引,這種一定會翻車的。將數(shù)據(jù)寫入到ES各自的索引后,可以根據(jù)時間段去從各自的索引中查數(shù)據(jù),這種極大的提高了效率和性能,有興趣的可以去嘗試下這個方案。
2.4 查詢數(shù)據(jù)量較小的情況采用List分片的方法
? 查詢數(shù)據(jù)量小的情況下可以將查詢結果的List分片多線程處理每一個分片的數(shù)據(jù)
2.4.1 使用 Google 的 Guava 框架實現(xiàn)分片
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.0.1-jre</version> </dependency>
import com.google.common.collect.Lists; import java.util.Arrays; import java.util.List; /** * Guava 分片 */ public class PartitionByGuavaExample { // 原集合 private static final List<String> OLD_LIST = Arrays.asList( "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權".split(",")); public static void main(String[] args) { // 集合分片 List<List<String>> newList = Lists.partition(OLD_LIST, 3); // 打印分片集合 newList.forEach(i -> { System.out.println("集合長度:" + i.size()); }); } }
2.4.2 使用 Apache 的 commons 框架實現(xiàn)分片
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.4</version> </dependency>
import org.apache.commons.collections4.ListUtils; import java.util.Arrays; import java.util.List; /** * commons.collections4 集合分片 */ public class PartitionExample { // 原集合 private static final List<String> OLD_LIST = Arrays.asList( "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權".split(",")); public static void main(String[] args) { // 集合分片 List<List<String>> newList = ListUtils.partition(OLD_LIST, 3); newList.forEach(i -> { System.out.println("集合長度:" + i.size()); }); } }
2.4.3 使用國產(chǎn)神級框架 Hutool 實現(xiàn)分片
<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.14</version> </dependency>
import cn.hutool.core.collection.ListUtil; import java.util.Arrays; import java.util.List; public class PartitionByHutoolExample { // 原集合 private static final List<String> OLD_LIST = Arrays.asList( "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權".split(",")); public static void main(String[] args) { // 分片處理 List<List<String>> newList = ListUtil.partition(OLD_LIST, 3); newList.forEach(i -> { System.out.println("集合長度:" + i.size()); }); } }
2.4.4 自定義實現(xiàn)分片
if (listServiceResult.isSucceed() && listServiceResult.getData().size() > 0) { List<AgentAO> resultData = listServiceResult.getData(); int totalSize = listServiceResult.getData().size(); // 總記錄數(shù) int pageCount = 0; // 頁數(shù) int pageSize = 50; // 每頁50條 if (totalSize < 50) { pageCount = 1; } int mod = totalSize % pageSize; if (pageCount > 0) { pageCount = totalSize / pageSize + 1; } else { pageCount = totalSize / pageSize; } for (int i = 0; i < pageCount; i++) { List<AgentAO> subList1 = shipLogic(resultData, pageCount, pageSize, i, mod); if (CollectionUtil.isNotEmpty(subList1)) { // TODO 處理數(shù)據(jù) } } }
? 切分數(shù)據(jù)的兩個方法:
/** * 切割數(shù)據(jù) * * @param data * @param pageCount * @param pageSize * @param i * @param mod * @return */ private List<AgentAO> shipLogic(List<AgentAO> data, int pageCount, int pageSize, int i, int mod) { List<AgentAO> subList; int startIndex = 0; int endIndex = 0; if (pageCount == 1) { return data; } else { if (mod == 0) { startIndex = i * pageSize; endIndex = (i + 1) * pageSize - 1; } else { startIndex = i * pageSize; if (i == pageCount - 1) { endIndex = i * pageSize + mod; } else { endIndex = (i + 1) * pageSize; } } } System.out.println("startIndex=" + startIndex + ",endIndex=" + endIndex); subList = data.subList(startIndex, endIndex); return subList; } /** * 切割數(shù)據(jù) * 使用 JDK 8 中提供 Stream 實現(xiàn)分片 */ private <T> List<List<T>> splitList(List<T> list, int batchSzie) { if (CollectionUtils.isEmpty(list)) { return null; } if (list.size() <= batchSzie) { return Arrays.asList(list); } int limit = (list.size() + batchSzie - 1) / batchSzie; List<List<T>> splitList = Stream.iterate(0, n -> n + 1).limit(limit).parallel() .map(a -> list.stream().skip(a * batchSzie).limit(batchSzie).parallel().collect(Collectors.toList())) .collect(Collectors.toList()); return splitList; } public static void main(String[] args) { List<String> data =new ArrayList<>(); for(int i=0;i<1000;i++){ data.add(String.valueOf(i)); } MessagePlanService ms =new MessagePlanService(); List<List<String>> result = ms.splitList(data,500); System.out.println(result.size()); for(int j= 0;j<result.size();j++){ List<String> d = result.get(j); for(int g=0;g<d.size();g++){ System.out.println(d.get(g)); } } }
? mybatisPlus將一個List結果轉換為一個Page分頁對象:
if (CollectionUtils.isNotEmpty(wmOfpk)) { pageInfo.setTotal(wmOfpk.size()); // 計算總頁數(shù) pages = (wmOfpk.size() + whiteListPageDTO.getSize() - 1) / whiteListPageDTO.getSize(); pageInfo.setPages(pages); if (pageInfo.getCurrent() > pages) { pageInfo.setCurrent(pages); } long start = (pageInfo.getCurrent() - 1) * whiteListPageDTO.getSize(); long end = pageInfo.getCurrent() * whiteListPageDTO.getSize(); if (start != 0) { start = start - 1; } if (pageInfo.getCurrent() == pages) { end = wmOfpk.size() - 1; } return pageInfo.setRecords(wmOfpk.subList((int) start, (int) end)); } else { pageInfo.setTotal(0); pageInfo.setPages(0); return pageInfo.setRecords(null); }
3.總結
? 原來我使用多線程寫入遇到了事務問題,導致插入到表中的數(shù)據(jù)每次執(zhí)行都會少了,后面才知道是myBatis使用的是DefaultSqlSession獲取會話的這個類是不安全,所以我才使用mybatis-Spring包下面的的這個SqlSessionTemplate的類提供的方法,自己寫了一個原生的mybatis的批量更新的方法,Dao層的Mapper沒有集成myBatisPlus的BaseMapper,批量方法就是要處理全部的字段,然后有插入的實體中空的字段沒有做判斷的就會報錯,所以需要判空:
<update id="updateBatch" parameterType="java.util.List"> <foreach collection="list" item="item" index="index" open="" close="" separator=";"> update xxxxxxxx <trim prefix="set" suffixOverrides=","> <if test=" item.id != null "> id = #{item.id, jdbcType=INTEGER}, </if> <if test=" item.xxxxx != null "> xxxxx = #{item.xxxxx, jdbcType=INTEGER}, </if> <if test="item.xxxxx != null and item.xxxx.trim() neq '' "> order_no = #{item.xxxx, jdbcType=VARCHAR}, </if> </trim> <where> id = #{item.id, jdbcType=INTEGER} </where> </foreach> </update>
然后調用可以成功的插入:
//批量更新數(shù)據(jù) Dao類的全路徑 String updateStatement = "com.dytzxxxxxxxx.xxxxxx.updateBatch"; try { if (CollectionUtils.isNotEmpty(noResult) && records1.containsAll(noResult)) { log.info("==========不需要更新的數(shù)據(jù)=========={}", JSON.toJSONString(noResult)); records1.removeAll(noResult); } if (CollectionUtils.isNotEmpty(records1)) { log.info("==========批量更新開始=========="); sqlSessionTemplate.update(updateStatement, records1); log.info("==========批量更新結束=========="); } } catch (Exception e) { e.getStackTrace(); log.error("==========批量更新異常=========={}", ExceptionUtils.getMessage(e)); }
? 這個是有查詢和寫入多線程環(huán)境下都查詢和修改到同一條數(shù)據(jù)的情況就會出問題,最莫名奇妙的問題就是使用mybaitsPlus的save方法還是saveBatch方法在多線程環(huán)境下讀寫數(shù)據(jù)會出現(xiàn)報錯,單線批量后的數(shù)據(jù)變少了,原因就是mybaitsPlus使用的DefaultSqlSession,這個類是不安全的,使用mybatis-Spring包下面的的這個SqlSessionTemplate的類session是安全的,原因是每次去獲取session都加了jvm的(synchronized)鎖,保證了獲取session不出現(xiàn)并發(fā)操作,使用這個類之后在使用多線程來跑就會直接報錯,錯誤就是提示:有的線程沒有獲取到session而導致調用方法報錯,所以使用mybatis-Spring包下面的的這個SqlSessionTemplate寫批量更新不能使用多線程操作,這個也是我之前遇到的坑,希望對大家有幫助,請關注點贊加收藏,一鍵三連哦。
4.Demo百度網(wǎng)盤
鏈接: https://pan.baidu.com/s/1bn1feJ0jqaSf6phzV3bvAg
提取碼: 8m2g
到此這篇關于mybatisPlus批量插入優(yōu)化,性能快的飛起的文章就介紹到這了,更多相關mybatisPlus批量插入內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
使用spring實現(xiàn)郵件的發(fā)送實例(含測試,源碼,注釋)
本篇文章主要介紹了使用spring實現(xiàn)郵件的發(fā)送實例,詳細的介紹了使用spring配置實現(xiàn)郵件發(fā)送,含測試,源碼,注釋,有興趣的可以下2017-05-05Java+Freemarker實現(xiàn)根據(jù)XML模板文件生成Word文檔
這篇文章主要為大家詳細介紹了Java如何使用Freemarker實現(xiàn)根據(jù)XML模板文件生成Word文檔,文中的示例代碼講解詳細,感興趣的小伙伴可以學習一下2023-11-11MyBatis 探秘之#{} 與 ${} 參傳差異解碼(數(shù)據(jù)庫連接池筑牢數(shù)據(jù)交互
本文詳細介紹了MyBatis中的`#{}`和`${}`的區(qū)別與使用場景,包括預編譯SQL和即時SQL的區(qū)別、安全性問題,以及如何正確使用數(shù)據(jù)庫連接池來提高性能,感興趣的朋友一起看看吧2024-12-12Spring MVC結合Spring Data JPA實現(xiàn)按條件查詢和分頁
這篇文章主要為大家詳細介紹了Spring MVC結合Spring Data JPA實現(xiàn)按條件查詢,以及分頁效果,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10