Spring?Boot?MQTT?Too?many?publishes?in?progress錯(cuò)誤的解決方案
前言
最近項(xiàng)目中需要與andorid端進(jìn)行交互,采用了MQTT消息進(jìn)行通信,生產(chǎn)環(huán)境中偶爾會(huì)出現(xiàn)Too many publishesin progress(32202)的錯(cuò)誤,嚴(yán)重的影響了正常功能的使用。
項(xiàng)目中采用的是Spring Boot2.0集成的MQTT引入的版本為1.2.0,消息發(fā)送用的是MessagingGateway的方式實(shí)現(xiàn),不熟悉的朋友可以查看這篇文章Spring boot 集成 MQTT詳情
原因分析
出現(xiàn)此問(wèn)題的原因跟MQTT的Qos的設(shè)置有關(guān),所以需要簡(jiǎn)單的介紹下Qos相關(guān)值的含義
0 最多一次的傳輸
發(fā)布者發(fā)送消息到服務(wù)器,沒(méi)有確認(rèn)消息,也不知道對(duì)方是否收到。
1 至少一次的傳輸
發(fā)布者發(fā)布消息保存消息,服務(wù)器(broker)接收到消息,服務(wù)器(broker)發(fā)送消息到訂閱者,服務(wù)器(broker)回一個(gè)PUBACK信息到發(fā)布者讓刪除消息,然后訂閱者接收消息后PUBACK給服務(wù)器讓刪除消息。如果失敗了,在一段時(shí)間確認(rèn)信息沒(méi)有收到,發(fā)送方都會(huì)將消息頭的DUP設(shè)置為1,然后再次發(fā)送消息,消息最少一次到達(dá)服務(wù)。例如網(wǎng)絡(luò)延遲等問(wèn)題,發(fā)布者重復(fù)發(fā)送消息,訂閱者多次訂閱會(huì)產(chǎn)生重復(fù)消息.
2 只有一次的傳輸
Qos為2只是在1的基礎(chǔ)上做了改進(jìn),在發(fā)布者發(fā)送到服務(wù)器之后多了消息的確認(rèn)以及多了消息msgID的緩存,重復(fù)信息的去重。在服務(wù)器發(fā)送到訂閱者之后也多了消息的確認(rèn)。
項(xiàng)目中使用了MQTT發(fā)送消息的地方比較多,且一般都是以Qos為0,那么為什么Qos為0,在并發(fā)量比較大的情況下就會(huì)出現(xiàn)Too many publishesin progress(32202)的錯(cuò)誤,報(bào)錯(cuò)的內(nèi)容的源碼如下:
當(dāng)actualInFlight超出設(shè)置的maxInflight最大值時(shí)就會(huì)報(bào)此錯(cuò)誤,那么具體是什么原因?qū)е碌哪兀课覀冃枰ㄟ^(guò)源碼來(lái)分析此問(wèn)題的原因。
源碼分析
關(guān)于源碼的閱讀我們需要整理主線(xiàn)思路,MQTT發(fā)送消息主線(xiàn)分為消息push到緩存中和異步發(fā)送兩部分。
MQTT的Push消息到緩存中時(shí)序圖
MqttPahoMessageHandler的publish方法
說(shuō)明: checkConection檢查連接后,在發(fā)送消息。
MqttAsyncClient的publish方法
ClientComms的internalSend方法
ClientState的send方法
MqttPublish消息類(lèi)型,繼承了父類(lèi)MqttWireMessage,而在MqttWireMessage的構(gòu)造方法中將消息id設(shè)置為0
SaveToken的源碼實(shí)現(xiàn)如下:
通過(guò)前面這幾步的操作,消息已經(jīng)放入到HashTable緩存中,準(zhǔn)備異步發(fā)送。
異步發(fā)送消息時(shí)序圖
說(shuō)明:MqttAsyncClient的connect為客戶(hù)端建立連接,興趣的可以看下源碼。
ClientComms的conncect方法
ConnectBG的run方法
CommsSender的run方法
1.從clientState中獲取消息 2.通過(guò)消息id去hashtable中獲取緩存消息 3.消息不為空,執(zhí)行消息發(fā)送 4.調(diào)用notifySent方法刪除消息,且actualInFlight執(zhí)行遞減操作。
CommsSender的notifySent方法
小結(jié)
在高并發(fā)的場(chǎng)景下,pendingMessage可能會(huì)添加多條數(shù)據(jù),Qos設(shè)置為0的時(shí)候,存入tokens(Hashtable)中的key一直是0,當(dāng)執(zhí)行tokenStore.getToken在發(fā)送方法之后會(huì)remove所有數(shù)據(jù),由于tokenStore中已經(jīng)不存在值,因?yàn)橐呀?jīng)被上一次已經(jīng)全部remove了,當(dāng)再次getToken的消息時(shí)獲取會(huì)為空,不在發(fā)送信息,使得actualInFlight沒(méi)有遞減,所以才經(jīng)過(guò)一段時(shí)間后actualInFlight就會(huì)超出設(shè)置最大值,從而報(bào)錯(cuò)。
//存放待發(fā)送消息的Vector數(shù)組 volatile private Vector pendingMessages;
解決方案
方案1:發(fā)送消息時(shí)設(shè)置為Qos=1
此方案雖然可以解決此問(wèn)題,但存在如下的缺點(diǎn):
- 網(wǎng)絡(luò)延遲時(shí)會(huì)發(fā)送重復(fù)消息問(wèn)題,導(dǎo)致消費(fèi)者重復(fù)消費(fèi),關(guān)于重復(fù)的消息解決需要進(jìn)行相關(guān)的冪等性操作,增加了修改的復(fù)雜度和成本。
- 發(fā)送消息需要進(jìn)行消息確認(rèn),網(wǎng)絡(luò)資源消耗過(guò)大。
方案2: 修改maxInflight的默認(rèn)值,例如將其修改為50
``` MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setMaxInflight(50); ```
此方案雖然修改比較簡(jiǎn)單,但是并沒(méi)有從根本上解決問(wèn)題,只是緩解了出現(xiàn)錯(cuò)誤的時(shí)間,如果項(xiàng)目中并發(fā)量比較低,可以采用此方案解決。
方案3:將消息配置為多客戶(hù)端模式
由于mqttMessageHandler只會(huì)引用一個(gè)paho客戶(hù)端,所以才會(huì)想到增加客戶(hù)端模式來(lái)提高并發(fā)量.需要重寫(xiě)MqttPahoMessageHandler類(lèi)的相關(guān)方法。雖然可以解決此問(wèn)題,如果對(duì)MQTT的源碼不是很了解,不建議采用此方案,不利于后續(xù)的版本升級(jí)。
方案4:升級(jí)mqtt版本為1.2.1
在1.2.1的版本中官方已經(jīng)進(jìn)行了相關(guān)的修改,當(dāng)qos=0已經(jīng)不存入tokenStore了,每次發(fā)送完之后就會(huì)刪除掉token以及釋放id,所以就不會(huì)出現(xiàn)Too many publishes in progress的問(wèn)題。
引入1.2.1的版本會(huì)帶來(lái)https驗(yàn)證問(wèn)題,因?yàn)樵贛qtt的1.2.1版本中,增加了https的驗(yàn)證需要添加相關(guān)配置,否則啟動(dòng)時(shí)會(huì)報(bào)安全認(rèn)證錯(cuò)誤。
解決方案:如果項(xiàng)目中沒(méi)有開(kāi)啟https認(rèn)證,需要設(shè)置HttpsHostnameVerificationEnabled為false即可。
mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
總結(jié)
本文通過(guò)定位MQTT錯(cuò)誤,詳細(xì)的講解了MQtt消息的發(fā)送流程,解決的方案雖然有多種,我們需要結(jié)合實(shí)際的業(yè)務(wù)情況來(lái)解決問(wèn)題,關(guān)于MQtt如果還有其他疑問(wèn),可以隨時(shí)反饋,大家共同學(xué)習(xí),共同進(jìn)步。
到此這篇關(guān)于Spring Boot MQTT Too many publishes in progress錯(cuò)誤的解決方案的文章就介紹到這了,更多相關(guān)Spring Boot內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot集成mqtt的實(shí)踐開(kāi)發(fā)
- SpringBoot+MQTT+apollo實(shí)現(xiàn)訂閱發(fā)布功能的示例
- springboot 實(shí)現(xiàn)mqtt物聯(lián)網(wǎng)的示例代碼
- SpringBoot2.0集成MQTT消息推送功能實(shí)現(xiàn)
- SpringBoot集成mqtt的多模塊項(xiàng)目配置詳解
- 如何在Spring Boot中使用MQTT
- SpringBoot整合MQTT并實(shí)現(xiàn)異步線(xiàn)程調(diào)用的問(wèn)題
- springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- Springboot整合mqtt服務(wù)的示例代碼
- Spring?boot?集成?MQTT詳情
相關(guān)文章
淺析java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別
以下是對(duì)java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別進(jìn)行了詳細(xì)的解析。需要的朋友可以過(guò)來(lái)參考下2013-08-08Spring Boot集成教程之異步調(diào)用Async
在項(xiàng)目中,當(dāng)訪(fǎng)問(wèn)其他人的接口較慢或者做耗時(shí)任務(wù)時(shí),不想程序一直卡在耗時(shí)任務(wù)上,想程序能夠并行執(zhí)行,我們可以使用多線(xiàn)程來(lái)并行的處理任務(wù),也可以使用spring提供的異步處理方式@Async。需要的朋友們下面來(lái)一起看看吧。2018-03-03Java數(shù)據(jù)結(jié)構(gòu)之棧與綜合計(jì)算器的實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)中棧與綜合計(jì)算器的實(shí)現(xiàn),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解一下2022-10-10三種簡(jiǎn)單排序算法(使用java實(shí)現(xiàn))
下面小編就為大家?guī)?lái)一篇三種簡(jiǎn)單排序算法(使用java實(shí)現(xiàn))。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-07-07Java實(shí)現(xiàn)的計(jì)時(shí)器【秒表】功能示例
這篇文章主要介紹了Java實(shí)現(xiàn)的計(jì)時(shí)器【秒表】功能,結(jié)合實(shí)例形式分析了Java結(jié)合JFrame框架的計(jì)時(shí)器功能相關(guān)操作技巧,需要的朋友可以參考下2019-02-02Java多線(xiàn)程高并發(fā)中的Fork/Join框架機(jī)制詳解
本文主要介紹了 Java 多線(xiàn)程高并發(fā)中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設(shè)計(jì)方式和部分實(shí)現(xiàn)源碼,感興趣的朋友跟隨小編一起看看吧2021-11-11intellij IDEA配置springboot的圖文教程
Spring Boot是由Pivotal團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來(lái)簡(jiǎn)化新Spring應(yīng)用的初始搭建以及開(kāi)發(fā)過(guò)程。接下來(lái)通過(guò)本文給大家介紹intellij IDEA配置springboot的圖文教程,感興趣的朋友一起看看吧2018-03-03