SpringBoot整合MQTT小結(jié)匯總
前言:
這幾天在準備面試的過程中做的一個小demo,主要是用通過SpringBoot實現(xiàn)一個與MQTT服務交互通信,也是看著別人的項目改的,這兩個技術(shù)之前都沒有接觸過,希望記錄一下可以分享給大家,也好久沒更新了,借此機會更新一波blog。在正式的開始這個項目前還是學了一下SSM和SpringBoot的基礎(chǔ),上手起來不會這么的無力。期間也是查閱了很多的資料和詢問了諸多大佬。
好了話不多說,一步步的搭建項目和原理詳解就在下面了
一、什么是mqtt
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級”通訊協(xié)議,該協(xié)議構(gòu)建于 TCP/IP 協(xié)議上,由 IBM 于 1999 年發(fā)明。MQTT 協(xié)議的主要特征是開放、簡單、輕量級和易于實現(xiàn),這些特征使得它適用于受約束的應用環(huán)境,如:
網(wǎng)絡受限:網(wǎng)絡帶寬較低且傳輸不可靠
終端受限:協(xié)議運行在嵌入式設(shè)備上,嵌入式終端的處理器、內(nèi)存等是受限的
通過 MQTT 協(xié)議,目前已經(jīng)擴展出了數(shù)十種 MQTT 服務器端程序,可以通過 PHP、Java、Python、C、C# 等語言向 MQTT 發(fā)送消息。由于開放源代碼、耗電量小等特點,MQTT 非常適用于物聯(lián)網(wǎng)領(lǐng)域,如傳感器與服務器的通信、傳感器信息采集等。
二、主要思想
發(fā)布/訂閱模式
訂閱發(fā)布模式定義了一種一對多的依賴關(guān)系,讓多個訂閱者對象同時監(jiān)聽某一個主題對象。這個主題對象在自身狀態(tài)變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態(tài)。
將一個系統(tǒng)分割成一系列相互協(xié)作的類有一個很不好的副作用,那就是需要維護相應對象間的一致性,這樣會給維護、擴展和重用都帶來不便。當一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變時,就可以使用訂閱發(fā)布模式了。
一個抽象模型有兩個方面,其中一方面依賴于另一方面,這時訂閱發(fā)布模式可以將這兩者封裝在獨立的對象中,使它們各自獨立地改變和復用。訂閱發(fā)布模式所做的工作其實就是在解耦合。讓耦合的雙方都依賴于抽象,而不是依賴于具體,從而使得各自的變化都不會影響另一邊的變化。
發(fā)布/訂閱模式并不是 MQTT 協(xié)議特有的模式,像我們很多消息中間件都有使用發(fā)布/訂閱模式,這里你是不是想說,這不就是我們所說的觀察者模式嘛,還真不是,這兩個模式很容易混淆。觀察者模式只有觀察者 + 被觀察者兩個角色,而發(fā)布/訂閱模式還有一個經(jīng)紀人 Broker;往更深層次的講觀察者和被觀察者,是松耦合的關(guān)系,而發(fā)布者和訂閱者,則完全不存在耦合。
在我們?nèi)粘懗绦驎r,經(jīng)常遇到下面這種情況:
public void 前端業(yè)務/硬件業(yè)務() { 刷新界面(); 更新數(shù)據(jù)庫(); 對界面更新數(shù)據(jù)(); ……………………………… }
當有前端和硬件業(yè)務產(chǎn)生時,需要依次要去執(zhí)行:刷新界面()、更新數(shù)據(jù)庫()、對界面更新數(shù)據(jù)()等操作。表面上看代碼寫得很工整,其實這里面有很多的問題:
首先,這完全是面向過程開發(fā),根本不適合大型項目。
第二,代碼維護量太大。設(shè)想一下,如果產(chǎn)生業(yè)務后要執(zhí)行10多個操作,那這將是個多么大,多少復雜的類呀,時間一長,可能連開發(fā)者自己都不知道如何去維護了。
第三,擴展性差。如果產(chǎn)生業(yè)務后,要增加一個聲音提示()功能,怎么辦呢?沒錯,只能加在前端業(yè)務/硬件業(yè)務()這個函數(shù)中,這樣一來,就違反了“開放-關(guān)閉原則”。而且修改了原有的函數(shù),那么在測試時,除了要測新增功能外,還要做原功能的回歸測試;在一個大型項目中,做一次回歸測試可能要花費大約兩周左右的時間,而且前提是新增功能沒有影響原來功能及產(chǎn)生新的bug。
那么如何把前端業(yè)務/硬件業(yè)務()函數(shù)同其他函數(shù)進行解耦合呢?別著急,下面就介紹今天的主角----訂閱發(fā)布模式。見下圖:
上面的流程就是對有告警信息產(chǎn)生()這個函數(shù)的描述。我們要做的,就是把產(chǎn)生告警和它需要通知的事件進行解耦,讓它們之間沒有相互依賴的關(guān)系,解耦合圖如下:
事件觸發(fā)者被抽象出來,稱為消息發(fā)布者,即圖中的P。事件接受都被抽象出來,稱為消息訂閱者,即圖中的S。P與S之間通過Broker(即訂閱器)連接。這樣就實現(xiàn)了P與S的解耦。首先,P就把消息發(fā)送到指定的訂閱器上,從始至終,它并不知道也不關(guān)心要把消息發(fā)向哪個S。S如果想接收消息,就要向訂閱器進行訂閱,訂閱成功后,S就可以接收來自Broker的消息了,從始至終,S并不知道也不關(guān)心消息來源于哪個具體的P。同理,S還可以向Broker進行退訂操作,成功退訂后,S就無法接收到來自指定Broker的消息了。這樣就完美的解決了P與S之間的解耦。
三、MQTT重要概念
3.1 MQTT Client
publisher 和 subscriber 都屬于 MQTT Client,之所以有發(fā)布者和訂閱者這個概念,其實是一種相對的概念,就是指當前客戶端是在發(fā)布消息還是在接收消息,發(fā)布和訂閱的功能也可以由同一個 MQTT Client 實現(xiàn)。
MQTT 客戶端是運行 MQTT 庫并通過網(wǎng)絡連接到 MQTT 代理的任何設(shè)備(從微控制器到成熟的服務器)。例如,MQTT 客戶端可以是一個非常小的、資源受限的設(shè)備,它通過無線網(wǎng)絡進行連接并具有一個最低限度的庫?;旧希魏问褂?TCP/IP 協(xié)議使用 MQTT 設(shè)備的都可以稱之為 MQTT Client。MQTT 協(xié)議的客戶端實現(xiàn)非常簡單直接,易于實施是 MQTT 非常適合小型設(shè)備的原因之一。MQTT 客戶端庫可用于多種編程語言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。
3.2 MQTT Broker
與 MQTT Client 對應的就是 MQTT Broker,Broker 是任何發(fā)布/訂閱協(xié)議的核心,根據(jù)實現(xiàn)的不同,代理可以處理多達數(shù)百萬連接的 MQTT Client。
Broker 負責接收所有消息,過濾消息,確定是哪個Client 訂閱了每條消息,并將消息發(fā)送給對應的 Client,Broker 還負責保存會話數(shù)據(jù),這些數(shù)據(jù)包括訂閱的和錯過的消息。Broker 還負責客戶端的身份驗證和授權(quán)。
3.3 MQTT Connection
MQTT 協(xié)議基于 TCP/IP??蛻舳撕痛矶夹枰幸粋€ TCP/IP 協(xié)議支持。
MQTT 連接始終位于一個客戶端和代理之間??蛻舳藦牟恢苯酉嗷ミB接。要發(fā)起連接,客戶端向代理發(fā)送 CONNECT 消息。代理使用 CONNACK 消息和狀態(tài)代碼進行響應。建立連接后,代理將保持打開狀態(tài),直到客戶端發(fā)送斷開連接命令或連接中斷。
3.4 MQTT主要參數(shù)
ClientId:ClientId 的長度可以是 1-23 個字符,在一個服務器上 ClientId 不能重復。如果超過 23 個字符,則服務器返回 CONNACK 消息中的返回碼為 Identifier Rejected。在 MQTT 3.1.1 中,如果您不需要代理持有狀態(tài),您可以發(fā)送一個空的 ClientId??盏?ClientId 導致連接沒有任何狀態(tài)。在這種情況下,clean session 標志必須設(shè)置為 true,否則代理將拒絕連接。
Clean Session:Clean Session 標志告訴代理客戶端是否要建立持久會話。在持久會話 (CleanSession = false) 中,代理存儲客戶端的所有訂閱以及以服務質(zhì)量(QoS)級別 1 或 2 訂閱的客戶端的所有丟失消息。 如果會話不是持久的 (CleanSession = true ),代理不為客戶端存儲任何內(nèi)容,并清除任何先前持久會話中的所有信息。
Username/Password:MQTT 可以發(fā)送用戶名和密碼進行客戶端認證和授權(quán)。但是,如果此信息未加密或散列,則密碼將以純文本形式發(fā)送。我們強烈建議將用戶名和密碼與安全傳輸一起使用。像 HiveMQ 這樣的代理可以使用 SSL 證書對客戶端進行身份驗證,因此不需要用戶名和密碼。
Will Message:LastWillxxx 表示的是遺愿,client 在連接 broker 的時候?qū)O(shè)立一個遺愿,這個遺愿會保存在 broker 中,當 client 因為非正常原因斷開與 broker 的連接時,broker 會將遺愿發(fā)送給訂閱了這個 topic(訂閱遺愿的 topic)的 client。
KeepAlive:keepAlive 是 client 在連接建立時與 broker 通信的時間間隔,通常以秒為單位。這個時間指的是 client 與 broker 在不發(fā)送消息下所能承受的最大時長。
QOS:
此數(shù)字表示消息的服務質(zhì)量 (QoS)。有三個級別:0、1 和 2。服務級別決定了消息到達預期接收者(客戶端或代理)的保證類型。
Payload
:這個是每條消息的實際內(nèi)容。MQTT 是數(shù)據(jù)無關(guān)性的??梢园l(fā)送任何文本、圖像、加密數(shù)據(jù)以及二進制數(shù)據(jù)。
timeout:MQTT會嘗試接收數(shù)據(jù),直到timeout時間到后才會退出。
四、軟件和Apollo
4.1 安裝Apollo
Apollo(阿波羅)是攜程框架部門研發(fā)的分布式配置中心,能夠集中化管理應用不同環(huán)境、不同集群的配置,配置修改后能夠?qū)崟r推送到應用端,并且具備規(guī)范的權(quán)限、流程治理等特性,適用于微服務配置管理場景。
服務端基于Spring Boot和Spring Cloud開發(fā),打包后可以直接運行,不需要額外安裝Tomcat等應用容器。
Java客戶端不依賴任何框架,能夠運行于所有Java運行時環(huán)境,同時對Spring/Spring Boot環(huán)境也有較好的支持。
Apollo下載地址
http://xn--apollo-np7ii83deeq211d/
相關(guān)鏈接:
Apollo 官方安裝教程:https://github.com/ctripcorp/apollo/wiki/Quick-Start
Apollo 分布式部署官方指南:https://github.com/ctripcorp/apollo/wiki/%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2%E6%8C%87%E5%8D%97
Apollo Github 地址:https://github.com/ctripcorp/apollo
4.1.1 解壓,進入到D:\java\apache-apollo-1.7.1\bin 目錄下,執(zhí)行命令
.\apollo.cmd create mybroker2
4.1.2 進入剛剛創(chuàng)
4.1.2 進入剛剛創(chuàng)建好的mybroker/bin目錄,執(zhí)行:
.\apollo-broker.cmd run
4.1.3 瀏覽器打開地址http://127.0.0.1:61680/,默認用戶名:admin,密碼:password,即可登錄主頁面
4.2 安裝Postman
4.3 安裝MQTTBox
Microsoft Store里面就有。
賬號密碼輸入即可
五、代碼實現(xiàn)
5.1 配置pom.xml
<dependencies> <!--導入起步依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <artifactId>spring-boot-starter-integration</artifactId> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <artifactId>spring-integration-mqtt</artifactId> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.0</version> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependencies>
5.2 配置MQTT服務器基本信息
在springBoot配置文件application.yml中配置,添加如下:
#mqtt配置 com: mqtt: url: tcp://127.0.0.1:61613 clientId: mqtt_test1234 topics: topic01,topic02 username: admin password: password timeout: 10 keepalive: 20 #指定服務端口 server: port: 8081 #一般沒改過tomcat服務器的端口不用修改
5.3 配置讀取yml文件的類MqttConfiguration
package com.vcarecity.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * 讀取yml */ @Component @ConfigurationProperties(prefix = "com.mqtt") //對應yml文件中的com下的mqtt文件配置 public class MqttConfiguration { private String url; private String clientId; private String topics; private String username; private String password; private String timeout; private String keepalive; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; public String getUsername() { return username; public void setUsername(String username) { this.username = username; public String getPassword() { return password; public void setPassword(String password) { this.password = password; public String getClientId() { return clientId; public void setClientId(String clientId) { this.clientId = clientId; public String getTopics() { return topics; public void setTopics(String topics) { this.topics = topics; public String getTimeout() { return timeout; public void setTimeout(String timeout) { this.timeout = timeout; public String getKeepalive() { return keepalive; public void setKeepalive(String keepalive) { this.keepalive = keepalive; }
5.4 MQTT生產(chǎn)端的Handler處理
package com.vcarecity.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import com.vcarecity.config.MqttConfiguration; /** * MQTT生產(chǎn)端 * */ @Configuration public class MqttOutboundConfiguration { @Autowired private MqttConfiguration mqttProperties; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getUrl().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); // 接受離線消息 options.setCleanSession(false); //告訴代理客戶端是否要建立持久會話 false為建立持久會話 factory.setConnectionOptions(options); return factory; @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttProperties.getClientId()+"outbound", mqttClientFactory()); messageHandler.setAsync(true); return messageHandler; }
5.5 MQTT消費端的Handler處理
實現(xiàn)了對inboundtopic中的主題監(jiān)聽,當有消息推送到inboundtopic主題上時可以接受
package com.vcarecity.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import com.vcarecity.config.MqttConfiguration; /** * MQTT消費端 * */ @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { @Autowired private MqttConfiguration mqttProperties; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getUrl().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setKeepAliveInterval(2); //接受離線消息 options.setCleanSession(false); factory.setConnectionOptions(options); return factory; //配置client,監(jiān)聽的topic public MessageProducer inbound() { String[] inboundTopics = mqttProperties.getTopics().split(","); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics); //對inboundTopics主題進行監(jiān)聽 adapter.setCompletionTimeout(5000); adapter.setQos(1); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(mqttInputChannel()); return adapter; //通過通道獲取數(shù)據(jù) @ServiceActivator(inputChannel = "mqttInputChannel") //異步處理 public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { // System.out.println("message:"+message); System.out.println("----------------------"); System.out.println("message:"+message.getPayload()); System.out.println("PacketId:"+message.getHeaders().getId()); System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS)); String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); System.out.println("topic:"+topic); } }; }
5.6 寫個Controller類來進行訪問控制測試
package com.vcarecity.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.vcarecity.mqtt.MqttGateway; @RestController public class MqttPubController { @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Autowired private MqttGateway mqttGateway; @RequestMapping("/hello") public String hello() { return "hello!"; } @RequestMapping("/sendMqtt") public String sendMqtt(String sendData){ System.out.println(sendData); System.out.println("進入sendMqtt-------"+sendData); mqttGateway.sendToMqtt("topic01",(String) sendData); return "Test is OK"; } @RequestMapping("/sendMqttTopic") public String sendMqtt(String sendData,String topic){ //System.out.println(sendData+" "+topic); //System.out.println("進入inbound發(fā)送:"+sendData); mqttGateway.sendToMqtt(topic,(String) sendData); return "Test is OK"; }
六、測試
直接調(diào)用Controller中的URL進行調(diào)用測試:
6.1測試生產(chǎn)端的Handler
6.2 測試消費端的Handler
使用Postman:
http://localhost:8081/sendMqttTopic?sendData=this is mq55555&topic=topic01
可以看見測試臺上會出現(xiàn)Message消息,這邊實現(xiàn)的是對inboundtopic中的主題監(jiān)聽實現(xiàn):
剛開始沒有出現(xiàn)上圖效果,查了好久的bug。結(jié)果重啟Apollo就好了
如果我要配置多個client,應該怎么處理呢?這個也簡單
(1)我們只要配置多個通道即可,簡單代碼如下:
//通道2 @Bean public MessageChannel mqttInputChannelTwo() { return new DirectChannel(); } //配置client2,監(jiān)聽的topic:hell2,hello3 public MessageProducer inbound1() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inboundTwo", mqttClientFactory(), "hello2","hello3"); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannelTwo()); return adapter; //通過通道2獲取數(shù)據(jù) @ServiceActivator(inputChannel = "mqttInputChannelTwo") public MessageHandler handlerTwo() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { // System.out.println("message:"+message); System.out.println("----------------------"); System.out.println("message:"+message.getPayload()); System.out.println("PacketId:"+message.getHeaders().getId()); System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS)); String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); System.out.println("topic:"+topic); };
(2)因為我這個項目用的是讀取yml文件的,所以只需要在yml文件中的topics即可,加自己想要的topic。
topics: topic03,topic04,topic01,topic02
以上測試都可以使用MQTTBox完成
后言:
資料參考:
Spring官網(wǎng)對MQTT的支持:MQTT Support (spring.io)
Tackoverflow上面關(guān)于MQTT的資料,需要翻閱墻體:
參考文章:
https://blog.csdn.net/tjvictor/article/details/5223309
https://blog.csdn.net/riemann_/article/details/118686072
到此這篇關(guān)于SpringBoot整合MQTT總結(jié)的文章就介紹到這了,更多相關(guān)SpringBoot整合MQTT內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java發(fā)送http請求時如何處理異步回調(diào)結(jié)果
這篇文章主要介紹了java發(fā)送http請求時如何處理異步回調(diào)結(jié)果問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06Java刪除指定文件夾下的所有內(nèi)容的方法(包括此文件夾)
下面小編就為大家?guī)硪黄狫ava刪除指定文件夾下的所有內(nèi)容的方法(包括此文件夾) 。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-12-12MyBatis測試報錯:Cannot?determine?value?type?from?string?&a
這篇文章主要給大家介紹了關(guān)于MyBatis測試報錯:Cannot?determine?value?type?from?string?'xxx'的解決辦法,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-02-02Java 實戰(zhàn)項目之疫情防控管理系統(tǒng)詳解
讀萬卷書不如行萬里路,只學書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java實現(xiàn)一個疫情防控管理系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11