SpringBoot整合RocketMQ實現(xiàn)發(fā)送同步消息
一、簡介
RocketMQ 是一款開源的分布式消息中間件,由阿里巴巴開源。由阿里巴巴集團開發(fā)并開源,目前被捐贈給Apache基金會,并入選孵化器項目,2017年從Apache基金會畢業(yè)后,RocketMQ被指定為頂級項目(TLP)。它具有高可用性、高性能、低延遲等特點,廣泛應(yīng)用于阿里巴巴集團內(nèi)部以及眾多外部企業(yè)的業(yè)務(wù)系統(tǒng)中。
1.1、RocketMQ 主要特點
- 分布式架構(gòu):RocketMQ 是基于分布式架構(gòu)設(shè)計的,支持水平擴展,可以靈活地部署和擴展。
- 高可用性:RocketMQ 支持主從架構(gòu),消息存儲采用主從復(fù)制的方式,保證了消息的高可用性和可靠性。
- 高性能:RocketMQ 在消息存儲、消息傳輸?shù)确矫孢M行了優(yōu)化,具有較高的吞吐量和較低的延遲。
- 豐富的特性:RocketMQ 提供了豐富的特性,包括順序消息、延遲消息、事務(wù)消息、消息過濾、消息軌跡等,滿足了不同場景下的需求。
- 監(jiān)控和管理:RocketMQ 提供了豐富的監(jiān)控和管理功能,可以通過控制臺或者監(jiān)控工具實時監(jiān)控消息的狀態(tài)和性能指標。
- 開源社區(qū)支持:RocketMQ 是一款開源的消息中間件,擁有活躍的開源社區(qū),提供了豐富的文檔和示例,為用戶提供了便利的支持和幫助。
1.2、RocketMQ 核心組件
RocketMQ 的架構(gòu)主要包括以下核心組件:
- NameServer:NameServer 是 RocketMQ 的路由管理組件,負責管理 Broker 的路由信息??蛻舳嗽诎l(fā)送消息或者消費消息時,需要先從 NameServer 獲取相應(yīng)的 Broker 地址,然后再與 Broker 建立連接。
- Broker:Broker 是 RocketMQ 的消息存儲和傳輸組件,負責存儲消息以及向消費者傳遞消息。一個 RocketMQ 系統(tǒng)可以包含多個 Broker,每個 Broker 負責存儲一部分消息數(shù)據(jù),并提供相應(yīng)的消息服務(wù)。
- Producer:Producer 是消息的生產(chǎn)者,負責產(chǎn)生消息并將消息發(fā)送到 Broker 中。Producer 將消息發(fā)送到指定的 Topic,然后由 Broker 存儲并傳遞給相應(yīng)的消費者。
- Consumer:Consumer 是消息的消費者,負責訂閱并消費 Broker 中的消息。Consumer 通過訂閱指定的 Topic 來接收消息,并進行相應(yīng)的業(yè)務(wù)處理。
- Topic:Topic 是消息的主題,用于對消息進行分類和管理。Producer 將消息發(fā)送到指定的 Topic,而 Consumer 則訂閱相應(yīng)的 Topic 來接收消息。
- Message Queue:Message Queue 是消息隊列,用于存儲消息。每個 Topic 可以分為多個 Message Queue,每個 Message Queue 保存了一部分消息,多個 Message Queue 組成了一個 Topic 的完整消息存儲。
總的來說,RocketMQ是阿里推出的優(yōu)秀開源分布式消息中間件,具有高性能、高可靠、高并發(fā)等優(yōu)點,是構(gòu)建分布式系統(tǒng)不可或缺的基礎(chǔ)組件之一。
1.3、概念
同步發(fā)送指的是生產(chǎn)者在發(fā)送消息后,會阻塞當前線程,直到收到Broker的發(fā)送響應(yīng)后才返回,響應(yīng)中包含消息是否發(fā)送成功的狀態(tài)。同步發(fā)送的優(yōu)缺點如下:
優(yōu)點:
- 發(fā)送可靠性高,能夠立即得知發(fā)送結(jié)果
- 發(fā)送反饋直接返回,不需要通過專門的監(jiān)聽通道接收結(jié)果
缺點:
- 當Broker發(fā)生故障或者網(wǎng)絡(luò)延遲時,生產(chǎn)者線程會被阻塞,影響發(fā)送效率
- 對于發(fā)送端吞吐量較高的場景,同步發(fā)送存在性能瓶頸
1.4、場景
同步發(fā)送對消息可靠性傳輸有較高要求的場景,如通知消息等,發(fā)送端對發(fā)送吞吐量要求不是特別高的場景
二、父工程
因為這個系統(tǒng)會有很多RocketMQ的知識,我準備拆開寫,避免頻繁的導(dǎo)入依賴和包,我這里采用分模塊開發(fā)。
2.1、父工程依賴
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.26</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </dependencyManagement>
2.2、公共模塊
這里需要注意的是下面這個依賴,其實就是一個會員類。如果你傳輸?shù)牟皇菍ο笠部梢圆灰疫@里演示就創(chuàng)建了。
<dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
Member.java
package com.alian.common; import lombok.Data; import java.util.Date; @Data public class Member { private Long id; private String memberName; private int age; private Date birthday; }
后續(xù)的項目都會用到這個父工程和公共模塊,后面就不再過多說明了。
三、生產(chǎn)者
我們在父工程下新建一個模塊用于發(fā)送同步消息。
3.1 Maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq</artifactId> <groupId>com.alian</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>01-send-sync-message</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
3.2 application配置
application.properties
server.port=8001 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默認的生產(chǎn)者組 rocketmq.producer.group=sync_group # 發(fā)送同步消息超時時間 rocketmq.producer.send-message-timeout=3000 # 用于設(shè)置在消息發(fā)送失敗后,生產(chǎn)者是否嘗試切換到下一個服務(wù)器。設(shè)置為 true 表示啟用,在發(fā)送失敗時嘗試切換到下一個服務(wù)器 rocketmq.producer.retry-next-server=true # 用于指定消息發(fā)送失敗時的重試次數(shù) rocketmq.producer.retry-times-when-send-failed=3 # 設(shè)置消息壓縮的閾值,為0表示禁用消息體的壓縮 rocketmq.producer.compress-message-body-threshold=0
3.3 發(fā)送字符串消息
消息的發(fā)送比較簡單,我們直接引用
@Autowired private RocketMQTemplate rocketMQTemplate;
此對象封裝了一系列的消息發(fā)送方法。
SendStrMessageTest.java
package com.alian.sync; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @Slf4j @SpringBootTest public class SendStrMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void syncSendStringMessage() { String topic = "string_message_topic"; String message = "我是一條同步文本消息:syncSendStringMessage"; SendResult sendResult = rocketMQTemplate.syncSend(topic, message); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @Test public void syncSendStringMessageWithBuilder() { String topic = "string_message_topic"; String message = "我是一條同步的文本消息:syncSendStringMessageWithBuilder"; Message<String> msg = MessageBuilder.withPayload(message) // 消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @Test public void syncSendStringMessageWithBuilderTimeOut() { String topic = "string_message_topic"; String message = "我是一條同步的文本消息:syncSendStringMessageWithBuilderTimeOut"; Message<String> msg = MessageBuilder.withPayload(message) // 消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // 3秒發(fā)送超時 SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
這里需要注意的是,我發(fā)送字符串消息的時候,topic都是 string_message_topic,因為我這里是本地開發(fā)環(huán)境,并且我在配置文件中配置了
autoCreateTopicEnable=true
當該參數(shù)設(shè)置為true時,如果生產(chǎn)者在發(fā)送消息時使用了一個在Broker端不存在的Topic,則Broker會自動創(chuàng)建該Topic,允許消息正常發(fā)送和存儲。
當該參數(shù)設(shè)置為false時,如果生產(chǎn)者使用了不存在的Topic,則Broker會直接拒絕發(fā)送請求,不會自動創(chuàng)建Topic。
官方對該參數(shù)的解釋是:自動創(chuàng)建Topic的特性主要是為了方便,但也可能帶來一些風險,比如有的應(yīng)用程序由于編碼上的低級錯誤導(dǎo)致無意中創(chuàng)建了大量的Topic。因此,生產(chǎn)環(huán)境建議將該參數(shù)設(shè)置為false,只有手工創(chuàng)建所需的Topic。
3.4 發(fā)送JSON消息
SendJsonMessageTest.java
package com.alian.sync; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.HashMap; import java.util.Map; @Slf4j @SpringBootTest public class SendJsonMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendJsonMessage() { String topic = "json_message_topic"; JSONObject json = new JSONObject(); json.put("name", "Alian"); json.put("age", "28"); json.put("hobby", "java"); SendResult sendResult = rocketMQTemplate.syncSend(topic, json); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @Test public void sendJsonMessageWithBuilder() { String topic = "json_message_topic"; JSONObject json = new JSONObject(); json.put("name", "Alian"); json.put("age", "28"); json.put("hobby", "java"); Message<JSONObject> msg = MessageBuilder.withPayload(json) // 消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @Test public void sendMapMessage() { String topic = "json_message_topic"; Map<String, String> map = new HashMap<>(); map.put("1", "java"); map.put("2", "go"); map.put("3", "c"); map.put("4", "vue"); map.put("5", "react"); SendResult sendResult = rocketMQTemplate.syncSend(topic, map); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
這里需要注意的是,我發(fā)送JSON消息的時候,topic都是 json_message_topic,這里Map消息也能被JSONObject消費。
3.5 發(fā)送Java對象消息
SendJavaObjectMessageTest.java
package com.alian.sync; import com.alian.common.Member; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.Date; import java.util.HashMap; import java.util.Map; @Slf4j @SpringBootTest public class SendJavaObjectMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendJavaObjectMessage() { String topic = "java_object_message_topic"; Member member = new Member(); member.setId(10086L); member.setMemberName("Alian"); member.setAge(28); member.setBirthday(new Date()); SendResult sendResult = rocketMQTemplate.syncSend(topic, member); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @Test public void sendJavaObjectMessageWithBuilder() { String topic = "java_object_message_topic"; Member member = new Member(); member.setId(10086L); member.setMemberName("Alian"); member.setAge(28); member.setBirthday(new Date()); Message<Member> msg = MessageBuilder.withPayload(member) // 設(shè)置消息類型 .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步發(fā)送返回的結(jié)果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
這里需要注意的是,我發(fā)送java對象消息的時候,topic都是 java_object_message_topic
四、消費者
我們在父工程下新建一個模塊用于發(fā)送同步消息。
4.1 Maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq</artifactId> <groupId>com.alian</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>08-comsume-concurrent</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
4.2 application配置
application.properties
server.port=8008 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默認的消費者組 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 批量拉取消息的數(shù)量 rocketmq.consumer.pull-batch-size=10 # 集群消費模式 rocketmq.consumer.message-model=CLUSTERING
實際上對于本文來說,下面兩個配置不用配置,也不會生效。
# 默認的消費者組 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 集群消費模式 rocketmq.consumer.message-model=CLUSTERING
因為優(yōu)先的是@RocketMQMessageListener 注解中設(shè)置 consumerGroup 和messageModel 參數(shù)。
4.3 消費字符串消息
StringMessageConsumer.java
package com.alian.concurrent; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "string_message_topic", consumerGroup = "CONCURRENT_GROUP_STRING", consumeThreadNumber = 1) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("字符串消費者接收到的消息: {}", message); // 處理消息的業(yè)務(wù)邏輯 } }
如果要消費消息,我們需要實現(xiàn)RocketMQListener<T>,實現(xiàn)onMessage方法。發(fā)送的是什么對象,我們接收就是什么對象,也就是T是什么對象。生產(chǎn)者發(fā)送的字符串消息,我們這里就用String接收,也就是RocketMQListener<String>。
package org.apache.rocketmq.spring.core; public interface RocketMQListener<T> { void onMessage(T var1); }
同時加上@RocketMQMessageListener注解,主要用到三個注解
- topic:指定該消費者訂閱的Topic名稱,可以是單個Topic,也可以是用||分隔的多個Topic
- consumerGroup:指定該消費者所屬的消費組名稱,消費組用于組織同一類消費實例,相同消費組的消費實例可以消費完全相同的消息,RocketMQ 通過消費者組(Consumer Group)來維護不同消費者的消費進度。 每個消費者組都有一個消費進度(offset),用于標記該組下的消費者在某個主題(Topic)和隊列(Queue)上已經(jīng)消費到的位置。
需要注意的是:如果Topic不存在,只有在生產(chǎn)者發(fā)送消息時,并且autoCreateTopicEnable設(shè)置為true的情況下,Broker端才會自動創(chuàng)建該Topic。消費者啟動時,即使autoCreateTopicEnable=true,也不會自動創(chuàng)建不存在的Topic。
具體來說:
- 生產(chǎn)者啟動并發(fā)送消息到一個不存在的Topic時:
- 如果autoCreateTopicEnable=true,Broker會自動創(chuàng)建該Topic,允許消息發(fā)送成功
- 如果autoCreateTopicEnable=false,Broker會拒絕消息發(fā)送,報錯該Topic不存在
- 消費者啟動訂閱一個不存在的Topic時:
- 無論autoCreateTopicEnable的值是true還是false,Broker都不會自動創(chuàng)建該Topic
- 消費者會啟動成功,但獲取不到消息,處于持續(xù)等待狀態(tài)
所以生產(chǎn)者發(fā)送消息時才可能自動創(chuàng)建Topic,而消費者啟動時是不會自動創(chuàng)建Topic的。
4.4 消費JSON消息
JsonMessageConsumer.java
package com.alian.concurrent; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "json_message_topic", consumerGroup = "CONCURRENT_GROUP_JSON") public class JsonMessageConsumer implements RocketMQListener<JSONObject> { @Override public void onMessage(JSONObject json) { log.info("json消費者接收到的消息: {}", json); // 處理消息的業(yè)務(wù)邏輯 } }
生產(chǎn)者發(fā)送的JSONObject消息,我們這里就用JSONObject接收,也就是RocketMQListener<JSONObject>
4.5 消費Java對象消息
JavaObjectMessageConsumer.java
package com.alian.concurrent; import com.alian.common.Member; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "java_object_message_topic", consumerGroup = "CONCURRENT_GROUP_JAVA_OBJECT") public class JavaObjectMessageConsumer implements RocketMQListener<Member> { @Override public void onMessage(Member member) { // 因為發(fā)序列化的原因Member必須是同一個包 log.info("java對象消費者接收到的消息: {}", member); // 處理消息的業(yè)務(wù)邏輯 } }
生產(chǎn)者發(fā)送的Member消息,我們這里就用Member接收,也就是RocketMQListener<Member>。
這里需要再次說明下發(fā)送java對象消息時,因為反序列的原因,所以生產(chǎn)者和消費者使用的是公共包里同一個對象,也就是發(fā)送和接收的對象的包路徑要一致。
五、部分運行結(jié)果
5.1、字符串消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F000001339418B4AAC22DB848C70000, offsetMsgId=C0A800EA00002A9F0000000000000000, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0] 字符串消費者接收到的消息: 我是一條同步文本消息:syncSendStringMessage
5.2、json消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F00000137FC18B4AAC22DB9D42F0000, offsetMsgId=C0A800EA00002A9F0000000000000146, messageQueue=MessageQueue [topic=json_message_topic, brokerName=broker-a, queueId=1], queueOffset=0] json消費者接收到的消息: {"name":"Alian","age":"28","hobby":"java"}
5.3、java對象消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F000001098C18B4AAC22DBACEB50000, offsetMsgId=C0A800EA00002A9F0000000000000270, messageQueue=MessageQueue [topic=java_object_message_topic, brokerName=broker-a, queueId=0], queueOffset=0] java對象消費者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Sat Mar 09 21:06:57 CST 2024)
5.4、現(xiàn)象說明
為什么在rocketmq中當autoCreateTopicEnable=true,先啟動消費者,然后生產(chǎn)者第一次向一個未創(chuàng)建的topic中發(fā)送消息時,消息發(fā)送成功了(馬上返回成功了),但是消費者要等一段時間才能收到?這種現(xiàn)象的原因是RocketMQ在創(chuàng)建Topic時,存在一個延遲同步的過程。具體來說:
- 當生產(chǎn)者發(fā)送消息到一個不存在的Topic時,Broker會自動創(chuàng)建該Topic。
- 但是Broker創(chuàng)建Topic后,并不會立即將其元數(shù)據(jù)信息同步給所有的消費者,而是異步延遲同步。
- 生產(chǎn)者發(fā)送消息時,Broker會立即返回發(fā)送成功響應(yīng),因為它只需將消息持久化存儲即可。
- 消費者在啟動時已訂閱了該Topic,但由于元數(shù)據(jù)未同步,暫時無法獲知該Topic的路由信息,所以無法立即開始消費。
- 過了一段時間后,Broker將新創(chuàng)建的Topic元數(shù)據(jù)同步給了消費者,消費者才開始從該Topic拉取并消費消息。
這種延遲同步機制是RocketMQ的一個設(shè)計,目的是為了減小創(chuàng)建Topic時對消費者的影響,避免大量消費者同時更新元數(shù)據(jù)造成系統(tǒng)抖動。
我們可以通過調(diào)整Broker的配置項來控制這個延遲同步時間,比如:
- brokerTopicPutEnable=true開啟自動創(chuàng)建Topic功能
- topicDelayEnable=true開啟延時創(chuàng)建Topic功能
- topicDelayOffsetEnabled=true開啟延時創(chuàng)建Topic偏移量同步功能
通過調(diào)小topicDelayOffsetInterval可以縮短元數(shù)據(jù)同步延遲時間,但也會增加系統(tǒng)開銷。
所以這種現(xiàn)象實際上是RocketMQ的一個正常設(shè)計行為,目的是為了系統(tǒng)整體的健壯性和可用性。如果應(yīng)用對延遲時間不太敏感,保持默認配置即可;如果對延遲敏感,可以適當調(diào)小延遲同步時間。
以上就是SpringBoot整合RocketMQ實現(xiàn)發(fā)送同步消息的詳細內(nèi)容,更多關(guān)于SpringBoot RocketMQ發(fā)送同步消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實戰(zhàn)在線選課系統(tǒng)的實現(xiàn)流程
讀萬卷書不如行萬里路,只學書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實現(xiàn)一個在線選課系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11數(shù)據(jù)庫連接池c3p0配置_動力節(jié)點Java學院整理
這篇文章主要為大家詳細介紹了數(shù)據(jù)庫連接池c3p0配置的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08SpringBoot引入Thymeleaf的實現(xiàn)方法
這篇文章主要介紹了SpringBoot引入Thymeleaf的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-04-04解決@Autowired報錯Could not autowire. No bea
介紹了在IDEA中使用@Autowired報錯Couldnot autowire. No beans of 'XXX' type found的解決方法,原因是@Autowired在注入service時,由于service接口沒有實現(xiàn)類,而mybatis僅需提供Dao接口,導(dǎo)致@Autowired無法識別2024-12-12如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)
Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。這篇文章主要介紹了如何使用IDEA開發(fā)Spark SQL程序(一文搞懂),需要的朋友可以參考下2021-08-08