kafka監(jiān)聽問題的解決和剖析
問題如下:
- kafka為什么監(jiān)聽不到數(shù)據(jù)
- kafka為什么會有重復數(shù)據(jù)發(fā)送
- kafka數(shù)據(jù)重復如何解決
- 為什么kafka會出現(xiàn)倆個消費端都可以消費問題
- kafka監(jiān)聽配置文件
一. 解決問題一(kafka監(jiān)聽不到數(shù)據(jù))
首先kafka監(jiān)聽不得到數(shù)據(jù),檢查如下
- 檢查配置文件是否正確(可能會出現(xiàn)改了監(jiān)聽地址,監(jiān)聽Topic,監(jiān)聽的地址的數(shù)量問題)
- 檢查接收數(shù)據(jù)的正確性(比如原生的代碼,可能是用byte序列化接收的數(shù)據(jù),而你接收使用String。也是配置文件序列化問題,還有與發(fā)送者商量問題)
- 檢查kafka版本問題(一般的版本其實是沒什么問題的,只有個別版本會出現(xiàn)監(jiān)聽不到問題)
- 沒有加
@Component 犯了最不應(yīng)該出差錯的問題
如果出現(xiàn)監(jiān)聽不到數(shù)據(jù)的問題,那么就試試更改方法一二,如果不可以在去試試方法三,之前出現(xiàn)這個問題也是查過 一般查到都會說 “低版本的服務(wù)器接收不到高版本的生產(chǎn)者發(fā)送的消息”,但是凈由測試使用 用1.0.5RELEASE 和 2.6.3反復測試,并沒有任何的問題。
如果按照版本一致,那么根本就不現(xiàn)實,因為可能不同的項目,springboot版本不一致的話,可能有的springboot版本低,那么你還得要求自己維護項目版本升級?如果出現(xiàn)第四種情況就無話可說了。
二. 解決問題二(kafka為什么會有重復數(shù)據(jù)發(fā)送)
重復數(shù)據(jù)的發(fā)送問題如下
- 可能在發(fā)送者的那里的事務(wù)問題。mysql存儲事務(wù)發(fā)生異常導致回滾操作,但是kafka消息卻是已經(jīng)發(fā)送到了服務(wù)器中。此事肯定會出現(xiàn)重復問題
- 生產(chǎn)者設(shè)置時間問題,生產(chǎn)發(fā)送設(shè)置的時間內(nèi),消息沒完成發(fā)送,生產(chǎn)者以為消費者掛掉,便重新發(fā)送一個,導致重復
- offset問題,當項目重啟,offset走到某一個位置已扔到kafka服務(wù)器中,但是項目被重啟.那么offset會是在原本重啟的那一個點的地方再次發(fā)送一次,這是kafka設(shè)計的問題,防止出現(xiàn)丟失數(shù)據(jù)問題
三. 解決問題三(kafka數(shù)據(jù)重復如何解決)
目前我是使用的Redis進行的排重法,用的是Redis中的set,保證里面不存在重復,保證Redis里面不會存入太多的臟數(shù)據(jù)。并定期清理
粘貼一下我的排重(Redis排重法)
//kafka prefix String cache = "kafka_cache"; //kafka suffix Calendar c = Calendar.getInstance(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //0點,目前是為了設(shè)置為這一天的固定時間。這個完全可以去寫個工具類自己弄,為了看的更清楚,麻煩了一點的寫入 SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00"); String gtimeStart = sdf2.format(c.getTime()); long time = sdf.parse(gtimeStart).getTime(); //此位置為了設(shè)置是否是新的一天,新的一天需要設(shè)置定時時間,保證redis中不會存儲太多無用數(shù)據(jù) Boolean flag = false; //數(shù)據(jù)接收 Set<String> range = new HashSet<>(); //判斷是否存在 if (redisTemplate.hasKey(cache + time)) { //存在則取出這個set range = redisTemplate.opsForSet().members(cache + time); }else { //不存在,則為下面過期時間的設(shè)置鋪墊 flag = true; } //判斷監(jiān)聽到的數(shù)據(jù)是否是重復 if (range.contains("測試需要")) { //重復則排出,根據(jù)邏輯自己修改 continue; } else { //添加進去 redisTemplate.opsForSet().add(cache + time, i+""); if (flag){ //設(shè)置為24小時,保證新一天使用,之前使用的存儲會消失掉 redisTemplate.expire(cache + time,24,TimeUnit.HOURS); //不會在進入這個里面,如果多次的存入過期時間,那么這個key的過期時間就永遠是24小時,一直就不會過期 flag = false; } }
四. 解決問題四(為什么kafka會出現(xiàn)倆個消費端都可以消費問題)
原因是因為在不同groupId之下,kafka接收到以后,會給監(jiān)聽他的每一個組發(fā)送一個他所收到的消息,但是兩個消費端監(jiān)聽同一個租,那么就只有一個消費端可以消費到。
五. 粘一下我的監(jiān)聽配置文件
# 指定kafka 代理地址,可以多個,用逗號間隔 spring.kafka.bootstrap-servers= localhost:9092 # 指定默認消費者group id spring.kafka.consumer.group-id= test # 是否自動提交 spring.kafka.consumer.enable-auto-commit= true # 提交間隔的毫秒 spring.kafka.consumer.auto-commit-interval.ms=60000 # 最大輪詢的次數(shù) spring.kafka.consumer.max-poll-records=1 # 將偏移量重置為最新偏移量 spring.kafka.consumer.auto-offset-reset=earliest # 指定消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
總結(jié)
到此這篇關(guān)于kafka監(jiān)聽問題的解決和剖析的文章就介紹到這了,更多相關(guān)kafka監(jiān)聽問題內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring boot實現(xiàn)profiles動態(tài)切換的示例
Spring Boot支持在不同的環(huán)境下使用不同的配置文件,該技術(shù)非常有利于持續(xù)集成,在構(gòu)建項目的時候只需要使用不同的構(gòu)建命令就可以生成不同運行環(huán)境下war包,而不需要手動切換配置文件。2020-10-10Mockito mock Kotlin Object類方法報錯解決方法
這篇文章主要介紹了Mockito mock Kotlin Object類方法報錯解決方法,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-09-09修改jar包package目錄結(jié)構(gòu)操作方法
這篇文章主要介紹了修改jar包package目錄結(jié)構(gòu)操作方法,本文給大家介紹的非常詳細,具有一定的參考借鑒價值 ,需要的朋友可以參考下2019-07-07Java實現(xiàn)利用圖片或視頻生成GIF并發(fā)送微信
這篇文章主要為大家詳細介紹了Java語言如何利用圖片或視頻實現(xiàn)生成GIF并發(fā)送微信的功能,文中的示例代碼講解詳細,感興趣的小伙伴可以嘗試一下2022-11-11java8中的List<String>轉(zhuǎn)List<Integer>的實例代碼
這篇文章主要介紹了java8中的List<String>轉(zhuǎn)List<Integer>,轉(zhuǎn)換list列表String到列表Intger,java8提供了stream很好的進行操作,本文通過示例代碼給大家介紹的非常詳細,需要的朋友可以參考下2023-07-07