欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java的RocketMq隊列之消息可靠性詳解

 更新時間:2024年01月08日 09:18:24   作者:獵戶星座。  
這篇文章主要介紹了Java的RocketMq隊列之消息可靠性詳解,生產(chǎn)者通過網(wǎng)絡發(fā)送消息給 Broker,當 Broker 收到之后,將會返回確認響應信息給 Producer,所以生產(chǎn)者只要接收到返回的確認響應,就代表消息在生產(chǎn)階段未丟失,需要的朋友可以參考下

1. 消息的發(fā)送流程

一條消息從生產(chǎn)到被消費,將會經(jīng)歷三個階段:

  • 生產(chǎn)階段,Producer 新建消息,然后通過網(wǎng)絡將消息投遞給 MQ Broker
  • 存儲階段,消息將會存儲在 Broker 端磁盤中
  • 消息階段, Consumer 將會從 Broker 拉取消息

以上任一階段都可能會丟失消息,我們只要找到這三個階段丟失消息原因,采用合理的辦法避免丟失,就可以徹底解決消息丟失的問題。

2. 生產(chǎn)階段

生產(chǎn)者(Producer) 通過網(wǎng)絡發(fā)送消息給 Broker,當 Broker 收到之后,將會返回確認響應信息給 Producer。所以生產(chǎn)者只要接收到返回的確認響應,就代表消息在生產(chǎn)階段未丟失。

發(fā)送模式

可靠同步發(fā)送

  • 原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會在收到接收方發(fā)回響應之后才發(fā)下一個數(shù)據(jù)包的通訊方式。
  • 場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統(tǒng)等。
  • 類似推拉的形式 發(fā)送 ->同步返回 ->發(fā)送 ->同步返回

可靠異步發(fā)送

  • 原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應,接著發(fā)送下個數(shù)據(jù)包的通訊方式。 MQ 的異步發(fā)送,需要用戶實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務器響應即可返回,進行第二條消息發(fā)送。發(fā)送方通過回調(diào)接口接收服務器響應,并對響應結(jié)果進行處理。
  • 場景:異步發(fā)送一般用于鏈路耗時較長,對響應時間較為敏感的業(yè)務場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務,轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。 耗時比較長的 可以不需要同步返回給用戶的

單向(Oneway)發(fā)送

  • 原理:單向(Oneway)發(fā)送特點為發(fā)送方只負責發(fā)送消息,不等待服務器回應且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應答。 此方式發(fā)送消息的過程耗時非常短,一般在微秒級別。
  • 場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。

RocketMQ 發(fā)送消息示例代碼如下:

DefaultMQProducer mqProducer=new DefaultMQProducer("test");
// 設置 nameSpace 地址
mqProducer.setNamesrvAddr("namesrvAddr");
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發(fā)送消息到一個Broker
try {
    SendResult sendResult = mqProducer.send(msg);
} catch (RemotingException e) {
    e.printStackTrace();
} catch (MQBrokerException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

send 方法是一個同步操作,只要這個方法不拋出任何異常,就代表消息已經(jīng)發(fā)送成功。

消息發(fā)送成功僅代表消息已經(jīng)到了 Broker 端,Broker 在不同配置下,可能會返回不同響應狀態(tài):

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

引用官方狀態(tài)說明:

image-20200319220927210

另外 RocketMQ 還提供異步的發(fā)送的方式,適合于鏈路耗時較長,對響應時間較為敏感的業(yè)務場景。

DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// 設置 nameSpace 地址
mqProducer.setNamesrvAddr("127.0.0.1:9876");
mqProducer.setRetryTimesWhenSendFailed(5);
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
try {
    // 異步發(fā)送消息到,主線程不會被阻塞,立刻會返回
    mqProducer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息發(fā)送成功,
        }
        @Override
        public void onException(Throwable e) {
            // 消息發(fā)送失敗,可以持久化這條數(shù)據(jù),后續(xù)進行補償處理
        }
    });
} catch (RemotingException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

異步發(fā)送消息一定要注意重寫回調(diào)方法,在回調(diào)方法中檢查發(fā)送結(jié)果。

不管是同步還是異步的方式,都會碰到網(wǎng)絡問題導致發(fā)送失敗的情況。針對這種情況,我們可以設置合理的重試次數(shù),當出現(xiàn)網(wǎng)絡問題,可以自動重試。設置方式如下:

// 同步發(fā)送消息重試次數(shù),默認為 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 異步發(fā)送消息重試次數(shù),默認為 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);

總結(jié)

producer消息發(fā)送方式雖然有3種,但為了減小丟失消息的可能性盡量采用同步的發(fā)送方式,producer同步等待broker響應消息的發(fā)送結(jié)果,利用同步發(fā)送+重試機制+多個master節(jié)點,盡可能減小消息丟失的可能性。 

3. Broker 存儲階段

默認情況下,消息只要到了 Broker 端,將會優(yōu)先保存到內(nèi)存中,然后立刻返回確認響應給生產(chǎn)者。隨后 Broker 定期批量的將一組消息從內(nèi)存異步刷入磁盤。

這種方式減少 I/O 次數(shù),可以取得更好的性能,但是如果發(fā)生機器掉電,異常宕機等情況,消息還未及時刷入磁盤,就會出現(xiàn)丟失消息的情況。

若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機制修改為同步刷盤方式,即消息存儲磁盤成功,才會返回響應。

修改 Broker 端配置如下:

## 默認情況為 ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH

若 Broker 未在同步刷盤時間內(nèi)(默認為 5s)完成刷盤,將會返回 SendStatus.FLUSH_DISK_TIMEOUT 狀態(tài)給生產(chǎn)者。

集群部署

為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丟失,消息還需要復制到 slave 節(jié)點。

默認方式下,消息寫入 master 成功,就可以返回確認響應給生產(chǎn)者,接著消息將會異步復制到 slave 節(jié)點。

注:master 配置:flushDiskType = SYNC_FLUSH

此時若 master 突然宕機且不可恢復,那么還未復制到 slave 的消息將會丟失。

為了進一步提高消息的可靠性,我們可以采用同步的復制方式,master 節(jié)點將會同步等待 slave 節(jié)點復制完成,才會返回確認響應。

異步復制與同步復制區(qū)別:

  • Sync Broker:生產(chǎn)者發(fā)送的每一條消息都至少同步復制到一個slave后才返回告訴生產(chǎn)者成功,即“同步雙寫”。
  • Async Broker:生產(chǎn)者發(fā)送的每一條消息只要寫入master就返回告訴生產(chǎn)者成功。然后再“異步復制”到slave。

Broker master 節(jié)點 同步復制配置如下:

## 默認為 ASYNC_MASTER 
brokerRole=SYNC_MASTER

如果 slave 節(jié)點未在指定時間內(nèi)同步返回響應,生產(chǎn)者將會收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回狀態(tài)。

總結(jié)

在broker端,消息丟失的可能性主要在于刷盤策略和同步機制。 RocketMQ默認broker的刷盤策略為異步刷盤,如果有主從,同步策略也默認的是異步同步,這樣子可以提高broker處理消息的效率,但是會有丟失的可能性。因此可以通過同步刷盤策略+同步slave策略+主從的方式解決丟失消息的可能。

結(jié)合生產(chǎn)階段與存儲階段,若需要嚴格保證消息不丟失,broker 需要采用如下配置:

## master 節(jié)點配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
 
## slave 節(jié)點配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

同時這個過程我們還需要生產(chǎn)者配合,判斷返回狀態(tài)是否是 SendStatus.SEND_OK。若是其他狀態(tài),就需要考慮補償重試。

雖然上述配置提高消息的高可靠性,但是會降低性能,生產(chǎn)實踐中需要綜合選擇。

4. 消費階段

從producer投遞消息到broker,即使前面這些過程保證了消息正常持久化,但如果consumer消費消息沒有消費到也不能理解為消息絕對的可靠。因此RockerMQ默認提供了At least Once機制保證消息可靠消費。

何為At least Once?

Consumer先pull 消息到本地,消費完成后,才向服務器返回ack。

通常消費消息的ack機制一般分為兩種思路:

1、先提交后消費;

2、先消費,消費成功后再提交;

思路一可以解決重復消費的問題但是會丟失消息,因此Rocket默認實現(xiàn)的是思路二,由各自consumer業(yè)務方保證冪等來解決重復消費問題。

消費者從 broker 拉取消息,然后執(zhí)行相應的業(yè)務邏輯。一旦執(zhí)行成功,將會返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 狀態(tài)給 Broker。

如果 Broker 未收到消費確認響應或收到其他狀態(tài),消費者下次還會再次拉取到該條消息,進行重試。這樣的方式有效避免了消費者消費過程發(fā)生異常,或者消息在網(wǎng)絡傳輸中丟失的情況。

消息消費的代碼如下:

// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
 
// 設置NameServer的地址
consumer.setNamesrvAddr("namesrvAddr");
 
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("test_topic", "*");
// 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 執(zhí)行業(yè)務邏輯
        // 標記該消息已經(jīng)被成功消費
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 啟動消費者實例
consumer.start();

以上消費消息過程的,我們需要注意返回消息狀態(tài)。只有當業(yè)務邏輯真正執(zhí)行成功,我們才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否則我們需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重試。

5. 總結(jié)

最后我們還可以說出我們的思考,雖然提高消息可靠性,但是可能導致消息重發(fā),重復消費。所以對于消費客戶端,需要注意保證冪等性。

到此這篇關(guān)于Java的RocketMq隊列之消息可靠性詳解的文章就介紹到這了,更多相關(guān)RocketMq消息可靠性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java異常處理try?catch的基本使用

    Java異常處理try?catch的基本使用

    大家好,本篇文章主要講的是Java異常處理try?catch的基本使用,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-02-02
  • 深入理解 Java注解及實例

    深入理解 Java注解及實例

    這篇文章主要介紹了深入理解 Java注解及實例的相關(guān)資料,希望通過本文大家能夠掌握java注解的知識,需要的朋友可以參考下
    2017-09-09
  • java 將方法作為傳參--多態(tài)的實例

    java 將方法作為傳參--多態(tài)的實例

    下面小編就為大家?guī)硪黄猨ava 將方法作為傳參--多態(tài)的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • Java 堆排序?qū)嵗?大頂堆、小頂堆)

    Java 堆排序?qū)嵗?大頂堆、小頂堆)

    下面小編就為大家分享一篇Java 堆排序?qū)嵗?大頂堆、小頂堆),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2017-12-12
  • 實例解析如何正確使用Java數(shù)組

    實例解析如何正確使用Java數(shù)組

    同一種類型數(shù)據(jù)的集合。其實數(shù)組就是一個容器。運算的時候有很多數(shù)據(jù)參與運算,那么首先需要做的是什么下面我們就一起來看看。
    2016-07-07
  • 詳解Springboot分布式限流實踐

    詳解Springboot分布式限流實踐

    這篇文章主要介紹了詳解Springboot分布式限流實踐 ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-06-06
  • Mybatis-plus自動填充不生效或自動填充數(shù)據(jù)為null原因及解決方案

    Mybatis-plus自動填充不生效或自動填充數(shù)據(jù)為null原因及解決方案

    本文主要介紹了Mybatis-plus自動填充不生效或自動填充數(shù)據(jù)為null原因及解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-05-05
  • 詳解Java如何進行Base64的編碼(Encode)與解碼(Decode)

    詳解Java如何進行Base64的編碼(Encode)與解碼(Decode)

    這篇文章主要介紹了詳解Java如何進行Base64的編碼(Encode)與解碼(Decode),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-03-03
  • SpringCloud使用Zookeeper作為配置中心的示例

    SpringCloud使用Zookeeper作為配置中心的示例

    這篇文章主要介紹了SpringCloud使用Zookeeper作為配置中心的示例,幫助大家更好的理解和學習使用SpringCloud,感興趣的朋友可以了解下
    2021-04-04
  • Java實現(xiàn)線性表的順序存儲

    Java實現(xiàn)線性表的順序存儲

    這篇文章主要為大家詳細介紹了Java實現(xiàn)線性表的順序存儲,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-10-10

最新評論