欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解

 更新時(shí)間:2022年12月27日 13:48:04   作者:一個(gè)雙子座的Java攻城獅  
這篇文章主要介紹了RocketMQ生產(chǎn)消息與消費(fèi)消息,RocketMQ可用于以三種方式發(fā)送消息:可靠的同步、可靠的異步和單向傳輸。前兩種消息類型是可靠的,因?yàn)闊o(wú)論它們是否成功發(fā)送都有響應(yīng)

1 RocketMQ簡(jiǎn)介

RocketMQ是阿里開源的一款非常優(yōu)秀中間件產(chǎn)品,脫胎于阿里的另一款隊(duì)列技術(shù)MetaQ,后捐贈(zèng)給Apache基金會(huì)作為一款孵化技術(shù),僅僅經(jīng)歷了一年多的時(shí)間就成為Apache基金會(huì)的頂級(jí)項(xiàng)目。并且它現(xiàn)在已經(jīng)在阿里內(nèi)部被廣泛的應(yīng)用,并且經(jīng)受住了多次雙十一的這種極致場(chǎng)景的壓力(2017年的雙十一,RocketMQ流轉(zhuǎn)的消息量達(dá)到了萬(wàn)億級(jí),峰值TPS達(dá)到5600萬(wàn))

2 MQ的常見(jiàn)產(chǎn)品

ActiveMQ:java語(yǔ)言實(shí)現(xiàn),萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),主從架構(gòu),成熟度高

RabbitMQ :erlang語(yǔ)言實(shí)現(xiàn),萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度us級(jí),主從架構(gòu),

RocketMQ :java語(yǔ)言實(shí)現(xiàn),十萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能強(qiáng),擴(kuò)展性強(qiáng)

kafka :scala語(yǔ)言實(shí)現(xiàn),十萬(wàn)級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能較少,應(yīng)用于大數(shù)據(jù)較多

3 環(huán)境搭建

創(chuàng)建maven工程

引入依賴:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
</dependency>

4 單生產(chǎn)者單消費(fèi)者模式

生產(chǎn)者:

//生產(chǎn)者,產(chǎn)生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.設(shè)定發(fā)送的命名服務(wù)器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3啟動(dòng)發(fā)送的服務(wù)
        producer.start();
        //4.1創(chuàng)建要發(fā)送的消息對(duì)象,指定topic,指定內(nèi)容body
        Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8"));
        //4.2發(fā)送消息
        SendResult result = producer.send(msg);
        System.out.println("返回結(jié)果:"+result);
        //5.關(guān)閉連接
        producer.shutdown();
    }
}

消費(fèi)者:

//消費(fèi)者,消費(fèi)消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.設(shè)定接收的命名服務(wù)器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
        consumer.subscribe("topic1","*");
        //4.開啟監(jiān)聽(tīng),用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
            }
        });
        //5.啟動(dòng)接收消息的服務(wù)
        consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
        System.out.println("接收消息服務(wù)已運(yùn)行");
    }
}

測(cè)試:

5 單生產(chǎn)者多消費(fèi)者模式

5.1默認(rèn)模式(負(fù)載均衡)

生產(chǎn)者:

//生產(chǎn)者,產(chǎn)生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.設(shè)定發(fā)送的命名服務(wù)器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3啟動(dòng)發(fā)送的服務(wù)
        producer.start();
        for (int i = 1; i <= 10; i++) {
            Message msg = new Message("topic1",("生產(chǎn)者2: hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回結(jié)果:"+result);
        }
        //5.關(guān)閉連接
        producer.shutdown();
    }
}

消費(fèi)者:

//消費(fèi)者,消費(fèi)消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.設(shè)定接收的命名服務(wù)器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
        consumer.subscribe("topic1","*");
        //4.開啟監(jiān)聽(tīng),用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
            }
        });
        //5.啟動(dòng)接收消息的服務(wù)
        consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
        System.out.println("接收消息服務(wù)已運(yùn)行");
    }
}

測(cè)試:

5.2廣播模式

生產(chǎn)者的代碼不變,消費(fèi)者的代碼改動(dòng)如下:

		//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的
        consumer.setMessageModel(MessageModel.BROADCASTING);

具體消費(fèi)者代碼:

//消費(fèi)者,消費(fèi)消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.設(shè)定接收的命名服務(wù)器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
        consumer.subscribe("topic1","*");
        //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        //設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.開啟監(jiān)聽(tīng),用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
            }
        });
        //5.啟動(dòng)接收消息的服務(wù)
        consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
        System.out.println("接收消息服務(wù)已運(yùn)行");
    }
}

測(cè)試:

廣播模式的現(xiàn)象

如果 生產(chǎn)者先發(fā)送消息, 后啟動(dòng)消費(fèi)者, 消息只能被消費(fèi)一次

如果多個(gè)消費(fèi)者先啟動(dòng)(廣播模式),后發(fā)消息,才有廣播的效果

結(jié)論: 必須先啟動(dòng)消費(fèi)者再啟動(dòng)發(fā)送者才有廣播的效果

6 多生產(chǎn)者多消費(fèi)者模式

多生產(chǎn)者產(chǎn)生的消息可以被同一個(gè)消費(fèi)者消費(fèi),也可以被多個(gè)消費(fèi)者消費(fèi)

運(yùn)行多個(gè)生產(chǎn)者,在啟動(dòng)消費(fèi)者

測(cè)試:

到此這篇關(guān)于RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解的文章就介紹到這了,更多相關(guān)RocketMQ生產(chǎn)消息與消費(fèi)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 使用Hibernate根據(jù)實(shí)體類自動(dòng)生成表的方法

    使用Hibernate根據(jù)實(shí)體類自動(dòng)生成表的方法

    這篇文章主要介紹了使用Hibernate根據(jù)實(shí)體類自動(dòng)生成表的方法,該篇提供了兩種方法,可以根據(jù)需要選擇其一,希望對(duì)你有所幫助,如有不對(duì)的地方還望指正
    2023-03-03
  • redis 獲取 list 中的所有元素操作

    redis 獲取 list 中的所有元素操作

    這篇文章主要介紹了redis 獲取 list 中的所有元素操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-03-03
  • Java鎖的升級(jí)策略 偏向鎖 輕量級(jí)鎖 重量級(jí)鎖

    Java鎖的升級(jí)策略 偏向鎖 輕量級(jí)鎖 重量級(jí)鎖

    在本文中小編給的大家整理了關(guān)于Java鎖的升級(jí)策略 偏向鎖 輕量級(jí)鎖 重量級(jí)鎖的相關(guān)知識(shí)點(diǎn)內(nèi)容,需要的朋友們參考下。
    2019-06-06
  • SpringBoot啟動(dòng)執(zhí)行sql腳本的3種方法實(shí)例

    SpringBoot啟動(dòng)執(zhí)行sql腳本的3種方法實(shí)例

    在應(yīng)用程序啟動(dòng)后,可以自動(dòng)執(zhí)行建庫(kù)、建表等SQL腳本,下面這篇文章主要給大家介紹了關(guān)于SpringBoot啟動(dòng)執(zhí)行sql腳本的3種方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-01-01
  • Springboot如何優(yōu)雅地進(jìn)行字段校驗(yàn)

    Springboot如何優(yōu)雅地進(jìn)行字段校驗(yàn)

    這篇文章主要給大家介紹了關(guān)于Springboot如何優(yōu)雅地進(jìn)行字段校驗(yàn)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • JAVA設(shè)計(jì)模式之訪問(wèn)者模式原理與用法詳解

    JAVA設(shè)計(jì)模式之訪問(wèn)者模式原理與用法詳解

    這篇文章主要介紹了JAVA設(shè)計(jì)模式之訪問(wèn)者模式,簡(jiǎn)單說(shuō)明了訪問(wèn)者模式的原理,并結(jié)合實(shí)例分析了java訪問(wèn)者模式的定義與用法,需要的朋友可以參考下
    2017-08-08
  • 基于mybatis batch實(shí)現(xiàn)批量提交大量數(shù)據(jù)

    基于mybatis batch實(shí)現(xiàn)批量提交大量數(shù)據(jù)

    這篇文章主要介紹了基于mybatis batch實(shí)現(xiàn)批量提交大量數(shù)據(jù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • Spring Security入門demo案例

    Spring Security入門demo案例

    Spring Security是一個(gè)高度自定義的安全框架,本文主要介紹了Spring Security入門,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的酒店前臺(tái)管理小功能(實(shí)例代碼)

    Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的酒店前臺(tái)管理小功能(實(shí)例代碼)

    這篇文章主要介紹了Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的酒店前臺(tái)管理小功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • java實(shí)例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)

    java實(shí)例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)

    下面小編就為大家?guī)?lái)一篇java實(shí)例方法被覆蓋,靜態(tài)方法被隱藏Explain(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05

最新評(píng)論