SpringBoot線程池ThreadPoolTaskExecutor異步處理百萬級數(shù)據(jù)
一、背景:
利用ThreadPoolTaskExecutor多線程異步批量插入,提高百萬級數(shù)據(jù)插入效率。ThreadPoolTaskExecutor是對ThreadPoolExecutor進行了封裝處理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封裝,所以,性能更加優(yōu)秀,推薦ThreadPoolTaskExecutor。
二、ThreadPoolTaskExecutor異步處理
2.1、配置application.yml
異步線程配置 自定義使用參數(shù)
async:
executor:
thread:
core_pool_size: 10 # 配置核心線程數(shù) 默認8個 核數(shù)*2+2
max_pool_size: 100 # 配置最大線程數(shù)
queue_capacity: 99988 # 配置隊列大小
keep_alive_seconds: 20 #設(shè)置線程空閑等待時間秒s
name:
prefix: async-thread- # 配置線程池中的線程的名稱前綴
2.2、ThreadPoolConfig配置注入Bean
package com.wonders.common.config;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: TODO:利用ThreadPoolTaskExecutor多線程批量執(zhí)行相關(guān)配置
* 自定義線程池
* 發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個不成文的算法:CPU核心數(shù)量*2 +2 個線程。
*/
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
//自定義使用參數(shù)
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize; //配置核心線程數(shù)
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize; //配置最大線程數(shù)
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
//1、自定義asyncServiceExecutor線程池
@Bean(name = "asyncServiceExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
log.info("start asyncServiceExecutor......");
//在這里修改
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//設(shè)置線程空閑等待時間 s
executor.setKeepAliveSeconds(keepAliveSeconds);
//配置隊列大小 設(shè)置任務(wù)等待隊列的大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
//設(shè)置線程池內(nèi)線程名稱的前綴-------阿里編碼規(guī)約推薦--方便出錯后進行調(diào)試
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達到max size的時候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
/**
* 2、公共線程池,利用系統(tǒng)availableProcessors線程數(shù)量進行計算
*/
@Bean(name = "commonThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用處理器的Java虛擬機的數(shù)量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
pool.setCorePoolSize(corePoolSize); // 核心池大小
pool.setMaxPoolSize(maxPoolSize); // 最大線程數(shù)
pool.setQueueCapacity(maxPoolSize * 1000); // 隊列程度
pool.setThreadPriority(Thread.MAX_PRIORITY);
pool.setDaemon(false);
pool.setKeepAliveSeconds(300);// 線程空閑時間
return pool;
}
//3自定義defaultThreadPoolExecutor線程池
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
int maxNumPool=Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(3,
maxNumPool,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
//置線程名前綴,例如設(shè)置前綴為hutool-thread-,則線程名為hutool-thread-1之類。
new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}2.3、創(chuàng)建異步線程,業(yè)務(wù)類
//1、自定義asyncServiceExecutor線程池
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<Student> students,
StudentService studentService,
CountDownLatch countDownLatch) {
try{
log.info("start executeAsync");
//異步線程要做的事情
studentService.saveBatch(students);
log.info("end executeAsync");
}finally {
countDownLatch.countDown();// 很關(guān)鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放
}
}
2.4、拆分集合工具類
package com.wonders.threads;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* @Description: TODO:拆分工具類
* 1、獲取需要進行批量更新的大集合A,對大集合進行拆分操作,分成N個小集合A-1 ~ A-N;
* 2、開啟線程池,針對集合的大小進行調(diào)參,對小集合進行批量更新操作;
* 3、對流程進行控制,控制線程執(zhí)行順序。按照指定大小拆分集合的工具類
*/
public class SplitListUtils {
/**
* 功能描述:拆分集合
* @param <T> 泛型對象
* @MethodName: split
* @MethodParam: [resList:需要拆分的集合, subListLength:每個子集合的元素個數(shù)]
* @Return: java.util.List<java.util.List<T>>:返回拆分后的各個集合組成的列表
* 代碼里面用到了guava和common的結(jié)合工具類
*/
public static <T> List<List<T>> split(List<T> resList, int subListLength) {
if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
return Lists.newArrayList();
}
List<List<T>> ret = Lists.newArrayList();
int size = resList.size();
if (size <= subListLength) {
// 數(shù)據(jù)量不足 subListLength 指定的大小
ret.add(resList);
} else {
int pre = size / subListLength;
int last = size % subListLength;
// 前面pre個集合,每個大小都是 subListLength 個元素
for (int i = 0; i < pre; i++) {
List<T> itemList = Lists.newArrayList();
for (int j = 0; j < subListLength; j++) {
itemList.add(resList.get(i * subListLength + j));
}
ret.add(itemList);
}
// last的進行處理
if (last > 0) {
List<T> itemList = Lists.newArrayList();
for (int i = 0; i < last; i++) {
itemList.add(resList.get(pre * subListLength + i));
}
ret.add(itemList);
}
}
return ret;
}
/**
* 功能描述:方法二:集合切割類,就是把一個大集合切割成多個指定條數(shù)的小集合,方便往數(shù)據(jù)庫插入數(shù)據(jù)
* 推薦使用
* @MethodName: pagingList
* @MethodParam:[resList:需要拆分的集合, subListLength:每個子集合的元素個數(shù)]
* @Return: java.util.List<java.util.List<T>>:返回拆分后的各個集合組成的列表
*/
public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){
//判斷是否為空
if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {
return Lists.newArrayList();
}
int length = resList.size();
int num = (length+pageSize-1)/pageSize;
List<List<T>> newList = new ArrayList<>();
for(int i=0;i<num;i++){
int fromIndex = i*pageSize;
int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;
newList.add(resList.subList(fromIndex,toIndex));
}
return newList;
}
// 運行測試代碼 可以按順序拆分為11個集合
public static void main(String[] args) {
//初始化數(shù)據(jù)
List<String> list = Lists.newArrayList();
int size = 19;
for (int i = 0; i < size; i++) {
list.add("hello-" + i);
}
// 大集合里面包含多個小集合
List<List<String>> temps = pagingList(list, 100);
int j = 0;
// 對大集合里面的每一個小集合進行操作
for (List<String> obj : temps) {
System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
}
}
}
2.5、造數(shù)據(jù),多線程異步插入
public int batchInsertWay() throws Exception {
log.info("開始批量操作.........");
Random rand = new Random();
List<Student> list = new ArrayList<>();
//造100萬條數(shù)據(jù)
for (int i = 0; i < 1000003; i++) {
Student student=new Student();
student.setStudentName("大明:"+i);
student.setAddr("上海:"+rand.nextInt(9) * 1000);
student.setAge(rand.nextInt(1000));
student.setPhone("134"+rand.nextInt(9) * 1000);
list.add(student);
}
//2、開始多線程異步批量導(dǎo)入
long startTime = System.currentTimeMillis(); // 開始時間
//boolean a=studentService.batchInsert(list);
List<List<Student>> list1=SplitListUtils.pagingList(list,100); //拆分集合
CountDownLatch countDownLatch = new CountDownLatch(list1.size());
for (List<Student> list2 : list1) {
asyncService.executeAsync(list2,studentService,countDownLatch);
}
try {
countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會走下面的;
long endTime = System.currentTimeMillis(); //結(jié)束時間
log.info("一共耗時time: " + (endTime - startTime) / 1000 + " s");
// 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
} catch (Exception e) {
log.error("阻塞異常:"+e.getMessage());
}
return list.size();
}2.6、測試結(jié)果

結(jié)論:對不同線程數(shù)的測試,發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個不成文的算法:CPU核心數(shù)量*2 +2 個線程。
個人推薦配置:
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用處理器的Java虛擬機的數(shù)量 int corePoolSize = (int) (processNum / (1 - 0.2)); int maxPoolSize = (int) (processNum / (1 - 0.5));
到此這篇關(guān)于SpringBoot線程池ThreadPoolTaskExecutor異步處理百萬級數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot異步處理百萬級數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java中MultipartFile和File最簡單的互相轉(zhuǎn)換示例
這篇文章主要給大家介紹了關(guān)于java中MultipartFile和File最簡單的互相轉(zhuǎn)換的相關(guān)資料,MultipartFile和File都是Java中用于處理文件上傳的類,MultipartFile用于處理上傳的文件,File用于處理本地磁盤上的文件,需要的朋友可以參考下2023-09-09
SpringBoot?配置多個JdbcTemplate的實現(xiàn)步驟
本文介紹了在SpringBoot中配置多個JdbcTemplate的方法,包括創(chuàng)建項目、添加依賴、配置數(shù)據(jù)源和多個JdbcTemplate的使用,感興趣的可以了解一下2024-11-11
Java源碼解析重寫鎖的設(shè)計結(jié)構(gòu)和細節(jié)
這篇文章主要為大家介紹了Java源碼解析重寫鎖的設(shè)計結(jié)構(gòu)和細節(jié),這小節(jié)我們以共享鎖作為案列,自定義一個共享鎖。有需要的朋友可以借鑒參考下2022-03-03
Java使用jxl包寫Excel文件適合列寬實現(xiàn)
用jxl.jar包,讀寫過Excel文件。也沒有注意最適合列寬的問題,但是jxl.jar沒有提供最適合列寬的功能,上次用到寫了一下,可以基本實現(xiàn)最適合列寬。2013-11-11
SpringBoot操作MaxComputer方式(保姆級教程)
這篇文章主要介紹了SpringBoot操作MaxComputer方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-03-03

