RocketMQ消息丟失的場景以及解決方案
1. 消息發(fā)送環(huán)節(jié)
案例:網(wǎng)絡(luò)故障導(dǎo)致的消息發(fā)送失敗 在Spring Boot應(yīng)用中,生產(chǎn)者可能會(huì)遇到網(wǎng)絡(luò)瞬斷,導(dǎo)致消息未成功發(fā)送到Broker。
代碼示例:
@Service public class ProducerService { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(String topic, String message) { try { rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build()); } catch (MQClientException e) { // 異常處理邏輯,例如記錄日志、報(bào)警等 } } }
解決方案: 通過配置生產(chǎn)者的重試次數(shù),我們可以強(qiáng)化消息的發(fā)送可靠性。
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ProducerService { @Autowired private RocketMQTemplate rocketMQTemplate; public void configureProducerRetry() { // 獲取RocketMQ的生產(chǎn)者客戶端 DefaultMQProducer producer = rocketMQTemplate.getProducer(); // 設(shè)置發(fā)送失敗時(shí)的重試次數(shù)為5 producer.setRetryTimesWhenSendFailed(5); // 設(shè)置發(fā)送失敗時(shí)重試另一個(gè)Broker producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 其他生產(chǎn)者配置... } public void sendMessage(String topic, String message) { // 在發(fā)送前確保調(diào)用了配置重試的方法 configureProducerRetry(); // 發(fā)送消息 rocketMQTemplate.syncSend(topic, message); } }
2. Broker存儲(chǔ)環(huán)節(jié)
案例:Broker宕機(jī)導(dǎo)致的消息存儲(chǔ)失敗 Broker在接收消息后,在持久化之前發(fā)生故障,這會(huì)導(dǎo)致消息丟失。
解決方案:
- 同步刷盤配置
在RocketMQ中,同步刷盤是指Broker在返回消息發(fā)送成功之前,將消息持久化到磁盤。這樣做可以確保即使Broker發(fā)生故障,消息也不會(huì)丟失。
要啟用同步刷盤,需要修改Broker的配置文件,通常是broker.properties
文件,設(shè)置如下屬性:
flushDiskType=SYNC_FLUSH
當(dāng)flushDiskType
設(shè)置為SYNC_FLUSH
時(shí),每次消息接收后,Broker都會(huì)同步地將消息寫入磁盤中。這確保了消息的持久性,但可能會(huì)對(duì)性能產(chǎn)生影響,因?yàn)槊看蜗懭攵夹枰疟PIO操作。
- 副本機(jī)制
RocketMQ使用主從架構(gòu)來提供數(shù)據(jù)的高可用性。主Broker負(fù)責(zé)處理消息的讀寫請(qǐng)求,而從Broker則復(fù)制主Broker的數(shù)據(jù)。如果主Broker不可用,從Broker可以接管工作,保證消息不會(huì)丟失。
要配置副本機(jī)制,可以在部署RocketMQ集群時(shí),為每個(gè)Master Broker設(shè)置一個(gè)或多個(gè)Slave Broker。在Broker的配置文件中,設(shè)置如下屬性:
brokerRole=SYNC_MASTER # 對(duì)于主Broker brokerRole=SLAVE # 對(duì)于從Broker
此外,還需要在名稱服務(wù)器(NameServer)配置中指定所有Broker的地址,以便生產(chǎn)者和消費(fèi)者能夠發(fā)現(xiàn)它們。
注意:副本數(shù)量的增加需要在RocketMQ集群部署時(shí)進(jìn)行規(guī)劃,需要考慮到資源消耗和數(shù)據(jù)一致性的要求。從Broker不會(huì)對(duì)外提供服務(wù),它的角色主要是數(shù)據(jù)的同步和在主Broker不可用時(shí)的故障轉(zhuǎn)移。
對(duì)于生產(chǎn)環(huán)境,建議進(jìn)行充分的測試,以平衡性能和可靠性的需求。正確配置同步刷盤和副本機(jī)制,可以極大地增強(qiáng)RocketMQ的消息可靠性。同時(shí),這些配置通常需要和其他系統(tǒng)資源(如磁盤性能、網(wǎng)絡(luò)帶寬等)一起考慮,以確保整體的系統(tǒng)穩(wěn)定性和性能。
3. 消息消費(fèi)環(huán)節(jié)
案例:消費(fèi)者異常導(dǎo)致的消息消費(fèi)失敗 消費(fèi)者在處理消息時(shí)發(fā)生異常,比如數(shù)據(jù)庫操作失敗,導(dǎo)致消息消費(fèi)不成功。
代碼示例:
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group") public class OrderConsumer implements RocketMQListener<Order> { @Override public void onMessage(Order order) { try { // 處理訂單邏輯 } catch (Exception e) { // 異常處理邏輯,如重試或記錄失敗的消息 } } }
解決方案: 可以在消費(fèi)者中實(shí)現(xiàn)邏輯確保消息在消費(fèi)成功后,再發(fā)送確認(rèn)。
在RocketMQ中,手動(dòng)消息確認(rèn)機(jī)制是指消費(fèi)者在成功處理完消息后,需要顯式地發(fā)送一個(gè)確認(rèn)(acknowledgment)回Broker,告訴它消息已經(jīng)被成功消費(fèi)。這樣做的目的是為了確保消息不會(huì)因?yàn)橄M(fèi)者的故障而丟失,同時(shí)防止消息被重復(fù)處理。
手動(dòng)確認(rèn)模式通常用于確保消息傳遞的可靠性,特別是在需要保證消息被精確一次處理的場景下。
以下是使用手動(dòng)確認(rèn)方式的完整方案:
配置消費(fèi)者使用手動(dòng)確認(rèn)模式: 在
@RocketMQMessageListener
注解中設(shè)置messageModel
參數(shù)為MessageModel.CLUSTERING
和consumeMode
為ConsumeMode.ORDERLY
或ConsumeMode.CONCURRENTLY
,取決于是否需要保證消息順序。處理消息: 實(shí)現(xiàn)
RocketMQPushConsumerLifecycleListener
接口,并在prepareStart
方法中注冊(cè)MessageListener
。在MessageListener
的實(shí)現(xiàn)中,處理消息并返回相應(yīng)的消費(fèi)狀態(tài)。確認(rèn)消息: 如果消息成功處理,返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
或ConsumeOrderlyStatus.SUCCESS
狀態(tài),這將會(huì)告訴Broker消息已經(jīng)被消費(fèi),可以從隊(duì)列中移除。如果處理消息時(shí)發(fā)生異?;蛐枰院笾匦孪M(fèi),返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
或ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
,Broker將會(huì)稍后重新發(fā)送該消息。異常處理: 如果在消費(fèi)過程中發(fā)生異常,可以捕獲異常并記錄必要的日志,然后選擇是立即重試還是延遲重試。
以下是一個(gè)示例代碼:
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group",consumeMode = ConsumeMode.ORDERLY) public class OrderConsumer implements RocketMQPushConsumerLifecycleListener { @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // 設(shè)置消費(fèi)者其他屬性... // 設(shè)置消息監(jiān)聽器 consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { MessageExt msg = msgs.get(0); // 假設(shè)一次只消費(fèi)一條消息 try { // 處理消息 // ... // 如果消息處理成功,確認(rèn)消息 return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { // 處理異常 // ... // 如果需要稍后重新消費(fèi)消息 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } }); } }
在這個(gè)例子中,OrderConsumer
類實(shí)現(xiàn)了RocketMQPushConsumerLifecycleListener
接口,并在prepareStart
方法中注冊(cè)了一個(gè)MessageListenerOrderly
,以確保消息以有序的方式被消費(fèi)。根據(jù)消息處理結(jié)果,它返回相應(yīng)的狀態(tài)碼,從而實(shí)現(xiàn)手動(dòng)確認(rèn)。
注意:在RocketMQ中,除了手動(dòng)確認(rèn)外,還有自動(dòng)確認(rèn)機(jī)制。在自動(dòng)確認(rèn)模式下,當(dāng)消費(fèi)者從Broker拉取到消息并由客戶端代理(即SDK)接收后,如果沒有發(fā)生異常,消息會(huì)自動(dòng)被確認(rèn)消費(fèi)成功。這種模式適用于那些對(duì)消息處理的可靠性要求不是非常高的場景。
在自動(dòng)確認(rèn)機(jī)制中,消費(fèi)者無需編寫額外的確認(rèn)邏輯。如果消費(fèi)者在處理消息時(shí)沒有拋出異常,SDK會(huì)自動(dòng)向Broker發(fā)送ACK(確認(rèn))。如果處理過程中拋出了異常,消息會(huì)根據(jù)設(shè)定的重試策略再次發(fā)送給消費(fèi)者。
默認(rèn)情況下,RocketMQ的消費(fèi)者采用的是自動(dòng)確認(rèn)機(jī)制。這意味著一旦消費(fèi)者監(jiān)聽器方法執(zhí)行完畢,無論其結(jié)果如何,消息都會(huì)被標(biāo)記為已消費(fèi)。如果在消費(fèi)過程中出現(xiàn)了異常,RocketMQ客戶端會(huì)根據(jù)配置的重試策略來重新投遞消息。 這種自動(dòng)確認(rèn)機(jī)制簡化了代碼,但是如果在消息處理過程中需要更細(xì)粒度的控制,或者需要確保消息即使在消費(fèi)過程中出現(xiàn)異常也不會(huì)丟失,那么應(yīng)該使用手動(dòng)確認(rèn)機(jī)制。
4. 高可用性問題
案例:主從同步延遲導(dǎo)致的消息丟失 在Broker主從同步配置不當(dāng)?shù)那闆r下,主Broker故障可能導(dǎo)致消息丟失。
解決方案:
為了確保在RocketMQ中消息的可靠性,特別是在出現(xiàn)故障時(shí)防止數(shù)據(jù)丟失,可以采取以下的解決方案:
- 1. 同步主從配置(SYNC_MASTER)
在RocketMQ中,SYNC_MASTER
是一個(gè)高可用性設(shè)置,它要求每條消息在確認(rèn)給生產(chǎn)者之前,不僅在主Broker上寫入磁盤,還要同步到所有的從Broker。這確保了即使主Broker出現(xiàn)故障,消息也不會(huì)丟失,因?yàn)樗呀?jīng)存在于從Broker中。
為了配置SYNC_MASTER
,需要在主Broker的配置文件(通常是broker-a.properties
)中設(shè)置:
brokerRole=SYNC_MASTER
并且在從Broker的配置文件(例如broker-b.properties
或broker-b-s.properties
)中設(shè)置:
brokerRole=SLAVE
這樣配置后,主Broker會(huì)等待消息同步到從Broker后才向生產(chǎn)者確認(rèn)消息發(fā)送成功。
注意
這種配置方式存在如下問題,主要包括:
性能開銷: 同步復(fù)制意味著每條消息都需要在被確認(rèn)前寫入主Broker并復(fù)制到從Broker。這個(gè)過程涉及額外的網(wǎng)絡(luò)IO和磁盤IO,會(huì)增加消息的延遲時(shí)間,尤其是在高負(fù)載情況下。
資源消耗: 由于每條消息都需要在多個(gè)Broker上存儲(chǔ),因此會(huì)增加存儲(chǔ)資源的使用。此外,同步復(fù)制還會(huì)增加網(wǎng)絡(luò)帶寬的占用。
擴(kuò)展性問題: 當(dāng)集群規(guī)模增大時(shí),同步復(fù)制可能會(huì)成為瓶頸。因?yàn)樗械膹腂roker都需要與主Broker保持實(shí)時(shí)數(shù)據(jù)同步,大量的同步操作可能會(huì)導(dǎo)致系統(tǒng)擴(kuò)展性受限。
系統(tǒng)復(fù)雜性: 同步主從配置增加了系統(tǒng)的復(fù)雜性,需要更多的管理和維護(hù)工作。例如,需要確保主從之間的同步機(jī)制正常工作,并且當(dāng)主Broker出現(xiàn)問題時(shí),從Broker能夠及時(shí)接管工作。
高可用性與性能的權(quán)衡: 雖然
SYNC_MASTER
配置可以提供較高的數(shù)據(jù)可靠性,但性能上的折衷可能使得它不適用于對(duì)延遲敏感或需要極高吞吐量的應(yīng)用場景。故障恢復(fù)時(shí)間: 當(dāng)主Broker失敗后,系統(tǒng)需要花費(fèi)時(shí)間來切換到從Broker,這期間系統(tǒng)的可用性可能會(huì)受到影響。
因此,使用SYNC_MASTER
配置時(shí)需要根據(jù)實(shí)際業(yè)務(wù)場景和需求來平衡可靠性和性能,可能需要在可接受的消息延遲和系統(tǒng)資源使用范圍內(nèi)進(jìn)行權(quán)衡。在不需要嚴(yán)格消息順序的場景下,可以考慮使用異步復(fù)制來提高性能。
- 2. 數(shù)據(jù)備份
為了進(jìn)一步保護(hù)數(shù)據(jù)免于丟失,可以定期對(duì)消息日志和消費(fèi)進(jìn)度信息進(jìn)行備份。這不僅包括了消息體本身,還包括了所有的消費(fèi)者偏移量和隊(duì)列信息。
備份策略可能包括以下幾點(diǎn):
定期備份:使用腳本或現(xiàn)成的備份工具,定期(如每天)將數(shù)據(jù)從Broker服務(wù)器復(fù)制到一個(gè)安全的備份位置。
實(shí)時(shí)備份:對(duì)于極其關(guān)鍵的數(shù)據(jù),可能需要考慮更高級(jí)的備份方案,例如使用磁盤陣列的鏡像功能,或者集成第三方的實(shí)時(shí)數(shù)據(jù)備份解決方案。
備份驗(yàn)證:定期對(duì)備份進(jìn)行恢復(fù)測試,以驗(yàn)證備份的完整性和有效性。
3. 監(jiān)控和報(bào)警
建立監(jiān)控系統(tǒng)以監(jiān)測Broker的狀態(tài)和性能指標(biāo),及時(shí)發(fā)現(xiàn)并處理同步延遲或失敗等問題。同時(shí),應(yīng)配置報(bào)警機(jī)制,在檢測到可能導(dǎo)致數(shù)據(jù)丟失的異常時(shí)立即通知運(yùn)維人員。
- 4. 集群部署
在不同的數(shù)據(jù)中心部署多個(gè)Broker集群,以防單一數(shù)據(jù)中心出現(xiàn)故障時(shí)影響整個(gè)消息系統(tǒng)??鐢?shù)據(jù)中心的復(fù)制可以通過RocketMQ的跨站點(diǎn)(cross-site)復(fù)制功能來實(shí)現(xiàn)。
通過實(shí)施這些策略,可以確保RocketMQ系統(tǒng)在多數(shù)故障情況下都能保證消息的不丟失,提高整個(gè)消息系統(tǒng)的健壯性和可靠性。這些措施需要結(jié)合具體的業(yè)務(wù)需求和系統(tǒng)環(huán)境來具體實(shí)施。
結(jié)語
綜合以上的策略,我們可以顯著增強(qiáng)在應(yīng)用中使用RocketMQ時(shí)的消息持久性和可靠性。通過仔細(xì)配置消息發(fā)送重試機(jī)制、同步刷盤、主從同步復(fù)制以及實(shí)施定期數(shù)據(jù)備份和強(qiáng)化監(jiān)控報(bào)警系統(tǒng),開發(fā)者能夠?yàn)橄⑾到y(tǒng)構(gòu)建一個(gè)更加堅(jiān)固的安全網(wǎng)。然而,理想的配置方案往往需要根據(jù)業(yè)務(wù)的具體需求和系統(tǒng)的運(yùn)行環(huán)境來定制。建議在部署前進(jìn)行充分的測試,以確保系統(tǒng)的穩(wěn)定性,并在實(shí)際運(yùn)行中持續(xù)監(jiān)控和調(diào)整,以應(yīng)對(duì)不斷變化的業(yè)務(wù)和技術(shù)環(huán)境。這樣的實(shí)踐將大大減少消息丟失的可能性,為應(yīng)用提供穩(wěn)定可靠的消息交換保障。
以上就是RocketMQ消息丟失的場景以及解決方案的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息丟失的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java中基本注解的知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家整理的是一篇關(guān)于java中基本注解的知識(shí)點(diǎn)總結(jié),有需要的朋友們可以跟著學(xué)習(xí)下。2021-06-06Java利用位運(yùn)算實(shí)現(xiàn)比較兩個(gè)數(shù)的大小
這篇文章主要為大家介紹了,在Java中如何不用任何比較判斷符(>,==,<),返回兩個(gè)數(shù)( 32 位整數(shù))中較大的數(shù),感興趣的可以了解一下2022-08-08SpringBoot中SmartLifecycle的使用解析
這篇文章主要介紹了SpringBoot中SmartLifecycle的使用解析,SmartLifecycle是一個(gè)擴(kuò)展了Lifecycle接口,可以跟蹤spring容器ApplicationContext刷新或者關(guān)閉的接口,實(shí)現(xiàn)該接口的實(shí)現(xiàn)類有特定的執(zhí)行順序,需要的朋友可以參考下2023-11-11java對(duì)于目錄下文件的單詞查找操作代碼實(shí)現(xiàn)
這篇文章主要介紹了java對(duì)于目錄下文件的單詞查找操作代碼實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Mybatis中BindingException異常的產(chǎn)生原因及解決過程
BindingException異常是MyBatis框架中自定義的異常,顧名思義指的是綁定出現(xiàn)問題,下面這篇文章主要給大家介紹了關(guān)于MyBatis報(bào)錯(cuò)BindingException異常的產(chǎn)生原因及解決過程,需要的朋友可以參考下2023-06-06