使用Java代碼實現(xiàn)RocketMQ的生產(chǎn)與消費消息
RocketMQ其他組件
在RocketMQ中,除了生產(chǎn)者,消費者,還有一些其他的小組件,接下來逐一介紹一下他們。
監(jiān)聽器(Listener)
定義:監(jiān)聽器是消費者用于處理消息的組件。在PushConsumer(推)模式下,消費者客戶端必須設(shè)置消費監(jiān)聽器,以便在接收到消息時執(zhí)行相應(yīng)的處理邏輯。(比如一會兒下面的代碼)
偏移量(Offset)
定義:偏移量是指在消費消息時,記錄消費者已經(jīng)消費到的消息位置的值。每個消息都有一個唯一的偏移量值,它代表了消息在消息隊列中的位置。
偏移量具有很大的作用:它能夠保證消費者在重啟或宕機后能夠從上次消費的位置繼續(xù)消費消息,避免重復(fù)消費或漏消費。
簡而言之就是它能告訴消費者已經(jīng)消費到哪一條消息!!
- 集群模式:在集群消費模式下,消息隊列的消費進度保存在Broker端。消費者每次消費完消息后,會將最新的消費進度同步到Broker,以便在消費者重啟或者故障轉(zhuǎn)移的時候能夠從上一次消費的位置繼續(xù)消費。
- 廣播模式:在廣播消費模式下,消息隊列的消費進度保存在消費者本地。因為廣播模式下每條消息都會被所有消費者消費,所以不需要在Broker端保存消費進度。
所以,偏移量的的實現(xiàn)方式有兩種:包括存儲在本地文件(OffsetStore)和存儲在Broker中這兩種方式。(這樣一看,清晰了吧)
命名服務(wù)器(NameServer)
定義:命名服務(wù)器是RocketMQ中的輕量級路由服務(wù),存儲生產(chǎn)者和消費者與Broker之間的路由信息。
它的作用:提供Broker的動態(tài)注冊與發(fā)現(xiàn)服務(wù),生產(chǎn)者和消費者通過NameServer查詢Broker的路由信息
,從而進行消息的投遞和消費。
消息組成
- Topic:消息主題,對不同的業(yè)務(wù)消息進行分類。
- Tag:消息標(biāo)簽,進一步區(qū)分某個Topic下的消息分類。使用Tag可以實現(xiàn)對Topic中的消息進行
過濾
。消費者可以根據(jù)Tag來訂閱自己感興趣的消息,而不是接收Topic下的所有消息。 - Message Body:消息體,消息的實際內(nèi)容。
- Keys:消息的鍵值,標(biāo)識消息的唯一性。在RocketMQ中,每個消息都可以設(shè)置Keys字段,以便在需要的時候根據(jù)Keys來查詢或者定位消息。
- 屬性:除了上面滴,RocketMQ的消息還可以包含一系列的屬性信息,比如
消息的發(fā)送時間、生產(chǎn)者信息
等等。這些屬性信息以鍵值對的形式存在,隨著消息一起被存儲和傳輸。
實現(xiàn)生產(chǎn)與消費消息
按之前的步驟搭建完成RocketMQ集群后...
首先我們創(chuàng)建一個空的maven工程,在pom.xml文件中添加RocketMQ的依賴(RocketMQ的依賴版本需要與虛擬機中的保持一致,這里選擇和之前一樣的4.7.1版本):
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
生產(chǎn)消息
然后編寫生產(chǎn)者生產(chǎn)消息的代碼:
// 1.創(chuàng)建一個DefaultMQProducer實例,指定生產(chǎn)者組名 DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1"); // 2.設(shè)置NameServer的地址 producer.setNamesrvAddr("192.168.220.135:9876"); // 3.啟動生產(chǎn)者實例 producer.start(); // 4.使用for循環(huán)發(fā)送10條消息 for (int i = 0; i < 10; i++) { // 創(chuàng)建一條消息,指定Topic為"MyTopic1",Tag標(biāo)簽為"TagA",消息體為"hello rocketmq"加上循環(huán)變量的值,同時把字符串轉(zhuǎn)換為字節(jié)數(shù)組 Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8)); // 5.發(fā)送消息并接收發(fā)送結(jié)果 SendResult sendResult = producer.send(message); // 打印發(fā)送結(jié)果,包括消息ID、發(fā)送狀態(tài)等信息 System.out.println(sendResult); } // 6.發(fā)送完所有消息后,關(guān)閉生產(chǎn)者實例,釋放資源 producer.shutdown();
生產(chǎn)者生產(chǎn)消息和消費者消費消息這塊的代碼都相對較為簡單,已經(jīng)在代碼塊中加了注釋,這里就不再贅述了。
這個時候就可以訪問虛擬機+端口號來搜索到發(fā)送的消息詳情了!
消費消息
// 1.和生產(chǎn)者一樣,創(chuàng)建一個DefaultMQPushConsumer實例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1"); // 2.設(shè)置NameServer的地址,消費者通過這個地址與NameServer進行通信,來獲取Broker的地址信息 consumer.setNamesrvAddr("192.168.220.135:9876"); // 訂閱一個或多個Topic,以及Tag來過濾需要消費的消息。我們訂閱了"MyTopic1",使用"*"來匹配此Topic下的所有Tag consumer.subscribe("MyTopic1", "*"); // 3.注冊消息監(jiān)聽器,用于處理從Broker接收到的消息。使用MessageListenerConcurrently接口的實現(xiàn),表示并行消費 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override // 4.當(dāng)收到消息時,方法會被調(diào)用。 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 5.遍歷消息列表,并打印每條消息的內(nèi)容(注意:這里直接打印msg對象不會得到預(yù)期的消息內(nèi)容字符串) for (MessageExt msg : msgs) { // 所以我們打印msg.getBody()的內(nèi)容,為了保留消息原樣 System.out.println("已收到消息" + msg); } // 6.返回消費狀態(tài),這里表示消息已成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 7.啟動消費者實例 consumer.start(); // 8.打印日志消費者已啟動 System.out.println("消費者已啟動");
這里需要注意,msg包含了消息的詳細(xì)信息,包括消息體、標(biāo)簽、屬性等等。如果想打印消息內(nèi)容,應(yīng)該使用msg.getBody()方法獲取消息體的字節(jié)數(shù)組,并且把它轉(zhuǎn)換為字符串(如果消息體是文本的話)。
本篇文章到這里就結(jié)束了,后續(xù)會繼續(xù)分享RocketMQ相關(guān)的知識,感謝各位小伙伴們的支持!
以上就是使用Java代碼實現(xiàn)RocketMQ的生產(chǎn)與消費消息的詳細(xì)內(nèi)容,更多關(guān)于Java實現(xiàn)RocketMQ生產(chǎn)與消費的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Cloud Nacos 和 Eureka區(qū)別解析
Spring Cloud Nacos 和 Spring Cloud Eureka 都是 Spring Cloud 微服務(wù)框架中的服務(wù)注冊和發(fā)現(xiàn)組件,用于幫助開發(fā)者輕松地構(gòu)建和管理微服務(wù)應(yīng)用,這篇文章主要介紹了Spring Cloud Nacos 和 Eureka區(qū)別,需要的朋友可以參考下2023-08-08spring整合JMS實現(xiàn)同步收發(fā)消息(基于ActiveMQ的實現(xiàn))
本篇文章主要介紹了spring整合JMS實現(xiàn)同步收發(fā)消息(基于ActiveMQ的實現(xiàn)),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10springboot如何解決跨域后session獲取不到sessionId不一致
這篇文章主要介紹了springboot如何解決跨域后session獲取不到sessionId不一致問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01SpringBoot開發(fā)技巧之如何處理跨域請求CORS
CORS(Cross-Origin Resource Sharing)"跨域資源共享",是一個W3C標(biāo)準(zhǔn),它允許瀏覽器向跨域服務(wù)器發(fā)送Ajax請求,打破了Ajax只能訪問本站內(nèi)的資源限制2021-10-10SpringBoot中對應(yīng)2.0.x版本的Redis配置詳解
這篇文章主要為大家介紹了SpringBoot中對應(yīng)2.0.x版本的Redis配置詳解,文中的實現(xiàn)步驟講解詳細(xì),感興趣的小伙伴們可以了解一下2022-06-06解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫時結(jié)果返回null問題
這篇文章主要介紹了mybatis使用char類型字段查詢oracle數(shù)據(jù)庫時結(jié)果返回null問題的解決方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2018-06-06Spring Boot 整合 Druid 并開啟監(jiān)控的操作方法
本文介紹了如何在SpringBoot項目中引入和配置Druid數(shù)據(jù)庫連接池,并開啟其監(jiān)控功能,通過添加依賴、配置數(shù)據(jù)源、開啟監(jiān)控、自定義配置以及訪問監(jiān)控頁面,開發(fā)者可以有效提高數(shù)據(jù)庫訪問效率并監(jiān)控連接池狀態(tài),感興趣的朋友跟隨小編一起看看吧2025-01-01