SpringBoot整合RocketMQ批量發(fā)送消息的實(shí)現(xiàn)代碼
一、簡介
今天我們講講如何批量發(fā)送消息,主要還是使用方法RocketMQTemplate的syncSend方法。
1.1、特點(diǎn)
批量發(fā)送和單條發(fā)送消息的主要區(qū)別有以下幾點(diǎn):
- 網(wǎng)絡(luò)開銷 發(fā)送單條消息時(shí),每個(gè)消息都需要單獨(dú)建立網(wǎng)絡(luò)連接、發(fā)送數(shù)據(jù)包、等待響應(yīng)等,網(wǎng)絡(luò)開銷較大。批量發(fā)送可以將多條消息打包在一起發(fā)送,減少網(wǎng)絡(luò)連接建立的次數(shù),降低網(wǎng)絡(luò)開銷
- 吞吐量 由于批量發(fā)送減少了網(wǎng)絡(luò)開銷,所以可以在單位時(shí)間內(nèi)發(fā)送更多的消息,提高了吞吐量。在高并發(fā)高流量場景下,批量發(fā)送能夠發(fā)揮更好的性能
- 消息順序 單條發(fā)送消息的順序是有序的,后發(fā)送的在隊(duì)列中排在前發(fā)送的后面。而對于批量發(fā)送,一個(gè)批次內(nèi)的消息順序是固定的,但不同批次之間的消息順序是無序的,會(huì)按照到達(dá)順序存儲(chǔ)在隊(duì)列中。如果需要嚴(yán)格消息順序,單條發(fā)送更合適
- 消息重試 如果批量發(fā)送的一個(gè)批次中有部分消息發(fā)送失敗,需重發(fā)整個(gè)批次,沒有選擇重發(fā)其中部分消息的功能(涉及冪等性問題)。單條發(fā)送失敗時(shí)只需重發(fā)該單條消息
- 編程復(fù)雜度 批量發(fā)送需要構(gòu)造MessageBatch或Message列表對象,編程略微復(fù)雜些。單條發(fā)送只需構(gòu)造單個(gè)Message對象
二、Maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq</artifactId> <groupId>com.alian</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>06-send-batched-message</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
父工程已經(jīng)在我上一篇文章里,通用公共包也在我上一篇文章里有說明,包括消費(fèi)者。具體參考:SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息_java_腳本之家 (jb51.net)
三、application配置
application.properties
server.port=8005 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默認(rèn)的生產(chǎn)者組 rocketmq.producer.group=batched_group # 發(fā)送同步消息超時(shí)時(shí)間 rocketmq.producer.send-message-timeout=3000 # 用于設(shè)置在消息發(fā)送失敗后,生產(chǎn)者是否嘗試切換到下一個(gè)服務(wù)器。設(shè)置為 true 表示啟用,在發(fā)送失敗時(shí)嘗試切換到下一個(gè)服務(wù)器 rocketmq.producer.retry-next-server=true # 用于指定消息發(fā)送失敗時(shí)的重試次數(shù) rocketmq.producer.retry-times-when-send-failed=3 # 設(shè)置消息壓縮的閾值,為0表示禁用消息體的壓縮 rocketmq.producer.compress-message-body-threshold=0
四、批量發(fā)送
在 RocketMQ 中,RocketMQTemplate的syncSend方法,它允許你批量發(fā)送同步消息,主要參數(shù):
- topic:(普通消息都發(fā)送到topic=string_message_topic)
- Collection<T>:消息集合
測試類都引入依賴
@Autowired private RocketMQTemplate rocketMQTemplate;
4.1、同步消息
@Test public void syncSendBatchStringMessagesWithBuilder() { String topic = "string_message_topic"; String message = "超級喜歡Golang語言:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = MessageBuilder.withPayload(message + i) // 設(shè)置消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // 加入到列表 messageList.add(rocketMessage); } // 使用syncSend發(fā)送批量消息 SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList); log.info("同步批量發(fā)送普通消息結(jié)果:{}",sendResult); }
運(yùn)行結(jié)果:
[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:0
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:1
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:3
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:4
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:2
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:5
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:6
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:7
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:8
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:9
4.2、異步消息
@Test public void asyncSendBatchStringMessageWithBuilder() { String topic = "string_message_topic"; String message = "Alian超級喜歡Golang語言:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = MessageBuilder.withPayload(message + i) // 設(shè)置消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // 加入到列表 messageList.add(rocketMessage); } // 使用asyncSend發(fā)送批量消息 rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 異步發(fā)送成功的回調(diào)邏輯 log.info("異步批量發(fā)送普通消息成功: " + sendResult); } @Override public void onException(Throwable e) { // 異步發(fā)送失敗的回調(diào)邏輯 log.info("異步批量發(fā)送普通消息失敗: " + e.getMessage()); } }); }
運(yùn)行結(jié)果:
[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:0
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:1
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:7
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:4
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:2
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:6
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:3
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:8
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:9
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:5
4.3、順序消息
在 RocketMQ 中,RocketMQTemplate的syncSendOrderly方法,它允許你批量發(fā)送同步消息,主要參數(shù):
- topic:(和之前有區(qū)別,普通消息都發(fā)送到topic=ordered_string_message_topic)
- Collection<T>:消息集合
- hashKey:通過hashKey發(fā)送到同一個(gè)隊(duì)列
@Test public void syncSendBatchOrderlyStringMessagesWithBuilder() { String topic = "ordered_string_message_topic"; String message = "同步批量發(fā)送順序消息,超級喜歡Go語言:"; List<Message<String>> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message<String> rocketMessage = MessageBuilder.withPayload(message + i) // 設(shè)置消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // 加入到列表 messageList.add(rocketMessage); } // 使用syncSendOrderly發(fā)送批量順序消息,消費(fèi)者線程設(shè)置為1 SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, messageList, "alian_sync_ordered"); log.info("批量發(fā)送順序消息發(fā)送結(jié)果:{}",sendResult); }
運(yùn)行結(jié)果:
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:0
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:1
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:2
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:3
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:4
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:5
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:6
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:7
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:8
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:9
所以我之前說批量發(fā)送消息的topic不一樣,因?yàn)?/p>
@Slf4j @Service @RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "ORDERED_GROUP_STRING", consumeMode = ConsumeMode.ORDERLY) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("字符串消費(fèi)者接收到的消息: {}", message); // 處理消息的業(yè)務(wù)邏輯 } }
順序消息要順序消費(fèi),也就是每次是一個(gè)線程去消費(fèi),相當(dāng)于單線程,也就有序了。關(guān)鍵就是配置了:consumeMode = ConsumeMode.ORDERLY
當(dāng)然,我們也可以把消費(fèi)者線程數(shù)設(shè)置為 consumeThreadNumber = 1,也就是單線程消費(fèi)了,從而確保了消息的順序消費(fèi)(指單實(shí)例):
@RocketMQMessageListener(topic = "ordered_string_message_topic", consumerGroup = "CONCURRENT_GROUP_STRING", consumeThreadNumber = 1) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("字符串消費(fèi)者接收到的消息: {}", message); // 處理消息的業(yè)務(wù)邏輯 } }
4.4、關(guān)于異步批量發(fā)送
有可能你會(huì)寫下面的異步批量發(fā)送順序消息
@Test public void asyncSendBatchOrderlyStringMessageWithBuilder2() { String topic = "ordered_string_message_topic"; String message = "Alian超級喜歡Golang語言:"; List<String> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) { // 加入到列表 messageList.add(message + i); } // 使用 asyncSendOrderly 發(fā)送批量消息 rocketMQTemplate.asyncSendOrderly(topic, messageList, "alian_async_ordered", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 異步發(fā)送成功的回調(diào)邏輯 log.info("異步消息發(fā)送字符串消息成功: " + sendResult); } @Override public void onException(Throwable e) { // 異步發(fā)送失敗的回調(diào)邏輯 log.info("異步消息發(fā)送字符串消息失敗: " + e.getMessage()); } }); }
其實(shí)這個(gè)是不對的,最終的結(jié)果是一個(gè)把你這里的messageList,當(dāng)做了一個(gè)消息列表接收了,如下結(jié)果:
[GROUP_STRING_18] com.alian.ordered.StringMessageConsumer : 字符串消費(fèi)者接收到的消息: ["Alian超級喜歡Golang語言:0","Alian超級喜歡Golang語言:1","Alian超級喜歡Golang語言:2","Alian超級喜歡Golang語言:3","Alian超級喜歡Golang語言:4","Alian超級喜歡Golang語言:5","Alian超級喜歡Golang語言:6","Alian超級喜歡Golang語言:7","Alian超級喜歡Golang語言:8","Alian超級喜歡Golang語言:9"]
RocketMQ對于單條消息和批量消息在隊(duì)列中是如何被處理的?
- 對于單條發(fā)送的消息,RocketMQ會(huì)按照隊(duì)列中的順序,將每條消息分發(fā)給一個(gè)消費(fèi)者線程。因此,即使有多個(gè)消費(fèi)者線程,由于每條消息都被單獨(dú)處理,消費(fèi)的順序仍然會(huì)與發(fā)送的順序一致。
- 對于批量發(fā)送的消息,情況就有所不同。批量消息是作為一個(gè)整體發(fā)送的,因此在隊(duì)列中,它們被視為一個(gè)單獨(dú)的實(shí)體。當(dāng)RocketMQ從隊(duì)列中取出批量消息時(shí),它會(huì)將整個(gè)批量消息作為一個(gè)整體分發(fā)給一個(gè)消費(fèi)者線程。如果有多個(gè)消費(fèi)者線程,由于操作系統(tǒng)的線程調(diào)度策略,處理批量消息的線程可能會(huì)在處理消息的過程中被調(diào)度出去,從而允許其他線程處理后面的消息。這樣就可能導(dǎo)致消費(fèi)的順序與發(fā)送的順序不一致。
4.5、結(jié)論
為此我測試了多次,得到結(jié)論:
- 單條發(fā)送消息到同一個(gè)隊(duì)列,使用多個(gè)消費(fèi)線程消費(fèi)該隊(duì)列,由于消息本身是有序的,所以消費(fèi)順序也是有序的
- 單批次批量發(fā)送消息到同一個(gè)隊(duì)列,使用單個(gè)消費(fèi)線程消費(fèi)該隊(duì)列,由于消費(fèi)線程是單一的,所以消費(fèi)順序也是有序的
- 單批次批量發(fā)送消息到同一個(gè)隊(duì)列,使用多個(gè)消費(fèi)線程消費(fèi)時(shí),消費(fèi)順序就不是有序的了
五、其他
既然知道批量消息是作為一個(gè)整體的,那么肯定就會(huì)對消息大小有限制,在 Apache RocketMQ 中,批量消息的大小默認(rèn)限制是4MB。這意味著,你不能發(fā)送總大小超過4MB的批量消息。
如果你想修改這個(gè)限制,你需要修改RocketMQ的配置。具體的修改方法如下:
- 找到RocketMQ的配置文件broker.conf,這個(gè)文件通常位于RocketMQ安裝目錄的conf目錄下。
- 在broker.conf文件中,找到maxMessageSize這個(gè)配置項(xiàng)。這個(gè)配置項(xiàng)決定了批量消息的最大大小。
- 修改maxMessageSize的值為你想要的大小。注意,這個(gè)值是以字節(jié)為單位的,所以如果你想設(shè)置批量消息的最大大小為8MB,你應(yīng)該設(shè)置maxMessageSize=8388608。
- 保存并關(guān)閉broker.conf文件。
- 重啟RocketMQ的Broker服務(wù),以使新的配置生效。
雖然你可以通過修改配置來增加批量消息的最大大小,但是你應(yīng)該謹(jǐn)慎地考慮這個(gè)決定。增加批量消息的最大大小可能會(huì)增加Broker的內(nèi)存使用量,并可能影響到消息的發(fā)送和接收性能。因此,在修改這個(gè)配置之前,你應(yīng)該先考慮你的應(yīng)用的需求和Broker的性能。
因?yàn)閮?yōu)先的是@RocketMQMessageListener 注解中設(shè)置 consumerGroup 和messageModel 參數(shù)。
以上就是SpringBoot整合RocketMQ批量發(fā)送消息的實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RocketMQ批量發(fā)送消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringCloud Nacos作為配置中心超詳細(xì)講解
這篇文章主要介紹了Springcloud中的Nacos作為配置中心,本文以用戶微服務(wù)為例,進(jìn)行統(tǒng)一的配置,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-12-12IDEA部署Docker鏡像的實(shí)現(xiàn)示例
本文主要介紹了IDEA部署Docker鏡像的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04Java的深拷貝與淺拷貝的幾種實(shí)現(xiàn)方式
這篇文章主要介紹了Java的深拷貝與淺拷貝的幾種實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01Spring Boot 配置 IDEA和DevTools 熱部署的方法
這篇文章主要介紹了Spring Boot 配置 IDEA和DevTools 熱部署的方法,需要的朋友可以參考下2018-02-02