RocketMQ生產(chǎn)消息與消費消息超詳細講解
1 RocketMQ簡介
RocketMQ是阿里開源的一款非常優(yōu)秀中間件產(chǎn)品,脫胎于阿里的另一款隊列技術(shù)MetaQ,后捐贈給Apache基金會作為一款孵化技術(shù),僅僅經(jīng)歷了一年多的時間就成為Apache基金會的頂級項目。并且它現(xiàn)在已經(jīng)在阿里內(nèi)部被廣泛的應(yīng)用,并且經(jīng)受住了多次雙十一的這種極致場景的壓力(2017年的雙十一,RocketMQ流轉(zhuǎn)的消息量達到了萬億級,峰值TPS達到5600萬)
2 MQ的常見產(chǎn)品
ActiveMQ:java語言實現(xiàn),萬級數(shù)據(jù)吞吐量,處理速度ms級,主從架構(gòu),成熟度高
RabbitMQ :erlang語言實現(xiàn),萬級數(shù)據(jù)吞吐量,處理速度us級,主從架構(gòu),
RocketMQ :java語言實現(xiàn),十萬級數(shù)據(jù)吞吐量,處理速度ms級,分布式架構(gòu),功能強,擴展性強
kafka :scala語言實現(xiàn),十萬級數(shù)據(jù)吞吐量,處理速度ms級,分布式架構(gòu),功能較少,應(yīng)用于大數(shù)據(jù)較多
3 環(huán)境搭建
創(chuàng)建maven工程
引入依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
4 單生產(chǎn)者單消費者模式
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個發(fā)送消息的對象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3啟動發(fā)送的服務(wù) producer.start(); //4.1創(chuàng)建要發(fā)送的消息對象,指定topic,指定內(nèi)容body Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8")); //4.2發(fā)送消息 SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result); //5.關(guān)閉連接 producer.shutdown(); } }
消費者:
//消費者,消費消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標簽為任意* consumer.subscribe("topic1","*"); //4.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發(fā)給消費者 } }); //5.啟動接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行 System.out.println("接收消息服務(wù)已運行"); } }
測試:
5 單生產(chǎn)者多消費者模式
5.1默認模式(負載均衡)
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個發(fā)送消息的對象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.設(shè)定發(fā)送的命名服務(wù)器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3啟動發(fā)送的服務(wù) producer.start(); for (int i = 1; i <= 10; i++) { Message msg = new Message("topic1",("生產(chǎn)者2: hello rocketmq "+i).getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("返回結(jié)果:"+result); } //5.關(guān)閉連接 producer.shutdown(); } }
消費者:
//消費者,消費消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標簽為任意* consumer.subscribe("topic1","*"); //4.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發(fā)給消費者 } }); //5.啟動接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行 System.out.println("接收消息服務(wù)已運行"); } }
測試:
5.2廣播模式
生產(chǎn)者的代碼不變,消費者的代碼改動如下:
//設(shè)置當前消費者的消費模式(默認模式:負載均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置當前消費者的消費模式為廣播模式:所有客戶端接收的消息是一樣的 consumer.setMessageModel(MessageModel.BROADCASTING);
具體消費者代碼:
//消費者,消費消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.創(chuàng)建一個接收消息的對象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.設(shè)定接收的命名服務(wù)器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.設(shè)置接收消息對應(yīng)的topic,對應(yīng)的sub標簽為任意* consumer.subscribe("topic1","*"); //設(shè)置當前消費者的消費模式(默認模式:負載均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); //設(shè)置當前消費者的消費模式為廣播模式:所有客戶端接收的消息是一樣的 consumer.setMessageModel(MessageModel.BROADCASTING); //4.開啟監(jiān)聽,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個 標記后相同的消息講不會再次發(fā)給消費者 } }); //5.啟動接收消息的服務(wù) consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運行 System.out.println("接收消息服務(wù)已運行"); } }
測試:
廣播模式的現(xiàn)象
如果 生產(chǎn)者先發(fā)送消息, 后啟動消費者, 消息只能被消費一次
如果多個消費者先啟動(廣播模式),后發(fā)消息,才有廣播的效果
結(jié)論: 必須先啟動消費者再啟動發(fā)送者才有廣播的效果
6 多生產(chǎn)者多消費者模式
多生產(chǎn)者產(chǎn)生的消息可以被同一個消費者消費,也可以被多個消費者消費
運行多個生產(chǎn)者,在啟動消費者
測試:
到此這篇關(guān)于RocketMQ生產(chǎn)消息與消費消息超詳細講解的文章就介紹到這了,更多相關(guān)RocketMQ生產(chǎn)消息與消費消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot啟動執(zhí)行sql腳本的3種方法實例
在應(yīng)用程序啟動后,可以自動執(zhí)行建庫、建表等SQL腳本,下面這篇文章主要給大家介紹了關(guān)于SpringBoot啟動執(zhí)行sql腳本的3種方法,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2022-01-01基于mybatis batch實現(xiàn)批量提交大量數(shù)據(jù)
這篇文章主要介紹了基于mybatis batch實現(xiàn)批量提交大量數(shù)據(jù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-05-05Java語言實現(xiàn)簡單的酒店前臺管理小功能(實例代碼)
這篇文章主要介紹了Java語言實現(xiàn)簡單的酒店前臺管理小功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03java實例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)
下面小編就為大家?guī)硪黄猨ava實例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05