springboot的調(diào)度服務(wù)與異步服務(wù)使用詳解
1.調(diào)度服務(wù)
1.1.JDK之ScheduledExecutorService
講到調(diào)度任務(wù),我們腦海里馬上會想到ScheduledExecutorService。
ScheduledExecutorService是 Java java.util.concurrent 包中的一個(gè)接口,它繼承自 ExecutorService 接口。它主要用于在給定的延遲后運(yùn)行任務(wù),或者定期地執(zhí)行任務(wù)。這個(gè)接口提供了幾種安排任務(wù)執(zhí)行的方法,包括單次執(zhí)行、定期執(zhí)行和周期性執(zhí)行。
以下是 ScheduledExecutorService 提供的一些關(guān)鍵方法:
- schedule(Callable<V> callable, long delay, TimeUnit unit): 安排所提交的
Callable
任務(wù)在指定的延遲后運(yùn)行,返回一個(gè)Future
,代表任務(wù)的結(jié)果。 - schedule(Runnable command, long delay, TimeUnit unit): 安排所提交的
Runnable
任務(wù)在指定的延遲后運(yùn)行。 - scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 安排所提交的
Runnable
任務(wù)在指定的初始延遲后首次啟動,并且隨后按指定的周期重復(fù)執(zhí)行。 - scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 安排所提交的
Runnable
任務(wù)在指定的初始延遲后首次啟動,并且隨后在每次執(zhí)行結(jié)束和下次執(zhí)行開始之間都存在指定的延遲。
然而,如果采用了sprintboot,我們也可以直接采用springboot提供的調(diào)試線程池。這其中有一個(gè)最大的優(yōu)勢在于,可以利用spring的cron表達(dá)式,采用方法注解的方式,不用引入quartz第三方工具。
1.2.springboot使用調(diào)度線程池
我們嘗試從源代碼的角度,來看看springboot提供的調(diào)度線程池是怎么創(chuàng)建的。
首先,啟用調(diào)度配置,啟動類加上@EnableScheduling
springboot的自動配置類一般以AutoConfiguration作為后綴,采用模糊搜索ScheduleAutoConfiguration可以找到目標(biāo)TaskSchedulingAutoConfiguration。
從截圖我們可以看出,我們只需在application.yml加入以下的配置就可以啟動了
spring: task: ## 定時(shí)任務(wù)(業(yè)務(wù)上定時(shí)任務(wù)量不多,2個(gè)足矣) scheduling: ## 線程池核心線程數(shù)量 pool: size: 2 ## 線程名稱前線 threadNamePrefix: common-scheduling-
spring對線程池進(jìn)行二次封閉,最終調(diào)用的還是jdk的ThreadPoolExecutor類,我們在該類的構(gòu)造函數(shù)打個(gè)斷點(diǎn),可以看到,pool.size參數(shù)已經(jīng)被傳參:
1.3.ThreadPoolExecutor核心參數(shù)與執(zhí)行流程
這里有必要先介紹下 ThreadPoolExecutor類的幾個(gè)函數(shù)參數(shù)及基本運(yùn)行機(jī)制
構(gòu)造函數(shù)參數(shù):
- 核心線程數(shù)(Core Pool Size): 線程池中始終保持的線程數(shù)量,即使它們處于空閑狀態(tài)。如果任務(wù)數(shù)量少于核心線程數(shù),線程池會創(chuàng)建新的線程來處理任務(wù),而不會立即回收這些線程。
- 最大線程數(shù)(Maximum Pool Size): 線程池中允許的最大線程數(shù)量。如果任務(wù)數(shù)量超過了核心線程數(shù)但小于最大線程數(shù),且工作隊(duì)列已滿,線程池會創(chuàng)建新的線程來處理任務(wù),直到達(dá)到最大線程數(shù)。
- 工作隊(duì)列(Work Queue): 用于存放待執(zhí)行任務(wù)的阻塞隊(duì)列。當(dāng)所有核心線程都在忙碌時(shí),新的任務(wù)會被放入工作隊(duì)列中等待執(zhí)行。
- 線程工廠(Thread Factory): 用于創(chuàng)建新線程的工廠。它提供了一種方式來定制線程的創(chuàng)建過程,例如設(shè)置線程的名稱、優(yōu)先級、是否為守護(hù)線程等。
- 拒絕策略(Rejected Execution Handler): 當(dāng)任務(wù)無法被線程池及時(shí)處理時(shí)(即當(dāng)線程池已滿,且工作隊(duì)列已滿),線程池會采用拒絕策略來處理新提交的任務(wù)。常見的拒絕策略包括:
AbortPolicy
:拋出RejectedExecutionException
。CallerRunsPolicy
:由調(diào)用者線程運(yùn)行該任務(wù)。DiscardPolicy
:靜默丟棄任務(wù)。DiscardOldestPolicy
:丟棄隊(duì)列中最老的任務(wù),然后嘗試再次提交當(dāng)前任務(wù)。- 保持活動時(shí)間(Keep-Alive Time): 非核心線程空閑時(shí)在終止前等待新任務(wù)的最長時(shí)間。如果線程池允許核心線程空閑,這個(gè)參數(shù)也適用于核心線程。
- 時(shí)間單位(Time Unit): 與保持活動時(shí)間配合使用的時(shí)間單位,例如
TimeUnit.SECONDS
。
執(zhí)行流程:
- 如果線程池中的線程數(shù)量少于核心線程數(shù),即使有空閑線程,線程池也會優(yōu)先創(chuàng)建新線程來執(zhí)行新的任務(wù)。
- 如果線程池中的線程數(shù)量達(dá)到核心線程數(shù),新的任務(wù)會被放入工作隊(duì)列等待執(zhí)行。
- 如果工作隊(duì)列已滿且線程數(shù)量少于最大線程數(shù),線程池會創(chuàng)建新的非核心線程來執(zhí)行任務(wù)。
- 如果工作隊(duì)列已滿且線程數(shù)量達(dá)到最大線程數(shù),新的任務(wù)會被拒絕,線程池會采用拒絕策略來處理。
1.4.自定義線程池
由于spingboot對調(diào)度任務(wù)線程池的參數(shù)支持有限,如果想定制自己的參數(shù),可以注入自己的調(diào)度線程池,從代碼可看出:
@Configuration public class ThreadPoolConfig { @Bean public ScheduledExecutorService scheduledExecutorService() { return new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("common-schedule"), new ThreadPoolExecutor.CallerRunsPolicy()) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); } }; } }
1.5.spring cron注解
Cron 表達(dá)式是一種用于描述定時(shí)任務(wù)觸發(fā)時(shí)間的字符串表達(dá)式。它由多個(gè)時(shí)間字段組成,每個(gè)字段代表定時(shí)任務(wù)在特定時(shí)間單位上的觸發(fā)條件。
1.5.1.cron表達(dá)式語法格式
秒 分 時(shí) 日 月 星期 年份
其中,每個(gè)時(shí)間字段都有對應(yīng)的取值范圍和特殊符號。下面是每個(gè)時(shí)間字段的詳細(xì)說明:
1、秒(Seconds):取值范圍為 0~59。例如,`0/5` 表示從0秒開始,每隔 5 秒觸發(fā)一次,`*` 表示每秒都觸發(fā)。
2、分鐘(Minutes):取值范圍為 0~59。例如,`0/5` 表示從0分鐘開始,每隔 5 分鐘觸發(fā)一次,`*` 表示每分鐘都觸發(fā)。
3、小時(shí)(Hours):取值范圍為 0~23。例如,`0/2` 表示從0小時(shí)開始,每隔 2 小時(shí)觸發(fā)一次,`*` 表示每小時(shí)都觸發(fā)。
4、日期(Day of Month):取值范圍為 1~31。例如,`1,15` 表示每月的 1 日和 15 日觸發(fā),`*` 表示每天都觸發(fā)。
5、月份(Month):取值范圍為 1~12,也可以使用英文縮寫 JAN、FEB、MAR 等。例如,`1,6` 表示一月和六月觸發(fā),`*` 表示每個(gè)月都觸發(fā)。
6、 星期(Day of Week):取值范圍為 1~7,1 表示星期日,2 表示星期一,以此類推,也可以使用英文縮寫 SUN、MON、TUE 等。例如,`2-6` 表示星期一到星期五觸發(fā),`*` 表示每個(gè)星期都觸發(fā)。
7、年份(Year):可選字段,表示觸發(fā)條件的年份。例如,`2023` 表示在 2023 年觸發(fā),`*` 表示每年都觸發(fā)。
除了取值范圍,Cron 表達(dá)式還支持一些特殊符號,用于指定特定的觸發(fā)條件,例如:
- - 星號(*):代表所有可能的取值,表示不限制該時(shí)間字段的取值范圍。
- - 問號(?):僅在日期和星期字段中使用,表示不指定具體的取值,可以任意匹配。
- - 斜線(/):表示間隔觸發(fā),例如在分鐘字段中,"*/5" 表示每隔 5 分鐘觸發(fā)一次。
- - 逗號(,):用于指定多個(gè)取值,例如在小時(shí)字段中,"1,3,5" 表示在第 1、3、5 小時(shí)觸發(fā)。
- - 減號(-):用于指定一個(gè)范圍,例如在月份字段中,"3-6" 表示三月到六月觸發(fā)。
- L : 表示最后,只能出現(xiàn)在星期和每月第幾天域,如果在星期域使用1L,意味著在最后的一個(gè)星期日觸發(fā)。
- W : 表示有效工作日(周一到周五),只能出現(xiàn)在每月第幾日域,系統(tǒng)將在離指定日期的最近的有效工作日觸發(fā)事件。注意一點(diǎn),W的最近尋找不會跨過月份
- LW : 這兩個(gè)字符可以連用,表示在某個(gè)月最后一個(gè)工作日,即最后一個(gè)星期五。
# : 用于確定每個(gè)月第幾個(gè)星期幾,只能出現(xiàn)在每月第幾天域。例如在1#3,表示某月的第三個(gè)星期日。
1.5.2.cron表達(dá)式示例
作用 | 表達(dá)式 |
每隔5秒執(zhí)行一次 | */5 * * * * ? |
每天中午12點(diǎn)執(zhí)行一次 | 0 0 12 * * ? |
2024年的每天上午10:00執(zhí)行一次 | 0 0 10 * * ? 2023 |
每天下午6點(diǎn)到下午6:59每分鐘執(zhí)行一次 | 0 * 18 * * ? |
每月的最后一個(gè)星期五上午10:30執(zhí)行一次 | 0 30 10 ? * 6L |
每月的第4個(gè)星期五上午10:25執(zhí)行一次 | 0 25 10 ? * 6#4 |
每天上午8點(diǎn),下午1點(diǎn),4點(diǎn)執(zhí)行一次 | 0 0 8,13,16 * * ? |
2.異步任務(wù)
2.1.springboot配置
在軟件開發(fā)中,有些任務(wù)比較耗時(shí)但又無需馬上獲得結(jié)果。一般地,這些任務(wù)我們可以采用獨(dú)立線程池異步執(zhí)行。如果程序基于springboot環(huán)境,我們有現(xiàn)成的工具可以使用。
首先,我們需要在程序啟動入口類增加@EnableAsync。
借著,我們嘗試從源代碼的角度,來看看springboot提供的異步線程池是怎么創(chuàng)建的。
從springboot的命名風(fēng)格可知,通過模糊搜索TaskAutoConfiguration,可以找到TaskExecutionAutoConfiguration,如下:
從源代碼可知,只需在application.yml配置如下參數(shù)即可:
spring: execution: pool: coreSize: 4 queueCapacity: 64 maxSize: 8 ## 禁止空閑線程關(guān)閉,保證最少有core個(gè)存活線程 allowCoreThreadTimeout: false keepAlive: 300s threadNamePrefix: common-async_task-
這些參數(shù)跟jdk的ThreadPoolExecutor類的構(gòu)造參數(shù)非常相似,這里不作解析 。
2.2.使用異步任務(wù)
使用方法,只要在目標(biāo)方法的簽名加上@Async
然而,執(zhí)行結(jié)果卻出乎意外(在main主線程上執(zhí)行,沒有異步執(zhí)行)
熟悉springaop機(jī)制的同學(xué),馬上知道這是因?yàn)楫惒饺蝿?wù)底層是基于動態(tài)代理機(jī)制實(shí)現(xiàn)的。Spring AOP 代理只有在通過 Spring 容器獲取 Bean 時(shí)才會創(chuàng)建。
當(dāng)在同一個(gè)類內(nèi)部調(diào)用一個(gè)方法時(shí),調(diào)用的是原始對象,而不是代理對象。因此,內(nèi)部調(diào)用不會經(jīng)過 Spring AOP 代理,也就無法觸發(fā)異步執(zhí)行。解決這個(gè)問題最簡單的方法是用一個(gè)新的類來管理異步執(zhí)行方法。
問題解決
2.3.模擬系統(tǒng)繁忙進(jìn)行性能測試
我們嘗試模擬一些極端情況,系統(tǒng)處理不過來的情況。修改上面的配置,改為
pool: coreSize: 1 queueCapacity: 1 maxSize: 8
同時(shí),異步執(zhí)行增加時(shí)間延遲來模擬耗時(shí)任務(wù)
@Component @Slf4j class AsyncTaskHandler { @Async public void busyTask1() { try { Thread.sleep(5000); } catch (InterruptedException ignored) { } log.info("----------busyTask1---------"); } @Async public void busyTask2() { try { Thread.sleep(5000); } catch (InterruptedException ignored) { } log.info("----------busyTask2---------"); } }
執(zhí)行結(jié)果會出現(xiàn)報(bào)錯(cuò)(線程數(shù)已達(dá)到最大數(shù)量,且任務(wù)隊(duì)列已滿,無法添加新任務(wù))
調(diào)度任務(wù)執(zhí)行頻率是可預(yù)見的,有多少個(gè)任務(wù),執(zhí)行頻率,開發(fā)可知,核心數(shù)量,最大數(shù)量,隊(duì)列容量比較好設(shè)定。而異步執(zhí)行頻率很大程度是由系統(tǒng)的使用者(用戶)決定的,因此這些參數(shù)需要根據(jù)流量動態(tài)修改。
2.4.異步線程池拒絕策略
如果不想把 queueCapacity和maxSize都設(shè)置成很大的話,我們可以考慮修改下線程池的拒絕策略。最妥當(dāng)?shù)姆绞绞牵热划惒讲涣?,那?quot;熔斷"成同步,直接在調(diào)用者所在的業(yè)務(wù)線程執(zhí)行。然而,springboot沒有提供相應(yīng)的配置項(xiàng)。為此,我們只能關(guān)閉springboot的自動配置了。
從代碼可看出,只要提供一個(gè)Executor實(shí)例,并且名字叫taskExecutor即可。
話不多說,上代碼
@Configuration public class ThreadPoolConfig { private final int core = Runtime.getRuntime().availableProcessors(); /** * springboot 自動注入的異步執(zhí)行線程池,拒絕策略為丟棄,難以配置maxPoolSize參數(shù) * @see TaskExecutionAutoConfiguration#taskExecutorBuilder */ @Bean(name = "taskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor(TaskExecutionProperties properties) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(properties.getPool().getCoreSize()); executor.setThreadNamePrefix(properties.getThreadNamePrefix()); executor.setMaxPoolSize(properties.getPool().getMaxSize()); executor.setQueueCapacity(properties.getPool().getQueueCapacity()); executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().toSeconds()); executor.setAllowCoreThreadTimeOut(properties.getPool().isAllowCoreThreadTimeout()); // 超過隊(duì)列容量,則在業(yè)務(wù)線程上執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
重新運(yùn)行程序,可以看到,當(dāng)任務(wù)超過線程池負(fù)載的時(shí)候,多余的線程會在調(diào)用線程上執(zhí)行,變?yōu)橥酱a
個(gè)人認(rèn)為:使用springboot創(chuàng)建的線程池,代碼也只是稍微簡化一點(diǎn)點(diǎn)。
采用原生線程池,每個(gè)業(yè)務(wù)代碼必須實(shí)現(xiàn)Runnable接口,而使用springboot的異步線程池,只需以方法注解的形式即可,底層aop會生成對應(yīng)的代理方法。但要確保避免內(nèi)部方法調(diào)用導(dǎo)致異步邏輯失效。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
logback ThresholdFilter臨界值日志過濾器源碼解讀
這篇文章主要為大家介紹了logback ThresholdFilter臨界值日志過濾器源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11基于Spring Data Jest的Elasticsearch數(shù)據(jù)統(tǒng)計(jì)示例
本篇文章主要介紹了基于Spring Data Jest的Elasticsearch數(shù)據(jù)統(tǒng)計(jì)示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-02-02idea運(yùn)行java項(xiàng)目main方法報(bào)build failure錯(cuò)誤的解決方法
當(dāng)在使用 IntelliJ IDEA 運(yùn)行 Java 項(xiàng)目的 main 方法時(shí)遇到 "Build Failure" 錯(cuò)誤,這通常意味著在項(xiàng)目的構(gòu)建過程中遇到了問題,以下是一些詳細(xì)的解決步驟,以及一個(gè)簡單的代碼示例,用于展示如何確保 Java 程序可以成功構(gòu)建和運(yùn)行,需要的朋友可以參考下2024-09-09SpringBoot JSON全局日期格式轉(zhuǎn)換器實(shí)現(xiàn)方式
這篇文章主要介紹了SpringBoot JSON全局日期格式轉(zhuǎn)換器,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04Netty分布式ByteBuf中PooledByteBufAllocator剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf剖析PooledByteBufAllocator簡述,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03