Spring?Boot使用線程池處理上萬條數(shù)據(jù)插入功能
# 前言
前兩天做項(xiàng)目的時(shí)候,想提高一下插入表的性能優(yōu)化,因?yàn)槭莾蓮埍?,先插舊的表,緊接著插新的表,一萬多條數(shù)據(jù)就有點(diǎn)慢了
后面就想到了線程池ThreadPoolExecutor,而用的是Spring Boot項(xiàng)目,可以用Spring提供的對ThreadPoolExecutor封裝的線程池ThreadPoolTaskExecutor,直接使用注解啟用
# 使用步驟
先創(chuàng)建一個(gè)線程池的配置,讓Spring Boot加載,用來定義如何創(chuàng)建一個(gè)ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個(gè)注解,表示這是個(gè)配置類,并且是線程池的配置類
@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }
@Value是我配置在application.properties,可以參考配置,自由定義
# 異步線程配置 # 配置核心線程數(shù) async.executor.thread.core_pool_size = 5 # 配置最大線程數(shù) async.executor.thread.max_pool_size = 5 # 配置隊(duì)列大小 async.executor.thread.queue_capacity = 99999 # 配置線程池中的線程的名稱前綴 async.executor.thread.name.prefix = async-service-
創(chuàng)建一個(gè)Service接口,是異步線程的接口
public interface AsyncService { /** * 執(zhí)行異步任務(wù) * 可以根據(jù)需求,自己加參數(shù)擬定,我這里就做個(gè)測試演示 */ void executeAsync(); }
實(shí)現(xiàn)類
@Service public class AsyncServiceImpl implements AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Override @Async("asyncServiceExecutor") public void executeAsync() { logger.info("start executeAsync"); System.out.println("異步線程要做的事情"); System.out.println("可以在這里執(zhí)行批量插入等耗時(shí)的事情"); logger.info("end executeAsync"); } }
在executeAsync()方法上增加注解@Async("asyncServiceExecutor"),asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法進(jìn)入的線程池是asyncServiceExecutor方法創(chuàng)建的
接下來就是在Controller里或者是哪里通過注解@Autowired注入這個(gè)Service
@Autowiredprivate AsyncService asyncService; @GetMapping("/async") public void async(){ asyncService.executeAsync(); }
日志打印
2022-07-16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:15:47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:15:47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:15:48.833 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:15:48.834 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:15:48.986 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:15:48.987 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
通過以上日志可以發(fā)現(xiàn),[async-service-]是有多個(gè)線程的,顯然已經(jīng)在我們配置的線程池中執(zhí)行了,并且每次請求中,controller的起始和結(jié)束日志都是連續(xù)打印的,表明每次請求都快速響應(yīng)了,而耗時(shí)的操作都留給線程池中的線程去異步執(zhí)行;
雖然我們已經(jīng)用上了線程池,但是還不清楚線程池當(dāng)時(shí)的情況,有多少線程在執(zhí)行,多少在隊(duì)列中等待呢?這里我創(chuàng)建了一個(gè)ThreadPoolTaskExecutor的子類,在每次提交線程的時(shí)候都會(huì)將當(dāng)前線程池的運(yùn)行狀況打印出來
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor; /** * @Author: 騰騰 * @Date: 2022/7/16/0016 22:19 */ public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
如上所示,showThreadPoolInfo方法中將任務(wù)總數(shù)、已完成數(shù)、活躍線程數(shù),隊(duì)列大小都打印出來了,然后Override了父類的execute、submit等方法,在里面調(diào)用showThreadPoolInfo方法,這樣每次有任務(wù)被提交到線程池的時(shí)候,都會(huì)將當(dāng)前線程池的基本情況打印到日志中;
修改ExecutorConfig.java的asyncServiceExecutor方法,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); //在這里修改 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; }
再次啟動(dòng)該工程測試
2022-07-16 22:23:30.951 INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
2022-07-16 22:23:30.952 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:23:30.953 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:23:31.351 INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [1], completedTaskCount [1], activeCount [0], queueSize [0]
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:23:31.927 INFO 14088 --- [nio-8087-exec-5] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [2], completedTaskCount [2], activeCount [0], queueSize [0]
2022-07-16 22:23:31.929 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:23:31.930 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3], completedTaskCount [3], activeCount [0], queueSize [0]
2022-07-16 22:23:32.498 INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
異步線程要做的事情
可以在這里執(zhí)行批量插入等耗時(shí)的事情
2022-07-16 22:23:32.499 INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
注意這一行日志:
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3], completedTaskCount [3], activeCount [0], queueSize [0]
這說明提交任務(wù)到線程池的時(shí)候,調(diào)用的是submit(Callable task)這個(gè)方法,當(dāng)前已經(jīng)提交了3個(gè)任務(wù),完成了3個(gè),當(dāng)前有0個(gè)線程在處理任務(wù),還剩0個(gè)任務(wù)在隊(duì)列中等待,線程池的基本情況一路了然;
到此這篇關(guān)于Spring Boot使用線程池處理上萬條數(shù)據(jù)插入的文章就介紹到這了,更多相關(guān)Spring Boot線程池處理上萬條數(shù)據(jù)插入內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java8簡單了解Lambda表達(dá)式與函數(shù)式接口
這篇文章主要介紹了Java8簡單了解Lambda表達(dá)式與函數(shù)式接口,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11Spring中ApplicationListener的使用解析
這篇文章主要介紹了Spring中ApplicationListener的使用解析,ApplicationContext事件機(jī)制是觀察者設(shè)計(jì)模式的實(shí)現(xiàn),通過ApplicationEvent類和ApplicationListener接口,需要的朋友可以參考下2023-12-12使用SpringBoot_jar方式啟動(dòng)并配置日志文件
這篇文章主要介紹了使用SpringBoot_jar方式啟動(dòng)并配置日志文件操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09SpringBoot配置默認(rèn)HikariCP數(shù)據(jù)源
咱們開發(fā)項(xiàng)目的過程中用到很多的開源數(shù)據(jù)庫鏈接池,比如druid、c3p0、BoneCP等等,本文主要介紹了SpringBoot配置默認(rèn)HikariCP數(shù)據(jù)源,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11