欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)

 更新時(shí)間:2023年02月14日 16:28:54   作者:qq_34097912  
這篇文章主要介紹了spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

spring event 事件異步處理(發(fā)布,監(jiān)聽,異步處理)

// 定義事件
public class EventDemo extends ApplicationEvent {
 
    private String supplierCode;
    private String productCode;
 
    public EventDemo(Object source, String supplierCode, String productCode) {
        super(source);
        this.supplierCode = supplierCode;
        this.productCode = productCode;
    }
 
    public String getSupplierCode() {
        return supplierCode;
    }
 
    public String getProductCode() {
        return productCode;
    }
}
// 發(fā)布事件
@Component
public class EventDemoPublish {
 
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
 
    public void publish(String message) {
        EventDemo demo = new EventDemo(this, message);
        applicationEventPublisher.publishEvent(demo);
        System.out.println("發(fā)布事件執(zhí)行結(jié)束");
    }
}
// 監(jiān)聽事件
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {
    @Override
    public void onApplicationEvent(EventDemo event) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("事件監(jiān)聽開始...... " + "商家編碼:" + event.getSupplierCode() + ",商品編碼:" + event.getProductCode());
    }
}
<!--定義事件異步處理-->
 
<bean id="commonTaskExecutor"
		  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<!-- 線程池維持處于Keep-alive狀態(tài)的線程數(shù)量。如果設(shè)置了allowCoreThreadTimeOut為true,該值可能為0。
            并發(fā)線程數(shù),想達(dá)到真正的并發(fā)效果,最好對(duì)應(yīng)CPU的線程數(shù)及核心數(shù) -->
		<property name="corePoolSize" value="2" />
		<!-- 最大線程池容量 -->
		<property name="maxPoolSize" value="2" />
		<!-- 超過(guò)最大線程池容量后,允許的線程隊(duì)列數(shù) -->
		<property name="queueCapacity" value="2" />
		<!-- 線程池維護(hù)線程所允許的空閑時(shí)間 .單位毫秒,默認(rèn)為60s,超過(guò)這個(gè)時(shí)間后會(huì)將大于corePoolSize的線程關(guān)閉,保持corePoolSize的個(gè)數(shù) -->
		<property name="keepAliveSeconds" value="1000" />
		<!-- 允許核心線程超時(shí): false(默認(rèn)值)不允許超時(shí),哪怕空閑;true則使用keepAliveSeconds來(lái)控制等待超時(shí)時(shí)間,最終corePoolSize的個(gè)數(shù)可能為0 -->
		<property name="allowCoreThreadTimeOut" value="true" />
 
		<!-- 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略 -->
		<property name="rejectedExecutionHandler">
			<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
			<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個(gè)任務(wù)到線程池中 -->
			<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
		</property>
	</bean>
 
	<!--名字必須是applicationEventMulticaster,因?yàn)锳bstractApplicationContext默認(rèn)找個(gè)-->
	<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
		<!--注入任務(wù)執(zhí)行器 這樣就實(shí)現(xiàn)了異步調(diào)用-->
		<property name="taskExecutor" ref="commonTaskExecutor"></property>
	</bean>

spring事件之異步線程執(zhí)行

Spring 不僅為我們提供了IOC , AOP功能外,還在這個(gè)基礎(chǔ)上提供了許多的功能,我們用的最多的可能就是Spring MVC了吧,但是讓我們來(lái)看下spring-context包,其中包含了緩存、調(diào)度、校驗(yàn)功能等等

image.png

這里主要想介紹一下Spring提供的觀察者模式實(shí)現(xiàn)(事件發(fā)布監(jiān)聽)及異步方法執(zhí)行,這些功能也都是基于AOP實(shí)現(xiàn)的

Spring 事件

觀察者模式大家都了解,它可以解耦各個(gè)功能,但是自己實(shí)現(xiàn)的話比較麻煩,Spring為我們提供了一種事件發(fā)布機(jī)制,可以按需要發(fā)布事件,之后由監(jiān)聽此事件的類或方法來(lái)執(zhí)行各自對(duì)應(yīng)的功能,代碼互相不影響,以后修改訂單后續(xù)的邏輯時(shí)不會(huì)影響到訂單創(chuàng)建,有點(diǎn)類似于使用MQ的感覺(jué)~

比如在配置中心apollo項(xiàng)目中,在portal創(chuàng)建了app后會(huì)發(fā)送app創(chuàng)建事件,監(jiān)聽此事件的邏輯處將此消息同步到各個(gè)環(huán)境的admin sevice中,大家有興趣可以看下相關(guān)代碼

現(xiàn)在我們來(lái)看看具體如何使用:假設(shè)一個(gè)下單場(chǎng)景,訂單創(chuàng)建成功后可能有一些后續(xù)邏輯要處理,但是和創(chuàng)建訂單本身沒(méi)有關(guān)系,此時(shí)就可以在創(chuàng)建訂單完成后,發(fā)送一個(gè)消息,又相應(yīng)部分的代碼進(jìn)行監(jiān)聽處理,避免代碼耦合到一起

首先創(chuàng)建對(duì)應(yīng)的事件

import org.springframework.context.ApplicationEvent;
 
public class CreatedOrderEvent extends ApplicationEvent {
	
    private final String orderSn;
    
    public CreatedOrderEvent(Object source, String orderSn) {
    	super(source);
        this.orderSn = orderSn;
    }
    
    public String getOrderSn() {
    	return this.orderSn;
    }
}

現(xiàn)在還需要一個(gè)事件發(fā)布者和監(jiān)聽者,創(chuàng)建一下

發(fā)布

import org.springframework.context.ApplicationEventPublisher;
 
private ApplicationEventPublisher applicationEventPublisher;
 
applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));

監(jiān)聽的多種實(shí)現(xiàn)

1:注解實(shí)現(xiàn)  @EventListener

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class OrderEventListener {
    
    @EventListener
    public void orderEventListener(CreatedOrderEvent event) {
    	
    }
}

2:代碼實(shí)現(xiàn)

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
 
@Slf4j
@Component
public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> {
    
    @Override
    public void onApplicationEvent(CreatedOrderEvent event) {
    	
    }
}

簡(jiǎn)單的事件發(fā)布就完成了,其中的其他復(fù)雜邏輯由Spring替我們處理了

這里我們要注意一點(diǎn):發(fā)布和監(jiān)聽后處理的邏輯是在一個(gè)線程中執(zhí)行的,不是異步執(zhí)行

異步方法

有時(shí)候我們?yōu)榱颂岣唔憫?yīng)速度,有些方法可以異步去執(zhí)行,一般情況下我們可能是手動(dòng)將方法調(diào)用提交到線程池中去執(zhí)行,但是Spring 為我們提供了簡(jiǎn)化的寫法,在開啟了異步情況下,不用修改代碼,只使用注解即可完成此功能

這時(shí)只需要在要異步執(zhí)行的方法上添加@Async注解即可異步執(zhí)行;@EnableAsync 啟動(dòng)異步線程, 如

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
@EnableAsync
public class OrderEventListener {
    @Async
    @EventListener
    public void orderEventListener(CreatedOrderEvent event) {
    	
    }
}

在使用@Async會(huì)有一些問(wèn)題建議看各位看下相關(guān)文檔及源碼

我們通過(guò)Spring事件同步線程改為異步線程,默認(rèn)的線程池是不復(fù)用線程

我覺(jué)得這是這個(gè)注解最坑的地方,沒(méi)有之一!我們來(lái)看看它默認(rèn)使用的線程池是哪個(gè),在前文的源碼分析中,我們可以看到?jīng)Q定要使用線程池的方法是

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

其源碼如下:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        // 可以在@Async注解中配置線程池的名字
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // 獲取默認(rèn)的線程池
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

最終會(huì)調(diào)用到

org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

這個(gè)方法中

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
   Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
   return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

可以看到,它默認(rèn)使用的線程池是SimpleAsyncTaskExecutor。我們不看這個(gè)類的源碼,只看它上面的文檔注釋,如下:

image-20200720160047340

主要說(shuō)了三點(diǎn)

  • 為每個(gè)任務(wù)新起一個(gè)線程
  • 默認(rèn)線程數(shù)不做限制
  • 不復(fù)用線程

就這三點(diǎn),你還敢用嗎?只要你的任務(wù)耗時(shí)長(zhǎng)一點(diǎn),說(shuō)不定服務(wù)器就給你來(lái)個(gè)OOM。

解決方案

最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法

1.在之前的源碼分析中,我們可以知道,可以通過(guò)AsyncConfigurer來(lái)配置使用的線程池

如下:

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * 異步線程池配置
 */
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
 
    @Override
    public Executor getAsyncExecutor() {
        MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(200);
        executor.setKeepAliveSeconds(5 * 60);
        executor.setQueueCapacity(1000);
        // 自定義實(shí)現(xiàn)拒絕策略
        executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("當(dāng)前任務(wù)線程池隊(duì)列已滿."));
        // 或者選擇已經(jīng)定義好的其中一種拒絕策略
        // 丟棄任務(wù)并拋出RejectedExecutionException異常
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 丟棄任務(wù),但是不拋出異常
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 由調(diào)用線程處理該任務(wù)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        // 線程名稱前綴
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)生未知異常.", ex);
    }
 
    /**
     * 增加日志MDC
     */
	public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
 
        /**
         * Gets context for task *
         *
         * @return context for task
         */
        private Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }
 
        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.)
         * all delegate to this.
         */
        @Override
        public void execute(@NonNull Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }
 
        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
         * all delegate to this.
         */
        @NonNull
        @Override
        public Future<?> submit(@NonNull Runnable task) {
            return super.submit(wrap(task, getContextForTask()));
        }
 
        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
         * all delegate to this.
         */
        @NonNull
        @Override
        public <T> Future<T> submit(@NonNull Callable<T> task) {
            return super.submit(wrap(task, getContextForTask()));
        }
 
        /**
         * Wrap callable
         *
         * @param <T>     parameter
         * @param task    task
         * @param context context
         * @return the callable
         */
        private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    return task.call();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
 
        /**
         * Wrap runnable
         *
         * @param runnable runnable
         * @param context  context
         * @return the runnable
         */
        private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
    }
 
}

該方式實(shí)現(xiàn)線程的復(fù)用以及,子線程繼承父線程全鏈路traceId,方便定位問(wèn)題

2.直接在@Async注解中配置要使用的線程池的名稱

@Async(value = "自定義線程名")

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論