SpringBoot集成MQ的過程(四種交換機(jī)的實(shí)例)
?RabbitMQ交換機(jī)(Exchange)的核心作用
在RabbitMQ中,?交換機(jī) 是消息路由的核心組件,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)規(guī)則(如路由鍵、頭信息等)將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中。
不同交換機(jī)類型決定了消息的路由邏輯,使用不同的交換機(jī)在不同的場景下可以提高消息系統(tǒng)的高可用性。
1. 直連交換機(jī)(Direct Exchange)?
?路由機(jī)制 ?
- 精確匹配路由鍵(Routing Key)?:消息會(huì)被發(fā)送到與
Routing Key
?完全匹配 的隊(duì)列。 - ?典型場景:一對(duì)一或一對(duì)多的精確消息分發(fā)。
應(yīng)用場景 ?
- 任務(wù)分發(fā):如訂單處理系統(tǒng),根據(jù)訂單類型(如
order.payment
、order.shipping
)分發(fā)到不同隊(duì)列。 - ?日志分類:將不同級(jí)別的日志(
log.error
、log.info
)路由到對(duì)應(yīng)的處理服務(wù)。
使用直連交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收
1.創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下:
server: port: 8021 spring: application: name: rabbitmq-provider #配置rabbitMq 服務(wù)器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
2.初始化隊(duì)列和交換機(jī),并進(jìn)行綁定
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午1:55 */ @Configuration public class DirectRabbitConfig { @Bean public Queue TestDirectQueue(){ return new Queue("TestDirectQueue",true); } @Bean DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false); } @Bean Binding bindingDirect(){ return BindingBuilder.bind(TestDirectQueue()) .to(TestDirectExchange()) .with("TestDirectRouting"); } }
3.實(shí)現(xiàn)sendDirectMessage發(fā)送消息請(qǐng)求,由生產(chǎn)者發(fā)送到MQ,TestDirectRouting作為Key,用于精確轉(zhuǎn)發(fā)。
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午2:12 */ @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "Hello MQ!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "OK"; } }
4.此時(shí)就可以啟動(dòng)項(xiàng)目發(fā)送消息了,使用PostMan發(fā)送消息,返回OK說明發(fā)送成功
5.進(jìn)入http://localhost:15672/,可以看到消息發(fā)送成功,我這里是請(qǐng)求了兩次(也就是發(fā)了兩條消息)。
6.接下來寫消費(fèi)者的消費(fèi)過程,新創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下
server: port: 8022 spring: application: name: rabbitmq-provider #配置rabbitMq 服務(wù)器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
7.消費(fèi)者配置類,同樣TestDirectRouting用于唯一識(shí)別Key
package com.atguigu.demomq2; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 功能: * 作者:程序員ZXY * 日期:2025/3/8 下午 */ @Configuration public class DirectRabbitConfig { @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
8.消費(fèi)者 接收消息@RabbitListener(queues = "TestDirectQueue")用于監(jiān)聽指定隊(duì)列發(fā)送的消息
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "TestDirectQueue") public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消費(fèi)者收到消息 : " + testMessage.toString()); } }
9.啟動(dòng)消費(fèi)者,成功接收消息
10.查看MQ控制臺(tái),消息成功被消費(fèi)
2. 扇出交換機(jī)(Fanout Exchange)? ?
路由機(jī)制(一個(gè)交換機(jī)轉(zhuǎn)發(fā)到多個(gè)隊(duì)列)
- 廣播模式:忽略
Routing Key
,將消息發(fā)送到所有綁定的隊(duì)列。 - ?典型場景:消息的全局通知或并行處理。
?應(yīng)用場景
- ?實(shí)時(shí)通知系統(tǒng):如用戶注冊(cè)成功后,同時(shí)發(fā)送郵件、短信、更新緩存。
- ?日志廣播:多個(gè)服務(wù)訂閱同一日志源,各自獨(dú)立處理。
使用扇出交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收
1.扇出交換機(jī)配置
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutExchangeConfig { // 定義扇出交換機(jī) @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.user.register", true, false); } // 定義郵件隊(duì)列 @Bean public Queue emailQueue() { return new Queue("fanout.user.email", true); } // 定義短信隊(duì)列 @Bean public Queue smsQueue() { return new Queue("fanout.user.sms", true); } // 綁定所有隊(duì)列到扇出交換機(jī)(無需路由鍵) @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class FanoutUserService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendFanoutMessage") public String sendRegisterBroadcast() { rabbitTemplate.convertAndSend( "fanout.user.register", "", // 扇出交換機(jī)忽略路由鍵 "message MQ" ); return "OK Fan"; } }
3.消費(fèi)者
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutNotificationConsumer { @RabbitListener(queues = "fanout.user.email") public void handleEmail(String message) { System.out.println("[Email] Received: " + message); } @RabbitListener(queues = "fanout.user.sms") public void handleSms(String message) { System.out.println("[SMS] Received: " + message); } }
4.請(qǐng)求并查看消費(fèi)結(jié)果
可以看到一個(gè)交換機(jī)完成消費(fèi)兩條消息
?3. 主題交換機(jī)(Topic Exchange)?
- ?路由機(jī)制 ?模式匹配路由鍵:使用
*
(匹配一個(gè)單詞)和#
(匹配多個(gè)單詞)通配符。? - 典型場景:靈活的多條件消息路由。 ?
應(yīng)用場景
- ?新聞?dòng)嗛喯到y(tǒng):用戶訂閱特定主題(如
news.sports.*
、news.tech.#
)。? - 設(shè)備狀態(tài)監(jiān)控:根據(jù)設(shè)備類型和區(qū)域路由消息(如
sensor.temperature.room1
)。
1.配置主題交換機(jī)
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicExchangeConfig { // 定義主題交換機(jī) @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.news", true, false); } // 定義體育新聞隊(duì)列 @Bean public Queue sportsQueue() { return new Queue("topic.news.sports", true); } // 定義科技新聞隊(duì)列 @Bean public Queue techQueue() { return new Queue("topic.news.tech", true); } // 綁定體育隊(duì)列:匹配 news.sports.* @Bean public Binding sportsBinding() { return BindingBuilder.bind(sportsQueue()) .to(topicExchange()) .with("news.sports.*"); } // 綁定科技隊(duì)列:匹配 news.tech.# @Bean public Binding techBinding() { return BindingBuilder.bind(techQueue()) .to(topicExchange()) .with("news.tech.#"); } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TopicNewsService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendTopicMessage1") public String sendSportsNews() { rabbitTemplate.convertAndSend( "topic.news", "news.sports.football", "* message:news.sports.football" ); return "*OK"; } @GetMapping("/sendTopicMessage2") public String sendTechNews() { rabbitTemplate.convertAndSend( "topic.news", "news.tech.ai.abc.123456", "# message:news.tech.ai.abc.123456" ); return "#OK"; } }
3. 消費(fèi)者
package com.atguigu.demomq2; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicNewsConsumer { @RabbitListener(queues = "topic.news.sports") public void handleSports(String message) { System.out.println("[Sports] Received: " + message); } @RabbitListener(queues = "topic.news.tech") public void handleTech(String message) { System.out.println("[Tech] Received: " + message); } }
4.發(fā)送請(qǐng)求
可以看到消息成功消費(fèi),第一個(gè)為*通配符,第二個(gè)為#通配符
?4. 頭交換機(jī)(Headers Exchange)?
?路由機(jī)制( 我的理解是一種基于 ?多條件組合 的消息路由機(jī)制) ?
- ?基于消息頭(Headers)匹配:忽略
Routing Key
,通過鍵值對(duì)(Headers)匹配隊(duì)列綁定的條件。 - ?匹配規(guī)則:
x-match
參數(shù)設(shè)為all
(需全部匹配)或any
(匹配任意一個(gè))。
?應(yīng)用場景
- ?復(fù)雜路由邏輯:如根據(jù)消息的版本號(hào)、語言等元數(shù)據(jù)路由。?
- 多維度過濾:如同時(shí)匹配用戶類型(
user_type: vip
)和地理位置(region: asia
)。
1.頭交換機(jī)配置
package com.atguigu.demomq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class HeadersExchangeConfig { // 定義頭交換機(jī) @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.user", true, false); } // 定義VIP用戶隊(duì)列 @Bean public Queue vipQueue() { return new Queue("headers.user.vip", true); } // 綁定VIP隊(duì)列,要求同時(shí)匹配 userType=vip 和 region=asia @Bean public Binding vipBinding() { Map<String, Object> headers = new HashMap<>(); headers.put("userType", "vip"); headers.put("region", "asia"); return BindingBuilder.bind(vipQueue()) .to(headersExchange()) .whereAll(headers).match(); // whereAll 表示需全部匹配 } }
2.生產(chǎn)者
package com.atguigu.demomq; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HeaderUserVipService { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendHeaderMessage") public String sendVipMessage() { MessageProperties props = new MessageProperties(); props.setHeader("userType", "vip"); props.setHeader("region", "asia"); Message msg = new Message("HeaderMessage".getBytes(), props); rabbitTemplate.send("headers.user", "", msg); return "OK"; } }
3.消費(fèi)者
package com.atguigu.demomq2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class HeaderUserVipConsumer { @RabbitListener(queues = "headers.user.vip") public void handleVip(Message message) { String body = new String(message.getBody()); System.out.println("[VIP] Received: " + body); } }
4.PostMan測試
這里僅消費(fèi)交換機(jī)初始化時(shí)滿足所有設(shè)定條件的消息,我們可以測試一下不滿足條件時(shí)發(fā)送消息
消費(fèi)者不消費(fèi)消息
總結(jié)
需要代碼自己進(jìn)行測試的 可以Git自取
git clone https://gitee.com/myselfzxy/mq-producer.git
git clone https://gitee.com/myselfzxy/mq-customer.git
到此這篇關(guān)于SpringBoot集成MQ,四種交換機(jī)的實(shí)例的文章就介紹到這了,更多相關(guān)SpringBoot集成MQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot Rabbit MQ topic 配置文件綁定隊(duì)列和交換機(jī)的實(shí)現(xiàn)方法
- SpringBoot整合RabbitMQ實(shí)戰(zhàn)教程附死信交換機(jī)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)交換機(jī)與隊(duì)列的綁定
- RabbitMQ交換機(jī)與Springboot整合的簡單實(shí)現(xiàn)
- SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- SpringBoot集成RocketMQ的使用示例
- springboot集成mqtt超級(jí)詳細(xì)步驟
相關(guān)文章
java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例
在本篇文章里小編給大家分享了關(guān)于java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-05-05MybatisX無法自動(dòng)生成entity實(shí)體類的解決方法
本文主要介紹了MybatisX無法自動(dòng)生成entity實(shí)體類的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06基于Spring p標(biāo)簽和c標(biāo)簽注入方式
這篇文章主要介紹了Spring p標(biāo)簽和c標(biāo)簽注入方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java實(shí)現(xiàn)批量化操作Excel文件的示例代碼
在操作Excel的場景中,通常會(huì)有一些針對(duì)Excel的批量操作,這篇文章主要為大家詳細(xì)介紹了如何使用GcExcel實(shí)現(xiàn)批量化操作Excel,感興趣的可以了解一下2024-12-12在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程
這篇文章主要介紹了在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程,需要的朋友可以參考下2020-07-07基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)
本文將會(huì)詳細(xì)地展示如何搭建JSP的開發(fā)環(huán)境。本次教程使用的是最新版的Eclipse 2018-09編輯器和最新版的Apache Tomcat v9.0,步驟詳細(xì),內(nèi)容詳盡,適合零基礎(chǔ)學(xué)者作為學(xué)習(xí)參考2018-12-12springboot如何統(tǒng)一設(shè)置時(shí)區(qū)
這篇文章主要介紹了springboot如何統(tǒng)一設(shè)置時(shí)區(qū)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01詳解使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離
本篇文章主要介紹了使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-01-01Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問題解決辦法
這篇文章主要介紹了Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問題解決辦法,按正常來說不應(yīng)該有上方的空白,當(dāng)然如果只是查看我也不至于非要解決他,主要是假如接口是json傳參,調(diào)試界面都沒辦法修改參數(shù),遇到同樣問題的同學(xué)可以參考閱讀本文2024-09-09