SpringBoot集成MQ的過(guò)程(四種交換機(jī)的實(shí)例)
?RabbitMQ交換機(jī)(Exchange)的核心作用
在RabbitMQ中,?交換機(jī) 是消息路由的核心組件,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)規(guī)則(如路由鍵、頭信息等)將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中。
不同交換機(jī)類型決定了消息的路由邏輯,使用不同的交換機(jī)在不同的場(chǎng)景下可以提高消息系統(tǒng)的高可用性。
1. 直連交換機(jī)(Direct Exchange)?
?路由機(jī)制 ?
- 精確匹配路由鍵(Routing Key)?:消息會(huì)被發(fā)送到與
Routing Key?完全匹配 的隊(duì)列。 - ?典型場(chǎng)景:一對(duì)一或一對(duì)多的精確消息分發(fā)。
應(yīng)用場(chǎng)景 ?
- 任務(wù)分發(fā):如訂單處理系統(tǒng),根據(jù)訂單類型(如
order.payment、order.shipping)分發(fā)到不同隊(duì)列。 - ?日志分類:將不同級(jí)別的日志(
log.error、log.info)路由到對(duì)應(yīng)的處理服務(wù)。
使用直連交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收
1.創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下:
server:
port: 8021
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服務(wù)器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest2.初始化隊(duì)列和交換機(jī),并進(jìn)行綁定
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午1:55
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue TestDirectQueue(){
return new Queue("TestDirectQueue",true);
}
@Bean
DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange",true,false);
}
@Bean
Binding bindingDirect(){
return BindingBuilder.bind(TestDirectQueue())
.to(TestDirectExchange())
.with("TestDirectRouting");
}
} 3.實(shí)現(xiàn)sendDirectMessage發(fā)送消息請(qǐng)求,由生產(chǎn)者發(fā)送到MQ,TestDirectRouting作為Key,用于精確轉(zhuǎn)發(fā)。
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午2:12
*/
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Hello MQ!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "OK";
}
}4.此時(shí)就可以啟動(dòng)項(xiàng)目發(fā)送消息了,使用PostMan發(fā)送消息,返回OK說(shuō)明發(fā)送成功

5.進(jìn)入http://localhost:15672/,可以看到消息發(fā)送成功,我這里是請(qǐng)求了兩次(也就是發(fā)了兩條消息)。

6.接下來(lái)寫消費(fèi)者的消費(fèi)過(guò)程,新創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下
server:
port: 8022
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服務(wù)器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest7.消費(fèi)者配置類,同樣TestDirectRouting用于唯一識(shí)別Key
package com.atguigu.demomq2;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 功能:
* 作者:程序員ZXY
* 日期:2025/3/8 下午
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}8.消費(fèi)者 接收消息@RabbitListener(queues = "TestDirectQueue")用于監(jiān)聽指定隊(duì)列發(fā)送的消息
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費(fèi)者收到消息 : " + testMessage.toString());
}
}9.啟動(dòng)消費(fèi)者,成功接收消息

10.查看MQ控制臺(tái),消息成功被消費(fèi)

2. 扇出交換機(jī)(Fanout Exchange)? ?
路由機(jī)制(一個(gè)交換機(jī)轉(zhuǎn)發(fā)到多個(gè)隊(duì)列)
- 廣播模式:忽略
Routing Key,將消息發(fā)送到所有綁定的隊(duì)列。 - ?典型場(chǎng)景:消息的全局通知或并行處理。
?應(yīng)用場(chǎng)景
- ?實(shí)時(shí)通知系統(tǒng):如用戶注冊(cè)成功后,同時(shí)發(fā)送郵件、短信、更新緩存。
- ?日志廣播:多個(gè)服務(wù)訂閱同一日志源,各自獨(dú)立處理。
使用扇出交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收
1.扇出交換機(jī)配置
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {
// 定義扇出交換機(jī)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.user.register", true, false);
}
// 定義郵件隊(duì)列
@Bean
public Queue emailQueue() {
return new Queue("fanout.user.email", true);
}
// 定義短信隊(duì)列
@Bean
public Queue smsQueue() {
return new Queue("fanout.user.sms", true);
}
// 綁定所有隊(duì)列到扇出交換機(jī)(無(wú)需路由鍵)
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
}2.生產(chǎn)者
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FanoutUserService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendFanoutMessage")
public String sendRegisterBroadcast() {
rabbitTemplate.convertAndSend(
"fanout.user.register",
"", // 扇出交換機(jī)忽略路由鍵
"message MQ"
);
return "OK Fan";
}
}3.消費(fèi)者
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutNotificationConsumer {
@RabbitListener(queues = "fanout.user.email")
public void handleEmail(String message) {
System.out.println("[Email] Received: " + message);
}
@RabbitListener(queues = "fanout.user.sms")
public void handleSms(String message) {
System.out.println("[SMS] Received: " + message);
}
}4.請(qǐng)求并查看消費(fèi)結(jié)果


可以看到一個(gè)交換機(jī)完成消費(fèi)兩條消息
?3. 主題交換機(jī)(Topic Exchange)?
- ?路由機(jī)制 ?模式匹配路由鍵:使用
*(匹配一個(gè)單詞)和#(匹配多個(gè)單詞)通配符。? - 典型場(chǎng)景:靈活的多條件消息路由。 ?
應(yīng)用場(chǎng)景
- ?新聞?dòng)嗛喯到y(tǒng):用戶訂閱特定主題(如
news.sports.*、news.tech.#)。? - 設(shè)備狀態(tài)監(jiān)控:根據(jù)設(shè)備類型和區(qū)域路由消息(如
sensor.temperature.room1)。
1.配置主題交換機(jī)
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
// 定義主題交換機(jī)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.news", true, false);
}
// 定義體育新聞隊(duì)列
@Bean
public Queue sportsQueue() {
return new Queue("topic.news.sports", true);
}
// 定義科技新聞隊(duì)列
@Bean
public Queue techQueue() {
return new Queue("topic.news.tech", true);
}
// 綁定體育隊(duì)列:匹配 news.sports.*
@Bean
public Binding sportsBinding() {
return BindingBuilder.bind(sportsQueue())
.to(topicExchange())
.with("news.sports.*");
}
// 綁定科技隊(duì)列:匹配 news.tech.#
@Bean
public Binding techBinding() {
return BindingBuilder.bind(techQueue())
.to(topicExchange())
.with("news.tech.#");
}
}2.生產(chǎn)者
package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TopicNewsService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendTopicMessage1")
public String sendSportsNews() {
rabbitTemplate.convertAndSend(
"topic.news",
"news.sports.football",
"* message:news.sports.football"
);
return "*OK";
}
@GetMapping("/sendTopicMessage2")
public String sendTechNews() {
rabbitTemplate.convertAndSend(
"topic.news",
"news.tech.ai.abc.123456",
"# message:news.tech.ai.abc.123456"
);
return "#OK";
}
}3. 消費(fèi)者
package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicNewsConsumer {
@RabbitListener(queues = "topic.news.sports")
public void handleSports(String message) {
System.out.println("[Sports] Received: " + message);
}
@RabbitListener(queues = "topic.news.tech")
public void handleTech(String message) {
System.out.println("[Tech] Received: " + message);
}
}4.發(fā)送請(qǐng)求


可以看到消息成功消費(fèi),第一個(gè)為*通配符,第二個(gè)為#通配符

?4. 頭交換機(jī)(Headers Exchange)?
?路由機(jī)制( 我的理解是一種基于 ?多條件組合 的消息路由機(jī)制) ?
- ?基于消息頭(Headers)匹配:忽略
Routing Key,通過(guò)鍵值對(duì)(Headers)匹配隊(duì)列綁定的條件。 - ?匹配規(guī)則:
x-match參數(shù)設(shè)為all(需全部匹配)或any(匹配任意一個(gè))。
?應(yīng)用場(chǎng)景
- ?復(fù)雜路由邏輯:如根據(jù)消息的版本號(hào)、語(yǔ)言等元數(shù)據(jù)路由。?
- 多維度過(guò)濾:如同時(shí)匹配用戶類型(
user_type: vip)和地理位置(region: asia)。
1.頭交換機(jī)配置
package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersExchangeConfig {
// 定義頭交換機(jī)
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.user", true, false);
}
// 定義VIP用戶隊(duì)列
@Bean
public Queue vipQueue() {
return new Queue("headers.user.vip", true);
}
// 綁定VIP隊(duì)列,要求同時(shí)匹配 userType=vip 和 region=asia
@Bean
public Binding vipBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("userType", "vip");
headers.put("region", "asia");
return BindingBuilder.bind(vipQueue())
.to(headersExchange())
.whereAll(headers).match(); // whereAll 表示需全部匹配
}
}2.生產(chǎn)者
package com.atguigu.demomq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HeaderUserVipService {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendHeaderMessage")
public String sendVipMessage() {
MessageProperties props = new MessageProperties();
props.setHeader("userType", "vip");
props.setHeader("region", "asia");
Message msg = new Message("HeaderMessage".getBytes(), props);
rabbitTemplate.send("headers.user", "", msg);
return "OK";
}
}3.消費(fèi)者
package com.atguigu.demomq2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HeaderUserVipConsumer {
@RabbitListener(queues = "headers.user.vip")
public void handleVip(Message message) {
String body = new String(message.getBody());
System.out.println("[VIP] Received: " + body);
}
}4.PostMan測(cè)試


這里僅消費(fèi)交換機(jī)初始化時(shí)滿足所有設(shè)定條件的消息,我們可以測(cè)試一下不滿足條件時(shí)發(fā)送消息

消費(fèi)者不消費(fèi)消息

總結(jié)

需要代碼自己進(jìn)行測(cè)試的 可以Git自取
git clone https://gitee.com/myselfzxy/mq-producer.git
git clone https://gitee.com/myselfzxy/mq-customer.git
到此這篇關(guān)于SpringBoot集成MQ,四種交換機(jī)的實(shí)例的文章就介紹到這了,更多相關(guān)SpringBoot集成MQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot Rabbit MQ topic 配置文件綁定隊(duì)列和交換機(jī)的實(shí)現(xiàn)方法
- SpringBoot整合RabbitMQ實(shí)戰(zhàn)教程附死信交換機(jī)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)交換機(jī)與隊(duì)列的綁定
- RabbitMQ交換機(jī)與Springboot整合的簡(jiǎn)單實(shí)現(xiàn)
- SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- SpringBoot集成RocketMQ的使用示例
- springboot集成mqtt超級(jí)詳細(xì)步驟
相關(guān)文章
java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例
在本篇文章里小編給大家分享了關(guān)于java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-05-05
MybatisX無(wú)法自動(dòng)生成entity實(shí)體類的解決方法
本文主要介紹了MybatisX無(wú)法自動(dòng)生成entity實(shí)體類的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
基于Spring p標(biāo)簽和c標(biāo)簽注入方式
這篇文章主要介紹了Spring p標(biāo)簽和c標(biāo)簽注入方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java實(shí)現(xiàn)批量化操作Excel文件的示例代碼
在操作Excel的場(chǎng)景中,通常會(huì)有一些針對(duì)Excel的批量操作,這篇文章主要為大家詳細(xì)介紹了如何使用GcExcel實(shí)現(xiàn)批量化操作Excel,感興趣的可以了解一下2024-12-12
在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程
這篇文章主要介紹了在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程,需要的朋友可以參考下2020-07-07
基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)
本文將會(huì)詳細(xì)地展示如何搭建JSP的開發(fā)環(huán)境。本次教程使用的是最新版的Eclipse 2018-09編輯器和最新版的Apache Tomcat v9.0,步驟詳細(xì),內(nèi)容詳盡,適合零基礎(chǔ)學(xué)者作為學(xué)習(xí)參考2018-12-12
springboot如何統(tǒng)一設(shè)置時(shí)區(qū)
這篇文章主要介紹了springboot如何統(tǒng)一設(shè)置時(shí)區(qū)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01
詳解使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離
本篇文章主要介紹了使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-01-01
Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問(wèn)題解決辦法
這篇文章主要介紹了Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問(wèn)題解決辦法,按正常來(lái)說(shuō)不應(yīng)該有上方的空白,當(dāng)然如果只是查看我也不至于非要解決他,主要是假如接口是json傳參,調(diào)試界面都沒辦法修改參數(shù),遇到同樣問(wèn)題的同學(xué)可以參考閱讀本文2024-09-09

