使用Java代碼實(shí)現(xiàn)RocketMQ的生產(chǎn)與消費(fèi)消息
RocketMQ其他組件
在RocketMQ中,除了生產(chǎn)者,消費(fèi)者,還有一些其他的小組件,接下來(lái)逐一介紹一下他們。
監(jiān)聽(tīng)器(Listener)
定義:監(jiān)聽(tīng)器是消費(fèi)者用于處理消息的組件。在PushConsumer(推)模式下,消費(fèi)者客戶端必須設(shè)置消費(fèi)監(jiān)聽(tīng)器,以便在接收到消息時(shí)執(zhí)行相應(yīng)的處理邏輯。(比如一會(huì)兒下面的代碼)
偏移量(Offset)
定義:偏移量是指在消費(fèi)消息時(shí),記錄消費(fèi)者已經(jīng)消費(fèi)到的消息位置的值。每個(gè)消息都有一個(gè)唯一的偏移量值,它代表了消息在消息隊(duì)列中的位置。
偏移量具有很大的作用:它能夠保證消費(fèi)者在重啟或宕機(jī)后能夠從上次消費(fèi)的位置繼續(xù)消費(fèi)消息,避免重復(fù)消費(fèi)或漏消費(fèi)。
簡(jiǎn)而言之就是它能告訴消費(fèi)者已經(jīng)消費(fèi)到哪一條消息?。?/p>
- 集群模式:在集群消費(fèi)模式下,消息隊(duì)列的消費(fèi)進(jìn)度保存在Broker端。消費(fèi)者每次消費(fèi)完消息后,會(huì)將最新的消費(fèi)進(jìn)度同步到Broker,以便在消費(fèi)者重啟或者故障轉(zhuǎn)移的時(shí)候能夠從上一次消費(fèi)的位置繼續(xù)消費(fèi)。
- 廣播模式:在廣播消費(fèi)模式下,消息隊(duì)列的消費(fèi)進(jìn)度保存在消費(fèi)者本地。因?yàn)閺V播模式下每條消息都會(huì)被所有消費(fèi)者消費(fèi),所以不需要在Broker端保存消費(fèi)進(jìn)度。
所以,偏移量的的實(shí)現(xiàn)方式有兩種:包括存儲(chǔ)在本地文件(OffsetStore)和存儲(chǔ)在Broker中這兩種方式。(這樣一看,清晰了吧)
命名服務(wù)器(NameServer)
定義:命名服務(wù)器是RocketMQ中的輕量級(jí)路由服務(wù),存儲(chǔ)生產(chǎn)者和消費(fèi)者與Broker之間的路由信息。
它的作用:提供Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)服務(wù),生產(chǎn)者和消費(fèi)者通過(guò)NameServer查詢Broker的路由信息
,從而進(jìn)行消息的投遞和消費(fèi)。
消息組成
- Topic:消息主題,對(duì)不同的業(yè)務(wù)消息進(jìn)行分類。
- Tag:消息標(biāo)簽,進(jìn)一步區(qū)分某個(gè)Topic下的消息分類。使用Tag可以實(shí)現(xiàn)對(duì)Topic中的消息進(jìn)行
過(guò)濾
。消費(fèi)者可以根據(jù)Tag來(lái)訂閱自己感興趣的消息,而不是接收Topic下的所有消息。 - Message Body:消息體,消息的實(shí)際內(nèi)容。
- Keys:消息的鍵值,標(biāo)識(shí)消息的唯一性。在RocketMQ中,每個(gè)消息都可以設(shè)置Keys字段,以便在需要的時(shí)候根據(jù)Keys來(lái)查詢或者定位消息。
- 屬性:除了上面滴,RocketMQ的消息還可以包含一系列的屬性信息,比如
消息的發(fā)送時(shí)間、生產(chǎn)者信息
等等。這些屬性信息以鍵值對(duì)的形式存在,隨著消息一起被存儲(chǔ)和傳輸。
實(shí)現(xiàn)生產(chǎn)與消費(fèi)消息
按之前的步驟搭建完成RocketMQ集群后...
首先我們創(chuàng)建一個(gè)空的maven工程,在pom.xml文件中添加RocketMQ的依賴(RocketMQ的依賴版本需要與虛擬機(jī)中的保持一致,這里選擇和之前一樣的4.7.1版本):
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
生產(chǎn)消息
然后編寫(xiě)生產(chǎn)者生產(chǎn)消息的代碼:
// 1.創(chuàng)建一個(gè)DefaultMQProducer實(shí)例,指定生產(chǎn)者組名 DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1"); // 2.設(shè)置NameServer的地址 producer.setNamesrvAddr("192.168.220.135:9876"); // 3.啟動(dòng)生產(chǎn)者實(shí)例 producer.start(); // 4.使用for循環(huán)發(fā)送10條消息 for (int i = 0; i < 10; i++) { // 創(chuàng)建一條消息,指定Topic為"MyTopic1",Tag標(biāo)簽為"TagA",消息體為"hello rocketmq"加上循環(huán)變量的值,同時(shí)把字符串轉(zhuǎn)換為字節(jié)數(shù)組 Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8)); // 5.發(fā)送消息并接收發(fā)送結(jié)果 SendResult sendResult = producer.send(message); // 打印發(fā)送結(jié)果,包括消息ID、發(fā)送狀態(tài)等信息 System.out.println(sendResult); } // 6.發(fā)送完所有消息后,關(guān)閉生產(chǎn)者實(shí)例,釋放資源 producer.shutdown();
生產(chǎn)者生產(chǎn)消息和消費(fèi)者消費(fèi)消息這塊的代碼都相對(duì)較為簡(jiǎn)單,已經(jīng)在代碼塊中加了注釋,這里就不再贅述了。
這個(gè)時(shí)候就可以訪問(wèn)虛擬機(jī)+端口號(hào)來(lái)搜索到發(fā)送的消息詳情了!
消費(fèi)消息
// 1.和生產(chǎn)者一樣,創(chuàng)建一個(gè)DefaultMQPushConsumer實(shí)例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1"); // 2.設(shè)置NameServer的地址,消費(fèi)者通過(guò)這個(gè)地址與NameServer進(jìn)行通信,來(lái)獲取Broker的地址信息 consumer.setNamesrvAddr("192.168.220.135:9876"); // 訂閱一個(gè)或多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息。我們訂閱了"MyTopic1",使用"*"來(lái)匹配此Topic下的所有Tag consumer.subscribe("MyTopic1", "*"); // 3.注冊(cè)消息監(jiān)聽(tīng)器,用于處理從Broker接收到的消息。使用MessageListenerConcurrently接口的實(shí)現(xiàn),表示并行消費(fèi) consumer.registerMessageListener(new MessageListenerConcurrently() { @Override // 4.當(dāng)收到消息時(shí),方法會(huì)被調(diào)用。 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 5.遍歷消息列表,并打印每條消息的內(nèi)容(注意:這里直接打印msg對(duì)象不會(huì)得到預(yù)期的消息內(nèi)容字符串) for (MessageExt msg : msgs) { // 所以我們打印msg.getBody()的內(nèi)容,為了保留消息原樣 System.out.println("已收到消息" + msg); } // 6.返回消費(fèi)狀態(tài),這里表示消息已成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 7.啟動(dòng)消費(fèi)者實(shí)例 consumer.start(); // 8.打印日志消費(fèi)者已啟動(dòng) System.out.println("消費(fèi)者已啟動(dòng)");
這里需要注意,msg包含了消息的詳細(xì)信息,包括消息體、標(biāo)簽、屬性等等。如果想打印消息內(nèi)容,應(yīng)該使用msg.getBody()方法獲取消息體的字節(jié)數(shù)組,并且把它轉(zhuǎn)換為字符串(如果消息體是文本的話)。
本篇文章到這里就結(jié)束了,后續(xù)會(huì)繼續(xù)分享RocketMQ相關(guān)的知識(shí),感謝各位小伙伴們的支持!
以上就是使用Java代碼實(shí)現(xiàn)RocketMQ的生產(chǎn)與消費(fèi)消息的詳細(xì)內(nèi)容,更多關(guān)于Java實(shí)現(xiàn)RocketMQ生產(chǎn)與消費(fèi)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Cloud Nacos 和 Eureka區(qū)別解析
Spring Cloud Nacos 和 Spring Cloud Eureka 都是 Spring Cloud 微服務(wù)框架中的服務(wù)注冊(cè)和發(fā)現(xiàn)組件,用于幫助開(kāi)發(fā)者輕松地構(gòu)建和管理微服務(wù)應(yīng)用,這篇文章主要介紹了Spring Cloud Nacos 和 Eureka區(qū)別,需要的朋友可以參考下2023-08-08spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn))
本篇文章主要介紹了spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn)),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10springboot如何解決跨域后session獲取不到sessionId不一致
這篇文章主要介紹了springboot如何解決跨域后session獲取不到sessionId不一致問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01SpringBoot開(kāi)發(fā)技巧之如何處理跨域請(qǐng)求CORS
CORS(Cross-Origin Resource Sharing)"跨域資源共享",是一個(gè)W3C標(biāo)準(zhǔn),它允許瀏覽器向跨域服務(wù)器發(fā)送Ajax請(qǐng)求,打破了Ajax只能訪問(wèn)本站內(nèi)的資源限制2021-10-10SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解
這篇文章主要為大家介紹了SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解,文中的實(shí)現(xiàn)步驟講解詳細(xì),感興趣的小伙伴們可以了解一下2022-06-06解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫(kù)時(shí)結(jié)果返回null問(wèn)題
這篇文章主要介紹了mybatis使用char類型字段查詢oracle數(shù)據(jù)庫(kù)時(shí)結(jié)果返回null問(wèn)題的解決方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-06-06Spring Boot 整合 Druid 并開(kāi)啟監(jiān)控的操作方法
本文介紹了如何在SpringBoot項(xiàng)目中引入和配置Druid數(shù)據(jù)庫(kù)連接池,并開(kāi)啟其監(jiān)控功能,通過(guò)添加依賴、配置數(shù)據(jù)源、開(kāi)啟監(jiān)控、自定義配置以及訪問(wèn)監(jiān)控頁(yè)面,開(kāi)發(fā)者可以有效提高數(shù)據(jù)庫(kù)訪問(wèn)效率并監(jiān)控連接池狀態(tài),感興趣的朋友跟隨小編一起看看吧2025-01-01