欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解讀@RabbitListener起作用的原理

 更新時間:2023年03月21日 09:49:45   作者:自東向西  
這篇文章主要介紹了解讀@RabbitListener起作用的原理,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、前言

在spring中,定義rabbitMq的消費者可以相當(dāng)方便,只需要在消息處理類或者類方法加上@RabbitListener注解,指定隊列名稱即可。

如下代碼:

@Component
public class RabbitMqListener1 {
? ? @RabbitListener(queues = "queue1")
? ? public void consumer1(Message message) {

? ? }

? ? @RabbitListener(queues = "queue2")
? ? public void consumer2(String messsageBody) {

? ? }
}


@Component
@RabbitListener(queues = "queue3")
public class RabbitMqListener2 {
? ? @RabbitHandler(isDefault=true)
? ? public void consumer3() {

? ? }
}

注意?。?!如果@RabbitListener加在類上面,需要有一個默認的處理方法@RabbitHandler(isDefault=true),默認是false。

不設(shè)置一個true,消費mq消息的時候會出現(xiàn)“Listener method ‘no match’ threw exception”異常。

原因在RabbitListenerAnnotationBeanPostProcessor.processMultiMethodListeners方法,有興趣的可以看下。

可以看到代碼相當(dāng)?shù)暮唵?。但是!?。槭裁醇由线@個注解,就能作為一個consumer接受mq的消息呢?為啥處理mq消息的方法,入?yún)⒖梢阅敲措S意?

有經(jīng)驗的程序員,可能會有這樣的設(shè)想:

1、單純看這些listener的代碼,只是定義了由spring管理的bean,要能監(jiān)聽rabbitMq的消息,肯定需要有另外一個類,這個類會掃描所有加了@RabbitListener的bean,進行加工。

2、看這些listener的代碼,可以發(fā)現(xiàn)處理mq消息的,都是具體的某個方法。那加工的過程,應(yīng)該就是利用反射拿到對象、方法和@RabbitListener中的queue屬性,然后建立一個綁定關(guān)系(對象+方法)——>(queue的consumer)。queue的consumer在接收到mq消息后,找到綁定的“對象+方法”,再通過反射的方式,調(diào)用真正的處理方法。

3、mq消息的處理方法,可以那么隨意,應(yīng)該是queue的consumer在調(diào)用真正處理方法之前,需要根據(jù)處理方法的參數(shù)類型,做一次數(shù)據(jù)轉(zhuǎn)換。

接下來,就去看看源碼,看一下設(shè)想是不是正確的~~

二、源碼分析

1、誰來掃描@RabbitListener注解的bean

在springBoot使用rabbit,一般是在@Configuration類上加上@EnableRabbit注解來開啟rabbit功能。那我們就去看看@EnableRabbit注解的源碼,看這個注解的作用

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

可以看到,這個注解的作用,是導(dǎo)入RabbitBootstrapConfiguration配置類

@Configuration
public class RabbitBootstrapConfiguration {

?? ?@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
?? ?@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
?? ?public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
?? ??? ?return new RabbitListenerAnnotationBeanPostProcessor();
?? ?}

?? ?@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
?? ?public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
?? ??? ?return new RabbitListenerEndpointRegistry();
?? ?}
}

RabbitBootstrapConfiguration 配置類的作用,就是定義了RabbitListenerAnnotationBeanPostProcessor 和RabbitListenerEndpointRegistry 兩個bean。

看到RabbitListenerAnnotationBeanPostProcessor 這個類名,就可以猜到,該類的實例bean就是用來掃描加了@RabbitListener 的類,并做一些加工。

(“RabbitListenerAnnotationBean”——針對添加了@RabbitListener注解的bean; “PostProcessor”——后置加工)

2、怎么建立(對象+方法)——>(queue的consumer)的映射關(guān)系

分析一下RabbitListenerAnnotationBeanPostProcessor類的源碼

// 實現(xiàn)了BeanPostProcessor、Ordered、BeanFactoryAware、BeanClassLoaderAware、EnvironmentAware和SmartInitializingSingleton 6個接口
public class RabbitListenerAnnotationBeanPostProcessor
?? ??? ?implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
?? ??? ?SmartInitializingSingleton {
?? ??? ?
?? ?.......
?? ?
?? ?// 完成初始化bean之后,調(diào)用該方法
?? ?@Override
?? ?public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
?? ??? ?Class<?> targetClass = AopUtils.getTargetClass(bean);

?? ??? ?TypeMetadata metadata = this.typeCache.get(targetClass);
?? ??? ?if (metadata == null) {
?? ??? ??? ?metadata = buildMetadata(targetClass);
?? ??? ??? ?this.typeCache.putIfAbsent(targetClass, metadata);
?? ??? ?}

?? ??? ?for (ListenerMethod lm : metadata.listenerMethods) {
?? ??? ??? ?for (RabbitListener rabbitListener : lm.annotations) {
?? ??? ??? ??? ?processAmqpListener(rabbitListener, lm.method, bean, beanName);
?? ??? ??? ?}
?? ??? ?}
?? ??? ?if (metadata.handlerMethods.length > 0) {
?? ??? ??? ?processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
?? ??? ?}
?? ??? ?return bean;
?? ?}

?? ?// 根據(jù)Class,獲取元數(shù)據(jù)
?? ?private TypeMetadata buildMetadata(Class<?> targetClass) {
?? ??? ?Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
?? ??? ?final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
?? ??? ?final List<ListenerMethod> methods = new ArrayList<ListenerMethod>();
?? ??? ?final List<Method> multiMethods = new ArrayList<Method>();
?? ??? ?ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {

?? ??? ??? ?@Override
?? ??? ??? ?public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
?? ??? ??? ??? ?Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
?? ??? ??? ??? ?if (listenerAnnotations.size() > 0) {
?? ??? ??? ??? ??? ?methods.add(new ListenerMethod(method,
?? ??? ??? ??? ??? ??? ??? ?listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
?? ??? ??? ??? ?}
?? ??? ??? ??? ?if (hasClassLevelListeners) {
?? ??? ??? ??? ??? ?RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
?? ??? ??? ??? ??? ?if (rabbitHandler != null) {
?? ??? ??? ??? ??? ??? ?multiMethods.add(method);
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?}

?? ??? ??? ?}
?? ??? ?}, ReflectionUtils.USER_DECLARED_METHODS);
?? ??? ?if (methods.isEmpty() && multiMethods.isEmpty()) {
?? ??? ??? ?return TypeMetadata.EMPTY;
?? ??? ?}
?? ??? ?return new TypeMetadata(
?? ??? ??? ??? ?methods.toArray(new ListenerMethod[methods.size()]),
?? ??? ??? ??? ?multiMethods.toArray(new Method[multiMethods.size()]),
?? ??? ??? ??? ?classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
?? ?}

?? ?// 檢查一下是否使用jdk代理,使用jdk代理方式必須實現(xiàn)了接口
?? ?// new一個MethodRabbitListenerEndpoint對象,交由processListener方法進行處理
?? ?protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
?? ??? ?Method methodToUse = checkProxy(method, bean);
?? ??? ?MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
?? ??? ?endpoint.setMethod(methodToUse);
?? ??? ?endpoint.setBeanFactory(this.beanFactory);
?? ??? ?processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
?? ?}

// 前面大半代碼都是對MethodRabbitListenerEndpoint對象的屬性設(shè)置:處理消息的bean、消息處理方法的工廠類、監(jiān)聽的隊列名。。。。
// 通過beanFactory獲取RabbitListenerContainerFactory類的bean
// 調(diào)用RabbitListenerEndpointRegistar的registerEndpoint方法注冊mq消息消費端點
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
?? ??? ??? ?Object adminTarget, String beanName) {
?? ??? ?endpoint.setBean(bean);
?? ??? ?endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
?? ??? ?endpoint.setId(getEndpointId(rabbitListener));
?? ??? ?endpoint.setQueueNames(resolveQueues(rabbitListener));
?? ??? ?String group = rabbitListener.group();
?? ??? ?if (StringUtils.hasText(group)) {
?? ??? ??? ?Object resolvedGroup = resolveExpression(group);
?? ??? ??? ?if (resolvedGroup instanceof String) {
?? ??? ??? ??? ?endpoint.setGroup((String) resolvedGroup);
?? ??? ??? ?}
?? ??? ?}

?? ??? ?endpoint.setExclusive(rabbitListener.exclusive());
?? ??? ?String priority = resolve(rabbitListener.priority());
?? ??? ?if (StringUtils.hasText(priority)) {
?? ??? ??? ?try {
?? ??? ??? ??? ?endpoint.setPriority(Integer.valueOf(priority));
?? ??? ??? ?}
?? ??? ??? ?catch (NumberFormatException ex) {
?? ??? ??? ??? ?throw new BeanInitializationException("Invalid priority value for " +
?? ??? ??? ??? ??? ??? ?rabbitListener + " (must be an integer)", ex);
?? ??? ??? ?}
?? ??? ?}

?? ??? ?String rabbitAdmin = resolve(rabbitListener.admin());
?? ??? ?if (StringUtils.hasText(rabbitAdmin)) {
?? ??? ??? ?Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
?? ??? ??? ?try {
?? ??? ??? ??? ?endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
?? ??? ??? ?}
?? ??? ??? ?catch (NoSuchBeanDefinitionException ex) {
?? ??? ??? ??? ?throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
?? ??? ??? ??? ??? ??? ?adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
?? ??? ??? ??? ??? ??? ?rabbitAdmin + "' was found in the application context", ex);
?? ??? ??? ?}
?? ??? ?}


?? ??? ?RabbitListenerContainerFactory<?> factory = null;
?? ??? ?String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
?? ??? ?if (StringUtils.hasText(containerFactoryBeanName)) {
?? ??? ??? ?Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
?? ??? ??? ?try {
?? ??? ??? ??? ?factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
?? ??? ??? ?}
?? ??? ??? ?catch (NoSuchBeanDefinitionException ex) {
?? ??? ??? ??? ?throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
?? ??? ??? ??? ??? ??? ?adminTarget + "] for bean " + beanName + ", no " +
?? ??? ??? ??? ??? ??? ?RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
?? ??? ??? ??? ??? ??? ?containerFactoryBeanName + "' was found in the application context", ex);
?? ??? ??? ?}
?? ??? ?}

?? ??? ?this.registrar.registerEndpoint(endpoint, factory);
?? ?}
?? ?........

}

這個類的代碼比較長,只貼部分比較主要的部分,其他的,可以自己查看源碼進行了解。

RabbitListenerAnnotationBeanPostProcessor實現(xiàn)了BeanPostProcessor(bean初始化后的后置處理)、Ordered(后置處理的排序)、BeanFactoryAware(注入BeanFactory)、BeanClassLoaderAware(注入BeanClassLoader)、EnvironmentAware(注入spring環(huán)境)和SmartInitializingSingleton(單例bean初始化后的回調(diào)) 6個接口。

我們需要關(guān)注的是BeanPostProcessor接口定義的方法,看postProcessAfterInitialization方法的代碼,大致流程為:

1、通過AopUtils得到bean代理的對象的class

2、判斷緩存中是否有該class的類型元數(shù)據(jù),如果沒有則調(diào)用buildMetadata方法生成類型元數(shù)據(jù)并放入緩存

3、遍歷加了@RabbitListener注解的方法,調(diào)用processAmqpListener方法進行處理

4、調(diào)用processMultiMethodListeners方法對加了@RabbitHandler的方法進行處理

關(guān)于buildMetadata方法:

代碼不復(fù)雜,就是利用反射,拿到class中,添加了@RabbitListener和@RabbitHandler注解的方法。另外,從代碼中也可以看出,@RabbitHandler注解要生效,必須在class上增加@RabbitListener注解

關(guān)于processAmqpListener方法:

沒有什么實際內(nèi)容,就干兩個事情:

1、檢查一下是否使用jdk代理,使用jdk代理方式必須實現(xiàn)了接口

2、new一個MethodRabbitListenerEndpoint對象,交由processListener方法進行處理

關(guān)于processListener方法:

1、前面大半代碼都是對MethodRabbitListenerEndpoint對象的屬性設(shè)置:處理消息的bean、消息處理方法的工廠類、監(jiān)聽的隊列名。。。。

其中要關(guān)注一下setMessageHandlerMethodFactory方法,查看MessageHandlerMethodFactory接口的源碼

public interface MessageHandlerMethodFactory {
?? ?InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method);

從入?yún)⒑头祷刂悼梢钥闯鰜?,這個工廠的作用就是將spring的bean對象和方法包裝成一個InvocableHandlerMethod對象,也就是我們上面提到的(對象+方法)。

2、通過beanFactory獲取RabbitListenerContainerFactory類的bean。

3、調(diào)用RabbitListenerEndpointRegistar的registerEndpoint方法注冊mq消息消費端點。

繼續(xù)往下追,看一下RabbitListenerEndpointRegistar的代碼:

public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
?? ?// 將整個endpointDescriptors數(shù)組進行注冊
?? ?protected void registerAllEndpoints() {
?? ??? ?synchronized (this.endpointDescriptors) {
?? ??? ??? ?for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
?? ??? ??? ??? ?this.endpointRegistry.registerListenerContainer(
?? ??? ??? ??? ??? ??? ?descriptor.endpoint, resolveContainerFactory(descriptor));
?? ??? ??? ?}
?? ??? ??? ?this.startImmediately = true; ?// trigger immediate startup
?? ??? ?}
?? ?}
?? ?
?? ?// 解析得到RabbitListenerContainerFactory
?? ?// 如果AmqpListenerEndpointDescriptor 的containerFactory屬性不為空,直接返回containerFactory
?? ?// 如果為空,嘗試從beanFactory獲取
?? ?private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
?? ??? ?if (descriptor.containerFactory != null) {
?? ??? ??? ?return descriptor.containerFactory;
?? ??? ?}
?? ??? ?else if (this.containerFactory != null) {
?? ??? ??? ?return this.containerFactory;
?? ??? ?}
?? ??? ?else if (this.containerFactoryBeanName != null) {
?? ??? ??? ?Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
?? ??? ??? ?this.containerFactory = this.beanFactory.getBean(
?? ??? ??? ??? ??? ?this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
?? ??? ??? ?return this.containerFactory; ?// Consider changing this if live change of the factory is required
?? ??? ?}
?? ??? ?else {
?? ??? ??? ?throw new IllegalStateException("Could not resolve the " +
?? ??? ??? ??? ??? ?RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
?? ??? ??? ??? ??? ?descriptor.endpoint + "] no factory was given and no default is set.");
?? ??? ?}
?? ?}
?? ?
?? ?// new一個AmqpListenerEndpointDescriptor對象
?? ?// 如果立即啟動,則調(diào)用RabbitListenerEndpointRegistry注冊器來注冊消息監(jiān)聽
?? ?// 如果不是立即啟動,則添加到endpointDescriptors列表中,后面通過registerAllEndpoints方法統(tǒng)一啟動
?? ?public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
?? ??? ?Assert.notNull(endpoint, "Endpoint must be set");
?? ??? ?Assert.hasText(endpoint.getId(), "Endpoint id must be set");
?? ??? ?// Factory may be null, we defer the resolution right before actually creating the container
?? ??? ?AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
?? ??? ?synchronized (this.endpointDescriptors) {
?? ??? ??? ?if (this.startImmediately) { // Register and start immediately
?? ??? ??? ??? ?this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
?? ??? ??? ??? ??? ??? ?resolveContainerFactory(descriptor), true);
?? ??? ??? ?}
?? ??? ??? ?else {
?? ??? ??? ??? ?this.endpointDescriptors.add(descriptor);
?? ??? ??? ?}
?? ??? ?}
?? ?}
}

從上面的代碼可以看出,我們關(guān)心的內(nèi)容,應(yīng)該是在RabbitListenerEndpointRegistry類的registerListenerContainer方法?。?/p>

public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
?? ??? ?ApplicationListener<ContextRefreshedEvent> {
?? ??? ?// 檢查是否被注冊過,注冊過就不能注冊第二次
?? ??? ?// 調(diào)用createListenerContainer創(chuàng)建消息監(jiān)聽
?? ??? ?// 關(guān)于分組消費的,我們不關(guān)心
?? ??? ?// 是否立即啟動,是的話,同步調(diào)用startIfNecessary方法
?? ??? ?public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?boolean startImmediately) {
?? ??? ?Assert.notNull(endpoint, "Endpoint must not be null");
?? ??? ?Assert.notNull(factory, "Factory must not be null");

?? ??? ?String id = endpoint.getId();
?? ??? ?Assert.hasText(id, "Endpoint id must not be empty");
?? ??? ?synchronized (this.listenerContainers) {
?? ??? ??? ?Assert.state(!this.listenerContainers.containsKey(id),
?? ??? ??? ??? ??? ?"Another endpoint is already registered with id '" + id + "'");
?? ??? ??? ?MessageListenerContainer container = createListenerContainer(endpoint, factory);
?? ??? ??? ?this.listenerContainers.put(id, container);
?? ??? ??? ?if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
?? ??? ??? ??? ?List<MessageListenerContainer> containerGroup;
?? ??? ??? ??? ?if (this.applicationContext.containsBean(endpoint.getGroup())) {
?? ??? ??? ??? ??? ?containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
?? ??? ??? ??? ?}
?? ??? ??? ??? ?else {
?? ??? ??? ??? ??? ?containerGroup = new ArrayList<MessageListenerContainer>();
?? ??? ??? ??? ??? ?this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
?? ??? ??? ??? ?}
?? ??? ??? ??? ?containerGroup.add(container);
?? ??? ??? ?}
?? ??? ??? ?if (startImmediately) {
?? ??? ??? ??? ?startIfNecessary(container);
?? ??? ??? ?}
?? ??? ?}

?? ?// 其實就是調(diào)用了RabbitListenerContainerFactory的createListenerContainer生成了一個MessageListenerContainer對象
?? ?protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
?? ??? ??? ?RabbitListenerContainerFactory<?> factory) {

?? ??? ?MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

?? ??? ?if (listenerContainer instanceof InitializingBean) {
?? ??? ??? ?try {
?? ??? ??? ??? ?((InitializingBean) listenerContainer).afterPropertiesSet();
?? ??? ??? ?}
?? ??? ??? ?catch (Exception ex) {
?? ??? ??? ??? ?throw new BeanInitializationException("Failed to initialize message listener container", ex);
?? ??? ??? ?}
?? ??? ?}

?? ??? ?int containerPhase = listenerContainer.getPhase();
?? ??? ?if (containerPhase < Integer.MAX_VALUE) { ?// a custom phase value
?? ??? ??? ?if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
?? ??? ??? ??? ?throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
?? ??? ??? ??? ??? ??? ?this.phase + " vs " + containerPhase);
?? ??? ??? ?}
?? ??? ??? ?this.phase = listenerContainer.getPhase();
?? ??? ?}

?? ??? ?return listenerContainer;
?? ?}
}

createListenerContainer方法調(diào)用了RabbitListenerContainerFactory接口的createListenerContainer方法創(chuàng)建一個MessageListenerContainer對象。

在這里,如果是通過RabbitAutoConfiguration自動配置的,那么RabbitListenerContainerFactory接口的具體實現(xiàn)類是SimpleRabbitListenerContainerFactory,MessageListenerContainer接口的具體實現(xiàn)類是SimpleMessageListenerContainer。有興趣的話,可以去看下rabbitMq自動配置的幾個類。

RabbitListenerContainerFactory接口的createListenerContainer方法是由AbstractRabbitListenerContainerFactory抽象類實現(xiàn),代碼如下:

?? ?@Override
?? ?public C createListenerContainer(RabbitListenerEndpoint endpoint) {
?? ??? ?C instance = createContainerInstance();

?? ??? ?if (this.connectionFactory != null) {
?? ??? ??? ?instance.setConnectionFactory(this.connectionFactory);
?? ??? ?}
?? ??? ?if (this.errorHandler != null) {
?? ??? ??? ?instance.setErrorHandler(this.errorHandler);
?? ??? ?}
?? ??? ?if (this.messageConverter != null) {
?? ??? ??? ?instance.setMessageConverter(this.messageConverter);
?? ??? ?}
?? ??? ?if (this.acknowledgeMode != null) {
?? ??? ??? ?instance.setAcknowledgeMode(this.acknowledgeMode);
?? ??? ?}
?? ??? ?if (this.channelTransacted != null) {
?? ??? ??? ?instance.setChannelTransacted(this.channelTransacted);
?? ??? ?}
?? ??? ?if (this.autoStartup != null) {
?? ??? ??? ?instance.setAutoStartup(this.autoStartup);
?? ??? ?}
?? ??? ?if (this.phase != null) {
?? ??? ??? ?instance.setPhase(this.phase);
?? ??? ?}
?? ??? ?instance.setListenerId(endpoint.getId());
?? ??? ?// 最重要的一行!?。?
?? ??? ?endpoint.setupListenerContainer(instance);
?? ??? ?initializeContainer(instance);

?? ??? ?return instance;
?? ?}

乍一看,都是對MessageListenerContainer實例的初始化,實際上有一行,相當(dāng)重要“ endpoint.setupListenerContainer(instance); ”,這一行最終是走到

AbstractRabbitListenerEndpoint.setupListenerContainer
public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEndpoint, BeanFactoryAware {
?? ?......
?? ?
?? ?// 設(shè)置MessageListenerContainer,最重要的就是設(shè)置監(jiān)聽的隊列名稱?。。?
?? ?@Override
?? ?public void setupListenerContainer(MessageListenerContainer listenerContainer) {
?? ??? ?SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;

?? ??? ?boolean queuesEmpty = getQueues().isEmpty();
?? ??? ?boolean queueNamesEmpty = getQueueNames().isEmpty();
?? ??? ?if (!queuesEmpty && !queueNamesEmpty) {
?? ??? ??? ?throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
?? ??? ?}
?? ??? ?if (queuesEmpty) {
?? ??? ??? ?Collection<String> names = getQueueNames();
?? ??? ??? ?container.setQueueNames(names.toArray(new String[names.size()]));
?? ??? ?}
?? ??? ?else {
?? ??? ??? ?Collection<Queue> instances = getQueues();
?? ??? ??? ?container.setQueues(instances.toArray(new Queue[instances.size()]));
?? ??? ?}

?? ??? ?container.setExclusive(isExclusive());
?? ??? ?if (getPriority() != null) {
?? ??? ??? ?Map<String, Object> args = new HashMap<String, Object>();
?? ??? ??? ?args.put("x-priority", getPriority());
?? ??? ??? ?container.setConsumerArguments(args);
?? ??? ?}

?? ??? ?if (getAdmin() != null) {
?? ??? ??? ?container.setRabbitAdmin(getAdmin());
?? ??? ?}
?? ??? ?setupMessageListener(listenerContainer);
?? ?}
?? ?
?? ?// 創(chuàng)建MessageListener
?? ?protected abstract MessageListener createMessageListener(MessageListenerContainer container);

?? ?// 創(chuàng)建MessageListener,設(shè)置到MessageListenerContainer 里
?? ?private void setupMessageListener(MessageListenerContainer container) {
?? ??? ?MessageListener messageListener = createMessageListener(container);
?? ??? ?Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
?? ??? ?container.setupMessageListener(messageListener);
?? ?}
?? ?......
}

用@RabbitLinstener注解的方法,使用的endpoint是MethodRabbitListenerEndpoint繼承自AbstractRabbitListenerEndpoint,所以看看AbstractRabbitListenerEndpoint的createMessageListener方法

public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint {
?? ?......
?? ?@Override
?? ?protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
?? ??? ?Assert.state(this.messageHandlerMethodFactory != null,
?? ??? ??? ??? ?"Could not create message listener - MessageHandlerMethodFactory not set");
?? ??? ?MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
?? ??? ?messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
?? ??? ?String replyToAddress = getDefaultReplyToAddress();
?? ??? ?if (replyToAddress != null) {
?? ??? ??? ?messageListener.setResponseAddress(replyToAddress);
?? ??? ?}
?? ??? ?MessageConverter messageConverter = container.getMessageConverter();
?? ??? ?if (messageConverter != null) {
?? ??? ??? ?messageListener.setMessageConverter(messageConverter);
?? ??? ?}
?? ??? ?if (getBeanResolver() != null) {
?? ??? ??? ?messageListener.setBeanResolver(getBeanResolver());
?? ??? ?}
?? ??? ?return messageListener;
?? ?}

?? ?protected MessagingMessageListenerAdapter createMessageListenerInstance() {
?? ??? ?return new MessagingMessageListenerAdapter(this.bean, this.method);
?? ?}
?? ?
?? ?......
}

從上面代碼可以看出,createMessageListener方法返回了一個MessagingMessageListenerAdapter實例,MessagingMessageListenerAdapter實現(xiàn)了MessageListener接口

到這里,我們就能得出一些結(jié)論:

1、有@RabbitListener注解的方法,會生成MethodRabbitListenerEndpoint對象

2、通過MethodRabbitListenerEndpoint對象和SimpleRabbitListenerContainerFactory工廠bean,生成SimpleMessageListenerContainer對象

3、SimpleMessageListenerContainer對象保存了要監(jiān)聽的隊列名,創(chuàng)建了用于處理消息的MessagingMessageListenerAdapter實例

4、MessagingMessageListenerAdapter持有@RabbitListener注解的對象和方法,起到一個適配器的作用

SimpleMessageListenerContainer是相當(dāng)重要的一個類,,包裝了整個mq消息消費需要的信息:

1、保存了要監(jiān)聽的隊列名,啟動的時候,根據(jù)隊列名創(chuàng)建從服務(wù)器拉取消息的consumer——BlockingQueueConsumer

2、創(chuàng)建了一個MessagingMessageListenerAdapter對象,當(dāng)consumer從服務(wù)器拿到消息后,由MessagingMessageListenerAdapter進行處理

3、誰來做數(shù)據(jù)轉(zhuǎn)換?

是MessagingMessageListenerAdapter,有興趣的,可以看看MessagingMessageListenerAdapter轉(zhuǎn)換參數(shù)的源碼~~

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • String.intern()作用與常量池關(guān)系示例解析

    String.intern()作用與常量池關(guān)系示例解析

    這篇文章主要為大家介紹了String.intern()作用與常量池關(guān)系示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • Java中JDK動態(tài)代理的超詳細講解

    Java中JDK動態(tài)代理的超詳細講解

    JDK 的動態(tài)代理是基于攔截器和反射來實現(xiàn)的,JDK代理是不需要第三方庫支持的,只需要JDK環(huán)境就可以進行代理,下面這篇文章主要給大家介紹了關(guān)于Java中JDK動態(tài)代理的超詳細講解,需要的朋友可以參考下
    2022-10-10
  • java并發(fā)編程工具類JUC之ArrayBlockingQueue

    java并發(fā)編程工具類JUC之ArrayBlockingQueue

    類ArrayBlockingQueue是BlockingQueue接口的實現(xiàn)類,它是有界的阻塞隊列,內(nèi)部使用數(shù)組存儲隊列元素,通過代碼給大家說明如何初始化一個ArrayBlockingQueue,并向其中添加一個對象,對java并發(fā)編程工具類ArrayBlockingQueue相關(guān)知識感興趣的朋友一起看看吧
    2021-05-05
  • Java實現(xiàn)用Mysql存取圖片操作實例

    Java實現(xiàn)用Mysql存取圖片操作實例

    這篇文章主要介紹了Java實現(xiàn)用Mysql存取圖片操作實例,本文講解了使用BLOB類型保存和讀取圖片的代碼實例,需要的朋友可以參考下
    2015-06-06
  • Unicode、UTF-8 和 ISO8859-1區(qū)別解析

    Unicode、UTF-8 和 ISO8859-1區(qū)別解析

    這篇文章主要介紹了Unicode、UTF-8 和 ISO8859-1到底有什么區(qū)別,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-01-01
  • Java實現(xiàn)替換Word中文本和圖片功能

    Java實現(xiàn)替換Word中文本和圖片功能

    Word中的替換功能以查找指定文本然后替換為新的文本,可單個替換或全部替換。本文將用Java語言實現(xiàn)Word中的文本、圖片替換功能,需要的可以參考一下
    2022-06-06
  • 對數(shù)據(jù)進行分頁顯示到table中的實現(xiàn)方法

    對數(shù)據(jù)進行分頁顯示到table中的實現(xiàn)方法

    這篇文章主要介紹了對數(shù)據(jù)進行分頁顯示到table中的實現(xiàn)方法的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2016-05-05
  • 解析Java圖形化編程中的文本框和文本區(qū)

    解析Java圖形化編程中的文本框和文本區(qū)

    這篇文章主要介紹了Java圖形化編程中的文本框和文本區(qū),是Java入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-10-10
  • 為什么rest接口返回json建議采用下劃線形式,不要用駝峰

    為什么rest接口返回json建議采用下劃線形式,不要用駝峰

    為什么rest接口返回json建議采用下劃線形式,不要用駝峰?今天小編就來為大家說明一下原因,還等什么?一起跟隨小編過來看看吧
    2020-09-09
  • 基于創(chuàng)建Web項目運行時出錯的解決方法(必看篇)

    基于創(chuàng)建Web項目運行時出錯的解決方法(必看篇)

    下面小編就為大家?guī)硪黄趧?chuàng)建Web項目運行時出錯的解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08

最新評論