Spring boot框架下的RabbitMQ消息中間件詳解
1. RabbitMQ 基礎(chǔ)概念
1.1 消息處理流程與組件配合
Producer(生產(chǎn)者) 發(fā)送消息。消息先發(fā)送到 Exchange(交換機(jī)),而不是直接到隊(duì)列。
- Exchange(交換機(jī)) 接收到消息后,根據(jù) Routing Key(路由鍵) 和 Binding(綁定規(guī)則),決定將消息發(fā)送到哪些 Queue(隊(duì)列)。
- Queue(隊(duì)列) 存儲(chǔ)消息,等待 Consumer(消費(fèi)者) 消費(fèi)。
- Consumer(消費(fèi)者) 從隊(duì)列中接收并處理消息。
Producer(生產(chǎn)者)
作用:負(fù)責(zé)發(fā)送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。
關(guān)鍵點(diǎn):
- Producer 只需要知道 Exchange 和 Routing Key,不關(guān)心隊(duì)列。
- Producer 不直接與隊(duì)列交互,消息的路由和存儲(chǔ)由 Exchange 和 Binding 決定。
代碼示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
System.out.println("Sent message: " + message);
}
}調(diào)用示例:
producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");direct-exchange:目標(biāo)交換機(jī)。key1:消息的路由鍵。
Exchange(交換機(jī))
作用:接收來(lái)自 Producer 的消息,并根據(jù) Routing Key 和 Binding 的配置,決定將消息發(fā)送到哪些隊(duì)列。
Exchange 通常需要手動(dòng)注冊(cè)為 Bean。
- RabbitMQ 的 Exchange 是通過(guò)名稱來(lái)標(biāo)識(shí)的。
- 在 Spring Boot 中,您通過(guò)
@Bean方法注冊(cè) Exchange 時(shí),實(shí)際上是將 Exchange 的名稱和類型綁定到 RabbitMQ 服務(wù)器。- 發(fā)送消息時(shí),RabbitMQ 客戶端會(huì)根據(jù) Exchange 的名稱找到對(duì)應(yīng)的 Exchange,并根據(jù) Routing Key 將消息路由到隊(duì)列。
類型:
- Direct Exchange:精確匹配 Routing Key。消息的 Routing Key 必須與 Binding 的 Routing Key 完全一致。
- Topic Exchange:支持通配符匹配。例如,
with("key.*")可以匹配key.1、key.2等。 - Fanout Exchange:忽略 Routing Key,消息會(huì)被廣播到所有綁定的隊(duì)列。
- Headers Exchange:忽略 Routing Key,根據(jù)消息頭屬性匹配。
代碼示例(定義交換機(jī)):
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExchangeConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers-exchange");
}
}Queue(隊(duì)列)
作用:消息的存儲(chǔ)容器,等待消費(fèi)者從中取出消息進(jìn)行處理。
Queue 也需要手動(dòng)注冊(cè)為 Bean。Spring Boot 不會(huì)自動(dòng)注冊(cè)隊(duì)列,因?yàn)殛?duì)列的名稱和屬性(如是否持久化、是否排他等)需要根據(jù)業(yè)務(wù)需求進(jìn)行配置。
關(guān)鍵點(diǎn):
- 消息會(huì)保存在隊(duì)列中,直到被消費(fèi)。
- 隊(duì)列可以是持久化的(重啟 RabbitMQ 后消息仍然存在)或非持久化的。
代碼示例(定義隊(duì)列):
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue demoQueue() {
return new Queue("demo-queue", true); // 持久化隊(duì)列
}
}Routing Key(路由鍵)
作用:決定消息如何從交換機(jī)路由到隊(duì)列。
關(guān)鍵點(diǎn):
- Routing Key 由 Producer 指定。
- 在 Direct 和 Topic 類型的 Exchange 中,Routing Key 決定隊(duì)列是否接收消息。
Binding(綁定)
- 作用:將隊(duì)列與交換機(jī)連接,并定義路由規(guī)則。
- 關(guān)鍵點(diǎn):
- Binding 定義了隊(duì)列接受消息的條件。
- 結(jié)合 Routing Key 和交換機(jī)類型,共同決定消息的路由方式。
代碼示例(定義綁定):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BindingConfig {
@Bean
public Binding binding(Queue demoQueue, DirectExchange directExchange) {
return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
}
}with("key1") 的作用是 指定 Binding 的 Routing Key。它的含義是:
- 當(dāng)消息發(fā)送到 Exchange 時(shí),Exchange 會(huì)根據(jù)消息的 Routing Key 和 Binding 的 Routing Key 進(jìn)行匹配。
- 如果匹配成功,消息會(huì)被路由到對(duì)應(yīng)的隊(duì)列;如果匹配失敗,消息會(huì)被丟棄或進(jìn)入死信隊(duì)列(如果有配置)。
Consumer(消費(fèi)者)
作用:從隊(duì)列中接收并處理消息。
關(guān)鍵點(diǎn):
- 消費(fèi)者與隊(duì)列直接關(guān)聯(lián)。
- 多個(gè)消費(fèi)者可以監(jiān)聽(tīng)同一隊(duì)列,實(shí)現(xiàn)負(fù)載均衡。
代碼示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
@RabbitListener(queues = "demo-queue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}1.2 RabbitMQ 消息傳輸模型
點(diǎn)對(duì)點(diǎn)模型
定義:消息從生產(chǎn)者發(fā)送到隊(duì)列,由消費(fèi)者從隊(duì)列中接收,消息只能被一個(gè)消費(fèi)者消費(fèi)。
實(shí)現(xiàn):
- 使用默認(rèn)交換機(jī)(空字符串
"")。 - 直接將消息發(fā)送到隊(duì)列。
代碼示例:
rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");發(fā)布訂閱模型
定義:生產(chǎn)者將消息發(fā)送到 Fanout 類型的交換機(jī),消息會(huì)廣播到所有綁定的隊(duì)列。
實(shí)現(xiàn):
- 不需要 Routing Key。
- 所有綁定到 Fanout 交換機(jī)的隊(duì)列都會(huì)接收消息。
代碼示例:
rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");路由模型
定義:生產(chǎn)者將消息發(fā)送到 Direct 類型的交換機(jī),根據(jù) Routing Key 精確匹配隊(duì)列。
實(shí)現(xiàn):
- 隊(duì)列通過(guò) Binding 綁定到交換機(jī)時(shí),指定 Routing Key。
- 消息的 Routing Key 必須與 Binding 的 Routing Key 一致。
代碼示例:
rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");2. 環(huán)境準(zhǔn)備
2.1 安裝與配置 RabbitMQ
下載 Docker
訪問(wèn) Docker 官方網(wǎng)站:Docker: Accelerated Container Application Development。
根據(jù)您的操作系統(tǒng)(Windows、macOS 或 Linux)下載并安裝 Docker Desktop。
啟動(dòng) Docker
- 安裝完成后,啟動(dòng) Docker Desktop。
- 確保 Docker 正在運(yùn)行(任務(wù)欄或菜單欄中可以看到 Docker 圖標(biāo))。
使用 Docker 快速部署 RabbitMQ
Docker 是部署 RabbitMQ 的最簡(jiǎn)單方式。通過(guò)以下命令,您可以快速啟動(dòng)一個(gè) RabbitMQ 容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
參數(shù)說(shuō)明:
-d:以后臺(tái)模式運(yùn)行容器。--name rabbitmq:為容器指定名稱(rabbitmq)。-p 5672:5672:將容器的 5672 端口映射到主機(jī)的 5672 端口(RabbitMQ 的消息通信端口)。-p 15672:15672:將容器的 15672 端口映射到主機(jī)的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。rabbitmq:management:使用帶有管理插件的 RabbitMQ 鏡像。
驗(yàn)證 RabbitMQ 是否運(yùn)行
運(yùn)行以下命令,查看容器是否正常運(yùn)行:
docker ps
如果看到 rabbitmq 容器正在運(yùn)行,說(shuō)明 RabbitMQ 已成功啟動(dòng)。
2.2 使用 RabbitMQ 管理插件
RabbitMQ 提供了一個(gè) Web 管理界面,方便您監(jiān)控和管理 RabbitMQ。
訪問(wèn)管理界面
- 打開(kāi)瀏覽器,訪問(wèn)
http://localhost:15672。 - 使用默認(rèn)用戶名和密碼登錄:
- 用戶名:
guest - 密碼:
guest
- 用戶名:
管理界面功能
- Overview:查看 RabbitMQ 的整體狀態(tài),如連接數(shù)、隊(duì)列數(shù)、消息速率等。
- Connections:查看當(dāng)前連接到 RabbitMQ 的客戶端。
- Channels:查看當(dāng)前打開(kāi)的通道。
- Exchanges:查看和管理 Exchange。
- Queues:查看和管理 Queue。
- Admin:管理用戶和權(quán)限。
2.3 用戶與權(quán)限配置
默認(rèn)情況下,RabbitMQ 只有一個(gè)用戶 guest,密碼也是 guest。為了安全性和權(quán)限管理,建議創(chuàng)建新用戶并分配權(quán)限。
1. 創(chuàng)建新用戶
- 在 RabbitMQ 管理界面中:
- 點(diǎn)擊頂部導(dǎo)航欄的 Admin。
- 在用戶列表下方,點(diǎn)擊 Add a user。
- 輸入用戶名和密碼,例如:
- 用戶名:
admin - 密碼:
admin123
- 用戶名:
- 點(diǎn)擊 Add user 完成創(chuàng)建。
2. 分配權(quán)限
- 在用戶列表中,找到剛創(chuàng)建的用戶(如
admin)。 - 點(diǎn)擊用戶右側(cè)的 Set permission。
- 在權(quán)限設(shè)置頁(yè)面:
- Virtual Host:選擇
/(默認(rèn)的虛擬主機(jī))。 - Configure:輸入
.*,表示允許用戶配置所有資源。 - Write:輸入
.*,表示允許用戶寫入所有資源。 - Read:輸入
.*,表示允許用戶讀取所有資源。
- Virtual Host:選擇
- 點(diǎn)擊 Set permission 完成權(quán)限分配。
3. 使用新用戶登錄
- 退出當(dāng)前用戶(點(diǎn)擊右上角的
guest,選擇 Log out)。 - 使用新用戶(如
admin)登錄。
2.4 Spring Boot 中引入 RabbitMQ 依賴
在 pom.xml 中添加以下依賴:
<dependencies>
<!-- RabbitMQ 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依賴,它包含了以下內(nèi)容:
RabbitMQ 客戶端庫(kù):
- 自動(dòng)引入 RabbitMQ 的 Java 客戶端庫(kù)(
amqp-client),用于與 RabbitMQ 服務(wù)器通信。
Spring AMQP 支持:
- 提供了 Spring 對(duì) AMQP(Advanced Message Queuing Protocol)的支持,包括
RabbitTemplate、@RabbitListener等。
2.5 Spring Boot 配置 RabbitMQ
在 Spring Boot 項(xiàng)目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的連接信息。
示例配置
# RabbitMQ 連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin123
配置說(shuō)明:
spring.rabbitmq.host:RabbitMQ 服務(wù)器地址(默認(rèn)localhost)。spring.rabbitmq.port:RabbitMQ 消息通信端口(默認(rèn)5672)。spring.rabbitmq.username:RabbitMQ 用戶名。spring.rabbitmq.password:RabbitMQ 密碼。
3. Spring Boot 集成 RabbitMQ 的消息生產(chǎn)和消費(fèi)
3.1 消息生產(chǎn)者(Producer)
- 在 Spring Boot 中,我們使用
RabbitTemplate來(lái)發(fā)送消息。它由spring-boot-starter-amqp自動(dòng)配置成為一個(gè) Bean,可直接通過(guò)@Autowired注入。 - 如果 message 不是 String 類型的處理 Spring AMQP(
spring-boot-starter-amqp)在使用RabbitTemplate時(shí),默認(rèn)的消息轉(zhuǎn)換器(MessageConverter)通常會(huì)將對(duì)象序列化為 JSON 或者將字符串消息轉(zhuǎn)換為字節(jié)。 - 如果你的業(yè)務(wù)數(shù)據(jù)不是
String,常見(jiàn)做法是:- 在發(fā)送時(shí)把非字符串對(duì)象序列化(如轉(zhuǎn)換為 JSON 字符串);
- 或者配置自定義的
MessageConverter,讓 Spring 幫你把對(duì)象自動(dòng)序列化/反序列化。
典型做法:手動(dòng)序列化為 JSON 再發(fā)送
@Service
public class CustomObjectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCustomObject(String queueName, MyCustomObject obj) {
// 1. 將自定義對(duì)象序列化為 JSON 字符串
String jsonString = new Gson().toJson(obj);
// 2. 發(fā)送 JSON 字符串到 RabbitMQ
rabbitTemplate.convertAndSend(queueName, jsonString);
}
}在消費(fèi)者端,你也可以將消息(JSON 字符串)反序列化為 MyCustomObject。
配置自定義 Converter(可選)
Spring AMQP 提供了 Jackson2JsonMessageConverter 等現(xiàn)成轉(zhuǎn)換器。
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 配置 RabbitTemplate 使用該轉(zhuǎn)換器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}這樣一來(lái),rabbitTemplate.convertAndSend(queueName, myObject) 會(huì)自動(dòng)把 myObject 轉(zhuǎn)成 JSON 發(fā)送;消費(fèi)者端則自動(dòng)解析為同樣的 Java 對(duì)象。 1)基本消息發(fā)送
場(chǎng)景:
將消息直接發(fā)送到指定的隊(duì)列,跳過(guò)交換機(jī)的路由,讓 RabbitMQ 把消息放到這個(gè)隊(duì)列中。
核心代碼示例:
@Service
public class BasicProducer {
@Autowired
private RabbitTemplate rabbitTemplate; // 1.自動(dòng)注入的 RabbitTemplate
/**
* 2.發(fā)送基本消息到指定的隊(duì)列
* @param queueName 目標(biāo)隊(duì)列名稱
* @param message 消息內(nèi)容
*/
public void sendToQueue(String queueName, String message) {
// 3.調(diào)用 convertAndSend,直接將消息放入指定隊(duì)列
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("Message sent to queue: " + queueName + ", content: " + message);
}
}代碼詳解:
@Autowiredprivate RabbitTemplate rabbitTemplate;`- Spring Boot 自動(dòng)為我們配置了
RabbitTemplate,不用手動(dòng)定義 Bean。 - 通過(guò)依賴注入即可使用所有與 RabbitMQ 交互的方法。
- Spring Boot 自動(dòng)為我們配置了
public void sendToQueue(String queueName, String message)- 方法參數(shù)包括:
queueName: 目標(biāo)隊(duì)列的名稱。message: 要發(fā)送的字符串類型消息內(nèi)容。
rabbitTemplate.convertAndSend(queueName, message)convertAndSend方法會(huì)將消息轉(zhuǎn)換(轉(zhuǎn)換為字節(jié))并發(fā)送到指定隊(duì)列。- 如果該隊(duì)列不存在,RabbitMQ 會(huì)嘗試自動(dòng)創(chuàng)建(前提是 Broker 端配置允許自動(dòng)創(chuàng)建隊(duì)列)。
2)發(fā)送到交換機(jī)
場(chǎng)景:
將消息發(fā)送到一個(gè)交換機(jī)(Exchange),再由交換機(jī)通過(guò) Routing Key 將消息路由到匹配的隊(duì)列中。
核心代碼示例:
@Service
public class ExchangeProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息到指定交換機(jī)
* @param exchangeName 交換機(jī)名稱
* @param routingKey 路由鍵
* @param message 消息內(nèi)容
*/
public void sendToExchange(String exchangeName, String routingKey, String message) {
// 將消息發(fā)送到 exchangeName 指定的交換機(jī),使用 routingKey 進(jìn)行路由
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
}
}代碼詳解:
exchangeName- 要發(fā)送到的交換機(jī)名稱,例如
"direct-exchange"、"fanout-exchange"等。
- 要發(fā)送到的交換機(jī)名稱,例如
routingKey- 路由鍵,用來(lái)匹配綁定(Binding)。例如:對(duì)
DirectExchange而言,需要隊(duì)列綁定時(shí)的路由鍵與發(fā)送時(shí)的路由鍵相同,消息才能到達(dá)隊(duì)列。 rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
- 路由鍵,用來(lái)匹配綁定(Binding)。例如:對(duì)
- 將消息先發(fā)送到交換機(jī),再根據(jù)路由鍵將消息投遞到目標(biāo)隊(duì)列。
3)發(fā)送帶消息屬性的消息
場(chǎng)景:
需要為消息設(shè)置 TTL(過(guò)期時(shí)間)或優(yōu)先級(jí)等屬性,控制消息在隊(duì)列中的行為。
核心代碼示例:
@Service
public class PropertyProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送帶消息屬性的消息(如 TTL, 優(yōu)先級(jí))
*/
public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
// 1.創(chuàng)建 MessageProperties 對(duì)象,用于指定消息的屬性
MessageProperties properties = new MessageProperties();
properties.setExpiration("10000"); // 過(guò)期時(shí)間:10秒 (單位:毫秒)
properties.setPriority(5); // 優(yōu)先級(jí)設(shè)為 5
// 2.根據(jù)消息體和屬性構(gòu)建 Message 對(duì)象
Message message = new Message(messageContent.getBytes(), properties);
// 3.使用 send 方法(而非 convertAndSend)直接發(fā)送 Message 對(duì)象
rabbitTemplate.send(exchange, routingKey, message);
System.out.println("Message with properties sent: " + messageContent);
}
}代碼詳解:
MessageProperties properties = new MessageProperties();MessageProperties用于設(shè)置 AMQP 協(xié)議層的各種消息頭信息。
properties.setExpiration("10000");setExpiration設(shè)置消息的 TTL(Time-To-Live),單位是毫秒。如果到達(dá)時(shí)間后消息仍未被消費(fèi),RabbitMQ 會(huì)將其從隊(duì)列中移除并送入死信隊(duì)列(如果配置了死信隊(duì)列)。properties.setPriority(5);
- 設(shè)置消息的優(yōu)先級(jí)為 5,前提是隊(duì)列本身需要支持優(yōu)先級(jí)隊(duì)列(創(chuàng)建隊(duì)列時(shí)指定
x-max-priority)。new Message(messageContent.getBytes(), properties)
- 將純文本消息轉(zhuǎn)換為
Message對(duì)象,結(jié)合了消息屬性和消息體。rabbitTemplate.send(exchange, routingKey, message);
- 與
convertAndSend不同,它不會(huì)嘗試進(jìn)行消息轉(zhuǎn)換(如 JSON、字符串),而是直接發(fā)送完整的 AMQPMessage對(duì)象。
Message 構(gòu)造函數(shù)
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}body:消息體的字節(jié)數(shù)組。
messageProperties:AMQP 的消息屬性,包括 TTL、優(yōu)先級(jí)、headers 等。、
如果消息體不是String類型
手動(dòng)轉(zhuǎn)換為字節(jié):你可以先將自定義對(duì)象轉(zhuǎn)換為字節(jié)數(shù)組(例如通過(guò) JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一個(gè)參數(shù)。
MyCustomObject obj = new MyCustomObject(); // 假設(shè)你想用 JSON String jsonString = new Gson().toJson(obj); byte[] body = jsonString.getBytes(StandardCharsets.UTF_8); MessageProperties properties = new MessageProperties(); // 設(shè)置一些屬性 Message message = new Message(body, properties);
- 為什么不會(huì)自動(dòng)轉(zhuǎn) JSON?使用
new Message(...)構(gòu)造方法是“純” AMQP 層的做法,不會(huì)調(diào)用 Spring 的轉(zhuǎn)換器,因此你必須自己處理序列化。 - 使用
Message構(gòu)造函數(shù) 時(shí),你必須自行處理對(duì)象到byte[]的轉(zhuǎn)換(無(wú)論是字符串、JSON,還是其他格式)。 - 如果想讓 Spring AMQP 自動(dòng)轉(zhuǎn)換,你通常使用
rabbitTemplate.convertAndSend(Object msg)這種高級(jí) API,或者配置自定義MessageConverter。
3.2 消息消費(fèi)者(Consumer)
消費(fèi)者的核心功能是在指定的隊(duì)列中監(jiān)聽(tīng)消息,并根據(jù)配置的確認(rèn)模式(自動(dòng)確認(rèn)或手動(dòng)確認(rèn))對(duì)消息進(jìn)行處理或拒絕。
1)監(jiān)聽(tīng)隊(duì)列并消費(fèi)消息
核心代碼示例(自動(dòng)確認(rèn)模式):
@Service
public class Consumer {
/**
* 使用注解 @RabbitListener 指定要監(jiān)聽(tīng)的隊(duì)列
* 由于默認(rèn)為 auto-ack 模式,
* 當(dāng)消息到達(dá)后,RabbitMQ 會(huì)自動(dòng)確認(rèn)并從隊(duì)列中刪除該消息。
*/
@RabbitListener(queues = "demo-queue")
public void receiveMessage(String message) {
// 1.從 queueName 隊(duì)列中取到的消息內(nèi)容
System.out.println("Received message: " + message);
// 2.在 auto-ack 模式下,無(wú)需手動(dòng) ack
// 如果這里出現(xiàn)異常,RabbitMQ 不會(huì)再次發(fā)送消息給消費(fèi)者,消息會(huì)丟失。
}
}代碼詳解(自動(dòng)確認(rèn)模式):
@RabbitListener(queues = "demo-queue")- 聲明監(jiān)聽(tīng)名為
demo-queue的隊(duì)列。 - 一旦有新消息到達(dá)該隊(duì)列,就會(huì)自動(dòng)回調(diào)此方法。
- 聲明監(jiān)聽(tīng)名為
public void receiveMessage(String message)- 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會(huì)嘗試將其轉(zhuǎn)換為
String并注入到message中。
- 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會(huì)嘗試將其轉(zhuǎn)換為
- 自動(dòng)確認(rèn)(auto-ack)的風(fēng)險(xiǎn)
- 如果消費(fèi)者在處理消息時(shí)拋出異常,消息已經(jīng)被 RabbitMQ 標(biāo)記為“已確認(rèn)”,不會(huì)再重新發(fā)送或進(jìn)入死信隊(duì)列,導(dǎo)致消息丟失。
2)確認(rèn)機(jī)制
自動(dòng)確認(rèn)(auto-ack)
- 行為:
- 當(dāng)消費(fèi)者從隊(duì)列中獲取消息后,RabbitMQ 會(huì)立即將該消息標(biāo)記為已確認(rèn)(acknowledged),并從隊(duì)列中刪除。
- 問(wèn)題:
- 如果消息處理失敗(例如消費(fèi)者拋出異常),消息已經(jīng)被確認(rèn)并從隊(duì)列中刪除,無(wú)法重新處理。
- 如果消費(fèi)者崩潰或斷開(kāi)連接,未處理的消息會(huì)丟失。
- 適用場(chǎng)景:
- 對(duì)消息處理的可靠性要求不高的場(chǎng)景。
手動(dòng)確認(rèn)(manual-ack)
- 行為:
- 消費(fèi)者處理完消息后,必須顯式調(diào)用
basicAck方法確認(rèn)消息。 - 如果消息處理失敗,可以調(diào)用
basicNack或basicReject方法拒絕消息。
- 消費(fèi)者處理完消息后,必須顯式調(diào)用
- 優(yōu)點(diǎn):
- 確保消息處理的可靠性。
- 支持消息重新入隊(duì)或發(fā)送到死信隊(duì)列。
- 適用場(chǎng)景:
- 對(duì)消息處理的可靠性要求較高的場(chǎng)景。
核心代碼示例:
@Service
public class ManualAckConsumer {
/**
* 在 application.properties 中配置:
* spring.rabbitmq.listener.simple.acknowledge-mode=manual
* 使得 RabbitMQ 使用手動(dòng)確認(rèn)模式
*/
@RabbitListener(queues = "demo-queue")
public void receiveMessage(Message message, Channel channel) throws IOException {
try {
// 1.從消息中獲取消息體
String body = new String(message.getBody());
System.out.println("Processing message: " + body);
// 2.如果業(yè)務(wù)處理成功,則調(diào)用 basicAck 手動(dòng)確認(rèn)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("Message processing failed: " + e.getMessage());
// 3.如果處理失敗,需要決定是重新入隊(duì)還是拒絕并進(jìn)入死信隊(duì)列
// requeue = true -> 重新入隊(duì)
// requeue = false -> 丟棄或進(jìn)入死信隊(duì)列(根據(jù)隊(duì)列配置)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}代碼詳解:
配置手動(dòng)確認(rèn):
在 application.properties 添加
spring.rabbitmq.listener.simple.acknowledge-mode=manual
表示 Spring AMQP 使用手動(dòng)確認(rèn)模式(manual-ack)。
public void receiveMessage(Message message, Channel channel):
與自動(dòng)確認(rèn)不同,這里不僅接收字符串,還接收了 org.springframework.amqp.core.Message 對(duì)象和 com.rabbitmq.client.Channel。Message:包含消息體(body)和消息屬性(headers 等)。Channel:給我們提供了 basicAck, basicNack, basicReject 等底層 AMQP 操作。
手動(dòng)確認(rèn)成功:
channel.basicAck(deliveryTag, multiple):
deliveryTag:
本次消息的唯一標(biāo)記,從 message.getMessageProperties().getDeliveryTag() 獲取。
multiple = false:只確認(rèn)當(dāng)前這條消息。
這里的
basicAck(long deliveryTag, boolean multiple)deliveryTag并不是在你構(gòu)造Message時(shí)生成的,而是 RabbitMQ Broker 在投遞消息給消費(fèi)者時(shí)由底層 AMQP 協(xié)議自動(dòng)分配的一個(gè)遞增的序號(hào)。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
手動(dòng)確認(rèn)失敗:
channel.basicNack(deliveryTag, multiple, requeue)或basicReject:requeue = true:將消息重新放回隊(duì)列等待下一次消費(fèi)(可能導(dǎo)致死循環(huán),如處理一直失?。?/li>requeue = false:拒絕消息,若配置了死信隊(duì)列,則進(jìn)入死信隊(duì)列;否則丟棄消息。
3)處理消費(fèi)失敗
- 自動(dòng)確認(rèn)模式下的處理
- 在自動(dòng)確認(rèn)模式下,如果消息處理失敗,RabbitMQ 不會(huì)重新發(fā)送消息,因?yàn)橄⒁呀?jīng)被確認(rèn)并從隊(duì)列中刪除。
- 問(wèn)題:
- 消息丟失,無(wú)法重新處理。
- 手動(dòng)確認(rèn)模式下的處理
- 在手動(dòng)確認(rèn)模式下,如果消息處理失敗,可以通過(guò)以下方式處理:
- 重新入隊(duì):
- 調(diào)用
basicNack或basicReject方法,并將requeue參數(shù)設(shè)置為true。 - 消息會(huì)重新進(jìn)入隊(duì)列,等待下一次消費(fèi)。
- 發(fā)送到死信隊(duì)列:
- 調(diào)用
basicNack或basicReject方法,并將requeue參數(shù)設(shè)置為false。 - 如果隊(duì)列配置了死信隊(duì)列,消息會(huì)被發(fā)送到死信隊(duì)列。
- 重試機(jī)制(Spring AMQP 提供的簡(jiǎn)單重試)(只支持手動(dòng)確認(rèn)機(jī)制)
是重試失敗了才會(huì)將消息重新入隊(duì) ,所以重試在前,重新入隊(duì)在后
# 啟用重試 spring.rabbitmq.listener.simple.retry.enabled=true # 最大重試次數(shù) spring.rabbitmq.listener.simple.retry.max-attempts=3 # 初始重試間隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000 # 間隔倍數(shù) spring.rabbitmq.listener.simple.retry.multiplier=2.0 # 最大重試間隔 spring.rabbitmq.listener.simple.retry.max-interval=10000
- Spring AMQP 提供了 重試機(jī)制,可以在消費(fèi)者處理消息失敗時(shí),自動(dòng)進(jìn)行多次重試,而不是直接將消息重新入隊(duì)。
行為
- 當(dāng)消息處理失敗時(shí),Spring AMQP 會(huì)在 本地 進(jìn)行重試(即不將消息重新入隊(duì)),直到達(dá)到最大重試次數(shù)。
- 如果重試次數(shù)用盡,消息會(huì)被拒絕(
basicNack或basicReject),并根據(jù)配置決定是否重新入隊(duì)或發(fā)送到死信隊(duì)列。
死信隊(duì)列(DLQ)
- 當(dāng)消息被拒絕或過(guò)期時(shí),RabbitMQ 會(huì)將其發(fā)送到我們配置的死信交換機(jī)(DLX),再路由到死信隊(duì)列(DLQ)。
- 配置示例:
@Configuration
public class RabbitConfig {
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal-queue")
.withArgument("x-dead-letter-exchange", "dead-letter-exchange") // 指定死信交換機(jī)
.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由鍵
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead-letter-exchange");
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dead-letter-queue");
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
}
}原理:
- 正常隊(duì)列通過(guò)
x-dead-letter-exchange指定死信交換機(jī),一旦消息被拒絕(requeue=false)或超時(shí)(TTL 到期),RabbitMQ 會(huì)把消息發(fā)送到dead-letter-exchange。 dead-letter-exchange與dead-letter-queue進(jìn)行綁定(路由鍵dead-letter-routing-key),從而實(shí)現(xiàn)死信隊(duì)列的存儲(chǔ)。
重新入隊(duì) vs 發(fā)送到死信隊(duì)列
- 重新入隊(duì):
channel.basicNack(deliveryTag, false, true) - 適用于臨時(shí)性錯(cuò)誤,比如數(shù)據(jù)庫(kù)鎖沖突、網(wǎng)絡(luò)抖動(dòng)等,等待后續(xù)重新處理。
- 發(fā)送到死信隊(duì)列:
channel.basicNack(deliveryTag, false, false) - 適用于永久性錯(cuò)誤,比如消息格式無(wú)法解析,或業(yè)務(wù)邏輯指定不應(yīng)再嘗試。
到此這篇關(guān)于Spring boot框架下的RabbitMQ消息中間件的文章就介紹到這了,更多相關(guān)Spring boot RabbitMQ消息中間件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java Springboot + Vue + MyBatis實(shí)現(xiàn)音樂(lè)播放系統(tǒng)
這篇文章主要介紹了一個(gè)完整的音樂(lè)播放系統(tǒng)是基于Java Springboot + Vue + MyBatis編寫的,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08
java8 stream多字段排序的實(shí)現(xiàn)
這篇文章主要介紹了java8 stream多字段排序的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Springboot解決no main manifest attribute錯(cuò)誤
在開(kāi)發(fā)Springboot項(xiàng)目時(shí),使用java -jar命令運(yùn)行jar包可能出現(xiàn)no main manifest attribute錯(cuò)誤,本文就來(lái)介紹一下該錯(cuò)誤的解決方法,感興趣的可以了解一下2024-09-09
SpringBoot實(shí)現(xiàn)自動(dòng)配置的方式詳解
Spring Boot 自動(dòng)配置 是其核心特性之一,它通過(guò)智能化的默認(rèn)配置減少了開(kāi)發(fā)者的工作量,自動(dòng)配置的原理基于條件化配置和 Spring 的 @Configuration 機(jī)制,本文給大家講解了SpringBoot實(shí)現(xiàn)自動(dòng)配置的過(guò)程,需要的朋友可以參考下2025-04-04
一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
使用Java判定一個(gè)數(shù)值是否在指定的開(kāi)閉區(qū)間范圍內(nèi)
這篇文章主要給大家介紹了關(guān)于使用Java判定一個(gè)數(shù)值是否在指定的開(kāi)閉區(qū)間范圍內(nèi)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-09-09
SpringBoot集成Redis實(shí)現(xiàn)驗(yàn)證碼的簡(jiǎn)單案例
本文主要介紹了SpringBoot集成Redis實(shí)現(xiàn)驗(yàn)證碼的簡(jiǎn)單案例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08

