欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Pulsar源碼徹底解決重復(fù)消費(fèi)問題

 更新時(shí)間:2023年05月29日 11:29:53   作者:crossoverJie  
這篇文章主要為大家介紹了Pulsar源碼徹底解決重復(fù)消費(fèi)問題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

背景

最近真是和 Pulsar 杠上了,業(yè)務(wù)團(tuán)隊(duì)反饋說是線上有個(gè)應(yīng)用消息重復(fù)消費(fèi)。

而且在測試環(huán)境是可以穩(wěn)定復(fù)現(xiàn)的,根據(jù)經(jīng)驗(yàn)來看一般能穩(wěn)定復(fù)現(xiàn)的都比較好解決。

定位問題

接著便是定位問題了,根據(jù)之前的經(jīng)驗(yàn)讓業(yè)務(wù)按照這幾種情況先排查一下:

通過排查:1,2可以排除了。

  • 沒有相關(guān)日志
  • 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。

第三個(gè)也在消費(fèi)的入口和提交消息出計(jì)算了時(shí)間,最終發(fā)現(xiàn)都是在2s左右 ACK 的。

偽代碼如下:

Consumer consumer = client.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .topic(topic)
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-sub")
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                        log.info("msg_id{}",msg.getMessageId().toString());
                        TimeUnit.SECONDS.sleep(2);
                        consumer.acknowledge(msg);
                    }
                })
                .subscribe();

那這就很奇怪了,因?yàn)榇a里配置的 ackTimeout 是 30s,理論上來說是不會存在超時(shí)導(dǎo)致消息重發(fā)的。

為了排除是否是超時(shí)引起的,直接將業(yè)務(wù)代碼注釋掉了,等于是消息收到后立即就 ACK,經(jīng)過測試發(fā)現(xiàn)這樣確實(shí)就沒有重復(fù)消費(fèi)了。

為了再次確認(rèn)是不是和 ackTimeout 有關(guān),直接將 .ackTimeout(30, TimeUnit.SECONDS) 注釋掉后測試,發(fā)現(xiàn)也沒有重復(fù)消費(fèi)了。

確認(rèn)原因

既然如此那一定是和這個(gè)配置有關(guān)了,但看代碼確實(shí)沒有超時(shí),為了定位具體原因只有去看 client 的源碼了。

這里簡單梳理下消息的消費(fèi)的流程:

  • 根據(jù) .receiverQueueSize(1000) 的配置,默認(rèn)情況下 broker 會直接給客戶端推送 1000 條消息。
  • 客戶端將這 1000 條消息保存到內(nèi)部隊(duì)列中。
  • 如果使用同步消費(fèi) receive() 時(shí),本質(zhì)上就是去 take 這個(gè)內(nèi)部隊(duì)列。
  • 如果是使用的是 messageListener 異步消費(fèi)并配置 ackTimeout,每當(dāng)從隊(duì)列里獲得一條消息后便會把這條消息加入 UnAckedMessageTracker 內(nèi)部的一個(gè)時(shí)間輪中,定時(shí)檢測頂部是否存在消息,如果存在則會觸發(fā)重新投遞。
    4.1 加入時(shí)間輪后,異步調(diào)用我們自定義的事件,這個(gè)異步操作是提交到一個(gè)無界隊(duì)列中由單個(gè)線程依次排隊(duì)執(zhí)行(這點(diǎn)是這次問題的關(guān)鍵)
  • 業(yè)務(wù) ACK 的時(shí)候會從時(shí)間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進(jìn)行重新投遞。

整體流程如上圖,代碼細(xì)節(jié)如下圖:

所以問題的根本原因就是寫入時(shí)間輪(UnAckedMessageTracker)開始倒計(jì)時(shí)的線程和回調(diào)業(yè)務(wù)邏輯的不是同一個(gè)線程。

如果業(yè)務(wù)執(zhí)行耗時(shí),等到消息從那個(gè)單線程的無界隊(duì)列中取出來的時(shí)候很有可能已經(jīng)過了 ackTimeou 的時(shí)間,從而導(dǎo)致了超時(shí)重發(fā)。

也就是用戶所理解的 ackTimeout 周期(應(yīng)該進(jìn)入回調(diào)時(shí)候開始計(jì)時(shí))和 SDK 實(shí)現(xiàn)的不一致造成的。

之后我再次確認(rèn)同樣的代碼換為同步消費(fèi)是沒有問題的,不會導(dǎo)致重復(fù)消費(fèi):

while (true) {
Message msg = consumer.receive();
            log.info(
                    "consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString());
            TimeUnit.SECONDS.sleep(2);
            consumer.acknowledge(msg);    
}

查看代碼后發(fā)現(xiàn)同步代碼的獲取消息和加入 UnAckedMessageTracker 時(shí)間輪是同步的,也就不會出現(xiàn)超時(shí)的問題。

總結(jié)

所以其實(shí) 是messageListener 異步消費(fèi)的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker 處移動(dòng)到回調(diào)函數(shù)中同步調(diào)用。

我查看了最新的 2.11.x 版本的代碼依然沒有修復(fù),正準(zhǔn)備提個(gè) PR 切換到 master 時(shí)才發(fā)現(xiàn)已經(jīng)有相關(guān)的 PR 了,只是還沒有發(fā)版。

修復(fù)的背景和思路也是類似的,具體參考:

https://github.com/apache/pul...

其實(shí)業(yè)務(wù)中并不推薦使用 ackTimeout 這個(gè)配置了,不好預(yù)估時(shí)間從而導(dǎo)致超時(shí),而且我相信大部分業(yè)務(wù)配置好 ackTImeout 后直到后續(xù)出問題的時(shí)候才想起來要改。

所以干脆一開始就不要使用。

在 go 版本的 SDK 中直接廢棄掉了這個(gè)參數(shù),推薦使用 nack API 替換。

以上就是Pulsar源碼徹底解決重復(fù)消費(fèi)問題的詳細(xì)內(nèi)容,更多關(guān)于Pulsar重復(fù)消費(fèi)解決的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Springboot獲取文件內(nèi)容如何將MultipartFile轉(zhuǎn)File

    Springboot獲取文件內(nèi)容如何將MultipartFile轉(zhuǎn)File

    本文給大家介紹Springboot獲取文件內(nèi)容,將MultipartFile轉(zhuǎn)File方法,本文結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧
    2024-01-01
  • Java實(shí)現(xiàn)斗地主的發(fā)牌功能

    Java實(shí)現(xiàn)斗地主的發(fā)牌功能

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)斗地主的發(fā)牌功能,含按順序發(fā)牌和玩家牌排序顯示等功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-06-06
  • java poi導(dǎo)出excel時(shí)如何設(shè)置手動(dòng)換行

    java poi導(dǎo)出excel時(shí)如何設(shè)置手動(dòng)換行

    這篇文章主要介紹了java poi導(dǎo)出excel時(shí)如何設(shè)置手動(dòng)換行,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • idea創(chuàng)建springboot項(xiàng)目(版本只能選擇17和21)的解決方法

    idea創(chuàng)建springboot項(xiàng)目(版本只能選擇17和21)的解決方法

    idea2023創(chuàng)建spring boot項(xiàng)目時(shí),java版本無法選擇11,本文主要介紹了idea創(chuàng)建springboot項(xiàng)目(版本只能選擇17和21),下面就來介紹一下解決方法,感興趣的可以了解一下
    2024-01-01
  • Java封裝數(shù)組之添加元素操作實(shí)例分析

    Java封裝數(shù)組之添加元素操作實(shí)例分析

    這篇文章主要介紹了Java封裝數(shù)組之添加元素操作,結(jié)合實(shí)例形式分析了Java封裝數(shù)組實(shí)現(xiàn)元素追加、插入等相關(guān)操作技巧,需要的朋友可以參考下
    2020-03-03
  • Mybatis的核心配置文件使用方法

    Mybatis的核心配置文件使用方法

    Mybatis的核心配置文件有兩個(gè),一個(gè)是全局配置文件,它包含了會深深影響Mybatis行為的設(shè)置和屬性信息;一個(gè)是映射文件,它很簡單,讓用戶能更專注于SQL代碼,本文主要介紹了Mybatis的核心配置文件使用方法,感興趣的可以了解一下
    2023-11-11
  • Spring?Boot3虛擬線程的使用步驟詳解

    Spring?Boot3虛擬線程的使用步驟詳解

    虛擬線程是 Java 19 中引入的一個(gè)新特性,旨在通過簡化線程管理來提升應(yīng)用程序的并發(fā)性能,這篇文章主要介紹了Spring?Boot3虛擬線程的使用步驟,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-03-03
  • Java實(shí)現(xiàn)多個(gè)sheet頁數(shù)據(jù)導(dǎo)出功能

    Java實(shí)現(xiàn)多個(gè)sheet頁數(shù)據(jù)導(dǎo)出功能

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)多個(gè)sheet頁數(shù)據(jù)導(dǎo)出功能的相關(guān)知識,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-03-03
  • java 對象輸入輸出流讀寫文件的操作實(shí)例

    java 對象輸入輸出流讀寫文件的操作實(shí)例

    這篇文章主要介紹了java 對象輸入輸出流讀寫文件的操作實(shí)例的相關(guān)資料,這里使用實(shí)現(xiàn)Serializable接口,需要的朋友可以參考下
    2017-07-07
  • java 實(shí)現(xiàn)圖片圓角處理、背景透明化

    java 實(shí)現(xiàn)圖片圓角處理、背景透明化

    這篇文章主要介紹了java 實(shí)現(xiàn)圖片圓角處理、背景透明化,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11

最新評論