Java中的RocketMQ消費冪等詳解
什么是消息冪等
當出現(xiàn)消費者對某條消息重復消費的情況時,重復消費的結果與消費一次的結果是相同的,并且多次消費并未對業(yè)務系統(tǒng)業(yè)務產生任何負面影響,那么這個消費過程就是消息冪等,在互聯(lián)網(wǎng)應用中尤其在網(wǎng)絡不穩(wěn)定的情況下,消息很有可能會出現(xiàn)重復發(fā)送或者重復消費,如果重復的消息可能會影響業(yè)務處理,那么就應該應用消息冪等處理
消費冪等的必要性
在互聯(lián)網(wǎng)應用中,尤其在網(wǎng)絡不穩(wěn)定的情況下,消息隊列 RocketMQ 的消息有可能會出現(xiàn)重復,這個重復簡單可以概括為以下情況:
- 發(fā)送時消息重復
當一條消息已被成功發(fā)送到服務端并完成持久化,此時出現(xiàn)了網(wǎng)絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。 如果此時生產者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內容相同并且 Message ID 也相同的消息。
- 投遞時消息重復
消息消費的場景下,消息已投遞到消費者并完成業(yè)務處理,當客戶端給服務端反饋應答的時候網(wǎng)絡閃斷。 為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端將在網(wǎng)絡恢復后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內容相同并且 Message ID 也相同的消息。
- Rebalance時消息重復(包括但不限于網(wǎng)絡抖動、Broker 重啟以及訂閱方應用重啟)
當消息隊列 RocketMQ 的 Broker 或客戶端重啟、擴容或縮容時,會觸發(fā) Rebalance,此時消費者可能會收到重復消息。
通用解決方案
兩要素
冪等性方案的設計中涉及兩項要輸,冪等令牌,與唯一性處理,只要充分利用好者兩要素,就可以設計出好的冪等解決方案。
- 冪等令牌:是生產者和消費者兩者中的既定令牌,通常具有唯一業(yè)務標識的字符串
- 唯一性處理:服務端通過采用一定的算法策略,保證同一個業(yè)務邏輯下不會被重復執(zhí)行成功多次
解決方案
對于常見的系統(tǒng),冪等性操作的通用性解決方案如下:、
- 首先通過緩存去重,在緩存中如果已經(jīng)存在某冪等令牌,則說明本次操作是重復性操作,若緩存中沒有命中,則進入下一步
- 在唯一性處理之前,先在數(shù)據(jù)庫中查詢冪等令牌作為索引的數(shù)據(jù)是否存在,若存在,則說明本次操作為重復性操作,若不存在進行下一步。
- 在同一事務中完成三項操作:唯一性處理后,將冪等令牌寫到緩存中,并將冪等令牌作為唯一索引的數(shù)據(jù)寫入DB中
設置業(yè)務key
因為 Message ID 有可能出現(xiàn)沖突(重復)的情況,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)。 最好的方式是以業(yè)務唯一標識作為冪等處理的關鍵依據(jù),而業(yè)務的唯一標識可以通過消息 Key 進行設置:
Message message = new Message(); message.setKey("ORDERID_100"); SendResult sendResult = producer.send(message);
訂閱方收到消息時可以根據(jù)消息的 Key 進行冪等處理:
consumer.subscribe("ons_test", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { String key = message.getKey() // 根據(jù)業(yè)務唯一標識的 key 做冪等處理 } });
注意:
消息重復消費的情況是不能去避免的,我們需要考慮的就是在有重復消息的情況之下,怎么取保證冪等性,那么在保證冪等性的有一個關鍵,就是在發(fā)送消息的時候攜帶一個業(yè)務Key,然后在接收到消息后先去獲得這個業(yè)務Key,然后在消費方的數(shù)據(jù)庫當中判斷一下這個業(yè)務Key所對應的消息有沒有消費過,如果沒有消費過就接著消費,消費完了在數(shù)據(jù)中存儲一下,或者緩存數(shù)據(jù)庫中存儲也行,如果當前業(yè)務Key對應的消息已經(jīng)消費過,那么直接舍棄即可!
支付實例場景
- 當支付請求到達后,首先在Redis中獲取key作為支付流水號的緩存value,若value不為空,則說明本次支付是重復操作,業(yè)務系統(tǒng)直接返回調用側重復支付標識,若value為空,則進入下一步操作
- DBMS中根據(jù)支付流水號查詢是否存在相應的實例,若存在,則說明本次支付是重復操作,業(yè)務系統(tǒng)直接返回調用側支付標識,若不存在,則說明本次操作是首次操作,進入下一步完成唯一性處理
- 在分布式系統(tǒng)中完成三項操作
- 完成支付任務
- 將當前支付流水號作為key,任意字段作為value,寫入到Redis緩存中
- 將當前支付流水號作為主鍵,與其他相關數(shù)據(jù)共同寫入到DBMS中
到此這篇關于Java中的RocketMQ消費冪等詳解的文章就介紹到這了,更多相關RocketMQ消費冪等內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
如何使用會話Cookie和Java實現(xiàn)JWT身份驗證
這篇文章主要介紹了如何使用會話Cookie和Java實現(xiàn)JWT身份驗證,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2021-03-03Java,JSP,Servlet獲取當前工程路徑(絕對路徑)問題解析
這篇文章主要介紹了Java,JSP,Servlet獲取當前工程路徑(絕對路徑)問題解析,需要的朋友可以參考下。2017-09-09MyBatis使用resultMap如何解決列名和屬性名不一致
這篇文章主要介紹了MyBatis使用resultMap如何解決列名和屬性名不一致的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01