欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何保證RabbitMQ全鏈路數(shù)據(jù)100%不丟失問題

 更新時間:2023年05月05日 09:27:59   作者:指尖涼  
這篇文章主要介紹了如何保證RabbitMQ全鏈路數(shù)據(jù)100%不丟失問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

保證RabbitMQ全鏈路數(shù)據(jù)100%不丟失

正在學(xué)RabbitMQ,特此記錄一下,這里就不講RabbitMQ基礎(chǔ)了,直接進(jìn)入主題。

我們都知道,消息從生產(chǎn)端到消費端消費要經(jīng)過3個步驟:

  • 生產(chǎn)端發(fā)送消息到RabbitMQ;
  • RabbitMQ發(fā)送消息到消費端;
  • 消費端消費這條消息;

這3個步驟中的每一步都有可能導(dǎo)致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統(tǒng)的可靠性。

這里的可靠并不是一定就100%不丟失了,磁盤損壞,機(jī)房爆炸等等都能導(dǎo)致數(shù)據(jù)丟失,當(dāng)然這種都是極小概率發(fā)生,能做到99.999999%消息不丟失,就是可靠的了。

下面來具體分析一下問題以及解決方案。

生產(chǎn)端可靠性投遞

生產(chǎn)端可靠性投遞,即生產(chǎn)端要確保將消息正確投遞到RabbitMQ中。生產(chǎn)端投遞的消息丟失的原因有很多,比如消息在網(wǎng)絡(luò)傳輸?shù)倪^程中發(fā)生網(wǎng)絡(luò)故障消息丟失,或者消息投遞到RabbitMQ時RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發(fā)生了什么。針對以上情況,RabbitMQ本身提供了一些機(jī)制。

事務(wù)消息機(jī)制

事務(wù)消息機(jī)制由于會嚴(yán)重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級的解決方案——confirm消息確認(rèn)機(jī)制。

confirm消息確認(rèn)機(jī)制

什么是confirm消息確認(rèn)機(jī)制?顧名思義,就是生產(chǎn)端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會發(fā)送一個確認(rèn)消息給生產(chǎn)端,讓生產(chǎn)端知道我已經(jīng)收到消息了,否則這條消息就可能已經(jīng)丟失了,需要生產(chǎn)端重新發(fā)送消息了。

通過下面這句代碼來開啟確認(rèn)模式:

channel.confirmSelect();// 開啟發(fā)送方確認(rèn)模式

然后異步監(jiān)聽確認(rèn)和未確認(rèn)的消息:

channel.addConfirmListener(new ConfirmListener() {
    //消息正確到達(dá)broker
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("已收到消息");
        //做一些其他處理
    }
    //RabbitMQ因為自身內(nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack消息
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未確認(rèn)消息,標(biāo)識:" + deliveryTag);
        //做一些其他處理,比如消息重發(fā)等
    }
});

這樣就可以讓生產(chǎn)端感知到消息是否投遞到RabbitMQ中了,當(dāng)然這樣還不夠,稍后我會說一下極端情況。

消息持久化

消息持久化呢?我們知道,RabbitMQ收到消息后將這個消息暫時存在了內(nèi)存中,那這就會有個問題,如果RabbitMQ掛了,那重啟后數(shù)據(jù)就丟失了,所以相關(guān)的數(shù)據(jù)應(yīng)該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數(shù)據(jù)恢復(fù)。那如何持久化呢?

message消息到達(dá)RabbitMQ后先是到exchange交換機(jī)中,然后路由給queue隊列,最后發(fā)送給消費端。

所有需要給exchange、queue和message都進(jìn)行持久化:

exchange持久化:

//第三個參數(shù)true表示這個exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

//第二個參數(shù)true表示這個queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

//第三個參數(shù)MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

這樣,如果RabbitMQ收到消息后掛了,重啟后會自行恢復(fù)消息。

到此,RabbitMQ提供的幾種機(jī)制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會有極端情況,比如RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發(fā)送確認(rèn)消息給生產(chǎn)端的過程中,由于網(wǎng)絡(luò)故障而導(dǎo)致生產(chǎn)端沒有收到確認(rèn)消息,這樣生產(chǎn)端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來的處理。

所以除了RabbitMQ提供的一些機(jī)制外,我們自己也要做一些消息補(bǔ)償機(jī)制,以應(yīng)對一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。

消息入庫

消息入庫,顧名思義就是將要發(fā)送的消息保存到數(shù)據(jù)庫中。

首先發(fā)送消息前先將消息保存到數(shù)據(jù)庫中,有一個狀態(tài)字段status=0,表示生產(chǎn)端將消息發(fā)送給了RabbitMQ但還沒收到確認(rèn);在生產(chǎn)端收到確認(rèn)后將status設(shè)為1,表示RabbitMQ已收到消息。

這里有可能會出現(xiàn)上面說的兩種情況,所以生產(chǎn)端這邊開一個定時器,定時檢索消息表,將status=0并且超過固定時間后(可能消息剛發(fā)出去還沒來得及確認(rèn)這邊定時器剛好檢索到這條status=0的消息,所以給個時間)還沒收到確認(rèn)的消息取出重發(fā)(第二種情況下這里會造成消息重復(fù),消費者端要做冪等性),可能重發(fā)還會失敗,所以可以做一個最大重發(fā)次數(shù),超過就做另外的處理。

這樣消息就可以可靠性投遞到RabbitMQ中了,而生產(chǎn)端也可以感知到了。

消費端消息不丟失

既然已經(jīng)可以讓生產(chǎn)端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費端的了,如何讓消費端不丟失消息。

默認(rèn)情況下,以下3種情況會導(dǎo)致消息丟失:

  • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,發(fā)生網(wǎng)絡(luò)故障,消費端與RabbitMQ斷開連接,此時消息會丟失;
  • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,消費端掛了,此時消息會丟失;
  • 消費端正確接收到消息,但在處理消息的過程中發(fā)生異?;蝈礄C(jī)了,消息也會丟失。

其實,上述3中情況導(dǎo)致消息丟失歸根結(jié)底是因為RabbitMQ的自動ack機(jī)制,即默認(rèn)RabbitMQ在消息發(fā)出后就立即將這條消息刪除,而不管消費端是否接收到,是否處理完,導(dǎo)致消費端消息丟失時RabbitMQ自己又沒有這條消息了。

所以就需要將自動ack機(jī)制改為手動ack機(jī)制。

消費端手動確認(rèn)消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        //接收到消息,做處理
        //手動確認(rèn)
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        //出錯處理,這里可以讓消息重回隊列重新發(fā)送或直接丟棄消息
    }
};
//第二個參數(shù)autoAck設(shè)為false表示關(guān)閉自動確認(rèn)機(jī)制,需手動確認(rèn)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

這樣,當(dāng)autoAck參數(shù)置為false,對于RabbitMQ服務(wù)端而言,隊列中的消息分成了兩個部分:

  • 一部分是等待投遞給消費端的消息;
  • 一部分是已經(jīng)投遞給消費端,但是還沒有收到消費端確認(rèn)信號的消息。

如果RabbitMQ一直沒有收到消費端的確認(rèn)信號,并且消費此消息的消費端已經(jīng)斷開連接或宕機(jī)(RabbitMQ會自己感知到),則RabbitMQ會安排該消息重新進(jìn)入隊列(放在隊列頭部),等待投遞給下一個消費者,當(dāng)然也有能還是原來的那個消費端,當(dāng)然消費端也需要確保冪等性。

好了,到此從生產(chǎn)端到RabbitMQ再到消費端的全鏈路,就可以保證數(shù)據(jù)的不丟失。

由于個人水平有限,有些地方可能理解錯了或理解不到位的,請大家多多指出!Thanks

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • mybatis createcriteria和or的區(qū)別說明

    mybatis createcriteria和or的區(qū)別說明

    這篇文章主要介紹了mybatis createcriteria和or的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 簡單實現(xiàn)java音樂播放器

    簡單實現(xiàn)java音樂播放器

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)音樂播放器的相關(guān)代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • 基于java中泛型的總結(jié)分析

    基于java中泛型的總結(jié)分析

    本篇文章介紹了,在java中泛型的總結(jié)分析。需要的朋友參考下
    2013-05-05
  • SpringBoot熱重啟配置詳解

    SpringBoot熱重啟配置詳解

    在本篇文章里小編給大家分享的是關(guān)于SpringBoot熱重啟配置知識點內(nèi)容,需要的朋友們可以學(xué)習(xí)下。
    2020-02-02
  • Mybatis-plus插入后返回元素id的問題

    Mybatis-plus插入后返回元素id的問題

    這篇文章主要介紹了Mybatis-plus插入后返回元素id的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • SpringBoot集成JWT實現(xiàn)token驗證的流程

    SpringBoot集成JWT實現(xiàn)token驗證的流程

    Json web token (JWT), 是為了在網(wǎng)絡(luò)應(yīng)用環(huán)境間傳遞聲明而執(zhí)行的一種基于JSON的開放標(biāo)準(zhǔn)((RFC 7519).這篇文章主要介紹了SpringBoot集成JWT實現(xiàn)token驗證,需要的朋友可以參考下
    2020-01-01
  • 詳解Spring3.x 升級至 Spring4.x的方法

    詳解Spring3.x 升級至 Spring4.x的方法

    本篇文章主要介紹了詳解Spring3.x 升級至 Spring4.x的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • idea安裝jerbel及文件上傳下載的實現(xiàn)示例

    idea安裝jerbel及文件上傳下載的實現(xiàn)示例

    JRebel是一個Java開發(fā)工具,它是一款用于實時代碼重載的插件,本文主要介紹了idea安裝jerbel及文件上傳下載的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解下
    2023-09-09
  • Java Map集合用法詳解

    Java Map集合用法詳解

    Map用于保存具有映射關(guān)系的數(shù)據(jù),Map集合里保存著兩組值,一組用于保存Map的ley,另一組保存著Map的value;Map集合和查字典類似,通過key找到對應(yīng)的value,通過頁數(shù)找到對應(yīng)的信息。用學(xué)生類來說,key相當(dāng)于學(xué)號,value對應(yīng)name,age,sex等信息。用這種對應(yīng)關(guān)系方便查找
    2021-10-10
  • 深入淺出MappedByteBuffer(推薦)

    深入淺出MappedByteBuffer(推薦)

    MappedByteBuffer使用虛擬內(nèi)存,因此分配(map)的內(nèi)存大小不受JVM的-Xmx參數(shù)限制,但是也是有大小限制的,這篇文章主要介紹了MappedByteBuffer的基本知識,需要的朋友可以參考下
    2022-12-12

最新評論