Spring?Kafka中如何通過參數(shù)配置解決超時(shí)問題詳解
背景
這是我們團(tuán)隊(duì)負(fù)責(zé)的一個(gè)不太核心的服務(wù)。之前與外部交互時(shí)應(yīng)外部要求由普通kafka集群改成加密kafka集群。我們是數(shù)據(jù)生產(chǎn)端。
改的過程中并跑上線,60%的請(qǐng)求耗時(shí)增加了2倍,也還是在百毫秒的量級(jí)可以接受。但是每次重啟的第一個(gè)請(qǐng)求要5s以上,會(huì)超過;運(yùn)行過程中,一兩個(gè)月也會(huì)有一次超時(shí)。因?yàn)槲覀冇腥沃卦嚕w沒有影響成功率。
上線的時(shí)候我們問過網(wǎng)絡(luò)組,還專門請(qǐng)教過公司專業(yè)負(fù)責(zé)kafka的團(tuán)隊(duì)。結(jié)論是:第一,這個(gè)慢是外部交互方的問題,不是咱們這邊可以處理的;第二,參數(shù)上也沒有什么可以調(diào)優(yōu)的。
我們團(tuán)隊(duì)內(nèi)部還是不信邪,調(diào)了幾個(gè)參數(shù),加測(cè)之后上線了。頻繁度降到了現(xiàn)在的一兩個(gè)月一次超時(shí),但是沒有根治。因?yàn)楸旧磉@個(gè)服務(wù)不是特別核心,本身外部是允許有一定失敗率的,而且現(xiàn)在實(shí)際上也沒有失敗,幾年內(nèi)業(yè)務(wù)量也是很平穩(wěn)的:1分鐘4筆。
而我上班時(shí)間的狀態(tài)基本上是我站在兩個(gè)人中間,我目的是想問一個(gè)人問題,結(jié)果卻先要回答另外一個(gè)人的問題,這時(shí)候還會(huì)出現(xiàn)第四個(gè)人說別的事。這個(gè)優(yōu)先級(jí)排不上。但是心疼開發(fā)小哥哥,每一兩個(gè)月就要處理一下因?yàn)檫@件事引起的告警。雖然實(shí)際不影響,告警出來了,我們就要排查核對(duì)是否還是這個(gè)問題,并且確實(shí)通過重試將消息推送出去了。
所以本次利用周末,希望可以根治這個(gè)疑難雜癥,減少運(yùn)維成本。
思路
前期已經(jīng)明確了這個(gè)外部的加密集群建立連接和數(shù)據(jù)傳輸速度都慢于之前的普通集群。之所以第一次慢和每一兩個(gè)月會(huì)慢一次都是連接斷開重連造成的。之前我們進(jìn)行過參數(shù)調(diào)優(yōu),調(diào)優(yōu)做的就是因?yàn)?分鐘4筆請(qǐng)求,線上以最小部署單元3臺(tái)機(jī)器部署,每臺(tái)機(jī)器1分鐘預(yù)計(jì)處理一筆請(qǐng)求。根據(jù)這個(gè)數(shù)據(jù)調(diào)整了空閑自動(dòng)斷開連接的時(shí)間間隔,保證連接不會(huì)因?yàn)榭臻e自動(dòng)斷開。線上驗(yàn)證有效,也側(cè)面證實(shí)了是連接過程慢引起的超時(shí)。
因?yàn)榻⑦B接過程慢,這個(gè)主要是外部提供的集群就是如此。既然目前并不影響實(shí)際發(fā)送成功率。人家代表的是大佬,我們也不好太強(qiáng)硬的去推他們解決。所以我的思路有兩個(gè):
第一,探索將建立連接與發(fā)送數(shù)據(jù)分離的可行性:程序啟動(dòng)后先將連接建立好再提供服務(wù)。如果生產(chǎn)端是這樣實(shí)現(xiàn)的。那也許還可以進(jìn)行連接自動(dòng)探測(cè),如果連接斷開則自動(dòng)重連,不要等發(fā)送數(shù)據(jù)時(shí)再發(fā)現(xiàn)連接已斷開。
第二,其實(shí)第一種思路的可行性渺茫,只是需要驗(yàn)證一下自己的想法。一般的這種消息中間件,消費(fèi)端是這樣實(shí)現(xiàn)的。但是生產(chǎn)端采用了更簡(jiǎn)單的方式:讀寫數(shù)據(jù)的時(shí)候再探測(cè)連接是否可用,不可用則重新建立連接。這種用在發(fā)送本來就是異步的,對(duì)發(fā)送延遲本身敏感度也不高的場(chǎng)景。生產(chǎn)端本來就是這種場(chǎng)景,并且通過測(cè)試實(shí)際上也確實(shí)是在發(fā)送時(shí)建立的第一次連接。kafka生產(chǎn)端原本就是這種設(shè)計(jì)的可能性極大。如果是這種情況,那就在生產(chǎn)端真正使用異步,給調(diào)用方返回“受理成功”,保證調(diào)用方不超時(shí)。自己再通過接受回調(diào)保證實(shí)際的成功。
這個(gè)事情真要做,還有兩個(gè)隱形需求:
1、因?yàn)橥獠坑行枨?,?shù)據(jù)可以偶爾少發(fā),但是不能重復(fù)發(fā)送。所以不能使用業(yè)務(wù)級(jí)別的數(shù)據(jù)發(fā)送來實(shí)現(xiàn)探測(cè)功能。重試也要保證上條確實(shí)沒有收到。
2、改造不能太大,研發(fā)成本要小。
過程
因?yàn)槲以诰W(wǎng)上搜到的這方面都是入門級(jí),沒有什么解決這個(gè)問題的相關(guān)資料。所以采用的主要方法是讀源碼和官方文檔。當(dāng)然,本文的方法是有前提知識(shí)儲(chǔ)備基礎(chǔ)的。就是《白話TCP/IP原理》系列的相關(guān)內(nèi)容:https://mp.weixin.qq.com/s/Y2k3AW2ZjWbB1w63gsSRag
步驟一,查詢版本特性
我們目前用到的kafka客戶端版本是
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.8.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency>
spring-kafka對(duì)應(yīng)的官網(wǎng)的大版本是2.5,所以先點(diǎn)開了2.5.17.RELEASE對(duì)應(yīng)的參考文檔??吹揭痪溆杏眯畔ⅲ?/p>
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed.
默認(rèn)消費(fèi)者和生產(chǎn)者工廠現(xiàn)在已經(jīng)可以在生產(chǎn)者和消費(fèi)者創(chuàng)建和關(guān)閉時(shí)引發(fā)一個(gè)回調(diào)。耗時(shí)的連接建立過程是可以監(jiān)聽的,我們可以通過打日志進(jìn)行監(jiān)控。
步驟二,查源碼
首先我們看一下類圖,看不清楚沒有關(guān)系??催@里就好:
首先發(fā)現(xiàn)Producer、Consumer和Sender都是通過KafkaClient(接口),也就是NetworkClient(實(shí)現(xiàn)類)進(jìn)行網(wǎng)絡(luò)活動(dòng)的。其次發(fā)現(xiàn)NetworkClient是在傳輸層和應(yīng)用層之間起了一個(gè)緩沖的作用,解耦了各個(gè)部件。
Producer、Consumer和AdminClient主要管理requests;NetworkClient主要管理connection;Selector主要管理sockets channel。這些被管理對(duì)象我在之前的網(wǎng)絡(luò)系列里都講過。
如果不看代碼,我站在設(shè)計(jì)者角度結(jié)合類圖猜想:生產(chǎn)端實(shí)際使用的是KafkaTemplate的send方法,具體的參數(shù)都是由DefaultKafkaProducerFactory接收。實(shí)際上連接的建立是Producer類進(jìn)行。而在Producer類依賴于NetworkClient。而實(shí)際上進(jìn)行連接應(yīng)該在Sender類。Sender是一個(gè)Runnable異步線程來做,那實(shí)際建立連接的是run方法中。
我跟蹤源碼驗(yàn)證了猜想。NetworkClient里有個(gè)initiateConnect的私有方法,是建立連接用的,跟蹤它就可以知道調(diào)用的地方。跟蹤下來,主要入口在NetworkClient的poll方法,注釋如下:
/** * Do actual reads and writes to sockets. * * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately, * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and * metadata timeout * @param now The current time in milliseconds * @return The list of responses received */ @Override public List<ClientResponse> poll(long timeout, long now) {
人家明確說了是讀寫時(shí)才會(huì)調(diào)用。證實(shí)了思路一不可行。
步驟三,查自身的代碼
按照思路二,進(jìn)行異步化。本身生產(chǎn)端就應(yīng)該是異步的,為什么異步?jīng)]有生效呢?結(jié)合KafkaTemplate的send方法源代碼和項(xiàng)目中自己寫的代碼。異步部分大體是這樣:
SettableListenableFuture future = new SettableListenableFuture(); future.set("OK"); future.get(); future.addCallback((sendResult) -> { try { System.out.println("成功"); } catch (Exception e) { } }, r -> { System.out.println("失敗"); }); System.out.println("============end==============");
就是說KafkaTemplate的異步是靠使用SettableListenableFuture實(shí)現(xiàn)的,實(shí)際上它的set方法會(huì)馬上觸發(fā)callback,是同步的。代碼是先同步調(diào)用set,并且還手動(dòng)調(diào)用了get(這個(gè)方法會(huì)等待直到返回結(jié)果)。所以整體是同步的?;蛘咧苯舆@么看,future實(shí)現(xiàn)異步要有一個(gè)Callable或者Runnable的線程方法,人家SettableListenableFuture第一行源碼就禁用了Callable。這個(gè)我看了2.5.17.RELEASE這個(gè)更高版本的spring-kafka,實(shí)現(xiàn)沒有做更改。
也就是說spring-kafka自身起碼在2.5.X版本里異步?jīng)]有起到作用。
問題清楚了修改也很簡(jiǎn)單,比如可以加個(gè)異步注解將整個(gè)發(fā)送方法做異步,重試等邏輯也放到這個(gè)方法中。給調(diào)用方只返回受理成功。具體怎么解決交給開發(fā)小哥哥。
總結(jié)
幸虧我上周已經(jīng)提前規(guī)劃好周一要休假。否則現(xiàn)在都2點(diǎn)半了明天上班也沒精神。主要時(shí)間花在異步不生效的問題上。其實(shí)排查異步不生效的思路是很簡(jiǎn)單清晰的。耗時(shí)長(zhǎng)是因?yàn)椋旱谝?,不敢相信spring官方實(shí)現(xiàn)的,竟然使用異步的代碼實(shí)際效果沒有異步;第二,關(guān)于異步我在網(wǎng)上搜索了一下,都是按照項(xiàng)目中配置的那樣。官方這樣說,大家這樣說,我總得考慮是不是自己搞錯(cuò)了。
所以我反復(fù)的驗(yàn)證、反復(fù)的debug之后也不敢下結(jié)論。仔細(xì)研究了源碼仍然不敢下結(jié)論。直到終于搜索到一篇文章說要實(shí)現(xiàn)異步除了要使用addCallback之外還要加異步標(biāo)簽。人間清醒的我,馬上意識(shí)到文章實(shí)際用了兩種不同方法實(shí)現(xiàn)異步。作者之所以認(rèn)為這是一個(gè)方法的兩個(gè)部分大概也是發(fā)現(xiàn)其實(shí)spring-kafka的異步?jīng)]好使吧。
到此這篇關(guān)于Spring Kafka中如何通過參數(shù)配置解決超時(shí)問題的文章就介紹到這了,更多相關(guān)Spring Kafka參數(shù)配置解決超時(shí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
命令行中 javac、java、javap 的使用小結(jié)
使用 java 命令運(yùn)行一個(gè).class文件,需要使用該類的全限定類名,同時(shí)需要在當(dāng)前路徑下有該類的包層次文件夾,這篇文章主要介紹了命令行中 javac、java、javap 的使用小結(jié),需要的朋友可以參考下2023-07-07springboot如何配置上傳文件的maxRequestSize
這篇文章主要介紹了springboot如何配置上傳文件的maxRequestSize,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03SpringBoot?Schedule調(diào)度任務(wù)的動(dòng)態(tài)管理
Scheduled定時(shí)任務(wù)是Spring?boot自身提供的功能,所以不需要引入Maven依賴包,下面這篇文章主要給大家介紹了關(guān)于SpringBoot通過@Scheduled實(shí)現(xiàn)定時(shí)任務(wù)以及問題解決的相關(guān)資料,需要的朋友可以參考下2023-02-02一文帶你學(xué)會(huì)Spring?JDBC的使用
JDBC?就是?數(shù)據(jù)庫開發(fā)?操作的?代名詞,因?yàn)橹灰乾F(xiàn)代商業(yè)項(xiàng)目的開發(fā)那么一定是離不開?數(shù)據(jù)庫?的,不管你搞的是什么,只要是想使用動(dòng)態(tài)的開發(fā)結(jié)構(gòu),那么一定就是?JDBC?,那么下面來教教大家傳統(tǒng)JDBC的使用2022-09-09mybatis動(dòng)態(tài)插入list傳入List參數(shù)的實(shí)例代碼
本文通過實(shí)例代碼給大家介紹了mybatis動(dòng)態(tài)插入list,Mybatis 傳入List參數(shù)的方法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下吧2018-04-04springboot配置文件屬性變量引用方式${}和@@用法及區(qū)別說明
這篇文章主要介紹了springboot配置文件屬性變量引用方式${}和@@用法及區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03阿里Sentinel支持Spring Cloud Gateway的實(shí)現(xiàn)
這篇文章主要介紹了阿里Sentinel支持Spring Cloud Gateway的實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-04-04