java RocketMQ快速入門基礎(chǔ)知識(shí)
如何使用
1、引入 rocketmq-client
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency>
2、編寫Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* Producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可
* 注意:切記不可以在每次發(fā)送消息時(shí),都調(diào)用start方法
*/
producer.start();
for (int i = 0; i < 997892; i++) {
try {
//構(gòu)建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("測(cè)試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發(fā)送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
3、編寫Consumer
/**
* Consumer Group,非常重要的概念,后續(xù)會(huì)慢慢補(bǔ)充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多個(gè)地址以 ; 隔開
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
* 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4、說明
各位根據(jù)自己的環(huán)境,修改NamesrvAddr的值,我的集群請(qǐng)參考:RocketMQ集群部署配置。稍后通過RocketMQ管控臺(tái)就可以看到之前搭建的多Master多Slave模式,異步復(fù)制集群模式。
5、通過RocketMQ管控臺(tái)
rocketmq-console-ng獲取方式為:rocketmq-console-ng,之后通過mavne進(jìn)行編譯獲取jar,命令如下:
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據(jù)自己的NamesrvAddr進(jìn)行修改rocketmq.config.namesrvAddr的值。
直接啟動(dòng):
java -jar rocketmq-console-ng-1.0.0.jar

管控臺(tái)是基于springboot的,的確springboot非常方便和非?;鹆?,所以有必要去學(xué)習(xí)下springboot了(其實(shí)還是spring系列,所以spring也必要深入學(xué)習(xí)下),稍后通過管控臺(tái)進(jìn)行觀察運(yùn)行。
6、運(yùn)行觀察
一個(gè)好的習(xí)慣是先運(yùn)行Consumer,之后在運(yùn)行Producer,之后通過rocketmq-console-ng管控臺(tái)觀察

運(yùn)行完成之后,的確broker-a的數(shù)據(jù)加上broker-b的數(shù)據(jù)量就等于我們發(fā)送的數(shù)據(jù)量,而且slave的數(shù)量也master的數(shù)量也是一致的,效果如下:

查看發(fā)送這些數(shù)據(jù),2臺(tái)機(jī)器的磁盤情況如下:


到目前位置,關(guān)于RocketMQ快速入門就結(jié)束了。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java線程之使用Runnable接口創(chuàng)建線程的方法
本篇文章介紹了,java中使用Runnable接口創(chuàng)建線程的方法。需要的朋友參考下2013-05-05
Java執(zhí)行SQL腳本文件到數(shù)據(jù)庫詳解
這篇文章主要為大家詳細(xì)介紹了Java執(zhí)行SQL腳本文件到數(shù)據(jù)庫的相關(guān)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
解決Maven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法
下面小編就為大家?guī)硪黄鉀QMaven 項(xiàng)目報(bào)錯(cuò) java.httpservlet和synchronized使用方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-07-07
springboot程序啟動(dòng)慢-未配置hostname的解決
這篇文章主要介紹了springboot程序啟動(dòng)慢-未配置hostname的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
關(guān)于protected修飾符詳解-源于Cloneable接口
這篇文章主要介紹了protected修飾符詳解-源于Cloneable接口,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
java并發(fā)編程synchronized底層實(shí)現(xiàn)原理
這篇文章主要介紹了java并發(fā)編程synchronized底層實(shí)現(xiàn)原理2022-02-02
IKAnalyzer使用不同版本中文分詞的切詞方式實(shí)現(xiàn)相同功能效果
今天小編就為大家分享一篇關(guān)于IKAnalyzer使用不同版本中文分詞的切詞方式實(shí)現(xiàn)相同功能效果,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12

