RocketMQ普通消息實(shí)戰(zhàn)演練詳解
引言
之前研究了RocketMQ的源碼,在這里將各種消息發(fā)送與消費(fèi)的demo進(jìn)行舉例,方便以后使用的時(shí)候CV。
相關(guān)的配置,安裝和啟動(dòng)在這篇文章有相關(guān)講解 http://www.dbjr.com.cn/article/260237.htm
普通消息同步發(fā)送
同步消息是指發(fā)送出消息后,同步等待,直到接收到Broker發(fā)送成功的響應(yīng)才會(huì)繼續(xù)發(fā)送下一個(gè)消息。這個(gè)方式可以確保消息發(fā)送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發(fā)送方式 SendResult send = producer.send(msg); //確認(rèn)返回 System.out.println(send); } //關(guān)閉producer producer.shutdown(); }
普通消息異步發(fā)送
異步消息發(fā)送方在發(fā)送了一條消息后,不等接收方發(fā)回響應(yīng),接著進(jìn)行第二條消息發(fā)送。發(fā)送方通過(guò)回調(diào)接口的方式接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //SendCallback會(huì)接收異步返回結(jié)果的回調(diào) producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是過(guò)早關(guān)閉producer,會(huì)拋出The producer service state not OK, SHUTDOWN_ALREADY的錯(cuò) Thread.sleep(10000); //關(guān)閉producer producer.shutdown(); }
普通消息單向發(fā)送
單項(xiàng)發(fā)送不關(guān)心發(fā)送的結(jié)果,只發(fā)送請(qǐng)求不等待應(yīng)答。發(fā)送消息耗時(shí)極短。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發(fā)送方式 producer.sendOneway(msg); } //關(guān)閉producer producer.shutdown(); }
集群消費(fèi)模式
消費(fèi)者采用負(fù)載均衡的方式消費(fèi)消息,同一個(gè)Group下的多個(gè)Consumer共同消費(fèi)Queue里的Message,每個(gè)Consumer處理的消息不同。
一個(gè)Consumer Group中的各個(gè)Consumer實(shí)例分共同消費(fèi)消息,即一條消息只會(huì)投遞到一個(gè)Group下面的一個(gè)實(shí)例,并且只消費(fèi)一遍。
例如某個(gè)Topic有3個(gè)隊(duì)列,其中一個(gè)Consumer Group 有 3 個(gè)實(shí)例,那么每個(gè)實(shí)例只消費(fèi)其中的1個(gè)隊(duì)列。集群消費(fèi)模式是消費(fèi)者默認(rèn)的消費(fèi)方式。
public static void main(String[] args) throws Exception { //實(shí)例化消息消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 consumer.start(); System.out.printf("Consumer Started.%n"); }
廣播消費(fèi)模式
廣播消費(fèi)模式中把消息對(duì)一個(gè)Group下的各個(gè)Consumer實(shí)例都投遞一遍。也就是說(shuō)消息也會(huì)被 Group 中的每個(gè)Consumer都消費(fèi)一次。
實(shí)際上,是一個(gè)消費(fèi)組下的每個(gè)消費(fèi)者實(shí)例都獲取到了topic下面的每個(gè)Message Queue去拉取消費(fèi)。所以消息會(huì)投遞到每個(gè)消費(fèi)者實(shí)例。
public static void main(String[] args) throws Exception { //實(shí)例化消息消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 consumer.start(); System.out.printf("Consumer Started.%n"); }
以上就是RocketMQ普通消息實(shí)戰(zhàn)演練詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ普通消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過(guò)AOP攔截Spring?Boot日志并將其存入數(shù)據(jù)庫(kù)功能實(shí)現(xiàn)
本文介紹了如何使用Spring Boot和AOP技術(shù)實(shí)現(xiàn)攔截系統(tǒng)日志并保存到數(shù)據(jù)庫(kù)中的功能,包括配置數(shù)據(jù)庫(kù)連接、定義日志實(shí)體類、定義日志攔截器、使用AOP攔截日志并保存到數(shù)據(jù)庫(kù)中等步驟,感興趣的朋友一起看看吧2023-08-08Spring Boot 自定義 Shiro 過(guò)濾器無(wú)法使用 @Autowired問(wèn)題及解決方法
這篇文章主要介紹了Spring Boot 自定義 Shiro 過(guò)濾器無(wú)法使用 @Autowired問(wèn)題及解決方法 ,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-06-06Java中反射的"暴破"機(jī)制(SetAccessible方法)詳解
這篇文章主要為大家詳細(xì)介紹了Java中反射的"暴破"機(jī)制,以及如何利用這一機(jī)制實(shí)現(xiàn)訪問(wèn)非公有屬性,方法,和構(gòu)造器,文中示例代碼講解詳細(xì),感興趣的可以了解一下2022-08-08RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝
本文主要介紹了RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06jdk17?SpringBoot?JPA集成多數(shù)據(jù)庫(kù)的示例詳解
這篇文章主要介紹了jdk17?SpringBoot?JPA集成多數(shù)據(jù)庫(kù)的示例代碼,包括配置類、請(qǐng)求攔截器、線程上下文等相關(guān)知識(shí),代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08如何在?Spring?Boot?中使用?OpenAI?ChatGPT?API
這篇文章主要介紹了如何在Spring?Boot中使用OpenAI?ChatGPT?API,我們探索了 OpenAI ChatGPT API 以生成對(duì)提示的響應(yīng),我們創(chuàng)建了一個(gè) Spring Boot 應(yīng)用程序,它調(diào)用 API 來(lái)生成對(duì)提示的響應(yīng),需要的朋友可以參考下2023-08-08Redisson分布式閉鎖RCountDownLatch的使用詳細(xì)講解
分布式鎖和我們java基礎(chǔ)中學(xué)習(xí)到的synchronized略有不同,synchronized中我們的鎖是個(gè)對(duì)象,當(dāng)前系統(tǒng)部署在不同的服務(wù)實(shí)例上,單純使用synchronized或者lock已經(jīng)無(wú)法滿足對(duì)庫(kù)存一致性的判斷。本次主要講解基于rediss實(shí)現(xiàn)的分布式鎖2023-02-02