欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring boot框架下的RabbitMQ消息中間件詳解

 更新時(shí)間:2025年01月22日 10:46:14   作者:阿乾之銘  
這篇文章詳細(xì)介紹了Spring Boot框架下的RabbitMQ消息中間件的基本概念、消息傳輸模型、環(huán)境準(zhǔn)備、Spring Boot集成以及消息生產(chǎn)和消費(fèi),感興趣的朋友跟隨小編一起看看吧

1. RabbitMQ 基礎(chǔ)概念

1.1 消息處理流程與組件配合

Producer(生產(chǎn)者) 發(fā)送消息。消息先發(fā)送到 Exchange(交換機(jī)),而不是直接到隊(duì)列。

  • Exchange(交換機(jī)) 接收到消息后,根據(jù) Routing Key(路由鍵) 和 Binding(綁定規(guī)則),決定將消息發(fā)送到哪些 Queue(隊(duì)列)。
  • Queue(隊(duì)列) 存儲(chǔ)消息,等待 Consumer(消費(fèi)者) 消費(fèi)。
  • Consumer(消費(fèi)者) 從隊(duì)列中接收并處理消息。

Producer(生產(chǎn)者)

作用:負(fù)責(zé)發(fā)送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。

關(guān)鍵點(diǎn)

  • Producer 只需要知道 Exchange 和 Routing Key,不關(guān)心隊(duì)列。
  • Producer 不直接與隊(duì)列交互,消息的路由和存儲(chǔ)由 Exchange 和 Binding 決定。

代碼示例

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Sent message: " + message);
    }
}

調(diào)用示例

producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
  • direct-exchange:目標(biāo)交換機(jī)。
  • key1:消息的路由鍵。

Exchange(交換機(jī))

作用:接收來(lái)自 Producer 的消息,并根據(jù) Routing Key 和 Binding 的配置,決定將消息發(fā)送到哪些隊(duì)列。

Exchange 通常需要手動(dòng)注冊(cè)為 Bean。

  • RabbitMQ 的 Exchange 是通過(guò)名稱來(lái)標(biāo)識(shí)的。
  • 在 Spring Boot 中,您通過(guò) @Bean 方法注冊(cè) Exchange 時(shí),實(shí)際上是將 Exchange 的名稱和類型綁定到 RabbitMQ 服務(wù)器。
  • 發(fā)送消息時(shí),RabbitMQ 客戶端會(huì)根據(jù) Exchange 的名稱找到對(duì)應(yīng)的 Exchange,并根據(jù) Routing Key 將消息路由到隊(duì)列。

類型

  • Direct Exchange:精確匹配 Routing Key。消息的 Routing Key 必須與 Binding 的 Routing Key 完全一致。
  • Topic Exchange:支持通配符匹配。例如,with("key.*") 可以匹配 key.1、key.2 等。
  • Fanout Exchange:忽略 Routing Key,消息會(huì)被廣播到所有綁定的隊(duì)列。
  • Headers Exchange:忽略 Routing Key,根據(jù)消息頭屬性匹配。

代碼示例(定義交換機(jī)):

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExchangeConfig {
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers-exchange");
    }
}

Queue(隊(duì)列)

作用:消息的存儲(chǔ)容器,等待消費(fèi)者從中取出消息進(jìn)行處理。

Queue 也需要手動(dòng)注冊(cè)為 Bean。Spring Boot 不會(huì)自動(dòng)注冊(cè)隊(duì)列,因?yàn)殛?duì)列的名稱和屬性(如是否持久化、是否排他等)需要根據(jù)業(yè)務(wù)需求進(jìn)行配置。

關(guān)鍵點(diǎn)

  • 消息會(huì)保存在隊(duì)列中,直到被消費(fèi)。
  • 隊(duì)列可以是持久化的(重啟 RabbitMQ 后消息仍然存在)或非持久化的。

代碼示例(定義隊(duì)列):

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public Queue demoQueue() {
        return new Queue("demo-queue", true); // 持久化隊(duì)列
    }
}

Routing Key(路由鍵)

作用:決定消息如何從交換機(jī)路由到隊(duì)列。

關(guān)鍵點(diǎn)

  • Routing Key 由 Producer 指定。
  • 在 Direct 和 Topic 類型的 Exchange 中,Routing Key 決定隊(duì)列是否接收消息。

Binding(綁定)

  • 作用:將隊(duì)列與交換機(jī)連接,并定義路由規(guī)則。
  • 關(guān)鍵點(diǎn)
    • Binding 定義了隊(duì)列接受消息的條件。
    • 結(jié)合 Routing Key 和交換機(jī)類型,共同決定消息的路由方式。

代碼示例(定義綁定):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BindingConfig {
    @Bean
    public Binding binding(Queue demoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
    }
}

with("key1") 的作用是 指定 Binding 的 Routing Key。它的含義是:

  • 當(dāng)消息發(fā)送到 Exchange 時(shí),Exchange 會(huì)根據(jù)消息的 Routing Key 和 Binding 的 Routing Key 進(jìn)行匹配。
  • 如果匹配成功,消息會(huì)被路由到對(duì)應(yīng)的隊(duì)列;如果匹配失敗,消息會(huì)被丟棄或進(jìn)入死信隊(duì)列(如果有配置)。

Consumer(消費(fèi)者)

作用:從隊(duì)列中接收并處理消息。

關(guān)鍵點(diǎn)

  • 消費(fèi)者與隊(duì)列直接關(guān)聯(lián)。
  • 多個(gè)消費(fèi)者可以監(jiān)聽(tīng)同一隊(duì)列,實(shí)現(xiàn)負(fù)載均衡。

代碼示例

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

1.2 RabbitMQ 消息傳輸模型

點(diǎn)對(duì)點(diǎn)模型

定義:消息從生產(chǎn)者發(fā)送到隊(duì)列,由消費(fèi)者從隊(duì)列中接收,消息只能被一個(gè)消費(fèi)者消費(fèi)。

實(shí)現(xiàn)

  • 使用默認(rèn)交換機(jī)(空字符串 "")。
  • 直接將消息發(fā)送到隊(duì)列。

代碼示例

rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

發(fā)布訂閱模型

定義:生產(chǎn)者將消息發(fā)送到 Fanout 類型的交換機(jī),消息會(huì)廣播到所有綁定的隊(duì)列。

實(shí)現(xiàn)

  • 不需要 Routing Key。
  • 所有綁定到 Fanout 交換機(jī)的隊(duì)列都會(huì)接收消息。

代碼示例

rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");

路由模型

定義:生產(chǎn)者將消息發(fā)送到 Direct 類型的交換機(jī),根據(jù) Routing Key 精確匹配隊(duì)列。

實(shí)現(xiàn)

  • 隊(duì)列通過(guò) Binding 綁定到交換機(jī)時(shí),指定 Routing Key。
  • 消息的 Routing Key 必須與 Binding 的 Routing Key 一致。

代碼示例

rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. 環(huán)境準(zhǔn)備

2.1 安裝與配置 RabbitMQ

下載 Docker

啟動(dòng) Docker

  • 安裝完成后,啟動(dòng) Docker Desktop。
  • 確保 Docker 正在運(yùn)行(任務(wù)欄或菜單欄中可以看到 Docker 圖標(biāo))。

使用 Docker 快速部署 RabbitMQ

Docker 是部署 RabbitMQ 的最簡(jiǎn)單方式。通過(guò)以下命令,您可以快速啟動(dòng)一個(gè) RabbitMQ 容器:

  • docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

參數(shù)說(shuō)明

  • -d:以后臺(tái)模式運(yùn)行容器。
  • --name rabbitmq:為容器指定名稱(rabbitmq)。
  • -p 5672:5672:將容器的 5672 端口映射到主機(jī)的 5672 端口(RabbitMQ 的消息通信端口)。
  • -p 15672:15672:將容器的 15672 端口映射到主機(jī)的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。
  • rabbitmq:management:使用帶有管理插件的 RabbitMQ 鏡像。

驗(yàn)證 RabbitMQ 是否運(yùn)行

運(yùn)行以下命令,查看容器是否正常運(yùn)行:

docker ps

如果看到 rabbitmq 容器正在運(yùn)行,說(shuō)明 RabbitMQ 已成功啟動(dòng)。

2.2 使用 RabbitMQ 管理插件

RabbitMQ 提供了一個(gè) Web 管理界面,方便您監(jiān)控和管理 RabbitMQ。

訪問(wèn)管理界面

  • 打開(kāi)瀏覽器,訪問(wèn) http://localhost:15672。
  • 使用默認(rèn)用戶名和密碼登錄:
    • 用戶名:guest
    • 密碼:guest

管理界面功能

  • Overview:查看 RabbitMQ 的整體狀態(tài),如連接數(shù)、隊(duì)列數(shù)、消息速率等。
  • Connections:查看當(dāng)前連接到 RabbitMQ 的客戶端。
  • Channels:查看當(dāng)前打開(kāi)的通道。
  • Exchanges:查看和管理 Exchange。
  • Queues:查看和管理 Queue。
  • Admin:管理用戶和權(quán)限。

2.3 用戶與權(quán)限配置

默認(rèn)情況下,RabbitMQ 只有一個(gè)用戶 guest,密碼也是 guest。為了安全性和權(quán)限管理,建議創(chuàng)建新用戶并分配權(quán)限。

1. 創(chuàng)建新用戶

  • 在 RabbitMQ 管理界面中:
  • 點(diǎn)擊頂部導(dǎo)航欄的 Admin。
  • 在用戶列表下方,點(diǎn)擊 Add a user。
  • 輸入用戶名和密碼,例如:
    • 用戶名:admin
    • 密碼:admin123
  • 點(diǎn)擊 Add user 完成創(chuàng)建。

2. 分配權(quán)限

  • 在用戶列表中,找到剛創(chuàng)建的用戶(如 admin)。
  • 點(diǎn)擊用戶右側(cè)的 Set permission
  • 在權(quán)限設(shè)置頁(yè)面:
    • Virtual Host:選擇 /(默認(rèn)的虛擬主機(jī))。
    • Configure:輸入 .*,表示允許用戶配置所有資源。
    • Write:輸入 .*,表示允許用戶寫入所有資源。
    • Read:輸入 .*,表示允許用戶讀取所有資源。
  • 點(diǎn)擊 Set permission 完成權(quán)限分配。

3. 使用新用戶登錄

  • 退出當(dāng)前用戶(點(diǎn)擊右上角的 guest,選擇 Log out)。
  • 使用新用戶(如 admin)登錄。

2.4  Spring Boot 中引入 RabbitMQ 依賴 

在 pom.xml 中添加以下依賴:

<dependencies>
    <!-- RabbitMQ 依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依賴,它包含了以下內(nèi)容:

RabbitMQ 客戶端庫(kù)

  • 自動(dòng)引入 RabbitMQ 的 Java 客戶端庫(kù)(amqp-client),用于與 RabbitMQ 服務(wù)器通信。

Spring AMQP 支持

  • 提供了 Spring 對(duì) AMQP(Advanced Message Queuing Protocol)的支持,包括 RabbitTemplate、@RabbitListener 等。

2.5 Spring Boot 配置 RabbitMQ

在 Spring Boot 項(xiàng)目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的連接信息。

示例配置

  • # RabbitMQ 連接配置
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin123

配置說(shuō)明

  • spring.rabbitmq.host:RabbitMQ 服務(wù)器地址(默認(rèn) localhost)。
  • spring.rabbitmq.port:RabbitMQ 消息通信端口(默認(rèn) 5672)。
  • spring.rabbitmq.username:RabbitMQ 用戶名。
  • spring.rabbitmq.password:RabbitMQ 密碼。

3. Spring Boot 集成 RabbitMQ 的消息生產(chǎn)和消費(fèi)

3.1 消息生產(chǎn)者(Producer)

  • 在 Spring Boot 中,我們使用 RabbitTemplate 來(lái)發(fā)送消息。它由 spring-boot-starter-amqp 自動(dòng)配置成為一個(gè) Bean,可直接通過(guò) @Autowired 注入。
  • 如果 message 不是 String 類型的處理 Spring AMQP(spring-boot-starter-amqp)在使用 RabbitTemplate 時(shí),默認(rèn)的消息轉(zhuǎn)換器(MessageConverter)通常會(huì)將對(duì)象序列化為 JSON 或者將字符串消息轉(zhuǎn)換為字節(jié)。
  • 如果你的業(yè)務(wù)數(shù)據(jù)不是 String,常見(jiàn)做法是:
    • 在發(fā)送時(shí)把非字符串對(duì)象序列化(如轉(zhuǎn)換為 JSON 字符串);
    • 或者配置自定義的 MessageConverter,讓 Spring 幫你把對(duì)象自動(dòng)序列化/反序列化。

典型做法:手動(dòng)序列化為 JSON 再發(fā)送

@Service
public class CustomObjectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendCustomObject(String queueName, MyCustomObject obj) {
        // 1. 將自定義對(duì)象序列化為 JSON 字符串
        String jsonString = new Gson().toJson(obj);
        // 2. 發(fā)送 JSON 字符串到 RabbitMQ
        rabbitTemplate.convertAndSend(queueName, jsonString);
    }
}

在消費(fèi)者端,你也可以將消息(JSON 字符串)反序列化為 MyCustomObject。

配置自定義 Converter(可選)

Spring AMQP 提供了 Jackson2JsonMessageConverter 等現(xiàn)成轉(zhuǎn)換器。

@Configuration
public class RabbitConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // 配置 RabbitTemplate 使用該轉(zhuǎn)換器
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

這樣一來(lái),rabbitTemplate.convertAndSend(queueName, myObject) 會(huì)自動(dòng)把 myObject 轉(zhuǎn)成 JSON 發(fā)送;消費(fèi)者端則自動(dòng)解析為同樣的 Java 對(duì)象。 1)基本消息發(fā)送

場(chǎng)景
將消息直接發(fā)送到指定的隊(duì)列,跳過(guò)交換機(jī)的路由,讓 RabbitMQ 把消息放到這個(gè)隊(duì)列中。

核心代碼示例

@Service
public class BasicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;  // 1.自動(dòng)注入的 RabbitTemplate
    /**
     * 2.發(fā)送基本消息到指定的隊(duì)列
     * @param queueName  目標(biāo)隊(duì)列名稱
     * @param message    消息內(nèi)容
     */
    public void sendToQueue(String queueName, String message) {
        // 3.調(diào)用 convertAndSend,直接將消息放入指定隊(duì)列
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent to queue: " + queueName + ", content: " + message);
    }
}

代碼詳解

  • @Autowired private RabbitTemplate rabbitTemplate;`
    • Spring Boot 自動(dòng)為我們配置了 RabbitTemplate,不用手動(dòng)定義 Bean。
    • 通過(guò)依賴注入即可使用所有與 RabbitMQ 交互的方法。
  • public void sendToQueue(String queueName, String message)
  • 方法參數(shù)包括:
    • queueName: 目標(biāo)隊(duì)列的名稱。
    • message: 要發(fā)送的字符串類型消息內(nèi)容。
  • rabbitTemplate.convertAndSend(queueName, message)
    • convertAndSend 方法會(huì)將消息轉(zhuǎn)換(轉(zhuǎn)換為字節(jié))并發(fā)送到指定隊(duì)列。
    • 如果該隊(duì)列不存在,RabbitMQ 會(huì)嘗試自動(dòng)創(chuàng)建(前提是 Broker 端配置允許自動(dòng)創(chuàng)建隊(duì)列)。

2)發(fā)送到交換機(jī)

場(chǎng)景
將消息發(fā)送到一個(gè)交換機(jī)(Exchange),再由交換機(jī)通過(guò) Routing Key 將消息路由到匹配的隊(duì)列中。

核心代碼示例

@Service
public class ExchangeProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息到指定交換機(jī)
     * @param exchangeName  交換機(jī)名稱
     * @param routingKey    路由鍵
     * @param message       消息內(nèi)容
     */
    public void sendToExchange(String exchangeName, String routingKey, String message) {
        // 將消息發(fā)送到 exchangeName 指定的交換機(jī),使用 routingKey 進(jìn)行路由
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
    }
}

代碼詳解

  • exchangeName
    • 要發(fā)送到的交換機(jī)名稱,例如 "direct-exchange"、"fanout-exchange" 等。
  • routingKey
    • 路由鍵,用來(lái)匹配綁定(Binding)。例如:對(duì) DirectExchange 而言,需要隊(duì)列綁定時(shí)的路由鍵與發(fā)送時(shí)的路由鍵相同,消息才能到達(dá)隊(duì)列。
    • rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
  • 將消息先發(fā)送到交換機(jī),再根據(jù)路由鍵將消息投遞到目標(biāo)隊(duì)列。

3)發(fā)送帶消息屬性的消息

場(chǎng)景
需要為消息設(shè)置 TTL(過(guò)期時(shí)間)或優(yōu)先級(jí)等屬性,控制消息在隊(duì)列中的行為。

核心代碼示例

@Service
public class PropertyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送帶消息屬性的消息(如 TTL, 優(yōu)先級(jí))
     */
    public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
        // 1.創(chuàng)建 MessageProperties 對(duì)象,用于指定消息的屬性
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000"); // 過(guò)期時(shí)間:10秒 (單位:毫秒)
        properties.setPriority(5);        // 優(yōu)先級(jí)設(shè)為 5
        // 2.根據(jù)消息體和屬性構(gòu)建 Message 對(duì)象
        Message message = new Message(messageContent.getBytes(), properties);
        // 3.使用 send 方法(而非 convertAndSend)直接發(fā)送 Message 對(duì)象
        rabbitTemplate.send(exchange, routingKey, message);
        System.out.println("Message with properties sent: " + messageContent);
    }
}

代碼詳解

  • MessageProperties properties = new MessageProperties();
    • MessageProperties 用于設(shè)置 AMQP 協(xié)議層的各種消息頭信息。
  • properties.setExpiration("10000");
    • setExpiration 設(shè)置消息的 TTL(Time-To-Live),單位是毫秒。如果到達(dá)時(shí)間后消息仍未被消費(fèi),RabbitMQ 會(huì)將其從隊(duì)列中移除并送入死信隊(duì)列(如果配置了死信隊(duì)列)。
    • properties.setPriority(5);
  • 設(shè)置消息的優(yōu)先級(jí)為 5,前提是隊(duì)列本身需要支持優(yōu)先級(jí)隊(duì)列(創(chuàng)建隊(duì)列時(shí)指定 x-max-priority)。
    • new Message(messageContent.getBytes(), properties)
  • 將純文本消息轉(zhuǎn)換為 Message 對(duì)象,結(jié)合了消息屬性和消息體。
    • rabbitTemplate.send(exchange, routingKey, message);
  • convertAndSend 不同,它不會(huì)嘗試進(jìn)行消息轉(zhuǎn)換(如 JSON、字符串),而是直接發(fā)送完整的 AMQP Message 對(duì)象。

Message 構(gòu)造函數(shù) 

public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
}
body:消息體的字節(jié)數(shù)組。
messageProperties:AMQP 的消息屬性,包括 TTL、優(yōu)先級(jí)、headers 等。、

如果消息體不是String類型

手動(dòng)轉(zhuǎn)換為字節(jié):你可以先將自定義對(duì)象轉(zhuǎn)換為字節(jié)數(shù)組(例如通過(guò) JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一個(gè)參數(shù)。
MyCustomObject obj = new MyCustomObject();
// 假設(shè)你想用 JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
MessageProperties properties = new MessageProperties();
// 設(shè)置一些屬性
Message message = new Message(body, properties);
  • 為什么不會(huì)自動(dòng)轉(zhuǎn) JSON?使用 new Message(...) 構(gòu)造方法是“純” AMQP 層的做法,不會(huì)調(diào)用 Spring 的轉(zhuǎn)換器,因此你必須自己處理序列化。
  • 使用 Message 構(gòu)造函數(shù) 時(shí),你必須自行處理對(duì)象到 byte[] 的轉(zhuǎn)換(無(wú)論是字符串、JSON,還是其他格式)。
  • 如果想讓 Spring AMQP 自動(dòng)轉(zhuǎn)換,你通常使用 rabbitTemplate.convertAndSend(Object msg) 這種高級(jí) API,或者配置自定義 MessageConverter。

3.2 消息消費(fèi)者(Consumer)

消費(fèi)者的核心功能是在指定的隊(duì)列中監(jiān)聽(tīng)消息,并根據(jù)配置的確認(rèn)模式(自動(dòng)確認(rèn)或手動(dòng)確認(rèn))對(duì)消息進(jìn)行處理或拒絕。

1)監(jiān)聽(tīng)隊(duì)列并消費(fèi)消息

核心代碼示例(自動(dòng)確認(rèn)模式)

@Service
public class Consumer {
    /**
     * 使用注解 @RabbitListener 指定要監(jiān)聽(tīng)的隊(duì)列
     * 由于默認(rèn)為 auto-ack 模式,
     * 當(dāng)消息到達(dá)后,RabbitMQ 會(huì)自動(dòng)確認(rèn)并從隊(duì)列中刪除該消息。
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        // 1.從 queueName 隊(duì)列中取到的消息內(nèi)容
        System.out.println("Received message: " + message);
        // 2.在 auto-ack 模式下,無(wú)需手動(dòng) ack
        //  如果這里出現(xiàn)異常,RabbitMQ 不會(huì)再次發(fā)送消息給消費(fèi)者,消息會(huì)丟失。
    }
}

代碼詳解(自動(dòng)確認(rèn)模式)

  • @RabbitListener(queues = "demo-queue")
    • 聲明監(jiān)聽(tīng)名為 demo-queue 的隊(duì)列。
    • 一旦有新消息到達(dá)該隊(duì)列,就會(huì)自動(dòng)回調(diào)此方法。
  • public void receiveMessage(String message)
    • 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會(huì)嘗試將其轉(zhuǎn)換為 String 并注入到 message 中。
  • 自動(dòng)確認(rèn)(auto-ack)的風(fēng)險(xiǎn)
    • 如果消費(fèi)者在處理消息時(shí)拋出異常,消息已經(jīng)被 RabbitMQ 標(biāo)記為“已確認(rèn)”,不會(huì)再重新發(fā)送或進(jìn)入死信隊(duì)列,導(dǎo)致消息丟失。

2)確認(rèn)機(jī)制

自動(dòng)確認(rèn)(auto-ack)

  • 行為
    • 當(dāng)消費(fèi)者從隊(duì)列中獲取消息后,RabbitMQ 會(huì)立即將該消息標(biāo)記為已確認(rèn)(acknowledged),并從隊(duì)列中刪除。
  • 問(wèn)題
    • 如果消息處理失敗(例如消費(fèi)者拋出異常),消息已經(jīng)被確認(rèn)并從隊(duì)列中刪除,無(wú)法重新處理。
    • 如果消費(fèi)者崩潰或斷開(kāi)連接,未處理的消息會(huì)丟失。
  • 適用場(chǎng)景
    • 對(duì)消息處理的可靠性要求不高的場(chǎng)景。

手動(dòng)確認(rèn)(manual-ack)

  • 行為
    • 消費(fèi)者處理完消息后,必須顯式調(diào)用 basicAck 方法確認(rèn)消息。
    • 如果消息處理失敗,可以調(diào)用 basicNack 或 basicReject 方法拒絕消息。
  • 優(yōu)點(diǎn)
    • 確保消息處理的可靠性。
    • 支持消息重新入隊(duì)或發(fā)送到死信隊(duì)列。
  • 適用場(chǎng)景
    • 對(duì)消息處理的可靠性要求較高的場(chǎng)景。

核心代碼示例:

@Service
public class ManualAckConsumer {
    /**
     * 在 application.properties 中配置:
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * 使得 RabbitMQ 使用手動(dòng)確認(rèn)模式
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            // 1.從消息中獲取消息體
            String body = new String(message.getBody());
            System.out.println("Processing message: " + body);
            // 2.如果業(yè)務(wù)處理成功,則調(diào)用 basicAck 手動(dòng)確認(rèn)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // 3.如果處理失敗,需要決定是重新入隊(duì)還是拒絕并進(jìn)入死信隊(duì)列
            // requeue = true  -> 重新入隊(duì)
            // requeue = false -> 丟棄或進(jìn)入死信隊(duì)列(根據(jù)隊(duì)列配置)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

代碼詳解

配置手動(dòng)確認(rèn)

application.properties 添加

spring.rabbitmq.listener.simple.acknowledge-mode=manual

表示 Spring AMQP 使用手動(dòng)確認(rèn)模式(manual-ack)。

public void receiveMessage(Message message, Channel channel)

與自動(dòng)確認(rèn)不同,這里不僅接收字符串,還接收了 org.springframework.amqp.core.Message 對(duì)象和 com.rabbitmq.client.Channel。Message:包含消息體(body)和消息屬性(headers 等)。Channel:給我們提供了 basicAck, basicNack, basicReject 等底層 AMQP 操作。

手動(dòng)確認(rèn)成功

channel.basicAck(deliveryTag, multiple)

deliveryTag

本次消息的唯一標(biāo)記,從 message.getMessageProperties().getDeliveryTag() 獲取。

multiple = false:只確認(rèn)當(dāng)前這條消息。

basicAck(long deliveryTag, boolean multiple)

這里的 deliveryTag 并不是在你構(gòu)造 Message 時(shí)生成的,而是 RabbitMQ Broker 在投遞消息給消費(fèi)者時(shí)由底層 AMQP 協(xié)議自動(dòng)分配的一個(gè)遞增的序號(hào)
long deliveryTag = message.getMessageProperties().getDeliveryTag();

手動(dòng)確認(rèn)失敗

  • channel.basicNack(deliveryTag, multiple, requeue)basicReject
    • requeue = true:將消息重新放回隊(duì)列等待下一次消費(fèi)(可能導(dǎo)致死循環(huán),如處理一直失?。?/li>
    • requeue = false:拒絕消息,若配置了死信隊(duì)列,則進(jìn)入死信隊(duì)列;否則丟棄消息。

3)處理消費(fèi)失敗

  • 自動(dòng)確認(rèn)模式下的處理
  • 在自動(dòng)確認(rèn)模式下,如果消息處理失敗,RabbitMQ 不會(huì)重新發(fā)送消息,因?yàn)橄⒁呀?jīng)被確認(rèn)并從隊(duì)列中刪除。
  • 問(wèn)題
  • 消息丟失,無(wú)法重新處理。
  • 手動(dòng)確認(rèn)模式下的處理
  • 在手動(dòng)確認(rèn)模式下,如果消息處理失敗,可以通過(guò)以下方式處理:
  • 重新入隊(duì)
  • 調(diào)用 basicNack 或 basicReject 方法,并將 requeue 參數(shù)設(shè)置為 true。
  • 消息會(huì)重新進(jìn)入隊(duì)列,等待下一次消費(fèi)。
  • 發(fā)送到死信隊(duì)列
  • 調(diào)用 basicNack 或 basicReject 方法,并將 requeue 參數(shù)設(shè)置為 false。
  • 如果隊(duì)列配置了死信隊(duì)列,消息會(huì)被發(fā)送到死信隊(duì)列。
  • 重試機(jī)制(Spring AMQP 提供的簡(jiǎn)單重試)(只支持手動(dòng)確認(rèn)機(jī)制)

是重試失敗了才會(huì)將消息重新入隊(duì) ,所以重試在前,重新入隊(duì)在后

# 啟用重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重試間隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 間隔倍數(shù)
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重試間隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
  • Spring AMQP 提供了 重試機(jī)制,可以在消費(fèi)者處理消息失敗時(shí),自動(dòng)進(jìn)行多次重試,而不是直接將消息重新入隊(duì)。

行為

  • 當(dāng)消息處理失敗時(shí),Spring AMQP 會(huì)在 本地 進(jìn)行重試(即不將消息重新入隊(duì)),直到達(dá)到最大重試次數(shù)。
  • 如果重試次數(shù)用盡,消息會(huì)被拒絕(basicNack 或 basicReject),并根據(jù)配置決定是否重新入隊(duì)或發(fā)送到死信隊(duì)列。

死信隊(duì)列(DLQ)

  • 當(dāng)消息被拒絕或過(guò)期時(shí),RabbitMQ 會(huì)將其發(fā)送到我們配置的死信交換機(jī)(DLX),再路由到死信隊(duì)列(DLQ)。
  • 配置示例
@Configuration
public class RabbitConfig {
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal-queue")
                .withArgument("x-dead-letter-exchange", "dead-letter-exchange")  // 指定死信交換機(jī)
                .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由鍵
                .build();
    }
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead-letter-exchange");
    }
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead-letter-queue");
    }
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
    }
}

原理

  • 正常隊(duì)列通過(guò) x-dead-letter-exchange 指定死信交換機(jī),一旦消息被拒絕(requeue=false)或超時(shí)(TTL 到期),RabbitMQ 會(huì)把消息發(fā)送到 dead-letter-exchange
  • dead-letter-exchangedead-letter-queue 進(jìn)行綁定(路由鍵 dead-letter-routing-key),從而實(shí)現(xiàn)死信隊(duì)列的存儲(chǔ)。

重新入隊(duì) vs 發(fā)送到死信隊(duì)列

  • 重新入隊(duì)channel.basicNack(deliveryTag, false, true)
  • 適用于臨時(shí)性錯(cuò)誤,比如數(shù)據(jù)庫(kù)鎖沖突、網(wǎng)絡(luò)抖動(dòng)等,等待后續(xù)重新處理。
  • 發(fā)送到死信隊(duì)列channel.basicNack(deliveryTag, false, false)
  • 適用于永久性錯(cuò)誤,比如消息格式無(wú)法解析,或業(yè)務(wù)邏輯指定不應(yīng)再嘗試。

到此這篇關(guān)于Spring boot框架下的RabbitMQ消息中間件的文章就介紹到這了,更多相關(guān)Spring boot RabbitMQ消息中間件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 基于Java Springboot + Vue + MyBatis實(shí)現(xiàn)音樂(lè)播放系統(tǒng)

    基于Java Springboot + Vue + MyBatis實(shí)現(xiàn)音樂(lè)播放系統(tǒng)

    這篇文章主要介紹了一個(gè)完整的音樂(lè)播放系統(tǒng)是基于Java Springboot + Vue + MyBatis編寫的,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08
  • IDEA入門級(jí)使用教程你居然還在用eclipse?

    IDEA入門級(jí)使用教程你居然還在用eclipse?

    上個(gè)月,idea的使用量超越eclipse的消息席卷了整個(gè)IT界,idea到底好在哪里呢?下面小編通過(guò)本文給大家詳細(xì)介紹下IDEA入門級(jí)使用教程,非常詳細(xì),感興趣的朋友一起看看吧
    2020-10-10
  • java8 stream多字段排序的實(shí)現(xiàn)

    java8 stream多字段排序的實(shí)現(xiàn)

    這篇文章主要介紹了java8 stream多字段排序的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Springboot解決no main manifest attribute錯(cuò)誤

    Springboot解決no main manifest attribute錯(cuò)誤

    在開(kāi)發(fā)Springboot項(xiàng)目時(shí),使用java -jar命令運(yùn)行jar包可能出現(xiàn)no main manifest attribute錯(cuò)誤,本文就來(lái)介紹一下該錯(cuò)誤的解決方法,感興趣的可以了解一下
    2024-09-09
  • Idea熱加載插件JRebel激活以及使用教程

    Idea熱加載插件JRebel激活以及使用教程

    JRebel是一款JVM插件,它使得Java代碼修改后不用重啟系統(tǒng),立即生效,下面這篇文章主要給大家介紹了關(guān)于Idea熱加載插件JRebel激活以及使用的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • SpringBoot實(shí)現(xiàn)自動(dòng)配置的方式詳解

    SpringBoot實(shí)現(xiàn)自動(dòng)配置的方式詳解

    Spring Boot 自動(dòng)配置 是其核心特性之一,它通過(guò)智能化的默認(rèn)配置減少了開(kāi)發(fā)者的工作量,自動(dòng)配置的原理基于條件化配置和 Spring 的 @Configuration 機(jī)制,本文給大家講解了SpringBoot實(shí)現(xiàn)自動(dòng)配置的過(guò)程,需要的朋友可以參考下
    2025-04-04
  • 一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 使用Java判定一個(gè)數(shù)值是否在指定的開(kāi)閉區(qū)間范圍內(nèi)

    使用Java判定一個(gè)數(shù)值是否在指定的開(kāi)閉區(qū)間范圍內(nèi)

    這篇文章主要給大家介紹了關(guān)于使用Java判定一個(gè)數(shù)值是否在指定的開(kāi)閉區(qū)間范圍內(nèi)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-09-09
  • 用Java8 stream處理數(shù)據(jù)

    用Java8 stream處理數(shù)據(jù)

    這篇文章主要介紹了用Java8 stream處理數(shù)據(jù),Java 8 API的設(shè)計(jì)者重新提出了一個(gè)新的抽象稱為流Stream,可以讓我們以一種聲明的方式處理數(shù)據(jù),此外,數(shù)據(jù)流可以充分利用多核架構(gòu)而無(wú)需編寫多線程的一行代碼,下面我們一起來(lái)看看文章詳細(xì)介紹
    2021-11-11
  • SpringBoot集成Redis實(shí)現(xiàn)驗(yàn)證碼的簡(jiǎn)單案例

    SpringBoot集成Redis實(shí)現(xiàn)驗(yàn)證碼的簡(jiǎn)單案例

    本文主要介紹了SpringBoot集成Redis實(shí)現(xiàn)驗(yàn)證碼的簡(jiǎn)單案例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08

最新評(píng)論