Spring項目集成RabbitMQ及自動創(chuàng)建隊列
簡單記錄Spring項目集成RabbitMQ的過程,重點記錄生產(chǎn)者項目自動創(chuàng)建隊列的操作,因該問題給項目帶來很多麻煩。
本文內(nèi)容分別在Spring(V5.2.6)和Spring Boot(V2.5.14)兩個項目中經(jīng)過了驗證,下述示例代碼來自于SpringBoot項目,遷移到Spring項目中需稍微調(diào)整。
一、Spring Boot集成RabbitMQ
1. 在Maven中加入依賴
Spring項目和SpringBoot項目的依賴有區(qū)別,按需引入:
<!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- Spring --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.2.9.RELEASE</version> </dependency>
2. 在Spring配置文件增加配置項
spring: rabbitmq: # 基礎(chǔ)項 host: 192.168.1.123 port: 5672 username: admin password: admin # virtualhost需要提前在MQ的Web管理界面里手動創(chuàng)建,或者配置默認host"/" virtual-host: /test # 生產(chǎn)者 #確認消息已發(fā)送到交換機(Exchange) publisher-confirm-type: correlated #確認消息已發(fā)送到隊列(Queue) publisher-returns: true # 消費者 listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: auto #確認模式 prefetch: 1 #限制每次發(fā)送一條數(shù)據(jù) max-concurrency: 1 #啟動消費者最大數(shù)量 concurrency: 1 #同一個隊列啟動幾個消費者 retry: enabled: true #是否支持重試
說明:這里僅按照現(xiàn)有項目的配置列出,在實際的項目中,還是需要根據(jù)自身實際情況做出調(diào)整。
3. 編寫生產(chǎn)者代碼
3.1 創(chuàng)建RabbitMQ配置類:
@Configuration public class RabbitMQConfig { //queue public static final String WORK_QUEUE = "test.queue"; //exchange public static final String WORK_DIRECTEXCHANGE = "test.directExchange"; //routing public static final String WORK_DIRECTROUTING = "test.directRouting"; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } // Queue @Bean public Queue directQueue() { Map<String, Object> argsMap = new HashMap<String, Object>(); argsMap.put("x-max-priority", 5); Queue queue = new Queue(WORK_QUEUE, true, false, false, argsMap); return queue; } //Direct交換機 @Bean DirectExchange directExchange() { return new DirectExchange(WORK_DIRECTEXCHANGE, true, false); } //綁定 @Bean Binding bindingDirect() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(WORK_DIRECTROUTING); } }
3.2 編寫發(fā)送消息工具類:
@Component public class RabbitMQUtils { private static RabbitTemplate rabbitTemplate; @Autowired public RabbitMQUtils(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public static void sendMsg(String msg) throws AmqpException { try { rabbitTemplate.convertAndSend(RabbitMQConfig.WORK_DIRECTEXCHANGE, RabbitMQConfig.WORK_DIRECTROUTING, msg); } catch (AmqpException e) { throw new AmqpException ("RabbitMQ發(fā)送消息異常", e); } } }
3.3 編寫單元測試,測試發(fā)送消息結(jié)果。
4. 編寫消費者代碼
4.1 編寫監(jiān)聽器:
@Component public class RabbitMQListener { @RabbitListener(queues = {Constants.WORK_QUEUE})//監(jiān)聽隊列 public void listener(String msg, Message message) { System.out.println(msg); System.out.println(message.getBody()); } }
二、存在的問題
1. 問題描述
實際項目中,消息的生產(chǎn)者和消費者不在同一項目中,如果先啟動消費者會因為沒有隊列而啟動失敗。
2. 嘗試解決
2.1 方式一:最容易想到的是,在MQ的Web管理界面中手動創(chuàng)建隊列:
- 該方式在實際操作是個不容易的事情,因為還要創(chuàng)建Channel和Exchange,何況發(fā)布的人不一定是開發(fā)的人,溝通繁瑣,極易出錯,好像程序還是半成品似的;
2.2 方式二:啟動消費者項目時,監(jiān)聽器發(fā)現(xiàn)不存在隊列自動創(chuàng)建:
- “通過@RabbitListener”的參數(shù),確實可以實現(xiàn);
- 但是這種方式在我的項目中出現(xiàn)了新問題,消費者項目啟動后創(chuàng)建了隊列,但是生產(chǎn)者發(fā)送消息出錯,貌似沒有了權(quán)限?
- 當(dāng)時因為項目工期,并未深究,具體錯誤也沒記錄下來;應(yīng)該是自己的代碼有問題;
- 因此,不能確認該方式的可行性,或者具體實現(xiàn)方式。
2.3 方式三:先啟動生產(chǎn)者項目:
- 因RabbitMQ懶加載模式,所以單純啟動項目是不會創(chuàng)建隊列的;
- 因此,最開始的想法是,啟動項目后,先發(fā)送一條測試消息去創(chuàng)建隊列,項目確實用該方式使用了一段時間;
- 近期在升級項目時,發(fā)現(xiàn)個現(xiàn)象,沒有隊列時候的第一條消息確實可以創(chuàng)建隊列,但是MQ里沒有消息,是空隊列;
- 種種問題促使自己重新尋找解決方式,在生產(chǎn)者項目啟動后可以自動創(chuàng)建隊列,因此有了本篇文章;
- 很慚愧,其實答案一直在,但是自己對MQ的認知一直停留在簡單使用里,希望有機會能深入的學(xué)習(xí)一下吧。
三、生產(chǎn)者項目創(chuàng)建隊列
1. 在RabbitMQ配置類中加入RabbitAdmin
@Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }
2. 通過RabbitAdmin聲明隊列,完成隊列的創(chuàng)建
@Bean public Queue directQueue() { Map<String, Object> argsMap = new HashMap<String, Object>(); argsMap.put("x-max-priority", 5); Queue queue = new Queue(WORK_QUEUE, true, false, false, argsMap); // 聲明隊列 rabbitAdmin.declareQueue(queue); return queue; }
3. 修改后的完整配置類
@Configuration public class RabbitMQConfig { //queue public static final String WORK_QUEUE = "test.queue"; //exchange public static final String WORK_DIRECTEXCHANGE = "test.directExchange"; //routing public static final String WORK_DIRECTROUTING = "test.directRouting"; @Autowired private RabbitAdmin rabbitAdmin; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public Queue directQueue() { Map<String, Object> argsMap = new HashMap<String, Object>(); argsMap.put("x-max-priority", 5); Queue queue = new Queue(WORK_QUEUE, true, false, false, argsMap); rabbitAdmin.declareQueue(queue); return queue; } //Direct交換機 @Bean DirectExchange directExchange() { return new DirectExchange(WORK_DIRECTEXCHANGE, true, false); } //綁定 @Bean Binding bindingDirect() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(WORK_DIRECTROUTING); } }
4. 啟動項目,查看隊列創(chuàng)建情況
經(jīng)過多次測試,在僅手動創(chuàng)建virtual-host的前提下,啟動項目,隊列可以自動創(chuàng)建,且發(fā)送/接收消息都正常完成。
到此這篇關(guān)于Spring項目集成RabbitMQ及自動創(chuàng)建隊列的文章就介紹到這了,更多相關(guān)Spring RabbitMQ自動創(chuàng)建隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JDK源碼分析之String、StringBuilder和StringBuffer
這篇文章主要給大家介紹了關(guān)于JDK源碼分析之String、StringBuilder和StringBuffer的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用jdk具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-05-05java 中動態(tài)代理(JDK,cglib)實例代碼
這篇文章主要介紹了java 中動態(tài)代理,這里介紹了JDK 動態(tài)代理與 cglib 動態(tài)代理的相關(guān)資料2017-04-04SpringBoot?整合mapstruct的實現(xiàn)步驟
這篇文章主要介紹了SpringBoot整合mapstruct,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-11-11關(guān)于Scanner中nextInt()、nextLine()等方法總結(jié)與問題解決
這篇文章主要介紹了關(guān)于Scanner中nextInt()、nextLine()等方法總結(jié)與問題解決,具有很好的參考價值,希望對大家有所幫助。2022-11-11