Spring Boot 集成 RocketMQ 全流程指南(從依賴引入到消息收發(fā))
前言
在分布式系統(tǒng)中,消息中間件是解耦服務(wù)、實(shí)現(xiàn)異步通信的核心組件。RocketMQ 作為阿里巴巴開源的高性能分布式消息中間件,憑借其高吞吐、低延遲、高可靠等特性,成為企業(yè)級(jí)應(yīng)用的首選。而 Spring Boot 通過其“約定優(yōu)于配置”的設(shè)計(jì)理念,極大簡化了項(xiàng)目開發(fā)的復(fù)雜度。本文將通過 手動(dòng)連接 和 配置連接 兩種方式,詳細(xì)講解如何在 Spring Boot 中集成 RocketMQ,實(shí)現(xiàn)消息的同步與異步發(fā)送,并提供完整示例代碼。
一、環(huán)境準(zhǔn)備
在開始前,請(qǐng)確保:
- JDK 17、Maven 3.6+、Spring Boot 2.7+。
- 安裝RocketMQ服務(wù)(本地或遠(yuǎn)程),推薦使用RocketMQ Docker鏡像快速搭建(可參考之前文章)。
二、示例—Springboot集成mq(手動(dòng)連接)
通過編碼方式初始化生產(chǎn)者,適用于需要?jiǎng)討B(tài)控制資源的場景。
2.1 新建項(xiàng)目
‍
2.2 引入依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency>
2.3 生產(chǎn)者發(fā)送消息
- 構(gòu)建一個(gè)消息生產(chǎn)者DefaultMQProducer實(shí)例,然后指定生產(chǎn)者組為jihaiProducer;
- 指定NameServer的地址:服務(wù)器的ip:9876,因?yàn)樾枰獜腘ameServer拉取Broker的信息
- producer.start() 啟動(dòng)生產(chǎn)者
- 構(gòu)建一個(gè)內(nèi)容為:技海拾貝的消息1,然后指定這個(gè)消息往jihaishibei這個(gè)topic發(fā)送
- producer.send(msg):發(fā)送消息,打印結(jié)果
- 關(guān)閉生產(chǎn)者
public class Producer { public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)生產(chǎn)者,指定生產(chǎn)者組為jihaiProducer DefaultMQProducer producer = new DefaultMQProducer("jihaiProducer"); // 指定NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 第一次發(fā)送可能會(huì)超時(shí),設(shè)置的比較大 producer.setSendMsgTimeout(60000); // 啟動(dòng)生產(chǎn)者 producer.start(); // 創(chuàng)建一條消息 // topic為 jihaishibei // 消息內(nèi)容為 技海拾貝的消息1 // tags 為 TagA Message msg = new Message("jihaishibei", "TagA", "技海拾貝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發(fā)送消息并得到消息的發(fā)送結(jié)果,然后打印 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); // 關(guān)閉生產(chǎn)者 producer.shutdown(); } }
啟動(dòng),發(fā)送消息
在控制臺(tái)可以看到這條消息
這里就能看到發(fā)送消息的詳細(xì)信息。
左下角消息的消費(fèi)的消費(fèi),因?yàn)槲覀冞€沒有消費(fèi)者訂閱這個(gè)topic,所以左下角沒數(shù)據(jù)。
2.4 消費(fèi)者消費(fèi)消息
- 創(chuàng)建一個(gè)消費(fèi)者實(shí)例對(duì)象,指定消費(fèi)者組為jihaiConsumer
- 指定NameServer的地址:服務(wù)器的ip:9876
- 訂閱 jihaishibei這個(gè)topic的所有信息
- consumer.registerMessageListener ,這個(gè)很重要,是注冊(cè)一個(gè)監(jiān)聽器,這個(gè)監(jiān)聽器是當(dāng)有消息的時(shí)候就會(huì)回調(diào)這個(gè)監(jiān)聽器,處理消息,所以需要用戶實(shí)現(xiàn)這個(gè)接口,然后處理消息。
- 啟動(dòng)消費(fèi)者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 通過push模式消費(fèi)消息,指定消費(fèi)者組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer"); // 指定NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱這個(gè)topic下的所有的消息 consumer.subscribe("jihaishibei", "*"); // 注冊(cè)一個(gè)消費(fèi)的監(jiān)聽器,當(dāng)有消息的時(shí)候,會(huì)回調(diào)這個(gè)監(jiān)聽器來消費(fèi)消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("消費(fèi)消息:%s", new String(msg.getBody()) + "\n"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者 consumer.start(); System.out.printf("Consumer Started.%n"); } }
啟動(dòng)服務(wù),進(jìn)行消費(fèi)
在控制臺(tái),發(fā)現(xiàn)被jihaiConsumer這個(gè)消費(fèi)者組給消費(fèi)了。
三、示例2—Springboot集成mq(配置連接)
在 Spring Boot 中,可以通過配置文件簡化 RocketMQ 的連接配置。以下是在 application.yml
? 文件中進(jìn)行的配置:
3.1 配置文件修改
spring: application: name: rocket-mq-demo rocketmq: name-server: 127.0.0.1:9876 producer: group: rocket-mq-demo-producer send-message-timeout: 10000 comsumer: group: rocket-mq-demo-comsumer send-message-timeout: 10000
3.2 添加依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency>
根據(jù)需要選擇最新版本,從中央倉庫可以查看
?https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter
?
備注:如果添加rocketmq-client依賴,先注釋這個(gè)依賴
3.3 消費(fèi)者service類
import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * messageModel=MessageModel.CLUSTERING * 監(jiān)聽模式,有消息就會(huì)消費(fèi) */ @Service @RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING) public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.printf("收到消息: %s\n", s); } }
3.4 生產(chǎn)者service類
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; @Service public class RocketMQProducer { @Autowired private RocketMQTemplate rocketMQTemplate; private final String topic = "jihaishibei-topic"; // 1.同步發(fā)送消息 // 同步發(fā)送是指發(fā)送方發(fā)送一條消息后,會(huì)等待服務(wù)器返回確認(rèn)信息后再進(jìn)行后續(xù)操作。這種方式適用于需要可靠性保證的場景。 public void createAndSend(String message){ rocketMQTemplate.convertAndSend(topic, message); System.out.printf("同步發(fā)送結(jié)果: %s\n", message); } // 1.同步發(fā)送消息 // 同步發(fā)送是指發(fā)送方發(fā)送一條消息后,會(huì)等待服務(wù)器返回確認(rèn)信息后再進(jìn)行后續(xù)操作。這種方式適用于需要可靠性保證的場景。 public void sendSyncMessage(String message){ SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build()); System.out.println(sendResult.getMsgId()); System.out.printf("同步發(fā)送結(jié)果: %s\n", message); } // 2.異步發(fā)送消息 // 異步發(fā)送是指發(fā)送方發(fā)送消息后,不等待服務(wù)器返回確認(rèn)信息,而是通過回調(diào)接口處理返回結(jié)果。這種方式適用于對(duì)響應(yīng)時(shí)間要求較高的場景。 public void sendAsyncMessage(String message){ rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("異步發(fā)送成功: %s\n", sendResult); } @Override public void onException(Throwable throwable) { System.out.printf("異步發(fā)送失敗: %s\n", throwable.getMessage()); } }); } // 3.單向發(fā)送消息 // 單向發(fā)送是指發(fā)送方只負(fù)責(zé)發(fā)送消息,不關(guān)心服務(wù)器的響應(yīng)。該方式適用于對(duì)可靠性要求不高的場景,如日志收集。 public void sendOneWayMessage(String message){ rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build()); System.out.println("單向消息發(fā)送成功"); } }
3.5 測試controller類
@RequestMapping("api") @RestController public class RocketController { @Autowired private RocketMQProducer rocketMQProducer; @GetMapping("/createAndSend") public String createAndSend(@RequestParam String message) { rocketMQProducer.createAndSend(message); return "同步消息發(fā)送成功"; } @GetMapping("/sendSync") public String sendSync(@RequestParam String message) { rocketMQProducer.sendSyncMessage(message); return "同步消息發(fā)送成功"; } @GetMapping("/sendAsync") public String sendAsync(@RequestParam String message) { rocketMQProducer.sendAsyncMessage(message); return "異步消息發(fā)送中"; } @GetMapping("/sendOneWay") public String sendOneWay(@RequestParam String message) { rocketMQProducer.sendOneWayMessage(message); return "單向消息發(fā)送成功"; } }
3.6 啟動(dòng)服務(wù)
3.7 測試 同步消息1
同步消息2
異步消息
單向發(fā)送消息
四、結(jié)束語
本文通過手動(dòng)連接與配置連接兩種方式,展示了Spring Boot與RocketMQ的集成實(shí)踐。手動(dòng)連接幫助開發(fā)者理解底層API邏輯,而Spring Boot的配置化集成則極大簡化了開發(fā)流程。無論是同步消息的可靠性保障,還是異步消息的性能優(yōu)化,RocketMQ均能與Spring Boot無縫協(xié)作,為分布式系統(tǒng)提供高效的消息通信能力。
未來可進(jìn)一步探索集群部署、消息重試機(jī)制及監(jiān)控告警,以實(shí)現(xiàn)更健壯的消息服務(wù)。希望本文能為開發(fā)者快速構(gòu)建高可用的消息系統(tǒng)提供參考!?
到此這篇關(guān)于Spring Boot 集成 RocketMQ 全流程指南(從依賴引入到消息收發(fā))的文章就介紹到這了,更多相關(guān)Spring Boot 集成 RocketMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot動(dòng)態(tài)加載jar包動(dòng)態(tài)配置實(shí)例詳解
這篇文章主要給大家介紹了關(guān)于springboot動(dòng)態(tài)加載jar包動(dòng)態(tài)配置的相關(guān)資料,在項(xiàng)目開發(fā)的過程中,有時(shí)候需要?jiǎng)討B(tài)靈活的加載某個(gè)jar包并執(zhí)行其里面的方法的時(shí)候,需要的朋友可以參考下2023-11-11Java實(shí)現(xiàn)提取Word文檔表格數(shù)據(jù)
使用Java實(shí)現(xiàn)Word文檔表格數(shù)據(jù)的提取,可以確保數(shù)據(jù)處理的一致性和準(zhǔn)確性,同時(shí)大大減少所需的時(shí)間和成本,下面我們來看看具體實(shí)現(xiàn)方法吧2025-01-01springboot+mybatis-plus+oracle實(shí)現(xiàn)邏輯刪除
最近在做一個(gè)前后端分離的小項(xiàng)目,需要?jiǎng)h除用戶表的用戶,本文主要實(shí)現(xiàn)了springboot+mybatis-plus+oracle邏輯刪除,具有一定的參考價(jià)值,感興趣的可以了解一下2021-08-08MyBatis-Plus非表字段的三種處理方法小結(jié)
這篇文章主要介紹了MyBatis-Plus非表字段的三種處理方法小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Spring MVC結(jié)合Spring Data JPA實(shí)現(xiàn)按條件查詢和分頁
這篇文章主要為大家詳細(xì)介紹了Spring MVC結(jié)合Spring Data JPA實(shí)現(xiàn)按條件查詢,以及分頁效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10Springboot通過url訪問本地圖片代碼實(shí)例
這篇文章主要介紹了springboot通過url訪問本地圖片代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03解決SpringBoot在后臺(tái)接收前臺(tái)傳遞對(duì)象方式的問題
這篇文章主要介紹了解決SpringBoot在后臺(tái)接收前臺(tái)傳遞對(duì)象方式的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-01-01