Pulsar源碼徹底解決重復消費問題
背景
最近真是和 Pulsar
杠上了,業(yè)務團隊反饋說是線上有個應用消息重復消費。
而且在測試環(huán)境是可以穩(wěn)定復現(xiàn)的,根據(jù)經(jīng)驗來看一般能穩(wěn)定復現(xiàn)的都比較好解決。
定位問題
接著便是定位問題了,根據(jù)之前的經(jīng)驗讓業(yè)務按照這幾種情況先排查一下:
通過排查:1,2可以排除了。
- 沒有相關日志
- 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。
第三個也在消費的入口和提交消息出計算了時間,最終發(fā)現(xiàn)都是在2s左右 ACK 的。
偽代碼如下:
Consumer consumer = client.newConsumer() .subscriptionType(SubscriptionType.Shared) .enableRetry(true) .topic(topic) .ackTimeout(30, TimeUnit.SECONDS) .subscriptionName("my-sub") .messageListener(new MessageListener<byte[]>() { @SneakyThrows @Override public void received(Consumer<byte[]> consumer, Message<byte[]> msg) { log.info("msg_id{}",msg.getMessageId().toString()); TimeUnit.SECONDS.sleep(2); consumer.acknowledge(msg); } }) .subscribe();
那這就很奇怪了,因為代碼里配置的 ackTimeout 是 30s,理論上來說是不會存在超時導致消息重發(fā)的。
為了排除是否是超時引起的,直接將業(yè)務代碼注釋掉了,等于是消息收到后立即就 ACK,經(jīng)過測試發(fā)現(xiàn)這樣確實就沒有重復消費了。
為了再次確認是不是和 ackTimeout 有關,直接將 .ackTimeout(30, TimeUnit.SECONDS)
注釋掉后測試,發(fā)現(xiàn)也沒有重復消費了。
確認原因
既然如此那一定是和這個配置有關了,但看代碼確實沒有超時,為了定位具體原因只有去看 client 的源碼了。
這里簡單梳理下消息的消費的流程:
- 根據(jù)
.receiverQueueSize(1000)
的配置,默認情況下 broker 會直接給客戶端推送 1000 條消息。 - 客戶端將這 1000 條消息保存到內(nèi)部隊列中。
- 如果使用同步消費
receive()
時,本質(zhì)上就是去take
這個內(nèi)部隊列。 - 如果是使用的是
messageListener
異步消費并配置ackTimeout
,每當從隊列里獲得一條消息后便會把這條消息加入UnAckedMessageTracker
內(nèi)部的一個時間輪中,定時檢測頂部是否存在消息,如果存在則會觸發(fā)重新投遞。
4.1 加入時間輪后,異步
調(diào)用我們自定義的事件,這個異步操作是提交到一個無界隊列中由單個線程依次排隊執(zhí)行(這點是這次問題的關鍵) - 業(yè)務 ACK 的時候會從時間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進行重新投遞。
整體流程如上圖,代碼細節(jié)如下圖:
所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker
)開始倒計時的線程和回調(diào)業(yè)務邏輯的不是同一個線程。
如果業(yè)務執(zhí)行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經(jīng)過了 ackTimeou 的時間,從而導致了超時重發(fā)。
也就是用戶所理解的 ackTimeout
周期(應該進入回調(diào)時候開始計時)和 SDK 實現(xiàn)的不一致造成的。
之后我再次確認同樣的代碼換為同步消費是沒有問題的,不會導致重復消費:
while (true) { Message msg = consumer.receive(); log.info( "consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString()); TimeUnit.SECONDS.sleep(2); consumer.acknowledge(msg); }
查看代碼后發(fā)現(xiàn)同步代碼的獲取消息和加入 UnAckedMessageTracker
時間輪是同步的,也就不會出現(xiàn)超時的問題。
總結
所以其實 是messageListener
異步消費的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker
處移動到回調(diào)函數(shù)中同步調(diào)用。
我查看了最新的 2.11.x
版本的代碼依然沒有修復,正準備提個 PR 切換到 master 時才發(fā)現(xiàn)已經(jīng)有相關的 PR 了,只是還沒有發(fā)版。
修復的背景和思路也是類似的,具體參考:
https://github.com/apache/pul...
其實業(yè)務中并不推薦使用 ackTimeout 這個配置了,不好預估時間從而導致超時,而且我相信大部分業(yè)務配置好 ackTImeout
后直到后續(xù)出問題的時候才想起來要改。
所以干脆一開始就不要使用。
在 go 版本的 SDK 中直接廢棄掉了這個參數(shù),推薦使用 nack API 替換。
以上就是Pulsar源碼徹底解決重復消費問題的詳細內(nèi)容,更多關于Pulsar重復消費解決的資料請關注腳本之家其它相關文章!
相關文章
Springboot獲取文件內(nèi)容如何將MultipartFile轉File
本文給大家介紹Springboot獲取文件內(nèi)容,將MultipartFile轉File方法,本文結合示例代碼給大家介紹的非常詳細,感興趣的朋友一起看看吧2024-01-01idea創(chuàng)建springboot項目(版本只能選擇17和21)的解決方法
idea2023創(chuàng)建spring boot項目時,java版本無法選擇11,本文主要介紹了idea創(chuàng)建springboot項目(版本只能選擇17和21),下面就來介紹一下解決方法,感興趣的可以了解一下2024-01-01Java實現(xiàn)多個sheet頁數(shù)據(jù)導出功能
這篇文章主要為大家詳細介紹了Java實現(xiàn)多個sheet頁數(shù)據(jù)導出功能的相關知識,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2024-03-03