springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例
1 RabbitMQ 核心概念
RabbitMQ 是一個(gè)開源的消息代理軟件,實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議 (AMQP 0-9-1),為應(yīng)用程序提供了異步通信的能力。在深入了解消息發(fā)送機(jī)制之前,我們需要理解幾個(gè)核心概念:
- 生產(chǎn)者 (Producer):發(fā)送消息的應(yīng)用程序,負(fù)責(zé)創(chuàng)建消息并將其發(fā)布到 RabbitMQ 交換器。
- 消費(fèi)者 (Consumer):接收消息的應(yīng)用程序,從隊(duì)列中獲取消息并進(jìn)行處理。
- 交換器 (Exchange):消息的入口點(diǎn),生產(chǎn)者將消息發(fā)送到交換器。交換器根據(jù)特定規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。
- 隊(duì)列 (Queue):存儲(chǔ)消息的緩沖區(qū),類似于郵箱名稱。消息一直在隊(duì)列中等待,直到被消費(fèi)者處理。
- 綁定 (Binding):連接交換器和隊(duì)列的路由規(guī)則,定義了消息應(yīng)該如何從交換器流轉(zhuǎn)到隊(duì)列。
- 路由鍵 (Routing Key):生產(chǎn)者發(fā)送消息時(shí)指定的一個(gè)屬性,交換器使用它來確定如何路由消息。
消息流經(jīng) RabbitMQ 的基本過程是:
- 生產(chǎn)者創(chuàng)建消息并將其發(fā)送到交換器。
- 交換器接收消息并根據(jù)其類型和綁定規(guī)則決定將消息路由到哪些隊(duì)列。
- 消息存儲(chǔ)在隊(duì)列中,直到被消費(fèi)者處理。
- 消費(fèi)者從隊(duì)列中獲取消息并進(jìn)行處理。
這種架構(gòu)提供了應(yīng)用程序解耦、異步通信和流量緩沖等優(yōu)勢(shì),使分布式系統(tǒng)更加靈活和可靠。
2.RabbitMQ 交換器類型詳解
RabbitMQ中的Exchange(交換器)是消息路由的核心組件,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)特定的路由規(guī)則將消息分發(fā)到一個(gè)或多個(gè)隊(duì)列中。在RabbitMQ 3.13中,主要有以下幾種交換器類型:
2.1. Direct Exchange(直連交換器)
特點(diǎn):
- 基于精確匹配的路由機(jī)制
- 消息的Routing Key必須與Binding Key完全匹配才會(huì)被路由到相應(yīng)隊(duì)列
工作原理:
- 當(dāng)隊(duì)列綁定到Direct Exchange時(shí),需要指定一個(gè)Binding Key
- 只有當(dāng)消息的Routing Key與Binding Key完全一致時(shí),消息才會(huì)被路由到該隊(duì)列
應(yīng)用場(chǎng)景:
- 適用于一對(duì)一的精確消息投遞場(chǎng)景
- 比如將路由鍵為"sms"的消息只投遞給綁定鍵也為"sms"的隊(duì)列
2.2. Fanout Exchange(扇出交換器)
特點(diǎn):
- 廣播模式,忽略Routing Key
- 將消息路由到所有綁定到該交換器的隊(duì)列
工作原理:
- 不需要指定Binding Key
- 發(fā)送到Fanout Exchange的每條消息都會(huì)被轉(zhuǎn)發(fā)到所有綁定的隊(duì)列
應(yīng)用場(chǎng)景:
- 適用于廣播通知場(chǎng)景
- 需要將同一條消息發(fā)送給多個(gè)消費(fèi)者的場(chǎng)景
2.3. Topic Exchange(主題交換器)
特點(diǎn):
- 基于模式匹配的路由機(jī)制
- 支持通配符匹配Routing Key和Binding Key
工作原理:
- Routing Key和Binding Key都是包含".“分隔符的字符串(如"quick.orange.rabbit”)
- 支持兩種通配符:
- “*”:匹配一個(gè)單詞
- “#”:匹配零個(gè)或多個(gè)單詞
應(yīng)用場(chǎng)景:
- 適用于基于主題的發(fā)布訂閱模式
- 比如綁定鍵為"*.stock.usd"的隊(duì)列會(huì)收到所有以任意單詞開頭,以"stock.usd"結(jié)尾的路由鍵的消息
2.4. Headers Exchange(頭交換器)
特點(diǎn):
- 基于消息頭部屬性進(jìn)行路由
- 不依賴Routing Key進(jìn)行匹配
工作原理:
- 根據(jù)消息的Headers屬性進(jìn)行匹配
- 隊(duì)列綁定到Headers Exchange時(shí)需要指定匹配的頭部屬性
應(yīng)用場(chǎng)景:
- 適用于基于消息屬性的復(fù)雜路由場(chǎng)景
- 但性能相對(duì)較差,實(shí)際應(yīng)用較少
2.5. Default Exchange(默認(rèn)交換器)
特點(diǎn):
- 無名的Direct Exchange
- 自動(dòng)存在于每個(gè)RabbitMQ實(shí)例中
工作原理:
- 當(dāng)隊(duì)列創(chuàng)建時(shí),會(huì)自動(dòng)綁定到默認(rèn)交換器
- Routing Key就是目標(biāo)隊(duì)列的名稱
2.6. Dead Letter Exchange(死信交換器)
特點(diǎn):
- 處理無法被正常投遞的消息
- 不是標(biāo)準(zhǔn)的交換器類型,而是一種處理模式
工作原理:
- 當(dāng)消息被拒絕、過期或隊(duì)列達(dá)到最大長(zhǎng)度時(shí),可以配置將其發(fā)送到DLX
- 便于對(duì)無法處理的消息進(jìn)行進(jìn)一步處理或記錄
總結(jié)
不同類型的交換器適用于不同的業(yè)務(wù)場(chǎng)景。在選擇交換器類型時(shí),需要根據(jù)具體的路由需求來決定:
- 需要精確匹配時(shí)使用Direct Exchange
- 需要廣播消息時(shí)使用Fanout Exchange
- 需要基于模式匹配時(shí)使用Topic Exchange
- 特殊情況下可以考慮Headers Exchange
3.spring boot 整合rabbitmq
3.1 項(xiàng)目結(jié)構(gòu)
spring-rabbitmq-demo/ ├── src/ │ └── main/ │ ├── java/ │ │ └── cn/ │ │ └── spring/ │ │ └── rabbitmq/ │ │ └── demo/ │ │ ├── config/ │ │ │ ├── RabbitMQConfig.java │ │ │ ├── RabbitmqTemplatePostProcessor.java │ │ │ └── SwaggerConfig.java │ │ ├── controller/ │ │ │ └── rabbitmq/ │ │ │ └── RabbitmqDemoController.java │ │ ├── rabbitmq/ │ │ │ ├── consumer/ │ │ │ │ └── MessageConsumer.java │ │ │ ├── message/ │ │ │ │ ├── BaseMessage.java │ │ │ │ └── DemoMessage.java │ │ │ └── sender/ │ │ │ └── MessageSender.java │ │ └── RabbitmqDemoApplication.java │ └── resources/ │ ├── application.yaml │ └── logback-spring.xml └── pom.xml
3.2 依賴配置
Maven 依賴
在 pom.xml 文件中添加以下依賴:
<dependencies> <!-- Web 相關(guān) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RabbitMQ 相關(guān) --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- Swagger 相關(guān) --> <dependency> <groupId>org.springdoc</groupId> <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId> <version>2.8.11</version> </dependency> <!-- Jackson 用于JSON序列化 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <!-- hutool 工具包 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.15</version> </dependency> </dependencies>
3.3. 配置文件
server: port: 8081 spring: application: name: spring-rabbitmq-demo profiles: active: dev main: allow-circular-references: true # 允許循環(huán)依賴 --- #############rabbitmq配置##################### spring: rabbitmq: host: 127.0.0.1 port: 5672 username: xxxx password: xxxx virtual-host: / publisher-returns: true # 啟用發(fā)布者返回功能,如果消息沒有到達(dá)隊(duì)列,則會(huì)通知生產(chǎn)者。 # NONE(默認(rèn)值):不返回任何信息。 # SIMPLE(簡(jiǎn)單模式)使用同步阻塞方式等待MQ的確認(rèn)回執(zhí),發(fā)送消息后會(huì)阻塞當(dāng)前線程,直到收到確認(rèn)結(jié)果,性能相對(duì)較低,因?yàn)樾枰却_認(rèn)結(jié)果 # CORRELATED(關(guān)聯(lián)模式):使用異步非阻塞方式,生產(chǎn)者發(fā)送消息后,不等待MQ的確認(rèn)回執(zhí),而是直接返回,并通過回調(diào)函數(shù)的方式通知生產(chǎn)者。 publisher-confirm-type: NONE listener: # SIMPLE:RabbitMQ 消費(fèi)者將消息分發(fā)到一個(gè)單獨(dú)的線程(Invoker Thread)進(jìn)行處理,消費(fèi)者線程與業(yè)務(wù)處理線程是分離的 #特點(diǎn): #異步處理:消息消費(fèi)與業(yè)務(wù)處理在不同線程中執(zhí)行 #更好的容錯(cuò)性:業(yè)務(wù)處理異常不會(huì)直接影響 RabbitMQ 消費(fèi)者線程 #更高的資源消耗:需要額外的線程來進(jìn)行消息分發(fā)和處理 #支持并發(fā)消費(fèi)者配置 #DIRECT:適用于對(duì)延遲敏感、吞吐量要求高的場(chǎng)景,或者資源受限的環(huán)境 #工作原理:監(jiān)聽器直接在 RabbitMQ 消費(fèi)者線程上調(diào)用執(zhí)行,沒有額外的分發(fā)線程 #特點(diǎn): #同步處理:消息消費(fèi)與業(yè)務(wù)處理在同一個(gè)線程中執(zhí)行 #更低的延遲:沒有線程切換開銷 #更少的資源消耗:不需要額外的線程池 #更簡(jiǎn)單的線程模型:更容易調(diào)試和分析 #業(yè)務(wù)處理的異常會(huì)直接影響消費(fèi)者線程 #STREAM(流容器) #工作原理:使用 RabbitMQ Stream Client 處理消息流 #特點(diǎn): #專為高吞吐量和持久化流處理設(shè)計(jì) #支持超大規(guī)模消息保留 #支持消費(fèi)者組和消息重播 #適用于需要處理大量歷史數(shù)據(jù)的場(chǎng)景 #需要 RabbitMQ 3.9+ 版本支持 type: simple # 使用simple類型監(jiān)聽器容器 simple: # 是否在啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器,默認(rèn)為true 當(dāng)設(shè)置為 true 時(shí),容器會(huì)在 Spring 應(yīng)用上下文啟動(dòng)完成后自動(dòng)開始監(jiān)聽消息;設(shè)置為 false 時(shí),需要手動(dòng)調(diào)用 start() 方法啟動(dòng)容器 auto-startup: true # 偵聽器調(diào)用者線程的最小數(shù)量,默認(rèn)為1 控制并發(fā)消費(fèi)者的最小數(shù)量,用于處理消息的并行度 concurrency: 1 # 偵聽器調(diào)用者線程的最大數(shù)量,默認(rèn)為1(與concurrency相同)當(dāng)消息負(fù)載較高時(shí),容器可以動(dòng)態(tài)擴(kuò)展到的最大消費(fèi)者數(shù)量 max-concurrency: 1 # 每個(gè)消費(fèi)者能夠同時(shí)存在且未被確認(rèn)的消息的最大數(shù)量。,默認(rèn)為250 prefetch: 250 # 確認(rèn)模式,可選值:NONE(不確認(rèn))、MANUAL(手動(dòng)確認(rèn))、AUTO(自動(dòng)確認(rèn)),默認(rèn)為AUTO acknowledge-mode: AUTO # 默認(rèn)情況下,拒絕交付是否重新排隊(duì),默認(rèn)為true 當(dāng)監(jiān)聽器方法拋出異常時(shí),決定消息是否重新放回隊(duì)列。設(shè)置為 false 可以避免消息無限重試 default-requeue-rejected: true # 應(yīng)該多久發(fā)布一次空閑容器事件,默認(rèn)不發(fā)布(無默認(rèn)值) 用于監(jiān)控容器狀態(tài),當(dāng)容器在指定時(shí)間內(nèi)沒有消息處理時(shí)會(huì)發(fā)布 ApplicationEvent idle-event-interval: 0ms # 是否將批處理消息作為離散消息傳遞,或者將整個(gè)批處理傳遞給監(jiān)聽器,默認(rèn)為true 當(dāng)啟用時(shí),批量消息會(huì)被分解為單條消息分別處理;禁用時(shí),整個(gè)批次作為一個(gè)消息傳遞給監(jiān)聽器 de-batching-enabled: true # 當(dāng)容器停止時(shí)是否立即停止還是處理完所有預(yù)取的消息,默認(rèn)為false 設(shè)置為 true 時(shí),容器會(huì)在處理完當(dāng)前消息后立即停止;false 時(shí),會(huì)處理完所有預(yù)取消息后再停止 force-stop: false # 是否啟用觀察(Observation),默認(rèn)為false 啟用后可以通過 Micrometer 收集容器的指標(biāo)信息 observation-enabled: false # 批次大小,以物理消息數(shù)量表示,默認(rèn)為1 與批量處理相關(guān),定義每個(gè)批次包含的消息數(shù)量 batch-size: 10 # 如果容器聲明的隊(duì)列在代理上不可用,是否失敗,默認(rèn)為true 設(shè)置為 true 時(shí),如果隊(duì)列不可用容器會(huì)失敗或停止;false 時(shí)容器會(huì)繼續(xù)運(yùn)行 missing-queues-fatal: true # 是否基于'receive-timeout'和'batch-size'創(chuàng)建消息批次,默認(rèn)為false 啟用后會(huì)將 deBatchingEnabled 強(qiáng)制設(shè)為 true,將生產(chǎn)者創(chuàng)建的批次內(nèi)容作為離散記錄包含在批次中 consumer-batch-enabled: false # 適中的接收超時(shí)時(shí)間 receive-timeout: 5000ms # 重試相關(guān)配置 retry: # 是否啟用重試機(jī)制,默認(rèn)為false enabled: false # 最大嘗試次數(shù),默認(rèn)為3 max-attempts: 3 # 初始重試間隔時(shí)間,默認(rèn)為1000ms initial-interval: 1000ms # 最大重試間隔時(shí)間,默認(rèn)為10000ms max-interval: 10000ms # 重試間隔的乘數(shù),默認(rèn)為2.0 multiplier: 2.0 # 重試時(shí)是有狀態(tài)還是無狀態(tài),默認(rèn)為true(無狀態(tài)) stateless: true template: # 是否啟用強(qiáng)制消息投遞,默認(rèn)為false # 當(dāng)設(shè)置為true時(shí),如果消息無法路由到隊(duì)列,會(huì)拋出AmqpMessageReturnedException異常 # 需要配合RabbitTemplate的ReturnsCallback使用 mandatory: false # receive()操作的超時(shí)時(shí)間,默認(rèn)為0ms(無限等待) # 用于receive()方法調(diào)用時(shí)的等待超時(shí)時(shí)間 receive-timeout: 0ms # sendAndReceive()操作的超時(shí)時(shí)間,默認(rèn)為5000ms(5秒) # 用于請(qǐng)求-回復(fù)模式下的等待超時(shí)時(shí)間 reply-timeout: 5000ms # 發(fā)送操作使用的默認(rèn)交換機(jī)名稱,默認(rèn)為空字符串(使用默認(rèn)交換機(jī)) # 當(dāng)使用RabbitTemplate發(fā)送消息時(shí)不指定交換機(jī)時(shí)使用此默認(rèn)值 exchange: "" # 發(fā)送操作使用的默認(rèn)路由鍵,默認(rèn)為空字符串 # 當(dāng)使用RabbitTemplate發(fā)送消息時(shí)不指定路由鍵時(shí)使用此默認(rèn)值 routing-key: "" # 當(dāng)沒有明確指定接收隊(duì)列時(shí),默認(rèn)接收消息的隊(duì)列名稱,默認(rèn)為null # 用于RabbitTemplate接收消息時(shí)的默認(rèn)隊(duì)列 default-receive-queue: # 是否啟用觀察(Observation),默認(rèn)為false # 啟用后可以通過Micrometer收集RabbitTemplate的指標(biāo)信息 observation-enabled: false # 用于反序列化時(shí)允許的包/類的簡(jiǎn)單模式列表,默認(rèn)為null # 用于控制哪些類可以被反序列化,防止不安全的反序列化 allowed-list-patterns: # 重試相關(guān)配置 retry: # 是否啟用重試機(jī)制,默認(rèn)為false # 啟用后RabbitTemplate在發(fā)送消息失敗時(shí)會(huì)進(jìn)行重試 enabled: false # 最大嘗試次數(shù),默認(rèn)為3次 max-attempts: 3 # 初始重試間隔時(shí)間,默認(rèn)為1000ms(1秒) initial-interval: 1000ms # 最大重試間隔時(shí)間,默認(rèn)為10000ms(10秒) max-interval: 10000ms # 重試間隔的乘數(shù),默認(rèn)為2.0 # 每次重試的間隔時(shí)間會(huì)乘以這個(gè)數(shù)值 multiplier: 2.0
3.4 核心組件實(shí)現(xiàn)
3.4.1 消息實(shí)體類
BaseMessage.java
@Data public class BaseMessage implements java.io.Serializable { @Serial private static final long serialVersionUID = 1L; protected Long deliveryTag = IdUtil.getSnowflake().nextId(); }
DemoMessage.java
@Data public class DemoMessage extends BaseMessage{ private Long userId; private String message; private Date createTime; }
3.4.2 RabbitMQ 配置類
RabbitMQConfig.java
@Configuration @Import(RabbitmqTemplatePostProcessor.class) public class RabbitMQConfig { /** * 延遲交換機(jī)(用于接收延遲消息) * * @return */ @Bean public Exchange delayExchange() { return ExchangeBuilder.directExchange("delay.exchange") .durable(true) .build(); } /** * 延遲隊(duì)列(沒有消費(fèi)者監(jiān)聽) * 設(shè)置死信交換機(jī)和路由鍵,當(dāng)消息過期后自動(dòng)轉(zhuǎn)發(fā)到死信交換機(jī) * * @return */ @Bean public Queue delayQueue() { return QueueBuilder.durable("delay.queue") // 設(shè)置死信交換機(jī) .deadLetterExchange("delay.process.exchange") // 設(shè)置死信路由鍵 .deadLetterRoutingKey("delay.process") .build(); } /** * 延遲交換機(jī)與延遲隊(duì)列綁定 * * @return */ @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with("delay") .noargs(); } /** * json消息轉(zhuǎn)換器 * * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 為批量處理創(chuàng)建專用的監(jiān)聽器容器工廠 * * @param connectionFactory * @return */ @Bean("batchListenerContainerFactory") public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(3); factory.setPrefetchCount(10); factory.setBatchListener(true); // 啟用批量監(jiān)聽 factory.setConsumerBatchEnabled(true); // 啟用消費(fèi)者端批量處理 factory.setBatchSize(10); // 設(shè)置批次大小 factory.setReceiveTimeout(5000L); // 設(shè)置接收超時(shí)時(shí)間 factory.setBatchReceiveTimeout(5000L); factory.setDeBatchingEnabled(false); // 禁用分解批處理消息 factory.setMessageConverter(messageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設(shè)置為 true 可在當(dāng)前消息處理完畢后停止容器,并重新排隊(duì)任何預(yù)取的消息。在使用獨(dú)占或單活動(dòng)消費(fèi)者時(shí)很有用 factory.setForceStop(true); // 設(shè)置關(guān)閉超時(shí)時(shí)間 factory.setContainerCustomizer((container -> { container.setShutdownTimeout(30000L); // container.setExclusive(true); })); return factory; } }
RabbitmqTemplatePostProcessor.java
RabbitTemplate 啟用 ConfirmCallback、ReturnsCallback 和消息持久化 需要配合配置:
publisher-returns: true # 啟用發(fā)布者返回功能,如果消息沒有到達(dá)隊(duì)列,則會(huì)通知生產(chǎn)者。 # NONE(默認(rèn)值):不返回任何信息。 # SIMPLE(簡(jiǎn)單模式)使用同步阻塞方式等待MQ的確認(rèn)回執(zhí),發(fā)送消息后會(huì)阻塞當(dāng)前線程,直到收到確認(rèn)結(jié)果,性能相對(duì)較低,因?yàn)樾枰却_認(rèn)結(jié)果 # CORRELATED(關(guān)聯(lián)模式):使用異步非阻塞方式,生產(chǎn)者發(fā)送消息后,不等待MQ的確認(rèn)回執(zhí),而是直接返回,并通過回調(diào)函數(shù)的方式通知生產(chǎn)者。 publisher-confirm-type: NONE
@Slf4j public class RabbitmqTemplatePostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof RabbitTemplate) { RabbitTemplate rabbitTemplate = (RabbitTemplate) bean; // 啟用發(fā)送確認(rèn)機(jī)制(ConfirmCallback) rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息成功發(fā)送到交換機(jī),correlationData: {}", correlationData); } else { log.error("消息發(fā)送到交換機(jī)失敗,correlationData: {}, cause: {}", correlationData, cause); } } }); // 啟用發(fā)送失敗回調(diào)機(jī)制(ReturnCallback) rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息無法路由到隊(duì)列,exchange: {}, routingKey: {}, message: {}, replyCode: {}, replyText: {}", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText()); } }); // 設(shè)置消息持久化 rabbitTemplate.setMandatory(true); log.info("RabbitTemplate 配置完成:?jiǎn)⒂?ConfirmCallback、ReturnsCallback 和消息持久化"); } return bean; } }
3.4.3 消息發(fā)送者
MessageSender.java
@Component public class MessageSender { @Resource private RabbitTemplate rabbitTemplate; @Value("${rabbitmq.fanout.exchange:fanout-exchange}") private String fanoutExchange; @Value("${rabbitmq.topic.exchange:topic-exchange}") private String topicExchange; @Value("${rabbitmq.delay.exchange:delay.exchange}") private String delayExchange; /** * 發(fā)送消息到隊(duì)列demo.queue * * @param message */ public void basicSend(DemoMessage message) { rabbitTemplate.convertAndSend("demo.queue", message); } /** * 發(fā)送消息到廣播exchange * * @param message */ public void fanoutSend(DemoMessage message) { rabbitTemplate.convertAndSend(fanoutExchange, "", message); } /** * 發(fā)送消息到topic exchange * * @param message 消息內(nèi)容 * @param routingKey 路由鍵 */ public void topicSend(DemoMessage message, String routingKey) { rabbitTemplate.convertAndSend(topicExchange, routingKey, message); } /** * 發(fā)送延遲消息 * * @param message 消息內(nèi)容 * @param delay 延遲時(shí)間(毫秒) */ public void delaySend(DemoMessage message, long delay) { rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> { if (delay > 0) { msg.getMessageProperties().setExpiration(String.valueOf(delay)); } return msg; }); } /** * 批量發(fā)送消息 * * @param messages 消息列表 */ public void batchSend(List<DemoMessage> messages) { messages.forEach(message -> { rabbitTemplate.convertAndSend("batch.exchange", "batch", message, msg -> { // 自定義消息屬性 return msg; }); }); } }
3.4.4 消息消費(fèi)者
MessageConsumer.java
@Slf4j @Component public class MessageConsumer { @Resource private ObjectProvider<MessageConverter> messageConverterObjectProvider; /** * 監(jiān)聽并處理DemoMessage類型的消息 * * @param message 消息內(nèi)容 */ @RabbitListener(queuesToDeclare = {@Queue("demo.queue")}) public void handleMessageByAnnotation(DemoMessage message) { log.info("[handleMessageByAnnotation] 收到消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 監(jiān)聽廣播消息1 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout.queue.1"), exchange = @Exchange(value = "fanout-exchange", type = "fanout") )) public void handleFanoutMessage1(DemoMessage message) { log.info("[handleFanoutMessage1] 收到廣播消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 監(jiān)聽廣播消息2 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout.queue.2"), exchange = @Exchange(value = "fanout-exchange", type = "fanout") )) public void handleFanoutMessage2(DemoMessage message) { log.info("[handleFanoutMessage2] 收到廣播消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 監(jiān)聽topic消息1 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue.1"), exchange = @Exchange(value = "topic-exchange", type = "topic"), key = "topic.message.specific" )) public void handleTopicMessage1(DemoMessage message) { log.info("[handleTopicMessage1] 收到Topic消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 監(jiān)聽topic消息2 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue.2"), exchange = @Exchange(value = "topic-exchange", type = "topic"), key = "topic.message.*" )) public void handleTopicMessage2(DemoMessage message) { log.info("[handleTopicMessage2] 收到Topic消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 監(jiān)聽延遲消息處理隊(duì)列 * 當(dāng)延遲隊(duì)列中的消息過期后,會(huì)被自動(dòng)轉(zhuǎn)發(fā)到此隊(duì)列進(jìn)行處理 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("delay.process.queue"), exchange = @Exchange(value = "delay.process.exchange", type = "direct"), key = "delay.process" )) public void handleDelayMessage(DemoMessage message) { log.info("[handleDelayMessage] 收到延遲消息: userId={}, message={}, createTime={}", message.getUserId(), message.getMessage(), message.getCreateTime()); } /** * 批量處理消息 * * @param messages */ @RabbitListener(bindings = @QueueBinding( value = @Queue("batch.queue"), exchange = @Exchange(value = "batch.exchange", type = "direct"), key = "batch" ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL") public void handleBatchMessage(List<Message> messages, Channel channel) { log.info("[handleBatchMessage] 開始處理批量消息,共 {} 條", messages.size()); if (CollUtil.isEmpty(messages)) { log.info("[handleBatchMessage] 消息列表為空,無需處理"); return; } // 分別存儲(chǔ)成功和失敗的消息 List<Message> successMessages = new ArrayList<>(); List<Message> failedMessages = new ArrayList<>(); // 批量轉(zhuǎn)換消息 for (Message message : messages) { try { DemoMessage demoMessage = (DemoMessage) messageConverterObjectProvider.getObject().fromMessage(message); demoMessage.setDeliveryTag(message.getMessageProperties().getDeliveryTag()); successMessages.add(message); log.debug("[handleBatchMessage] 消息轉(zhuǎn)換成功: deliveryTag={}", message.getMessageProperties().getDeliveryTag()); } catch (Exception e) { log.error("[handleBatchMessage] 消息轉(zhuǎn)換失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e); failedMessages.add(message); } } // 處理成功轉(zhuǎn)換的消息 if (CollUtil.isNotEmpty(successMessages)) { try { log.info("[handleBatchMessage] 開始處理 {} 條成功轉(zhuǎn)換的消息", successMessages.size()); // 模擬處理時(shí)間 - 實(shí)際應(yīng)用中這里應(yīng)該是真正的業(yè)務(wù)邏輯 processMessages(successMessages); // 批量確認(rèn)所有成功處理的消息 for (Message message : successMessages) { try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); log.debug("[handleBatchMessage] 消息確認(rèn)成功: deliveryTag={}", deliveryTag); } catch (IOException e) { log.error("[handleBatchMessage] 消息確認(rèn)失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e); } } log.info("[handleBatchMessage] 成功處理并確認(rèn) {} 條消息", successMessages.size()); } catch (Exception e) { log.error("[handleBatchMessage] 處理消息時(shí)發(fā)生異常", e); // 如果處理過程中出現(xiàn)異常,將所有成功轉(zhuǎn)換的消息標(biāo)記為失敗 failedMessages.addAll(successMessages); } } // 處理轉(zhuǎn)換失敗的消息 if (!failedMessages.isEmpty()) { log.warn("[handleBatchMessage] 共 {} 條消息處理失敗,嘗試重新入隊(duì)", failedMessages.size()); for (Message message : failedMessages) { try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 第三個(gè)參數(shù)設(shè)為true,表示將消息重新放回隊(duì)列 channel.basicNack(deliveryTag, false, true); log.debug("[handleBatchMessage] 消息重新入隊(duì): deliveryTag={}", deliveryTag); } catch (IOException e) { log.error("[handleBatchMessage] 消息重新入隊(duì)失敗: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e); } } } log.info("[handleBatchMessage] 批量消息處理完成: 成功={}條, 失敗={}條", successMessages.size(), failedMessages.size()); } /** * 實(shí)際處理消息的方法 * 在實(shí)際應(yīng)用中,這里應(yīng)該包含真正的業(yè)務(wù)邏輯 * * @param messages 待處理的消息列表 * @throws Exception 處理異常 */ private void processMessages(List<Message> messages) throws Exception { // 模擬處理時(shí)間 Thread.sleep(50L); // 實(shí)際應(yīng)用中,這里應(yīng)該是真正的業(yè)務(wù)邏輯處理 log.info("[processMessages] 處理了 {} 條消息", messages.size()); }
3.5 關(guān)鍵技術(shù)點(diǎn)
3.5.1 消息確認(rèn)機(jī)制
在批量處理消息時(shí),使用手動(dòng)確認(rèn)模式(MANUAL)來確保消息處理的可靠性:
@RabbitListener(..., ackMode = "MANUAL") public void handleBatchMessage(List<Message> messages, Channel channel) { // 處理消息 // 手動(dòng)確認(rèn)消息 channel.basicAck(deliveryTag, false); }
3.5.2 批量消息處理
@Bean("batchListenerContainerFactory") public SimpleRabbitListenerContainerFactory batchListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setBatchListener(true); // 啟用批量監(jiān)聽 factory.setConsumerBatchEnabled(true); // 啟用消費(fèi)者端批量處理 factory.setBatchSize(10); // 設(shè)置批次大小 return factory; }
/** * 批量處理消息 * * @param messages */ @RabbitListener(bindings = @QueueBinding( value = @Queue("batch.queue"), exchange = @Exchange(value = "batch.exchange", type = "direct"), key = "batch" ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL") public void handleBatchMessage(List<Message> messages, Channel channel) { }
3.5.3 延遲消息
public void delaySend(DemoMessage message, long delay) { rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> { if (delay > 0) { msg.getMessageProperties().setExpiration(String.valueOf(delay)); } return msg; }); } /** * 監(jiān)聽延遲消息處理隊(duì)列 * 當(dāng)延遲隊(duì)列中的消息過期后,會(huì)被自動(dòng)轉(zhuǎn)發(fā)到此隊(duì)列進(jìn)行處理 * * @param message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("delay.process.queue"), exchange = @Exchange(value = "delay.process.exchange", type = "direct"), key = "delay.process" )) public void handleDelayMessage(DemoMessage message) { ... }
4.最佳實(shí)踐
- 使用 JSON 序列化:通過 Jackson2JsonMessageConverter 實(shí)現(xiàn)消息的 JSON 序列化,提高可讀性和兼容性。
- 手動(dòng)確認(rèn)消息:在關(guān)鍵業(yè)務(wù)場(chǎng)景中使用手動(dòng)確認(rèn)模式,確保消息被正確處理。
- 異常處理:合理處理消息轉(zhuǎn)換和業(yè)務(wù)處理過程中的異常,避免消息丟失。
- 批量處理:對(duì)于高吞吐量場(chǎng)景,使用批量處理提高處理效率。
- 配置優(yōu)化:根據(jù)業(yè)務(wù)需求合理配置消費(fèi)者數(shù)量、預(yù)取數(shù)量等參數(shù)。
5.總結(jié)
本文檔介紹了 Spring Boot 與 RabbitMQ 的整合方案,包括基礎(chǔ)配置、消息發(fā)送、消息消費(fèi)、批量處理、延遲消息等核心功能。通過合理使用這些功能,可以構(gòu)建高可用、高性能的異步消息處理系統(tǒng)。在實(shí)際應(yīng)用中,需要根據(jù)具體業(yè)務(wù)場(chǎng)景進(jìn)行相應(yīng)的調(diào)整和優(yōu)化。
源碼地址:https://gitee.com/zheji/spring-rabbitmq-demo
到此這篇關(guān)于springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)springboot3.0整合rabbitmq內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis+lombok出現(xiàn)java.lang.IndexOutOfBoundsException錯(cuò)誤及解決
在使用MyBatis和Lombok時(shí),如果遇到j(luò)ava.lang.IndexOutOfBoundsException問題,是因?yàn)镸yBatis在嘗試將查詢結(jié)果封裝成Java對(duì)象時(shí),找不到構(gòu)造函數(shù)中對(duì)應(yīng)的字段,這通常是由于Lombok的@Builder注解生成了全參構(gòu)造函數(shù)2025-02-02@Value如何獲取yml和properties配置參數(shù)
這篇文章主要介紹了@Value如何獲取yml和properties配置參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07springboot項(xiàng)目idea熱部署的教程詳解
這篇文章主要介紹了springboot項(xiàng)目idea熱部署,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08深入理解Spring Boot Starter概念、特點(diǎn)、場(chǎng)景、原理及自定義starter的方法
本文將深入探討Spring Boot Starter的基本概念、主要特點(diǎn)、應(yīng)用場(chǎng)景以及實(shí)現(xiàn)原理,幫助讀者更好地理解和應(yīng)用這一強(qiáng)大工具,感興趣的朋友跟隨小編一起看看吧2025-08-08springboot微服務(wù)項(xiàng)目集成html頁面的實(shí)現(xiàn)
本文主要介紹了springboot微服務(wù)項(xiàng)目集成html頁面的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04mybatis使用雙層<foreach>循環(huán)嵌套方式
在Mybatis中使用雙層循環(huán)嵌套插入數(shù)據(jù)可以有效減少數(shù)據(jù)庫IO操作,提高性能,通過在mapper.xml中定義雙層集合來實(shí)現(xiàn),外層集合為實(shí)體類中的集合字段,內(nèi)層集合為集合字段中的另一個(gè)集合,通過這種方式,可以在業(yè)務(wù)代碼中減少循環(huán)插入的次數(shù)2024-09-09Java?中向?Arraylist?添加對(duì)象的示例代碼
本文介紹了如何在 Java 中向 ArrayList 添加對(duì)象,并提供了示例和注意事項(xiàng),通過掌握這些知識(shí),讀者可以在自己的 Java 項(xiàng)目中有效地使用 ArrayList 來存儲(chǔ)和操作對(duì)象,需要的朋友可以參考下2023-11-11