一文掌握Springboot集成RabbitMQ的方法
1、前言
消息隊列(Message Queue,簡稱 MQ)是一種異步的消息傳遞中間件,它解耦了應(yīng)用程序之間的通信。應(yīng)用程序可以將消息發(fā)送到隊列,而無需知道誰會接收這些消息。接收應(yīng)用程序可以從隊列中檢索消息,而無需知道誰發(fā)送了這些消息。消息隊列是一種重要的中間件,它可以幫助應(yīng)用程序之間進行異步、可靠、可擴展的通信。常見的消息隊列中間件有ActiveMQ,RabbitMQ,Kafka......今天我們就來介紹RabbitMQ。
2、什么是RabbitMQ
RabbitMQ 是一個開源的消息隊列服務(wù)器,它實現(xiàn)了 AMQP (高級消息隊列協(xié)議) 標(biāo)準(zhǔn)。AMQP 是一種應(yīng)用層協(xié)議,為面向消息的中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。
AMQP :Advanced Message Queue,高級消息隊列協(xié)議。它是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。
RabbitMQ 的主要特點包括:
- 高性能:RabbitMQ 能夠處理大量的消息,并提供低延遲的性能。
- 可靠性:RabbitMQ 提供持久化消息存儲,確保消息不會丟失。
- 可擴展性:RabbitMQ 可以輕松擴展以滿足不斷增長的需求。
- 靈活性:RabbitMQ 支持多種編程語言和客戶端,并提供豐富的功能和配置選項。
RabbitMQ 的常見應(yīng)用場景包括:
- 分布式系統(tǒng):RabbitMQ 可以用于在分布式系統(tǒng)中進行異步通信。
- 異步處理:RabbitMQ 可以用于異步處理任務(wù),提高系統(tǒng)的性能和效率。
- 消息隊列:RabbitMQ 可以用于實現(xiàn)消息隊列,例如任務(wù)隊列、發(fā)布/訂閱隊列等。
- 消息通知:RabbitMQ 可以用于發(fā)送消息通知,例如電子郵件或短信。
3、安裝RabbitMQ
由于RabbitMQ是一個由 Erlang 語言開發(fā)的 AMQP 的開源實現(xiàn)。所以在安裝RabbitMQ前需要先安裝Erlang環(huán)境。
Erlang下載地址:Downloads - Erlang/OTP
RabbitMQ下載地址:Installing RabbitMQ | RabbitMQ
先安裝Erlang,在安裝RabbitMQ。安裝工程相對簡單,無腦下一步即可。
安裝完RabbitMQ后,打開cmd窗口,進入RabbitMQ的安裝目錄的sbin下,我的目錄是:
D:\RabbitMQ Server\rabbitmq_server-3.13.0\sbin
然后輸入以下命令安裝一下插件:
rabbitmq-plugins enable rabbitmq_management
提示以下這個就是安裝成功。
驗證RabbitMQ是否安裝成功,輸入以下命令:
rabbitmqctl status
這時候,直接訪問http://127.0.0.1:15672就可以看到RabbitMQ的管理頁面了,RabbitMQ默認端口為15672,默認的管理頁面賬號密碼均為guest。
登錄后,就可以看到一個初始的管理界面:
4、Springboot集成RabbitMQ
4.1、添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <description>springboot-rabbitmq</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.24</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
4.2、添加配置
# rabbitmq連接配置信息 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 確保消息在未被隊列接收時返回 spring.rabbitmq.publisher-returns=true # 發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法 spring.rabbitmq.publisher-confirm-type=correlated
4.3、添加controller,作為生產(chǎn)者
新建controller,用于發(fā)送消息。
package com.example.springbootrabbitmq.controller; import com.example.springbootrabbitmq.config.MqProducerCallBack; import jakarta.annotation.Resource; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("push/message") public class PushMessageController { @Resource private RabbitTemplate rabbitTemplate; @Resource private MqProducerCallBack mqProducerCallBack; @GetMapping("test") public String sendMessage() { // correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當(dāng)前消息的唯一性。 CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis()); // 消息確認和返回回調(diào) rabbitTemplate.setConfirmCallback(mqProducerCallBack); rabbitTemplate.setReturnsCallback(mqProducerCallBack); // 消息發(fā)送 rabbitTemplate.convertAndSend("my-queue", "hello world", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, correlationData); return "publisher success..."; } }
4.4、設(shè)置生產(chǎn)者消息確認CallBack
package com.example.springbootrabbitmq.config; import cn.hutool.json.JSONUtil; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Component public class MqProducerCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當(dāng)前消息的唯一性。 * ack:消息投遞到broker 的狀態(tài),true成功,false失敗。 * cause:投遞失敗的原因。 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.err.println("消息ID=" + correlationData.getId() + "投遞失敗,失敗原因:" + cause); } else { System.out.println("消息投遞收到確認,correlationData=" + correlationData.getId()); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("返回消息結(jié)果:" + JSONUtil.toJsonStr(returnedMessage)); } }
4.5、添加Consumer,作為消費者
package com.example.springbootrabbitmq.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class PushMessageConsumer { /** * basicAck:表示成功確認,使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。 * void basicAck(long deliveryTag, boolean multiple) * deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nack、reject等操作。 * multiple:是否批量確認,值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。 * */ @RabbitListener(queuesToDeclare = @Queue(value = "my-queue")) @RabbitHandler public void consume(String msg, Channel channel, Message message) throws IOException { try { System.out.println("消費者收到消息:" + msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("deliveryTag:" + message.getMessageProperties().getDeliveryTag()); System.out.println("redelivered:" + message.getMessageProperties().getRedelivered()); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.err.println("消息已重復(fù)處理失敗,拒絕再次接收!"); /** * 拒絕消息,requeue=false 表示不再重新入隊,如果配置了死信隊列則進入死信隊列 * basicReject:拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似。 * deliveryTag:表示消息投遞序號。 * requeue:值為 true 消息將重新入隊列。 */ channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息即將再次返回隊列處理!"); /** * requeue為是否重新回到隊列,true重新入隊 * deliveryTag:表示消息投遞序號。 * multiple:是否批量確認。 * requeue:值為 true 消息將重新入隊列。 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
4.6、啟動程序,訪問
瀏覽器訪問:http://localhost:8080/push/message/test 模擬消息進行推送。
查看控制臺,發(fā)現(xiàn)消費者正常打印出了消費信息。
打開RabbitMQ管理控制臺,可以發(fā)現(xiàn)我們的消息隊列my-queue信息。
既可以查看消息隊列的裝填,消息投遞情況等。
到此這篇關(guān)于一文掌握Springboot集成RabbitMQ的文章就介紹到這了,更多相關(guān)Springboot集成RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java面向?qū)ο蟪绦蛟O(shè)計:繼承,多態(tài)用法實例分析
這篇文章主要介紹了Java面向?qū)ο蟪绦蛟O(shè)計:繼承,多態(tài)用法,結(jié)合實例形式分析了java繼承與多態(tài)的相關(guān)概念、原理、實現(xiàn)方法與操作注意事項,需要的朋友可以參考下2020-04-04mybatis和mybatis-plus設(shè)置值為null不起作用問題及解決
Mybatis-Plus的FieldStrategy主要用于控制新增、更新和查詢時對空值的處理策略,通過配置不同的策略類型,可以靈活地處理實體對象的空值問題2025-02-02Java使用split分割無效獲取不到預(yù)期效果的解決辦法
這篇文章主要給大家介紹了關(guān)于Java使用split分割無效獲取不到預(yù)期效果的解決辦法,java的String類中有個split方法,這個是我們經(jīng)常使用到的,需要的朋友可以參考下2023-08-08SpringCloud使用Nacos保存和讀取變量的配置方法
在使用SpringCloud開發(fā)微服務(wù)時,經(jīng)常會遇到一些比較小的后臺參數(shù)配置,這些配置不足以單獨開一張表去存儲,而且其他服務(wù)會讀取該參數(shù),這篇文章主要介紹了SpringCloud使用Nacos保存和讀取變量,需要的朋友可以參考下2022-07-07springboot中RestTemplate發(fā)送HTTP請求的實現(xiàn)示例
RestTemplate是一個 spring-web 提供的執(zhí)行HTTP請求的同步阻塞式工具類,本文就來介紹一下RestTemplate發(fā)送HTTP請求,具有一定的參考價值,感興趣的可以了解一下2024-03-03