RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解
1 RocketMQ簡(jiǎn)介
RocketMQ是阿里開源的一款非常優(yōu)秀中間件產(chǎn)品,脫胎于阿里的另一款隊(duì)列技術(shù)MetaQ,后捐贈(zèng)給Apache基金會(huì)作為一款孵化技術(shù),僅僅經(jīng)歷了一年多的時(shí)間就成為Apache基金會(huì)的頂級(jí)項(xiàng)目。并且它現(xiàn)在已經(jīng)在阿里內(nèi)部被廣泛的應(yīng)用,并且經(jīng)受住了多次雙十一的這種極致場(chǎng)景的壓力(2017年的雙十一,RocketMQ流轉(zhuǎn)的消息量達(dá)到了萬(wàn)億級(jí),峰值TPS達(dá)到5600萬(wàn))
2 MQ的常見(jiàn)產(chǎn)品
ActiveMQ:java語(yǔ)言實(shí)現(xiàn),萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),主從架構(gòu),成熟度高
RabbitMQ :erlang語(yǔ)言實(shí)現(xiàn),萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度us級(jí),主從架構(gòu),
RocketMQ :java語(yǔ)言實(shí)現(xiàn),十萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能強(qiáng),擴(kuò)展性強(qiáng)
kafka :scala語(yǔ)言實(shí)現(xiàn),十萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能較少,應(yīng)用于大數(shù)據(jù)較多
3 環(huán)境搭建
創(chuàng)建maven工程
引入依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
4 單生產(chǎn)者單消費(fèi)者模式
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3啟動(dòng)發(fā)送的服務(wù) producer.start(); //4.1創(chuàng)建要發(fā)送的消息對(duì)象,指定topic,指定內(nèi)容body Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8")); //4.2發(fā)送消息 SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result); //5.關(guān)閉連接 producer.shutdown(); } }
消費(fèi)者:
//消費(fèi)者,消費(fèi)消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意* consumer.subscribe("topic1","*"); //4.開啟監(jiān)聽(tīng),用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者 } }); //5.啟動(dòng)接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行 System.out.println("接收消息服務(wù)已運(yùn)行"); } }
測(cè)試:
5 單生產(chǎn)者多消費(fèi)者模式
5.1默認(rèn)模式(負(fù)載均衡)
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3啟動(dòng)發(fā)送的服務(wù) producer.start(); for (int i = 1; i <= 10; i++) { Message msg = new Message("topic1",("生產(chǎn)者2: hello rocketmq "+i).getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result); } //5.關(guān)閉連接 producer.shutdown(); } }
消費(fèi)者:
//消費(fèi)者,消費(fèi)消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意* consumer.subscribe("topic1","*"); //4.開啟監(jiān)聽(tīng),用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者 } }); //5.啟動(dòng)接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行 System.out.println("接收消息服務(wù)已運(yùn)行"); } }
測(cè)試:
5.2廣播模式
生產(chǎn)者的代碼不變,消費(fèi)者的代碼改動(dòng)如下:
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的 consumer.setMessageModel(MessageModel.BROADCASTING);
具體消費(fèi)者代碼:
//消費(fèi)者,消費(fèi)消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意* consumer.subscribe("topic1","*"); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的 consumer.setMessageModel(MessageModel.BROADCASTING); //4.開啟監(jiān)聽(tīng),用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者 } }); //5.啟動(dòng)接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行 System.out.println("接收消息服務(wù)已運(yùn)行"); } }
測(cè)試:
廣播模式的現(xiàn)象
如果 生產(chǎn)者先發(fā)送消息, 后啟動(dòng)消費(fèi)者, 消息只能被消費(fèi)一次
如果多個(gè)消費(fèi)者先啟動(dòng)(廣播模式),后發(fā)消息,才有廣播的效果
結(jié)論: 必須先啟動(dòng)消費(fèi)者再啟動(dòng)發(fā)送者才有廣播的效果
6 多生產(chǎn)者多消費(fèi)者模式
多生產(chǎn)者產(chǎn)生的消息可以被同一個(gè)消費(fèi)者消費(fèi),也可以被多個(gè)消費(fèi)者消費(fèi)
運(yùn)行多個(gè)生產(chǎn)者,在啟動(dòng)消費(fèi)者
測(cè)試:
到此這篇關(guān)于RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解的文章就介紹到這了,更多相關(guān)RocketMQ生產(chǎn)消息與消費(fèi)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Hibernate根據(jù)實(shí)體類自動(dòng)生成表的方法
這篇文章主要介紹了使用Hibernate根據(jù)實(shí)體類自動(dòng)生成表的方法,該篇提供了兩種方法,可以根據(jù)需要選擇其一,希望對(duì)你有所幫助,如有不對(duì)的地方還望指正2023-03-03Java鎖的升級(jí)策略 偏向鎖 輕量級(jí)鎖 重量級(jí)鎖
在本文中小編給的大家整理了關(guān)于Java鎖的升級(jí)策略 偏向鎖 輕量級(jí)鎖 重量級(jí)鎖的相關(guān)知識(shí)點(diǎn)內(nèi)容,需要的朋友們參考下。2019-06-06SpringBoot啟動(dòng)執(zhí)行sql腳本的3種方法實(shí)例
在應(yīng)用程序啟動(dòng)后,可以自動(dòng)執(zhí)行建庫(kù)、建表等SQL腳本,下面這篇文章主要給大家介紹了關(guān)于SpringBoot啟動(dòng)執(zhí)行sql腳本的3種方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-01-01Springboot如何優(yōu)雅地進(jìn)行字段校驗(yàn)
這篇文章主要給大家介紹了關(guān)于Springboot如何優(yōu)雅地進(jìn)行字段校驗(yàn)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01JAVA設(shè)計(jì)模式之訪問(wèn)者模式原理與用法詳解
這篇文章主要介紹了JAVA設(shè)計(jì)模式之訪問(wèn)者模式,簡(jiǎn)單說(shuō)明了訪問(wèn)者模式的原理,并結(jié)合實(shí)例分析了java訪問(wèn)者模式的定義與用法,需要的朋友可以參考下2017-08-08基于mybatis batch實(shí)現(xiàn)批量提交大量數(shù)據(jù)
這篇文章主要介紹了基于mybatis batch實(shí)現(xiàn)批量提交大量數(shù)據(jù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的酒店前臺(tái)管理小功能(實(shí)例代碼)
這篇文章主要介紹了Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的酒店前臺(tái)管理小功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03java實(shí)例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)
下面小編就為大家?guī)?lái)一篇java實(shí)例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05