欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何基于sqlite實(shí)現(xiàn)kafka延時(shí)消息詳解

 更新時(shí)間:2022年01月10日 09:25:45   作者:余生若暉  
這篇文章主要給大家介紹了關(guān)于如何基于sqlite實(shí)現(xiàn)kafka延時(shí)消息的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

1、需求

延時(shí)消息(或者說定時(shí)消息)是業(yè)務(wù)系統(tǒng)里一個(gè)常見的功能點(diǎn)。常用業(yè)務(wù)場景如:

1) 訂單超時(shí)取消

2) 離線超過指定時(shí)間的用戶,召回通知

3) 手機(jī)消失多久后通知監(jiān)護(hù)人……

現(xiàn)流行的實(shí)現(xiàn)方案主要有:

1)數(shù)據(jù)庫定時(shí)輪詢,掃描到達(dá)到延時(shí)時(shí)間的記錄,業(yè)務(wù)處理,刪除該記錄

2)jdk 自帶延時(shí)隊(duì)列(DelayQueue),或優(yōu)化的時(shí)間輪算法

3)redis 有序集合

4)支持延時(shí)消息的分布式消息隊(duì)列

但以上方案,都存在各種缺陷:

1)定時(shí)輪詢間隔小,則對數(shù)據(jù)庫造成很大壓力,分布式微服務(wù)架構(gòu)不好適配。

2)jdk 自帶延時(shí)隊(duì)列,占用內(nèi)存高,服務(wù)重啟則丟失消息,分布式微服務(wù)架構(gòu)不好適配。

3)redis 有序集合比較合適,但內(nèi)存貴,分布式微服務(wù)架構(gòu)不好適配。

4)現(xiàn)在主流的 RocketMQ 不支持任意延時(shí)時(shí)間的延時(shí)消息,RabbitMQ或ActiveMQ 性能不夠好,發(fā)送配置麻煩,kafka不支持延時(shí)消息。

因此,我想實(shí)現(xiàn)一個(gè)適配分布式微服務(wù)架構(gòu)、高性能、方便業(yè)務(wù)系統(tǒng)使用的延時(shí)消息轉(zhuǎn)發(fā)中間件。

2、實(shí)現(xiàn)思路

要保證高性能,推薦使用 kafka 或者 RocketMQ 做分布式消息隊(duì)列。當(dāng)前是基于 sqlite 實(shí)現(xiàn) kafka 延時(shí)消息。

當(dāng)前實(shí)現(xiàn)思路是基于kafka的,實(shí)際適用于任意MQ產(chǎn)品。

2.1 整體實(shí)現(xiàn)思路

2.2 程序業(yè)務(wù)邏輯

1)業(yè)務(wù)系統(tǒng)先推送延時(shí)消息到統(tǒng)一延時(shí)消息隊(duì)列

2)定時(shí)讀取延時(shí)消息隊(duì)列的延時(shí)消息,保存于本地,提交偏移量

3)定時(shí)掃描本地到達(dá)延時(shí)期限的消息,轉(zhuǎn)發(fā)到實(shí)際業(yè)務(wù)消息隊(duì)列

4)刪除本地延時(shí)消息

2.3 實(shí)現(xiàn)細(xì)節(jié)

1)一個(gè)業(yè)務(wù)處理流程使用一個(gè)sqlite數(shù)據(jù)庫文件,可并發(fā)執(zhí)行提高性能。

2)使用雪花算法生成 id 。

3)沒有延時(shí)消息時(shí),線程休眠一定時(shí)間,減低kafka集群、和本地io壓力。

4)本地存儲(chǔ)使用 sqlite。

2.4 依賴框架

1)kafka-client

2)sqlite

3)slf4j+log4j2

4)jackson

3、性能測試

測試機(jī)器: i5-6500,16GB內(nèi)存,機(jī)械硬盤

延時(shí)消息大小: 1kb

并發(fā)處理數(shù):1

已本地簡單測試,性能表現(xiàn):

1) 1個(gè)并發(fā)處理數(shù)就可以達(dá)到1秒存儲(chǔ)、轉(zhuǎn)發(fā)、刪除 約15000條延時(shí)消息,2 個(gè)可以達(dá)到 30000條/s ……

2) 一次性處理1萬條記錄,是經(jīng)過多次對比試驗(yàn)得出的合適批次大小

也測試了其它兩個(gè)本地存儲(chǔ)方案的性能:

1)直接存讀 json 文件,讀寫性能太差(約1200條記錄/s,慢在頻繁創(chuàng)建、打開、關(guān)閉文件,隨機(jī)磁盤io);

2)RocksDB 存讀,寫入性能非常好(97000條記錄/s),但篩選到期延時(shí)消息性能太差了,在數(shù)據(jù)量大于100w時(shí),表現(xiàn)不如 sqlite,而且運(yùn)行時(shí)占用內(nèi)存、cpu 資源非常高。

4、部署

4.1 系統(tǒng)環(huán)境依賴

1)jdk 1.8

2)kafka 1.1.0

可以自行替換為符合實(shí)際kafka版本的jar包(不會(huì)有沖突的,jar包版本和kafka服務(wù)版本不一致可能會(huì)有異常[無法拉取消息、提交失敗等])。

可修改pom.xml內(nèi)的 kafka_version

<kafka_version>1.1.0</kafka_version>

重新打包即可。當(dāng)前程序可以獨(dú)立部署,對現(xiàn)有工程項(xiàng)目無侵入性。

4.2 安裝

1)在項(xiàng)目根目錄執(zhí)行 maven 打包后,會(huì)生成 dev_ops 文件

2)在 dev_ops 目錄下執(zhí)行 java -jar kafka_delay_sqlite-20220102.jar 即可啟動(dòng)程序

3)如需修改配置,可在dev_ops目錄內(nèi)創(chuàng)建kafka.properties文件,設(shè)置自定義配置

默認(rèn)配置如下:

# kafka 連接url [ip:port,ip:port……]
kafka.url=127.0.0.1:9092
# 延時(shí)消息本地存儲(chǔ)路徑,建議使用絕對值
kafka.delay.store.path=/data/kafka_delay
# 統(tǒng)一延時(shí)消息topic
kafka.delay.topic=common_delay_msg
# 消費(fèi)者組id
kafka.delay.group.id=common_delay_app
# 并發(fā)處理數(shù)。限制條件: workers 小于等于topic分區(qū)數(shù)
kafka.delay.workers=2

4)業(yè)務(wù)方發(fā)送 kafka 消息到 topic (common_delay_msg)

消息體參數(shù)說明:

{
  "topic": "實(shí)際業(yè)務(wù)topic",
  "messageKey": "消息的key,影響發(fā)送到那個(gè)分區(qū)",
  "message": "業(yè)務(wù)消息內(nèi)容",
  "delayTime": 1641470704
}

delayTime: 指定延時(shí)時(shí)限,秒級別時(shí)間戳

消息體案例:

{
  "topic": "cancel_order",
  "messageKey": "123456",
  "message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}",
  "delayTime": 1641470704
}

4.3 程序遷移

復(fù)制 延時(shí)消息保存目錄 到新機(jī)器,重啟部署、啟動(dòng)程序即可。(該配置項(xiàng)所在目錄 kafka.delay.store.path=/data/kafka_delay)

4.4 排查日志

日志默認(rèn)輸出到 /logs/kafka_delay/ ,日志輸出方式為異步輸出。

system.log 記錄了系統(tǒng) info 級別以上的日志,info級別日志不是立刻輸出的,所以程序重啟時(shí),可能會(huì)丟失部分日志

exception.log 記錄了系統(tǒng) warn 級別以上的日志,日志配置為立即輸出,程序正常重啟,不會(huì)丟失日志,重點(diǎn)關(guān)注這個(gè)日志即可。

如需自定義日志配置,可以在 log4j2.xml 進(jìn)行配置。

如果要進(jìn)行本地調(diào)試,可以解開注釋,否則控制臺沒有日志輸出:

        <Root level="info">
            <!--非本地調(diào)試環(huán)境下,建議注釋掉 console_appender-->
            <!--<AppenderRef ref="console_appender"/>-->
            <AppenderRef ref="system_log_appender"/>
            <AppenderRef ref="system_error_log_appender"/>
        </Root>

5、注意事項(xiàng)

1) 由于設(shè)置了線程空閑時(shí)休眠機(jī)制,延時(shí)消息最大可能會(huì)推遲8秒鐘發(fā)送。

如果覺得延遲時(shí)間比較大,可以自行修改源碼的配置,重新打包即可。

KafkaUtils.subscribe()

MsgTransferTask.run()

2) 當(dāng)前程序嚴(yán)格依賴于系統(tǒng)時(shí)鐘,注意配置程序部署服務(wù)器的時(shí)鐘和業(yè)務(wù)服務(wù)器時(shí)鐘一致

3) 建議配置統(tǒng)一延時(shí)消息隊(duì)列(common_delay_msg)的分區(qū)數(shù)為 2 的倍數(shù)

4) 每個(gè) kafka.delay.workers 約需要 200 mb 內(nèi)存,默認(rèn)配置為2 , jvm 建議配置 1 GB 以上內(nèi)存,避免頻繁gc 。

workers 增大后,不要再減小,否則會(huì)導(dǎo)致部分 sqlite 數(shù)據(jù)庫沒有線程訪問,消息丟失。

并發(fā)處理數(shù)越大,延時(shí)消息處理效率越高,但需要注意不要大于topic的分區(qū)數(shù)。

需要自行測試多少個(gè)并發(fā)處理數(shù)就會(huì)達(dá)到磁盤io、網(wǎng)絡(luò)帶寬上限。

當(dāng)前程序主要瓶頸在于磁盤io和網(wǎng)絡(luò)帶寬,實(shí)際內(nèi)存和cpu資源占用極低。

5) 程序運(yùn)行時(shí),不要操作延時(shí)消息保存目錄即里面的文件

6) 當(dāng)前配置為正常情況下不會(huì)拋棄消息模式,但程序重啟時(shí),存在重復(fù)發(fā)送消息的可能,下游業(yè)務(wù)系統(tǒng)需要做好冪等性處理。

如果kafka集群異常,當(dāng)前配置為重新發(fā)送16次,如果仍不能恢復(fù)過來,則拋棄當(dāng)前消息,實(shí)際生產(chǎn)環(huán)境里,基本不可能出現(xiàn)該場景。

如果確定消息不能拋棄,需要自行修改源碼(MsgTransferTask.run,KafkaUtils.send(……)),重新打包、部署。

7) 程序出現(xiàn)未知異常(sqlite被手動(dòng)修改、磁盤滿了……),會(huì)直接結(jié)束程序運(yùn)行。

6、閑聊

整體思路,實(shí)現(xiàn),源碼里都比較清晰,如果 RocketMQ 也有自定義延時(shí)需求,參考著修改源碼即可,實(shí)現(xiàn)邏輯是一樣的。

如果要盡可能的實(shí)現(xiàn)延時(shí)消息的最終處理,可以再額外采用2個(gè)延遲消息處理方案:

1、每天掃描一次數(shù)據(jù)庫,把符合延時(shí)條件的記錄統(tǒng)一處理一次

2、惰性處理,當(dāng)用戶再次訪問某功能點(diǎn)時(shí),再修改相關(guān)符合延時(shí)條件的記錄

作者郵箱:1950249908@qq.com ,如有問題,歡迎騷擾。也歡迎大家加群談?wù)?,QQ群: 777804773

源碼路徑:

https://gitee.com/yushengruohui/delay_message

https://github.com/yushengruohui/delay_message

總結(jié)

到此這篇關(guān)于如何基于sqlite實(shí)現(xiàn)kafka延時(shí)消息的文章就介紹到這了,更多相關(guān)sqlite實(shí)現(xiàn)kafka延時(shí)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論