Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的解決方案
項目場景:
大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。
解決方案:
拆分list集合后,然后使用多線程批量插入數(shù)據(jù)庫
1.實體類
package com.test.entity; import lombok.Data; @Data public class TestEntity { private String id; private String name; }
2.Mapper
如果數(shù)據(jù)量不大,用foreach標簽就足夠了。如果數(shù)據(jù)量很大,建議使用batch模式。
package com.test.mapper; import java.util.List; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import com.test.entity.TestEntity; public interface TestMapper { /** * 1.用于使用batch模式,ExecutorType.BATCH開啟批處理模式 * 數(shù)據(jù)量很大,推薦這種方式 */ @Insert("insert into test(id, name) " + " values" + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})") void testInsert(TestEntity testEntity); /** * 2.使用foreach標簽,批量保存 * 數(shù)據(jù)量少可以使用這種方式 */ @Insert("insert into test(id, name) " + " values" + " <foreach collection='list' item='item' index='index' separator=','>" + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})" + " </foreach>") void testBatchInsert(@Param("list") List<TestEntity> list); }
3.spring容器注入線程池bean對象
package com.test.config; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableAsync public class ExecutorConfig { /** * 異步任務自定義線程池 */ @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(50); //配置最大線程數(shù) executor.setMaxPoolSize(500); //配置隊列大小 executor.setQueueCapacity(300); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("testExecutor-"); // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //調(diào)用shutdown()方法時等待所有的任務完成后再關閉 executor.setWaitForTasksToCompleteOnShutdown(true); //等待所有任務完成后的最大等待時間 executor.setAwaitTerminationSeconds(60); return executor; } }
4.創(chuàng)建異步線程業(yè)務類
package com.test.service; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import com.test.entity.TestEntity; import com.test.mapper.TestMapper; @Service public class AsyncService { @Autowired private SqlSessionFactory sqlSessionFactory; @Async("asyncServiceExecutor") public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) { try{ //獲取session,打開批處理,因為是多線程,所以每個線程都要開啟一個事務 SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH); TestMapper mapper = session.getMapper(TestMapper.class); //異步線程要做的事情 for (int i = 0; i < logOutputResults.size(); i++) { System.out.println(Thread.currentThread().getName() + "線程:" + logOutputResults.get(i)); TestEntity test = new TestEntity(); //test.set() //............. //批量保存 mapper.testInsert(test); //每1000條提交一次防止內(nèi)存溢出 if(i%1000==0){ session.flushStatements(); } } //提交剩下未處理的事務 session.flushStatements(); }finally { countDownLatch.countDown();// 很關鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放 } } }
5.拆分list調(diào)用異步的業(yè)務方法
package com.test.service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import javax.annotation.Resource; import org.springframework.stereotype.Service; @Service public class TestService { @Resource private AsyncService asyncService; public int testMultiThread() { List<String> logOutputResults = getTestData(); //按線程數(shù)拆分后的list List<List<String>> lists = splitList(logOutputResults); CountDownLatch countDownLatch = new CountDownLatch(lists.size()); for (List<String> listSub:lists) { asyncService.executeAsync(listSub, countDownLatch); } try { countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會走下面的; // 這樣就可以在下面拿到所有線程執(zhí)行完的集合結果 } catch (Exception e) { e.printStackTrace(); } return logOutputResults.size(); } public List<String> getTestData() { List<String> logOutputResults = new ArrayList<String>(); for (int i = 0; i < 3000; i++) { logOutputResults.add("測試數(shù)據(jù)"+i); } return logOutputResults; } public List<List<String>> splitList(List<String> logOutputResults) { List<List<String>> results = new ArrayList<List<String>>(); /*動態(tài)線程數(shù)方式*/ // 每500條數(shù)據(jù)開啟一條線程 int threadSize = 500; // 總數(shù)據(jù)條數(shù) int dataSize = logOutputResults.size(); // 線程數(shù),動態(tài)生成 int threadNum = dataSize / threadSize + 1; /*固定線程數(shù)方式 // 線程數(shù) int threadNum = 6; // 總數(shù)據(jù)條數(shù) int dataSize = logOutputResults.size(); // 每一條線程處理多少條數(shù)據(jù) int threadSize = dataSize / (threadNum - 1); */ // 定義標記,過濾threadNum為整數(shù) boolean special = dataSize % threadSize == 0; List<String> cutList = null; // 確定每條線程的數(shù)據(jù) for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = logOutputResults.subList(threadSize * i, dataSize); } else { cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1)); } results.add(cutList); } return results; } }
6.Controller測試
@RestController public class TestController { @Resource private TestService testService; @RequestMapping(value = "/log", method = RequestMethod.GET) @ApiOperation(value = "測試") public String test() { testService.testMultiThread(); return "success"; } }
總結:
注意這里執(zhí)行插入的數(shù)據(jù)是無序的。
擴展:Java多線程分段處理List集合
項目場景:
大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。
解決方案:
拆分list集合后,然后使用多線程實現(xiàn)
public static void main(String[] args) throws Exception { // 開始時間 long start = System.currentTimeMillis(); List<String> list = new ArrayList<String>(); for (int i = 1; i <= 3000; i++) { list.add(i + ""); } /*動態(tài)線程數(shù)方式*/ // 每500條數(shù)據(jù)開啟一條線程 int threadSize = 500; // 總數(shù)據(jù)條數(shù) int dataSize = list.size(); // 線程數(shù),動態(tài)生成 int threadNum = dataSize / threadSize + 1; /*固定線程數(shù)方式 // 線程數(shù) int threadNum = 6; // 總數(shù)據(jù)條數(shù) int dataSize = list.size(); // 每一條線程處理多少條數(shù)據(jù) int threadSize = dataSize / (threadNum - 1); */ // 定義標記,過濾threadNum為整數(shù) boolean special = dataSize % threadSize == 0; // 創(chuàng)建一個線程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定義一個任務集合 List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); Callable<Integer> task = null; List<String> cutList = null; // 確定每條線程的數(shù)據(jù) for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = list.subList(threadSize * i, dataSize); } else { cutList = list.subList(threadSize * i, threadSize * (i + 1)); } final List<String> listStr = cutList; task = new Callable<Integer>() { @Override public Integer call() throws Exception { //業(yè)務邏輯,循環(huán)處理分段后的list System.out.println(Thread.currentThread().getName() + "線程:" + listStr); //...... int logCount = 0; //記錄下數(shù)量 for (int j = 0; j < listStr.size(); j++) { logCount++; } return logCount; } }; // 這里提交的任務容器列表和返回的Future列表存在順序對應的關系 tasks.add(task); } exec.invokeAll(tasks); //總數(shù) int total = 0; try { List<Future<Integer>> results = exec.invokeAll(tasks); for (Future<Integer> future : results) { //累計線程處理的總記錄數(shù) total+=future.get(); } // 關閉線程池 exec.shutdown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("線程任務執(zhí)行結束"); System.out.println("總共處理了"+total+"條數(shù)據(jù),消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); }
到此這篇關于Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的文章就介紹到這了,更多相關Spring Boot批量插入數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
MyBatis動態(tài)SQL foreach標簽實現(xiàn)批量插入的方法示例
這篇文章主要介紹了MyBatis動態(tài)SQL foreach標簽實現(xiàn)批量插入的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-06-06關于BigDecimal類型數(shù)據(jù)的絕對值和相除求百分比
這篇文章主要介紹了關于BigDecimal類型數(shù)據(jù)的絕對值和相除求百分比,Java在java.math包中提供的API類BigDecimal,用來對超過16位有效位的數(shù)進行精確的運算,需要的朋友可以參考下2023-07-07Java開發(fā)工具-scala處理json格式利器-json4s詳解
這篇文章主要介紹了開發(fā)工具-scala處理json格式利器-json4s,文章中處理方法講解的很清楚,有需要的同學可以研究下2021-02-02java synchronized實現(xiàn)可見性過程解析
這篇文章主要介紹了java synchronized實現(xiàn)可見性過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09Java靜態(tài)內(nèi)部類實現(xiàn)單例過程
這篇文章主要介紹了Java靜態(tài)內(nèi)部類實現(xiàn)單例過程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-10-10Java?I/O?(Input/Output)文件字節(jié)流舉例詳解
Java的輸入輸出流(IO)是用于與外部設備(如文件、網(wǎng)絡連接等)進行數(shù)據(jù)交互的機制,下面這篇文章主要給大家介紹了關于Java?I/O?(Input/Output)文件字節(jié)流的相關資料,需要的朋友可以參考下2024-08-08