SprigBoot整合rocketmq-v5-client-spring-boot的示例詳解
- 安裝RocketMQ 服務端
Apache RocketMQ官方網站
5.X文檔地址
rocketmq-v5-client-spring-boot官方示例
rocketmq-v5-client-spring-boot-starter maven倉庫
RocketMQ服務端 的安裝包分為兩種,二進制包和源碼包。這里以5.3.2版本做示例 。 點擊這里 下載 Apache RocketMQ 5.3.2的源碼包。你也可以從這里 下載到二進制包。二進制包是已經編譯完成后可以直接運行的,源碼包是需要編譯后運行的。
本文整合的rocketmq-v5-client-spring-boot 版本2.3.3 , 內部引用的是rocketmq-client-java , 版本5.0.7,使用的是gRPC 協(xié)議 , 使用前建議先把官方文檔與示例看一下,使用的Java環(huán)境是openjdk-17
建議使用jdk11以上版本
ubuntu安裝RocketMQ
查看jdk版本
java -version
如果沒安裝jdk的話,在root賬號下執(zhí)行以下命令
apt-get upgrade apt-get update apt install openjdk-17-jdk
創(chuàng)建文件夾
cd /usr/local
mkdir rocketmq
cd rocketmq
下載RocketMQ二進制包(我這里使用的是5.3.2版本,使用其他版本可前往官方下載)
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip
解壓 沒有unzip的解壓命令 , ubuntu會提示,根據提示安裝unzip插件
unzip rocketmq-all-5.3.2-bin-release.zip
進入bin目錄
調整runserver.sh的內存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"調整runbroker.sh內存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"修改/conf/broker.conf配置文件,這里主要是為了測試方便我們放開自動創(chuàng)建Topic的配置,加入以下配置(經過測試5.0以上版本不支持自動創(chuàng)建主題topic)
# 開啟自動創(chuàng)建 Topic 加不加都行 autoCreateTopicEnable=true #內網ip namesrvAddr:nameSrv地址 公網訪問設置公網IP 內網訪問設置內網IP 以下所有IP需一致 namesrvAddr=192.168.3.86:9876 #brokerIP1:broker也需要一個ip 內網或公網 brokerIP1=192.168.3.86
配置 NameServer 的環(huán)境變量
配置環(huán)境
vim /etc/profile
添加以下配置
#MQ安裝位置 export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.3.2 #MQ公網或內網ip 公網訪問設置公網IP 內網訪問設置內網IP export NAMESRV_ADDR=192.168.3.86:9876
重新編譯文件生效
source /etc/profile
修改完后,我們就可以啟動 RocketMQ 的 NameServer 了
啟動 namesrv
nohup sh bin/mqnamesrv &
驗證
# 驗證 namesrv 是否啟動成功 tail -f -n 500 mqnamesrv.log ... The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876 # 或者是 tail -f ~/logs/rocketmqlogs/namesrv.log
啟動 Broker 消息存儲中心和 Proxy 代理
# 啟動(不使用代理) nohup sh bin/mqbroker -n 192.168.3.86:9876 >mqbroker.log 2>&1 & # 啟動 Broker+Proxy nohup sh bin/mqbroker -n 192.168.3.86:9876 --enable-proxy & # 推薦使用 指定配置文件啟動(broker默認使用的端口是10911,我們也可以在配置文件修改端口) nohup sh bin/mqbroker -n 192.168.3.86:9876 -c conf/broker.conf --enable-proxy &
# 驗證是否啟動成功 tail -n 500 nohup.out tail -f ~/logs/rocketmqlogs/broker.log tail -f ~/logs/rocketmqlogs/proxy.log
Wed May 14 12:41:41 CST 2025 rocketmq-proxy startup successfully
使用tail -f ~/logs/rocketmqlogs/broker.log 查看日志如果提示以下
The default acl dir /usr/local/rocketmq/rocketmq-all-5.3.2/conf/acl is not exist
需要切換conf目錄下 新建acl文件夾就行了
mkdir acl
由于v5可參考的文檔太少,這個報錯我也沒找到為什么源碼包里會少一個acl的文件夾,有知道的希望留言告知
測試消息收發(fā)
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
可以通過 mqadmin 命令創(chuàng)建
Admin Tool官方命令工具
注意 TestTopic 是topic名稱
sh bin/mqadmin updatetopic -n 192.168.3.86:9876 -t TestTopic -c DefaultCluster
打印
create topic to 192.168.3.86:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]安裝RocketMQ Dashboard 可視化
官方介紹
按照官方安裝運行就可以
需要注意的點,IP需要跟上面設置的IP一致,防火墻開通8080,8081,10911,9876

注意:默認端口為:8080,不修改會跟Proxy端口沖突,Proxy端口默認的也是8080
我這里修改了RocketMQ Dashboard的默認端口,改成8082
可以在本地運行,也可以打包運行,我是打包運行的,運行成功后訪問:http://192.168.3.86:8082


可以先添加主題,(研究了幾天,沒研究自動添加的方法 , 如果那位大佬研究出來了可以給我分享一下)


點擊提交就行了。研究了源碼,這個添加跟更新是一個接口。提交之后就可以敲代碼了(有研究出來能自動加載Topic的麻煩留言)
如果啟動報錯,需添加 topic
CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
整合rocketmq-client-java
先用官方原生 rocketmq-client-java JDK
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>官方示例代碼可以去看看
發(fā)送普通消息
package com.example.mq.producer;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "192.168.3.86:8081";
// 消息發(fā)送的目標Topic名稱,需要提前創(chuàng)建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer時需要設置通信配置以及預綁定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息發(fā)送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey")
// 設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag")
// 消息體。
.setBody("你好,mq".getBytes())
.build();
try {
// 發(fā)送消息,需要關注發(fā)送結果,并捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}訂閱
package com.example.mq.consumer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.3.86:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 為消費者指定所屬的消費者分組,Group需要提前創(chuàng)建。
String consumerGroup = "YourConsumerGroup";
// 指定需要訂閱哪個目標Topic,Topic需要提前創(chuàng)建。
String topic = "TestTopic";
// 初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數以及訂閱關系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 設置消費者分組。
.setConsumerGroup(consumerGroup)
// 設置預綁定的訂閱關系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 設置消費監(jiān)聽器。
.setMessageListener(messageView -> {
// 處理消息并返回消費結果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
logger.info("Consume message successfully, body={}", message);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可關閉該實例。
// pushConsumer.close();
}
}

4.整合rocketmq-v5-client-spring-boot
整合SpringBoot rocketmq-v5-client-spring-boot
注意原生的rocketmq-client-java需要注釋掉,rocketmq-v5-client-spring-boot已經引入了,不注釋會jar包沖突
引入maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-v5-client-spring-boot</artifactId> <version>2.3.3</version> </dependency>
先創(chuàng)建變量
public class RocketMQVariable {
/**
* 普通消息隊列
*/
public static final String NORMAL_TOPIC = "normal-topic";
// 如果使用負載均衡模式 需要設置相同的消費組名
public static final String NORMAL_GROUP = "normal-group";
// 處理廣播消費模式使用
public static final String NORMAL1_GROUP = "normal1-group";
/**
* 異步普通消息隊列
*/
public static final String ASYNC_NORMAL_TOPIC = "async-normal-topic";
public static final String ASYNC_NORMAL_GROUP = "async-normal-group";
/**
* 順序消息隊列
*/
public static final String FIFO_TOPIC = "fifo-topic";
public static final String FIFO_GROUP = "fifo-group";
/**
* 定時/延時消息隊列
*/
public static final String DELAY_TOPIC = "delay-topic";
public static final String DELAY_GROUP = "delay-group";
/**
* 事務消息隊列
*/
public static final String TRANSACTION_TOPIC = "transaction-topic";
public static final String TRANSACTION_GROUP = "transaction-group";
}注意:
如果使用負載均衡模式 需設置相同的Topic 相同的group
如果使用廣播消費模式 需設置相同的Topic 不同的group
編寫工具類
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class RocketMQV2Service {
private static final Logger log = LoggerFactory.getLogger(RocketMQV2Service.class);
@Resource
private RocketMQClientTemplate template;
/**
* 發(fā)送普通消息
*
* @param topic
* @param message
*/
public void syncSendNormalMessage(String topic, Object message) {
SendReceipt sendReceipt = template.syncSendNormalMessage(topic, message);
log.info("普通消息發(fā)送完成:topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);
}
/**
* 發(fā)送異步普通消息
*
* @param topic
* @param message
*/
public void asyncSendNormalMessage(String topic, Object message) {
CompletableFuture<SendReceipt> future = new CompletableFuture<>();
ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
future.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
log.error("發(fā)送消息失敗", throwable);
return;
}
log.info("發(fā)送異步消息消費成功5, messageId={}", sendReceipt.getMessageId());
}, sendCallbackExecutor);
CompletableFuture<SendReceipt> completableFuture = template.asyncSendNormalMessage(topic, message, future);
log.info("發(fā)送異步消息成功1, topic={}, message = {}, sendReceipt={}", topic, message, completableFuture);
}
/**
* 發(fā)送順序消息
*
* @param topic
* @param message
* @param messageGroup
*/
public void syncSendFifoMessage(String topic, Object message, String messageGroup) {
SendReceipt sendReceipt = template.syncSendFifoMessage(topic, message, messageGroup);
log.info("順序消息發(fā)送完成:topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);
}
/**
* 發(fā)送延時消息
*
* @param topic
* @param message
* @param delay 單位:秒
*/
public void syncSendDelayMessage(String topic, Object message, Long delay) {
SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, Duration.ofSeconds(delay));
log.info("延時消息發(fā)送完成 :topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);
}
/**
* 發(fā)送延時消息
*
* @param topic
* @param message
* @param duration Duration.ofSeconds(秒) Duration.ofMinutes(分鐘) Duration.ofHours(小時)
*/
public void syncSendDelayMessage(String topic, Object message, Duration duration) {
SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, duration);
log.info("延時消息發(fā)送完成 :topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);
}
/**
* 發(fā)送事務消息
*
* @param topic
* @param message
* @throws ClientException
*/
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
try {
Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
SendReceipt sendReceipt = pair.getSendReceipt();
Transaction transaction = pair.getTransaction();
log.info("事務消息發(fā)送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
log.info("消息id : {} ", sendReceipt.getMessageId());
//如果這里提交了事務
if (doLocalTransaction(1)) {
log.info("本地事務執(zhí)行成功");
transaction.commit();
} else {
log.info("本地事務執(zhí)行失敗");
transaction.rollback();
}
return pair;
} catch (ClientException e) {
throw new RuntimeException(e);
}
}
boolean doLocalTransaction(int number) {
// 本地事務邏輯 數據庫操作
log.info("執(zhí)行本地事務 : {}", number);
return number > 5;
}
}測試代碼
yml配置
rocketmq:
producer:
endpoints: 192.168.3.86:8081
topic:
push-consumer:
endpoints: 192.168.3.86:8081
access-key:
secret-key:
topic:
tag: "*"發(fā)送消息
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.util.RocketMQV2Service;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;
@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {
@Autowired
private RocketMQV2Service rocketMQV2Service;
/**
* 普通消息
*
* @return
*/
@GetMapping("/normal.message")
public String normalMessage() {
rocketMQV2Service.syncSendNormalMessage(RocketMQVariable.NORMAL_TOPIC, "hello RocketMQ 這是普通消息");
return "發(fā)送成功";
}
/**
* 異步普通消息
*
* @return
*/
@GetMapping("/async.normal.message")
public String asyncSendNormalMessageNormalMessage() {
rocketMQV2Service.asyncSendNormalMessage(RocketMQVariable.ASYNC_NORMAL_TOPIC, "hello RocketMQ 這是異步普通消息");
return "發(fā)送成功";
}
/**
* 順序消息
*
* @return
*/
@GetMapping("/flfo.message")
public String flfoMessage() {
for (int i = 0; i < 20; i++) {
rocketMQV2Service.syncSendFifoMessage(RocketMQVariable.FIFO_TOPIC, "hello RocketMQ 這是順序消息" + i, RocketMQVariable.FIFO_GROUP);
}
return "發(fā)送成功";
}
/**
* 定時/延時消息
*
* @return
*/
@GetMapping("/delay.message")
public String delayMessage() {
rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 這是30秒定時消息", Duration.ofSeconds(30));
rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 這是10秒定時消息 ", 10l);
rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 這是1分鐘定時消息", Duration.ofMinutes(1));
return "發(fā)送成功";
}
/**
* 事務消息
*
* @return
*/
@GetMapping("/transaction.message")
public String transactionMessage() throws ClientException {
Pair<SendReceipt, Transaction> sendReceiptTransactionPair = rocketMQV2Service.sendMessageInTransaction(RocketMQVariable.TRANSACTION_TOPIC, "hello RocketMQ 這是事務消息");
Transaction transaction = sendReceiptTransactionPair.getTransaction();
SendReceipt sendReceipt = sendReceiptTransactionPair.getSendReceipt();
MessageId messageId = sendReceipt.getMessageId();
log.info("事務消息發(fā)送完成 messageId = {}", messageId);
log.info("事務消息發(fā)送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
return "發(fā)送成功";
}
}普通消息(廣播模式)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormalService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("普通消息, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("普通消息, message={}", message);
Map<String, String> properties = messageView.getProperties();
log.info("普通消息, properties={}", JSONObject.toJSONString(properties));
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("普通消息, message={}", messageView);
return ConsumeResult.FAILURE;
}
return ConsumeResult.SUCCESS;
}
}import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
consumerGroup = RocketMQVariable.NORMAL1_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("普通消息1, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("普通消息1, message={}", message);
Map<String, String> properties = messageView.getProperties();
log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("普通消息1, message={}", messageView);
return ConsumeResult.FAILURE;
}
return ConsumeResult.SUCCESS;
}
}日志 (注意consumerGroup 不同)

負載均衡模式 把consumerGroup改為 RocketMQVariable.NORMAL_GROUP
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("普通消息1, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("普通消息1, message={}", message);
Map<String, String> properties = messageView.getProperties();
log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("普通消息1, message={}", messageView);
return ConsumeResult.FAILURE;
}
return ConsumeResult.SUCCESS;
}
}日志 (會自動選擇一個消費者消費)

順序消費
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.FIFO_TOPIC,
consumerGroup = RocketMQVariable.FIFO_GROUP)
public class PushConsumerFifoService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("順序消息, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("順序消息, message={}", message);
Map<String, String> properties = messageView.getProperties();
log.info("順序消息, properties={}", JSONObject.toJSONString(properties));
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("順序消息, message={}", messageView);
return ConsumeResult.FAILURE;
}
log.info("rollback transaction");
return ConsumeResult.SUCCESS;
}
}日志

定時/延時任務消息 (可自定義時間)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.DELAY_TOPIC,
consumerGroup = RocketMQVariable.DELAY_GROUP)
public class PushConsumerDelayService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("定時/延時消息, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("定時/延時消息, message={}", message);
Map<String, String> properties = messageView.getProperties();
log.info("定時/延時消息, properties={}", JSONObject.toJSONString(properties));
if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
log.info("定時/延時消息, message={}", messageView);
return ConsumeResult.FAILURE;
}
log.info("定時/延時消息 消費完成");
return ConsumeResult.SUCCESS;
}
}日志

事務處理情況1
/**
* 發(fā)送事務消息
*
* @param topic
* @param message
* @throws ClientException
*/
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
try {
Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
SendReceipt sendReceipt = pair.getSendReceipt();
Transaction transaction = pair.getTransaction();
log.info("事務消息發(fā)送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
log.info("消息id : {} ", sendReceipt.getMessageId());
//如果這里提交了事務
if (doLocalTransaction(1)) {
log.info("本地事務執(zhí)行成功");
transaction.commit();
} else {
log.info("本地事務執(zhí)行失敗");
transaction.rollback();
}
return pair;
} catch (ClientException e) {
throw new RuntimeException(e);
}
}
boolean doLocalTransaction(int number) {
// 本地事務邏輯 數據庫操作
log.info("執(zhí)行本地事務 : {}", number);
return number > 5;
}如果在工具類里面提交了事務 transaction.commit();下面的就不會進入處理了
@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
@Override
public TransactionResolution check(MessageView messageView) {
log.info("Receive transactional message check, message={}", messageView);
return null;
}
}而是直接消費了
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("事務消息消費, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("事務消息消費, message={}", message);
if (Objects.isNull(message)) {
log.info("事務消息 消費失敗");
return ConsumeResult.FAILURE;
}
log.info("事務消息 消費成功");
return ConsumeResult.SUCCESS;
}
}日志

事務處理情況2
/**
* 發(fā)送事務消息 這里只發(fā)消息 不參與事務提交
*
* @param topic
* @param message
* @throws ClientException
*/
public void sendMessageInTransaction(String topic, Object message) {
try {
template.sendMessageInTransaction(topic, message);
} catch (ClientException e) {
throw new RuntimeException(e);
}
}使用官方事務處理機制處理事務
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.core.RocketMQTransactionChecker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
@Override
public TransactionResolution check(MessageView messageView) {
log.info("事務消息 事務操作, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("事務消息 事務操作, message={}", message);
String messageId = messageView.getMessageId().toString();
if (Objects.nonNull(messageId)) {
log.info("事務消息 事務操作, messageId={}", messageId);
return TransactionResolution.COMMIT;
}
log.info("事務消息消費失敗");
return TransactionResolution.ROLLBACK;
}
}事務提交后才會被消費
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("事務消息消費, messageView={}", messageView);
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("事務消息消費, message={}", message);
if (Objects.isNull(message)) {
log.info("事務消息 消費失敗");
return ConsumeResult.FAILURE;
}
log.info("事務消息 消費成功");
return ConsumeResult.SUCCESS;
}
}日志

到此這篇關于SprigBoot整合rocketmq-v5-client-spring-boot的詳細過程的文章就介紹到這了,更多相關SprigBoot rocketmq-v5-client-spring-boot內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
PipedWriter和PipedReader源碼分析_動力節(jié)點Java學院整理
這篇文章主要介紹了PipedWriter和PipedReader源碼分析_動力節(jié)點Java學院整理,需要的朋友可以參考下2017-05-05
Spring Boot 集成 MongoDB Template 的步驟
MongoDB 是一個流行的 NoSQL 數據庫,適合處理大量非結構化數據,本篇文章將詳細介紹如何在 Spring Boot 3.4.0 中集成 MongoDB Template,從零開始構建一個簡單的應用程序,感興趣的朋友一起看看吧2024-12-12
Java隊列篇之實現數組模擬隊列及可復用環(huán)形隊列詳解
像棧一樣,隊列(queue)也是一種線性表,它的特性是先進先出,插入在一端,刪除在另一端。就像排隊一樣,剛來的人入隊(push)要排在隊尾(rear),每次出隊(pop)的都是隊首(front)的人2021-10-10
解決springboot 獲取form-data里的file文件的問題
這篇文章主要介紹了解決springboot 獲取form-data里的file文件的問題的相關資料,這里提供了詳細的解決步驟,需要的朋友可以參考下2017-07-07

