欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Java代碼實(shí)現(xiàn)RocketMQ的生產(chǎn)與消費(fèi)消息

 更新時(shí)間:2024年07月31日 08:43:23   作者:小威要向諸佬學(xué)習(xí)呀  
這篇文章介紹一下其他的小組件以及使用Java代碼實(shí)現(xiàn)生產(chǎn)者對(duì)消息的生成,消費(fèi)者消費(fèi)消息等知識(shí)點(diǎn),并通過(guò)代碼示例介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下

RocketMQ其他組件

在RocketMQ中,除了生產(chǎn)者,消費(fèi)者,還有一些其他的小組件,接下來(lái)逐一介紹一下他們。

監(jiān)聽(tīng)器(Listener)

定義:監(jiān)聽(tīng)器是消費(fèi)者用于處理消息的組件。在PushConsumer(推)模式下,消費(fèi)者客戶端必須設(shè)置消費(fèi)監(jiān)聽(tīng)器,以便在接收到消息時(shí)執(zhí)行相應(yīng)的處理邏輯。(比如一會(huì)兒下面的代碼)

偏移量(Offset)

定義:偏移量是指在消費(fèi)消息時(shí),記錄消費(fèi)者已經(jīng)消費(fèi)到的消息位置的值。每個(gè)消息都有一個(gè)唯一的偏移量值,它代表了消息在消息隊(duì)列中的位置。

偏移量具有很大的作用:它能夠保證消費(fèi)者在重啟或宕機(jī)后能夠從上次消費(fèi)的位置繼續(xù)消費(fèi)消息,避免重復(fù)消費(fèi)或漏消費(fèi)。

簡(jiǎn)而言之就是它能告訴消費(fèi)者已經(jīng)消費(fèi)到哪一條消息?。?/p>

  • 集群模式:在集群消費(fèi)模式下,消息隊(duì)列的消費(fèi)進(jìn)度保存在Broker端。消費(fèi)者每次消費(fèi)完消息后,會(huì)將最新的消費(fèi)進(jìn)度同步到Broker,以便在消費(fèi)者重啟或者故障轉(zhuǎn)移的時(shí)候能夠從上一次消費(fèi)的位置繼續(xù)消費(fèi)。
  • 廣播模式:在廣播消費(fèi)模式下,消息隊(duì)列的消費(fèi)進(jìn)度保存在消費(fèi)者本地。因?yàn)閺V播模式下每條消息都會(huì)被所有消費(fèi)者消費(fèi),所以不需要在Broker端保存消費(fèi)進(jìn)度。

所以,偏移量的的實(shí)現(xiàn)方式有兩種:包括存儲(chǔ)在本地文件(OffsetStore)和存儲(chǔ)在Broker中這兩種方式。(這樣一看,清晰了吧)

命名服務(wù)器(NameServer)

定義:命名服務(wù)器是RocketMQ中的輕量級(jí)路由服務(wù),存儲(chǔ)生產(chǎn)者和消費(fèi)者與Broker之間的路由信息。

它的作用:提供Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)服務(wù),生產(chǎn)者和消費(fèi)者通過(guò)NameServer查詢Broker的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。

消息組成

  • Topic:消息主題,對(duì)不同的業(yè)務(wù)消息進(jìn)行分類。
  • Tag:消息標(biāo)簽,進(jìn)一步區(qū)分某個(gè)Topic下的消息分類。使用Tag可以實(shí)現(xiàn)對(duì)Topic中的消息進(jìn)行過(guò)濾。消費(fèi)者可以根據(jù)Tag來(lái)訂閱自己感興趣的消息,而不是接收Topic下的所有消息。
  • Message Body:消息體,消息的實(shí)際內(nèi)容。
  • Keys:消息的鍵值,標(biāo)識(shí)消息的唯一性。在RocketMQ中,每個(gè)消息都可以設(shè)置Keys字段,以便在需要的時(shí)候根據(jù)Keys來(lái)查詢或者定位消息。
  • 屬性:除了上面滴,RocketMQ的消息還可以包含一系列的屬性信息,比如消息的發(fā)送時(shí)間、生產(chǎn)者信息等等。這些屬性信息以鍵值對(duì)的形式存在,隨著消息一起被存儲(chǔ)和傳輸。

實(shí)現(xiàn)生產(chǎn)與消費(fèi)消息

按之前的步驟搭建完成RocketMQ集群后...

首先我們創(chuàng)建一個(gè)空的maven工程,在pom.xml文件中添加RocketMQ的依賴(RocketMQ的依賴版本需要與虛擬機(jī)中的保持一致,這里選擇和之前一樣的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生產(chǎn)消息

然后編寫(xiě)生產(chǎn)者生產(chǎn)消息的代碼:

// 1.創(chuàng)建一個(gè)DefaultMQProducer實(shí)例,指定生產(chǎn)者組名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.設(shè)置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.啟動(dòng)生產(chǎn)者實(shí)例  
producer.start();   
// 4.使用for循環(huán)發(fā)送10條消息  
for (int i = 0; i < 10; i++) {  
    // 創(chuàng)建一條消息,指定Topic為"MyTopic1",Tag標(biāo)簽為"TagA",消息體為"hello rocketmq"加上循環(huán)變量的值,同時(shí)把字符串轉(zhuǎn)換為字節(jié)數(shù)組  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.發(fā)送消息并接收發(fā)送結(jié)果  
    SendResult sendResult = producer.send(message);  
    // 打印發(fā)送結(jié)果,包括消息ID、發(fā)送狀態(tài)等信息  
    System.out.println(sendResult);  
}  
// 6.發(fā)送完所有消息后,關(guān)閉生產(chǎn)者實(shí)例,釋放資源  
producer.shutdown();

生產(chǎn)者生產(chǎn)消息和消費(fèi)者消費(fèi)消息這塊的代碼都相對(duì)較為簡(jiǎn)單,已經(jīng)在代碼塊中加了注釋,這里就不再贅述了。

這個(gè)時(shí)候就可以訪問(wèn)虛擬機(jī)+端口號(hào)來(lái)搜索到發(fā)送的消息詳情了!

消費(fèi)消息

// 1.和生產(chǎn)者一樣,創(chuàng)建一個(gè)DefaultMQPushConsumer實(shí)例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.設(shè)置NameServer的地址,消費(fèi)者通過(guò)這個(gè)地址與NameServer進(jìn)行通信,來(lái)獲取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 訂閱一個(gè)或多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息。我們訂閱了"MyTopic1",使用"*"來(lái)匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注冊(cè)消息監(jiān)聽(tīng)器,用于處理從Broker接收到的消息。使用MessageListenerConcurrently接口的實(shí)現(xiàn),表示并行消費(fèi)  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.當(dāng)收到消息時(shí),方法會(huì)被調(diào)用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍歷消息列表,并打印每條消息的內(nèi)容(注意:這里直接打印msg對(duì)象不會(huì)得到預(yù)期的消息內(nèi)容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我們打印msg.getBody()的內(nèi)容,為了保留消息原樣  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消費(fèi)狀態(tài),這里表示消息已成功消費(fèi)  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.啟動(dòng)消費(fèi)者實(shí)例  
consumer.start();  
// 8.打印日志消費(fèi)者已啟動(dòng)  
System.out.println("消費(fèi)者已啟動(dòng)");

這里需要注意,msg包含了消息的詳細(xì)信息,包括消息體、標(biāo)簽、屬性等等。如果想打印消息內(nèi)容,應(yīng)該使用msg.getBody()方法獲取消息體的字節(jié)數(shù)組,并且把它轉(zhuǎn)換為字符串(如果消息體是文本的話)。

本篇文章到這里就結(jié)束了,后續(xù)會(huì)繼續(xù)分享RocketMQ相關(guān)的知識(shí),感謝各位小伙伴們的支持!

以上就是使用Java代碼實(shí)現(xiàn)RocketMQ的生產(chǎn)與消費(fèi)消息的詳細(xì)內(nèi)容,更多關(guān)于Java實(shí)現(xiàn)RocketMQ生產(chǎn)與消費(fèi)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Spring Cloud Nacos 和 Eureka區(qū)別解析

    Spring Cloud Nacos 和 Eureka區(qū)別解析

    Spring Cloud Nacos 和 Spring Cloud Eureka 都是 Spring Cloud 微服務(wù)框架中的服務(wù)注冊(cè)和發(fā)現(xiàn)組件,用于幫助開(kāi)發(fā)者輕松地構(gòu)建和管理微服務(wù)應(yīng)用,這篇文章主要介紹了Spring Cloud Nacos 和 Eureka區(qū)別,需要的朋友可以參考下
    2023-08-08
  • 全面解析java中的hashtable

    全面解析java中的hashtable

    以下是對(duì)java中的hashtable進(jìn)行了詳細(xì)的分析介紹。需要的朋友可以過(guò)來(lái)參考下
    2013-08-08
  • 如何更好的使用Java8中方法引用詳解

    如何更好的使用Java8中方法引用詳解

    在Java8中,我們可以直接通過(guò)方法引用來(lái)簡(jiǎn)寫(xiě)lambda表達(dá)式中已經(jīng)存在的方法,這種特性就叫做方法引用(Method Reference)。下面這篇文章主要給大家介紹了關(guān)于如何更好的使用Java8中方法引用的相關(guān)資料,需要的朋友可以參考下。
    2017-09-09
  • spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn))

    spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn))

    本篇文章主要介紹了spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn)),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • springboot如何解決跨域后session獲取不到sessionId不一致

    springboot如何解決跨域后session獲取不到sessionId不一致

    這篇文章主要介紹了springboot如何解決跨域后session獲取不到sessionId不一致問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • SpringBoot開(kāi)發(fā)技巧之如何處理跨域請(qǐng)求CORS

    SpringBoot開(kāi)發(fā)技巧之如何處理跨域請(qǐng)求CORS

    CORS(Cross-Origin Resource Sharing)"跨域資源共享",是一個(gè)W3C標(biāo)準(zhǔn),它允許瀏覽器向跨域服務(wù)器發(fā)送Ajax請(qǐng)求,打破了Ajax只能訪問(wèn)本站內(nèi)的資源限制
    2021-10-10
  • SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解

    SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解

    這篇文章主要為大家介紹了SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解,文中的實(shí)現(xiàn)步驟講解詳細(xì),感興趣的小伙伴們可以了解一下
    2022-06-06
  • 解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫(kù)時(shí)結(jié)果返回null問(wèn)題

    解決mybatis使用char類型字段查詢oracle數(shù)據(jù)庫(kù)時(shí)結(jié)果返回null問(wèn)題

    這篇文章主要介紹了mybatis使用char類型字段查詢oracle數(shù)據(jù)庫(kù)時(shí)結(jié)果返回null問(wèn)題的解決方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-06-06
  • Java并發(fā) 線程間的等待與通知

    Java并發(fā) 線程間的等待與通知

    這篇文章主要介紹了Java并發(fā) 線程間的等待與通知,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • Spring Boot 整合 Druid 并開(kāi)啟監(jiān)控的操作方法

    Spring Boot 整合 Druid 并開(kāi)啟監(jiān)控的操作方法

    本文介紹了如何在SpringBoot項(xiàng)目中引入和配置Druid數(shù)據(jù)庫(kù)連接池,并開(kāi)啟其監(jiān)控功能,通過(guò)添加依賴、配置數(shù)據(jù)源、開(kāi)啟監(jiān)控、自定義配置以及訪問(wèn)監(jiān)控頁(yè)面,開(kāi)發(fā)者可以有效提高數(shù)據(jù)庫(kù)訪問(wèn)效率并監(jiān)控連接池狀態(tài),感興趣的朋友跟隨小編一起看看吧
    2025-01-01

最新評(píng)論