SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息
一、簡(jiǎn)介
RocketMQ 是一款開(kāi)源的分布式消息中間件,由阿里巴巴開(kāi)源。由阿里巴巴集團(tuán)開(kāi)發(fā)并開(kāi)源,目前被捐贈(zèng)給Apache基金會(huì),并入選孵化器項(xiàng)目,2017年從Apache基金會(huì)畢業(yè)后,RocketMQ被指定為頂級(jí)項(xiàng)目(TLP)。它具有高可用性、高性能、低延遲等特點(diǎn),廣泛應(yīng)用于阿里巴巴集團(tuán)內(nèi)部以及眾多外部企業(yè)的業(yè)務(wù)系統(tǒng)中。
1.1、RocketMQ 主要特點(diǎn)
- 分布式架構(gòu):RocketMQ 是基于分布式架構(gòu)設(shè)計(jì)的,支持水平擴(kuò)展,可以靈活地部署和擴(kuò)展。
- 高可用性:RocketMQ 支持主從架構(gòu),消息存儲(chǔ)采用主從復(fù)制的方式,保證了消息的高可用性和可靠性。
- 高性能:RocketMQ 在消息存儲(chǔ)、消息傳輸?shù)确矫孢M(jìn)行了優(yōu)化,具有較高的吞吐量和較低的延遲。
- 豐富的特性:RocketMQ 提供了豐富的特性,包括順序消息、延遲消息、事務(wù)消息、消息過(guò)濾、消息軌跡等,滿足了不同場(chǎng)景下的需求。
- 監(jiān)控和管理:RocketMQ 提供了豐富的監(jiān)控和管理功能,可以通過(guò)控制臺(tái)或者監(jiān)控工具實(shí)時(shí)監(jiān)控消息的狀態(tài)和性能指標(biāo)。
- 開(kāi)源社區(qū)支持:RocketMQ 是一款開(kāi)源的消息中間件,擁有活躍的開(kāi)源社區(qū),提供了豐富的文檔和示例,為用戶提供了便利的支持和幫助。
1.2、RocketMQ 核心組件
RocketMQ 的架構(gòu)主要包括以下核心組件:
- NameServer:NameServer 是 RocketMQ 的路由管理組件,負(fù)責(zé)管理 Broker 的路由信息??蛻舳嗽诎l(fā)送消息或者消費(fèi)消息時(shí),需要先從 NameServer 獲取相應(yīng)的 Broker 地址,然后再與 Broker 建立連接。
- Broker:Broker 是 RocketMQ 的消息存儲(chǔ)和傳輸組件,負(fù)責(zé)存儲(chǔ)消息以及向消費(fèi)者傳遞消息。一個(gè) RocketMQ 系統(tǒng)可以包含多個(gè) Broker,每個(gè) Broker 負(fù)責(zé)存儲(chǔ)一部分消息數(shù)據(jù),并提供相應(yīng)的消息服務(wù)。
- Producer:Producer 是消息的生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息并將消息發(fā)送到 Broker 中。Producer 將消息發(fā)送到指定的 Topic,然后由 Broker 存儲(chǔ)并傳遞給相應(yīng)的消費(fèi)者。
- Consumer:Consumer 是消息的消費(fèi)者,負(fù)責(zé)訂閱并消費(fèi) Broker 中的消息。Consumer 通過(guò)訂閱指定的 Topic 來(lái)接收消息,并進(jìn)行相應(yīng)的業(yè)務(wù)處理。
- Topic:Topic 是消息的主題,用于對(duì)消息進(jìn)行分類和管理。Producer 將消息發(fā)送到指定的 Topic,而 Consumer 則訂閱相應(yīng)的 Topic 來(lái)接收消息。
- Message Queue:Message Queue 是消息隊(duì)列,用于存儲(chǔ)消息。每個(gè) Topic 可以分為多個(gè) Message Queue,每個(gè) Message Queue 保存了一部分消息,多個(gè) Message Queue 組成了一個(gè) Topic 的完整消息存儲(chǔ)。
總的來(lái)說(shuō),RocketMQ是阿里推出的優(yōu)秀開(kāi)源分布式消息中間件,具有高性能、高可靠、高并發(fā)等優(yōu)點(diǎn),是構(gòu)建分布式系統(tǒng)不可或缺的基礎(chǔ)組件之一。
1.3、概念
同步發(fā)送指的是生產(chǎn)者在發(fā)送消息后,會(huì)阻塞當(dāng)前線程,直到收到Broker的發(fā)送響應(yīng)后才返回,響應(yīng)中包含消息是否發(fā)送成功的狀態(tài)。同步發(fā)送的優(yōu)缺點(diǎn)如下:
優(yōu)點(diǎn):
- 發(fā)送可靠性高,能夠立即得知發(fā)送結(jié)果
- 發(fā)送反饋直接返回,不需要通過(guò)專門的監(jiān)聽(tīng)通道接收結(jié)果
缺點(diǎn):
- 當(dāng)Broker發(fā)生故障或者網(wǎng)絡(luò)延遲時(shí),生產(chǎn)者線程會(huì)被阻塞,影響發(fā)送效率
- 對(duì)于發(fā)送端吞吐量較高的場(chǎng)景,同步發(fā)送存在性能瓶頸
1.4、場(chǎng)景
同步發(fā)送對(duì)消息可靠性傳輸有較高要求的場(chǎng)景,如通知消息等,發(fā)送端對(duì)發(fā)送吞吐量要求不是特別高的場(chǎng)景
二、父工程
因?yàn)檫@個(gè)系統(tǒng)會(huì)有很多RocketMQ的知識(shí),我準(zhǔn)備拆開(kāi)寫,避免頻繁的導(dǎo)入依賴和包,我這里采用分模塊開(kāi)發(fā)。
2.1、父工程依賴
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alian</groupId>
<artifactId>common-rocketmq-dto</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
2.2、公共模塊
這里需要注意的是下面這個(gè)依賴,其實(shí)就是一個(gè)會(huì)員類。如果你傳輸?shù)牟皇菍?duì)象也可以不要,我這里演示就創(chuàng)建了。
<dependency>
<groupId>com.alian</groupId>
<artifactId>common-rocketmq-dto</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
Member.java
package com.alian.common;
import lombok.Data;
import java.util.Date;
@Data
public class Member {
private Long id;
private String memberName;
private int age;
private Date birthday;
}
后續(xù)的項(xiàng)目都會(huì)用到這個(gè)父工程和公共模塊,后面就不再過(guò)多說(shuō)明了。
三、生產(chǎn)者
我們?cè)诟腹こ滔滦陆ㄒ粋€(gè)模塊用于發(fā)送同步消息。
3.1 Maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq</artifactId>
<groupId>com.alian</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>01-send-sync-message</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alian</groupId>
<artifactId>common-rocketmq-dto</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
3.2 application配置
application.properties
server.port=8001 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默認(rèn)的生產(chǎn)者組 rocketmq.producer.group=sync_group # 發(fā)送同步消息超時(shí)時(shí)間 rocketmq.producer.send-message-timeout=3000 # 用于設(shè)置在消息發(fā)送失敗后,生產(chǎn)者是否嘗試切換到下一個(gè)服務(wù)器。設(shè)置為 true 表示啟用,在發(fā)送失敗時(shí)嘗試切換到下一個(gè)服務(wù)器 rocketmq.producer.retry-next-server=true # 用于指定消息發(fā)送失敗時(shí)的重試次數(shù) rocketmq.producer.retry-times-when-send-failed=3 # 設(shè)置消息壓縮的閾值,為0表示禁用消息體的壓縮 rocketmq.producer.compress-message-body-threshold=0
3.3 發(fā)送字符串消息
消息的發(fā)送比較簡(jiǎn)單,我們直接引用
@Autowired
private RocketMQTemplate rocketMQTemplate;
此對(duì)象封裝了一系列的消息發(fā)送方法。
SendStrMessageTest.java
package com.alian.sync;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@SpringBootTest
public class SendStrMessageTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void syncSendStringMessage() {
String topic = "string_message_topic";
String message = "我是一條同步文本消息:syncSendStringMessage";
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@Test
public void syncSendStringMessageWithBuilder() {
String topic = "string_message_topic";
String message = "我是一條同步的文本消息:syncSendStringMessageWithBuilder";
Message<String> msg = MessageBuilder.withPayload(message)
// 消息類型
.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@Test
public void syncSendStringMessageWithBuilderTimeOut() {
String topic = "string_message_topic";
String message = "我是一條同步的文本消息:syncSendStringMessageWithBuilderTimeOut";
Message<String> msg = MessageBuilder.withPayload(message)
// 消息類型
.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
.build();
// 3秒發(fā)送超時(shí)
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@AfterEach
public void waiting() {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這里需要注意的是,我發(fā)送字符串消息的時(shí)候,topic都是 string_message_topic,因?yàn)槲疫@里是本地開(kāi)發(fā)環(huán)境,并且我在配置文件中配置了
autoCreateTopicEnable=true
當(dāng)該參數(shù)設(shè)置為true時(shí),如果生產(chǎn)者在發(fā)送消息時(shí)使用了一個(gè)在Broker端不存在的Topic,則Broker會(huì)自動(dòng)創(chuàng)建該Topic,允許消息正常發(fā)送和存儲(chǔ)。
當(dāng)該參數(shù)設(shè)置為false時(shí),如果生產(chǎn)者使用了不存在的Topic,則Broker會(huì)直接拒絕發(fā)送請(qǐng)求,不會(huì)自動(dòng)創(chuàng)建Topic。
官方對(duì)該參數(shù)的解釋是:自動(dòng)創(chuàng)建Topic的特性主要是為了方便,但也可能帶來(lái)一些風(fēng)險(xiǎn),比如有的應(yīng)用程序由于編碼上的低級(jí)錯(cuò)誤導(dǎo)致無(wú)意中創(chuàng)建了大量的Topic。因此,生產(chǎn)環(huán)境建議將該參數(shù)設(shè)置為false,只有手工創(chuàng)建所需的Topic。
3.4 發(fā)送JSON消息
SendJsonMessageTest.java
package com.alian.sync;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@SpringBootTest
public class SendJsonMessageTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void sendJsonMessage() {
String topic = "json_message_topic";
JSONObject json = new JSONObject();
json.put("name", "Alian");
json.put("age", "28");
json.put("hobby", "java");
SendResult sendResult = rocketMQTemplate.syncSend(topic, json);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@Test
public void sendJsonMessageWithBuilder() {
String topic = "json_message_topic";
JSONObject json = new JSONObject();
json.put("name", "Alian");
json.put("age", "28");
json.put("hobby", "java");
Message<JSONObject> msg = MessageBuilder.withPayload(json)
// 消息類型
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@Test
public void sendMapMessage() {
String topic = "json_message_topic";
Map<String, String> map = new HashMap<>();
map.put("1", "java");
map.put("2", "go");
map.put("3", "c");
map.put("4", "vue");
map.put("5", "react");
SendResult sendResult = rocketMQTemplate.syncSend(topic, map);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@AfterEach
public void waiting() {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這里需要注意的是,我發(fā)送JSON消息的時(shí)候,topic都是 json_message_topic,這里Map消息也能被JSONObject消費(fèi)。
3.5 發(fā)送Java對(duì)象消息
SendJavaObjectMessageTest.java
package com.alian.sync;
import com.alian.common.Member;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@SpringBootTest
public class SendJavaObjectMessageTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void sendJavaObjectMessage() {
String topic = "java_object_message_topic";
Member member = new Member();
member.setId(10086L);
member.setMemberName("Alian");
member.setAge(28);
member.setBirthday(new Date());
SendResult sendResult = rocketMQTemplate.syncSend(topic, member);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@Test
public void sendJavaObjectMessageWithBuilder() {
String topic = "java_object_message_topic";
Member member = new Member();
member.setId(10086L);
member.setMemberName("Alian");
member.setAge(28);
member.setBirthday(new Date());
Message<Member> msg = MessageBuilder.withPayload(member)
// 設(shè)置消息類型
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
log.info("同步發(fā)送返回的結(jié)果:{}", sendResult);
}
@AfterEach
public void waiting() {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這里需要注意的是,我發(fā)送java對(duì)象消息的時(shí)候,topic都是 java_object_message_topic
四、消費(fèi)者
我們?cè)诟腹こ滔滦陆ㄒ粋€(gè)模塊用于發(fā)送同步消息。
4.1 Maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq</artifactId>
<groupId>com.alian</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>08-comsume-concurrent</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alian</groupId>
<artifactId>common-rocketmq-dto</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
4.2 application配置
application.properties
server.port=8008 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默認(rèn)的消費(fèi)者組 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 批量拉取消息的數(shù)量 rocketmq.consumer.pull-batch-size=10 # 集群消費(fèi)模式 rocketmq.consumer.message-model=CLUSTERING
實(shí)際上對(duì)于本文來(lái)說(shuō),下面兩個(gè)配置不用配置,也不會(huì)生效。
# 默認(rèn)的消費(fèi)者組 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 集群消費(fèi)模式 rocketmq.consumer.message-model=CLUSTERING
因?yàn)閮?yōu)先的是@RocketMQMessageListener 注解中設(shè)置 consumerGroup 和messageModel 參數(shù)。
4.3 消費(fèi)字符串消息
StringMessageConsumer.java
package com.alian.concurrent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "string_message_topic",
consumerGroup = "CONCURRENT_GROUP_STRING",
consumeThreadNumber = 1)
public class StringMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("字符串消費(fèi)者接收到的消息: {}", message);
// 處理消息的業(yè)務(wù)邏輯
}
}
如果要消費(fèi)消息,我們需要實(shí)現(xiàn)RocketMQListener<T>,實(shí)現(xiàn)onMessage方法。發(fā)送的是什么對(duì)象,我們接收就是什么對(duì)象,也就是T是什么對(duì)象。生產(chǎn)者發(fā)送的字符串消息,我們這里就用String接收,也就是RocketMQListener<String>。
package org.apache.rocketmq.spring.core;
public interface RocketMQListener<T> {
void onMessage(T var1);
}
同時(shí)加上@RocketMQMessageListener注解,主要用到三個(gè)注解
- topic:指定該消費(fèi)者訂閱的Topic名稱,可以是單個(gè)Topic,也可以是用||分隔的多個(gè)Topic
- consumerGroup:指定該消費(fèi)者所屬的消費(fèi)組名稱,消費(fèi)組用于組織同一類消費(fèi)實(shí)例,相同消費(fèi)組的消費(fèi)實(shí)例可以消費(fèi)完全相同的消息,RocketMQ 通過(guò)消費(fèi)者組(Consumer Group)來(lái)維護(hù)不同消費(fèi)者的消費(fèi)進(jìn)度。 每個(gè)消費(fèi)者組都有一個(gè)消費(fèi)進(jìn)度(offset),用于標(biāo)記該組下的消費(fèi)者在某個(gè)主題(Topic)和隊(duì)列(Queue)上已經(jīng)消費(fèi)到的位置。
需要注意的是:如果Topic不存在,只有在生產(chǎn)者發(fā)送消息時(shí),并且autoCreateTopicEnable設(shè)置為true的情況下,Broker端才會(huì)自動(dòng)創(chuàng)建該Topic。消費(fèi)者啟動(dòng)時(shí),即使autoCreateTopicEnable=true,也不會(huì)自動(dòng)創(chuàng)建不存在的Topic。
具體來(lái)說(shuō):
- 生產(chǎn)者啟動(dòng)并發(fā)送消息到一個(gè)不存在的Topic時(shí):
- 如果autoCreateTopicEnable=true,Broker會(huì)自動(dòng)創(chuàng)建該Topic,允許消息發(fā)送成功
- 如果autoCreateTopicEnable=false,Broker會(huì)拒絕消息發(fā)送,報(bào)錯(cuò)該Topic不存在
- 消費(fèi)者啟動(dòng)訂閱一個(gè)不存在的Topic時(shí):
- 無(wú)論autoCreateTopicEnable的值是true還是false,Broker都不會(huì)自動(dòng)創(chuàng)建該Topic
- 消費(fèi)者會(huì)啟動(dòng)成功,但獲取不到消息,處于持續(xù)等待狀態(tài)
所以生產(chǎn)者發(fā)送消息時(shí)才可能自動(dòng)創(chuàng)建Topic,而消費(fèi)者啟動(dòng)時(shí)是不會(huì)自動(dòng)創(chuàng)建Topic的。
4.4 消費(fèi)JSON消息
JsonMessageConsumer.java
package com.alian.concurrent;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "json_message_topic", consumerGroup = "CONCURRENT_GROUP_JSON")
public class JsonMessageConsumer implements RocketMQListener<JSONObject> {
@Override
public void onMessage(JSONObject json) {
log.info("json消費(fèi)者接收到的消息: {}", json);
// 處理消息的業(yè)務(wù)邏輯
}
}
生產(chǎn)者發(fā)送的JSONObject消息,我們這里就用JSONObject接收,也就是RocketMQListener<JSONObject>
4.5 消費(fèi)Java對(duì)象消息
JavaObjectMessageConsumer.java
package com.alian.concurrent;
import com.alian.common.Member;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "java_object_message_topic", consumerGroup = "CONCURRENT_GROUP_JAVA_OBJECT")
public class JavaObjectMessageConsumer implements RocketMQListener<Member> {
@Override
public void onMessage(Member member) {
// 因?yàn)榘l(fā)序列化的原因Member必須是同一個(gè)包
log.info("java對(duì)象消費(fèi)者接收到的消息: {}", member);
// 處理消息的業(yè)務(wù)邏輯
}
}
生產(chǎn)者發(fā)送的Member消息,我們這里就用Member接收,也就是RocketMQListener<Member>。
這里需要再次說(shuō)明下發(fā)送java對(duì)象消息時(shí),因?yàn)榉葱蛄械脑颍陨a(chǎn)者和消費(fèi)者使用的是公共包里同一個(gè)對(duì)象,也就是發(fā)送和接收的對(duì)象的包路徑要一致。
五、部分運(yùn)行結(jié)果
5.1、字符串消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F000001339418B4AAC22DB848C70000, offsetMsgId=C0A800EA00002A9F0000000000000000, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0] 字符串消費(fèi)者接收到的消息: 我是一條同步文本消息:syncSendStringMessage
5.2、json消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F00000137FC18B4AAC22DB9D42F0000, offsetMsgId=C0A800EA00002A9F0000000000000146, messageQueue=MessageQueue [topic=json_message_topic, brokerName=broker-a, queueId=1], queueOffset=0]
json消費(fèi)者接收到的消息: {"name":"Alian","age":"28","hobby":"java"}
5.3、java對(duì)象消息
同步發(fā)送返回的結(jié)果:SendResult [sendStatus=SEND_OK, msgId=7F000001098C18B4AAC22DBACEB50000, offsetMsgId=C0A800EA00002A9F0000000000000270, messageQueue=MessageQueue [topic=java_object_message_topic, brokerName=broker-a, queueId=0], queueOffset=0] java對(duì)象消費(fèi)者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Sat Mar 09 21:06:57 CST 2024)
5.4、現(xiàn)象說(shuō)明
為什么在rocketmq中當(dāng)autoCreateTopicEnable=true,先啟動(dòng)消費(fèi)者,然后生產(chǎn)者第一次向一個(gè)未創(chuàng)建的topic中發(fā)送消息時(shí),消息發(fā)送成功了(馬上返回成功了),但是消費(fèi)者要等一段時(shí)間才能收到?這種現(xiàn)象的原因是RocketMQ在創(chuàng)建Topic時(shí),存在一個(gè)延遲同步的過(guò)程。具體來(lái)說(shuō):
- 當(dāng)生產(chǎn)者發(fā)送消息到一個(gè)不存在的Topic時(shí),Broker會(huì)自動(dòng)創(chuàng)建該Topic。
- 但是Broker創(chuàng)建Topic后,并不會(huì)立即將其元數(shù)據(jù)信息同步給所有的消費(fèi)者,而是異步延遲同步。
- 生產(chǎn)者發(fā)送消息時(shí),Broker會(huì)立即返回發(fā)送成功響應(yīng),因?yàn)樗恍鑼⑾⒊志没鎯?chǔ)即可。
- 消費(fèi)者在啟動(dòng)時(shí)已訂閱了該Topic,但由于元數(shù)據(jù)未同步,暫時(shí)無(wú)法獲知該Topic的路由信息,所以無(wú)法立即開(kāi)始消費(fèi)。
- 過(guò)了一段時(shí)間后,Broker將新創(chuàng)建的Topic元數(shù)據(jù)同步給了消費(fèi)者,消費(fèi)者才開(kāi)始從該Topic拉取并消費(fèi)消息。
這種延遲同步機(jī)制是RocketMQ的一個(gè)設(shè)計(jì),目的是為了減小創(chuàng)建Topic時(shí)對(duì)消費(fèi)者的影響,避免大量消費(fèi)者同時(shí)更新元數(shù)據(jù)造成系統(tǒng)抖動(dòng)。
我們可以通過(guò)調(diào)整Broker的配置項(xiàng)來(lái)控制這個(gè)延遲同步時(shí)間,比如:
- brokerTopicPutEnable=true開(kāi)啟自動(dòng)創(chuàng)建Topic功能
- topicDelayEnable=true開(kāi)啟延時(shí)創(chuàng)建Topic功能
- topicDelayOffsetEnabled=true開(kāi)啟延時(shí)創(chuàng)建Topic偏移量同步功能
通過(guò)調(diào)小topicDelayOffsetInterval可以縮短元數(shù)據(jù)同步延遲時(shí)間,但也會(huì)增加系統(tǒng)開(kāi)銷。
所以這種現(xiàn)象實(shí)際上是RocketMQ的一個(gè)正常設(shè)計(jì)行為,目的是為了系統(tǒng)整體的健壯性和可用性。如果應(yīng)用對(duì)延遲時(shí)間不太敏感,保持默認(rèn)配置即可;如果對(duì)延遲敏感,可以適當(dāng)調(diào)小延遲同步時(shí)間。
以上就是SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RocketMQ發(fā)送同步消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 淺談Springboot整合RocketMQ使用心得
- springBoot整合RocketMQ及坑的示例代碼
- SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收的詳細(xì)步驟
- Springboot RocketMq實(shí)現(xiàn)過(guò)程詳解
- 解決SpringBoot整合RocketMQ遇到的坑
- Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
- SpringBoot集成RocketMQ的使用示例
- SpringBoot項(xiàng)目嵌入RocketMQ的實(shí)現(xiàn)示例
- SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼
相關(guān)文章
Spring Boot集成Sorl搜索客戶端的實(shí)現(xiàn)代碼
本篇文章主要介紹了Spring Boot集成Sorl搜索客戶端的實(shí)現(xiàn)代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-11-11
Java實(shí)現(xiàn)解析zip壓縮包并獲取文件內(nèi)容
這篇文章主要為大家詳細(xì)介紹了如何利用Java語(yǔ)言實(shí)現(xiàn)頁(yè)面上傳一個(gè)源碼壓縮包,后端將壓縮包解壓,并獲取每個(gè)文件中的內(nèi)容,感興趣的可以動(dòng)手嘗試一下2022-07-07
SpringBoot的三大開(kāi)發(fā)工具小結(jié)
本文主要介紹了SpringBoot的三大開(kāi)發(fā)工具,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02
深入詳解Java中synchronized鎖升級(jí)的套路
synchronized鎖是啥?鎖其實(shí)就是一個(gè)對(duì)象,隨便哪一個(gè)都可以,Java中所有的對(duì)象都是鎖,換句話說(shuō),Java中所有對(duì)象都可以成為鎖。本文我們主要來(lái)聊聊synchronized鎖升級(jí)的套路,感興趣的可以收藏一下2023-04-04

