Java中的RocketMQ消費(fèi)冪等詳解
什么是消息冪等
當(dāng)出現(xiàn)消費(fèi)者對(duì)某條消息重復(fù)消費(fèi)的情況時(shí),重復(fù)消費(fèi)的結(jié)果與消費(fèi)一次的結(jié)果是相同的,并且多次消費(fèi)并未對(duì)業(yè)務(wù)系統(tǒng)業(yè)務(wù)產(chǎn)生任何負(fù)面影響,那么這個(gè)消費(fèi)過(guò)程就是消息冪等,在互聯(lián)網(wǎng)應(yīng)用中尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息很有可能會(huì)出現(xiàn)重復(fù)發(fā)送或者重復(fù)消費(fèi),如果重復(fù)的消息可能會(huì)影響業(yè)務(wù)處理,那么就應(yīng)該應(yīng)用消息冪等處理
消費(fèi)冪等的必要性
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下,消息隊(duì)列 RocketMQ 的消息有可能會(huì)出現(xiàn)重復(fù),這個(gè)重復(fù)簡(jiǎn)單可以概括為以下情況:
- 發(fā)送時(shí)消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶(hù)端宕機(jī),導(dǎo)致服務(wù)端對(duì)客戶(hù)端應(yīng)答失敗。 如果此時(shí)生產(chǎn)者意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
- 投遞時(shí)消息重復(fù)
消息消費(fèi)的場(chǎng)景下,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶(hù)端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷。 為了保證消息至少被消費(fèi)一次,消息隊(duì)列 RocketMQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息。
- Rebalance時(shí)消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動(dòng)、Broker 重啟以及訂閱方應(yīng)用重啟)
當(dāng)消息隊(duì)列 RocketMQ 的 Broker 或客戶(hù)端重啟、擴(kuò)容或縮容時(shí),會(huì)觸發(fā) Rebalance,此時(shí)消費(fèi)者可能會(huì)收到重復(fù)消息。
通用解決方案
兩要素
冪等性方案的設(shè)計(jì)中涉及兩項(xiàng)要輸,冪等令牌,與唯一性處理,只要充分利用好者兩要素,就可以設(shè)計(jì)出好的冪等解決方案。
- 冪等令牌:是生產(chǎn)者和消費(fèi)者兩者中的既定令牌,通常具有唯一業(yè)務(wù)標(biāo)識(shí)的字符串
- 唯一性處理:服務(wù)端通過(guò)采用一定的算法策略,保證同一個(gè)業(yè)務(wù)邏輯下不會(huì)被重復(fù)執(zhí)行成功多次
解決方案
對(duì)于常見(jiàn)的系統(tǒng),冪等性操作的通用性解決方案如下:、
- 首先通過(guò)緩存去重,在緩存中如果已經(jīng)存在某冪等令牌,則說(shuō)明本次操作是重復(fù)性操作,若緩存中沒(méi)有命中,則進(jìn)入下一步
- 在唯一性處理之前,先在數(shù)據(jù)庫(kù)中查詢(xún)冪等令牌作為索引的數(shù)據(jù)是否存在,若存在,則說(shuō)明本次操作為重復(fù)性操作,若不存在進(jìn)行下一步。
- 在同一事務(wù)中完成三項(xiàng)操作:唯一性處理后,將冪等令牌寫(xiě)到緩存中,并將冪等令牌作為唯一索引的數(shù)據(jù)寫(xiě)入DB中
設(shè)置業(yè)務(wù)key
因?yàn)?Message ID 有可能出現(xiàn)沖突(重復(fù))的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。 最好的方式是以業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識(shí)可以通過(guò)消息 Key 進(jìn)行設(shè)置:
Message message = new Message(); message.setKey("ORDERID_100"); SendResult sendResult = producer.send(message);
訂閱方收到消息時(shí)可以根據(jù)消息的 Key 進(jìn)行冪等處理:
consumer.subscribe("ons_test", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { String key = message.getKey() // 根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)的 key 做冪等處理 } });
注意:
消息重復(fù)消費(fèi)的情況是不能去避免的,我們需要考慮的就是在有重復(fù)消息的情況之下,怎么取保證冪等性,那么在保證冪等性的有一個(gè)關(guān)鍵,就是在發(fā)送消息的時(shí)候攜帶一個(gè)業(yè)務(wù)Key,然后在接收到消息后先去獲得這個(gè)業(yè)務(wù)Key,然后在消費(fèi)方的數(shù)據(jù)庫(kù)當(dāng)中判斷一下這個(gè)業(yè)務(wù)Key所對(duì)應(yīng)的消息有沒(méi)有消費(fèi)過(guò),如果沒(méi)有消費(fèi)過(guò)就接著消費(fèi),消費(fèi)完了在數(shù)據(jù)中存儲(chǔ)一下,或者緩存數(shù)據(jù)庫(kù)中存儲(chǔ)也行,如果當(dāng)前業(yè)務(wù)Key對(duì)應(yīng)的消息已經(jīng)消費(fèi)過(guò),那么直接舍棄即可!
支付實(shí)例場(chǎng)景
- 當(dāng)支付請(qǐng)求到達(dá)后,首先在Redis中獲取key作為支付流水號(hào)的緩存value,若value不為空,則說(shuō)明本次支付是重復(fù)操作,業(yè)務(wù)系統(tǒng)直接返回調(diào)用側(cè)重復(fù)支付標(biāo)識(shí),若value為空,則進(jìn)入下一步操作
- DBMS中根據(jù)支付流水號(hào)查詢(xún)是否存在相應(yīng)的實(shí)例,若存在,則說(shuō)明本次支付是重復(fù)操作,業(yè)務(wù)系統(tǒng)直接返回調(diào)用側(cè)支付標(biāo)識(shí),若不存在,則說(shuō)明本次操作是首次操作,進(jìn)入下一步完成唯一性處理
- 在分布式系統(tǒng)中完成三項(xiàng)操作
- 完成支付任務(wù)
- 將當(dāng)前支付流水號(hào)作為key,任意字段作為value,寫(xiě)入到Redis緩存中
- 將當(dāng)前支付流水號(hào)作為主鍵,與其他相關(guān)數(shù)據(jù)共同寫(xiě)入到DBMS中
到此這篇關(guān)于Java中的RocketMQ消費(fèi)冪等詳解的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)冪等內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
maven?解包依賴(lài)項(xiàng)中的文件的解決方法
Maven是java中的一種項(xiàng)目管理、項(xiàng)目構(gòu)建、依賴(lài)管理的工具,接下來(lái)通過(guò)本文給大家介紹maven?解包依賴(lài)項(xiàng)中的文件,需要的朋友可以參考下2022-07-07java 字符串相減(很簡(jiǎn)單的一個(gè)方法)
本篇文章是對(duì)java中關(guān)于字符串相減的一個(gè)簡(jiǎn)單的方法進(jìn)行了介紹,需要的朋友參考下2013-07-07Spring?Boot集成LiteFlow規(guī)則引擎的詳細(xì)過(guò)程
本文詳細(xì)介紹了如何在Spring?Boot應(yīng)用程序中集成LiteFlow規(guī)則引擎,并探討如何使用LiteFlow庫(kù)來(lái)實(shí)現(xiàn)業(yè)務(wù)流程的規(guī)則處理,將通過(guò)具體的示例來(lái)展示如何在Spring?Boot應(yīng)用程序中配置和使用LiteFlow規(guī)則引擎,以提高系統(tǒng)的靈活性和可維護(hù)性,感興趣的朋友跟隨小編一起看看吧2024-07-07javafx tableview鼠標(biāo)觸發(fā)更新屬性詳解
這篇文章主要為大家詳細(xì)介紹了javafx tableview鼠標(biāo)觸發(fā)更新屬性的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08