如何基于sqlite實(shí)現(xiàn)kafka延時(shí)消息詳解
1、需求
延時(shí)消息(或者說定時(shí)消息)是業(yè)務(wù)系統(tǒng)里一個(gè)常見的功能點(diǎn)。常用業(yè)務(wù)場景如:
1) 訂單超時(shí)取消
2) 離線超過指定時(shí)間的用戶,召回通知
3) 手機(jī)消失多久后通知監(jiān)護(hù)人……
現(xiàn)流行的實(shí)現(xiàn)方案主要有:
1)數(shù)據(jù)庫定時(shí)輪詢,掃描到達(dá)到延時(shí)時(shí)間的記錄,業(yè)務(wù)處理,刪除該記錄
2)jdk 自帶延時(shí)隊(duì)列(DelayQueue),或優(yōu)化的時(shí)間輪算法
3)redis 有序集合
4)支持延時(shí)消息的分布式消息隊(duì)列
但以上方案,都存在各種缺陷:
1)定時(shí)輪詢間隔小,則對數(shù)據(jù)庫造成很大壓力,分布式微服務(wù)架構(gòu)不好適配。
2)jdk 自帶延時(shí)隊(duì)列,占用內(nèi)存高,服務(wù)重啟則丟失消息,分布式微服務(wù)架構(gòu)不好適配。
3)redis 有序集合比較合適,但內(nèi)存貴,分布式微服務(wù)架構(gòu)不好適配。
4)現(xiàn)在主流的 RocketMQ 不支持任意延時(shí)時(shí)間的延時(shí)消息,RabbitMQ或ActiveMQ 性能不夠好,發(fā)送配置麻煩,kafka不支持延時(shí)消息。
因此,我想實(shí)現(xiàn)一個(gè)適配分布式微服務(wù)架構(gòu)、高性能、方便業(yè)務(wù)系統(tǒng)使用的延時(shí)消息轉(zhuǎn)發(fā)中間件。
2、實(shí)現(xiàn)思路
要保證高性能,推薦使用 kafka 或者 RocketMQ 做分布式消息隊(duì)列。當(dāng)前是基于 sqlite 實(shí)現(xiàn) kafka 延時(shí)消息。
當(dāng)前實(shí)現(xiàn)思路是基于kafka的,實(shí)際適用于任意MQ產(chǎn)品。
2.1 整體實(shí)現(xiàn)思路
2.2 程序業(yè)務(wù)邏輯
1)業(yè)務(wù)系統(tǒng)先推送延時(shí)消息到統(tǒng)一延時(shí)消息隊(duì)列
2)定時(shí)讀取延時(shí)消息隊(duì)列的延時(shí)消息,保存于本地,提交偏移量
3)定時(shí)掃描本地到達(dá)延時(shí)期限的消息,轉(zhuǎn)發(fā)到實(shí)際業(yè)務(wù)消息隊(duì)列
4)刪除本地延時(shí)消息
2.3 實(shí)現(xiàn)細(xì)節(jié)
1)一個(gè)業(yè)務(wù)處理流程使用一個(gè)sqlite數(shù)據(jù)庫文件,可并發(fā)執(zhí)行提高性能。
2)使用雪花算法生成 id 。
3)沒有延時(shí)消息時(shí),線程休眠一定時(shí)間,減低kafka集群、和本地io壓力。
4)本地存儲(chǔ)使用 sqlite。
2.4 依賴框架
1)kafka-client
2)sqlite
3)slf4j+log4j2
4)jackson
3、性能測試
測試機(jī)器: i5-6500,16GB內(nèi)存,機(jī)械硬盤
延時(shí)消息大小: 1kb
并發(fā)處理數(shù):1
已本地簡單測試,性能表現(xiàn):
1) 1個(gè)并發(fā)處理數(shù)就可以達(dá)到1秒存儲(chǔ)、轉(zhuǎn)發(fā)、刪除 約15000條延時(shí)消息,2 個(gè)可以達(dá)到 30000條/s ……
2) 一次性處理1萬條記錄,是經(jīng)過多次對比試驗(yàn)得出的合適批次大小
也測試了其它兩個(gè)本地存儲(chǔ)方案的性能:
1)直接存讀 json 文件,讀寫性能太差(約1200條記錄/s,慢在頻繁創(chuàng)建、打開、關(guān)閉文件,隨機(jī)磁盤io);
2)RocksDB 存讀,寫入性能非常好(97000條記錄/s),但篩選到期延時(shí)消息性能太差了,在數(shù)據(jù)量大于100w時(shí),表現(xiàn)不如 sqlite,而且運(yùn)行時(shí)占用內(nèi)存、cpu 資源非常高。
4、部署
4.1 系統(tǒng)環(huán)境依賴
1)jdk 1.8
2)kafka 1.1.0
可以自行替換為符合實(shí)際kafka版本的jar包(不會(huì)有沖突的,jar包版本和kafka服務(wù)版本不一致可能會(huì)有異常[無法拉取消息、提交失敗等])。
可修改pom.xml內(nèi)的 kafka_version
<kafka_version>1.1.0</kafka_version>
重新打包即可。當(dāng)前程序可以獨(dú)立部署,對現(xiàn)有工程項(xiàng)目無侵入性。
4.2 安裝
1)在項(xiàng)目根目錄執(zhí)行 maven 打包后,會(huì)生成 dev_ops 文件
2)在 dev_ops 目錄下執(zhí)行 java -jar kafka_delay_sqlite-20220102.jar 即可啟動(dòng)程序
3)如需修改配置,可在dev_ops目錄內(nèi)創(chuàng)建kafka.properties文件,設(shè)置自定義配置
默認(rèn)配置如下:
# kafka 連接url [ip:port,ip:port……] kafka.url=127.0.0.1:9092 # 延時(shí)消息本地存儲(chǔ)路徑,建議使用絕對值 kafka.delay.store.path=/data/kafka_delay # 統(tǒng)一延時(shí)消息topic kafka.delay.topic=common_delay_msg # 消費(fèi)者組id kafka.delay.group.id=common_delay_app # 并發(fā)處理數(shù)。限制條件: workers 小于等于topic分區(qū)數(shù) kafka.delay.workers=2
4)業(yè)務(wù)方發(fā)送 kafka 消息到 topic (common_delay_msg)
消息體參數(shù)說明:
{ "topic": "實(shí)際業(yè)務(wù)topic", "messageKey": "消息的key,影響發(fā)送到那個(gè)分區(qū)", "message": "業(yè)務(wù)消息內(nèi)容", "delayTime": 1641470704 }
delayTime: 指定延時(shí)時(shí)限,秒級別時(shí)間戳
消息體案例:
{ "topic": "cancel_order", "messageKey": "123456", "message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}", "delayTime": 1641470704 }
4.3 程序遷移
復(fù)制 延時(shí)消息保存目錄 到新機(jī)器,重啟部署、啟動(dòng)程序即可。(該配置項(xiàng)所在目錄 kafka.delay.store.path=/data/kafka_delay)
4.4 排查日志
日志默認(rèn)輸出到 /logs/kafka_delay/ ,日志輸出方式為異步輸出。
system.log 記錄了系統(tǒng) info 級別以上的日志,info級別日志不是立刻輸出的,所以程序重啟時(shí),可能會(huì)丟失部分日志
exception.log 記錄了系統(tǒng) warn 級別以上的日志,日志配置為立即輸出,程序正常重啟,不會(huì)丟失日志,重點(diǎn)關(guān)注這個(gè)日志即可。
如需自定義日志配置,可以在 log4j2.xml 進(jìn)行配置。
如果要進(jìn)行本地調(diào)試,可以解開注釋,否則控制臺沒有日志輸出:
<Root level="info"> <!--非本地調(diào)試環(huán)境下,建議注釋掉 console_appender--> <!--<AppenderRef ref="console_appender"/>--> <AppenderRef ref="system_log_appender"/> <AppenderRef ref="system_error_log_appender"/> </Root>
5、注意事項(xiàng)
1) 由于設(shè)置了線程空閑時(shí)休眠機(jī)制,延時(shí)消息最大可能會(huì)推遲8秒鐘發(fā)送。
如果覺得延遲時(shí)間比較大,可以自行修改源碼的配置,重新打包即可。
KafkaUtils.subscribe()
MsgTransferTask.run()
2) 當(dāng)前程序嚴(yán)格依賴于系統(tǒng)時(shí)鐘,注意配置程序部署服務(wù)器的時(shí)鐘和業(yè)務(wù)服務(wù)器時(shí)鐘一致
3) 建議配置統(tǒng)一延時(shí)消息隊(duì)列(common_delay_msg)的分區(qū)數(shù)為 2 的倍數(shù)
4) 每個(gè) kafka.delay.workers 約需要 200 mb 內(nèi)存,默認(rèn)配置為2 , jvm 建議配置 1 GB 以上內(nèi)存,避免頻繁gc 。
workers 增大后,不要再減小,否則會(huì)導(dǎo)致部分 sqlite 數(shù)據(jù)庫沒有線程訪問,消息丟失。
并發(fā)處理數(shù)越大,延時(shí)消息處理效率越高,但需要注意不要大于topic的分區(qū)數(shù)。
需要自行測試多少個(gè)并發(fā)處理數(shù)就會(huì)達(dá)到磁盤io、網(wǎng)絡(luò)帶寬上限。
當(dāng)前程序主要瓶頸在于磁盤io和網(wǎng)絡(luò)帶寬,實(shí)際內(nèi)存和cpu資源占用極低。
5) 程序運(yùn)行時(shí),不要操作延時(shí)消息保存目錄即里面的文件
6) 當(dāng)前配置為正常情況下不會(huì)拋棄消息模式,但程序重啟時(shí),存在重復(fù)發(fā)送消息的可能,下游業(yè)務(wù)系統(tǒng)需要做好冪等性處理。
如果kafka集群異常,當(dāng)前配置為重新發(fā)送16次,如果仍不能恢復(fù)過來,則拋棄當(dāng)前消息,實(shí)際生產(chǎn)環(huán)境里,基本不可能出現(xiàn)該場景。
如果確定消息不能拋棄,需要自行修改源碼(MsgTransferTask.run,KafkaUtils.send(……)),重新打包、部署。
7) 程序出現(xiàn)未知異常(sqlite被手動(dòng)修改、磁盤滿了……),會(huì)直接結(jié)束程序運(yùn)行。
6、閑聊
整體思路,實(shí)現(xiàn),源碼里都比較清晰,如果 RocketMQ 也有自定義延時(shí)需求,參考著修改源碼即可,實(shí)現(xiàn)邏輯是一樣的。
如果要盡可能的實(shí)現(xiàn)延時(shí)消息的最終處理,可以再額外采用2個(gè)延遲消息處理方案:
1、每天掃描一次數(shù)據(jù)庫,把符合延時(shí)條件的記錄統(tǒng)一處理一次
2、惰性處理,當(dāng)用戶再次訪問某功能點(diǎn)時(shí),再修改相關(guān)符合延時(shí)條件的記錄
作者郵箱:1950249908@qq.com ,如有問題,歡迎騷擾。也歡迎大家加群談?wù)?,QQ群: 777804773
源碼路徑:
https://gitee.com/yushengruohui/delay_message
https://github.com/yushengruohui/delay_message
總結(jié)
到此這篇關(guān)于如何基于sqlite實(shí)現(xiàn)kafka延時(shí)消息的文章就介紹到這了,更多相關(guān)sqlite實(shí)現(xiàn)kafka延時(shí)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解使用SSM實(shí)現(xiàn)簡單工作流系統(tǒng)之實(shí)現(xiàn)篇
這篇文章主要介紹了使用SSM實(shí)現(xiàn)簡單工作流系統(tǒng)之實(shí)現(xiàn)篇,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12java使用TimeZone將中國標(biāo)準(zhǔn)時(shí)間轉(zhuǎn)成時(shí)區(qū)值
這篇文章主要介紹了java使用TimeZone將中國標(biāo)準(zhǔn)時(shí)間轉(zhuǎn)成時(shí)區(qū)值的相關(guān)資料,需要的朋友可以參考下2023-11-11Java遞歸基礎(chǔ)與遞歸的宏觀語意實(shí)例分析
這篇文章主要介紹了Java遞歸基礎(chǔ)與遞歸的宏觀語意,結(jié)合實(shí)例形式分析了java遞歸的相關(guān)原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-03-03idea快速搭建springboot項(xiàng)目的操作方法
下面小編就為大家分享一篇idea快速搭建springboot項(xiàng)目的操作方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2017-12-12java 中sendredirect()和forward()方法的區(qū)別
這篇文章主要介紹了java 中sendredirect()和forward()方法的區(qū)別,需要的朋友可以參考下2017-08-08淺析SpringBoot自動(dòng)裝配的實(shí)現(xiàn)
springboot開箱即用,其實(shí)實(shí)現(xiàn)了自動(dòng)裝配,本文重點(diǎn)給大家介紹SpringBoot是如何做到自動(dòng)裝配的,感興趣的朋友跟隨小編一起看看吧2022-02-02