欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RocketMQ批量發(fā)送消息的實(shí)現(xiàn)代碼

 更新時(shí)間:2024年04月01日 10:41:26   作者:嘉禾嘉寧papa  
這篇文章主要介紹了SpringBoot整合RocketMQ批量發(fā)送消息的實(shí)現(xiàn),文中通過代碼示例講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下

一、簡介

今天我們講講如何批量發(fā)送消息,主要還是使用方法RocketMQTemplatesyncSend方法。

1.1、特點(diǎn)

批量發(fā)送和單條發(fā)送消息的主要區(qū)別有以下幾點(diǎn):

  • 網(wǎng)絡(luò)開銷 發(fā)送單條消息時(shí),每個(gè)消息都需要單獨(dú)建立網(wǎng)絡(luò)連接、發(fā)送數(shù)據(jù)包、等待響應(yīng)等,網(wǎng)絡(luò)開銷較大。批量發(fā)送可以將多條消息打包在一起發(fā)送,減少網(wǎng)絡(luò)連接建立的次數(shù),降低網(wǎng)絡(luò)開銷
  • 吞吐量 由于批量發(fā)送減少了網(wǎng)絡(luò)開銷,所以可以在單位時(shí)間內(nèi)發(fā)送更多的消息,提高了吞吐量。在高并發(fā)高流量場景下,批量發(fā)送能夠發(fā)揮更好的性能
  • 消息順序 單條發(fā)送消息的順序是有序的,后發(fā)送的在隊(duì)列中排在前發(fā)送的后面。而對于批量發(fā)送,一個(gè)批次內(nèi)的消息順序是固定的,但不同批次之間的消息順序是無序的,會(huì)按照到達(dá)順序存儲(chǔ)在隊(duì)列中。如果需要嚴(yán)格消息順序,單條發(fā)送更合適
  • 消息重試 如果批量發(fā)送的一個(gè)批次中有部分消息發(fā)送失敗,需重發(fā)整個(gè)批次,沒有選擇重發(fā)其中部分消息的功能(涉及冪等性問題)。單條發(fā)送失敗時(shí)只需重發(fā)該單條消息
  • 編程復(fù)雜度 批量發(fā)送需要構(gòu)造MessageBatch或Message列表對象,編程略微復(fù)雜些。單條發(fā)送只需構(gòu)造單個(gè)Message對象

二、Maven依賴

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq</artifactId>
        <groupId>com.alian</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>06-send-batched-message</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common-rocketmq-dto</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

父工程已經(jīng)在我上一篇文章里,通用公共包也在我上一篇文章里有說明,包括消費(fèi)者。具體參考:SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息_java_腳本之家 (jb51.net)

三、application配置

application.properties

server.port=8005

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默認(rèn)的生產(chǎn)者組
rocketmq.producer.group=batched_group
# 發(fā)送同步消息超時(shí)時(shí)間
rocketmq.producer.send-message-timeout=3000
# 用于設(shè)置在消息發(fā)送失敗后,生產(chǎn)者是否嘗試切換到下一個(gè)服務(wù)器。設(shè)置為 true 表示啟用,在發(fā)送失敗時(shí)嘗試切換到下一個(gè)服務(wù)器
rocketmq.producer.retry-next-server=true
# 用于指定消息發(fā)送失敗時(shí)的重試次數(shù)
rocketmq.producer.retry-times-when-send-failed=3
# 設(shè)置消息壓縮的閾值,為0表示禁用消息體的壓縮
rocketmq.producer.compress-message-body-threshold=0

四、批量發(fā)送

在 RocketMQ 中,RocketMQTemplatesyncSend方法,它允許你批量發(fā)送同步消息,主要參數(shù):

  • topic:(普通消息都發(fā)送到topic=string_message_topic
  • Collection<T>:消息集合

測試類都引入依賴

	@Autowired
    private RocketMQTemplate rocketMQTemplate;

4.1、同步消息

    @Test
    public void syncSendBatchStringMessagesWithBuilder() {
        String topic = "string_message_topic";
        String message = "超級喜歡Golang語言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 設(shè)置消息類型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用syncSend發(fā)送批量消息
        SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);
        log.info("同步批量發(fā)送普通消息結(jié)果:{}",sendResult);
    }

運(yùn)行結(jié)果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:0
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:1
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:3
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:4
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:2
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:5
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:6
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:7
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:8
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送普通消息:9

4.2、異步消息

    @Test
    public void asyncSendBatchStringMessageWithBuilder() {
        String topic = "string_message_topic";
        String message = "Alian超級喜歡Golang語言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 設(shè)置消息類型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用asyncSend發(fā)送批量消息
        rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 異步發(fā)送成功的回調(diào)邏輯
                log.info("異步批量發(fā)送普通消息成功: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                // 異步發(fā)送失敗的回調(diào)邏輯
                log.info("異步批量發(fā)送普通消息失敗: " + e.getMessage());
            }
        });
    }

運(yùn)行結(jié)果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:0
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:1
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:7
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:4
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:2
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:6
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:3
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:8
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:9
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消費(fèi)者接收到的消息: 異步批量發(fā)送普通消息:5

4.3、順序消息

在 RocketMQ 中,RocketMQTemplatesyncSendOrderly方法,它允許你批量發(fā)送同步消息,主要參數(shù):

  • topic:(和之前有區(qū)別,普通消息都發(fā)送到topic=ordered_string_message_topic
  • Collection<T>:消息集合
  • hashKey:通過hashKey發(fā)送到同一個(gè)隊(duì)列
    @Test
    public void syncSendBatchOrderlyStringMessagesWithBuilder() {
        String topic = "ordered_string_message_topic";
        String message = "同步批量發(fā)送順序消息,超級喜歡Go語言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 設(shè)置消息類型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用syncSendOrderly發(fā)送批量順序消息,消費(fèi)者線程設(shè)置為1
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, messageList, "alian_sync_ordered");
        log.info("批量發(fā)送順序消息發(fā)送結(jié)果:{}",sendResult);
    }

運(yùn)行結(jié)果:

[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:0
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:1
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:2
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:3
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:4
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:5
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:6
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:7
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:8
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: 同步批量發(fā)送順序消息,超級喜歡Go語言:9

所以我之前說批量發(fā)送消息的topic不一樣,因?yàn)?/p>

@Slf4j
@Service
@RocketMQMessageListener(topic = "ordered_string_message_topic", 
						consumerGroup = "ORDERED_GROUP_STRING", 
						consumeMode = ConsumeMode.ORDERLY)
public class StringMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("字符串消費(fèi)者接收到的消息: {}", message);
        // 處理消息的業(yè)務(wù)邏輯
    }
}

順序消息要順序消費(fèi),也就是每次是一個(gè)線程去消費(fèi),相當(dāng)于單線程,也就有序了。關(guān)鍵就是配置了:consumeMode = ConsumeMode.ORDERLY

當(dāng)然,我們也可以把消費(fèi)者線程數(shù)設(shè)置為 consumeThreadNumber = 1,也就是單線程消費(fèi)了,從而確保了消息的順序消費(fèi)(指單實(shí)例):

@RocketMQMessageListener(topic = "ordered_string_message_topic", 
						consumerGroup = "CONCURRENT_GROUP_STRING",
						consumeThreadNumber = 1)
public class StringMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("字符串消費(fèi)者接收到的消息: {}", message);
        // 處理消息的業(yè)務(wù)邏輯
    }
}

4.4、關(guān)于異步批量發(fā)送

有可能你會(huì)寫下面的異步批量發(fā)送順序消息

	@Test
    public void asyncSendBatchOrderlyStringMessageWithBuilder2() {
        String topic = "ordered_string_message_topic";
        String message = "Alian超級喜歡Golang語言:";
        List<String> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // 加入到列表
            messageList.add(message + i);
        }
        // 使用 asyncSendOrderly 發(fā)送批量消息
        rocketMQTemplate.asyncSendOrderly(topic, messageList, "alian_async_ordered", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 異步發(fā)送成功的回調(diào)邏輯
                log.info("異步消息發(fā)送字符串消息成功: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                // 異步發(fā)送失敗的回調(diào)邏輯
                log.info("異步消息發(fā)送字符串消息失敗: " + e.getMessage());
            }
        });
    }

其實(shí)這個(gè)是不對的,最終的結(jié)果是一個(gè)把你這里的messageList,當(dāng)做了一個(gè)消息列表接收了,如下結(jié)果:

[GROUP_STRING_18] com.alian.ordered.StringMessageConsumer  : 字符串消費(fèi)者接收到的消息: ["Alian超級喜歡Golang語言:0","Alian超級喜歡Golang語言:1","Alian超級喜歡Golang語言:2","Alian超級喜歡Golang語言:3","Alian超級喜歡Golang語言:4","Alian超級喜歡Golang語言:5","Alian超級喜歡Golang語言:6","Alian超級喜歡Golang語言:7","Alian超級喜歡Golang語言:8","Alian超級喜歡Golang語言:9"]

RocketMQ對于單條消息和批量消息在隊(duì)列中是如何被處理的?

  • 對于單條發(fā)送的消息,RocketMQ會(huì)按照隊(duì)列中的順序,將每條消息分發(fā)給一個(gè)消費(fèi)者線程。因此,即使有多個(gè)消費(fèi)者線程,由于每條消息都被單獨(dú)處理,消費(fèi)的順序仍然會(huì)與發(fā)送的順序一致。
  • 對于批量發(fā)送的消息,情況就有所不同。批量消息是作為一個(gè)整體發(fā)送的,因此在隊(duì)列中,它們被視為一個(gè)單獨(dú)的實(shí)體。當(dāng)RocketMQ從隊(duì)列中取出批量消息時(shí),它會(huì)將整個(gè)批量消息作為一個(gè)整體分發(fā)給一個(gè)消費(fèi)者線程。如果有多個(gè)消費(fèi)者線程,由于操作系統(tǒng)的線程調(diào)度策略,處理批量消息的線程可能會(huì)在處理消息的過程中被調(diào)度出去,從而允許其他線程處理后面的消息。這樣就可能導(dǎo)致消費(fèi)的順序與發(fā)送的順序不一致。

4.5、結(jié)論

為此我測試了多次,得到結(jié)論:

  • 單條發(fā)送消息到同一個(gè)隊(duì)列,使用多個(gè)消費(fèi)線程消費(fèi)該隊(duì)列,由于消息本身是有序的,所以消費(fèi)順序也是有序的
  • 單批次批量發(fā)送消息到同一個(gè)隊(duì)列,使用單個(gè)消費(fèi)線程消費(fèi)該隊(duì)列,由于消費(fèi)線程是單一的,所以消費(fèi)順序也是有序的
  • 單批次批量發(fā)送消息到同一個(gè)隊(duì)列,使用多個(gè)消費(fèi)線程消費(fèi)時(shí),消費(fèi)順序就不是有序的了

五、其他

既然知道批量消息是作為一個(gè)整體的,那么肯定就會(huì)對消息大小有限制,在 Apache RocketMQ 中,批量消息的大小默認(rèn)限制是4MB。這意味著,你不能發(fā)送總大小超過4MB的批量消息。

如果你想修改這個(gè)限制,你需要修改RocketMQ的配置。具體的修改方法如下:

  • 找到RocketMQ的配置文件broker.conf,這個(gè)文件通常位于RocketMQ安裝目錄的conf目錄下。
  • 在broker.conf文件中,找到maxMessageSize這個(gè)配置項(xiàng)。這個(gè)配置項(xiàng)決定了批量消息的最大大小。
  • 修改maxMessageSize的值為你想要的大小。注意,這個(gè)值是以字節(jié)為單位的,所以如果你想設(shè)置批量消息的最大大小為8MB,你應(yīng)該設(shè)置maxMessageSize=8388608。
  • 保存并關(guān)閉broker.conf文件。
  • 重啟RocketMQ的Broker服務(wù),以使新的配置生效。

雖然你可以通過修改配置來增加批量消息的最大大小,但是你應(yīng)該謹(jǐn)慎地考慮這個(gè)決定。增加批量消息的最大大小可能會(huì)增加Broker的內(nèi)存使用量,并可能影響到消息的發(fā)送和接收性能。因此,在修改這個(gè)配置之前,你應(yīng)該先考慮你的應(yīng)用的需求和Broker的性能。

因?yàn)閮?yōu)先的是@RocketMQMessageListener 注解中設(shè)置 consumerGroup 和messageModel 參數(shù)。

以上就是SpringBoot整合RocketMQ批量發(fā)送消息的實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RocketMQ批量發(fā)送消息的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 使用springboot單例模式與線程安全問題踩的坑

    使用springboot單例模式與線程安全問題踩的坑

    這篇文章主要介紹了使用springboot單例模式與線程安全問題踩的坑,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Protobuf的簡要介紹及使用詳解

    Protobuf的簡要介紹及使用詳解

    這篇文章主要介紹了Protobuf的簡要介紹及使用,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-10-10
  • JAVA控制流程break?continue的示例代碼

    JAVA控制流程break?continue的示例代碼

    JAVA流程控制中有相關(guān)代碼可以終止整個(gè)流程的進(jìn)程,他們就是(break和continue),本文通過實(shí)例代碼介紹下JAVA控制流程break?continue的相關(guān)知識(shí),感興趣的朋友一起看看吧
    2022-03-03
  • Kryo序列化及反序列化用法示例

    Kryo序列化及反序列化用法示例

    這篇文章主要介紹了Kryo序列化及反序列化用法示例,小編覺得挺不錯(cuò)的,這里分享給大家,需要的朋友可以參考下。
    2017-10-10
  • SpringCloud Nacos作為配置中心超詳細(xì)講解

    SpringCloud Nacos作為配置中心超詳細(xì)講解

    這篇文章主要介紹了Springcloud中的Nacos作為配置中心,本文以用戶微服務(wù)為例,進(jìn)行統(tǒng)一的配置,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-12-12
  • Java編程中的條件判斷之if語句的用法詳解

    Java編程中的條件判斷之if語句的用法詳解

    這篇文章主要介紹了Java編程中的條件判斷之if語句的用法詳解,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-11-11
  • IDEA部署Docker鏡像的實(shí)現(xiàn)示例

    IDEA部署Docker鏡像的實(shí)現(xiàn)示例

    本文主要介紹了IDEA部署Docker鏡像的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Java的深拷貝與淺拷貝的幾種實(shí)現(xiàn)方式

    Java的深拷貝與淺拷貝的幾種實(shí)現(xiàn)方式

    這篇文章主要介紹了Java的深拷貝與淺拷貝的幾種實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • Spring Boot 配置 IDEA和DevTools 熱部署的方法

    Spring Boot 配置 IDEA和DevTools 熱部署的方法

    這篇文章主要介紹了Spring Boot 配置 IDEA和DevTools 熱部署的方法,需要的朋友可以參考下
    2018-02-02
  • Java中如何避免sql注入實(shí)例詳解

    Java中如何避免sql注入實(shí)例詳解

    SQL注入是最常見的攻擊方式之一,它不是利用操作系統(tǒng)或其它系統(tǒng)的漏洞來實(shí)現(xiàn)攻擊的,而是程序員因?yàn)闆]有做好判斷,被不法用戶鉆了SQL的空子,下面這篇文章主要給大家介紹了關(guān)于Java中如何避免sql注入的相關(guān)資料,需要的朋友可以參考下
    2022-01-01

最新評論