欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解

 更新時(shí)間:2023年05月20日 09:41:27   作者:叫我二蛋  
這篇文章主要介紹了關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解,RocketMQ作為一款純java、分布式、隊(duì)列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等,需要的朋友可以參考下

環(huán)境搭建

  1. 創(chuàng)建Maven項(xiàng)目。
  2. pom.xml文件中引入RocketMQ依賴。
<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
</dependencies>

生產(chǎn)者

普通消息

RocketMQ可用于以三種方式發(fā)送消息:同步、異步和單向傳輸。前兩種消息類型是可靠的,因?yàn)闊o論它們是否成功發(fā)送都有響應(yīng)。

同步發(fā)送

在這里插入圖片描述

    private final static String nameServer = "127.0.0.1:9876";
    private final static String producerGroup = "my_group";
    private final static String topic = "topic-test";
@Test
    public void syncSend() {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 啟動producer
            producer.start();
            // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,關(guān)閉producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

異步發(fā)送

在這里插入圖片描述

@Test
    public void asyncSend() throws IOException {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 啟動producer
            producer.start();
            // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 異步發(fā)送消息, 發(fā)送結(jié)果通過callback返回給客戶端
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("OK %s %n",
                            sendResult.getMsgId());
                }
                public void onException(Throwable e) {
                    System.out.printf("Exception %s %n", e);
                    e.printStackTrace();
                }
            },10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.in.read();
    }

單向傳輸

在這里插入圖片描述

@Test
    public void onewaySend()  {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 啟動producer
            producer.start();
            // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 異步發(fā)送消息, 發(fā)送結(jié)果通過callback返回給客戶端
            producer.sendOneway(msg);
            // 一旦producer不再使用,關(guān)閉producer
            //producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

順序消息

RocketMQ 通過生產(chǎn)者和服務(wù)端的協(xié)議保障單個(gè)生產(chǎn)者串行地發(fā)送消息,并按序存儲和持久化。如需保證消息生產(chǎn)的順序性,則必須滿足以下條件:

  • 單一生產(chǎn)者: 消息生產(chǎn)的順序性僅支持單一生產(chǎn)者,不同生產(chǎn)者分布在不同的系統(tǒng),即使設(shè)置相同的分區(qū)鍵,不同生產(chǎn)者之間產(chǎn)生的消息也無法判定其先后順序。
  • 串行發(fā)送:生產(chǎn)者客戶端支持多線程安全訪問,但如果生產(chǎn)者使用多線程并行發(fā)送,則不同線程間產(chǎn)生的消息將無法判定其先后順序。
@Test
    public void orderSend() {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 啟動producer
            producer.start();
            String[] tags = new String[]{"TagA", "TagB", "TagC"};
            for (int i = 0; i < 10; i++) {
                int orderId = i % 10;
                Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

延遲消息

延遲消息發(fā)送是指消息發(fā)送到RocketMQ后,并不期望立馬投遞這條消息,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi)。 使用 RocketMQ 的延時(shí)消息可以簡化定時(shí)調(diào)度任務(wù)的開發(fā)邏輯,實(shí)現(xiàn)高性能、可擴(kuò)展、高可靠的定時(shí)觸發(fā)能力。 RocketMQ 一共支持18個(gè)等級的延遲投遞,具體時(shí)間如下:

投遞等級延遲時(shí)間投遞等級延遲時(shí)間
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h
@Test
    public void scheduledSend() {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 啟動producer
            producer.start();
            // 創(chuàng)建一條消息,并指定topic、tag、body等信息,tag可以理解成標(biāo)簽,對消息進(jìn)行再歸類,RocketMQ可以在消費(fèi)端對tag進(jìn)行過濾
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 消息延遲等級
            msg.setDelayTimeLevel(2);
            // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,關(guān)閉producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

批量消息

在對吞吐率有一定要求的情況下,Apache RocketMQ可以將一些消息聚成一批以后進(jìn)行發(fā)送,可以增加吞吐率,并減少API和網(wǎng)絡(luò)調(diào)用次數(shù)。

 @Test
    public void batchSend() {
        try {
            // 初始化一個(gè)producer并設(shè)置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 啟動producer
            producer.start();
            List<Message> messages = new ArrayList<Message>();
            messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes()));
            // 利用producer進(jìn)行發(fā)送,并同步等待發(fā)送結(jié)果
            SendResult sendResult = producer.send(messages, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,關(guān)閉producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

事務(wù)消息

在一些對數(shù)據(jù)一致性有強(qiáng)需求的場景,可以用RocketMQ 事務(wù)消息來解決,從而保證上下游數(shù)據(jù)的一致性。

基于 RocketMQ 的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實(shí)現(xiàn)全局提交結(jié)果的一致性。

第一階段會發(fā)送一個(gè)半事務(wù)消息,半事務(wù)消息是指暫不能投遞的消息。 如果發(fā)送成功則執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)執(zhí)行成功與否,向 Broker 半事務(wù)消息狀態(tài)(commit或者rollback)。 半事務(wù)消息只有 commit 狀態(tài)才會真正向下游投遞。 如果由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,Broker 端會通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時(shí),需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback)。這樣最終保證了本地事務(wù)執(zhí)行成功,下游就能收到消息,本地事務(wù)執(zhí)行失敗,下游就收不到消息。

事務(wù)消息的詳細(xì)交互流程如下圖所示:

在這里插入圖片描述

 @Test
    public void transactionSend() {
        try {
            // 事務(wù)消息的發(fā)送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 進(jìn)行發(fā)送
            TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
            // 設(shè)置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 事務(wù)回查的線程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            producer.setExecutorService(executorService);
            producer.setTransactionListener(new TransactionListener() {
                //半事務(wù)消息發(fā)送成功后,執(zhí)行本地事務(wù)的方法
                public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
                    System.out.printf("執(zhí)行本地事務(wù) %n");
                    /*
                    二次確認(rèn)
                    LocalTransactionState.COMMIT_MESSAGE:提交事務(wù),允許消費(fèi)者消費(fèi)該消息
                    LocalTransactionState.ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費(fèi)。
                    LocalTransactionState.UNKNOW:暫時(shí)無法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。
                    */
                    return LocalTransactionState.UNKNOW;
                }
                // 二次確認(rèn)消息沒有收到,Broker端回查事務(wù)狀態(tài)的方法,默認(rèn)60s
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.printf("二次確認(rèn)失敗,broker事務(wù)回查  %n");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            producer.setSendMsgTimeout(10000);
            // 啟動producer
            producer.start();
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer進(jìn)行發(fā)送事務(wù)消息,并同步等待發(fā)送結(jié)果
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,關(guān)閉producer
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

消費(fèi)者

MQ的消費(fèi)模式可以大致分為兩種,一種是推Push,一種是拉Pull。

Push消費(fèi)

Push是服務(wù)端主動推送消息給客戶端,優(yōu)點(diǎn)是及時(shí)性較好,但如果客戶端沒有做好流控,一旦服務(wù)端推送大量消息到客戶端時(shí),就會導(dǎo)致客戶端消息堆積甚至崩潰。

    private final static String nameServer = "127.0.0.1:9876";
    private final static String consumerGroup = "my_group";
    private final static String topic = "topic-test";
    @Test
    public void consumerPush() throws MQClientException, IOException {
        // 初始化consumer,并設(shè)置consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 設(shè)置NameServer地址
        consumer.setNamesrvAddr(nameServer);
        // 訂閱一個(gè)或多個(gè)topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息
        consumer.subscribe(topic, "*");
        //設(shè)置采用廣播模式,廣播模式下,消費(fèi)組內(nèi)的每一個(gè)消費(fèi)者都會消費(fèi)全量消息。
        //consumer.setMessageModel(MessageModel.BROADCASTING);
        //注冊回調(diào)接口來處理從Broker中收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費(fèi)成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 順序消費(fèi)
//        consumer.registerMessageListener(new MessageListenerOrderly() {
//            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//                return ConsumeOrderlyStatus.SUCCESS;
//            }
//        });
        // 啟動Consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
        System.in.read();
    }

Pull 消費(fèi)

Pull是客戶端需要主動到服務(wù)端取數(shù)據(jù),優(yōu)點(diǎn)是客戶端可以依據(jù)自己的消費(fèi)能力進(jìn)行消費(fèi),但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務(wù)端和客戶端的壓力,拉取間隔長又容易造成消費(fèi)不及時(shí)。

@Test
    public void consumerPull() {
        try {
            DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
            consumer.setNamesrvAddr(nameServer);
            //關(guān)閉自動提交
            consumer.setAutoCommit(false);
            consumer.subscribe(topic, "*");
            consumer.setPullBatchSize(20);
            consumer.start();
            while (true) {
                List<MessageExt> messageExts = consumer.poll();
                System.out.printf("%s%n", messageExts);
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

代碼倉庫

https://gitee.com/codeWBG/learn_rocketmq

到此這篇關(guān)于關(guān)于Java整合RocketMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)詳解的文章就介紹到這了,更多相關(guān)Java整合RocketMQ生產(chǎn)消費(fèi)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot后端接收參數(shù)優(yōu)化代碼示例(統(tǒng)一處理前端參數(shù))

    SpringBoot后端接收參數(shù)優(yōu)化代碼示例(統(tǒng)一處理前端參數(shù))

    使用Spring Boot開發(fā)API的時(shí)候,讀取請求參數(shù)是服務(wù)端編碼中最基本的一項(xiàng)操作,下面這篇文章主要給大家介紹了關(guān)于SpringBoot后端接收參數(shù)優(yōu)化(統(tǒng)一處理前端參數(shù))的相關(guān)資料,需要的朋友可以參考下
    2024-07-07
  • java經(jīng)典問題:連個(gè)字符串互為回環(huán)變位

    java經(jīng)典問題:連個(gè)字符串互為回環(huán)變位

    連個(gè)字符串互為回環(huán)變位經(jīng)常出現(xiàn)在java程序員面試中,這個(gè)是考驗(yàn)程序員的解題思路和方法的最經(jīng)典的一題,小編為大家詳細(xì)分析一下,一起來學(xué)習(xí)吧。
    2017-11-11
  • Java報(bào)錯(cuò):UnsupportedOperationException in Collections的解決方案

    Java報(bào)錯(cuò):UnsupportedOperationException in Collection

    在Java編程中,UnsupportedOperationException是一種常見的運(yùn)行時(shí)異常,通常在試圖對不支持的操作執(zhí)行修改時(shí)發(fā)生,它表示當(dāng)前操作不被支持,本文將深入探討UnsupportedOperationException的產(chǎn)生原因,并提供具體的解決方案和最佳實(shí)踐,需要的朋友可以參考下
    2024-06-06
  • mybatis那些約定的配置你真的都了解嗎(經(jīng)驗(yàn)總結(jié))

    mybatis那些約定的配置你真的都了解嗎(經(jīng)驗(yàn)總結(jié))

    mybatsi中Mapper和xml文件之間有很多約定俗稱的規(guī)則,比如名稱匹配,包掃描,別名等,這些規(guī)則是什么。如果想更加靈活,該如何配置呢?今天就給大家講一下如何配置mybatsi的xml文件
    2021-06-06
  • eclipse輸出Hello World的實(shí)現(xiàn)方法

    eclipse輸出Hello World的實(shí)現(xiàn)方法

    這篇文章主要介紹了eclipse輸出Hello World的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • Java實(shí)現(xiàn)洗牌發(fā)牌的方法

    Java實(shí)現(xiàn)洗牌發(fā)牌的方法

    這篇文章主要介紹了Java實(shí)現(xiàn)洗牌發(fā)牌的方法,涉及java針對數(shù)組的遍歷與排序操作相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-07-07
  • java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動示例詳解

    java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動示例詳解

    這篇文章主要介紹了java 與testng利用XML做數(shù)據(jù)源的數(shù)據(jù)驅(qū)動示例詳解的相關(guān)資料,需要的朋友可以參考下
    2017-01-01
  • SpringBoot中使用Redis作為全局鎖示例過程

    SpringBoot中使用Redis作為全局鎖示例過程

    這篇文章主要為大家介紹了SpringBoot中使用Redis作為全局鎖示例過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-03-03
  • 了解java中對象基礎(chǔ)Object類

    了解java中對象基礎(chǔ)Object類

    本文主要講解了java中對象基礎(chǔ)Object類,文中運(yùn)用大量代碼講解的非常詳細(xì),想學(xué)習(xí)相關(guān)知識的小伙伴可以參考一下這篇文章
    2021-09-09
  • Java多態(tài)中動態(tài)綁定原理解析

    Java多態(tài)中動態(tài)綁定原理解析

    這篇文章主要介紹了Java多態(tài)中動態(tài)綁定原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12

最新評論