欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

mybatisPlus批量插入優(yōu)化加快性能

 更新時(shí)間:2023年12月19日 09:55:29   作者:大飛哥~BigFei  
這篇文章主要介紹了mybatisPlus批量插入優(yōu)化加快性能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

1.背景

? 由于之前一個(gè)同事問(wèn)我mybatisPlus的IService層的saveBatch(Collection entityList)方法批量插入速度實(shí)在是太慢了,能否優(yōu)化下?我過(guò)去跟他看了下具體的需求是這樣的,一張業(yè)務(wù)表中有30多萬(wàn)的業(yè)務(wù)數(shù)據(jù),表里的一個(gè)字段是一個(gè)json的數(shù)據(jù)字段,要把30多萬(wàn)的數(shù)據(jù)查出來(lái)針對(duì)這個(gè)json字段解析之后存入另外一張表中,后面他使用了mybatisPlus的IService層的saveBatch發(fā)現(xiàn)好慢好慢,,去重是把分頁(yè)查詢(xún)出來(lái)插入數(shù)據(jù)的最后一條數(shù)據(jù)的id存入redis中,每次分頁(yè)查詢(xún)數(shù)據(jù)排序之后id比上一次處理的最后一條數(shù)據(jù)的id大的數(shù)據(jù),如果比該條數(shù)據(jù)的id小則忽略,后面我給他看了下使用saveBatch插入一批數(shù)據(jù)1000條大概要好幾十分鐘的,之前他使用的for循環(huán)里面insert每一條處理好的數(shù)據(jù),這種是真的非常的慢,慢到你懷疑人生,這個(gè)insert方法我在下面的demo中準(zhǔn)備數(shù)據(jù),往一個(gè)表里插入30萬(wàn)條數(shù)據(jù)的時(shí)候使用了下實(shí)在是太慢了,后面我使用了一個(gè)mybatisPlus的sql注入器的一個(gè)批量的方法:

Integer insertBatchSomeColumn(Collection<UserEntity> entityList);

? 使用了該方法之后,在一個(gè)測(cè)試方法中使用主線(xiàn)程插入模擬的30萬(wàn)數(shù)據(jù)到一個(gè)user的表中只用了:23.209105秒,然后使用多線(xiàn)程異步插入用了:12.1030808秒,這個(gè)時(shí)間跟機(jī)器的性能和網(wǎng)路有關(guān),所以每一次執(zhí)行這個(gè)時(shí)間會(huì)有所不同的,由于我們使用的數(shù)據(jù)庫(kù)是云數(shù)據(jù)庫(kù),所以插入需要走網(wǎng)絡(luò),沒(méi)有使用本地的安裝的mysql數(shù)據(jù)庫(kù),使用這個(gè)sql批量插入的注入器給那個(gè)同事優(yōu)化了一波之后,他原來(lái)使用insertr處理要10個(gè)小時(shí)后面使用saveBatch也要在2個(gè)小時(shí)以上,后面優(yōu)化之后大概估計(jì)只要20分鐘不到就完了,但是我后面使用這個(gè)demo用30萬(wàn)數(shù)據(jù),解析一個(gè)json字段入庫(kù)到一張表中是非??斓?,就花了10多秒~30多秒的時(shí)間就干完了,所以說(shuō)他那個(gè)需求還有優(yōu)化的空間,后面數(shù)據(jù)全部跑完之后就不用了,不是我搞所以就能用就行,比原來(lái)的效率也是提升了一大截。

2.方案

? 查詢(xún)可以使用mybatis的原生sql查詢(xún),需要寫(xiě)一個(gè)批量插入的方法insert標(biāo)簽中使用foreach遍歷一個(gè)list然后插入逐條插入數(shù)據(jù)即可,但是這種需要寫(xiě)大量的代碼,還要處理每個(gè)插入的字段為空的判斷處理,不然遇到插入字段為null就會(huì)報(bào)錯(cuò),讓人很頭疼,所以還是使用mybatisPlus的sql批量注入器基本不需要寫(xiě)啥代碼即可實(shí)現(xiàn)批量插入操作,而且性能和效率還是杠桿的。

? mybatisPlus的IService層的saveBatch為啥慢?看了下它的源碼發(fā)現(xiàn)底層封裝還是比較深,本質(zhì)上底層還是一條一條的去插入數(shù)據(jù),所以才會(huì)慢。

? mybatisPlus的sql注入器有一下幾個(gè)可以給我們擴(kuò)展使用的類(lèi):

  • AlwaysUpdateSomeColumnById 根據(jù)Id更新每一個(gè)字段,全量更新不忽略null字段,解決mybatis-plus中updateById默認(rèn)會(huì)自動(dòng)忽略實(shí)體中null值字段不去更新的問(wèn)題。
  • InsertBatchSomeColumn 真實(shí)批量插入,通過(guò)單SQL的insert語(yǔ)句實(shí)現(xiàn)批量插入
  • DeleteByIdWithFill 帶自動(dòng)填充的邏輯刪除,比如自動(dòng)填充更新時(shí)間、操作人
  • Upsert 更新or插入,根據(jù)唯一約束判斷是執(zhí)行更新還是刪除,相當(dāng)于提供insert on duplicate key update支持

圖片

? 項(xiàng)目中批量sql注入器的使用如下:

? 新增一個(gè) MySqlInjector類(lèi):

package com.zlf.sb.demo.sqlinjector;
import com.baomidou.mybatisplus.core.injector.AbstractMethod;
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn;
import java.util.List;
public class MySqlInjector extends DefaultSqlInjector {
    @Override
    public List<AbstractMethod> getMethodList(Class<?> mapperClass) {
        List<AbstractMethod> methodList = super.getMethodList(mapperClass);
        methodList.add(new InsertBatchSomeColumn());//添加批量插入方法
        return methodList;
    }
}

? 將MySqlInjector交給Spring容器管理:新增一個(gè)MybatisPlusConfig類(lèi)如下:

package com.zlf.sb.demo.config;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.plugins.pagination.optimize.JsqlParserCountOptimize;
import com.zlf.sb.demo.sqlinjector.MySqlInjector;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MybatisPlusConfig {
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
        // 設(shè)置請(qǐng)求的頁(yè)面大于最大頁(yè)后操作, true調(diào)回到首頁(yè),false 繼續(xù)請(qǐng)求  默認(rèn)false
        // paginationInterceptor.setOverflow(false);
        // 設(shè)置最大單頁(yè)限制數(shù)量,默認(rèn) 500 條,-1 不受限制
        paginationInterceptor.setLimit(-1); // 這里也是一個(gè)大坑,需要注意的
        // 開(kāi)啟 count 的 join 優(yōu)化,只針對(duì)部分 left join
        paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
        return paginationInterceptor;
    }
    @Bean
    @Primary//批量插入配置
    public MySqlInjector mySqlInjector() {
        return new MySqlInjector();
    }
}

? UserMapper接口中加入insertBatchSomeColumn方法:

package com.zlf.sb.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zlf.sb.demo.entity.UserEntity;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.mapping.ResultSetType;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Repository;
import java.util.Collection;
@Scope("prototype")
@Repository
public interface UserMapper extends BaseMapper<UserEntity> {
    /**
     * 批量插入 僅適用于mysql
     *
     * @param entityList 實(shí)體列表
     * @return 影響行數(shù)
     */
    Integer insertBatchSomeColumn(Collection<UserEntity> entityList);
    /**
     * 統(tǒng)計(jì)總數(shù)
     *
     * @return
     */
    @Select("SELECT count(*) FROM user")
    Integer countTotal();
    /**
     * 分頁(yè)游標(biāo)查詢(xún)
     *
     * @return
     */
    @Options(resultSetType = ResultSetType.FORWARD_ONLY)
    @Select("SELECT * FROM user ORDER BY create_time DESC LIMIT #{start}, #{pageSize} ")
    Cursor<UserEntity> getCursorPageData(@Param("start") Integer start, @Param("pageSize") Integer pageSize);
}

? 準(zhǔn)備了兩張表:user,user_json_data,user表中有個(gè)json_data的字段是一個(gè)存json格式數(shù)據(jù)的字段,然后準(zhǔn)備30萬(wàn)的數(shù)據(jù),向user表中插入30萬(wàn)條構(gòu)造的數(shù)據(jù),SpringBootDemoApplicationTests中代碼如下:

//@Test
    public void addUserDataTest() {
        StopWatch stopWatch = new StopWatch();
        // 開(kāi)始時(shí)間
        stopWatch.start();
        int totalData = 300000;
        int splitSize = 10000;
        int splitPages = (totalData + splitSize - 1) / splitSize;
        /*for (int i = 1; i <= splitPages; i++) {
            //起始條數(shù)
            int firstIndex = (i - 1) * splitSize + 1;
            //截止條數(shù)
            int lastIndex = i * splitSize;
            List<UserEntity> userEntityList = new ArrayList<>();
            for (int j = firstIndex; j <= lastIndex; j++) {
                UserEntity user = new UserEntity();
                user.setName("zlf" + j);
                user.setAge(j);
                JSONObject jo = new JSONObject();
                try {
                    jo.put("name", "zlf" + j);
                    jo.put("age", j);
                } catch (JSONException e) {
                    throw new RuntimeException(e);
                }
                //log.info("json:{}",j.toString());
                user.setJsonData(jo.toString());
                //userMapper.insert(user);
                userEntityList.add(user);
            }
            //userService.saveBatch(userEntityList);
            userMapper.insertBatchSomeColumn(userEntityList);
        }
        // 結(jié)束時(shí)間
        stopWatch.stop();
        log.info("執(zhí)行時(shí)長(zhǎng):{}秒", stopWatch.getTotalTimeSeconds());*/
        //執(zhí)行時(shí)長(zhǎng):23.209105秒
        // 異步
        ThreadPoolExecutor executor = ThreadPoolService.getInstance();
        AtomicInteger count = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(splitPages);
        for (int i = 1; i <= splitPages; i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                //起始條數(shù)
                int firstIndex = (finalI - 1) * splitSize + 1;
                //截止條數(shù)
                int lastIndex = finalI * splitSize;
                List<UserEntity> userEntityList = new ArrayList<>();
                for (int j = firstIndex; j <= lastIndex; j++) {
                    UserEntity user = new UserEntity();
                    user.setName("zlf" + j);
                    user.setAge(j);
                    JSONObject jo = new JSONObject();
                    try {
                        jo.put("name", "zlf" + j);
                        jo.put("age", j);
                    } catch (JSONException e) {
                        throw new RuntimeException(e);
                    }
                    //log.info("json:{}",j.toString());
                    user.setJsonData(jo.toString());
                    //userMapper.insert(user);
                    userEntityList.add(user);
                }
                count.getAndAdd(userEntityList.size());
                userMapper.insertBatchSomeColumn(userEntityList);
                //userService.saveBatch(userEntityList);
                countDownLatch.countDown();
            }, executor);
        }
        try {
            countDownLatch.await(); //保證之前的所有的線(xiàn)程都執(zhí)行完成,才會(huì)走下面的;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 結(jié)束時(shí)間
        stopWatch.stop();
        log.info("total:{},執(zhí)行時(shí)長(zhǎng):{}秒", count.get(), stopWatch.getTotalTimeSeconds());
        // total:300000,執(zhí)行時(shí)長(zhǎng):12.1030808秒
    }

2.1 多線(xiàn)程分頁(yè)查詢(xún) 、 生產(chǎn)者消費(fèi)者模型、多線(xiàn)程sql注入器批量插入

? 測(cè)試用例的方法:

    @Test
    public void queryInsertDataTest() {
        userService.queryInsertData();
    }

? UserServiceImpl類(lèi)中的方法:

/**
     * 多線(xiàn)程分頁(yè)查詢(xún)
     * 生產(chǎn)者+消費(fèi)者模型
     * 多線(xiàn)程插入
     */
    @Override
    public void queryInsertData() {
        StopWatch stopWatch = new StopWatch();
        // 開(kāi)始時(shí)間
        stopWatch.start();
        Page<UserEntity> page = new Page<>(1, 10);
        QueryWrapper<UserEntity> queryWrapper = new QueryWrapper<>();
        queryWrapper.lambda().orderByDesc(UserEntity::getCreateTime);
        Page<UserEntity> pageResult = this.getBaseMapper().selectPage(page, queryWrapper);
        int pageSize = 10000;
        if (Objects.nonNull(pageResult)) {
            long total = pageResult.getTotal();
            int pages = (int) ((total + pageSize - 1) / pageSize);
            log.info("pages:{}", pages);
            ThreadPoolExecutor executor = ThreadPoolService.getInstance();
            CountDownLatch countDownLatch1 = new CountDownLatch(pages);
            CountDownLatch countDownLatch2 = new CountDownLatch(pages);
            LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue = userJsonDataService.getLinkedBlockingQueue();
            for (int i = 1; i <= pages; i++) {
                try {
                    QueryDoBizProducer queryDoBizProducer = new QueryDoBizProducer(linkedBlockingQueue, i, queryCount, countDownLatch1, pageSize);
                    CompletableFuture.runAsync(queryDoBizProducer, executor);
                } catch (Exception e) {
                    log.info("異常1:{}", e.getMessage());
                }
            }
            try {
                countDownLatch1.await();
            } catch (InterruptedException e) {
                log.info("異常2:{}", e.getMessage());
            }
            for (int i = 1; i <= pages; i++) {
                try {
                    InsertDataConsumer insertDataConsumer = new InsertDataConsumer(linkedBlockingQueue, insertCount, countDownLatch2);
                    CompletableFuture.runAsync(insertDataConsumer, executor);
                } catch (Exception e) {
                    log.info("異常3:{}", e.getMessage());
                }
            }
            try {
                countDownLatch2.await();
            } catch (InterruptedException e) {
                log.info("異常4:{}", e.getMessage());
            }
            log.info("queryCount:{}", queryCount);
            log.info("insertCount:{}", insertCount);
        }
        // 結(jié)束時(shí)間
        stopWatch.stop();
        log.info("執(zhí)行時(shí)長(zhǎng):{}秒", stopWatch.getTotalTimeSeconds());
        // 執(zhí)行時(shí)長(zhǎng):18.903922秒
    }

? 自定義單例線(xiàn)程池ThreadPoolService:

package com.zlf.sb.demo.service;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @author zlf
 * @description:
 * @time: 2022/7/13 10:18
 */
public class ThreadPoolService {
    /**
     * CPU 密集型:核心線(xiàn)程數(shù) = CPU核數(shù) + 1
     * <p>
     * IO 密集型:核心線(xiàn)程數(shù) = CPU核數(shù) * 2
     * <p>
     * 注意:IO密集型 (某大廠(chǎng)實(shí)戰(zhàn)經(jīng)驗(yàn))
     * 核心線(xiàn)程數(shù) = CPU核數(shù) / (1 - 阻塞系數(shù))
     * 例如阻塞系數(shù)為0.8 ,CPU核數(shù)為 4 ,則核心線(xiàn)程數(shù)為 20
     */
    private static final int DEFAULT_CORE_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private static final int MAX_QUEUE_SIZE = 100;
    private static final int QUEUE_INIT_MAX_SIZE = 200;
    private volatile static ThreadPoolExecutor executor;
    private ThreadPoolService() {
    }
    // 獲取單例的線(xiàn)程池對(duì)象
    public static ThreadPoolExecutor getInstance() {
        if (executor == null) {
            synchronized (ThreadPoolService.class) {
                if (executor == null) {
                    executor = new ThreadPoolExecutor(DEFAULT_CORE_SIZE,// 核心線(xiàn)程數(shù)
                            MAX_QUEUE_SIZE, // 最大線(xiàn)程數(shù)
                            Integer.MAX_VALUE, // 閑置線(xiàn)程存活時(shí)間
                            TimeUnit.MILLISECONDS,// 時(shí)間單位
                            new LinkedBlockingDeque<Runnable>(QUEUE_INIT_MAX_SIZE),// 線(xiàn)程隊(duì)列
                            Executors.defaultThreadFactory()// 線(xiàn)程工廠(chǎng)
                    );
                }
            }
        }
        return executor;
    }
    public void execute(Runnable runnable) {
        if (runnable == null) {
            return;
        }
        executor.execute(runnable);
    }
    // 從線(xiàn)程隊(duì)列中移除對(duì)象
    public void cancel(Runnable runnable) {
        if (executor != null) {
            executor.getQueue().remove(runnable);
        }
    }
}

? 生產(chǎn)者代碼:

package com.zlf.sb.demo.producer;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zlf.sb.demo.entity.UserEntity;
import com.zlf.sb.demo.entity.UserJsonDataEntity;
import com.zlf.sb.demo.mapper.UserMapper;
import com.zlf.sb.demo.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class QueryDoBizProducer implements Runnable {
    private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue;
    private AtomicInteger count;
    private UserMapper mapper;
    private long current;
    private int pageSize;
    private CountDownLatch countDownLatch;
    public QueryDoBizProducer(LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue, long current, AtomicInteger count, CountDownLatch countDownLatch, int pageSize) throws Exception {
        this.linkedBlockingQueue = linkedBlockingQueue;
        this.mapper = SpringUtils.getBean(UserMapper.class);
        this.current = current;
        this.count = count;
        this.countDownLatch = countDownLatch;
        this.pageSize = pageSize;
    }
    @Override
    public void run() {
        try {
            Page<UserEntity> page2 = new Page<>(current, pageSize);
            QueryWrapper<UserEntity> queryWrapper2 = new QueryWrapper<>();
            queryWrapper2.lambda().orderByDesc(UserEntity::getCreateTime);
            Page<UserEntity> pageData = mapper.selectPage(page2, queryWrapper2);
            if (Objects.nonNull(pageData)) {
                List<UserEntity> records = pageData.getRecords();
                List<UserJsonDataEntity> list = new ArrayList<>();
                for (UserEntity rs : records) {
                    UserJsonDataEntity jd = JSONObject.parseObject(rs.getJsonData(), new TypeReference<UserJsonDataEntity>() {
                    });
                    list.add(jd);
                }
                linkedBlockingQueue.put(list);
                count.getAndAdd(list.size());
                log.info("生產(chǎn)者查詢(xún)數(shù)據(jù)放入隊(duì)列完成---->ThreadName:{}", Thread.currentThread().getName());
            }
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("加入隊(duì)列異常:{}", e.getMessage());
        }
    }
}

? 消費(fèi)者代碼:

package com.zlf.sb.demo.consumer;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.zlf.sb.demo.entity.UserJsonDataEntity;
import com.zlf.sb.demo.mapper.UserJsonDataMapper;
import com.zlf.sb.demo.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class InsertDataConsumer implements Runnable {
    private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue;
    private AtomicInteger count;
    private UserJsonDataMapper mapper;
    private CountDownLatch countDownLatch;
    public InsertDataConsumer(LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue, AtomicInteger count, CountDownLatch countDownLatch) {
        this.linkedBlockingQueue = linkedBlockingQueue;
        this.count = count;
        this.mapper = SpringUtils.getBean(UserJsonDataMapper.class);
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        if (CollectionUtil.isNotEmpty(linkedBlockingQueue)) {
            try {
                List<UserJsonDataEntity> dataList = linkedBlockingQueue.take();
                if (CollectionUtil.isNotEmpty(dataList)) {
                    log.info("dataList:{}", JSON.toJSONString(dataList));
                    mapper.insertBatchSomeColumn(dataList);
                    log.info("消費(fèi)者插入數(shù)據(jù)完成---->ThreadName:{}", Thread.currentThread().getName());
                    count.getAndAdd(dataList.size());
                }
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
                log.error("消費(fèi)者插入數(shù)據(jù):{}", e.getMessage());
            }
        }
    }
}

? 阻塞隊(duì)列在UserJsonDataServiceImpl類(lèi)中:

package com.zlf.sb.demo.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zlf.sb.demo.entity.UserJsonDataEntity;
import com.zlf.sb.demo.mapper.UserJsonDataMapper;
import com.zlf.sb.demo.service.IUserJsonDataService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@Data
@Service
@Slf4j
public class UserJsonDataServiceImpl extends ServiceImpl<UserJsonDataMapper, UserJsonDataEntity> implements IUserJsonDataService {
    private LinkedBlockingQueue<List<UserJsonDataEntity>> linkedBlockingQueue = new LinkedBlockingQueue<>();
}

? 這種方式如果數(shù)據(jù)量太大了阻塞隊(duì)列中的存儲(chǔ)了太多的數(shù)據(jù),有可能OOM,30萬(wàn)數(shù)據(jù)本地跑一點(diǎn)壓力都沒(méi)有,輕輕松松就搞定了,但是數(shù)據(jù)量太大就需要JVM調(diào)下優(yōu)了,這種方式不用管是spring的事務(wù)管理,如果使用mysql數(shù)據(jù)spring的事務(wù)傳播級(jí)別是:ISOLATION_REPEATABLE_READ,可重復(fù)讀:可以防止臟讀和不可重復(fù)讀,但是幻讀仍可能發(fā)生,如果使用oracle數(shù)據(jù)庫(kù)spring的事務(wù)傳播級(jí)別是:ISOLATION_READ_COMMITTED,讀已提交,可防止臟讀,幻讀和不可重復(fù)讀仍可能發(fā)生,所以事務(wù)默認(rèn)即可。

2.2 游標(biāo)查詢(xún)sql注入器批量插入

? 測(cè)試用例:

    @Test
    public void cursorQueryInsertData() {
        userService.cursorQueryInsertData();
    }

? UserServiceImpl類(lèi)中的方法:

/**
     * 游標(biāo)查詢(xún)
     */
    @Override
    public void cursorQueryInsertData() {
        StopWatch stopWatch = new StopWatch();
        // 開(kāi)始時(shí)間
        stopWatch.start();
        int total = userMapper.countTotal();
        if (total > 0) {
            int pageSize = 10000;
            int pages = (total + pageSize - 1) / pageSize;
            for (int i = 1; i <= pages; i++) {
                List<UserJsonDataEntity> userJsonDataEntities = new ArrayList<>();
                int start = (i - 1) * pageSize;
                try (SqlSession sqlSession = sqlSessionFactory.openSession(); Cursor<UserEntity> pageCursor = sqlSession.getMapper(UserMapper.class).getCursorPageData(start, pageSize)) {
                    int currentIndex = pageCursor.getCurrentIndex();
                    log.info("currentIndex1:{}", currentIndex);
                    Iterator<UserEntity> iterator = pageCursor.iterator();
                    while (iterator.hasNext()) {
                        UserEntity userEntity = iterator.next();
                        // log.info("userEntity:{}", JSON.toJSONString(userEntity));
                        UserJsonDataEntity jd = com.alibaba.fastjson.JSONObject.parseObject(userEntity.getJsonData(), new TypeReference<UserJsonDataEntity>() {
                        });
                        userJsonDataEntities.add(jd);
                    }
                } catch (Exception e) {
                    log.error("游標(biāo)分頁(yè)查詢(xún)異常:{}", e.getMessage());
                }
                log.info("userJsonDataEntities:{}", userJsonDataEntities.size());
                userJsonDataMapper.insertBatchSomeColumn(userJsonDataEntities);
            }
        }
        // 結(jié)束時(shí)間
        stopWatch.stop();
        log.info("執(zhí)行時(shí)長(zhǎng):{}秒", stopWatch.getTotalTimeSeconds());
        //執(zhí)行時(shí)長(zhǎng):39.5655331秒
    }

2.3 多線(xiàn)程分頁(yè)查詢(xún) 、 生產(chǎn)者消費(fèi)者模型、多線(xiàn)程往ES中按時(shí)間維度劃分的索引中寫(xiě)入數(shù)據(jù)

? 查詢(xún)user表中的數(shù)據(jù)可以按照時(shí)間段查詢(xún),可以根據(jù)年、月查詢(xún),然后多線(xiàn)程查詢(xún)出一批數(shù)據(jù)之后根據(jù)原始數(shù)據(jù)創(chuàng)建一個(gè)ES的索引,將各個(gè)時(shí)間段的數(shù)據(jù)存放在各自的索引中,這樣根據(jù)時(shí)間段劃分?jǐn)?shù)據(jù)到對(duì)應(yīng)的索引上,在多線(xiàn)程往ES各自時(shí)間段的索引中寫(xiě)入數(shù)據(jù)的時(shí)候就可以并發(fā)并行的去操作各自的索引,而不是去并發(fā)的去操作一個(gè)索引,這是需要特別注意的一點(diǎn),使用ES要特別注意:有事務(wù)場(chǎng)景下不要使用ES,要控制好并發(fā),不要并發(fā)的去操作一個(gè)索引,這種一定會(huì)翻車(chē)的。將數(shù)據(jù)寫(xiě)入到ES各自的索引后,可以根據(jù)時(shí)間段去從各自的索引中查數(shù)據(jù),這種極大的提高了效率和性能,有興趣的可以去嘗試下這個(gè)方案。

2.4 查詢(xún)數(shù)據(jù)量較小的情況采用List分片的方法

? 查詢(xún)數(shù)據(jù)量小的情況下可以將查詢(xún)結(jié)果的List分片多線(xiàn)程處理每一個(gè)分片的數(shù)據(jù)

2.4.1 使用 Google 的 Guava 框架實(shí)現(xiàn)分片

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>31.0.1-jre</version>
</dependency>
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
/**
 * Guava 分片
 */
public class PartitionByGuavaExample {
    // 原集合
    private static final List<String> OLD_LIST = Arrays.asList(
            "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權(quán)".split(","));
    public static void main(String[] args) {
        // 集合分片
        List<List<String>> newList = Lists.partition(OLD_LIST, 3);
        // 打印分片集合
        newList.forEach(i -> {
            System.out.println("集合長(zhǎng)度:" + i.size());
        });
    }
}

2.4.2 使用 Apache 的 commons 框架實(shí)現(xiàn)分片

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-collections4</artifactId>
  <version>4.4</version>
</dependency>
import org.apache.commons.collections4.ListUtils;
import java.util.Arrays;
import java.util.List;
/**
 * commons.collections4 集合分片
 */
public class PartitionExample {
    // 原集合
    private static final List<String> OLD_LIST = Arrays.asList(
            "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權(quán)".split(","));
    public static void main(String[] args) {
        // 集合分片
        List<List<String>> newList = ListUtils.partition(OLD_LIST, 3);
        newList.forEach(i -> {
            System.out.println("集合長(zhǎng)度:" + i.size());
        });
    }
}

2.4.3 使用國(guó)產(chǎn)神級(jí)框架 Hutool 實(shí)現(xiàn)分片

<dependency>
  <groupId>cn.hutool</groupId>
  <artifactId>hutool-all</artifactId>
  <version>5.7.14</version>
</dependency>
import cn.hutool.core.collection.ListUtil;
import java.util.Arrays;
import java.util.List;
public class PartitionByHutoolExample {
    // 原集合
    private static final List<String> OLD_LIST = Arrays.asList(
            "唐僧,悟空,八戒,沙僧,曹操,劉備,孫權(quán)".split(","));
    public static void main(String[] args) {
        // 分片處理
        List<List<String>> newList = ListUtil.partition(OLD_LIST, 3);
        newList.forEach(i -> {
            System.out.println("集合長(zhǎng)度:" + i.size());
        });
    }
}

2.4.4 自定義實(shí)現(xiàn)分片

           if (listServiceResult.isSucceed() && listServiceResult.getData().size() > 0) {
					List<AgentAO> resultData = listServiceResult.getData();
					int totalSize = listServiceResult.getData().size(); // 總記錄數(shù)
					int pageCount = 0; // 頁(yè)數(shù)
					int pageSize = 50; // 每頁(yè)50條
					if (totalSize < 50) {
						pageCount = 1;
					}
					int mod = totalSize % pageSize;
					if (pageCount > 0) {
						pageCount = totalSize / pageSize + 1;
					} else {
						pageCount = totalSize / pageSize;
					}
					for (int i = 0; i < pageCount; i++) {
						List<AgentAO> subList1 = shipLogic(resultData, pageCount, pageSize, i, mod);
						if (CollectionUtil.isNotEmpty(subList1)) {
						 // TODO 處理數(shù)據(jù)
						}
					}
                } 

? 切分?jǐn)?shù)據(jù)的兩個(gè)方法:

     /**
	 * 切割數(shù)據(jù)
	 *
	 * @param data
	 * @param pageCount
	 * @param pageSize
	 * @param i
	 * @param mod
	 * @return
	 */
	private List<AgentAO> shipLogic(List<AgentAO> data, int pageCount, int pageSize, int i, int mod) {
		List<AgentAO> subList;
		int startIndex = 0;
		int endIndex = 0;
		if (pageCount == 1) {
			return data;
		} else {
			if (mod == 0) {
				startIndex = i * pageSize;
				endIndex = (i + 1) * pageSize - 1;
			} else {
				startIndex = i * pageSize;
				if (i == pageCount - 1) {
					endIndex = i * pageSize + mod;
				} else {
					endIndex = (i + 1) * pageSize;
				}
			}
		}
		System.out.println("startIndex=" + startIndex + ",endIndex=" + endIndex);
		subList = data.subList(startIndex, endIndex);
		return subList;
	}		
    /**
	 * 切割數(shù)據(jù)
	 * 使用 JDK 8 中提供 Stream 實(shí)現(xiàn)分片
	 */
	private <T> List<List<T>> splitList(List<T> list, int batchSzie) {
		if (CollectionUtils.isEmpty(list)) {
			return null;
		}
		if (list.size() <= batchSzie) {
			return Arrays.asList(list);
		}
		int limit = (list.size() + batchSzie - 1) / batchSzie;
		List<List<T>> splitList = Stream.iterate(0, n -> n + 1).limit(limit).parallel()
				.map(a -> list.stream().skip(a * batchSzie).limit(batchSzie).parallel().collect(Collectors.toList()))
				.collect(Collectors.toList());
		return splitList;
	}
	public static void main(String[] args) {
		List<String> data =new ArrayList<>();
		for(int i=0;i<1000;i++){
			data.add(String.valueOf(i));
		}
		MessagePlanService ms =new MessagePlanService();
		List<List<String>> result = ms.splitList(data,500);
		System.out.println(result.size());
		for(int j= 0;j<result.size();j++){
			List<String> d = result.get(j);
			for(int g=0;g<d.size();g++){
              System.out.println(d.get(g));
			}
		}
	}

? mybatisPlus將一個(gè)List結(jié)果轉(zhuǎn)換為一個(gè)Page分頁(yè)對(duì)象:

     if (CollectionUtils.isNotEmpty(wmOfpk)) {
                pageInfo.setTotal(wmOfpk.size());
                // 計(jì)算總頁(yè)數(shù)
                pages = (wmOfpk.size() + whiteListPageDTO.getSize() - 1) / whiteListPageDTO.getSize();
                pageInfo.setPages(pages);
                if (pageInfo.getCurrent() > pages) {
                    pageInfo.setCurrent(pages);
                }
                long start = (pageInfo.getCurrent() - 1) * whiteListPageDTO.getSize();
                long end = pageInfo.getCurrent() * whiteListPageDTO.getSize();
                if (start != 0) {
                    start = start - 1;
                }
                if (pageInfo.getCurrent() == pages) {
                    end = wmOfpk.size() - 1;
                }
                return pageInfo.setRecords(wmOfpk.subList((int) start, (int) end));
            } else {
                pageInfo.setTotal(0);
                pageInfo.setPages(0);
                return pageInfo.setRecords(null);
      }

3.總結(jié)

? 原來(lái)我使用多線(xiàn)程寫(xiě)入遇到了事務(wù)問(wèn)題,導(dǎo)致插入到表中的數(shù)據(jù)每次執(zhí)行都會(huì)少了,后面才知道是myBatis使用的是DefaultSqlSession獲取會(huì)話(huà)的這個(gè)類(lèi)是不安全,所以我才使用mybatis-Spring包下面的的這個(gè)SqlSessionTemplate的類(lèi)提供的方法,自己寫(xiě)了一個(gè)原生的mybatis的批量更新的方法,Dao層的Mapper沒(méi)有集成myBatisPlus的BaseMapper,批量方法就是要處理全部的字段,然后有插入的實(shí)體中空的字段沒(méi)有做判斷的就會(huì)報(bào)錯(cuò),所以需要判空:

 <update id="updateBatch"
            parameterType="java.util.List">
        <foreach collection="list" item="item" index="index" open="" close="" separator=";">
            update xxxxxxxx
            <trim prefix="set" suffixOverrides=",">
                <if test=" item.id != null ">
                    id = #{item.id, jdbcType=INTEGER},
                </if>
                <if test=" item.xxxxx != null ">
                    xxxxx = #{item.xxxxx, jdbcType=INTEGER},
                </if>
                <if test="item.xxxxx != null and item.xxxx.trim() neq '' ">
                    order_no = #{item.xxxx, jdbcType=VARCHAR},
                </if>
            </trim>
            <where>
                id = #{item.id, jdbcType=INTEGER}
            </where>
        </foreach>
    </update>

然后調(diào)用可以成功的插入:

       //批量更新數(shù)據(jù) Dao類(lèi)的全路徑
      String updateStatement = "com.dytzxxxxxxxx.xxxxxx.updateBatch";
       try {
                       if (CollectionUtils.isNotEmpty(noResult) && records1.containsAll(noResult)) {
                                log.info("==========不需要更新的數(shù)據(jù)=========={}", JSON.toJSONString(noResult));
                                records1.removeAll(noResult);
                            }
                            if (CollectionUtils.isNotEmpty(records1)) {
                                log.info("==========批量更新開(kāi)始==========");
                                sqlSessionTemplate.update(updateStatement, records1);
                                log.info("==========批量更新結(jié)束==========");
                            }
                     } catch (Exception e) {
                        e.getStackTrace();
                  log.error("==========批量更新異常=========={}",                 ExceptionUtils.getMessage(e));
     }

? 這個(gè)是有查詢(xún)和寫(xiě)入多線(xiàn)程環(huán)境下都查詢(xún)和修改到同一條數(shù)據(jù)的情況就會(huì)出問(wèn)題,最莫名奇妙的問(wèn)題就是使用mybaitsPlus的save方法還是saveBatch方法在多線(xiàn)程環(huán)境下讀寫(xiě)數(shù)據(jù)會(huì)出現(xiàn)報(bào)錯(cuò),單線(xiàn)批量后的數(shù)據(jù)變少了,原因就是mybaitsPlus使用的DefaultSqlSession,這個(gè)類(lèi)是不安全的,使用mybatis-Spring包下面的的這個(gè)SqlSessionTemplate的類(lèi)session是安全的,原因是每次去獲取session都加了jvm的(synchronized)鎖,保證了獲取session不出現(xiàn)并發(fā)操作,使用這個(gè)類(lèi)之后在使用多線(xiàn)程來(lái)跑就會(huì)直接報(bào)錯(cuò),錯(cuò)誤就是提示:有的線(xiàn)程沒(méi)有獲取到session而導(dǎo)致調(diào)用方法報(bào)錯(cuò),所以使用mybatis-Spring包下面的的這個(gè)SqlSessionTemplate寫(xiě)批量更新不能使用多線(xiàn)程操作,這個(gè)也是我之前遇到的坑,希望對(duì)大家有幫助,請(qǐng)關(guān)注點(diǎn)贊加收藏,一鍵三連哦。

圖片

4.Demo百度網(wǎng)盤(pán)

鏈接: https://pan.baidu.com/s/1bn1feJ0jqaSf6phzV3bvAg

提取碼: 8m2g

到此這篇關(guān)于mybatisPlus批量插入優(yōu)化,性能快的飛起的文章就介紹到這了,更多相關(guān)mybatisPlus批量插入內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

  • Java中堆和棧的概念和區(qū)別

    Java中堆和棧的概念和區(qū)別

    Java的堆是一個(gè)運(yùn)行時(shí)數(shù)據(jù)區(qū),類(lèi)的對(duì)象從堆中分配空間。棧中主要存放一些基本數(shù)據(jù)類(lèi)型的變量(byte,short,int,long,float,double,boolean,char)和對(duì)象的引用,這篇文章給大家詳細(xì)介紹java 堆和棧的概念和區(qū)別,一起看看吧
    2020-06-06
  • JavaWeb如何實(shí)現(xiàn)禁用瀏覽器緩存

    JavaWeb如何實(shí)現(xiàn)禁用瀏覽器緩存

    這篇文章主要介紹了JavaWeb如何實(shí)現(xiàn)禁用瀏覽器緩存,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • Java+Freemarker實(shí)現(xiàn)根據(jù)XML模板文件生成Word文檔

    Java+Freemarker實(shí)現(xiàn)根據(jù)XML模板文件生成Word文檔

    這篇文章主要為大家詳細(xì)介紹了Java如何使用Freemarker實(shí)現(xiàn)根據(jù)XML模板文件生成Word文檔,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以學(xué)習(xí)一下
    2023-11-11
  • MyBatis 探秘之#{} 與 ${} 參傳差異解碼(數(shù)據(jù)庫(kù)連接池筑牢數(shù)據(jù)交互根基)

    MyBatis 探秘之#{} 與 ${} 參傳差異解碼(數(shù)據(jù)庫(kù)連接池筑牢數(shù)據(jù)交互

    本文詳細(xì)介紹了MyBatis中的`#{}`和`${}`的區(qū)別與使用場(chǎng)景,包括預(yù)編譯SQL和即時(shí)SQL的區(qū)別、安全性問(wèn)題,以及如何正確使用數(shù)據(jù)庫(kù)連接池來(lái)提高性能,感興趣的朋友一起看看吧
    2024-12-12
  • java多線(xiàn)程使用mdc追蹤日志方式

    java多線(xiàn)程使用mdc追蹤日志方式

    這篇文章主要介紹了java多線(xiàn)程使用mdc追蹤日志方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringBoot3整合WebSocket詳細(xì)指南

    SpringBoot3整合WebSocket詳細(xì)指南

    SpringBoot 3 整合 WebSocket 提供了一種高效的實(shí)時(shí)通信解決方案,通過(guò)本文的配置和示例,可以快速實(shí)現(xiàn),感興趣的哦朋友跟隨小編一起看看吧
    2024-12-12
  • Spring MVC結(jié)合Spring Data JPA實(shí)現(xiàn)按條件查詢(xún)和分頁(yè)

    Spring MVC結(jié)合Spring Data JPA實(shí)現(xiàn)按條件查詢(xún)和分頁(yè)

    這篇文章主要為大家詳細(xì)介紹了Spring MVC結(jié)合Spring Data JPA實(shí)現(xiàn)按條件查詢(xún),以及分頁(yè)效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • java讀寫(xiě)串口數(shù)據(jù)你了解多少

    java讀寫(xiě)串口數(shù)據(jù)你了解多少

    這篇文章主要為大家詳細(xì)介紹了java讀寫(xiě)串口數(shù)據(jù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-02-02
  • Mybatis源碼解析之mapper接口的代理模式詳解

    Mybatis源碼解析之mapper接口的代理模式詳解

    這篇文章主要介紹了Mybatis源碼解析之mapper接口的代理模式詳解,在mybatis中執(zhí)行sql時(shí)有兩種方式,一種是基于statementId,也就是直接調(diào)用SqlSession的方法,需要的朋友可以參考下
    2023-12-12
  • 最新評(píng)論