欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java RocketMQ快速入門基礎(chǔ)知識

 更新時間:2019年06月20日 09:48:00   作者:匠心零度  
這篇文章主要介紹了java RocketMQ快速入門基礎(chǔ)知識,所以RocketMQ是站在巨人的肩膀上(kafka),又對其進行了優(yōu)化讓其更滿足互聯(lián)網(wǎng)公司的特點。它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點。,需要的朋友可以參考下

如何使用

1、引入 rocketmq-client

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>

2、編寫Producer

DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* Producer對象在使用之前必須要調(diào)用start初始化,初始化一次即可
* 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法
*/
producer.start();

for (int i = 0; i < 997892; i++) {
try {
//構(gòu)建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發(fā)送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();

3、編寫Consumer

/**
* Consumer Group,非常重要的概念,后續(xù)會慢慢補充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
* 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
}
});
consumer.start();
System.out.printf("Consumer Started.%n");

4、說明

各位根據(jù)自己的環(huán)境,修改NamesrvAddr的值,我的集群請參考:RocketMQ集群部署配置。稍后通過RocketMQ管控臺就可以看到之前搭建的多Master多Slave模式,異步復(fù)制集群模式。

5、通過RocketMQ管控臺

rocketmq-console-ng獲取方式為:rocketmq-console-ng,之后通過mavne進行編譯獲取jar,命令如下:

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據(jù)自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值。

直接啟動:

java -jar rocketmq-console-ng-1.0.0.jar

管控臺是基于springboot的,的確springboot非常方便和非?;鹆耍杂斜匾W(xué)習(xí)下springboot了(其實還是spring系列,所以spring也必要深入學(xué)習(xí)下),稍后通過管控臺進行觀察運行。

6、運行觀察

一個好的習(xí)慣是先運行Consumer,之后在運行Producer,之后通過rocketmq-console-ng管控臺觀察

運行完成之后,的確broker-a的數(shù)據(jù)加上broker-b的數(shù)據(jù)量就等于我們發(fā)送的數(shù)據(jù)量,而且slave的數(shù)量也master的數(shù)量也是一致的,效果如下:

查看發(fā)送這些數(shù)據(jù),2臺機器的磁盤情況如下:

到目前位置,關(guān)于RocketMQ快速入門就結(jié)束了。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論