SpringBoot Event 事件如何實(shí)現(xiàn)異步延遲執(zhí)行
SpringBoot Event 事件實(shí)現(xiàn)異步延遲執(zhí)行
Spring的事件(Application Event)非常好用,雖然有一點(diǎn)會(huì)出現(xiàn)代碼污染,但是在做不使用其他框架來(lái)做異步的情況先,還是非常方便的。
使用它只需要三樣?xùn)|西
- 自定義事件:繼承 ApplicationEvent,創(chuàng)建一個(gè)你想傳的數(shù)據(jù)的對(duì)象,會(huì)在監(jiān)聽(tīng)器那邊收到該對(duì)象。
- 定義監(jiān)聽(tīng)器,實(shí)現(xiàn) ApplicationListener 或者通過(guò) @EventListener 注解到方法上,兩種方式都行,但是推薦使用@EventListener,只要參數(shù)是你寫(xiě)的繼承ApplicationEvent的對(duì)象,就會(huì)自動(dòng)找到執(zhí)行方法。
- 定義發(fā)布者,通過(guò) ApplicationEventPublisher,自帶的bean,不需要單獨(dú)聲明,直接@Autowired就能使用,主要只需要publishEvent方法。
但是有時(shí)候我需要做延時(shí)執(zhí)行,自帶的功能缺不支持,但是我發(fā)現(xiàn)ApplicationEvent對(duì)象里面有兩個(gè)成員變量,source和timestamp,構(gòu)造函數(shù)(@since 5.3.8)也提供了同時(shí)注入這兩個(gè)變量數(shù)據(jù)。
? ?/** ?? ? * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp} ?? ? * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}. ?? ? * <p>This constructor is typically used in testing scenarios. ?? ? * @param source the object on which the event initially occurred or with ?? ? * which the event is associated (never {@code null}) ?? ? * @param clock a clock which will provide the timestamp ?? ? * @since 5.3.8 ?? ? * @see #ApplicationEvent(Object) ?? ? */ ?? ?public ApplicationEvent(Object source, Clock clock) { ?? ??? ?super(source); ?? ??? ?this.timestamp = clock.millis(); ?? ?}
但是,看了說(shuō)明timestamp只是標(biāo)志執(zhí)行的時(shí)間,并不是為了延遲執(zhí)行,可惜了。
于是查了一些資料,找到j(luò)ava.util.concurrent.DelayQueue對(duì)象,JDK自帶了延遲的隊(duì)列對(duì)象,我們可以考慮利用自帶的timestamp和延遲隊(duì)列DelayQueue結(jié)合一起來(lái)實(shí)現(xiàn),具體DelayQueue的使用請(qǐng)自行查詢(xún),非常的簡(jiǎn)單。
首先,繼承的ApplicationEvent重新實(shí)現(xiàn)一下。
不單單要繼承ApplicationEvent,還需要實(shí)現(xiàn)Delayed,主要是因?yàn)镈elayQueue隊(duì)列中必須是Delayed的實(shí)現(xiàn)類(lèi)
import java.time.Clock; import java.time.Duration; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; ? import org.springframework.context.ApplicationEvent; ? import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; ? @Data @EqualsAndHashCode(callSuper = false) public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed { ? ?? ?private static final long serialVersionUID = 1L; ? ?? ?public ApplicationDelayedEvent(Object source) { ?? ??? ?this(source, 0L); ?? ?} ? ?? ?public ApplicationDelayedEvent(Object source, long delaySeconds) { ?? ??? ?super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds))); ?? ?} ? ?? ?@Override ?? ?public int compareTo(Delayed o) { ?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大 ?? ??? ?long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); ?? ??? ?return (int) delta; ?? ?} ? ?? ?@Override ?? ?public long getDelay(TimeUnit unit) { ?? ??? ?// 最好用NANOSECONDS,更精確,但是用處不大,負(fù)數(shù)也會(huì)認(rèn)為到時(shí)間了 ?? ??? ?long millis = this.getTimestamp(); ?? ??? ?long currentTimeMillis = System.currentTimeMillis(); ?? ??? ?long sourceDuration = millis - currentTimeMillis; ?? ??? ?return unit.convert(sourceDuration, unit); ?? ?} }
多了兩個(gè)必須實(shí)現(xiàn)的方法,compareTo是排序,應(yīng)該是隊(duì)列中的順序。
getDelay是主要的方法,目的是歸0的時(shí)候會(huì)從DelayQueue釋放出來(lái),當(dāng)然那必須是NANOSECONDS級(jí)別的,我使用MILLISECONDS,就會(huì)出現(xiàn)負(fù)數(shù),但也是可以的,也能釋放出來(lái)。
另一個(gè)需要改的就是發(fā)布者,所以重新寫(xiě)一個(gè)ApplicationDelayEventPublisher
import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; ? import lombok.extern.slf4j.Slf4j; ? @Slf4j @Component public class ApplicationDelayEventPublisher implements ApplicationRunner { ? ?? ?// ApplicationDelayedEvent需要import進(jìn)來(lái) ?? ?private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>(); ? ?? ?@Autowired ?? ?private ApplicationEventPublisher eventPublisher; ? ?? ?@Autowired ? ? @Qualifier("watchTaskExecutor") ?? ?private ThreadPoolTaskExecutor poolTaskExecutor; ? ?? ?public void publishEvent(ApplicationDelayedEvent event) { ?? ??? ?boolean result = delayQueue.offer(event); ?? ??? ?log.info("加入延遲隊(duì)列。。。。{}", result); ?? ?} ? ?? ?@Override ?? ?public void run(ApplicationArguments args) throws Exception { ?? ??? ?poolTaskExecutor.execute(() -> watchThread()); ?? ?} ? ?? ?private void watchThread() { ?? ??? ?while (true) { ?? ??? ??? ?try { ?? ??? ??? ??? ?log.info("啟動(dòng)延時(shí)任務(wù)的監(jiān)聽(tīng)線程。。。。"); ?? ??? ??? ??? ?ApplicationDelayedEvent event = this.delayQueue.take(); ?? ??? ??? ??? ?log.info("接收到延時(shí)任務(wù)執(zhí)行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); ?? ??? ??? ??? ?eventPublisher.publishEvent(event); ?? ??? ??? ?} catch (InterruptedException e) { ?? ??? ??? ??? ?log.info("啟動(dòng)延時(shí)任務(wù)的監(jiān)聽(tīng)線程關(guān)閉"); ?? ??? ??? ??? ?this.delayQueue.clear(); ?? ??? ??? ??? ?break; ?? ??? ??? ?} ?? ??? ?} ?? ?} }
需要實(shí)現(xiàn)ApplicationRunner作為Spring boot的啟動(dòng)時(shí)候運(yùn)行的bean,目的就是開(kāi)啟監(jiān)聽(tīng)線程,有事件到了執(zhí)行時(shí)間take方法會(huì)得到數(shù)據(jù),然后調(diào)用Spring原生的事件發(fā)布。
另外特別說(shuō)明的就是監(jiān)聽(tīng)線程不能隨便創(chuàng)建,脫離了Spring容器的線程池會(huì)造成關(guān)閉服務(wù)的時(shí)候造成無(wú)法關(guān)閉的現(xiàn)象,所以建議還是自定義一個(gè)ThreadPoolTaskExecutor
? ? @Bean ?? ?public ThreadPoolTaskExecutor watchTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(1); ?? ??? ?executor.setMaxPoolSize(1); ?? ??? ?executor.setQueueCapacity(1); ?? ??? ?executor.setKeepAliveSeconds(60); ?? ??? ?executor.setThreadNamePrefix("watch_task_"); ? ?? ??? ?// 線程池對(duì)拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?}
最后就是接收事件,跟傳統(tǒng)的接收是一樣的,異步只需要在配置類(lèi)上加上@EnableAsync注解就行了,然后在監(jiān)聽(tīng)的方法上加@Async
import java.util.concurrent.ThreadPoolExecutor; ? import javax.annotation.PostConstruct; ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; ? import lombok.extern.slf4j.Slf4j; ? @Slf4j @Configuration @EnableAsync @ConditionalOnClass(ApplicationDelayEventPublisher.class) public class DelayEventConfiguration { ? ?? ?@PostConstruct ?? ?public void init() { ?? ??? ?log.info("延遲Spring事件模塊啟動(dòng)中。。。"); ?? ?} ? ?? ? ? // 不能和監(jiān)聽(tīng)線程放到一個(gè)線程池,不然無(wú)法執(zhí)行 ?? ?@Bean ?? ?public ThreadPoolTaskExecutor poolTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(3); ?? ??? ?executor.setMaxPoolSize(50); ?? ??? ?executor.setQueueCapacity(10000); ?? ??? ?executor.setKeepAliveSeconds(30); ?? ??? ?executor.setThreadNamePrefix("my_task_"); ? ?? ??? ?// 線程池對(duì)拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?} ? ?? ? ? @Bean ?? ?public ThreadPoolTaskExecutor watchTaskExecutor() { ?? ??? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ?? ??? ?executor.setCorePoolSize(1); ?? ??? ?executor.setMaxPoolSize(1); ?? ??? ?executor.setQueueCapacity(1); ?? ??? ?executor.setKeepAliveSeconds(60); ?? ??? ?executor.setThreadNamePrefix("watch_task_"); ? ?? ??? ?// 線程池對(duì)拒絕任務(wù)的處理策略 //?? ??? ?ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常 //?? ??? ?ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。 //?? ??? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面。 //?? ??? ?ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用者處理該任務(wù) 。 ?? ??? ?executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ?? ??? ?// 初始化 ?? ??? ?executor.initialize(); ?? ??? ?return executor; ?? ?} } ?? ?@Async("poolTaskExecutor") ?? ?@EventListener ?? ?public void listenDelayEvent(ApplicationDelayedEvent event) { ?? ??? ?log.info("收到執(zhí)行事件:{}", event.getSource()); ?? ?}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄
這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01jdk中密鑰和證書(shū)管理工具keytool常用命令詳解
keytool JAVA是個(gè)密鑰和證書(shū)管理工具。它使用戶能夠管理自己的公鑰/私鑰對(duì)及相關(guān)證書(shū),用于(通過(guò)數(shù)字簽名)自我認(rèn)證(用戶向別的用戶/服務(wù)認(rèn)證自己)或數(shù)據(jù)完整性以及認(rèn)證服務(wù)2014-01-01將java程序打成jar包在cmd命令行下執(zhí)行的方法
這篇文章主要給大家介紹了關(guān)于將java程序打成jar包在cmd命令行下執(zhí)行的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-01-01Mybatis控制臺(tái)打印SQL執(zhí)行信息的方法詳解
SQL性能監(jiān)控是一個(gè)程序必要的功能,通常我們可以使用數(shù)據(jù)庫(kù)自帶的客戶端工具進(jìn)行SQL性能分析,本章節(jié)只實(shí)現(xiàn)Mybatis執(zhí)行時(shí)對(duì)執(zhí)行SQL進(jìn)行攔截,控制臺(tái)打印執(zhí)行SQL包括參數(shù)、執(zhí)行方法以及執(zhí)行時(shí)間,需要的朋友可以參考下2024-11-11

REST架構(gòu)及RESTful應(yīng)用程序簡(jiǎn)介

java實(shí)現(xiàn)pdf按頁(yè)轉(zhuǎn)換為圖片

MyBatis中基于別名typeAliases的設(shè)置