java RocketMQ快速入門基礎(chǔ)知識
如何使用
1、引入 rocketmq-client
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency>
2、編寫Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_demo"); //指定NameServer地址 producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的 /** * Producer對象在使用之前必須要調(diào)用start初始化,初始化一次即可 * 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法 */ producer.start(); for (int i = 0; i < 997892; i++) { try { //構(gòu)建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //發(fā)送同步消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown();
3、編寫Consumer
/** * Consumer Group,非常重要的概念,后續(xù)會慢慢補充 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo"); //指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的 /** * 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 * 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for(MessageExt msg:msgs){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.printf("Consumer Started.%n");
4、說明
各位根據(jù)自己的環(huán)境,修改NamesrvAddr的值,我的集群請參考:RocketMQ集群部署配置。稍后通過RocketMQ管控臺就可以看到之前搭建的多Master多Slave模式,異步復(fù)制集群模式。
5、通過RocketMQ管控臺
rocketmq-console-ng獲取方式為:rocketmq-console-ng,之后通過mavne進行編譯獲取jar,命令如下:
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據(jù)自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值。
直接啟動:
java -jar rocketmq-console-ng-1.0.0.jar
管控臺是基于springboot的,的確springboot非常方便和非?;鹆耍杂斜匾W(xué)習(xí)下springboot了(其實還是spring系列,所以spring也必要深入學(xué)習(xí)下),稍后通過管控臺進行觀察運行。
6、運行觀察
一個好的習(xí)慣是先運行Consumer,之后在運行Producer,之后通過rocketmq-console-ng管控臺觀察
運行完成之后,的確broker-a的數(shù)據(jù)加上broker-b的數(shù)據(jù)量就等于我們發(fā)送的數(shù)據(jù)量,而且slave的數(shù)量也master的數(shù)量也是一致的,效果如下:
查看發(fā)送這些數(shù)據(jù),2臺機器的磁盤情況如下:
到目前位置,關(guān)于RocketMQ快速入門就結(jié)束了。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java線程之使用Runnable接口創(chuàng)建線程的方法
本篇文章介紹了,java中使用Runnable接口創(chuàng)建線程的方法。需要的朋友參考下2013-05-05Java執(zhí)行SQL腳本文件到數(shù)據(jù)庫詳解
這篇文章主要為大家詳細介紹了Java執(zhí)行SQL腳本文件到數(shù)據(jù)庫的相關(guān)方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06解決Maven 項目報錯 java.httpservlet和synchronized使用方法
下面小編就為大家?guī)硪黄鉀QMaven 項目報錯 java.httpservlet和synchronized使用方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07springboot程序啟動慢-未配置hostname的解決
這篇文章主要介紹了springboot程序啟動慢-未配置hostname的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08關(guān)于protected修飾符詳解-源于Cloneable接口
這篇文章主要介紹了protected修飾符詳解-源于Cloneable接口,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11java并發(fā)編程synchronized底層實現(xiàn)原理
這篇文章主要介紹了java并發(fā)編程synchronized底層實現(xiàn)原理2022-02-02IKAnalyzer使用不同版本中文分詞的切詞方式實現(xiàn)相同功能效果
今天小編就為大家分享一篇關(guān)于IKAnalyzer使用不同版本中文分詞的切詞方式實現(xiàn)相同功能效果,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12