SpringBoot使用異步線程池實現(xiàn)生產(chǎn)環(huán)境批量數(shù)據(jù)推送
前言
SpringBoot使用異步線程池:
1、編寫線程池配置類,自定義一個線程池;
2、定義一個異步服務(wù);
3、使用@Async注解指向定義的線程池;
這里以我工作中使用過的一個案例來做描述,我所在公司是醫(yī)療行業(yè),敏感數(shù)據(jù)需要上報到某監(jiān)管平臺,所以有一個定時任務(wù)在流量較小時(一般是凌晨后)執(zhí)行上報行為。但特殊時期會存在一定要在工作時間大批量上報數(shù)據(jù)的情況,且要求短時間內(nèi)就要完成,此時就考慮寫一個專門的異步上報接口手動執(zhí)行,利用線程池上報,極大提高了速度。
編寫線程池配置類
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; ? import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; ? /** ?* 類名稱:ExecutorConfig ?* ******************************** ?* <p> ?* 類描述:線程池配置 ?* ?* @author guoj ?* @date 2021-09-07 09:00 ?*/ @Configuration @EnableAsync @Slf4j public class ExecutorConfig { ? ? /** ? ? ?* 定義數(shù)據(jù)上報線程池 ? ? ?* @return ? ? ?*/ ? ? @Bean("dataCollectionExecutor") ? ? public Executor dataCollectionExecutor() { ? ? ? ? ? ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ? ? ? ? ? // 核心線程數(shù)量:當(dāng)前機(jī)器的核心數(shù) ? ? ? ? executor.setCorePoolSize( ? ? ? ? ? ? ? ? Runtime.getRuntime().availableProcessors()); ? ? ? ? ? // 最大線程數(shù) ? ? ? ? executor.setMaxPoolSize( ? ? ? ? ? ? ? ? Runtime.getRuntime().availableProcessors() * 2); ? ? ? ? ? // 隊列大小 ? ? ? ? executor.setQueueCapacity(Integer.MAX_VALUE); ? ? ? ? ? // 線程池中的線程名前綴 ? ? ? ? executor.setThreadNamePrefix("sjsb-"); ? ? ? ? ? // 拒絕策略:直接拒絕 ? ? ? ? executor.setRejectedExecutionHandler( ? ? ? ? ? ? ? ? new ThreadPoolExecutor.AbortPolicy()); ? ? ? ? ? // 執(zhí)行初始化 ? ? ? ? executor.initialize(); ? ? ? ? ? return executor; ? ? } }
PS:
1)、需要注意,這里一定要自己定義ThreadPoolTaskExecutor線程池,否則springboot的異步注解會執(zhí)行默認(rèn)線程池,存在線程阻塞導(dǎo)致CPU飆高及內(nèi)存溢出的風(fēng)險。這一點可以參考阿里開發(fā)手冊,線程池定義這塊明確提到了這一點;
2)、在@Bean注解中定義線程池名稱,后面異步注解會用到。
編寫異步服務(wù)
/** ?* 異步方法的服務(wù), 不影響主程序運行。 ?*/ @Service public class AsyncService { ? ? ? private final Logger log = LoggerFactory.getLogger(AsyncService.class); ? ? ? /** ? ? ?* 發(fā)送短信 ? ? ?*/ ? ? @Async("sendMsgExecutor") ? ? public void sendMsg(String access_token, Consult item, Map<String, String> configMap) { ? ? ? ? // 此處編寫發(fā)送短信業(yè)務(wù) ? ? ? ? // 1、buildConsultData(); ? ? ? ? // 2、sendMsg(); ? ? } ? ? ? /** ? ? ?* 發(fā)送微信訂閱消息 ? ? ?*/ ? ? @Async ? ? public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) { ? ? ? ? // 此處編寫發(fā)送微信訂閱消息業(yè)務(wù) ? ? ? ? // 1、buildConsultData(); ? ? ? ? // 2、sendSubscribeMsg(); ? ? } ? ? ? /** ? ? ?* 數(shù)據(jù)并上報 ? ? ?*/ ? ? @Async("dataCollectionExecutor") ? ? public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) { ? ? ? ? // 此處編寫上報業(yè)務(wù),如拼接數(shù)據(jù),然后執(zhí)行上報。 ? ? ? ? // 1、buildConsultData(); ? ? ? ? // 2、postData(); ? ? } }
PS:
1)、以上是代碼片段,個人經(jīng)驗認(rèn)為專門定義一個異步service存放各個異步方法最佳,這樣可以避免編碼時一些誤操作比如異步方法不是void或者是private修飾,導(dǎo)致@Async注解失效的情況,同時可以安排每個注解指向不同的自定義線程池更加靈活;
2)、@Async注解中的名稱就是上面定義的自定義線程池名稱,這樣業(yè)務(wù)執(zhí)行時就會從指定線程池中獲取異步線程。
異步批量上報數(shù)據(jù)
@Autowired private AsyncService asyncService; ? /** ?* 手動上報問診記錄,線程池方式。 ?*/ public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) { ? ? ? // 查詢指定時間內(nèi)的問診記錄 ? ?List<Consult> consultList = consultService ? ? ? ?.findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId")); ? ? ?if (!CollectionUtils.isEmpty(consultList)) { ? ? ? ? ?log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手動上報問診記錄, 一共[{}]條", consultList.size()); ? ? ? ? ?consultList.forEach((item) -> { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?// 異步調(diào)用,使用線程池。 ? ? ? ? ? ? ? ?asyncService.buildAndPostData(access_token, item, configMap); ? ? ? ? ? ?} catch (Exception ex) { ? ? ? ? ? ? ? ?log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手動上報問診記錄發(fā)生異常: ", ex); ? ? ? ? ? ?} ? ? ? ?}); ? ?} }
總結(jié)
以上方式已經(jīng)在生產(chǎn)環(huán)境運行,在工作時間內(nèi)執(zhí)行過很多次,一次數(shù)萬條記錄基本是幾分鐘內(nèi)就全部上報完畢,而正常循環(huán)遍歷時一次大概需要半個小時左右。
線程池的使用方式往往來源于業(yè)務(wù)場景,如果類似的業(yè)務(wù)不存在緊急處理的情況,大體還是以任務(wù)調(diào)度執(zhí)行為主,因為更安全。如果存在緊急處理的情況,那么使用SpringBoot+線程池的方式不僅能節(jié)省非常多的時間,且不占用主線程的執(zhí)行空間。
到此這篇關(guān)于SpringBoot使用異步線程池實現(xiàn)生產(chǎn)環(huán)境批量數(shù)據(jù)推送的文章就介紹到這了,更多相關(guān)SpringBoot 生產(chǎn)環(huán)境批量數(shù)據(jù)推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
- springboot?正確的在異步線程中使用request的示例代碼
- SpringBoot整合MQTT并實現(xiàn)異步線程調(diào)用的問題
- SpringBoot?異步線程間傳遞上下文方式
- Spring Boot之@Async異步線程池示例詳解
- SpringBoot獲取HttpServletRequest的3種方式總結(jié)
- SpringBoot詳細(xì)講解異步任務(wù)如何獲取HttpServletRequest
- 在 Spring Boot 中使用異步線程時的 HttpServletRequest 復(fù)用問題記錄
相關(guān)文章
基于servlet的執(zhí)行原理與生命周期(全面解析)
下面小編就為大家分享一篇servlet的執(zhí)行原理與生命周期全面解析,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2017-12-12使用jdk1.8實現(xiàn)將list根據(jù)指定的值去分組的操作
這篇文章主要介紹了使用jdk1.8實現(xiàn)將list根據(jù)指定的值去分組的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10Java實現(xiàn)在線編輯預(yù)覽office文檔詳解
PageOffice是一款在線的office編輯軟件,幫助Web應(yīng)用系統(tǒng)或Web網(wǎng)站實現(xiàn)用戶在線編輯Word、Excel、PowerPoint文檔,下面我們就來看看如何使用Java實現(xiàn)在線預(yù)覽office吧2024-01-01一文徹底吃透SpringMVC中的轉(zhuǎn)發(fā)和重定向
大家應(yīng)該都知道springmvc本來就會把返回的字符串作為視圖名解析,然后轉(zhuǎn)發(fā)到對應(yīng)的視圖,這篇文章主要給大家介紹了關(guān)于SpringMVC中轉(zhuǎn)發(fā)和重定向的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-04-04