RabbitMQ消息丟失解決方案
一.RabbitMQ消息丟失的三種情況
第一種:生產(chǎn)者弄丟了數(shù)據(jù)。生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候,可能數(shù)據(jù)就在半路給搞丟了,因為網(wǎng)絡(luò)問題啥的,都有可能。
第二種:RabbitMQ 弄丟了數(shù)據(jù)。MQ還沒有持久化自己掛了。
第三種:消費端弄丟了數(shù)據(jù)。剛消費到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了。
二.RabbitMQ消息丟失解決方案
1.針對生產(chǎn)者
方案1 :開啟RabbitMQ事務(wù)
可以選擇用 RabbitMQ 提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)channel.txSelect,然后發(fā)送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產(chǎn)者會收到異常報錯,此時就可以回滾事務(wù)channel.txRollback,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)channel.txCommit。
// 開啟事務(wù) channel.txSelect(); try { // 這里發(fā)送消息 } catch (Exception e) { channel.txRollback(); // 這里再次重發(fā)這條消息 } // 提交事務(wù) channel.txCommit();
缺點:
RabbitMQ 事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒,采用這種方式基本上吞吐量會下來,因為太耗性能。
方案2:使用confirm機制
事務(wù)機制和 confirm 機制最大的不同在于,事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒,但是 confirm 機制是異步的
在生產(chǎn)者開啟了confirm模式之后,每次寫的消息都會分配一個唯一的id,然后如果寫入了rabbitmq之中,rabbitmq會給你回傳一個ack消息,告訴你這個消息發(fā)送OK了;如果rabbitmq沒能處理這個消息,會回調(diào)你一個nack接口,告訴你這個消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個機制知道自己在內(nèi)存里維護(hù)每個消息的id,如果超過一定時間還沒接收到這個消息的回調(diào),那么你可以進(jìn)行重發(fā)。
//開啟confirm channel.confirm(); //發(fā)送成功回調(diào) public void ack(String messageId){ } // 發(fā)送失敗回調(diào) public void nack(String messageId){ //重發(fā)該消息 }
2.針對RabbitMQ
主要需要應(yīng)對三點:
要保證rabbitMQ不丟失消息,那么就需要開啟rabbitMQ的持久化機制,即把消息持久化到硬盤上,這樣即使rabbitMQ掛掉在重啟后仍然可以從硬盤讀取消息;
如果rabbitMQ單點故障怎么辦,這種情況倒不會造成消息丟失,這里就要提到rabbitMQ的3種安裝模式,單機模式、普通集群模式、鏡像集群模式,這里要保證rabbitMQ的高可用就要配合HAPROXY做鏡像集群模式;
如果硬盤壞掉怎么保證消息不丟失。
(1)消息持久化
RabbitMQ 的消息默認(rèn)存放在內(nèi)存上面,如果不特別聲明設(shè)置,消息不會持久化保存到硬盤上面的,如果節(jié)點重啟或者意外crash掉,消息就會丟失。
所以就要對消息進(jìn)行持久化處理。如何持久化,下面具體說明下。要想做到消息持久化,必須滿足以下三個條件,缺一不可。
Exchange 設(shè)置持久化
Queue 設(shè)置持久化
Message持久化發(fā)送:發(fā)送消息設(shè)置發(fā)送模式deliveryMode=2,代表持久化消息
(2)設(shè)置集群鏡像模式
先來介紹下RabbitMQ三種部署模式:
單節(jié)點模式:最簡單的情況,非集群模式,節(jié)點掛了,消息就不能用了。業(yè)務(wù)可能癱瘓,只能等待。
普通模式:消息只會存在與當(dāng)前節(jié)點中,并不會同步到其他節(jié)點,當(dāng)前節(jié)點宕機,有影響的業(yè)務(wù)會癱瘓,只能等待節(jié)點恢復(fù)重啟可用(必須持久化消息情況下)。
鏡像模式:消息會同步到其他節(jié)點上,可以設(shè)置同步的節(jié)點個數(shù),但吞吐量會下降。屬于RabbitMQ的HA方案
為什么設(shè)置鏡像模式集群,因為隊列的內(nèi)容僅僅存在某一個節(jié)點上面,不會存在所有節(jié)點上面,所有節(jié)點僅僅存放消息結(jié)構(gòu)和元數(shù)據(jù)。下面畫了一張圖介紹普通集群丟失消息情況:
如果想解決上面途中問題,保證消息不丟失,需要采用HA 鏡像模式隊列。
下面介紹下三種HA策略模式:
同步至所有的
同步最多N個機器
只同步至符合指定名稱的nodes
命令處理HA策略模版:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
1)為每個以rock.wechat
開頭的隊列設(shè)置所有節(jié)點的鏡像,并且設(shè)置為自動同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
2)為每個以rock.wechat.
開頭的隊列設(shè)置兩個節(jié)點的鏡像,并且設(shè)置為自動同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \ '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
3)為每個以node.
開頭的隊列分配指定的節(jié)點做鏡像
rabbitmqctl set_policy ha-nodes "^nodes\." \ '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
但是:HA 鏡像隊列有一個很大的缺點就是系統(tǒng)的吞吐量會有所下降。
(3)消息補償機制
為什么還要消息補償機制呢?難道消息還會丟失,沒錯,系統(tǒng)是在一個復(fù)雜的環(huán)境,不要想的太簡單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問題。
但是作為有追求的程序員來講,要絕對保證我的系統(tǒng)的穩(wěn)定性,有一種危機意識。
比如:持久化的消息,保存到硬盤過程中,當(dāng)前隊列節(jié)點掛了,存儲節(jié)點硬盤又壞了,消息丟了,怎么辦?
1)生產(chǎn)端首先將業(yè)務(wù)數(shù)據(jù)以及消息數(shù)據(jù)入庫,需要在同一個事務(wù)中,消息數(shù)據(jù)入庫失敗,則整體回滾。
2)根據(jù)消息表中消息狀態(tài),失敗則進(jìn)行消息補償措施,重新發(fā)送消息處理。
3.針對消費者
方案一:ACK確認(rèn)機制
多個消費者同時收取消息,比如消息接收到一半的時候,一個消費者死掉了(邏輯復(fù)雜時間太長,超時了或者消費被停機或者網(wǎng)絡(luò)斷開鏈接),如何保證消息不丟?
使用rabbitmq提供的ack機制,服務(wù)端首先關(guān)閉rabbitmq的自動ack,然后每次在確保處理完這個消息之后,在代碼里手動調(diào)用ack。這樣就可以避免消息還沒有處理完就ack。才把消息從內(nèi)存刪除。
這樣就解決了,即使一個消費者出了問題,但不會同步消息給服務(wù)端,會有其他的消費端去消費,保證了消息不丟的case。
總結(jié)
如果需要保證消息在整條鏈路中不丟失,那就需要生產(chǎn)端、mq自身與消費端共同去保障。
生產(chǎn)端:對生產(chǎn)的消息進(jìn)行狀態(tài)標(biāo)記,開啟confirm機制,依據(jù)mq的響應(yīng)來更新消息狀態(tài),使用定時任務(wù)重新投遞超時的消息,多次投遞失敗進(jìn)行報警。
mq自身:開啟持久化,并在落盤后再進(jìn)行ack。如果是鏡像部署模式,需要在同步到多個副本之后再進(jìn)行ack。
消費端:開啟手動ack模式,在業(yè)務(wù)處理完成后再進(jìn)行ack,并且需要保證冪等。
通過以上的處理,理論上不存在消息丟失的情況,但是系統(tǒng)的吞吐量以及性能有所下降。在實際開發(fā)中,需要考慮消息丟失的影響程度,來做出對可靠性以及性能之間的權(quán)衡。
以上就是RabbitMQ消息丟失解決方案的詳細(xì)內(nèi)容,更多關(guān)于RabbitMQ 消息丟失的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解mybatis-plus使用@EnumValue注解的方式對枚舉類型的處理
這篇文章主要介紹了詳解mybatis-plus使用@EnumValue注解的方式對枚舉類型的處理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12IDEA JetBrains Mono字體介紹和安裝教程(詳解)
這篇文章主要介紹了IDEA JetBrains Mono字體介紹和安裝教程,本給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-03-03Java內(nèi)存之happens-before和重排序
在JMM(Java內(nèi)存模型)中,如果一個操作執(zhí)行的結(jié)果需要對另一個操作可見,那么這兩個操作之間必須存在happens-before關(guān)系。下面小編來簡單介紹一下2019-05-05POST方法給@RequestBody傳參數(shù)失敗的解決及原因分析
這篇文章主要介紹了POST方法給@RequestBody傳參數(shù)失敗的解決及原因分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10MyBatis SELECT基本查詢實現(xiàn)方法詳解
這篇文章主要介紹了MyBatis SELECT基本查詢實現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08java 如何遠(yuǎn)程控制tomcat啟動關(guān)機
這篇文章主要介紹了java 遠(yuǎn)程控制tomcat啟動關(guān)機的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04Java Object類詳解_動力節(jié)點Java學(xué)院整理
Java作為一個龐大的知識體系,涉及到的知識點繁多,本文將從Java中最基本的類java.lang.Object開始談起,對java object類相關(guān)知識感興趣的朋友一起學(xué)習(xí)吧2017-04-04SpringBoot Controller接收參數(shù)的幾種常用方式
這篇文章主要介紹了SpringBoot Controller接收參數(shù)的幾種常用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07