Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的解決方案
項(xiàng)目場景:
大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。
解決方案:
拆分list集合后,然后使用多線程批量插入數(shù)據(jù)庫
1.實(shí)體類
package com.test.entity;
import lombok.Data;
@Data
public class TestEntity {
private String id;
private String name;
}2.Mapper
如果數(shù)據(jù)量不大,用foreach標(biāo)簽就足夠了。如果數(shù)據(jù)量很大,建議使用batch模式。
package com.test.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import com.test.entity.TestEntity;
public interface TestMapper {
/**
* 1.用于使用batch模式,ExecutorType.BATCH開啟批處理模式
* 數(shù)據(jù)量很大,推薦這種方式
*/
@Insert("insert into test(id, name) "
+ " values"
+ " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
void testInsert(TestEntity testEntity);
/**
* 2.使用foreach標(biāo)簽,批量保存
* 數(shù)據(jù)量少可以使用這種方式
*/
@Insert("insert into test(id, name) "
+ " values"
+ " <foreach collection='list' item='item' index='index' separator=','>"
+ " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
+ " </foreach>")
void testBatchInsert(@Param("list") List<TestEntity> list);
}3.spring容器注入線程池bean對(duì)象
package com.test.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
* 異步任務(wù)自定義線程池
*/
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(50);
//配置最大線程數(shù)
executor.setMaxPoolSize(500);
//配置隊(duì)列大小
executor.setQueueCapacity(300);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("testExecutor-");
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//調(diào)用shutdown()方法時(shí)等待所有的任務(wù)完成后再關(guān)閉
executor.setWaitForTasksToCompleteOnShutdown(true);
//等待所有任務(wù)完成后的最大等待時(shí)間
executor.setAwaitTerminationSeconds(60);
return executor;
}
}4.創(chuàng)建異步線程業(yè)務(wù)類
package com.test.service;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;
@Service
public class AsyncService {
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Async("asyncServiceExecutor")
public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
try{
//獲取session,打開批處理,因?yàn)槭嵌嗑€程,所以每個(gè)線程都要開啟一個(gè)事務(wù)
SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
TestMapper mapper = session.getMapper(TestMapper.class);
//異步線程要做的事情
for (int i = 0; i < logOutputResults.size(); i++) {
System.out.println(Thread.currentThread().getName() + "線程:" + logOutputResults.get(i));
TestEntity test = new TestEntity();
//test.set()
//.............
//批量保存
mapper.testInsert(test);
//每1000條提交一次防止內(nèi)存溢出
if(i%1000==0){
session.flushStatements();
}
}
//提交剩下未處理的事務(wù)
session.flushStatements();
}finally {
countDownLatch.countDown();// 很關(guān)鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放
}
}
}5.拆分list調(diào)用異步的業(yè)務(wù)方法
package com.test.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
@Service
public class TestService {
@Resource
private AsyncService asyncService;
public int testMultiThread() {
List<String> logOutputResults = getTestData();
//按線程數(shù)拆分后的list
List<List<String>> lists = splitList(logOutputResults);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<String> listSub:lists) {
asyncService.executeAsync(listSub, countDownLatch);
}
try {
countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會(huì)走下面的;
// 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
} catch (Exception e) {
e.printStackTrace();
}
return logOutputResults.size();
}
public List<String> getTestData() {
List<String> logOutputResults = new ArrayList<String>();
for (int i = 0; i < 3000; i++) {
logOutputResults.add("測試數(shù)據(jù)"+i);
}
return logOutputResults;
}
public List<List<String>> splitList(List<String> logOutputResults) {
List<List<String>> results = new ArrayList<List<String>>();
/*動(dòng)態(tài)線程數(shù)方式*/
// 每500條數(shù)據(jù)開啟一條線程
int threadSize = 500;
// 總數(shù)據(jù)條數(shù)
int dataSize = logOutputResults.size();
// 線程數(shù),動(dòng)態(tài)生成
int threadNum = dataSize / threadSize + 1;
/*固定線程數(shù)方式
// 線程數(shù)
int threadNum = 6;
// 總數(shù)據(jù)條數(shù)
int dataSize = logOutputResults.size();
// 每一條線程處理多少條數(shù)據(jù)
int threadSize = dataSize / (threadNum - 1);
*/
// 定義標(biāo)記,過濾threadNum為整數(shù)
boolean special = dataSize % threadSize == 0;
List<String> cutList = null;
// 確定每條線程的數(shù)據(jù)
for (int i = 0; i < threadNum; i++) {
if (i == threadNum - 1) {
if (special) {
break;
}
cutList = logOutputResults.subList(threadSize * i, dataSize);
} else {
cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
}
results.add(cutList);
}
return results;
}
}6.Controller測試
@RestController
public class TestController {
@Resource
private TestService testService;
@RequestMapping(value = "/log", method = RequestMethod.GET)
@ApiOperation(value = "測試")
public String test() {
testService.testMultiThread();
return "success";
}
}總結(jié):
注意這里執(zhí)行插入的數(shù)據(jù)是無序的。
擴(kuò)展:Java多線程分段處理List集合
項(xiàng)目場景:
大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。
解決方案:
拆分list集合后,然后使用多線程實(shí)現(xiàn)
public static void main(String[] args) throws Exception {
// 開始時(shí)間
long start = System.currentTimeMillis();
List<String> list = new ArrayList<String>();
for (int i = 1; i <= 3000; i++) {
list.add(i + "");
}
/*動(dòng)態(tài)線程數(shù)方式*/
// 每500條數(shù)據(jù)開啟一條線程
int threadSize = 500;
// 總數(shù)據(jù)條數(shù)
int dataSize = list.size();
// 線程數(shù),動(dòng)態(tài)生成
int threadNum = dataSize / threadSize + 1;
/*固定線程數(shù)方式
// 線程數(shù)
int threadNum = 6;
// 總數(shù)據(jù)條數(shù)
int dataSize = list.size();
// 每一條線程處理多少條數(shù)據(jù)
int threadSize = dataSize / (threadNum - 1);
*/
// 定義標(biāo)記,過濾threadNum為整數(shù)
boolean special = dataSize % threadSize == 0;
// 創(chuàng)建一個(gè)線程池
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
// 定義一個(gè)任務(wù)集合
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
Callable<Integer> task = null;
List<String> cutList = null;
// 確定每條線程的數(shù)據(jù)
for (int i = 0; i < threadNum; i++) {
if (i == threadNum - 1) {
if (special) {
break;
}
cutList = list.subList(threadSize * i, dataSize);
} else {
cutList = list.subList(threadSize * i, threadSize * (i + 1));
}
final List<String> listStr = cutList;
task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//業(yè)務(wù)邏輯,循環(huán)處理分段后的list
System.out.println(Thread.currentThread().getName() + "線程:" + listStr);
//......
int logCount = 0; //記錄下數(shù)量
for (int j = 0; j < listStr.size(); j++) {
logCount++;
}
return logCount;
}
};
// 這里提交的任務(wù)容器列表和返回的Future列表存在順序?qū)?yīng)的關(guān)系
tasks.add(task);
}
exec.invokeAll(tasks);
//總數(shù)
int total = 0;
try {
List<Future<Integer>> results = exec.invokeAll(tasks);
for (Future<Integer> future : results) {
//累計(jì)線程處理的總記錄數(shù)
total+=future.get();
}
// 關(guān)閉線程池
exec.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("線程任務(wù)執(zhí)行結(jié)束");
System.out.println("總共處理了"+total+"條數(shù)據(jù),消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
}到此這篇關(guān)于Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Spring Boot批量插入數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis動(dòng)態(tài)SQL foreach標(biāo)簽實(shí)現(xiàn)批量插入的方法示例
這篇文章主要介紹了MyBatis動(dòng)態(tài)SQL foreach標(biāo)簽實(shí)現(xiàn)批量插入的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06
關(guān)于BigDecimal類型數(shù)據(jù)的絕對(duì)值和相除求百分比
這篇文章主要介紹了關(guān)于BigDecimal類型數(shù)據(jù)的絕對(duì)值和相除求百分比,Java在java.math包中提供的API類BigDecimal,用來對(duì)超過16位有效位的數(shù)進(jìn)行精確的運(yùn)算,需要的朋友可以參考下2023-07-07
Java開發(fā)工具-scala處理json格式利器-json4s詳解
這篇文章主要介紹了開發(fā)工具-scala處理json格式利器-json4s,文章中處理方法講解的很清楚,有需要的同學(xué)可以研究下2021-02-02
Spring啟動(dòng)時(shí)實(shí)現(xiàn)初始化的幾種方案
這篇文章主要介紹了Spring啟動(dòng)時(shí)實(shí)現(xiàn)初始化的幾種方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
java synchronized實(shí)現(xiàn)可見性過程解析
這篇文章主要介紹了java synchronized實(shí)現(xiàn)可見性過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09
Java靜態(tài)內(nèi)部類實(shí)現(xiàn)單例過程
這篇文章主要介紹了Java靜態(tài)內(nèi)部類實(shí)現(xiàn)單例過程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10
Java?I/O?(Input/Output)文件字節(jié)流舉例詳解
Java的輸入輸出流(IO)是用于與外部設(shè)備(如文件、網(wǎng)絡(luò)連接等)進(jìn)行數(shù)據(jù)交互的機(jī)制,下面這篇文章主要給大家介紹了關(guān)于Java?I/O?(Input/Output)文件字節(jié)流的相關(guān)資料,需要的朋友可以參考下2024-08-08

