SpringBoot集成和使用RabbitMQ方式
1. 引言
RabbitMQ 是一個流行的消息代理系統(tǒng),廣泛應(yīng)用于分布式系統(tǒng)中的異步通信、任務(wù)解耦和負(fù)載分配。除了這些基本功能,RabbitMQ 還支持通過死信隊列(Dead-Letter Queue, DLQ)實(shí)現(xiàn)延時消息的發(fā)送。延時消息在某些場景下非常有用,例如訂單超時未支付的自動取消、延時通知等。
本文將結(jié)合 RabbitMQ 的基本使用,深入探討如何在 Spring Boot 中集成和使用 RabbitMQ,同時講解如何通過死信隊列實(shí)現(xiàn)延時消息的機(jī)制。
2. 環(huán)境配置
在開始編寫代碼之前,我們需要確保開發(fā)環(huán)境已經(jīng)正確配置。
2.1. Maven 依賴
首先,在 Spring Boot 項目中添加 RabbitMQ 的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.2. RabbitMQ 安裝與配置
RabbitMQ 可以通過 Docker 或直接在本地安裝。這里我們以 Docker 為例:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
這將啟動一個帶有管理插件的 RabbitMQ 容器,并暴露出 5672 和 15672 端口,分別用于 AMQP 和管理界面。
3. 基本概念與原理
在深入代碼之前,了解 RabbitMQ 的幾個核心概念非常重要:
- 生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用程序。
- 消費(fèi)者(Consumer):接收消息的應(yīng)用程序。
- 隊列(Queue):消息存儲的地方。
- 交換機(jī)(Exchange):接收生產(chǎn)者發(fā)送的消息,并根據(jù)路由規(guī)則將消息轉(zhuǎn)發(fā)到相應(yīng)的隊列。
- 綁定(Binding):隊列與交換機(jī)之間的關(guān)聯(lián),定義了消息如何從交換機(jī)路由到隊列。
- 死信隊列(Dead-Letter Queue, DLQ):用于存儲處理失敗、被拒絕或超時的消息。
3.1. 交換機(jī)類型
- Direct Exchange:將消息路由到綁定了特定路由鍵的隊列。
- Fanout Exchange:將消息廣播到綁定的所有隊列。
- Topic Exchange:根據(jù)路由鍵的模式匹配,將消息路由到一個或多個隊列。
- Headers Exchange:基于消息頭的內(nèi)容進(jìn)行路由。
4. Spring Boot 中的基本使用
4.1. 配置類
創(chuàng)建一個配置類,用于設(shè)置隊列、交換機(jī)和綁定關(guān)系:
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String QUEUE_NAME = "demoQueue"; public static final String EXCHANGE_NAME = "demoExchange"; public static final String ROUTING_KEY = "demoRoutingKey"; @Bean public Queue demoQueue() { return new Queue(QUEUE_NAME, true); } @Bean public DirectExchange demoExchange() { return new DirectExchange(EXCHANGE_NAME); } @Bean public Binding demoBinding(Queue demoQueue, DirectExchange demoExchange) { return BindingBuilder.bind(demoQueue).to(demoExchange).with(ROUTING_KEY); } }
4.2. 生產(chǎn)者
創(chuàng)建一個消息生產(chǎn)者,用于發(fā)送消息到指定的交換機(jī)和路由鍵:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; @Service public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(String message) { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message); System.out.println("Sent message: " + message); } }
4.3. 消費(fèi)者
創(chuàng)建一個消息消費(fèi)者,監(jiān)聽隊列并處理消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQConsumer { @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
5. 死信隊列與延時消息
5.1. 死信隊列配置
為了實(shí)現(xiàn)延時消息,我們可以利用 RabbitMQ 的死信隊列機(jī)制。
當(dāng)消息在原隊列中存留超過指定時間時,會自動轉(zhuǎn)發(fā)到死信隊列,我們可以通過消費(fèi)死信隊列的消息來實(shí)現(xiàn)延時效果。
import org.springframework.amqp.core.Queue; @Bean public Queue demoQueue() { return QueueBuilder.durable(QUEUE_NAME) .withArgument("x-dead-letter-exchange", "deadLetterExchange") .withArgument("x-dead-letter-routing-key", "deadLetterRoutingKey") .withArgument("x-message-ttl", 60000) // 設(shè)置消息在原隊列的存活時間(60秒) .build(); } @Bean public Queue deadLetterQueue() { return new Queue("deadLetterQueue", true); } @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("deadLetterExchange"); } @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("deadLetterRoutingKey"); }
在上述配置中,x-message-ttl
參數(shù)指定了消息在原隊列中的存活時間,當(dāng)超時后,消息將被轉(zhuǎn)發(fā)到指定的死信隊列。
5.2. 延時消息的處理
消費(fèi)者監(jiān)聽死信隊列,實(shí)現(xiàn)延時消息的處理邏輯:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class DelayedMessageConsumer { @RabbitListener(queues = "deadLetterQueue") public void receiveDelayedMessage(String message) { System.out.println("Received delayed message: " + message); // 處理延時消息的邏輯 } }
6. 消息確認(rèn)機(jī)制
為了保證消息的可靠性,RabbitMQ 提供了生產(chǎn)者和消費(fèi)者的消息確認(rèn)機(jī)制。
生產(chǎn)者確認(rèn)用于確保消息成功發(fā)送到交換機(jī)或隊列,消費(fèi)者確認(rèn)用于確保消息被成功處理。
6.1. 生產(chǎn)者確認(rèn)
在生產(chǎn)者端,我們可以配置 RabbitTemplate
來監(jiān)聽消息是否成功發(fā)送:
import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @Service public class RabbitMQProducerWithConfirm { private final RabbitTemplate rabbitTemplate; public RabbitMQProducerWithConfirm(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("Message sent successfully"); } else { System.out.println("Message failed to send: " + cause); } } }); } public void sendMessage(String message) { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message); } }
6.2. 消費(fèi)者確認(rèn)
在消費(fèi)者端,默認(rèn)情況下 Spring AMQP 自動確認(rèn)消息。
如果需要手動確認(rèn),可以在 @RabbitListener
注解中設(shè)置 ackMode
:
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; @Service public class RabbitMQConsumerWithAck implements ChannelAwareMessageListener { @Override @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL") public void onMessage(org.springframework.amqp.core.Message message, Channel channel) throws Exception { try { String body = new String(message.getBody()); System.out.println("Received message: " + body); // 處理消息... channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
7. 集群與高可用性
7.1 RabbitMQ 集群模式概述
RabbitMQ 支持集群模式,可以提升消息代理的可靠性和可用性。在集群模式下,多個 RabbitMQ 節(jié)點(diǎn)共同組成一個集群,每個節(jié)點(diǎn)都能夠接收和發(fā)送消息,從而分擔(dān)系統(tǒng)負(fù)載。通過 Docker Compose 或 Kubernetes,可以快速部署一個高可用的 RabbitMQ 集群。
集群中的節(jié)點(diǎn)分為兩種角色:RAM 節(jié)點(diǎn)和 Disk 節(jié)點(diǎn)。RAM 節(jié)點(diǎn)將數(shù)據(jù)存儲在內(nèi)存中,適合對性能要求較高但對數(shù)據(jù)持久化要求較低的場景;Disk 節(jié)點(diǎn)則會將數(shù)據(jù)持久化到磁盤,保證數(shù)據(jù)在節(jié)點(diǎn)重啟或宕機(jī)后的恢復(fù)能力。根據(jù)不同的應(yīng)用需求,可以混合使用這兩種節(jié)點(diǎn)類型來優(yōu)化性能和持久化策略。
7.2 Docker Compose 部署集群
使用 Docker 可以非常方便地部署一個 RabbitMQ 集群。
以下示例展示了如何使用 Docker Compose 創(chuàng)建一個包含三個節(jié)點(diǎn)的 RabbitMQ 集群:
version: '3' services: rabbitmq-node1: image: rabbitmq:management container_name: rabbitmq-node1 ports: - "5673:5672" - "15673:15672" environment: RABBITMQ_ERLANG_COOKIE: "mycookie" RABBITMQ_NODENAME: "rabbit@rabbitmq-node1" rabbitmq-node2: image: rabbitmq:management container_name: rabbitmq-node2 ports: - "5674:5672" - "15674:15672" environment: RABBITMQ_ERLANG_COOKIE: "mycookie" RABBITMQ_NODENAME: "rabbit@rabbitmq-node2" depends_on: - rabbitmq-node1 rabbitmq-node3: image: rabbitmq:management container_name: rabbitmq-node3 ports: - "5675:5672" - "15675:15672" environment: RABBITMQ_ERLANG_COOKIE: "mycookie" RABBITMQ_NODENAME: "rabbit@rabbitmq-node3" depends_on: - rabbitmq-node1 - rabbitmq-node2
使用上述配置,可以通過以下命令啟動集群:
docker-compose up -d
集群啟動后,可以使用以下命令將節(jié)點(diǎn) 2 和節(jié)點(diǎn) 3 加入到節(jié)點(diǎn) 1 的集群中:
docker exec -it rabbitmq-node2 bash rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq-node1 rabbitmqctl start_app exit docker exec -it rabbitmq-node3 bash rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq-node1 rabbitmqctl start_app exit
至此,一個基本的 RabbitMQ 集群已經(jīng)部署完成。
7.3 Kubernetes 部署集群
在 Kubernetes 環(huán)境中,可以通過 Helm chart 快速部署 RabbitMQ 集群。Helm 是一個 Kubernetes 包管理工具,支持簡單、高效地管理 Kubernetes 應(yīng)用。
helm repo add bitnami https://charts.bitnami.com/bitnami helm install my-rabbitmq bitnami/rabbitmq
安裝完成后,RabbitMQ 集群將自動運(yùn)行在 Kubernetes 集群中,并提供高可用性??梢酝ㄟ^修改 Helm chart 配置文件調(diào)整集群的節(jié)點(diǎn)數(shù)量、資源分配等參數(shù),以適應(yīng)不同的業(yè)務(wù)需求。
8. 監(jiān)控與管理
8.1 RabbitMQ Management Plugin
RabbitMQ 提供了豐富的管理工具,通過內(nèi)置的 Management Plugin,可以方便地監(jiān)控和管理集群。
Management Plugin 啟用后,可以通過 Web 界面訪問 RabbitMQ 的管理控制臺。
啟用 Management Plugin:
rabbitmq-plugins enable rabbitmq_management
在集群節(jié)點(diǎn)上啟用后,可以通過 http://{hostname}:15672
訪問管理界面。默認(rèn)的用戶名和密碼均為 guest
,建議在生產(chǎn)環(huán)境中修改默認(rèn)密碼或禁用該賬戶。
8.2 監(jiān)控隊列與交換機(jī)
通過 RabbitMQ Management Plugin,可以實(shí)時查看隊列和交換機(jī)的狀態(tài),包括:
- 隊列的消息堆積數(shù)量、消費(fèi)者情況等。
- 交換機(jī)的消息路由情況、綁定信息等。
這些數(shù)據(jù)可以幫助運(yùn)維人員及時了解系統(tǒng)的運(yùn)行狀態(tài),發(fā)現(xiàn)并解決潛在的性能問題。
8.3 Prometheus 和 Grafana 集成
為了進(jìn)一步增強(qiáng)監(jiān)控能力,可以將 RabbitMQ 的監(jiān)控數(shù)據(jù)接入 Prometheus 和 Grafana。這些工具提供了更加靈活和可視化的監(jiān)控方案,適用于復(fù)雜的生產(chǎn)環(huán)境。
1. 啟用 Prometheus Exporter
RabbitMQ 提供了 Prometheus Exporter 插件,用于將 RabbitMQ 的監(jiān)控數(shù)據(jù)暴露給 Prometheus:
rabbitmq-plugins enable rabbitmq_prometheus
啟用后,Prometheus 可以通過 HTTP 訪問 RabbitMQ 的監(jiān)控數(shù)據(jù)。
2. 配置 Grafana 儀表盤
在 Prometheus 收集到 RabbitMQ 的監(jiān)控數(shù)據(jù)后,可以在 Grafana 中創(chuàng)建相應(yīng)的儀表盤,展示 RabbitMQ 的性能指標(biāo)。例如,隊列長度、消息處理速率、節(jié)點(diǎn)健康狀況等。Grafana 提供了直觀的可視化界面,幫助運(yùn)維人員實(shí)時監(jiān)控和分析系統(tǒng)的運(yùn)行狀態(tài)。
8.4 CLI 管理
除了 Web UI,RabbitMQ 還支持通過 CLI 進(jìn)行管理。常用的 CLI 命令包括:
rabbitmqctl status
:查看節(jié)點(diǎn)的狀態(tài)。rabbitmqctl list_queues
:列出所有隊列及其消息數(shù)量。rabbitmqctl list_connections
:查看所有連接及其狀態(tài)。
CLI 工具對于自動化運(yùn)維和批量操作非常有用,可以通過腳本實(shí)現(xiàn)對 RabbitMQ 集群的批量管理。
8.5 日志與告警管理
1. 日志配置
RabbitMQ 支持多種日志級別(debug、info、warning、error),可以根據(jù)需要調(diào)整日志輸出的詳細(xì)程度。
通過合理的日志配置,可以幫助運(yùn)維人員快速定位和解決問題。
rabbitmqctl set_log_level info
2. 告警配置
RabbitMQ 支持基于閾值的告警機(jī)制,可以在隊列長度、磁盤使用率或內(nèi)存使用率達(dá)到一定水平時觸發(fā)告警。
通過與郵件或短信系統(tǒng)集成,可以在異常情況發(fā)生時及時通知相關(guān)人員,確保問題能夠在第一時間得到處理。
9. 總結(jié)
本文詳細(xì)介紹了如何在 Spring Boot 項目中集成 RabbitMQ,并結(jié)合死信隊列實(shí)現(xiàn)延時消息。通過這些配置和機(jī)制,開發(fā)者可以在分布式系統(tǒng)中構(gòu)建更為靈活和可靠的消息傳遞系統(tǒng)。
擴(kuò)展閱讀:
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- Springboot使用Rabbitmq的延時隊列+死信隊列實(shí)現(xiàn)消息延期消費(fèi)
- Springboot使用RabbitMQ實(shí)現(xiàn)關(guān)閉超時訂單(示例詳解)
- SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解
- SpringBoot整合RabbitMQ實(shí)現(xiàn)流量消峰
- springboot整合RabbitMQ中死信隊列的實(shí)現(xiàn)
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- springboot整合rabbitmq實(shí)現(xiàn)訂單超時取消案例分析
相關(guān)文章
springBoot?@Scheduled實(shí)現(xiàn)多個任務(wù)同時開始執(zhí)行
這篇文章主要介紹了springBoot?@Scheduled實(shí)現(xiàn)多個任務(wù)同時開始執(zhí)行,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Spring Boot數(shù)據(jù)庫鏈接池配置方法
這篇文章主要介紹了Spring Boot數(shù)據(jù)庫鏈接池配置方法,需要的朋友可以參考下2017-04-04SpringBoot單點(diǎn)登錄實(shí)現(xiàn)過程詳細(xì)分析
這篇文章主要介紹了SpringBoot單點(diǎn)登錄實(shí)現(xiàn)過程,單點(diǎn)登錄英文全稱Single?Sign?On,簡稱就是SSO。它的解釋是:在多個應(yīng)用系統(tǒng)中,只需要登錄一次,就可以訪問其他相互信任的應(yīng)用系統(tǒng)2022-12-12Java如何跳出當(dāng)前的多重嵌套循環(huán)的問題
Java中的循環(huán)結(jié)構(gòu)包括for循環(huán)、while循環(huán)、do-while循環(huán)和增強(qiáng)型for循環(huán),每種循環(huán)都有其適用場景,在循環(huán)中,break、continue和return分別用于跳出循環(huán)、跳過當(dāng)前循環(huán)和結(jié)束當(dāng)前方法,對于多重嵌套循環(huán)2025-01-01Mybatis不啟動項目直接測試Mapper的實(shí)現(xiàn)方法
在項目開發(fā)中,測試單個Mybatis Mapper方法通常需要啟動整個SpringBoot項目,消耗大量時間,本文介紹通過Main方法和Mybatis配置類,快速測試Mapper功能,無需啟動整個項目,這方法使用AnnotationConfigApplicationContext容器2024-09-09