欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的解決方案

 更新時間:2024年04月26日 09:54:11   作者:濤哥是個大帥比  
大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中,本文給大家介紹Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的解決方案,感興趣的朋友跟隨小編一起看看吧

項目場景:

大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。

解決方案:

拆分list集合后,然后使用多線程批量插入數(shù)據(jù)庫

1.實體類

package com.test.entity;
import lombok.Data;
@Data
public class TestEntity {
	private String id;
	private String name;
}

2.Mapper

如果數(shù)據(jù)量不大,用foreach標簽就足夠了。如果數(shù)據(jù)量很大,建議使用batch模式。

package com.test.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import com.test.entity.TestEntity;
public interface TestMapper {
	/**
	  * 1.用于使用batch模式,ExecutorType.BATCH開啟批處理模式
	  * 數(shù)據(jù)量很大,推薦這種方式
	  */
	@Insert("insert into test(id, name) "
			   + " values"
			   + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
	void testInsert(TestEntity testEntity);
	/**
	  * 2.使用foreach標簽,批量保存
	  * 數(shù)據(jù)量少可以使用這種方式
	  */
	@Insert("insert into test(id, name) "
			   + " values"
			   + " <foreach collection='list' item='item' index='index' separator=','>"
			   + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
			   + " </foreach>")
	void testBatchInsert(@Param("list") List<TestEntity> list);
}

3.spring容器注入線程池bean對象

package com.test.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {
    /**
     * 異步任務自定義線程池
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
    	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數(shù)
        executor.setCorePoolSize(50);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(500);
        //配置隊列大小
        executor.setQueueCapacity(300);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("testExecutor-");
        // rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //調(diào)用shutdown()方法時等待所有的任務完成后再關閉
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //等待所有任務完成后的最大等待時間
		executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}

4.創(chuàng)建異步線程業(yè)務類

package com.test.service;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;
@Service
public class AsyncService {
	@Autowired
	private SqlSessionFactory sqlSessionFactory;
	@Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        try{
        	//獲取session,打開批處理,因為是多線程,所以每個線程都要開啟一個事務
        	SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        	TestMapper mapper = session.getMapper(TestMapper.class);
            //異步線程要做的事情
        	for (int i = 0; i < logOutputResults.size(); i++) {
    			System.out.println(Thread.currentThread().getName() + "線程:" + logOutputResults.get(i));
    			TestEntity test = new TestEntity();
    			//test.set()
    			//.............
    			//批量保存
    			mapper.testInsert(test);
    			//每1000條提交一次防止內(nèi)存溢出
    			if(i%1000==0){
    				session.flushStatements();
    			}
			}
        	//提交剩下未處理的事務
    		session.flushStatements();
        }finally {
            countDownLatch.countDown();// 很關鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放
        }
    }
}

5.拆分list調(diào)用異步的業(yè)務方法

package com.test.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
@Service
public class TestService {
	@Resource
	private AsyncService asyncService;
	public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        //按線程數(shù)拆分后的list
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub:lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會走下面的;
            // 這樣就可以在下面拿到所有線程執(zhí)行完的集合結果
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logOutputResults.size();
    }
	public List<String> getTestData() {
		List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
        	logOutputResults.add("測試數(shù)據(jù)"+i);
		}
        return logOutputResults;
    }
	public List<List<String>> splitList(List<String> logOutputResults) {
		List<List<String>> results = new ArrayList<List<String>>();
		/*動態(tài)線程數(shù)方式*/
		// 每500條數(shù)據(jù)開啟一條線程
		int threadSize = 500;
		// 總數(shù)據(jù)條數(shù)
		int dataSize = logOutputResults.size();
		// 線程數(shù),動態(tài)生成
		int threadNum = dataSize / threadSize + 1;
	    /*固定線程數(shù)方式
		    // 線程數(shù)
		    int threadNum = 6;
		    // 總數(shù)據(jù)條數(shù)
		    int dataSize = logOutputResults.size();
		    // 每一條線程處理多少條數(shù)據(jù)
		    int threadSize = dataSize / (threadNum - 1);
	    */
		// 定義標記,過濾threadNum為整數(shù)
		boolean special = dataSize % threadSize == 0;
		List<String> cutList = null;
		// 確定每條線程的數(shù)據(jù)
		for (int i = 0; i < threadNum; i++) {
			if (i == threadNum - 1) {
				if (special) {
					break;
				}
				cutList = logOutputResults.subList(threadSize * i, dataSize);
			} else {
				cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
			}
			results.add(cutList);
		}
        return results;
    }
}

6.Controller測試

@RestController
public class TestController {
	@Resource
	private TestService testService;
	@RequestMapping(value = "/log", method = RequestMethod.GET)
	@ApiOperation(value = "測試")
	public String test() {
		testService.testMultiThread();
		return "success";
	}
}

總結:

注意這里執(zhí)行插入的數(shù)據(jù)是無序的。

擴展:Java多線程分段處理List集合

項目場景:

大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫中。

解決方案:

拆分list集合后,然后使用多線程實現(xiàn)

public static void main(String[] args) throws Exception {
	// 開始時間
	long start = System.currentTimeMillis();
	List<String> list = new ArrayList<String>();
	for (int i = 1; i <= 3000; i++) {
		list.add(i + "");
	}
    /*動態(tài)線程數(shù)方式*/
	// 每500條數(shù)據(jù)開啟一條線程
	int threadSize = 500;
	// 總數(shù)據(jù)條數(shù)
	int dataSize = list.size();
	// 線程數(shù),動態(tài)生成
	int threadNum = dataSize / threadSize + 1;
    /*固定線程數(shù)方式
	    // 線程數(shù)
	    int threadNum = 6;
	    // 總數(shù)據(jù)條數(shù)
	    int dataSize = list.size();
	    // 每一條線程處理多少條數(shù)據(jù)
	    int threadSize = dataSize / (threadNum - 1);
    */
	// 定義標記,過濾threadNum為整數(shù)
	boolean special = dataSize % threadSize == 0;
	// 創(chuàng)建一個線程池
	ExecutorService exec = Executors.newFixedThreadPool(threadNum);
	// 定義一個任務集合
	List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
	Callable<Integer> task = null;
	List<String> cutList = null;
	// 確定每條線程的數(shù)據(jù)
	for (int i = 0; i < threadNum; i++) {
		if (i == threadNum - 1) {
			if (special) {
				break;
			}
			cutList = list.subList(threadSize * i, dataSize);
		} else {
			cutList = list.subList(threadSize * i, threadSize * (i + 1));
		}
		final List<String> listStr = cutList;
		task = new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				//業(yè)務邏輯,循環(huán)處理分段后的list
				System.out.println(Thread.currentThread().getName() + "線程:" + listStr);
				//......
				int logCount = 0;	//記錄下數(shù)量
				for (int j = 0; j < listStr.size(); j++) {
					logCount++;
				}
				return logCount;
			}
		};
		// 這里提交的任務容器列表和返回的Future列表存在順序對應的關系
		tasks.add(task);
	}
	exec.invokeAll(tasks);
	//總數(shù)
    int total = 0;
    try {
        List<Future<Integer>> results = exec.invokeAll(tasks);
        for (Future<Integer> future : results) {
            //累計線程處理的總記錄數(shù)
            total+=future.get();
        }
        // 關閉線程池
        exec.shutdown();
	} catch (Exception e) {
		e.printStackTrace();
	}
	System.out.println("線程任務執(zhí)行結束");
	System.out.println("總共處理了"+total+"條數(shù)據(jù),消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
}

到此這篇關于Spring Boot分段處理List集合多線程批量插入數(shù)據(jù)的文章就介紹到這了,更多相關Spring Boot批量插入數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • MyBatis動態(tài)SQL foreach標簽實現(xiàn)批量插入的方法示例

    MyBatis動態(tài)SQL foreach標簽實現(xiàn)批量插入的方法示例

    這篇文章主要介紹了MyBatis動態(tài)SQL foreach標簽實現(xiàn)批量插入的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-06-06
  • 詳解消息隊列及RabbitMQ部署和使用

    詳解消息隊列及RabbitMQ部署和使用

    消息隊列是最古老的中間件之一,從系統(tǒng)之間有通信需求開始,就自然產(chǎn)生了消息隊列。本文告訴什么是消息隊列,為什么需要消息隊列,常見的消息隊列有哪些,RabbitMQ的部署和使用
    2021-09-09
  • 一文讓你搞懂如何手寫一個redis分布式鎖

    一文讓你搞懂如何手寫一個redis分布式鎖

    既然要搞懂Redis分布式鎖,那肯定要有一個需要它的場景。高并發(fā)售票問題就是一個經(jīng)典案例。本文就來利用這個場景手寫一個redis分布式鎖,讓你徹底搞懂它
    2022-11-11
  • 關于BigDecimal類型數(shù)據(jù)的絕對值和相除求百分比

    關于BigDecimal類型數(shù)據(jù)的絕對值和相除求百分比

    這篇文章主要介紹了關于BigDecimal類型數(shù)據(jù)的絕對值和相除求百分比,Java在java.math包中提供的API類BigDecimal,用來對超過16位有效位的數(shù)進行精確的運算,需要的朋友可以參考下
    2023-07-07
  • IDEA中用maven連接數(shù)據(jù)庫的教程

    IDEA中用maven連接數(shù)據(jù)庫的教程

    這篇文章主要介紹了IDEA中用maven連接數(shù)據(jù)庫的教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • Java開發(fā)工具-scala處理json格式利器-json4s詳解

    Java開發(fā)工具-scala處理json格式利器-json4s詳解

    這篇文章主要介紹了開發(fā)工具-scala處理json格式利器-json4s,文章中處理方法講解的很清楚,有需要的同學可以研究下
    2021-02-02
  • Spring啟動時實現(xiàn)初始化的幾種方案

    Spring啟動時實現(xiàn)初始化的幾種方案

    這篇文章主要介紹了Spring啟動時實現(xiàn)初始化的幾種方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • java synchronized實現(xiàn)可見性過程解析

    java synchronized實現(xiàn)可見性過程解析

    這篇文章主要介紹了java synchronized實現(xiàn)可見性過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-09-09
  • Java靜態(tài)內(nèi)部類實現(xiàn)單例過程

    Java靜態(tài)內(nèi)部類實現(xiàn)單例過程

    這篇文章主要介紹了Java靜態(tài)內(nèi)部類實現(xiàn)單例過程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-10-10
  • Java?I/O?(Input/Output)文件字節(jié)流舉例詳解

    Java?I/O?(Input/Output)文件字節(jié)流舉例詳解

    Java的輸入輸出流(IO)是用于與外部設備(如文件、網(wǎng)絡連接等)進行數(shù)據(jù)交互的機制,下面這篇文章主要給大家介紹了關于Java?I/O?(Input/Output)文件字節(jié)流的相關資料,需要的朋友可以參考下
    2024-08-08

最新評論