SpringBoot集成MQ的過程(四種交換機的實例)
?RabbitMQ交換機(Exchange)的核心作用
在RabbitMQ中,?交換機 是消息路由的核心組件,負責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)規(guī)則(如路由鍵、頭信息等)將消息分發(fā)到對應(yīng)的隊列中。
不同交換機類型決定了消息的路由邏輯,使用不同的交換機在不同的場景下可以提高消息系統(tǒng)的高可用性。
1. 直連交換機(Direct Exchange)?
?路由機制 ?
- 精確匹配路由鍵(Routing Key)?:消息會被發(fā)送到與
Routing Key
?完全匹配 的隊列。 - ?典型場景:一對一或一對多的精確消息分發(fā)。
應(yīng)用場景 ?
- 任務(wù)分發(fā):如訂單處理系統(tǒng),根據(jù)訂單類型(如
order.payment
、order.shipping
)分發(fā)到不同隊列。 - ?日志分類:將不同級別的日志(
log.error
、log.info
)路由到對應(yīng)的處理服務(wù)。
使用直連交換機實現(xiàn)消息發(fā)送和接收
1.創(chuàng)建一個SpringBoot項目,在yml文件配置如下:
server: port: 8021 spring: application: name: rabbitmq-provider #配置rabbitMq 服務(wù)器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
2.初始化隊列和交換機,并進行綁定
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午1:55 */ @Configuration public class DirectRabbitConfig { @Bean public Queue TestDirectQueue(){ return new Queue("TestDirectQueue",true); } @Bean DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false); } @Bean Binding bindingDirect(){ return BindingBuilder.bind(TestDirectQueue()) .to(TestDirectExchange()) .with("TestDirectRouting"); } }
3.實現(xiàn)sendDirectMessage發(fā)送消息請求,由生產(chǎn)者發(fā)送到MQ,TestDirectRouting作為Key,用于精確轉(zhuǎn)發(fā)。
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午2:12 */ @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "Hello MQ!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "OK"; } }
4.此時就可以啟動項目發(fā)送消息了,使用PostMan發(fā)送消息,返回OK說明發(fā)送成功
5.進入http://localhost:15672/,可以看到消息發(fā)送成功,我這里是請求了兩次(也就是發(fā)了兩條消息)。
6.接下來寫消費者的消費過程,新創(chuàng)建一個SpringBoot項目,在yml文件配置如下
server: port: 8022 spring: application: name: rabbitmq-provider #配置rabbitMq 服務(wù)器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
7.消費者配置類,同樣TestDirectRouting用于唯一識別Key
package com.atguigu.demomq2; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午 */ @Configuration public class DirectRabbitConfig { @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
8.消費者 接收消息@RabbitListener(queues = "TestDirectQueue")用于監(jiān)聽指定隊列發(fā)送的消息
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "TestDirectQueue") public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString()); } }
9.啟動消費者,成功接收消息
10.查看MQ控制臺,消息成功被消費
2. 扇出交換機(Fanout Exchange)? ?
路由機制(一個交換機轉(zhuǎn)發(fā)到多個隊列)
- 廣播模式:忽略
Routing Key
,將消息發(fā)送到所有綁定的隊列。 - ?典型場景:消息的全局通知或并行處理。
?應(yīng)用場景
- ?實時通知系統(tǒng):如用戶注冊成功后,同時發(fā)送郵件、短信、更新緩存。
- ?日志廣播:多個服務(wù)訂閱同一日志源,各自獨立處理。
使用扇出交換機實現(xiàn)消息發(fā)送和接收
1.扇出交換機配置
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutExchangeConfig { // 定義扇出交換機 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.user.register", true, false); } // 定義郵件隊列 @Bean public Queue emailQueue() { return new Queue("fanout.user.email", true); } // 定義短信隊列 @Bean public Queue smsQueue() { return new Queue("fanout.user.sms", true); } // 綁定所有隊列到扇出交換機(無需路由鍵) @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class FanoutUserService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendFanoutMessage") public String sendRegisterBroadcast() { rabbitTemplate.convertAndSend( "fanout.user.register", "", // 扇出交換機忽略路由鍵 "message MQ" ); return "OK Fan"; } }
3.消費者
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutNotificationConsumer { @RabbitListener(queues = "fanout.user.email") public void handleEmail(String message) { System.out.println("[Email] Received: " + message); } @RabbitListener(queues = "fanout.user.sms") public void handleSms(String message) { System.out.println("[SMS] Received: " + message); } }
4.請求并查看消費結(jié)果
可以看到一個交換機完成消費兩條消息
?3. 主題交換機(Topic Exchange)?
- ?路由機制 ?模式匹配路由鍵:使用
*
(匹配一個單詞)和#
(匹配多個單詞)通配符。? - 典型場景:靈活的多條件消息路由。 ?
應(yīng)用場景
- ?新聞訂閱系統(tǒng):用戶訂閱特定主題(如
news.sports.*
、news.tech.#
)。? - 設(shè)備狀態(tài)監(jiān)控:根據(jù)設(shè)備類型和區(qū)域路由消息(如
sensor.temperature.room1
)。
1.配置主題交換機
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicExchangeConfig { // 定義主題交換機 @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.news", true, false); } // 定義體育新聞隊列 @Bean public Queue sportsQueue() { return new Queue("topic.news.sports", true); } // 定義科技新聞隊列 @Bean public Queue techQueue() { return new Queue("topic.news.tech", true); } // 綁定體育隊列:匹配 news.sports.* @Bean public Binding sportsBinding() { return BindingBuilder.bind(sportsQueue()) .to(topicExchange()) .with("news.sports.*"); } // 綁定科技隊列:匹配 news.tech.# @Bean public Binding techBinding() { return BindingBuilder.bind(techQueue()) .to(topicExchange()) .with("news.tech.#"); } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TopicNewsService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendTopicMessage1") public String sendSportsNews() { rabbitTemplate.convertAndSend( "topic.news", "news.sports.football", "* message:news.sports.football" ); return "*OK"; } @GetMapping("/sendTopicMessage2") public String sendTechNews() { rabbitTemplate.convertAndSend( "topic.news", "news.tech.ai.abc.123456", "# message:news.tech.ai.abc.123456" ); return "#OK"; } }
3. 消費者
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicNewsConsumer { @RabbitListener(queues = "topic.news.sports") public void handleSports(String message) { System.out.println("[Sports] Received: " + message); } @RabbitListener(queues = "topic.news.tech") public void handleTech(String message) { System.out.println("[Tech] Received: " + message); } }
4.發(fā)送請求
可以看到消息成功消費,第一個為*通配符,第二個為#通配符
?4. 頭交換機(Headers Exchange)?
?路由機制( 我的理解是一種基于 ?多條件組合 的消息路由機制) ?
- ?基于消息頭(Headers)匹配:忽略
Routing Key
,通過鍵值對(Headers)匹配隊列綁定的條件。 - ?匹配規(guī)則:
x-match
參數(shù)設(shè)為all
(需全部匹配)或any
(匹配任意一個)。
?應(yīng)用場景
- ?復(fù)雜路由邏輯:如根據(jù)消息的版本號、語言等元數(shù)據(jù)路由。?
- 多維度過濾:如同時匹配用戶類型(
user_type: vip
)和地理位置(region: asia
)。
1.頭交換機配置
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class HeadersExchangeConfig { // 定義頭交換機 @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.user", true, false); } // 定義VIP用戶隊列 @Bean public Queue vipQueue() { return new Queue("headers.user.vip", true); } // 綁定VIP隊列,要求同時匹配 userType=vip 和 region=asia @Bean public Binding vipBinding() { Map<String, Object> headers = new HashMap<>(); headers.put("userType", "vip"); headers.put("region", "asia"); return BindingBuilder.bind(vipQueue()) .to(headersExchange()) .whereAll(headers).match(); // whereAll 表示需全部匹配 } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HeaderUserVipService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendHeaderMessage") public String sendVipMessage() { MessageProperties props = new MessageProperties(); props.setHeader("userType", "vip"); props.setHeader("region", "asia"); Message msg = new Message("HeaderMessage".getBytes(), props); rabbitTemplate.send("headers.user", "", msg); return "OK"; } }
3.消費者
package com.atguigu.demomq2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class HeaderUserVipConsumer { @RabbitListener(queues = "headers.user.vip") public void handleVip(Message message) { String body = new String(message.getBody()); System.out.println("[VIP] Received: " + body); } }
4.PostMan測試
這里僅消費交換機初始化時滿足所有設(shè)定條件的消息,我們可以測試一下不滿足條件時發(fā)送消息
消費者不消費消息
總結(jié)
需要代碼自己進行測試的 可以Git自取
git clone https://gitee.com/myselfzxy/mq-producer.git
git clone https://gitee.com/myselfzxy/mq-customer.git
到此這篇關(guān)于SpringBoot集成MQ,四種交換機的實例的文章就介紹到這了,更多相關(guān)SpringBoot集成MQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot Rabbit MQ topic 配置文件綁定隊列和交換機的實現(xiàn)方法
- SpringBoot整合RabbitMQ實戰(zhàn)教程附死信交換機
- SpringBoot整合RabbitMQ實現(xiàn)交換機與隊列的綁定
- RabbitMQ交換機與Springboot整合的簡單實現(xiàn)
- SpringBoot集成MQTT實現(xiàn)交互服務(wù)通信
- SpringAMQP消息隊列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- SpringBoot集成RocketMQ的使用示例
- springboot集成mqtt超級詳細步驟
相關(guān)文章
基于Spring p標(biāo)簽和c標(biāo)簽注入方式
這篇文章主要介紹了Spring p標(biāo)簽和c標(biāo)簽注入方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09在IDEA中配置tomcat并創(chuàng)建tomcat項目的圖文教程
這篇文章主要介紹了在IDEA中配置tomcat并創(chuàng)建tomcat項目的圖文教程,需要的朋友可以參考下2020-07-07基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)
本文將會詳細地展示如何搭建JSP的開發(fā)環(huán)境。本次教程使用的是最新版的Eclipse 2018-09編輯器和最新版的Apache Tomcat v9.0,步驟詳細,內(nèi)容詳盡,適合零基礎(chǔ)學(xué)者作為學(xué)習(xí)參考2018-12-12springboot如何統(tǒng)一設(shè)置時區(qū)
這篇文章主要介紹了springboot如何統(tǒng)一設(shè)置時區(qū)問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01詳解使用spring aop實現(xiàn)業(yè)務(wù)層mysql 讀寫分離
本篇文章主要介紹了使用spring aop實現(xiàn)業(yè)務(wù)層mysql 讀寫分離,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-01-01Knife4j的請求示例當(dāng)中有很多空白行的問題解決辦法
這篇文章主要介紹了Knife4j的請求示例當(dāng)中有很多空白行的問題解決辦法,按正常來說不應(yīng)該有上方的空白,當(dāng)然如果只是查看我也不至于非要解決他,主要是假如接口是json傳參,調(diào)試界面都沒辦法修改參數(shù),遇到同樣問題的同學(xué)可以參考閱讀本文2024-09-09