SpringBoot配置RocketMQ的詳細(xì)過程
- 引入maven
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>- 配置yml
# rocketmq 配置項(xiàng),對(duì)應(yīng) RocketMQProperties 配置類
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置項(xiàng)
producer:
group: demo-producer-group # 生產(chǎn)者分組
send-message-timeout: 3000 # 發(fā)送消息超時(shí)時(shí)間,單位:毫秒。默認(rèn)為 3000 。
compress-message-body-threshold: 4096 # 消息壓縮閥值,當(dāng)消息體的大小超過該閥值后,進(jìn)行消息壓縮。默認(rèn)為 4 * 1024B
max-message-size: 4194304 # 消息體的最大允許大小。。默認(rèn)為 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步發(fā)送消息時(shí),失敗重試次數(shù)。默認(rèn)為 2 次。
retry-times-when-send-async-failed: 2 # 異步發(fā)送消息時(shí),失敗重試次數(shù)。默認(rèn)為 2 次。
retry-next-server: false # 發(fā)送消息給 Broker 時(shí),如果發(fā)送失敗,是否重試另外一臺(tái) Broker 。默認(rèn)為 false
access-key: # Access Key ,可閱讀 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文檔
secret-key: # Secret Key
enable-msg-trace: true # 是否開啟消息軌跡功能。默認(rèn)為 true 開啟??砷喿x https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文檔
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定義消息軌跡的 Topic 。默認(rèn)為 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置項(xiàng)
consumer:
listeners: # 配置某個(gè)消費(fèi)分組,是否監(jiān)聽指定 Topic 。結(jié)構(gòu)為 Map<消費(fèi)者分組, <Topic, Boolean>> 。默認(rèn)情況下,不配置表示監(jiān)聽。
test-consumer-group:
topic1: false # 關(guān)閉 test-consumer-group 對(duì) topic1 的監(jiān)聽消費(fèi)- 配置變量
/**
* 消息隊(duì)列相關(guān)常亮配置,包括group、topic、tag
**/
public class MqTopicConstant {
/**
* 示例消息隊(duì)列,topic1個(gè)
*/
public static final String DEMO_TOPIC = "test-top-1";
/**
* 注冊(cè)tag
*/
public static final String DEMO_TAG_REGISTERED = "registered";
/**
* 修改tag
*/
public static final String DEMO_TAG_MODIFY = "modify";
}- 創(chuàng)建Service
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class RocketMQService {
private static final Logger log = LoggerFactory.getLogger(RocketMQService.class);
@Resource
private RocketMQTemplate template;
/**
* 發(fā)送普通消息
*
* @param topic topic
* @param message 消息體
*/
public void sendMessage(String topic, Object message) {
this.template.convertAndSend(topic, message);
log.info("普通消息發(fā)送完成:message = {}", message);
}
/**
* 發(fā)送同步消息
*
* @param topic topic
* @param message 消息體
*/
public void syncSendMessage(String topic, Object message) {
SendResult sendResult = this.template.syncSend(topic, message);
log.info("同步發(fā)送消息完成:message = {}, sendResult = {}", message, sendResult);
}
/**
* 發(fā)送同步消息
*
* @param topic topic
* @param message 消息體
*/
public SendResult syncSendMessageR(String topic, Object message) {
SendResult sendResult = this.template.syncSend(topic, message);
log.info("同步發(fā)送消息完成:message = {}, sendResult = {}", message, sendResult);
SendStatus sendStatus = sendResult.getSendStatus();
log.info("狀態(tài)打印 : {}" , sendStatus);
//SEND_OK
return sendResult;
}
/**
* 發(fā)送異步消息
*
* @param topic topic
* @param message 消息體
*/
public void asyncSendMessage(String topic, final Object message) {
this.template.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("異步消息發(fā)送成功, SendStatus = {}", sendResult.getSendStatus());
// log.info("異步消息發(fā)送成功,message = {}, SendStatus = {}", message, sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
log.info("異步消息發(fā)送異常,exception = {}", e.getMessage());
}
});
}
/**
* 發(fā)送單向消息
*
* @param topic topic
* @param message 消息體
*/
public void sendOneWayMessage(String topic, Object message) {
this.template.sendOneWay(topic, message);
log.info("單向發(fā)送消息完成:message = {}", message);
}
/**
* 同步發(fā)送批量消息
*
* @param topic topic
* @param messageList 消息集合
* @param timeout 超時(shí)時(shí)間(毫秒)
*/
public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) {
this.template.syncSend(topic, messageList, timeout);
log.info("同步發(fā)送批量消息完成:message = {}", JSON.toJSONString(messageList));
}
/**
* 發(fā)送攜帶 tag 的消息(過濾消息)
*
* @param topic topic,RocketMQTemplate將 topic 和 tag 合二為一了,底層會(huì)進(jìn)行
* 拆分再組裝。只要在指定 topic 時(shí)跟上 {:tags} 就可以指定tag
* 例如 test-topic:tagA
* @param message 消息體
*/
public void syncSendMessageWithTag(String topic, Object message) {
this.template.syncSend(topic, message);
log.info("發(fā)送帶 tag 的消息完成:message = {}", message);
}
/**
* 同步發(fā)送延時(shí)消息
*
* @param topic topic
* @param message 消息體
* @param timeout 超時(shí)
* @param delayLevel 延時(shí)等級(jí):現(xiàn)在RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),
* 從1s到2h分別對(duì)應(yīng)著等級(jí) 1 到 18,消息消費(fèi)失敗會(huì)進(jìn)入延時(shí)消息隊(duì)列
* "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) {
this.template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
log.info("已同步發(fā)送延時(shí)消息 message = {}", message);
}
/**
* 異步發(fā)送延時(shí)消息
*
* @param topic topic
* @param message 消息對(duì)象
* @param timeout 超時(shí)時(shí)間
* @param delayLevel 延時(shí)等級(jí)
*/
public void asyncSendDelay(String topic, final Object message, long timeout, int delayLevel) {
this.template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("異步發(fā)送延時(shí)消息成功,message = {}", message);
}
@Override
public void onException(Throwable throwable) {
log.error("異步發(fā)送延時(shí)消息發(fā)生異常,exception = {}", throwable.getMessage());
}
}, timeout, delayLevel);
log.info("已異步發(fā)送延時(shí)消息 message = {}", message);
}
/**
* 發(fā)送事務(wù)消息
*
* @param topic topic,RocketMQTemplate將 topic 和 tag 合二為一了,底層會(huì)進(jìn)行
* 拆分再組裝。只要在指定 topic 時(shí)跟上 {:tags} 就可以指定tag
* 例如 test-topic:tagA
* @param message 消息對(duì)象
* @param arg 傳給事務(wù)監(jiān)聽器的參數(shù)(可以作為事務(wù)處理的唯一ID,來驗(yàn)證本地事務(wù))
*/
public void sendMessageInTransaction(String topic, final Object message ,final Object arg) {
TransactionSendResult res = this.template.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), arg);
if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
log.info("【生產(chǎn)者】事物消息發(fā)送成功;成功結(jié)果:{}", res);
} else {
log.info("【生產(chǎn)者】事務(wù)發(fā)送失?。菏≡颍簕}", res);
}
}
}- 事務(wù)監(jiān)聽器
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 獲取第三個(gè)參數(shù)(用戶自定義參數(shù))
Integer customArg = (Integer) arg;
log.info("執(zhí)行本地事務(wù),自定義參數(shù): {}", customArg);
String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
log.info("這里是校驗(yàn)TAG: {}" , tag );
//RocketMQLocalTransactionState.COMMIT
//RocketMQLocalTransactionState.ROLLBACK
//RocketMQLocalTransactionState.UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("檢查本地交易: {}", message);
return RocketMQLocalTransactionState.COMMIT;
}
}狀態(tài)解釋
- RocketMQLocalTransactionState.COMMIT: 含義: 表示本地事務(wù)已成功執(zhí)行,允許提交消息。這意味著消息將對(duì)消費(fèi)者可見,可以被正常消費(fèi)。 使用場(chǎng)景: 當(dāng)您的業(yè)務(wù)邏輯成功執(zhí)行且希望該消息能夠被下游系統(tǒng)處理時(shí),應(yīng)返回此狀態(tài)。 - RocketMQLocalTransactionState.ROLLBACK: 含義: 表示本地事務(wù)執(zhí)行失敗,要求回滾消息。即該消息不會(huì)被發(fā)送給任何消費(fèi)者。 使用場(chǎng)景: 如果您的業(yè)務(wù)邏輯執(zhí)行過程中遇到錯(cuò)誤或異常情況,不希望該消息影響下游系統(tǒng),則應(yīng)回滾事務(wù),返回此狀態(tài)。 - RocketMQLocalTransactionState.UNKNOWN: 含義: 表示當(dāng)前無法確定事務(wù)的狀態(tài),可能是因?yàn)榫W(wǎng)絡(luò)問題或其他原因?qū)е聲簳r(shí)無法判斷。RocketMQ 會(huì)定期調(diào)用 checkLocalTransaction 方法來檢查事務(wù)的狀態(tài)。 使用場(chǎng)景: 當(dāng)您不確定事務(wù)是否成功完成時(shí)(例如,遠(yuǎn)程服務(wù)調(diào)用超時(shí)),可以返回此狀態(tài)。RocketMQ 將通過 checkLocalTransaction 方法嘗試再次確認(rèn)事務(wù)狀態(tài)。
- 測(cè)試發(fā)送消息
@Autowired
private RocketMQService rocketMQService;
## 發(fā)送事務(wù)消息
rocketMQService.sendMessageInTransaction(GpsConstants.DEDUCTION_MESSP_TOPIC, "這是測(cè)試消息");
## 消費(fèi)事務(wù)消息
@Service
@RocketMQMessageListener(topic = GpsConstants.DEDUCTION_MESSP_TOPIC
, consumerGroup = GpsConstants.DEDUCTION_MESSP_GROUP)
public class deductionTopic implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("msg : " + msg);
//這里就會(huì)打印msg : 這是測(cè)試消息
}
}提示 : 同一個(gè)topic,不同的consumerGroup都會(huì)消費(fèi),根據(jù)自己的業(yè)務(wù)指定不同的 consumerGroup 處理不同的業(yè)務(wù),如果不需要,則一個(gè)topic只能用一次
到此這篇關(guān)于SpringBoot配置RocketMQ的詳細(xì)過程的文章就介紹到這了,更多相關(guān)SpringBoot配置RocketMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
系統(tǒng)運(yùn)維問題排查-java內(nèi)存過高分析及說明
本文總結(jié)了監(jiān)控Java進(jìn)程內(nèi)存與線程狀態(tài)的常用命令及參數(shù),包括top排序、jmap查看內(nèi)存分布、jstat分析GC數(shù)據(jù)、jstack解析線程狀態(tài)等,強(qiáng)調(diào)需使用與進(jìn)程一致的用戶執(zhí)行,并解析了線程狀態(tài)和內(nèi)存區(qū)域的含義2025-07-07
PostgreSQL Docker部署+SpringBoot集成方式
本文介紹了如何在Docker中部署PostgreSQL和pgadmin,并通過SpringBoot集成PostgreSQL,主要步驟包括安裝PostgreSQL和pgadmin,配置防火墻,創(chuàng)建數(shù)據(jù)庫和表,以及在SpringBoot中配置數(shù)據(jù)源和實(shí)體類2024-12-12
基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實(shí)現(xiàn))
這篇文章主要介紹了基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實(shí)現(xiàn)),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋參考下吧2023-11-11
一次因Java應(yīng)用造成CPU過高的排查實(shí)踐過程
一個(gè)應(yīng)用占用CPU很高,除了確實(shí)是計(jì)算密集型應(yīng)用之外,通常原因都是出現(xiàn)了死循環(huán)。下面這篇文章主要給大家介紹了一次因Java應(yīng)用造成CPU過高的排查實(shí)踐過程,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2018-11-11
Mybatis如何獲取insert新增數(shù)據(jù)id值
這篇文章主要介紹了Mybatis如何獲取insert新增數(shù)據(jù)id值問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
面向?qū)ο蠛兔嫦蜻^程的區(qū)別(動(dòng)力節(jié)點(diǎn)java學(xué)院整理)
很多朋友不清楚面向?qū)ο蠛兔嫦蜻^程有什么區(qū)別,接下來小編給大家整理了關(guān)于面向?qū)ο蠛兔嫦蜻^程的區(qū)別講解,感興趣的朋友可以參考下2017-04-04
java數(shù)字和中文算數(shù)驗(yàn)證碼的實(shí)現(xiàn)
這篇文章主要介紹了java數(shù)字和中文算數(shù)驗(yàn)證碼的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07

