Spring boot框架下的RabbitMQ消息中間件詳解
1. RabbitMQ 基礎概念
1.1 消息處理流程與組件配合
Producer(生產(chǎn)者) 發(fā)送消息。消息先發(fā)送到 Exchange(交換機),而不是直接到隊列。
- Exchange(交換機) 接收到消息后,根據(jù) Routing Key(路由鍵) 和 Binding(綁定規(guī)則),決定將消息發(fā)送到哪些 Queue(隊列)。
- Queue(隊列) 存儲消息,等待 Consumer(消費者) 消費。
- Consumer(消費者) 從隊列中接收并處理消息。
Producer(生產(chǎn)者)
作用:負責發(fā)送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。
關鍵點:
- Producer 只需要知道 Exchange 和 Routing Key,不關心隊列。
- Producer 不直接與隊列交互,消息的路由和存儲由 Exchange 和 Binding 決定。
代碼示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
System.out.println("Sent message: " + message);
}
}調(diào)用示例:
producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");direct-exchange:目標交換機。key1:消息的路由鍵。
Exchange(交換機)
作用:接收來自 Producer 的消息,并根據(jù) Routing Key 和 Binding 的配置,決定將消息發(fā)送到哪些隊列。
Exchange 通常需要手動注冊為 Bean。
- RabbitMQ 的 Exchange 是通過名稱來標識的。
- 在 Spring Boot 中,您通過
@Bean方法注冊 Exchange 時,實際上是將 Exchange 的名稱和類型綁定到 RabbitMQ 服務器。- 發(fā)送消息時,RabbitMQ 客戶端會根據(jù) Exchange 的名稱找到對應的 Exchange,并根據(jù) Routing Key 將消息路由到隊列。
類型:
- Direct Exchange:精確匹配 Routing Key。消息的 Routing Key 必須與 Binding 的 Routing Key 完全一致。
- Topic Exchange:支持通配符匹配。例如,
with("key.*")可以匹配key.1、key.2等。 - Fanout Exchange:忽略 Routing Key,消息會被廣播到所有綁定的隊列。
- Headers Exchange:忽略 Routing Key,根據(jù)消息頭屬性匹配。
代碼示例(定義交換機):
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExchangeConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers-exchange");
}
}Queue(隊列)
作用:消息的存儲容器,等待消費者從中取出消息進行處理。
Queue 也需要手動注冊為 Bean。Spring Boot 不會自動注冊隊列,因為隊列的名稱和屬性(如是否持久化、是否排他等)需要根據(jù)業(yè)務需求進行配置。
關鍵點:
- 消息會保存在隊列中,直到被消費。
- 隊列可以是持久化的(重啟 RabbitMQ 后消息仍然存在)或非持久化的。
代碼示例(定義隊列):
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue demoQueue() {
return new Queue("demo-queue", true); // 持久化隊列
}
}Routing Key(路由鍵)
作用:決定消息如何從交換機路由到隊列。
關鍵點:
- Routing Key 由 Producer 指定。
- 在 Direct 和 Topic 類型的 Exchange 中,Routing Key 決定隊列是否接收消息。
Binding(綁定)
- 作用:將隊列與交換機連接,并定義路由規(guī)則。
- 關鍵點:
- Binding 定義了隊列接受消息的條件。
- 結合 Routing Key 和交換機類型,共同決定消息的路由方式。
代碼示例(定義綁定):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BindingConfig {
@Bean
public Binding binding(Queue demoQueue, DirectExchange directExchange) {
return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
}
}with("key1") 的作用是 指定 Binding 的 Routing Key。它的含義是:
- 當消息發(fā)送到 Exchange 時,Exchange 會根據(jù)消息的 Routing Key 和 Binding 的 Routing Key 進行匹配。
- 如果匹配成功,消息會被路由到對應的隊列;如果匹配失敗,消息會被丟棄或進入死信隊列(如果有配置)。
Consumer(消費者)
作用:從隊列中接收并處理消息。
關鍵點:
- 消費者與隊列直接關聯(lián)。
- 多個消費者可以監(jiān)聽同一隊列,實現(xiàn)負載均衡。
代碼示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
@RabbitListener(queues = "demo-queue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}1.2 RabbitMQ 消息傳輸模型
點對點模型
定義:消息從生產(chǎn)者發(fā)送到隊列,由消費者從隊列中接收,消息只能被一個消費者消費。
實現(xiàn):
- 使用默認交換機(空字符串
"")。 - 直接將消息發(fā)送到隊列。
代碼示例:
rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");發(fā)布訂閱模型
定義:生產(chǎn)者將消息發(fā)送到 Fanout 類型的交換機,消息會廣播到所有綁定的隊列。
實現(xiàn):
- 不需要 Routing Key。
- 所有綁定到 Fanout 交換機的隊列都會接收消息。
代碼示例:
rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");路由模型
定義:生產(chǎn)者將消息發(fā)送到 Direct 類型的交換機,根據(jù) Routing Key 精確匹配隊列。
實現(xiàn):
- 隊列通過 Binding 綁定到交換機時,指定 Routing Key。
- 消息的 Routing Key 必須與 Binding 的 Routing Key 一致。
代碼示例:
rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");2. 環(huán)境準備
2.1 安裝與配置 RabbitMQ
下載 Docker
訪問 Docker 官方網(wǎng)站:Docker: Accelerated Container Application Development。
根據(jù)您的操作系統(tǒng)(Windows、macOS 或 Linux)下載并安裝 Docker Desktop。
啟動 Docker
- 安裝完成后,啟動 Docker Desktop。
- 確保 Docker 正在運行(任務欄或菜單欄中可以看到 Docker 圖標)。
使用 Docker 快速部署 RabbitMQ
Docker 是部署 RabbitMQ 的最簡單方式。通過以下命令,您可以快速啟動一個 RabbitMQ 容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
參數(shù)說明:
-d:以后臺模式運行容器。--name rabbitmq:為容器指定名稱(rabbitmq)。-p 5672:5672:將容器的 5672 端口映射到主機的 5672 端口(RabbitMQ 的消息通信端口)。-p 15672:15672:將容器的 15672 端口映射到主機的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。rabbitmq:management:使用帶有管理插件的 RabbitMQ 鏡像。
驗證 RabbitMQ 是否運行
運行以下命令,查看容器是否正常運行:
docker ps
如果看到 rabbitmq 容器正在運行,說明 RabbitMQ 已成功啟動。
2.2 使用 RabbitMQ 管理插件
RabbitMQ 提供了一個 Web 管理界面,方便您監(jiān)控和管理 RabbitMQ。
訪問管理界面
- 打開瀏覽器,訪問
http://localhost:15672。 - 使用默認用戶名和密碼登錄:
- 用戶名:
guest - 密碼:
guest
- 用戶名:
管理界面功能
- Overview:查看 RabbitMQ 的整體狀態(tài),如連接數(shù)、隊列數(shù)、消息速率等。
- Connections:查看當前連接到 RabbitMQ 的客戶端。
- Channels:查看當前打開的通道。
- Exchanges:查看和管理 Exchange。
- Queues:查看和管理 Queue。
- Admin:管理用戶和權限。
2.3 用戶與權限配置
默認情況下,RabbitMQ 只有一個用戶 guest,密碼也是 guest。為了安全性和權限管理,建議創(chuàng)建新用戶并分配權限。
1. 創(chuàng)建新用戶
- 在 RabbitMQ 管理界面中:
- 點擊頂部導航欄的 Admin。
- 在用戶列表下方,點擊 Add a user。
- 輸入用戶名和密碼,例如:
- 用戶名:
admin - 密碼:
admin123
- 用戶名:
- 點擊 Add user 完成創(chuàng)建。
2. 分配權限
- 在用戶列表中,找到剛創(chuàng)建的用戶(如
admin)。 - 點擊用戶右側的 Set permission。
- 在權限設置頁面:
- Virtual Host:選擇
/(默認的虛擬主機)。 - Configure:輸入
.*,表示允許用戶配置所有資源。 - Write:輸入
.*,表示允許用戶寫入所有資源。 - Read:輸入
.*,表示允許用戶讀取所有資源。
- Virtual Host:選擇
- 點擊 Set permission 完成權限分配。
3. 使用新用戶登錄
- 退出當前用戶(點擊右上角的
guest,選擇 Log out)。 - 使用新用戶(如
admin)登錄。
2.4 Spring Boot 中引入 RabbitMQ 依賴
在 pom.xml 中添加以下依賴:
<dependencies>
<!-- RabbitMQ 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依賴,它包含了以下內(nèi)容:
RabbitMQ 客戶端庫:
- 自動引入 RabbitMQ 的 Java 客戶端庫(
amqp-client),用于與 RabbitMQ 服務器通信。
Spring AMQP 支持:
- 提供了 Spring 對 AMQP(Advanced Message Queuing Protocol)的支持,包括
RabbitTemplate、@RabbitListener等。
2.5 Spring Boot 配置 RabbitMQ
在 Spring Boot 項目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的連接信息。
示例配置
# RabbitMQ 連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin123
配置說明:
spring.rabbitmq.host:RabbitMQ 服務器地址(默認localhost)。spring.rabbitmq.port:RabbitMQ 消息通信端口(默認5672)。spring.rabbitmq.username:RabbitMQ 用戶名。spring.rabbitmq.password:RabbitMQ 密碼。
3. Spring Boot 集成 RabbitMQ 的消息生產(chǎn)和消費
3.1 消息生產(chǎn)者(Producer)
- 在 Spring Boot 中,我們使用
RabbitTemplate來發(fā)送消息。它由spring-boot-starter-amqp自動配置成為一個 Bean,可直接通過@Autowired注入。 - 如果 message 不是 String 類型的處理 Spring AMQP(
spring-boot-starter-amqp)在使用RabbitTemplate時,默認的消息轉換器(MessageConverter)通常會將對象序列化為 JSON 或者將字符串消息轉換為字節(jié)。 - 如果你的業(yè)務數(shù)據(jù)不是
String,常見做法是:- 在發(fā)送時把非字符串對象序列化(如轉換為 JSON 字符串);
- 或者配置自定義的
MessageConverter,讓 Spring 幫你把對象自動序列化/反序列化。
典型做法:手動序列化為 JSON 再發(fā)送
@Service
public class CustomObjectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCustomObject(String queueName, MyCustomObject obj) {
// 1. 將自定義對象序列化為 JSON 字符串
String jsonString = new Gson().toJson(obj);
// 2. 發(fā)送 JSON 字符串到 RabbitMQ
rabbitTemplate.convertAndSend(queueName, jsonString);
}
}在消費者端,你也可以將消息(JSON 字符串)反序列化為 MyCustomObject。
配置自定義 Converter(可選)
Spring AMQP 提供了 Jackson2JsonMessageConverter 等現(xiàn)成轉換器。
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 配置 RabbitTemplate 使用該轉換器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}這樣一來,rabbitTemplate.convertAndSend(queueName, myObject) 會自動把 myObject 轉成 JSON 發(fā)送;消費者端則自動解析為同樣的 Java 對象。 1)基本消息發(fā)送
場景:
將消息直接發(fā)送到指定的隊列,跳過交換機的路由,讓 RabbitMQ 把消息放到這個隊列中。
核心代碼示例:
@Service
public class BasicProducer {
@Autowired
private RabbitTemplate rabbitTemplate; // 1.自動注入的 RabbitTemplate
/**
* 2.發(fā)送基本消息到指定的隊列
* @param queueName 目標隊列名稱
* @param message 消息內(nèi)容
*/
public void sendToQueue(String queueName, String message) {
// 3.調(diào)用 convertAndSend,直接將消息放入指定隊列
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("Message sent to queue: " + queueName + ", content: " + message);
}
}代碼詳解:
@Autowiredprivate RabbitTemplate rabbitTemplate;`- Spring Boot 自動為我們配置了
RabbitTemplate,不用手動定義 Bean。 - 通過依賴注入即可使用所有與 RabbitMQ 交互的方法。
- Spring Boot 自動為我們配置了
public void sendToQueue(String queueName, String message)- 方法參數(shù)包括:
queueName: 目標隊列的名稱。message: 要發(fā)送的字符串類型消息內(nèi)容。
rabbitTemplate.convertAndSend(queueName, message)convertAndSend方法會將消息轉換(轉換為字節(jié))并發(fā)送到指定隊列。- 如果該隊列不存在,RabbitMQ 會嘗試自動創(chuàng)建(前提是 Broker 端配置允許自動創(chuàng)建隊列)。
2)發(fā)送到交換機
場景:
將消息發(fā)送到一個交換機(Exchange),再由交換機通過 Routing Key 將消息路由到匹配的隊列中。
核心代碼示例:
@Service
public class ExchangeProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息到指定交換機
* @param exchangeName 交換機名稱
* @param routingKey 路由鍵
* @param message 消息內(nèi)容
*/
public void sendToExchange(String exchangeName, String routingKey, String message) {
// 將消息發(fā)送到 exchangeName 指定的交換機,使用 routingKey 進行路由
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
}
}代碼詳解:
exchangeName- 要發(fā)送到的交換機名稱,例如
"direct-exchange"、"fanout-exchange"等。
- 要發(fā)送到的交換機名稱,例如
routingKey- 路由鍵,用來匹配綁定(Binding)。例如:對
DirectExchange而言,需要隊列綁定時的路由鍵與發(fā)送時的路由鍵相同,消息才能到達隊列。 rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
- 路由鍵,用來匹配綁定(Binding)。例如:對
- 將消息先發(fā)送到交換機,再根據(jù)路由鍵將消息投遞到目標隊列。
3)發(fā)送帶消息屬性的消息
場景:
需要為消息設置 TTL(過期時間)或優(yōu)先級等屬性,控制消息在隊列中的行為。
核心代碼示例:
@Service
public class PropertyProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送帶消息屬性的消息(如 TTL, 優(yōu)先級)
*/
public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
// 1.創(chuàng)建 MessageProperties 對象,用于指定消息的屬性
MessageProperties properties = new MessageProperties();
properties.setExpiration("10000"); // 過期時間:10秒 (單位:毫秒)
properties.setPriority(5); // 優(yōu)先級設為 5
// 2.根據(jù)消息體和屬性構建 Message 對象
Message message = new Message(messageContent.getBytes(), properties);
// 3.使用 send 方法(而非 convertAndSend)直接發(fā)送 Message 對象
rabbitTemplate.send(exchange, routingKey, message);
System.out.println("Message with properties sent: " + messageContent);
}
}代碼詳解:
MessageProperties properties = new MessageProperties();MessageProperties用于設置 AMQP 協(xié)議層的各種消息頭信息。
properties.setExpiration("10000");setExpiration設置消息的 TTL(Time-To-Live),單位是毫秒。如果到達時間后消息仍未被消費,RabbitMQ 會將其從隊列中移除并送入死信隊列(如果配置了死信隊列)。properties.setPriority(5);
- 設置消息的優(yōu)先級為 5,前提是隊列本身需要支持優(yōu)先級隊列(創(chuàng)建隊列時指定
x-max-priority)。new Message(messageContent.getBytes(), properties)
- 將純文本消息轉換為
Message對象,結合了消息屬性和消息體。rabbitTemplate.send(exchange, routingKey, message);
- 與
convertAndSend不同,它不會嘗試進行消息轉換(如 JSON、字符串),而是直接發(fā)送完整的 AMQPMessage對象。
Message 構造函數(shù)
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}body:消息體的字節(jié)數(shù)組。
messageProperties:AMQP 的消息屬性,包括 TTL、優(yōu)先級、headers 等。、
如果消息體不是String類型
手動轉換為字節(jié):你可以先將自定義對象轉換為字節(jié)數(shù)組(例如通過 JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一個參數(shù)。
MyCustomObject obj = new MyCustomObject(); // 假設你想用 JSON String jsonString = new Gson().toJson(obj); byte[] body = jsonString.getBytes(StandardCharsets.UTF_8); MessageProperties properties = new MessageProperties(); // 設置一些屬性 Message message = new Message(body, properties);
- 為什么不會自動轉 JSON?使用
new Message(...)構造方法是“純” AMQP 層的做法,不會調(diào)用 Spring 的轉換器,因此你必須自己處理序列化。 - 使用
Message構造函數(shù) 時,你必須自行處理對象到byte[]的轉換(無論是字符串、JSON,還是其他格式)。 - 如果想讓 Spring AMQP 自動轉換,你通常使用
rabbitTemplate.convertAndSend(Object msg)這種高級 API,或者配置自定義MessageConverter。
3.2 消息消費者(Consumer)
消費者的核心功能是在指定的隊列中監(jiān)聽消息,并根據(jù)配置的確認模式(自動確認或手動確認)對消息進行處理或拒絕。
1)監(jiān)聽隊列并消費消息
核心代碼示例(自動確認模式):
@Service
public class Consumer {
/**
* 使用注解 @RabbitListener 指定要監(jiān)聽的隊列
* 由于默認為 auto-ack 模式,
* 當消息到達后,RabbitMQ 會自動確認并從隊列中刪除該消息。
*/
@RabbitListener(queues = "demo-queue")
public void receiveMessage(String message) {
// 1.從 queueName 隊列中取到的消息內(nèi)容
System.out.println("Received message: " + message);
// 2.在 auto-ack 模式下,無需手動 ack
// 如果這里出現(xiàn)異常,RabbitMQ 不會再次發(fā)送消息給消費者,消息會丟失。
}
}代碼詳解(自動確認模式):
@RabbitListener(queues = "demo-queue")- 聲明監(jiān)聽名為
demo-queue的隊列。 - 一旦有新消息到達該隊列,就會自動回調(diào)此方法。
- 聲明監(jiān)聽名為
public void receiveMessage(String message)- 默認參數(shù)類型為字符串,當 RabbitMQ 收到消息后會嘗試將其轉換為
String并注入到message中。
- 默認參數(shù)類型為字符串,當 RabbitMQ 收到消息后會嘗試將其轉換為
- 自動確認(auto-ack)的風險
- 如果消費者在處理消息時拋出異常,消息已經(jīng)被 RabbitMQ 標記為“已確認”,不會再重新發(fā)送或進入死信隊列,導致消息丟失。
2)確認機制
自動確認(auto-ack)
- 行為:
- 當消費者從隊列中獲取消息后,RabbitMQ 會立即將該消息標記為已確認(acknowledged),并從隊列中刪除。
- 問題:
- 如果消息處理失?。ɡ缦M者拋出異常),消息已經(jīng)被確認并從隊列中刪除,無法重新處理。
- 如果消費者崩潰或斷開連接,未處理的消息會丟失。
- 適用場景:
- 對消息處理的可靠性要求不高的場景。
手動確認(manual-ack)
- 行為:
- 消費者處理完消息后,必須顯式調(diào)用
basicAck方法確認消息。 - 如果消息處理失敗,可以調(diào)用
basicNack或basicReject方法拒絕消息。
- 消費者處理完消息后,必須顯式調(diào)用
- 優(yōu)點:
- 確保消息處理的可靠性。
- 支持消息重新入隊或發(fā)送到死信隊列。
- 適用場景:
- 對消息處理的可靠性要求較高的場景。
核心代碼示例:
@Service
public class ManualAckConsumer {
/**
* 在 application.properties 中配置:
* spring.rabbitmq.listener.simple.acknowledge-mode=manual
* 使得 RabbitMQ 使用手動確認模式
*/
@RabbitListener(queues = "demo-queue")
public void receiveMessage(Message message, Channel channel) throws IOException {
try {
// 1.從消息中獲取消息體
String body = new String(message.getBody());
System.out.println("Processing message: " + body);
// 2.如果業(yè)務處理成功,則調(diào)用 basicAck 手動確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("Message processing failed: " + e.getMessage());
// 3.如果處理失敗,需要決定是重新入隊還是拒絕并進入死信隊列
// requeue = true -> 重新入隊
// requeue = false -> 丟棄或進入死信隊列(根據(jù)隊列配置)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}代碼詳解:
配置手動確認:
在 application.properties 添加
spring.rabbitmq.listener.simple.acknowledge-mode=manual
表示 Spring AMQP 使用手動確認模式(manual-ack)。
public void receiveMessage(Message message, Channel channel):
與自動確認不同,這里不僅接收字符串,還接收了 org.springframework.amqp.core.Message 對象和 com.rabbitmq.client.Channel。Message:包含消息體(body)和消息屬性(headers 等)。Channel:給我們提供了 basicAck, basicNack, basicReject 等底層 AMQP 操作。
手動確認成功:
channel.basicAck(deliveryTag, multiple):
deliveryTag:
本次消息的唯一標記,從 message.getMessageProperties().getDeliveryTag() 獲取。
multiple = false:只確認當前這條消息。
這里的
basicAck(long deliveryTag, boolean multiple)deliveryTag并不是在你構造Message時生成的,而是 RabbitMQ Broker 在投遞消息給消費者時由底層 AMQP 協(xié)議自動分配的一個遞增的序號。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
手動確認失敗:
channel.basicNack(deliveryTag, multiple, requeue)或basicReject:requeue = true:將消息重新放回隊列等待下一次消費(可能導致死循環(huán),如處理一直失?。?/li>requeue = false:拒絕消息,若配置了死信隊列,則進入死信隊列;否則丟棄消息。
3)處理消費失敗
- 自動確認模式下的處理
- 在自動確認模式下,如果消息處理失敗,RabbitMQ 不會重新發(fā)送消息,因為消息已經(jīng)被確認并從隊列中刪除。
- 問題:
- 消息丟失,無法重新處理。
- 手動確認模式下的處理
- 在手動確認模式下,如果消息處理失敗,可以通過以下方式處理:
- 重新入隊:
- 調(diào)用
basicNack或basicReject方法,并將requeue參數(shù)設置為true。 - 消息會重新進入隊列,等待下一次消費。
- 發(fā)送到死信隊列:
- 調(diào)用
basicNack或basicReject方法,并將requeue參數(shù)設置為false。 - 如果隊列配置了死信隊列,消息會被發(fā)送到死信隊列。
- 重試機制(Spring AMQP 提供的簡單重試)(只支持手動確認機制)
是重試失敗了才會將消息重新入隊 ,所以重試在前,重新入隊在后
# 啟用重試 spring.rabbitmq.listener.simple.retry.enabled=true # 最大重試次數(shù) spring.rabbitmq.listener.simple.retry.max-attempts=3 # 初始重試間隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000 # 間隔倍數(shù) spring.rabbitmq.listener.simple.retry.multiplier=2.0 # 最大重試間隔 spring.rabbitmq.listener.simple.retry.max-interval=10000
- Spring AMQP 提供了 重試機制,可以在消費者處理消息失敗時,自動進行多次重試,而不是直接將消息重新入隊。
行為
- 當消息處理失敗時,Spring AMQP 會在 本地 進行重試(即不將消息重新入隊),直到達到最大重試次數(shù)。
- 如果重試次數(shù)用盡,消息會被拒絕(
basicNack或basicReject),并根據(jù)配置決定是否重新入隊或發(fā)送到死信隊列。
死信隊列(DLQ)
- 當消息被拒絕或過期時,RabbitMQ 會將其發(fā)送到我們配置的死信交換機(DLX),再路由到死信隊列(DLQ)。
- 配置示例:
@Configuration
public class RabbitConfig {
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal-queue")
.withArgument("x-dead-letter-exchange", "dead-letter-exchange") // 指定死信交換機
.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由鍵
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead-letter-exchange");
}
@Bean
public Queue deadLetterQueue() {
return new Queue("dead-letter-queue");
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
}
}原理:
- 正常隊列通過
x-dead-letter-exchange指定死信交換機,一旦消息被拒絕(requeue=false)或超時(TTL 到期),RabbitMQ 會把消息發(fā)送到dead-letter-exchange。 dead-letter-exchange與dead-letter-queue進行綁定(路由鍵dead-letter-routing-key),從而實現(xiàn)死信隊列的存儲。
重新入隊 vs 發(fā)送到死信隊列
- 重新入隊:
channel.basicNack(deliveryTag, false, true) - 適用于臨時性錯誤,比如數(shù)據(jù)庫鎖沖突、網(wǎng)絡抖動等,等待后續(xù)重新處理。
- 發(fā)送到死信隊列:
channel.basicNack(deliveryTag, false, false) - 適用于永久性錯誤,比如消息格式無法解析,或業(yè)務邏輯指定不應再嘗試。
到此這篇關于Spring boot框架下的RabbitMQ消息中間件的文章就介紹到這了,更多相關Spring boot RabbitMQ消息中間件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
基于Java Springboot + Vue + MyBatis實現(xiàn)音樂播放系統(tǒng)
這篇文章主要介紹了一個完整的音樂播放系統(tǒng)是基于Java Springboot + Vue + MyBatis編寫的,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-08-08
Springboot解決no main manifest attribute錯誤
在開發(fā)Springboot項目時,使用java -jar命令運行jar包可能出現(xiàn)no main manifest attribute錯誤,本文就來介紹一下該錯誤的解決方法,感興趣的可以了解一下2024-09-09
一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-03-03
使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)
這篇文章主要給大家介紹了關于使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)的相關資料,文中通過實例代碼介紹的非常詳細,對大家學習或者使用Java具有一定的參考學習價值,需要的朋友可以參考下2022-09-09
SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例
本文主要介紹了SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08

