欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java開發(fā)RocketMQ生產(chǎn)者高可用示例詳解

 更新時間:2022年08月05日 16:10:52   作者:奔跑的毛球  
這篇文章主要為大家介紹了java開發(fā)RocketMQ生產(chǎn)者高可用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

前邊兩章說了點基礎的,從這章開始,我們挖挖源碼??纯碦ocketMQ是怎么工作的。

首先呢,這個生產(chǎn)者就是送孩子去碼頭的家長,孩子們呢,就是消息了。

我們看看消息孩子們都長啥樣。

1 消息

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主題名字
    private String topic;
    //消息擴展信息,Tag,keys,延遲級別都存在這里
    private Map<String, String> properties;
    //消息體,字節(jié)數(shù)組
    private byte[] body;
    //設置消息的key,
    public void setKeys(String keys) {}
    //設置topic
    public void setTopic(String topic) {}
    //延遲級別
    public int setDelayTimeLevel(int level) {}
    //消息過濾的標記
    public void setTags(String tags) {}
    //擴展信息存放在此
    public void putUserProperty(final String name, final String value) {}
}

消息就是孩子們,這些孩子們呢,有各自的特點,也有共性。同一個家長送來的兩個孩子可以是去同一個地方的,也可以是去不同的地方的。

1.1 topic

首先呢,每個孩子消息都有一個屬性topic,這個我們上文說到了,是一個候船大廳。孩子們進來之后,走到自己指定的候船大廳的指定區(qū)域(平時出門坐火車高鐵不也是指定的站臺乘車么),坐到message queue座位上等,等著出行。

Broker有一個或者多個topic,消息會存放到topic內的message queue內,等待被消費。

1.2 Body

孩子消息,也有一個Body屬性,這就是他的能力,他會畫畫,他會唱歌,他會干啥干啥,就記錄在這個Body屬性里。等走出去了,體現(xiàn)價值的地方也是這個Body屬性。

Body就是消息體,消費者會根據(jù)消息體執(zhí)行對應的操作。

1.3 tag

這個tag我們上節(jié)說了,就是一個標記,有的孩子背著畫板,相機,有的游船就特意找到這些孩子拉走,完成他們的任務。

可以給消息設置tag屬性,消費者可以選擇含有特定tag屬性的消息進行消費。

1.4 key

key就是每個孩子消息的名字了。要找哪個孩子,喊他名就行。

對發(fā)送的消息設置好 Key,以后可以根據(jù)這個Key 來查找消息。比如消息異常,消息丟失,進行查找會很方便。

1.5 延遲級別

當然,還有的孩子來就不急著走,來之前就想好了,要恰個飯,得30分鐘,所以自己來了會等30分鐘后被接走。

設置延遲級別可以規(guī)定多久后消息可以被消費。

2 生產(chǎn)者高可用

每個送孩子來的家長都希望能送到候船大廳里,更不希望孩子被搞丟了,這個時候這個候船大廳就需要一些保證機制了。

2.1 客戶端保證生產(chǎn)者高可用

2.1.1 重試機制

就是說家長送來了,孩子進到候船大廳之后,沒能成功坐到message queue座位上,這個時候工作人員會安排重試,再去看是否有座位坐。重試次數(shù)默認是2次,也就是說,消息孩子共有3次找座位坐的機會。

看源碼,我特意加了注解,大致可以看懂一些了。

//這里取到了重試的次數(shù)
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    //獲取消息隊列
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            if (times > 0) {
                //Reset topic with namespace during resend.
                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
            }
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            //發(fā)送消息
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            ...
        } catch (RemotingException e) {
            ...
            continue;
        } catch (MQClientException e) {
            ...
            continue;
        } catch (MQBrokerException e) {
            ...
            continue;
        } catch (InterruptedException e) {
            //可以看到只有InterruptedException拋出了異常,其他的exception都會繼續(xù)重試
            throw e;
        }
    } else {
        break;
    }
}

重試代碼如上,這個sendDefaultImpl方法中,會嘗試發(fā)送三次消息,若是都失敗,才會拋出對應的錯誤。

2.1.2 客戶端容錯

若是有多個Broker候車大廳的時候,服務人員會安排消息孩子選擇一個相對不擁擠,比較容易進入的來進入。當然那些已經(jīng)關閉的,停電的,沒有服務能力的,我們是不會進的。

MQ Client會維護一個Broker的發(fā)送延遲信息,根據(jù)這個信息會選擇一個相對延遲較低的Broker來發(fā)送消息。會主動剔除哪些已經(jīng)宕機,不可用或發(fā)送延遲級別較高的Broker.

選擇Broker就是在選擇message queue,對應的代碼如下:

這里會先判斷延遲容錯開關是否開啟,這個開關默認是關閉的,若是開啟的話,會優(yōu)先選擇延遲較低的Broker。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //判斷發(fā)送延遲容錯開關是否開啟
    if (this.sendLatencyFaultEnable) {
        try {
            //選擇一個延遲上可以接受,并且和上次發(fā)送相同的Broker
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //若是Broker的延遲時間可以接受,則返回這個Broker
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //若是第一步?jīng)]能選中一個Broker,就選擇一個延遲較低的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //若是前邊都沒選中一個Broker,就隨機選一個Broker
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

但是當延遲容錯開關關閉狀態(tài)的時候,執(zhí)行的代碼如下:

為了均勻分散Broker的壓力,會選擇與之前不同的Broker。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //若是沒有上次的Brokername做參考,就隨機選一個
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //如果有,那么就選一個其他的Broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //這里判斷遇上一個使用的Broker不是同一個
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //若是上邊的都沒選中,那么就隨機選一個
        return selectOneMessageQueue();
    }
}

2.2 Broker端保證生產(chǎn)者高可用

Broker候船大廳為了能確切的接收到消息孩子,至少會有兩個廳,一個主廳一個副廳,一般來說孩子都會進入到主廳,然后一頓操作,卡該忙信那機資(影分身之術),然后讓分身進入到副廳,這樣當主廳停電了,不工作了,副廳的分身只要去完成了任務就ok的。一般來說都是主廳的消息孩子去坐船完成任務。

之后我們會聊到Broker的主從復制,分為同步復制和異步復制,同步復制時指當master 收到消息之后,同步到slaver才算消息發(fā)送成功。異步復制是只要master收到消息就算成功。生產(chǎn)中建議至少部署兩臺master和兩臺slaver。

下一篇,我們聊聊,消息的發(fā)送流程,就是說,一個消息孩子,從進碼頭的門到坐到message queue座位上,都經(jīng)歷了啥。

以上就是java開發(fā)RocketMQ生產(chǎn)者高可用示例詳解的詳細內容,更多關于java RocketMQ生產(chǎn)者高可用的資料請關注腳本之家其它相關文章!

相關文章

  • Java利用JavaCPP調用算法示例

    Java利用JavaCPP調用算法示例

    本文主要介紹了Java利用JavaCPP調用算法示例,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • 簡述springboot及springboot cloud環(huán)境搭建

    簡述springboot及springboot cloud環(huán)境搭建

    這篇文章主要介紹了簡述springboot及springboot cloud環(huán)境搭建的方法,包括spring boot 基礎應用環(huán)境搭建,需要的朋友可以參考下
    2017-07-07
  • Java8新特性Stream的完全使用指南

    Java8新特性Stream的完全使用指南

    這篇文章主要給大家介紹了關于Java8新特性Stream的完全使用指南,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Java8具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2020-05-05
  • Javassist用法詳解

    Javassist用法詳解

    這篇文章主要介紹了Javassist用法的相關資料,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下
    2021-02-02
  • 基于@Bean修飾的方法參數(shù)的注入方式

    基于@Bean修飾的方法參數(shù)的注入方式

    這篇文章主要介紹了@Bean修飾的方法參數(shù)的注入方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java中實現(xiàn)對象排序的兩種方法(Comparable,Comparator)

    java中實現(xiàn)對象排序的兩種方法(Comparable,Comparator)

    這篇文章主要給大家介紹了關于java中實現(xiàn)對象排序的兩種方法,一種是實現(xiàn)Comparable進行排序,另一種是實現(xiàn)Comparator進行排序,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-12-12
  • MyBatis下SQL注入攻擊的3種方式

    MyBatis下SQL注入攻擊的3種方式

    SQL注入漏洞作為WEB安全的最常見的漏洞之一,本文希望通過Mybatis框架使用不當導致的SQL注入問題為例,能夠拋磚引玉給新手一些思路。感興趣的可以了解一下
    2021-07-07
  • SpringBoot項目接入Nacos的實現(xiàn)步驟

    SpringBoot項目接入Nacos的實現(xiàn)步驟

    SpringBoot項目使用nacos作為配置中心和服務注冊中心,同時兼容dubbo的注冊中心。 本Demo項目使用的SpringBoot版本是2.3.9.RELEASE
    2021-05-05
  • IntelliJ?IDEA教程之clean或者install?Maven項目的操作方法

    IntelliJ?IDEA教程之clean或者install?Maven項目的操作方法

    這篇文章主要介紹了IntelliJ?IDEA教程之clean或者install?Maven項目的操作方法,本文分步驟給大家介紹兩種方式講解如何調試出窗口,需要的朋友可以參考下
    2023-04-04
  • 微服務之間如何通過feign調用接口上傳文件

    微服務之間如何通過feign調用接口上傳文件

    這篇文章主要介紹了微服務之間如何通過feign調用接口上傳文件的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06

最新評論