欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ普通消息實(shí)戰(zhàn)演練詳解

 更新時(shí)間:2022年08月22日 14:59:31   作者:奔跑的毛球  
這篇文章主要為大家介紹了RocketMQ普通消息實(shí)戰(zhàn)演練詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

之前研究了RocketMQ的源碼,在這里將各種消息發(fā)送與消費(fèi)的demo進(jìn)行舉例,方便以后使用的時(shí)候CV。

相關(guān)的配置,安裝和啟動(dòng)在這篇文章有相關(guān)講解  http://www.dbjr.com.cn/article/260237.htm

普通消息同步發(fā)送

同步消息是指發(fā)送出消息后,同步等待,直到接收到Broker發(fā)送成功的響應(yīng)才會(huì)繼續(xù)發(fā)送下一個(gè)消息。這個(gè)方式可以確保消息發(fā)送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發(fā)送方式
        SendResult send = producer.send(msg);
        //確認(rèn)返回
        System.out.println(send);
    }
    //關(guān)閉producer
    producer.shutdown();
}

普通消息異步發(fā)送

異步消息發(fā)送方在發(fā)送了一條消息后,不等接收方發(fā)回響應(yīng),接著進(jìn)行第二條消息發(fā)送。發(fā)送方通過(guò)回調(diào)接口的方式接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback會(huì)接收異步返回結(jié)果的回調(diào)
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是過(guò)早關(guān)閉producer,會(huì)拋出The producer service state not OK, SHUTDOWN_ALREADY的錯(cuò)
    Thread.sleep(10000);
    //關(guān)閉producer
    producer.shutdown();
}

普通消息單向發(fā)送

單項(xiàng)發(fā)送不關(guān)心發(fā)送的結(jié)果,只發(fā)送請(qǐng)求不等待應(yīng)答。發(fā)送消息耗時(shí)極短。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發(fā)送方式
        producer.sendOneway(msg);
    }
    //關(guān)閉producer
    producer.shutdown();
}

集群消費(fèi)模式

消費(fèi)者采用負(fù)載均衡的方式消費(fèi)消息,同一個(gè)Group下的多個(gè)Consumer共同消費(fèi)Queue里的Message,每個(gè)Consumer處理的消息不同。

一個(gè)Consumer Group中的各個(gè)Consumer實(shí)例分共同消費(fèi)消息,即一條消息只會(huì)投遞到一個(gè)Group下面的一個(gè)實(shí)例,并且只消費(fèi)一遍。

例如某個(gè)Topic有3個(gè)隊(duì)列,其中一個(gè)Consumer Group 有 3 個(gè)實(shí)例,那么每個(gè)實(shí)例只消費(fèi)其中的1個(gè)隊(duì)列。集群消費(fèi)模式是消費(fèi)者默認(rèn)的消費(fèi)方式。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息消費(fèi)者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動(dòng)消費(fèi)者實(shí)例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

廣播消費(fèi)模式

廣播消費(fèi)模式中把消息對(duì)一個(gè)Group下的各個(gè)Consumer實(shí)例都投遞一遍。也就是說(shuō)消息也會(huì)被 Group 中的每個(gè)Consumer都消費(fèi)一次。

實(shí)際上,是一個(gè)消費(fèi)組下的每個(gè)消費(fèi)者實(shí)例都獲取到了topic下面的每個(gè)Message Queue去拉取消費(fèi)。所以消息會(huì)投遞到每個(gè)消費(fèi)者實(shí)例。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息消費(fèi)者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動(dòng)消費(fèi)者實(shí)例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

以上就是RocketMQ普通消息實(shí)戰(zhàn)演練詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ普通消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java中壓縮文件并下載的實(shí)例詳解

    java中壓縮文件并下載的實(shí)例詳解

    在本篇內(nèi)容里小編給大家整理的是一篇關(guān)于java中壓縮文件并下載的實(shí)例詳解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2021-02-02
  • Spring中BeanFactory解析bean詳解

    Spring中BeanFactory解析bean詳解

    本篇文章主要介紹了Spring中BeanFactory解析bean詳解 ,詳細(xì)的介紹了使用BeanFactory對(duì)bean進(jìn)行解析的實(shí)例,有興趣的可以了解一下。
    2017-04-04
  • 通過(guò)AOP攔截Spring?Boot日志并將其存入數(shù)據(jù)庫(kù)功能實(shí)現(xiàn)

    通過(guò)AOP攔截Spring?Boot日志并將其存入數(shù)據(jù)庫(kù)功能實(shí)現(xiàn)

    本文介紹了如何使用Spring Boot和AOP技術(shù)實(shí)現(xiàn)攔截系統(tǒng)日志并保存到數(shù)據(jù)庫(kù)中的功能,包括配置數(shù)據(jù)庫(kù)連接、定義日志實(shí)體類、定義日志攔截器、使用AOP攔截日志并保存到數(shù)據(jù)庫(kù)中等步驟,感興趣的朋友一起看看吧
    2023-08-08
  • Java 8新特性方法引用詳細(xì)介紹

    Java 8新特性方法引用詳細(xì)介紹

    這篇文章主要介紹了Java 8新特性方法引用詳細(xì)介紹的相關(guān)資料,這里對(duì)新特性 方法引用做的資料整理,具有參考價(jià)值,需要的朋友可以參考下
    2016-12-12
  • Spring Boot 自定義 Shiro 過(guò)濾器無(wú)法使用 @Autowired問(wèn)題及解決方法

    Spring Boot 自定義 Shiro 過(guò)濾器無(wú)法使用 @Autowired問(wèn)題及解決方法

    這篇文章主要介紹了Spring Boot 自定義 Shiro 過(guò)濾器無(wú)法使用 @Autowired問(wèn)題及解決方法 ,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-06-06
  • Java中反射的"暴破"機(jī)制(SetAccessible方法)詳解

    Java中反射的"暴破"機(jī)制(SetAccessible方法)詳解

    這篇文章主要為大家詳細(xì)介紹了Java中反射的"暴破"機(jī)制,以及如何利用這一機(jī)制實(shí)現(xiàn)訪問(wèn)非公有屬性,方法,和構(gòu)造器,文中示例代碼講解詳細(xì),感興趣的可以了解一下
    2022-08-08
  • RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝

    RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝

    本文主要介紹了RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • jdk17?SpringBoot?JPA集成多數(shù)據(jù)庫(kù)的示例詳解

    jdk17?SpringBoot?JPA集成多數(shù)據(jù)庫(kù)的示例詳解

    這篇文章主要介紹了jdk17?SpringBoot?JPA集成多數(shù)據(jù)庫(kù)的示例代碼,包括配置類、請(qǐng)求攔截器、線程上下文等相關(guān)知識(shí),代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08
  • 如何在?Spring?Boot?中使用?OpenAI?ChatGPT?API

    如何在?Spring?Boot?中使用?OpenAI?ChatGPT?API

    這篇文章主要介紹了如何在Spring?Boot中使用OpenAI?ChatGPT?API,我們探索了 OpenAI ChatGPT API 以生成對(duì)提示的響應(yīng),我們創(chuàng)建了一個(gè) Spring Boot 應(yīng)用程序,它調(diào)用 API 來(lái)生成對(duì)提示的響應(yīng),需要的朋友可以參考下
    2023-08-08
  • Redisson分布式閉鎖RCountDownLatch的使用詳細(xì)講解

    Redisson分布式閉鎖RCountDownLatch的使用詳細(xì)講解

    分布式鎖和我們java基礎(chǔ)中學(xué)習(xí)到的synchronized略有不同,synchronized中我們的鎖是個(gè)對(duì)象,當(dāng)前系統(tǒng)部署在不同的服務(wù)實(shí)例上,單純使用synchronized或者lock已經(jīng)無(wú)法滿足對(duì)庫(kù)存一致性的判斷。本次主要講解基于rediss實(shí)現(xiàn)的分布式鎖
    2023-02-02

最新評(píng)論