欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例

 更新時(shí)間:2025年09月23日 09:48:52   作者:TanYYF  
本文主要介紹了springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1 RabbitMQ 核心概念

RabbitMQ 是一個(gè)開源的消息代理軟件,實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議 (AMQP 0-9-1),為應(yīng)用程序提供了異步通信的能力。在深入了解消息發(fā)送機(jī)制之前,我們需要理解幾個(gè)核心概念:

  • 生產(chǎn)者 (Producer):發(fā)送消息的應(yīng)用程序,負(fù)責(zé)創(chuàng)建消息并將其發(fā)布到 RabbitMQ 交換器。
  • 消費(fèi)者 (Consumer):接收消息的應(yīng)用程序,從隊(duì)列中獲取消息并進(jìn)行處理。
  • 交換器 (Exchange):消息的入口點(diǎn),生產(chǎn)者將消息發(fā)送到交換器。交換器根據(jù)特定規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。
  • 隊(duì)列 (Queue):存儲(chǔ)消息的緩沖區(qū),類似于郵箱名稱。消息一直在隊(duì)列中等待,直到被消費(fèi)者處理。
  • 綁定 (Binding):連接交換器和隊(duì)列的路由規(guī)則,定義了消息應(yīng)該如何從交換器流轉(zhuǎn)到隊(duì)列。
  • 路由鍵 (Routing Key):生產(chǎn)者發(fā)送消息時(shí)指定的一個(gè)屬性,交換器使用它來確定如何路由消息。

消息流經(jīng) RabbitMQ 的基本過程是:

  • 生產(chǎn)者創(chuàng)建消息并將其發(fā)送到交換器。
  • 交換器接收消息并根據(jù)其類型和綁定規(guī)則決定將消息路由到哪些隊(duì)列。
  • 消息存儲(chǔ)在隊(duì)列中,直到被消費(fèi)者處理。
  • 消費(fèi)者從隊(duì)列中獲取消息并進(jìn)行處理。

這種架構(gòu)提供了應(yīng)用程序解耦、異步通信和流量緩沖等優(yōu)勢(shì),使分布式系統(tǒng)更加靈活和可靠。

2.RabbitMQ 交換器類型詳解

RabbitMQ中的Exchange(交換器)是消息路由的核心組件,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)特定的路由規(guī)則將消息分發(fā)到一個(gè)或多個(gè)隊(duì)列中。在RabbitMQ 3.13中,主要有以下幾種交換器類型:

2.1. Direct Exchange(直連交換器)

特點(diǎn)

  • 基于精確匹配的路由機(jī)制
  • 消息的Routing Key必須與Binding Key完全匹配才會(huì)被路由到相應(yīng)隊(duì)列

工作原理

  • 當(dāng)隊(duì)列綁定到Direct Exchange時(shí),需要指定一個(gè)Binding Key
  • 只有當(dāng)消息的Routing Key與Binding Key完全一致時(shí),消息才會(huì)被路由到該隊(duì)列

應(yīng)用場(chǎng)景

  • 適用于一對(duì)一的精確消息投遞場(chǎng)景
  • 比如將路由鍵為"sms"的消息只投遞給綁定鍵也為"sms"的隊(duì)列

2.2. Fanout Exchange(扇出交換器)

特點(diǎn)

  • 廣播模式,忽略Routing Key
  • 將消息路由到所有綁定到該交換器的隊(duì)列

工作原理

  • 不需要指定Binding Key
  • 發(fā)送到Fanout Exchange的每條消息都會(huì)被轉(zhuǎn)發(fā)到所有綁定的隊(duì)列

應(yīng)用場(chǎng)景

  • 適用于廣播通知場(chǎng)景
  • 需要將同一條消息發(fā)送給多個(gè)消費(fèi)者的場(chǎng)景

2.3. Topic Exchange(主題交換器)

特點(diǎn)

  • 基于模式匹配的路由機(jī)制
  • 支持通配符匹配Routing Key和Binding Key

工作原理

  • Routing Key和Binding Key都是包含".“分隔符的字符串(如"quick.orange.rabbit”)
  • 支持兩種通配符:
    • “*”:匹配一個(gè)單詞
    • “#”:匹配零個(gè)或多個(gè)單詞

應(yīng)用場(chǎng)景

  • 適用于基于主題的發(fā)布訂閱模式
  • 比如綁定鍵為"*.stock.usd"的隊(duì)列會(huì)收到所有以任意單詞開頭,以"stock.usd"結(jié)尾的路由鍵的消息

2.4. Headers Exchange(頭交換器)

特點(diǎn)

  • 基于消息頭部屬性進(jìn)行路由
  • 不依賴Routing Key進(jìn)行匹配

工作原理

  • 根據(jù)消息的Headers屬性進(jìn)行匹配
  • 隊(duì)列綁定到Headers Exchange時(shí)需要指定匹配的頭部屬性

應(yīng)用場(chǎng)景

  • 適用于基于消息屬性的復(fù)雜路由場(chǎng)景
  • 但性能相對(duì)較差,實(shí)際應(yīng)用較少

2.5. Default Exchange(默認(rèn)交換器)

特點(diǎn)

  • 無名的Direct Exchange
  • 自動(dòng)存在于每個(gè)RabbitMQ實(shí)例中

工作原理

  • 當(dāng)隊(duì)列創(chuàng)建時(shí),會(huì)自動(dòng)綁定到默認(rèn)交換器
  • Routing Key就是目標(biāo)隊(duì)列的名稱

2.6. Dead Letter Exchange(死信交換器)

特點(diǎn)

  • 處理無法被正常投遞的消息
  • 不是標(biāo)準(zhǔn)的交換器類型,而是一種處理模式

工作原理

  • 當(dāng)消息被拒絕、過期或隊(duì)列達(dá)到最大長(zhǎng)度時(shí),可以配置將其發(fā)送到DLX
  • 便于對(duì)無法處理的消息進(jìn)行進(jìn)一步處理或記錄

總結(jié)

不同類型的交換器適用于不同的業(yè)務(wù)場(chǎng)景。在選擇交換器類型時(shí),需要根據(jù)具體的路由需求來決定:

  • 需要精確匹配時(shí)使用Direct Exchange
  • 需要廣播消息時(shí)使用Fanout Exchange
  • 需要基于模式匹配時(shí)使用Topic Exchange
  • 特殊情況下可以考慮Headers Exchange

3.spring boot 整合rabbitmq

3.1 項(xiàng)目結(jié)構(gòu)

spring-rabbitmq-demo/
├── src/
│   └── main/
│       ├── java/
│       │   └── cn/
│       │       └── spring/
│       │           └── rabbitmq/
│       │               └── demo/
│       │                   ├── config/
│       │                   │   ├── RabbitMQConfig.java
│       │                   │   ├── RabbitmqTemplatePostProcessor.java
│       │                   │   └── SwaggerConfig.java
│       │                   ├── controller/
│       │                   │   └── rabbitmq/
│       │                   │       └── RabbitmqDemoController.java
│       │                   ├── rabbitmq/
│       │                   │   ├── consumer/
│       │                   │   │   └── MessageConsumer.java
│       │                   │   ├── message/
│       │                   │   │   ├── BaseMessage.java
│       │                   │   │   └── DemoMessage.java
│       │                   │   └── sender/
│       │                   │       └── MessageSender.java
│       │                   └── RabbitmqDemoApplication.java
│       └── resources/
│           ├── application.yaml
│           └── logback-spring.xml
└── pom.xml

3.2 依賴配置

Maven 依賴

在 pom.xml 文件中添加以下依賴:

<dependencies>
    <!-- Web 相關(guān) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- RabbitMQ 相關(guān) -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    
    <!-- Swagger 相關(guān) -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.8.11</version>
    </dependency>
    
    <!-- Jackson 用于JSON序列化 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- hutool 工具包 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.15</version>
    </dependency>
</dependencies>

3.3. 配置文件

server:
  port: 8081
spring:
  application:
    name: spring-rabbitmq-demo
  profiles:
    active: dev

  main:
    allow-circular-references: true # 允許循環(huán)依賴
--- #############rabbitmq配置#####################
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xxxx
    password: xxxx
    virtual-host: /
    publisher-returns: true # 啟用發(fā)布者返回功能,如果消息沒有到達(dá)隊(duì)列,則會(huì)通知生產(chǎn)者。
    # NONE(默認(rèn)值):不返回任何信息。
    # SIMPLE(簡(jiǎn)單模式)使用同步阻塞方式等待MQ的確認(rèn)回執(zhí),發(fā)送消息后會(huì)阻塞當(dāng)前線程,直到收到確認(rèn)結(jié)果,性能相對(duì)較低,因?yàn)樾枰却_認(rèn)結(jié)果
    # CORRELATED(關(guān)聯(lián)模式):使用異步非阻塞方式,生產(chǎn)者發(fā)送消息后,不等待MQ的確認(rèn)回執(zhí),而是直接返回,并通過回調(diào)函數(shù)的方式通知生產(chǎn)者。
    publisher-confirm-type: NONE
    listener:
      # SIMPLE:RabbitMQ 消費(fèi)者將消息分發(fā)到一個(gè)單獨(dú)的線程(Invoker Thread)進(jìn)行處理,消費(fèi)者線程與業(yè)務(wù)處理線程是分離的
      #特點(diǎn):
          #異步處理:消息消費(fèi)與業(yè)務(wù)處理在不同線程中執(zhí)行
          #更好的容錯(cuò)性:業(yè)務(wù)處理異常不會(huì)直接影響 RabbitMQ 消費(fèi)者線程
          #更高的資源消耗:需要額外的線程來進(jìn)行消息分發(fā)和處理
          #支持并發(fā)消費(fèi)者配置
      #DIRECT:適用于對(duì)延遲敏感、吞吐量要求高的場(chǎng)景,或者資源受限的環(huán)境
        #工作原理:監(jiān)聽器直接在 RabbitMQ 消費(fèi)者線程上調(diào)用執(zhí)行,沒有額外的分發(fā)線程
          #特點(diǎn):
          #同步處理:消息消費(fèi)與業(yè)務(wù)處理在同一個(gè)線程中執(zhí)行
          #更低的延遲:沒有線程切換開銷
          #更少的資源消耗:不需要額外的線程池
          #更簡(jiǎn)單的線程模型:更容易調(diào)試和分析
          #業(yè)務(wù)處理的異常會(huì)直接影響消費(fèi)者線程
      #STREAM(流容器)
        #工作原理:使用 RabbitMQ Stream Client 處理消息流
          #特點(diǎn):
          #專為高吞吐量和持久化流處理設(shè)計(jì)
          #支持超大規(guī)模消息保留
          #支持消費(fèi)者組和消息重播
          #適用于需要處理大量歷史數(shù)據(jù)的場(chǎng)景
          #需要 RabbitMQ 3.9+ 版本支持
      type: simple # 使用simple類型監(jiān)聽器容器
      simple:
        # 是否在啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器,默認(rèn)為true 當(dāng)設(shè)置為 true 時(shí),容器會(huì)在 Spring 應(yīng)用上下文啟動(dòng)完成后自動(dòng)開始監(jiān)聽消息;設(shè)置為 false 時(shí),需要手動(dòng)調(diào)用 start() 方法啟動(dòng)容器
        auto-startup: true
        # 偵聽器調(diào)用者線程的最小數(shù)量,默認(rèn)為1 控制并發(fā)消費(fèi)者的最小數(shù)量,用于處理消息的并行度
        concurrency: 1
        # 偵聽器調(diào)用者線程的最大數(shù)量,默認(rèn)為1(與concurrency相同)當(dāng)消息負(fù)載較高時(shí),容器可以動(dòng)態(tài)擴(kuò)展到的最大消費(fèi)者數(shù)量
        max-concurrency: 1
        # 每個(gè)消費(fèi)者能夠同時(shí)存在且未被確認(rèn)的消息的最大數(shù)量。,默認(rèn)為250
        prefetch: 250
        # 確認(rèn)模式,可選值:NONE(不確認(rèn))、MANUAL(手動(dòng)確認(rèn))、AUTO(自動(dòng)確認(rèn)),默認(rèn)為AUTO
        acknowledge-mode: AUTO
        # 默認(rèn)情況下,拒絕交付是否重新排隊(duì),默認(rèn)為true 當(dāng)監(jiān)聽器方法拋出異常時(shí),決定消息是否重新放回隊(duì)列。設(shè)置為 false 可以避免消息無限重試
        default-requeue-rejected: true
        # 應(yīng)該多久發(fā)布一次空閑容器事件,默認(rèn)不發(fā)布(無默認(rèn)值) 用于監(jiān)控容器狀態(tài),當(dāng)容器在指定時(shí)間內(nèi)沒有消息處理時(shí)會(huì)發(fā)布 ApplicationEvent
        idle-event-interval: 0ms
        # 是否將批處理消息作為離散消息傳遞,或者將整個(gè)批處理傳遞給監(jiān)聽器,默認(rèn)為true 當(dāng)啟用時(shí),批量消息會(huì)被分解為單條消息分別處理;禁用時(shí),整個(gè)批次作為一個(gè)消息傳遞給監(jiān)聽器
        de-batching-enabled: true
        # 當(dāng)容器停止時(shí)是否立即停止還是處理完所有預(yù)取的消息,默認(rèn)為false 設(shè)置為 true 時(shí),容器會(huì)在處理完當(dāng)前消息后立即停止;false 時(shí),會(huì)處理完所有預(yù)取消息后再停止
        force-stop: false
        # 是否啟用觀察(Observation),默認(rèn)為false 啟用后可以通過 Micrometer 收集容器的指標(biāo)信息
        observation-enabled: false
        # 批次大小,以物理消息數(shù)量表示,默認(rèn)為1 與批量處理相關(guān),定義每個(gè)批次包含的消息數(shù)量
        batch-size: 10
        # 如果容器聲明的隊(duì)列在代理上不可用,是否失敗,默認(rèn)為true 設(shè)置為 true 時(shí),如果隊(duì)列不可用容器會(huì)失敗或停止;false 時(shí)容器會(huì)繼續(xù)運(yùn)行
        missing-queues-fatal: true
        # 是否基于'receive-timeout'和'batch-size'創(chuàng)建消息批次,默認(rèn)為false 啟用后會(huì)將 deBatchingEnabled 強(qiáng)制設(shè)為 true,將生產(chǎn)者創(chuàng)建的批次內(nèi)容作為離散記錄包含在批次中
        consumer-batch-enabled: false
        # 適中的接收超時(shí)時(shí)間
        receive-timeout: 5000ms
        # 重試相關(guān)配置
        retry:
          # 是否啟用重試機(jī)制,默認(rèn)為false
          enabled: false
          # 最大嘗試次數(shù),默認(rèn)為3
          max-attempts: 3
          # 初始重試間隔時(shí)間,默認(rèn)為1000ms
          initial-interval: 1000ms
          # 最大重試間隔時(shí)間,默認(rèn)為10000ms
          max-interval: 10000ms
          # 重試間隔的乘數(shù),默認(rèn)為2.0
          multiplier: 2.0
          # 重試時(shí)是有狀態(tài)還是無狀態(tài),默認(rèn)為true(無狀態(tài))
          stateless: true
    template:
      # 是否啟用強(qiáng)制消息投遞,默認(rèn)為false
      # 當(dāng)設(shè)置為true時(shí),如果消息無法路由到隊(duì)列,會(huì)拋出AmqpMessageReturnedException異常
      # 需要配合RabbitTemplate的ReturnsCallback使用
      mandatory: false
      # receive()操作的超時(shí)時(shí)間,默認(rèn)為0ms(無限等待)
      # 用于receive()方法調(diào)用時(shí)的等待超時(shí)時(shí)間
      receive-timeout: 0ms
      # sendAndReceive()操作的超時(shí)時(shí)間,默認(rèn)為5000ms(5秒)
      # 用于請(qǐng)求-回復(fù)模式下的等待超時(shí)時(shí)間
      reply-timeout: 5000ms
      # 發(fā)送操作使用的默認(rèn)交換機(jī)名稱,默認(rèn)為空字符串(使用默認(rèn)交換機(jī))
      # 當(dāng)使用RabbitTemplate發(fā)送消息時(shí)不指定交換機(jī)時(shí)使用此默認(rèn)值
      exchange: ""
      # 發(fā)送操作使用的默認(rèn)路由鍵,默認(rèn)為空字符串
      # 當(dāng)使用RabbitTemplate發(fā)送消息時(shí)不指定路由鍵時(shí)使用此默認(rèn)值
      routing-key: ""
      # 當(dāng)沒有明確指定接收隊(duì)列時(shí),默認(rèn)接收消息的隊(duì)列名稱,默認(rèn)為null
      # 用于RabbitTemplate接收消息時(shí)的默認(rèn)隊(duì)列
      default-receive-queue:
      # 是否啟用觀察(Observation),默認(rèn)為false
      # 啟用后可以通過Micrometer收集RabbitTemplate的指標(biāo)信息
      observation-enabled: false
      # 用于反序列化時(shí)允許的包/類的簡(jiǎn)單模式列表,默認(rèn)為null
      # 用于控制哪些類可以被反序列化,防止不安全的反序列化
      allowed-list-patterns:
      # 重試相關(guān)配置
      retry:
        # 是否啟用重試機(jī)制,默認(rèn)為false
        # 啟用后RabbitTemplate在發(fā)送消息失敗時(shí)會(huì)進(jìn)行重試
        enabled: false
        # 最大嘗試次數(shù),默認(rèn)為3次
        max-attempts: 3
        # 初始重試間隔時(shí)間,默認(rèn)為1000ms(1秒)
        initial-interval: 1000ms
        # 最大重試間隔時(shí)間,默認(rèn)為10000ms(10秒)
        max-interval: 10000ms
        # 重試間隔的乘數(shù),默認(rèn)為2.0
        # 每次重試的間隔時(shí)間會(huì)乘以這個(gè)數(shù)值
        multiplier: 2.0

3.4 核心組件實(shí)現(xiàn)

3.4.1 消息實(shí)體類

BaseMessage.java

@Data
public class BaseMessage implements java.io.Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    
    protected Long deliveryTag = IdUtil.getSnowflake().nextId();
}

DemoMessage.java

@Data
public class DemoMessage extends BaseMessage{
    private Long userId;
    private String message;
    private Date createTime;
}

3.4.2 RabbitMQ 配置類

RabbitMQConfig.java

@Configuration
@Import(RabbitmqTemplatePostProcessor.class)
public class RabbitMQConfig {

    /**
     * 延遲交換機(jī)(用于接收延遲消息)
     *
     * @return
     */
    @Bean
    public Exchange delayExchange() {
        return ExchangeBuilder.directExchange("delay.exchange")
                .durable(true)
                .build();
    }

    /**
     * 延遲隊(duì)列(沒有消費(fèi)者監(jiān)聽)
     * 設(shè)置死信交換機(jī)和路由鍵,當(dāng)消息過期后自動(dòng)轉(zhuǎn)發(fā)到死信交換機(jī)
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                // 設(shè)置死信交換機(jī)
                .deadLetterExchange("delay.process.exchange")
                // 設(shè)置死信路由鍵
                .deadLetterRoutingKey("delay.process")
                .build();
    }

    /**
     * 延遲交換機(jī)與延遲隊(duì)列綁定
     *
     * @return
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with("delay")
                .noargs();
    }


    /**
     * json消息轉(zhuǎn)換器
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 為批量處理創(chuàng)建專用的監(jiān)聽器容器工廠
     *
     * @param connectionFactory
     * @return
     */
    @Bean("batchListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(3);
        factory.setPrefetchCount(10);
        factory.setBatchListener(true); // 啟用批量監(jiān)聽
        factory.setConsumerBatchEnabled(true); // 啟用消費(fèi)者端批量處理
        factory.setBatchSize(10); // 設(shè)置批次大小
        factory.setReceiveTimeout(5000L); // 設(shè)置接收超時(shí)時(shí)間
        factory.setBatchReceiveTimeout(5000L);
        factory.setDeBatchingEnabled(false); // 禁用分解批處理消息
        factory.setMessageConverter(messageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 設(shè)置為 true 可在當(dāng)前消息處理完畢后停止容器,并重新排隊(duì)任何預(yù)取的消息。在使用獨(dú)占或單活動(dòng)消費(fèi)者時(shí)很有用
        factory.setForceStop(true);
        // 設(shè)置關(guān)閉超時(shí)時(shí)間
        factory.setContainerCustomizer((container -> {
            container.setShutdownTimeout(30000L);
            // container.setExclusive(true);
        }));
        return factory;
    }
}

RabbitmqTemplatePostProcessor.java
RabbitTemplate 啟用 ConfirmCallback、ReturnsCallback 和消息持久化 需要配合配置:

publisher-returns: true # 啟用發(fā)布者返回功能,如果消息沒有到達(dá)隊(duì)列,則會(huì)通知生產(chǎn)者。
    # NONE(默認(rèn)值):不返回任何信息。
    # SIMPLE(簡(jiǎn)單模式)使用同步阻塞方式等待MQ的確認(rèn)回執(zhí),發(fā)送消息后會(huì)阻塞當(dāng)前線程,直到收到確認(rèn)結(jié)果,性能相對(duì)較低,因?yàn)樾枰却_認(rèn)結(jié)果
    # CORRELATED(關(guān)聯(lián)模式):使用異步非阻塞方式,生產(chǎn)者發(fā)送消息后,不等待MQ的確認(rèn)回執(zhí),而是直接返回,并通過回調(diào)函數(shù)的方式通知生產(chǎn)者。
    publisher-confirm-type: NONE
@Slf4j
public class RabbitmqTemplatePostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof RabbitTemplate) {
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;

            // 啟用發(fā)送確認(rèn)機(jī)制(ConfirmCallback)
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        log.info("消息成功發(fā)送到交換機(jī),correlationData: {}", correlationData);
                    } else {
                        log.error("消息發(fā)送到交換機(jī)失敗,correlationData: {}, cause: {}", correlationData, cause);
                    }
                }
            });

            // 啟用發(fā)送失敗回調(diào)機(jī)制(ReturnCallback)
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    log.error("消息無法路由到隊(duì)列,exchange: {}, routingKey: {}, message: {}, replyCode: {}, replyText: {}",
                            returnedMessage.getExchange(),
                            returnedMessage.getRoutingKey(),
                            returnedMessage.getMessage(),
                            returnedMessage.getReplyCode(),
                            returnedMessage.getReplyText());
                }
            });

            // 設(shè)置消息持久化
            rabbitTemplate.setMandatory(true);

            log.info("RabbitTemplate 配置完成:?jiǎn)⒂?ConfirmCallback、ReturnsCallback 和消息持久化");
        }
        return bean;
    }

}

3.4.3 消息發(fā)送者

MessageSender.java

@Component
public class MessageSender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.fanout.exchange:fanout-exchange}")
    private String fanoutExchange;

    @Value("${rabbitmq.topic.exchange:topic-exchange}")
    private String topicExchange;

    @Value("${rabbitmq.delay.exchange:delay.exchange}")
    private String delayExchange;


    /**
     * 發(fā)送消息到隊(duì)列demo.queue
     *
     * @param message
     */
    public void basicSend(DemoMessage message) {
        rabbitTemplate.convertAndSend("demo.queue", message);
    }


    /**
     * 發(fā)送消息到廣播exchange
     *
     * @param message
     */
    public void fanoutSend(DemoMessage message) {
        rabbitTemplate.convertAndSend(fanoutExchange, "", message);
    }

    /**
     * 發(fā)送消息到topic exchange
     *
     * @param message    消息內(nèi)容
     * @param routingKey 路由鍵
     */
    public void topicSend(DemoMessage message, String routingKey) {
        rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
    }

    /**
     * 發(fā)送延遲消息
     *
     * @param message 消息內(nèi)容
     * @param delay   延遲時(shí)間(毫秒)
     */
    public void delaySend(DemoMessage message, long delay) {
        rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> {
            if (delay > 0) {
                msg.getMessageProperties().setExpiration(String.valueOf(delay));
            }
            return msg;
        });
    }

    /**
     * 批量發(fā)送消息
     *
     * @param messages 消息列表
     */
    public void batchSend(List<DemoMessage> messages) {
        messages.forEach(message -> {
            rabbitTemplate.convertAndSend("batch.exchange", "batch", message, msg -> {
                // 自定義消息屬性
                return msg;
            });

        });
    }
}

3.4.4 消息消費(fèi)者

MessageConsumer.java

@Slf4j
@Component
public class MessageConsumer {

    @Resource
    private ObjectProvider<MessageConverter> messageConverterObjectProvider;

    /**
     * 監(jiān)聽并處理DemoMessage類型的消息
     *
     * @param message 消息內(nèi)容
     */
    @RabbitListener(queuesToDeclare = {@Queue("demo.queue")})
    public void handleMessageByAnnotation(DemoMessage message) {
        log.info("[handleMessageByAnnotation] 收到消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 監(jiān)聽廣播消息1
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout.queue.1"),
            exchange = @Exchange(value = "fanout-exchange", type = "fanout")
    ))
    public void handleFanoutMessage1(DemoMessage message) {
        log.info("[handleFanoutMessage1] 收到廣播消息: userId={}, message={},  createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }


    /**
     * 監(jiān)聽廣播消息2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout.queue.2"),
            exchange = @Exchange(value = "fanout-exchange", type = "fanout")
    ))
    public void handleFanoutMessage2(DemoMessage message) {
        log.info("[handleFanoutMessage2] 收到廣播消息: userId={}, message={},  createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 監(jiān)聽topic消息1
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.1"),
            exchange = @Exchange(value = "topic-exchange", type = "topic"),
            key = "topic.message.specific"
    ))
    public void handleTopicMessage1(DemoMessage message) {
        log.info("[handleTopicMessage1] 收到Topic消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 監(jiān)聽topic消息2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.2"),
            exchange = @Exchange(value = "topic-exchange", type = "topic"),
            key = "topic.message.*"
    ))
    public void handleTopicMessage2(DemoMessage message) {
        log.info("[handleTopicMessage2] 收到Topic消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 監(jiān)聽延遲消息處理隊(duì)列
     * 當(dāng)延遲隊(duì)列中的消息過期后,會(huì)被自動(dòng)轉(zhuǎn)發(fā)到此隊(duì)列進(jìn)行處理
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("delay.process.queue"),
            exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
            key = "delay.process"
    ))
    public void handleDelayMessage(DemoMessage message) {
        log.info("[handleDelayMessage] 收到延遲消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 批量處理消息
     *
     * @param messages
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("batch.queue"),
            exchange = @Exchange(value = "batch.exchange", type = "direct"),
            key = "batch"
    ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
    public void handleBatchMessage(List<Message> messages, Channel channel) {
        log.info("[handleBatchMessage] 開始處理批量消息,共 {} 條", messages.size());

        if (CollUtil.isEmpty(messages)) {
            log.info("[handleBatchMessage] 消息列表為空,無需處理");
            return;
        }

        // 分別存儲(chǔ)成功和失敗的消息
        List<Message> successMessages = new ArrayList<>();
        List<Message> failedMessages = new ArrayList<>();

        // 批量轉(zhuǎn)換消息
        for (Message message : messages) {
            try {
                DemoMessage demoMessage = (DemoMessage) messageConverterObjectProvider.getObject().fromMessage(message);
                demoMessage.setDeliveryTag(message.getMessageProperties().getDeliveryTag());
                successMessages.add(message);
                log.debug("[handleBatchMessage] 消息轉(zhuǎn)換成功: deliveryTag={}", message.getMessageProperties().getDeliveryTag());
            } catch (Exception e) {
                log.error("[handleBatchMessage] 消息轉(zhuǎn)換失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                failedMessages.add(message);
            }
        }

        // 處理成功轉(zhuǎn)換的消息
        if (CollUtil.isNotEmpty(successMessages)) {
            try {
                log.info("[handleBatchMessage] 開始處理 {} 條成功轉(zhuǎn)換的消息", successMessages.size());
                // 模擬處理時(shí)間 - 實(shí)際應(yīng)用中這里應(yīng)該是真正的業(yè)務(wù)邏輯
                processMessages(successMessages);

                // 批量確認(rèn)所有成功處理的消息
                for (Message message : successMessages) {
                    try {
                        long deliveryTag = message.getMessageProperties().getDeliveryTag();
                        channel.basicAck(deliveryTag, false);
                        log.debug("[handleBatchMessage] 消息確認(rèn)成功: deliveryTag={}", deliveryTag);
                    } catch (IOException e) {
                        log.error("[handleBatchMessage] 消息確認(rèn)失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                    }
                }
                log.info("[handleBatchMessage] 成功處理并確認(rèn) {} 條消息", successMessages.size());

            } catch (Exception e) {
                log.error("[handleBatchMessage] 處理消息時(shí)發(fā)生異常", e);
                // 如果處理過程中出現(xiàn)異常,將所有成功轉(zhuǎn)換的消息標(biāo)記為失敗
                failedMessages.addAll(successMessages);
            }
        }

        // 處理轉(zhuǎn)換失敗的消息
        if (!failedMessages.isEmpty()) {
            log.warn("[handleBatchMessage] 共 {} 條消息處理失敗,嘗試重新入隊(duì)", failedMessages.size());
            for (Message message : failedMessages) {
                try {
                    long deliveryTag = message.getMessageProperties().getDeliveryTag();
                    // 第三個(gè)參數(shù)設(shè)為true,表示將消息重新放回隊(duì)列
                    channel.basicNack(deliveryTag, false, true);
                    log.debug("[handleBatchMessage] 消息重新入隊(duì): deliveryTag={}", deliveryTag);
                } catch (IOException e) {
                    log.error("[handleBatchMessage] 消息重新入隊(duì)失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                }
            }
        }

        log.info("[handleBatchMessage] 批量消息處理完成: 成功={}條, 失敗={}條",
                successMessages.size(), failedMessages.size());
    }

    /**
     * 實(shí)際處理消息的方法
     * 在實(shí)際應(yīng)用中,這里應(yīng)該包含真正的業(yè)務(wù)邏輯
     *
     * @param messages 待處理的消息列表
     * @throws Exception 處理異常
     */
    private void processMessages(List<Message> messages) throws Exception {
        // 模擬處理時(shí)間
        Thread.sleep(50L);

        // 實(shí)際應(yīng)用中,這里應(yīng)該是真正的業(yè)務(wù)邏輯處理
        log.info("[processMessages] 處理了 {} 條消息", messages.size());
    }

3.5 關(guān)鍵技術(shù)點(diǎn)

3.5.1 消息確認(rèn)機(jī)制

在批量處理消息時(shí),使用手動(dòng)確認(rèn)模式(MANUAL)來確保消息處理的可靠性:

@RabbitListener(..., ackMode = "MANUAL")
public void handleBatchMessage(List<Message> messages, Channel channel) {
    // 處理消息
    // 手動(dòng)確認(rèn)消息
    channel.basicAck(deliveryTag, false);
}

3.5.2 批量消息處理

@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setBatchListener(true); // 啟用批量監(jiān)聽
    factory.setConsumerBatchEnabled(true); // 啟用消費(fèi)者端批量處理
    factory.setBatchSize(10); // 設(shè)置批次大小
    return factory;
}

/**
     * 批量處理消息
     *
     * @param messages
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("batch.queue"),
            exchange = @Exchange(value = "batch.exchange", type = "direct"),
            key = "batch"
    ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
    public void handleBatchMessage(List<Message> messages, Channel channel) {
    }

3.5.3 延遲消息

public void delaySend(DemoMessage message, long delay) {
    rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> {
        if (delay > 0) {
            msg.getMessageProperties().setExpiration(String.valueOf(delay));
        }
        return msg;
    });
}


/**
     * 監(jiān)聽延遲消息處理隊(duì)列
     * 當(dāng)延遲隊(duì)列中的消息過期后,會(huì)被自動(dòng)轉(zhuǎn)發(fā)到此隊(duì)列進(jìn)行處理
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("delay.process.queue"),
            exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
            key = "delay.process"
    ))
    public void handleDelayMessage(DemoMessage message) {
    ...
    }

4.最佳實(shí)踐

  1. 使用 JSON 序列化:通過 Jackson2JsonMessageConverter 實(shí)現(xiàn)消息的 JSON 序列化,提高可讀性和兼容性。
  2. 手動(dòng)確認(rèn)消息:在關(guān)鍵業(yè)務(wù)場(chǎng)景中使用手動(dòng)確認(rèn)模式,確保消息被正確處理。
  3. 異常處理:合理處理消息轉(zhuǎn)換和業(yè)務(wù)處理過程中的異常,避免消息丟失。
  4. 批量處理:對(duì)于高吞吐量場(chǎng)景,使用批量處理提高處理效率。
  5. 配置優(yōu)化:根據(jù)業(yè)務(wù)需求合理配置消費(fèi)者數(shù)量、預(yù)取數(shù)量等參數(shù)。

5.總結(jié)

本文檔介紹了 Spring Boot 與 RabbitMQ 的整合方案,包括基礎(chǔ)配置、消息發(fā)送、消息消費(fèi)、批量處理、延遲消息等核心功能。通過合理使用這些功能,可以構(gòu)建高可用、高性能的異步消息處理系統(tǒng)。在實(shí)際應(yīng)用中,需要根據(jù)具體業(yè)務(wù)場(chǎng)景進(jìn)行相應(yīng)的調(diào)整和優(yōu)化。

源碼地址:https://gitee.com/zheji/spring-rabbitmq-demo

到此這篇關(guān)于springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)springboot3.0整合rabbitmq內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • mybatis+lombok出現(xiàn)java.lang.IndexOutOfBoundsException錯(cuò)誤及解決

    mybatis+lombok出現(xiàn)java.lang.IndexOutOfBoundsException錯(cuò)誤及解決

    在使用MyBatis和Lombok時(shí),如果遇到j(luò)ava.lang.IndexOutOfBoundsException問題,是因?yàn)镸yBatis在嘗試將查詢結(jié)果封裝成Java對(duì)象時(shí),找不到構(gòu)造函數(shù)中對(duì)應(yīng)的字段,這通常是由于Lombok的@Builder注解生成了全參構(gòu)造函數(shù)
    2025-02-02
  • 小議Java中@param注解與@see注解的作用

    小議Java中@param注解與@see注解的作用

    這篇文章主要介紹了Java中@param注解與@see注解的作用,注解的功能類似于通常代碼中的注釋,需要的朋友可以參考下
    2015-12-12
  • @Value如何獲取yml和properties配置參數(shù)

    @Value如何獲取yml和properties配置參數(shù)

    這篇文章主要介紹了@Value如何獲取yml和properties配置參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • springboot項(xiàng)目idea熱部署的教程詳解

    springboot項(xiàng)目idea熱部署的教程詳解

    這篇文章主要介紹了springboot項(xiàng)目idea熱部署,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • 深入理解Spring Boot Starter概念、特點(diǎn)、場(chǎng)景、原理及自定義starter的方法

    深入理解Spring Boot Starter概念、特點(diǎn)、場(chǎng)景、原理及自定義starter的方法

    本文將深入探討Spring Boot Starter的基本概念、主要特點(diǎn)、應(yīng)用場(chǎng)景以及實(shí)現(xiàn)原理,幫助讀者更好地理解和應(yīng)用這一強(qiáng)大工具,感興趣的朋友跟隨小編一起看看吧
    2025-08-08
  • java 創(chuàng)建線程的四種方式

    java 創(chuàng)建線程的四種方式

    這篇文章主要介紹了java 創(chuàng)建線程的四種方式,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-11-11
  • springboot微服務(wù)項(xiàng)目集成html頁面的實(shí)現(xiàn)

    springboot微服務(wù)項(xiàng)目集成html頁面的實(shí)現(xiàn)

    本文主要介紹了springboot微服務(wù)項(xiàng)目集成html頁面的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • mybatis使用雙層<foreach>循環(huán)嵌套方式

    mybatis使用雙層<foreach>循環(huán)嵌套方式

    在Mybatis中使用雙層循環(huán)嵌套插入數(shù)據(jù)可以有效減少數(shù)據(jù)庫IO操作,提高性能,通過在mapper.xml中定義雙層集合來實(shí)現(xiàn),外層集合為實(shí)體類中的集合字段,內(nèi)層集合為集合字段中的另一個(gè)集合,通過這種方式,可以在業(yè)務(wù)代碼中減少循環(huán)插入的次數(shù)
    2024-09-09
  • Java連接SqlServer錯(cuò)誤的完美解決方法

    Java連接SqlServer錯(cuò)誤的完美解決方法

    我們?cè)谧鯦ava或者C#連接數(shù)據(jù)庫的時(shí)候,常常遇到連接SqlServer失敗的問題,明明檢查了好幾遍代碼沒問題了,還是連接不上,下面這篇文章主要給大家介紹了關(guān)于Java連接SqlServer錯(cuò)誤的完美解決方法,需要的朋友可以參考下
    2023-04-04
  • Java?中向?Arraylist?添加對(duì)象的示例代碼

    Java?中向?Arraylist?添加對(duì)象的示例代碼

    本文介紹了如何在 Java 中向 ArrayList 添加對(duì)象,并提供了示例和注意事項(xiàng),通過掌握這些知識(shí),讀者可以在自己的 Java 項(xiàng)目中有效地使用 ArrayList 來存儲(chǔ)和操作對(duì)象,需要的朋友可以參考下
    2023-11-11

最新評(píng)論