spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)
spring event 事件異步處理(發(fā)布,監(jiān)聽,異步處理)
// 定義事件 public class EventDemo extends ApplicationEvent { private String supplierCode; private String productCode; public EventDemo(Object source, String supplierCode, String productCode) { super(source); this.supplierCode = supplierCode; this.productCode = productCode; } public String getSupplierCode() { return supplierCode; } public String getProductCode() { return productCode; } }
// 發(fā)布事件 @Component public class EventDemoPublish { @Autowired private ApplicationEventPublisher applicationEventPublisher; public void publish(String message) { EventDemo demo = new EventDemo(this, message); applicationEventPublisher.publishEvent(demo); System.out.println("發(fā)布事件執(zhí)行結(jié)束"); } }
// 監(jiān)聽事件 @Component public class EventDemoListener implements ApplicationListener<EventDemo> { @Override public void onApplicationEvent(EventDemo event) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("事件監(jiān)聽開始...... " + "商家編碼:" + event.getSupplierCode() + ",商品編碼:" + event.getProductCode()); } }
<!--定義事件異步處理--> <bean id="commonTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 線程池維持處于Keep-alive狀態(tài)的線程數(shù)量。如果設(shè)置了allowCoreThreadTimeOut為true,該值可能為0。 并發(fā)線程數(shù),想達(dá)到真正的并發(fā)效果,最好對(duì)應(yīng)CPU的線程數(shù)及核心數(shù) --> <property name="corePoolSize" value="2" /> <!-- 最大線程池容量 --> <property name="maxPoolSize" value="2" /> <!-- 超過(guò)最大線程池容量后,允許的線程隊(duì)列數(shù) --> <property name="queueCapacity" value="2" /> <!-- 線程池維護(hù)線程所允許的空閑時(shí)間 .單位毫秒,默認(rèn)為60s,超過(guò)這個(gè)時(shí)間后會(huì)將大于corePoolSize的線程關(guān)閉,保持corePoolSize的個(gè)數(shù) --> <property name="keepAliveSeconds" value="1000" /> <!-- 允許核心線程超時(shí): false(默認(rèn)值)不允許超時(shí),哪怕空閑;true則使用keepAliveSeconds來(lái)控制等待超時(shí)時(shí)間,最終corePoolSize的個(gè)數(shù)可能為0 --> <property name="allowCoreThreadTimeOut" value="true" /> <!-- 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> <!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中 --> <!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> </property> </bean> <!--名字必須是applicationEventMulticaster,因?yàn)锳bstractApplicationContext默認(rèn)找個(gè)--> <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster"> <!--注入任務(wù)執(zhí)行器 這樣就實(shí)現(xiàn)了異步調(diào)用--> <property name="taskExecutor" ref="commonTaskExecutor"></property> </bean>
spring事件之異步線程執(zhí)行
Spring 不僅為我們提供了IOC
, AOP
功能外,還在這個(gè)基礎(chǔ)上提供了許多的功能,我們用的最多的可能就是Spring MVC
了吧,但是讓我們來(lái)看下spring-context
包,其中包含了緩存、調(diào)度、校驗(yàn)功能等等
這里主要想介紹一下Spring提供的觀察者模式實(shí)現(xiàn)(事件發(fā)布監(jiān)聽)及異步方法執(zhí)行,這些功能也都是基于AOP實(shí)現(xiàn)的
Spring 事件
觀察者模式大家都了解,它可以解耦各個(gè)功能,但是自己實(shí)現(xiàn)的話比較麻煩,Spring為我們提供了一種事件發(fā)布機(jī)制,可以按需要發(fā)布事件,之后由監(jiān)聽此事件的類或方法來(lái)執(zhí)行各自對(duì)應(yīng)的功能,代碼互相不影響,以后修改訂單后續(xù)的邏輯時(shí)不會(huì)影響到訂單創(chuàng)建,有點(diǎn)類似于使用MQ的感覺(jué)~
比如在配置中心apollo項(xiàng)目中,在portal創(chuàng)建了app后會(huì)發(fā)送app創(chuàng)建事件,監(jiān)聽此事件的邏輯處將此消息同步到各個(gè)環(huán)境的admin sevice中,大家有興趣可以看下相關(guān)代碼
現(xiàn)在我們來(lái)看看具體如何使用:假設(shè)一個(gè)下單場(chǎng)景,訂單創(chuàng)建成功后可能有一些后續(xù)邏輯要處理,但是和創(chuàng)建訂單本身沒(méi)有關(guān)系,此時(shí)就可以在創(chuàng)建訂單完成后,發(fā)送一個(gè)消息,又相應(yīng)部分的代碼進(jìn)行監(jiān)聽處理,避免代碼耦合到一起
首先創(chuàng)建對(duì)應(yīng)的事件
import org.springframework.context.ApplicationEvent; public class CreatedOrderEvent extends ApplicationEvent { private final String orderSn; public CreatedOrderEvent(Object source, String orderSn) { super(source); this.orderSn = orderSn; } public String getOrderSn() { return this.orderSn; } }
現(xiàn)在還需要一個(gè)事件發(fā)布者和監(jiān)聽者,創(chuàng)建一下
發(fā)布
import org.springframework.context.ApplicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));
監(jiān)聽的多種實(shí)現(xiàn)
1:注解實(shí)現(xiàn) @EventListener
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @Slf4j @Component public class OrderEventListener { @EventListener public void orderEventListener(CreatedOrderEvent event) { } }
2:代碼實(shí)現(xiàn)
import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; @Slf4j @Component public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> { @Override public void onApplicationEvent(CreatedOrderEvent event) { } }
簡(jiǎn)單的事件發(fā)布就完成了,其中的其他復(fù)雜邏輯由Spring替我們處理了
這里我們要注意一點(diǎn):發(fā)布和監(jiān)聽后處理的邏輯是在一個(gè)線程中執(zhí)行的,不是異步執(zhí)行
異步方法
有時(shí)候我們?yōu)榱颂岣唔憫?yīng)速度,有些方法可以異步去執(zhí)行,一般情況下我們可能是手動(dòng)將方法調(diào)用提交到線程池中去執(zhí)行,但是Spring 為我們提供了簡(jiǎn)化的寫法,在開啟了異步情況下,不用修改代碼,只使用注解即可完成此功能
這時(shí)只需要在要異步執(zhí)行的方法上添加@Async
注解即可異步執(zhí)行;@EnableAsync
啟動(dòng)異步線程, 如
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; @Slf4j @Component @EnableAsync public class OrderEventListener { @Async @EventListener public void orderEventListener(CreatedOrderEvent event) { } }
在使用@Async會(huì)有一些問(wèn)題建議看各位看下相關(guān)文檔及源碼
我們通過(guò)Spring事件同步線程改為異步線程,默認(rèn)的線程池是不復(fù)用線程
我覺(jué)得這是這個(gè)注解最坑的地方,沒(méi)有之一!我們來(lái)看看它默認(rèn)使用的線程池是哪個(gè),在前文的源碼分析中,我們可以看到?jīng)Q定要使用線程池的方法是
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
其源碼如下:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; // 可以在@Async注解中配置線程池的名字 String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { // 獲取默認(rèn)的線程池 targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
最終會(huì)調(diào)用到
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
這個(gè)方法中
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
可以看到,它默認(rèn)使用的線程池是SimpleAsyncTaskExecutor
。我們不看這個(gè)類的源碼,只看它上面的文檔注釋,如下:
主要說(shuō)了三點(diǎn)
- 為每個(gè)任務(wù)新起一個(gè)線程
- 默認(rèn)線程數(shù)不做限制
- 不復(fù)用線程
就這三點(diǎn),你還敢用嗎?只要你的任務(wù)耗時(shí)長(zhǎng)一點(diǎn),說(shuō)不定服務(wù)器就給你來(lái)個(gè)OOM
。
解決方案
最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法
1.在之前的源碼分析中,我們可以知道,可以通過(guò)AsyncConfigurer
來(lái)配置使用的線程池
如下:
import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * 異步線程池配置 */ @Slf4j @Component public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(200); executor.setKeepAliveSeconds(5 * 60); executor.setQueueCapacity(1000); // 自定義實(shí)現(xiàn)拒絕策略 executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("當(dāng)前任務(wù)線程池隊(duì)列已滿.")); // 或者選擇已經(jīng)定義好的其中一種拒絕策略 // 丟棄任務(wù)并拋出RejectedExecutionException異常 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 丟棄任務(wù),但是不拋出異常 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 由調(diào)用線程處理該任務(wù) executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 線程名稱前綴 executor.setThreadNamePrefix("Async-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)生未知異常.", ex); } /** * 增加日志MDC */ public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { /** * Gets context for task * * * @return context for task */ private Map<String, String> getContextForTask() { return MDC.getCopyOfContextMap(); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.) * all delegate to this. */ @Override public void execute(@NonNull Runnable command) { super.execute(wrap(command, getContextForTask())); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) * all delegate to this. */ @NonNull @Override public Future<?> submit(@NonNull Runnable task) { return super.submit(wrap(task, getContextForTask())); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) * all delegate to this. */ @NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task) { return super.submit(wrap(task, getContextForTask())); } /** * Wrap callable * * @param <T> parameter * @param task task * @param context context * @return the callable */ private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) { return () -> { Map<String, String> previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { return task.call(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } /** * Wrap runnable * * @param runnable runnable * @param context context * @return the runnable */ private Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { Map<String, String> previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } } }
該方式實(shí)現(xiàn)線程的復(fù)用以及,子線程繼承父線程全鏈路traceId,方便定位問(wèn)題
2.直接在@Async注解中配置要使用的線程池的名稱
@Async(value = "自定義線程名")
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringBoot使用ApplicationEvent&Listener完成業(yè)務(wù)解耦
- 基于Spring Boot應(yīng)用ApplicationEvent案例場(chǎng)景
- 詳解SpringBoot實(shí)現(xiàn)ApplicationEvent事件的監(jiān)聽與發(fā)布
- SpringBoot中ApplicationEvent和ApplicationListener用法小結(jié)
- SpringBoot Event 事件如何實(shí)現(xiàn)異步延遲執(zhí)行
- Event?Sourcing事件溯源模式優(yōu)化業(yè)務(wù)系統(tǒng)
相關(guān)文章
idea雙擊圖標(biāo)打不開,無(wú)反應(yīng)的解決
這篇文章主要介紹了idea雙擊圖標(biāo)打不開,無(wú)反應(yīng)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09微信小程序調(diào)用微信登陸獲取openid及java做為服務(wù)端示例
這篇文章主要介紹了微信小程序調(diào)用微信登陸獲取openid及java做為服務(wù)端示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01servlet3新特性_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了servlet3新特性的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-07-07Spring Boot整合RabbitMQ開發(fā)實(shí)戰(zhàn)詳解
這篇文章主要介紹了Spring Boot整合RabbitMQ開發(fā)實(shí)戰(zhàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-02-02Spring配置多個(gè)數(shù)據(jù)源并實(shí)現(xiàn)數(shù)據(jù)源的動(dòng)態(tài)切換功能
這篇文章主要介紹了Spring配置多個(gè)數(shù)據(jù)源并實(shí)現(xiàn)數(shù)據(jù)源的動(dòng)態(tài)切換功能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01spring cloud 阿波羅 apollo 本地開發(fā)環(huán)境搭建過(guò)程
Apollo(阿波羅)是攜程框架部門研發(fā)的配置管理平臺(tái),能夠集中化管理應(yīng)用不同環(huán)境、不同集群的配置,配置修改后能夠?qū)崟r(shí)推送到應(yīng)用端,并且具備規(guī)范的權(quán)限、流程治理等特性2018-01-01解決IDEA集成Docker插件后出現(xiàn)日志亂碼的問(wèn)題
這篇文章主要介紹了解決IDEA集成Docker插件后出現(xiàn)日志亂碼的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11