Spring?Boot?MQTT?Too?many?publishes?in?progress錯誤的解決方案
前言
最近項目中需要與andorid端進行交互,采用了MQTT消息進行通信,生產(chǎn)環(huán)境中偶爾會出現(xiàn)Too many publishesin progress(32202)的錯誤,嚴重的影響了正常功能的使用。
項目中采用的是Spring Boot2.0集成的MQTT引入的版本為1.2.0,消息發(fā)送用的是MessagingGateway的方式實現(xiàn),不熟悉的朋友可以查看這篇文章Spring boot 集成 MQTT詳情
原因分析
出現(xiàn)此問題的原因跟MQTT的Qos的設(shè)置有關(guān),所以需要簡單的介紹下Qos相關(guān)值的含義
0 最多一次的傳輸
發(fā)布者發(fā)送消息到服務器,沒有確認消息,也不知道對方是否收到。
1 至少一次的傳輸
發(fā)布者發(fā)布消息保存消息,服務器(broker)接收到消息,服務器(broker)發(fā)送消息到訂閱者,服務器(broker)回一個PUBACK信息到發(fā)布者讓刪除消息,然后訂閱者接收消息后PUBACK給服務器讓刪除消息。如果失敗了,在一段時間確認信息沒有收到,發(fā)送方都會將消息頭的DUP設(shè)置為1,然后再次發(fā)送消息,消息最少一次到達服務。例如網(wǎng)絡延遲等問題,發(fā)布者重復發(fā)送消息,訂閱者多次訂閱會產(chǎn)生重復消息.
2 只有一次的傳輸
Qos為2只是在1的基礎(chǔ)上做了改進,在發(fā)布者發(fā)送到服務器之后多了消息的確認以及多了消息msgID的緩存,重復信息的去重。在服務器發(fā)送到訂閱者之后也多了消息的確認。
項目中使用了MQTT發(fā)送消息的地方比較多,且一般都是以Qos為0,那么為什么Qos為0,在并發(fā)量比較大的情況下就會出現(xiàn)Too many publishesin progress(32202)的錯誤,報錯的內(nèi)容的源碼如下:
當actualInFlight超出設(shè)置的maxInflight最大值時就會報此錯誤,那么具體是什么原因?qū)е碌哪兀课覀冃枰ㄟ^源碼來分析此問題的原因。
源碼分析
關(guān)于源碼的閱讀我們需要整理主線思路,MQTT發(fā)送消息主線分為消息push到緩存中和異步發(fā)送兩部分。
MQTT的Push消息到緩存中時序圖
MqttPahoMessageHandler的publish方法
說明: checkConection檢查連接后,在發(fā)送消息。
MqttAsyncClient的publish方法
ClientComms的internalSend方法
ClientState的send方法
MqttPublish消息類型,繼承了父類MqttWireMessage,而在MqttWireMessage的構(gòu)造方法中將消息id設(shè)置為0
SaveToken的源碼實現(xiàn)如下:
通過前面這幾步的操作,消息已經(jīng)放入到HashTable緩存中,準備異步發(fā)送。
異步發(fā)送消息時序圖
說明:MqttAsyncClient的connect為客戶端建立連接,興趣的可以看下源碼。
ClientComms的conncect方法
ConnectBG的run方法
CommsSender的run方法
1.從clientState中獲取消息 2.通過消息id去hashtable中獲取緩存消息 3.消息不為空,執(zhí)行消息發(fā)送 4.調(diào)用notifySent方法刪除消息,且actualInFlight執(zhí)行遞減操作。
CommsSender的notifySent方法
小結(jié)
在高并發(fā)的場景下,pendingMessage可能會添加多條數(shù)據(jù),Qos設(shè)置為0的時候,存入tokens(Hashtable)中的key一直是0,當執(zhí)行tokenStore.getToken在發(fā)送方法之后會remove所有數(shù)據(jù),由于tokenStore中已經(jīng)不存在值,因為已經(jīng)被上一次已經(jīng)全部remove了,當再次getToken的消息時獲取會為空,不在發(fā)送信息,使得actualInFlight沒有遞減,所以才經(jīng)過一段時間后actualInFlight就會超出設(shè)置最大值,從而報錯。
//存放待發(fā)送消息的Vector數(shù)組 volatile private Vector pendingMessages;
解決方案
方案1:發(fā)送消息時設(shè)置為Qos=1
此方案雖然可以解決此問題,但存在如下的缺點:
- 網(wǎng)絡延遲時會發(fā)送重復消息問題,導致消費者重復消費,關(guān)于重復的消息解決需要進行相關(guān)的冪等性操作,增加了修改的復雜度和成本。
- 發(fā)送消息需要進行消息確認,網(wǎng)絡資源消耗過大。
方案2: 修改maxInflight的默認值,例如將其修改為50
``` MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setMaxInflight(50); ```
此方案雖然修改比較簡單,但是并沒有從根本上解決問題,只是緩解了出現(xiàn)錯誤的時間,如果項目中并發(fā)量比較低,可以采用此方案解決。
方案3:將消息配置為多客戶端模式
由于mqttMessageHandler只會引用一個paho客戶端,所以才會想到增加客戶端模式來提高并發(fā)量.需要重寫MqttPahoMessageHandler類的相關(guān)方法。雖然可以解決此問題,如果對MQTT的源碼不是很了解,不建議采用此方案,不利于后續(xù)的版本升級。
方案4:升級mqtt版本為1.2.1
在1.2.1的版本中官方已經(jīng)進行了相關(guān)的修改,當qos=0已經(jīng)不存入tokenStore了,每次發(fā)送完之后就會刪除掉token以及釋放id,所以就不會出現(xiàn)Too many publishes in progress的問題。
引入1.2.1的版本會帶來https驗證問題,因為在Mqtt的1.2.1版本中,增加了https的驗證需要添加相關(guān)配置,否則啟動時會報安全認證錯誤。
解決方案:如果項目中沒有開啟https認證,需要設(shè)置HttpsHostnameVerificationEnabled為false即可。
mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
總結(jié)
本文通過定位MQTT錯誤,詳細的講解了MQtt消息的發(fā)送流程,解決的方案雖然有多種,我們需要結(jié)合實際的業(yè)務情況來解決問題,關(guān)于MQtt如果還有其他疑問,可以隨時反饋,大家共同學習,共同進步。
到此這篇關(guān)于Spring Boot MQTT Too many publishes in progress錯誤的解決方案的文章就介紹到這了,更多相關(guān)Spring Boot內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot集成mqtt的實踐開發(fā)
- SpringBoot+MQTT+apollo實現(xiàn)訂閱發(fā)布功能的示例
- springboot 實現(xiàn)mqtt物聯(lián)網(wǎng)的示例代碼
- SpringBoot2.0集成MQTT消息推送功能實現(xiàn)
- SpringBoot集成mqtt的多模塊項目配置詳解
- 如何在Spring Boot中使用MQTT
- SpringBoot整合MQTT并實現(xiàn)異步線程調(diào)用的問題
- springboot整合netty-mqtt-client實現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- Springboot整合mqtt服務的示例代碼
- Spring?boot?集成?MQTT詳情
相關(guān)文章
淺析java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別
以下是對java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別進行了詳細的解析。需要的朋友可以過來參考下2013-08-08Spring Boot集成教程之異步調(diào)用Async
在項目中,當訪問其他人的接口較慢或者做耗時任務時,不想程序一直卡在耗時任務上,想程序能夠并行執(zhí)行,我們可以使用多線程來并行的處理任務,也可以使用spring提供的異步處理方式@Async。需要的朋友們下面來一起看看吧。2018-03-03Java數(shù)據(jù)結(jié)構(gòu)之棧與綜合計算器的實現(xiàn)
這篇文章主要為大家詳細介紹了Java數(shù)據(jù)結(jié)構(gòu)中棧與綜合計算器的實現(xiàn),文中的示例代碼講解詳細,具有一定的學習價值,感興趣的小伙伴可以了解一下2022-10-10Java多線程高并發(fā)中的Fork/Join框架機制詳解
本文主要介紹了 Java 多線程高并發(fā)中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設(shè)計方式和部分實現(xiàn)源碼,感興趣的朋友跟隨小編一起看看吧2021-11-11intellij IDEA配置springboot的圖文教程
Spring Boot是由Pivotal團隊提供的全新框架,其設(shè)計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。接下來通過本文給大家介紹intellij IDEA配置springboot的圖文教程,感興趣的朋友一起看看吧2018-03-03