Spring Boot中使用RabbitMQ 生產(chǎn)消息和消費(fèi)消息的實(shí)例代碼
引入RabbitMQ依賴
<!-- springboot集成rabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
增加RabbitMQ配置
#rabbitmq配置 spring: rabbitmq: host: ip地址 port: 5672 username: 賬號 password: 密碼 virtual-host: /
配置RabbitMQ交換機(jī)以及隊(duì)列
package com.ckm.ball.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //rabbitMQ綁定交換機(jī) / 隊(duì)列 @Configuration public class RabbitMQConfig { //========================================================RabbitMQ Queue========================================================// //創(chuàng)建fanout模式交換機(jī) @Bean public FanoutExchange fanoutExchangeProcess() { return new FanoutExchange("process-data-change-exchange", true, false); } //創(chuàng)建隊(duì)列 @Bean public Queue processDataChangeQueue() { return new Queue("process-data-change-queue", true); } //將隊(duì)列綁定到交換機(jī) @Bean public Binding chatBindExchange() { return BindingBuilder.bind(processDataChangeQueue()).to(fanoutExchangeProcess()); } }
編寫接口,模擬生產(chǎn)消息
@Resource private RabbitTemplate rabbitTemplate; @GetMapping("/produceMessage") @ApiOperation(value = "生產(chǎn)消息", tags = "測試接口") public void updateTokenTime() { //生產(chǎn)消息,會到交換機(jī),交換機(jī)下發(fā)給隊(duì)列,隊(duì)列監(jiān)聽到就會消費(fèi),執(zhí)行業(yè)務(wù)邏輯 rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh"); }
編寫消息監(jiān)聽類,模擬消費(fèi)消息
package com.ckm.ball.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @Component public class RabbitMQDataSyncListenerProcess { //監(jiān)聽process-data-change-queue隊(duì)列 -> 消費(fèi) @RabbitListener(queues = "process-data-change-queue") public void orderDead(@Payload String productIdAndOrderId) { log.info("當(dāng)前時間:{},收到隊(duì)列信息:{}", new Date().toString(), productIdAndOrderId); //執(zhí)行你的業(yè)務(wù)邏輯 for (int i = 0; i < 5; i++) { System.out.println("循環(huán)次數(shù): " + (i + 1)); try { // 暫停 2000 毫秒(2 秒) Thread.sleep(2000); } catch (InterruptedException e) { // 處理異常 System.err.println("線程被中斷: " + e.getMessage()); } } } }
RabbitMQ 中的交換機(jī)的作用
RabbitMQ 中的交換機(jī)(Exchange)是消息路由的核心組件。它負(fù)責(zé)接收來自生產(chǎn)者發(fā)送的消息,并根據(jù)特定的路由規(guī)則將這些消息傳遞給一個或多個隊(duì)列(Queue)。交換機(jī)的主要功能和類型
1.消息路由:
- 交換機(jī)決定消息應(yīng)該發(fā)送到哪些隊(duì)列,基于綁定(Binding)和路由鍵(Routing Key)。
2.類型:
- 直連交換機(jī)(Direct Exchange):消息直接發(fā)送到與路由鍵精確匹配的隊(duì)列。
- 主題交換機(jī)(Topic Exchange):消息根據(jù)路由鍵模式匹配一個或多個隊(duì)列,支持通配符。
- 扇出交換機(jī)(Fanout Exchange):將消息廣播到所有綁定的隊(duì)列,不考慮路由鍵。
- 頭交換機(jī)(Headers Exchange):通過消息的屬性(Headers)進(jìn)行路由,而不是使用路由鍵。
工作流程
- 生產(chǎn)者發(fā)送消息到交換機(jī)。
- 交換機(jī)根據(jù)配置的路由規(guī)則和隊(duì)列的綁定關(guān)系,將消息路由到相應(yīng)的隊(duì)列。
- 消費(fèi)者從隊(duì)列中獲取消息進(jìn)行處理。
在我的代碼中生產(chǎn)消息語句:
convertAndSend(交換機(jī),路由鍵也就是隊(duì)列,你想傳遞的參數(shù))
在扇出交換機(jī)(Fanout Exchange)模式不需要指定路由鍵,因?yàn)橹付艘矝]用。
rabbitTemplate.convertAndSend("process-data-change-exchange", "process-data-change-queue", "hhhhhhhhhhhhhh");
在扇出交換機(jī)(Fanout Exchange)模式,應(yīng)改成:
rabbitTemplate.convertAndSend("process-data-change-exchange", "", "hhhhhhhhhhhhhh");
在扇出交換機(jī)中,可以將路由鍵設(shè)置為空字符串 “”,因?yàn)樯瘸鼋粨Q機(jī)會將消息發(fā)送到所有綁定的隊(duì)列,而不需要考慮路由鍵的具體值。
- 在扇出交換機(jī)中,路由鍵被忽略。
- 消息會被廣播到所有與交換機(jī)綁定的隊(duì)列中。
四種交換機(jī)模式
1. 直連交換機(jī)(Direct Exchange)
直連交換機(jī):發(fā)送到匹配路由鍵的隊(duì)列。
// 創(chuàng)建直連交換機(jī) @Bean public DirectExchange directExchange() { return new DirectExchange("direct-exchange", true, false); } // 創(chuàng)建隊(duì)列 @Bean public Queue directQueue() { return new Queue("direct-queue", true); } // 將隊(duì)列綁定到直連交換機(jī),同時指定路由鍵 @Bean public Binding directBinding() { return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct-routing-key"); }
生產(chǎn)消息:
直連交換機(jī)生產(chǎn)消息:需要指定路由鍵。
// 發(fā)送消息到直連交換機(jī) rabbitTemplate.convertAndSend("direct-exchange", "direct-routing-key", "Your message here");
2. 主題交換機(jī)(Topic Exchange)
主題交換機(jī):支持模糊匹配路由鍵。
// 創(chuàng)建主題交換機(jī) @Bean public TopicExchange topicExchange() { return new TopicExchange("topic-exchange", true, false); } // 創(chuàng)建隊(duì)列 @Bean public Queue topicQueue() { return new Queue("topic-queue", true); } // 將隊(duì)列綁定到主題交換機(jī),同時指定路由鍵 @Bean public Binding topicBinding() { return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#"); }
生產(chǎn)消息:
主題交換機(jī)生產(chǎn)消息:需要指定符合主題模式的路由鍵。
// 發(fā)送消息到主題交換機(jī) rabbitTemplate.convertAndSend("topic-exchange", "topic.routing.key", "Your message here");
3. 扇出交換機(jī)(Fanout Exchange)
扇出交換機(jī):將消息廣播到所有綁定的隊(duì)列。
// 創(chuàng)建扇出交換機(jī) @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout-exchange", true, false); } // 創(chuàng)建隊(duì)列 @Bean public Queue fanoutQueue1() { return new Queue("fanout-queue-1", true); } @Bean public Queue fanoutQueue2() { return new Queue("fanout-queue-2", true); } // 將隊(duì)列綁定到扇出交換機(jī) @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); }
生產(chǎn)消息:
扇出交換機(jī)生產(chǎn)消息:不需要路由鍵,使用空字符串即可。
// 發(fā)送消息到扇出交換機(jī) rabbitTemplate.convertAndSend("fanout-exchange", "", "Your message here");
4. 頭交換機(jī)(Headers Exchange)
頭交換機(jī):根據(jù)消息頭中匹配的屬性進(jìn)行路由。
// 創(chuàng)建頭交換機(jī) @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers-exchange", true, false); } // 創(chuàng)建隊(duì)列 @Bean public Queue headersQueue() { return new Queue("headers-queue", true); } // 將隊(duì)列綁定到頭交換機(jī),同時指定頭屬性 @Bean public Binding headersBinding() { Map<String, Object> headers = new HashMap<>(); headers.put("format", "pdf"); headers.put("type", "report"); return BindingBuilder.bind(headersQueue()) .to(headersExchange()) .whereAll(headers) .match(); }
生產(chǎn)消息:
頭交換機(jī)生產(chǎn)消息:需要構(gòu)建一個帶有頭屬性的消息。
// 發(fā)送消息到頭交換機(jī) MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("format", "pdf"); messageProperties.setHeader("type", "report"); Message message = new Message("Your message here".getBytes(), messageProperties); rabbitTemplate.send("headers-exchange", "", message);
到此這篇關(guān)于Spring Boot中使用RabbitMQ 生產(chǎn)消息和消費(fèi)消息的文章就介紹到這了,更多相關(guān)Spring Boot生產(chǎn)消息和消費(fèi)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 多線程實(shí)現(xiàn)在線咨詢(udp)
這篇文章主要介紹了java 多線程實(shí)現(xiàn)在線咨詢(udp)的示例,幫助大家更好的理解和學(xué)習(xí)Java 網(wǎng)絡(luò)編程的相關(guān)內(nèi)容,感興趣的朋友可以了解下2020-11-11mybatis配置文件簡介_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了mybatis配置文件簡介的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09mybatis插件pageHelper實(shí)現(xiàn)分頁效果
這篇文章主要為大家詳細(xì)介紹了mybatis插件pageHelper實(shí)現(xiàn)分頁效果,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12Java SimpleDateFormat中英文時間格式化轉(zhuǎn)換詳解
這篇文章主要為大家詳細(xì)介紹了Java SimpleDateFormat中英文時間格式化轉(zhuǎn)換,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12關(guān)于SpringBoot2.7.6連接nacos遇到的一些問題
這篇文章主要介紹了關(guān)于SpringBoot2.7.6連接nacos遇到的一些問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06Java synchronized重量級鎖實(shí)現(xiàn)過程淺析
這篇文章主要介紹了Java synchronized重量級鎖實(shí)現(xiàn)過程,synchronized是Java里的一個關(guān)鍵字,起到的一個效果是"監(jiān)視器鎖",它的功能就是保證操作的原子性,同時禁止指令重排序和保證內(nèi)存的可見性2023-02-02Java調(diào)用Docx4j庫玩轉(zhuǎn)Word文檔處理
在 Java 開發(fā)里處理 Word 文檔時,Docx4j 可是個超厲害的庫,它能讓咱輕松創(chuàng)建,讀取,修改和轉(zhuǎn)換 Word 文檔,下面我們就來看看具體是如何操作的吧2025-02-02