Golang中Kafka的重復(fù)消費(fèi)和消息丟失問題的解決方案
前言
在Kafka中,生產(chǎn)者(Producer)和消費(fèi)者(Consumer)是通過發(fā)布訂閱模式進(jìn)行協(xié)作的,生產(chǎn)者將消息發(fā)送到Kafka集群,而消費(fèi)者從Kafka集群中拉取消息進(jìn)行消費(fèi),無論是生產(chǎn)者發(fā)送消息到Kafka集群還是消費(fèi)者從Kafka集群中拉取消息進(jìn)行消費(fèi),都是容易出現(xiàn)問題的,比較典型的就是消費(fèi)端的重復(fù)消費(fèi)問題、生產(chǎn)端和消費(fèi)端產(chǎn)生的消息丟失問題。下面將對這兩個(gè)問題出現(xiàn)的場景以及常見的解決方案進(jìn)行講解。
一、重復(fù)消費(fèi)
1.1 重復(fù)消費(fèi)出現(xiàn)的場景
重復(fù)消費(fèi)出現(xiàn)的常見場景主要分為兩種,一種是 Consumer在消費(fèi)過程中,應(yīng)用進(jìn)程被強(qiáng)制kill掉或者發(fā)生異常退出(掛掉…),另一種則是Consumer消費(fèi)的時(shí)間過長。
1.1.1 Consumer消費(fèi)過程中,進(jìn)程掛掉/異常退出
在Kafka消費(fèi)端的使用中,位移(Offset)的提交有兩種方式,自動提交和手動提交。自動提交情況下,當(dāng)消費(fèi)者拉取一批消息進(jìn)行消費(fèi)后,需要進(jìn)行Offset的提交,在消費(fèi)端提交Offset之前,Consumer掛掉了,當(dāng)Consumer重啟后再次拉取Offset,這時(shí)候拉取的依然是掛掉之前消費(fèi)的Offset,因此造成重復(fù)消費(fèi)的問題。在手動提交模式下,在提交代碼調(diào)用之前,Consumer掛掉也會造成重復(fù)消費(fèi)。
1.1.2 消費(fèi)者消費(fèi)時(shí)間過長
Kafka消費(fèi)端的參數(shù)max.poll.interval.ms
定義了兩次poll的最大間隔,它的默認(rèn)值是 5 分鐘,表示 Consumer 如果在 5 分鐘之內(nèi)無法消費(fèi)完 poll方法返回的消息,那么Consumer 會主動發(fā)起“離開組”的請求。
在離開消費(fèi)組后,開始Rebalance,因此提交Offset失敗。之后重新Rebalance,消費(fèi)者再次分配Partition后,再次poll拉取消息依然從之前消費(fèi)過的消息處開始消費(fèi),這樣就造成重復(fù)消費(fèi)。而且若不解決消費(fèi)單次消費(fèi)時(shí)間過長的問題,這部分消息可能會一直重復(fù)消費(fèi)。
整體上來說,如果我們在消費(fèi)中將消息數(shù)據(jù)處理入庫,但是在執(zhí)行Offset提交時(shí),Kafka宕機(jī)或者網(wǎng)絡(luò)原因等無法提交Offset,當(dāng)我們重啟服務(wù)或者Rebalance過程觸發(fā),Consumer將再次消費(fèi)此消息數(shù)據(jù)。
1.2 重復(fù)消費(fèi)解決方案
1.2.1 針對于消費(fèi)端掛掉等原因造成的重復(fù)消費(fèi)問題
這部分主要集中在消費(fèi)端的編碼層面,需要我們在設(shè)計(jì)代碼時(shí)以冪等性的角度進(jìn)行開發(fā)設(shè)計(jì),保證同一數(shù)據(jù)無論進(jìn)行多少次消費(fèi),所造成的結(jié)果都一樣。處理方式可以在消息體中添加唯一標(biāo)識(比如將消息生成md5保存到Mysql或者是Redis中,在處理消息前先檢查下Mysql/Redis是否已經(jīng)處理過該消息了),消費(fèi)端進(jìn)行確認(rèn)此唯一標(biāo)識是否已經(jīng)消費(fèi)過,如果消費(fèi)過,則不進(jìn)行之后處理。從而盡可能的避免了重復(fù)消費(fèi)。
冪等角度大概兩種實(shí)現(xiàn):
- 將唯一標(biāo)識存入第三方介質(zhì)(如Redis),要操作數(shù)據(jù)的時(shí)候先判斷第三方介質(zhì)(數(shù)據(jù)庫或者緩存)有沒有這個(gè)唯一標(biāo)識。
- 將版本號(offset)存入到數(shù)據(jù)里面,然后再要操作數(shù)據(jù)的時(shí)候用這個(gè)版本號做樂觀鎖,當(dāng)版本號大于原先的才能操作。
1.2.2 針對于Consumer消費(fèi)時(shí)間過長帶來的重復(fù)消費(fèi)問題
- 提高單條消息的處理速度。例如對消息處理中比較耗時(shí)的操作可通過異步的方式進(jìn)行處理、利用多線程處理等。
- 其次,在縮短單條消息消費(fèi)時(shí)常的同時(shí),根據(jù)實(shí)際場景可將
max.poll.interval.ms
值設(shè)置大一點(diǎn),避免不必要的rebalance,此外可適當(dāng)減小max.poll.records
的值,默認(rèn)值是500,可根據(jù)實(shí)際消息速率適當(dāng)調(diào)小。
二、消息丟失
在Kafka中,消息丟失在Kafka的生產(chǎn)端和消費(fèi)端都會出現(xiàn)。在此之前我們先來了解一下生產(chǎn)者和消費(fèi)者的原理。
2.1 生產(chǎn)端問題
生產(chǎn)者原理:
Kafka生產(chǎn)者生產(chǎn)消息后,會將消息發(fā)送到Kafka集群的Leader中,然后Kafka集群的Leader收到消息后會返回ACK確認(rèn)消息給生產(chǎn)者Producer。主要拆解為以下幾個(gè)步驟。
- Producer先從Kafka集群找到該P(yáng)artition的Leader。
- Producer將消息發(fā)送給Leader,Leader將該消息寫入本地。
- Follwer從Leader pull消息,寫入本地Log后Leader發(fā)送ACK。
- Leader 收到所有 ISR 中的 Replica 的 ACK 后,增加High Watermark,并向 Producer 發(fā)送 ACK。
- 因此,Kafka集群(其實(shí)是分區(qū)的Leader)最終會返回一個(gè)ACK來確認(rèn)Producer推送消息的結(jié)果,這里Kafka提供了三種模式:
NoResponse RequiredAcks = 0
:這個(gè)代表的就是不進(jìn)行消息推送是否成功的確認(rèn)。WaitForLocal RequiredAcks = 1
:當(dāng)local(Leader)確認(rèn)接收成功后,就可以返回了。WaitForAll RequiredAcks = -1
:當(dāng)所有的Leader和Follower都接收成功時(shí),才會返回。
因此這個(gè)配置的影響也分為下面三種情況:
- 設(shè)置為0,Producer不進(jìn)行消息發(fā)送的確認(rèn),Kafka集群(Broker)可能由于一些原因并沒有收到對應(yīng)消息,從而引起消息丟失。
- 設(shè)置為1,Producer在確認(rèn)到 Topic Leader 已經(jīng)接收到消息后,完成發(fā)送,此時(shí)有可能 Follower 并沒有接收到對應(yīng)消息。此時(shí)如果 Leader 突然宕機(jī),在經(jīng)過選舉之后,沒有接到消息的 Follower 晉升為 Leader,從而引起消息丟失。
- 設(shè)置為-1,可以很好的確認(rèn)Kafka集群是否已經(jīng)完成消息的接收和本地化存儲,并且可以在Producer發(fā)送失敗時(shí)進(jìn)行重試。
生產(chǎn)端解決消息丟失方案:
- 通過設(shè)置RequiredAcks模式來解決,選用WaitForAll(對應(yīng)值為-1)可以保證數(shù)據(jù)推送成功,不過會影響延時(shí)。
- 引入重試機(jī)制,設(shè)置重試次數(shù)和重試間隔。
- 當(dāng)然,最后就是使用Kafka的多副本機(jī)制保證Kafka集群本身的可靠性,確保當(dāng)Leader掛掉之后能進(jìn)行Follower選舉晉升為新的Leader。
2.2 消費(fèi)端問題
消費(fèi)端的消息丟失問題:
消費(fèi)端的消息丟失主要是因?yàn)樵谙M(fèi)過程中出現(xiàn)了異常,但是對應(yīng)消息的 Offset 已經(jīng)提交,那么消費(fèi)異常的消息將會丟失。
前面介紹過,Offset的提交包括手動提交和自動提交,可通過kafka.consumer.enable-auto-commit
進(jìn)行配置。
手動提交可以靈活的確認(rèn)是否將本次消費(fèi)數(shù)據(jù)的Offset進(jìn)行提交,可以很好的避免消息丟失的情況。
自動提交是引起消息丟失的主要誘因。因?yàn)橄⒌南M(fèi)并不會影響到Offset的提交。
大部分的解決方案為了盡可能的保證數(shù)據(jù)的完整性,都是盡量去選用手動提交的方式,當(dāng)數(shù)據(jù)處理完之后再進(jìn)行提交。
當(dāng)然,在golang中我們主要使用sarama包的Kafka,sarama自動提交的原理是先進(jìn)行標(biāo)記,再進(jìn)行提交,如下代碼所示:
type exampleConsumerGroupHandler struct{} func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil } func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("Message topic:%q partition:%d offset:%d ", msg.Topic, msg.Partition, msg.Offset) // 標(biāo)記消息已處理,sarama會自動提交 // 處理數(shù)據(jù)(如真正持久化mysql...) sess.MarkMessage(msg, "") } return nil
因此,我們完全可以在標(biāo)記之前進(jìn)行數(shù)據(jù)的處理,例如插入Mysql等,當(dāng)出現(xiàn)插入成功后程序崩潰,下一次最多重復(fù)消費(fèi)一次(因?yàn)檫€沒標(biāo)記,Offset沒有提交),而不會因?yàn)镺ffset超前,導(dǎo)致應(yīng)用層消息丟失了。
手動提交模式下當(dāng)然是很靈活的控制的,但確實(shí)已經(jīng)沒必要了:
consumerConfig := sarama.NewConfig() consumerConfig.Version = sarama.V2_8_0_0 consumerConfig.Consumer.Return.Errors = false consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // 禁用自動提交,改為手動 consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s ", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) // 插入mysql.... // 手動提交模式下,也需要先進(jìn)行標(biāo)記 sess.MarkMessage(msg, "") consumerCount++ if consumerCount%3 == 0 { // 手動提交,不能頻繁調(diào)用 t1 := time.Now().Nanosecond() sess.Commit() t2 := time.Now().Nanosecond() fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms") } } return nil }
到此這篇關(guān)于Golang中Kafka的重復(fù)消費(fèi)和消息丟失問題的解決方案的文章就介紹到這了,更多相關(guān)Golang Kafka重復(fù)消費(fèi)和消息丟失內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言實(shí)現(xiàn)有規(guī)律的數(shù)字版本號的排序工具
這篇文章主要為大家詳細(xì)介紹了如何利用Go語言實(shí)現(xiàn)有規(guī)律的數(shù)字版本號的排序工具,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-01-01golang?gorm的預(yù)加載及軟刪硬刪的數(shù)據(jù)操作示例
這篇文章主要介紹了golang?gorm的預(yù)加載及軟刪硬刪的數(shù)據(jù)操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04深入探索Go語言中的高效數(shù)據(jù)結(jié)構(gòu)堆
堆,作為一種基本的數(shù)據(jù)結(jié)構(gòu),以其在優(yōu)先隊(duì)列和排序算法中提供高效解決方案的能力而聞名。在本文中,我們將深入探討堆的內(nèi)部工作原理,包括其特性、實(shí)現(xiàn)細(xì)節(jié)以及在現(xiàn)代編程中的應(yīng)用2008-06-06Go語言切片前或中間插入項(xiàng)與內(nèi)置copy()函數(shù)詳解
這篇文章主要介紹了Go語言切片前或中間插入項(xiàng)與內(nèi)置copy()函數(shù)詳解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04詳解Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心
這篇文章主要為大家為大家介紹了Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05