欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot線程池ThreadPoolTaskExecutor異步處理百萬級數(shù)據(jù)

 更新時間:2024年03月13日 11:03:53   作者:princeAladdin  
本文主要介紹了SpringBoot線程池ThreadPoolTaskExecutor異步處理百萬級數(shù)據(jù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

一、背景:

利用ThreadPoolTaskExecutor多線程異步批量插入,提高百萬級數(shù)據(jù)插入效率。ThreadPoolTaskExecutor是對ThreadPoolExecutor進行了封裝處理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封裝,所以,性能更加優(yōu)秀,推薦ThreadPoolTaskExecutor。

二、ThreadPoolTaskExecutor異步處理

2.1、配置application.yml

異步線程配置 自定義使用參數(shù)

async:
  executor:
    thread:
      core_pool_size:  10  # 配置核心線程數(shù) 默認8個 核數(shù)*2+2
      max_pool_size:  100   # 配置最大線程數(shù)
      queue_capacity:  99988  # 配置隊列大小
      keep_alive_seconds:  20  #設(shè)置線程空閑等待時間秒s
      name:
        prefix: async-thread-  # 配置線程池中的線程的名稱前綴

2.2、ThreadPoolConfig配置注入Bean

package com.wonders.common.config;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO:利用ThreadPoolTaskExecutor多線程批量執(zhí)行相關(guān)配置
 * 自定義線程池
 * 發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個不成文的算法:CPU核心數(shù)量*2 +2 個線程。
 */
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
    //自定義使用參數(shù)
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;   //配置核心線程數(shù)
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;    //配置最大線程數(shù)
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

    //1、自定義asyncServiceExecutor線程池
    @Bean(name = "asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor......");
        //在這里修改
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數(shù)
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(maxPoolSize);
        //設(shè)置線程空閑等待時間 s
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //配置隊列大小 設(shè)置任務(wù)等待隊列的大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱前綴
        //設(shè)置線程池內(nèi)線程名稱的前綴-------阿里編碼規(guī)約推薦--方便出錯后進行調(diào)試
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //執(zhí)行初始化
        executor.initialize();
        return executor;
    }
    /**
     * 2、公共線程池,利用系統(tǒng)availableProcessors線程數(shù)量進行計算
     */
    @Bean(name = "commonThreadPoolTaskExecutor")
    public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用處理器的Java虛擬機的數(shù)量
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        pool.setCorePoolSize(corePoolSize); // 核心池大小
        pool.setMaxPoolSize(maxPoolSize); // 最大線程數(shù)
        pool.setQueueCapacity(maxPoolSize * 1000); // 隊列程度
        pool.setThreadPriority(Thread.MAX_PRIORITY);
        pool.setDaemon(false);
        pool.setKeepAliveSeconds(300);// 線程空閑時間
        return pool;
    }
   //3自定義defaultThreadPoolExecutor線程池
    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
        int maxNumPool=Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(3,
                maxNumPool,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                //置線程名前綴,例如設(shè)置前綴為hutool-thread-,則線程名為hutool-thread-1之類。
                new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }

}

2.3、創(chuàng)建異步線程,業(yè)務(wù)類

 //1、自定義asyncServiceExecutor線程池
    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<Student> students,
                             StudentService studentService,
                             CountDownLatch countDownLatch) {
        try{
            log.info("start executeAsync");
            //異步線程要做的事情
            studentService.saveBatch(students);
            log.info("end executeAsync");
        }finally {
            countDownLatch.countDown();// 很關(guān)鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放
        }
    }

2.4、拆分集合工具類

package com.wonders.threads;

import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * @Description: TODO:拆分工具類
 * 1、獲取需要進行批量更新的大集合A,對大集合進行拆分操作,分成N個小集合A-1 ~ A-N;
 * 2、開啟線程池,針對集合的大小進行調(diào)參,對小集合進行批量更新操作;
 * 3、對流程進行控制,控制線程執(zhí)行順序。按照指定大小拆分集合的工具類
 */
public class SplitListUtils {
    /**
     * 功能描述:拆分集合
     * @param <T> 泛型對象
     * @MethodName: split
     * @MethodParam: [resList:需要拆分的集合, subListLength:每個子集合的元素個數(shù)]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各個集合組成的列表
     * 代碼里面用到了guava和common的結(jié)合工具類
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return Lists.newArrayList();
        }
        List<List<T>> ret = Lists.newArrayList();
        int size = resList.size();
        if (size <= subListLength) {
            // 數(shù)據(jù)量不足 subListLength 指定的大小
            ret.add(resList);
        } else {
            int pre = size / subListLength;
            int last = size % subListLength;
            // 前面pre個集合,每個大小都是 subListLength 個元素
            for (int i = 0; i < pre; i++) {
                List<T> itemList = Lists.newArrayList();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // last的進行處理
            if (last > 0) {
                List<T> itemList = Lists.newArrayList();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }

    /**
     * 功能描述:方法二:集合切割類,就是把一個大集合切割成多個指定條數(shù)的小集合,方便往數(shù)據(jù)庫插入數(shù)據(jù)
     * 推薦使用
     * @MethodName: pagingList
     * @MethodParam:[resList:需要拆分的集合, subListLength:每個子集合的元素個數(shù)]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各個集合組成的列表
  
     */
    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){
        //判斷是否為空
        if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {
            return Lists.newArrayList();
        }
        int length = resList.size();
        int num = (length+pageSize-1)/pageSize;
        List<List<T>> newList =  new ArrayList<>();
        for(int i=0;i<num;i++){
            int fromIndex = i*pageSize;
            int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;
            newList.add(resList.subList(fromIndex,toIndex));
        }
        return newList;
    }

    // 運行測試代碼 可以按順序拆分為11個集合
    public static void main(String[] args) {
        //初始化數(shù)據(jù)
        List<String> list = Lists.newArrayList();
        int size = 19;
        for (int i = 0; i < size; i++) {
            list.add("hello-" + i);
        }
        // 大集合里面包含多個小集合
        List<List<String>> temps = pagingList(list, 100);
        int j = 0;
        // 對大集合里面的每一個小集合進行操作
        for (List<String> obj : temps) {
            System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
        }
    }

}

2.5、造數(shù)據(jù),多線程異步插入

public int batchInsertWay() throws Exception {
        log.info("開始批量操作.........");
        Random rand = new Random();
        List<Student> list = new ArrayList<>();
        //造100萬條數(shù)據(jù)
        for (int i = 0; i < 1000003; i++) {
            Student student=new Student();
            student.setStudentName("大明:"+i);
            student.setAddr("上海:"+rand.nextInt(9) * 1000);
            student.setAge(rand.nextInt(1000));
            student.setPhone("134"+rand.nextInt(9) * 1000);
            list.add(student);
        }
        //2、開始多線程異步批量導入
        long startTime = System.currentTimeMillis(); // 開始時間
        //boolean a=studentService.batchInsert(list);
        List<List<Student>> list1=SplitListUtils.pagingList(list,100);  //拆分集合
        CountDownLatch countDownLatch = new CountDownLatch(list1.size());
        for (List<Student> list2 : list1) {
            asyncService.executeAsync(list2,studentService,countDownLatch);
        }
        try {
            countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會走下面的;
            long endTime = System.currentTimeMillis(); //結(jié)束時間
            log.info("一共耗時time: " + (endTime - startTime) / 1000 + " s");
            // 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
        } catch (Exception e) {
            log.error("阻塞異常:"+e.getMessage());
        }
        return list.size();

    }

2.6、測試結(jié)果

在這里插入圖片描述

結(jié)論:對不同線程數(shù)的測試,發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個不成文的算法:CPU核心數(shù)量*2 +2 個線程。

個人推薦配置:

int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用處理器的Java虛擬機的數(shù)量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));

到此這篇關(guān)于SpringBoot線程池ThreadPoolTaskExecutor異步處理百萬級數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot異步處理百萬級數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • java中MultipartFile和File最簡單的互相轉(zhuǎn)換示例

    java中MultipartFile和File最簡單的互相轉(zhuǎn)換示例

    這篇文章主要給大家介紹了關(guān)于java中MultipartFile和File最簡單的互相轉(zhuǎn)換的相關(guān)資料,MultipartFile和File都是Java中用于處理文件上傳的類,MultipartFile用于處理上傳的文件,File用于處理本地磁盤上的文件,需要的朋友可以參考下
    2023-09-09
  • SpringBoot?配置多個JdbcTemplate的實現(xiàn)步驟

    SpringBoot?配置多個JdbcTemplate的實現(xiàn)步驟

    本文介紹了在SpringBoot中配置多個JdbcTemplate的方法,包括創(chuàng)建項目、添加依賴、配置數(shù)據(jù)源和多個JdbcTemplate的使用,感興趣的可以了解一下
    2024-11-11
  • Java源碼解析重寫鎖的設(shè)計結(jié)構(gòu)和細節(jié)

    Java源碼解析重寫鎖的設(shè)計結(jié)構(gòu)和細節(jié)

    這篇文章主要為大家介紹了Java源碼解析重寫鎖的設(shè)計結(jié)構(gòu)和細節(jié),這小節(jié)我們以共享鎖作為案列,自定義一個共享鎖。有需要的朋友可以借鑒參考下
    2022-03-03
  • springboot項目啟動,但是訪問報404錯誤的問題

    springboot項目啟動,但是訪問報404錯誤的問題

    這篇文章主要介紹了springboot項目啟動,但是訪問報404錯誤的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java使用jxl包寫Excel文件適合列寬實現(xiàn)

    Java使用jxl包寫Excel文件適合列寬實現(xiàn)

    用jxl.jar包,讀寫過Excel文件。也沒有注意最適合列寬的問題,但是jxl.jar沒有提供最適合列寬的功能,上次用到寫了一下,可以基本實現(xiàn)最適合列寬。
    2013-11-11
  • Java中駝峰命名與下劃線命名相互轉(zhuǎn)換

    Java中駝峰命名與下劃線命名相互轉(zhuǎn)換

    這篇文章主要介紹了Java中駝峰命名與下劃線命名相互轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-01-01
  • 解決logback的日志文件路徑問題

    解決logback的日志文件路徑問題

    這篇文章主要介紹了解決logback的日志文件路徑問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Maven構(gòu)建忽略測試失敗的解決方案

    Maven構(gòu)建忽略測試失敗的解決方案

    這篇文章主要介紹了Maven構(gòu)建忽略測試失敗的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • SpringBoot操作MaxComputer方式(保姆級教程)

    SpringBoot操作MaxComputer方式(保姆級教程)

    這篇文章主要介紹了SpringBoot操作MaxComputer方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2025-03-03
  • Java Speech API實現(xiàn)語音識別

    Java Speech API實現(xiàn)語音識別

    Java語音識別是一項非常有用的功能,它可以將語音轉(zhuǎn)換為文本,從而實現(xiàn)語音輸入和語音控制功能,在當今數(shù)字化時代,語音識別技術(shù)逐漸成為人機交互的重要方式之一,語音識別技術(shù)可以幫助我們將語音數(shù)據(jù)轉(zhuǎn)化為文字,進而進行后續(xù)的處理和分析
    2023-10-10

最新評論