欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot分段處理List集合多線程批量插入數(shù)據(jù)方式

 更新時(shí)間:2025年08月16日 08:55:23   作者:濤哥是個(gè)大帥比  
文章介紹如何處理大數(shù)據(jù)量List批量插入數(shù)據(jù)庫(kù)的優(yōu)化方案:通過(guò)拆分List并分配獨(dú)立線程處理,結(jié)合Spring線程池與異步方法提升效率,推薦使用batch模式而非foreach標(biāo)簽,注意插入數(shù)據(jù)無(wú)序性

項(xiàng)目場(chǎng)景

大數(shù)據(jù)量的List集合,需要把List集合中的數(shù)據(jù)批量插入數(shù)據(jù)庫(kù)中。

解決方案

拆分list集合后,然后使用多線程批量插入數(shù)據(jù)庫(kù)

1.實(shí)體類(lèi)

package com.test.entity;

import lombok.Data;

@Data
public class TestEntity {
	
	private String id;
	private String name;
}

2.Mapper

如果數(shù)據(jù)量不大,用foreach標(biāo)簽就足夠了。如果數(shù)據(jù)量很大,建議使用batch模式。

package com.test.mapper;

import java.util.List;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;

import com.test.entity.TestEntity;

public interface TestMapper {
	
	/**
	  * 1.用于使用batch模式,ExecutorType.BATCH開(kāi)啟批處理模式
	  * 數(shù)據(jù)量很大,推薦這種方式
	  */
	@Insert("insert into test(id, name) "
			   + " values"
			   + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
	void testInsert(TestEntity testEntity);
	
	/**
	  * 2.使用foreach標(biāo)簽,批量保存
	  * 數(shù)據(jù)量少可以使用這種方式
	  */
	@Insert("insert into test(id, name) "
			   + " values"
			   + " <foreach collection='list' item='item' index='index' separator=','>"
			   + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
			   + " </foreach>")
	void testBatchInsert(@Param("list") List<TestEntity> list);
}

3.spring容器注入線程池bean對(duì)象

package com.test.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {
    /**
     * 異步任務(wù)自定義線程池
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
    	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數(shù)
        executor.setCorePoolSize(50);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(500);
        //配置隊(duì)列大小
        executor.setQueueCapacity(300);
        //配置線程池中的線程的名稱(chēng)前綴
        executor.setThreadNamePrefix("testExecutor-");
        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //調(diào)用shutdown()方法時(shí)等待所有的任務(wù)完成后再關(guān)閉
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //等待所有任務(wù)完成后的最大等待時(shí)間
		executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}

4.創(chuàng)建異步線程業(yè)務(wù)類(lèi)

package com.test.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;

@Service
public class AsyncService {
	@Autowired
	private SqlSessionFactory sqlSessionFactory;
	
	@Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
		//獲取session,打開(kāi)批處理,因?yàn)槭嵌嗑€程,所以每個(gè)線程都要開(kāi)啟一個(gè)事務(wù)
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
		
        try{
        	
        	TestMapper mapper = session.getMapper(TestMapper.class);
        	
            //異步線程要做的事情
        	for (int i = 0; i < logOutputResults.size(); i++) {
    			System.out.println(Thread.currentThread().getName() + "線程:" + logOutputResults.get(i));
    			
    			TestEntity test = new TestEntity();
    			//test.set()
    			//.............
    			//批量保存
    			mapper.testInsert(test);
    			//每1000條提交一次防止內(nèi)存溢出
    			if(i%1000==0){
    				session.flushStatements();
    			}
			}
        	//提交剩下未處理的事務(wù)
    		session.flushStatements();
        }finally {
            countDownLatch.countDown();// 很關(guān)鍵, 無(wú)論上面程序是否異常必須執(zhí)行countDown,否則await無(wú)法釋放
			if(session != null){
				session.close();
			}
        }
    }
}

5.拆分list調(diào)用異步的業(yè)務(wù)方法

package com.test.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;


@Service
public class TestService {

	@Resource
	private AsyncService asyncService;
	
	public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        //按線程數(shù)拆分后的list
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub:lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會(huì)走下面的;
            // 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logOutputResults.size();
    }
	
	public List<String> getTestData() {
		List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
        	logOutputResults.add("測(cè)試數(shù)據(jù)"+i);
		}
        return logOutputResults;
    }
	
	public List<List<String>> splitList(List<String> logOutputResults) {
		List<List<String>> results = new ArrayList<List<String>>();
		
		/*動(dòng)態(tài)線程數(shù)方式*/
		// 每500條數(shù)據(jù)開(kāi)啟一條線程
		int threadSize = 500;
		// 總數(shù)據(jù)條數(shù)
		int dataSize = logOutputResults.size();
		// 線程數(shù),動(dòng)態(tài)生成
		int threadNum = dataSize / threadSize + 1;
	 
	    /*固定線程數(shù)方式
		    // 線程數(shù)
		    int threadNum = 6;
		    // 總數(shù)據(jù)條數(shù)
		    int dataSize = logOutputResults.size();
		    // 每一條線程處理多少條數(shù)據(jù)
		    int threadSize = dataSize / (threadNum - 1);
	    */
	 
		// 定義標(biāo)記,過(guò)濾threadNum為整數(shù)
		boolean special = dataSize % threadSize == 0;
	 
		List<String> cutList = null;
	 
		// 確定每條線程的數(shù)據(jù)
		for (int i = 0; i < threadNum; i++) {
			if (i == threadNum - 1) {
				if (special) {
					break;
				}
				cutList = logOutputResults.subList(threadSize * i, dataSize);
			} else {
				cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
			}
			
			results.add(cutList);
		}
		
        return results;
    }
}

6.Controller測(cè)試

@RestController
public class TestController {
	
	@Resource
	private TestService testService;
	

	@RequestMapping(value = "/log", method = RequestMethod.GET)
	@ApiOperation(value = "測(cè)試")
	public String test() {
		testService.testMultiThread();
		return "success";
	}
}

總結(jié)

注意這里執(zhí)行插入的數(shù)據(jù)是無(wú)序的。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論