淺談spring-boot-rabbitmq動態(tài)管理的方法
使用spring boot + rabbitmq的時候,在開發(fā)過程中,可能會想要臨時停用/啟用監(jiān)聽,或修改監(jiān)聽消費者數(shù)量。如果每次修改都重啟比較浪費時間,所以研究了一下不停機就啟用停用監(jiān)聽或修改一些配置
一. 關(guān)于rabbitmq監(jiān)聽的配置
- 配置屬性類:RabbitProperties,包含rabbitmq的認證、監(jiān)聽、發(fā)送者以及其他的一些配置
- 自動配置類:RabbitAutoConfiguration,主要配置rabbitmq的連接工廠和發(fā)送者等,不包含監(jiān)聽的配置
- rabbitmq監(jiān)聽的配置是RabbitAnnotationDrivenConfiguration,是通過RabbitAutoConfiguration引入的
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
...
}
RabbitAnnotationDrivenConfiguration中主要就是監(jiān)聽工廠的配置、監(jiān)聽工廠,但是這里也只是創(chuàng)建bean,并沒有真正的初始化
通過配置里的bean類名,分析一下,rabbitmq的監(jiān)聽肯定是由監(jiān)聽工廠創(chuàng)建的,所以找到監(jiān)聽工廠SimpleRabbitListenerContainerFactory
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
既然自動配置里面沒有初始化監(jiān)聽,那就應(yīng)該是在其他地方調(diào)用的,進入監(jiān)聽工廠類中,發(fā)現(xiàn)有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測初始化肯定與這個方法有關(guān),所以查看有哪些地方調(diào)用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有創(chuàng)建監(jiān)聽容器和初始化的代碼
/**
* Create and start a new {@link MessageListenerContainer} using the specified factory.
* @param endpoint the endpoint to create a {@link MessageListenerContainer}.
* @param factory the {@link RabbitListenerContainerFactory} to use.
* @return the {@link MessageListenerContainer}.
*/
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
this.phase + " vs " + containerPhase);
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
繼續(xù)找調(diào)用這個方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,發(fā)現(xiàn)調(diào)用的地方很多了

看看afterPropertiesSet方法,是InitializingBean接口中的,猜測應(yīng)該是spring容器創(chuàng)建bean之后都會調(diào)用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里創(chuàng)建的實例。原來是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個自動配置里面初始化的,所以這就找到rabbitmq初始化監(jiān)聽的源頭了
二. 動態(tài)管理rabbitmq監(jiān)聽
回到最初的問題,想要動態(tài)的啟用停用mq的監(jiān)聽,所以先看看初始化配置的類,既然有初始化,那可能會有相關(guān)的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有對監(jiān)聽容器進行操作,主要源碼如下
/**
* @return the managed {@link MessageListenerContainer} instance(s).
*/
public Collection<MessageListenerContainer> getListenerContainers() {
return Collections.unmodifiableCollection(this.listenerContainers.values());
}
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
/**
* Start the specified {@link MessageListenerContainer} if it should be started
* on startup or when start is called explicitly after startup.
* @see MessageListenerContainer#isAutoStartup()
*/
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
@Override
public void stop() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
listenerContainer.stop();
}
}
寫個controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對監(jiān)聽進行啟用停用的操作,并且RabbitListenerEndpointRegistry實例還可以獲取監(jiān)聽容器,對監(jiān)聽的一些參數(shù)也能進行修改,比如消費者數(shù)量。代碼如下:
import java.util.Set;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.itopener.framework.ResultMap;
/**
* Created by fuwei.deng on 2017年7月24日.
*/
@RestController
@RequestMapping("rabbitmq/listener")
public class RabbitMQController {
@Resource
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@RequestMapping("stop")
public ResultMap stop(){
rabbitListenerEndpointRegistry.stop();
return ResultMap.buildSuccess();
}
@RequestMapping("start")
public ResultMap start(){
rabbitListenerEndpointRegistry.start();
return ResultMap.buildSuccess();
}
@RequestMapping("setup")
public ResultMap setup(int consumer, int maxConsumer){
Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
SimpleMessageListenerContainer container = null;
for(String id : containerIds){
container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);
if(container != null){
container.setConcurrentConsumers(consumer);
container.setMaxConcurrentConsumers(maxConsumer);
}
}
return ResultMap.buildSuccess();
}
}
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
springboot中RestTemplate發(fā)送HTTP請求的實現(xiàn)示例
RestTemplate是一個 spring-web 提供的執(zhí)行HTTP請求的同步阻塞式工具類,本文就來介紹一下RestTemplate發(fā)送HTTP請求,具有一定的參考價值,感興趣的可以了解一下2024-03-03
詳解Java并發(fā)編程之內(nèi)置鎖(synchronized)
這篇文章主要介紹了Java并發(fā)編程之內(nèi)置鎖(synchronized)的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03
java應(yīng)用程序如何自定義log4j配置文件的位置
這篇文章主要介紹了java應(yīng)用程序如何自定義log4j配置文件的位置,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
Apache POI將PPT轉(zhuǎn)換成圖片實例代碼
這篇文章主要介紹了Apache POI將PPT轉(zhuǎn)換成圖片實例代碼,具有一定借鑒價值,需要的朋友可以參考下2018-01-01
Spring?Boot實現(xiàn)JWT?token自動續(xù)期的實現(xiàn)
本文主要介紹了Spring?Boot實現(xiàn)JWT?token自動續(xù)期,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
SpringBoot ApplicationContextAware拓展接口使用詳解
當一個類實現(xiàn)了這個接口(ApplicationContextAware)之后,這個類就可以方便獲得ApplicationContext中的所有bean。換句話說,就是這個類可以直接獲取spring配置文件中,所有有引用到的bean對象2023-04-04

