SpringBoot3集成RocketMq場(chǎng)景分析

RocketMQ因其架構(gòu)簡(jiǎn)單、業(yè)務(wù)功能豐富、具備極強(qiáng)可擴(kuò)展性等特點(diǎn)被廣泛應(yīng)用,比如金融業(yè)務(wù)、互聯(lián)網(wǎng)、大數(shù)據(jù)、物聯(lián)網(wǎng)等領(lǐng)域的業(yè)務(wù)場(chǎng)景;
標(biāo)簽:RocketMq5.Dashboard;
一、簡(jiǎn)介
RocketMQ因其架構(gòu)簡(jiǎn)單、業(yè)務(wù)功能豐富、具備極強(qiáng)可擴(kuò)展性等特點(diǎn)被廣泛應(yīng)用,比如金融業(yè)務(wù)、互聯(lián)網(wǎng)、大數(shù)據(jù)、物聯(lián)網(wǎng)等領(lǐng)域的業(yè)務(wù)場(chǎng)景;
二、環(huán)境部署
1、編譯打包
1、下載5.0版本源碼包 rocketmq-all-5.0.0-source-release.zip 2、解壓后進(jìn)入目錄,編譯打包 mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

2、修改配置
在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

3、服務(wù)啟動(dòng)
1、該目錄下 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/ 2、啟動(dòng)NameServer sh mqnamesrv 輸出日志 The Name Server boot success. serializeType=JSON 3、啟動(dòng)Broker+Proxy sh mqbroker -n localhost:9876 --enable-proxy 輸出日志 rocketmq-proxy startup successfully 4、關(guān)閉服務(wù) sh mqshutdown namesrv Send shutdown request to mqnamesrv(18636) OK sh mqshutdown broker Send shutdown request to mqbroker with proxy enable OK(18647)
4、控制臺(tái)安裝
1、下載master源碼包 rocketmq-dashboard-master 2、解壓后進(jìn)入目錄,編譯打包 mvn clean package -Dmaven.test.skip=true 3、啟動(dòng)服務(wù) java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar 4、輸出日志 INFO main - Tomcat started on port(s): 8080 (http) with context path '' 5、訪問服務(wù):localhost:8080

三、工程搭建
1、工程結(jié)構(gòu)

2、依賴管理
在 rocketmq-starter 組件中,實(shí)際上依賴的是 rocketmq-client 組件的 5.0 版本,由于兩個(gè)新版框架間的兼容問題,需要添加相關(guān)配置解決該問題;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-starter.version}</version>
</dependency>3、配置文件
配置RocketMq服務(wù)地址,消息生產(chǎn)者和消費(fèi)者;
rocketmq:
name-server: 127.0.0.1:9876
# 生產(chǎn)者
producer:
group: boot_group_1
# 消息發(fā)送超時(shí)時(shí)間
send-message-timeout: 3000
# 消息最大長度4M
max-message-size: 4096
# 消息發(fā)送失敗重試次數(shù)
retry-times-when-send-failed: 3
# 異步消息發(fā)送失敗重試次數(shù)
retry-times-when-send-async-failed: 2
# 消費(fèi)者
consumer:
group: boot_group_1
# 每次提取的最大消息數(shù)
pull-batch-size: 54、配置類
在配置類中主要定義兩個(gè)Bean的加載,即 RocketMQTemplate 和 DefaultMQProducer ,主要是提供消息發(fā)送的能力,即生產(chǎn)消息;
@Configuration
public class RocketMqConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.max-message-size}")
private Integer maxMessageSize;
@Value("${rocketmq.producer.retry-times-when-send-failed}")
private Integer retryTimesWhenSendFailed ;
@Value("${rocketmq.producer.retry-times-when-send-async-failed}")
private Integer retryTimesWhenSendAsyncFailed ;
@Bean
public RocketMQTemplate rocketMqTemplate(){
RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
rocketMqTemplate.setProducer(defaultMqProducer());
return rocketMqTemplate;
}
@Bean
public DefaultMQProducer defaultMqProducer() {
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(this.nameServer);
producer.setProducerGroup(this.producerGroup);
producer.setSendMsgTimeout(this.sendMsgTimeout);
producer.setMaxMessageSize(this.maxMessageSize);
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
return producer;
}
}四、基礎(chǔ)用法
1、消息生產(chǎn)
編寫一個(gè)生產(chǎn)者接口類,分別使用 RocketMQTemplate 和 DefaultMQProducer 實(shí)現(xiàn)消息發(fā)送的功能,然后可以通過 Dashboard 控制面板查看消息詳情;
@RestController
public class ProducerWeb {
private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
@Autowired
private RocketMQTemplate rocketMqTemplate;
@GetMapping("/send/msg1")
public String sendMsg1 (){
try {
// 構(gòu)建消息主體
JsonMapper jsonMapper = new JsonMapper();
String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
// 發(fā)送消息
rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
} catch (Exception e) {
e.printStackTrace();
}
return "OK" ;
}
@Autowired
private DefaultMQProducer defaultMqProducer ;
@GetMapping("/send/msg2")
public String sendMsg2 (){
try {
// 構(gòu)建消息主體
JsonMapper jsonMapper = new JsonMapper();
String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
// 構(gòu)建消息對(duì)象
Message message = new Message();
message.setTopic("boot-mq-topic");
message.setTags("boot-mq-tag");
message.setKeys("boot-mq-key");
message.setBody(msgBody.getBytes());
// 發(fā)送消息,打印日志
SendResult sendResult = defaultMqProducer.send(message);
log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
return "OK" ;
}
}2、消息消費(fèi)
編寫消息監(jiān)聽類,實(shí)現(xiàn) RocketMQListener 接口,通過 RocketMQMessageListener 注解控制監(jiān)聽的具體信息;
@Service
@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
public class ConsumerListener implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
@Override
public void onMessage(String message) {
log.info("\n=====\n message:{} \n=====\n",message);
}
}
五、參考源碼
文檔倉庫: https://gitee.com/cicadasmile/butte-java-note
源碼倉庫: https://gitee.com/cicadasmile/butte-spring-parent
Gitee主頁: https://gitee.com/cicadasmile/butte-java-note
到此這篇關(guān)于SpringBoot3集成RocketMq的文章就介紹到這了,更多相關(guān)SpringBoot3集成RocketMq內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
maven插件assembly使用及springboot啟動(dòng)腳本start.sh和停止腳本 stop.sh
這篇文章主要介紹了maven插件assembly使用及springboot啟動(dòng)腳本start.sh和停止腳本 stop.sh的相關(guān)資料,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08
教你如何監(jiān)控 Java 線程池運(yùn)行狀態(tài)的操作(必看)
這篇文章主要介紹了教你如何監(jiān)控 Java 線程池運(yùn)行狀態(tài)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Java中生成不重復(fù)隨機(jī)數(shù)的四種方法舉例詳解
在Java編程中獲取隨機(jī)數(shù)是常見的需求,這篇文章主要介紹了Java中生成不重復(fù)隨機(jī)數(shù)的四種方法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-04-04
elasticsearch?java客戶端action的實(shí)現(xiàn)簡(jiǎn)單分析
這篇文章主要為大家介紹了elasticsearch?java客戶端action的實(shí)現(xiàn)簡(jiǎn)單分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
JAVA多線程的使用場(chǎng)景與注意事項(xiàng)總結(jié)
這篇文章主要給大家介紹了關(guān)于JAVA多線程的使用場(chǎng)景與注意事項(xiàng)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03
實(shí)例講解JAVA設(shè)計(jì)模式之備忘錄模式
這篇文章主要介紹了JAVA設(shè)計(jì)模式之備忘錄模式的的相關(guān)資料,文中示例代碼非常詳細(xì),供大家參考和學(xué)習(xí),感興趣的朋友可以了解下2020-06-06
Java棧的應(yīng)用之括號(hào)匹配算法實(shí)例分析
這篇文章主要介紹了Java棧的應(yīng)用之括號(hào)匹配算法,結(jié)合實(shí)例形式分析了Java使用棧實(shí)現(xiàn)括號(hào)匹配算法的相關(guān)原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-03-03

