SpringBoot整合MQTT小結(jié)匯總
前言:
這幾天在準(zhǔn)備面試的過(guò)程中做的一個(gè)小demo,主要是用通過(guò)SpringBoot實(shí)現(xiàn)一個(gè)與MQTT服務(wù)交互通信,也是看著別人的項(xiàng)目改的,這兩個(gè)技術(shù)之前都沒(méi)有接觸過(guò),希望記錄一下可以分享給大家,也好久沒(méi)更新了,借此機(jī)會(huì)更新一波blog。在正式的開始這個(gè)項(xiàng)目前還是學(xué)了一下SSM和SpringBoot的基礎(chǔ),上手起來(lái)不會(huì)這么的無(wú)力。期間也是查閱了很多的資料和詢問(wèn)了諸多大佬。
好了話不多說(shuō),一步步的搭建項(xiàng)目和原理詳解就在下面了
一、什么是mqtt
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于 TCP/IP 協(xié)議上,由 IBM 于 1999 年發(fā)明。MQTT 協(xié)議的主要特征是開放、簡(jiǎn)單、輕量級(jí)和易于實(shí)現(xiàn),這些特征使得它適用于受約束的應(yīng)用環(huán)境,如:
網(wǎng)絡(luò)受限:網(wǎng)絡(luò)帶寬較低且傳輸不可靠
終端受限:協(xié)議運(yùn)行在嵌入式設(shè)備上,嵌入式終端的處理器、內(nèi)存等是受限的
通過(guò) MQTT 協(xié)議,目前已經(jīng)擴(kuò)展出了數(shù)十種 MQTT 服務(wù)器端程序,可以通過(guò) PHP、Java、Python、C、C# 等語(yǔ)言向 MQTT 發(fā)送消息。由于開放源代碼、耗電量小等特點(diǎn),MQTT 非常適用于物聯(lián)網(wǎng)領(lǐng)域,如傳感器與服務(wù)器的通信、傳感器信息采集等。
二、主要思想
發(fā)布/訂閱模式
訂閱發(fā)布模式定義了一種一對(duì)多的依賴關(guān)系,讓多個(gè)訂閱者對(duì)象同時(shí)監(jiān)聽(tīng)某一個(gè)主題對(duì)象。這個(gè)主題對(duì)象在自身狀態(tài)變化時(shí),會(huì)通知所有訂閱者對(duì)象,使它們能夠自動(dòng)更新自己的狀態(tài)。
將一個(gè)系統(tǒng)分割成一系列相互協(xié)作的類有一個(gè)很不好的副作用,那就是需要維護(hù)相應(yīng)對(duì)象間的一致性,這樣會(huì)給維護(hù)、擴(kuò)展和重用都帶來(lái)不便。當(dāng)一個(gè)對(duì)象的改變需要同時(shí)改變其他對(duì)象,而且它不知道具體有多少對(duì)象需要改變時(shí),就可以使用訂閱發(fā)布模式了。
一個(gè)抽象模型有兩個(gè)方面,其中一方面依賴于另一方面,這時(shí)訂閱發(fā)布模式可以將這兩者封裝在獨(dú)立的對(duì)象中,使它們各自獨(dú)立地改變和復(fù)用。訂閱發(fā)布模式所做的工作其實(shí)就是在解耦合。讓耦合的雙方都依賴于抽象,而不是依賴于具體,從而使得各自的變化都不會(huì)影響另一邊的變化。
發(fā)布/訂閱模式并不是 MQTT 協(xié)議特有的模式,像我們很多消息中間件都有使用發(fā)布/訂閱模式,這里你是不是想說(shuō),這不就是我們所說(shuō)的觀察者模式嘛,還真不是,這兩個(gè)模式很容易混淆。觀察者模式只有觀察者 + 被觀察者兩個(gè)角色,而發(fā)布/訂閱模式還有一個(gè)經(jīng)紀(jì)人 Broker;往更深層次的講觀察者和被觀察者,是松耦合的關(guān)系,而發(fā)布者和訂閱者,則完全不存在耦合。
在我們?nèi)粘懗绦驎r(shí),經(jīng)常遇到下面這種情況:
public void 前端業(yè)務(wù)/硬件業(yè)務(wù)() { 刷新界面(); 更新數(shù)據(jù)庫(kù)(); 對(duì)界面更新數(shù)據(jù)(); ……………………………… }
當(dāng)有前端和硬件業(yè)務(wù)產(chǎn)生時(shí),需要依次要去執(zhí)行:刷新界面()、更新數(shù)據(jù)庫(kù)()、對(duì)界面更新數(shù)據(jù)()等操作。表面上看代碼寫得很工整,其實(shí)這里面有很多的問(wèn)題:
首先,這完全是面向過(guò)程開發(fā),根本不適合大型項(xiàng)目。
第二,代碼維護(hù)量太大。設(shè)想一下,如果產(chǎn)生業(yè)務(wù)后要執(zhí)行10多個(gè)操作,那這將是個(gè)多么大,多少?gòu)?fù)雜的類呀,時(shí)間一長(zhǎng),可能連開發(fā)者自己都不知道如何去維護(hù)了。
第三,擴(kuò)展性差。如果產(chǎn)生業(yè)務(wù)后,要增加一個(gè)聲音提示()功能,怎么辦呢?沒(méi)錯(cuò),只能加在前端業(yè)務(wù)/硬件業(yè)務(wù)()這個(gè)函數(shù)中,這樣一來(lái),就違反了“開放-關(guān)閉原則”。而且修改了原有的函數(shù),那么在測(cè)試時(shí),除了要測(cè)新增功能外,還要做原功能的回歸測(cè)試;在一個(gè)大型項(xiàng)目中,做一次回歸測(cè)試可能要花費(fèi)大約兩周左右的時(shí)間,而且前提是新增功能沒(méi)有影響原來(lái)功能及產(chǎn)生新的bug。
那么如何把前端業(yè)務(wù)/硬件業(yè)務(wù)()函數(shù)同其他函數(shù)進(jìn)行解耦合呢?別著急,下面就介紹今天的主角----訂閱發(fā)布模式。見(jiàn)下圖:
上面的流程就是對(duì)有告警信息產(chǎn)生()這個(gè)函數(shù)的描述。我們要做的,就是把產(chǎn)生告警和它需要通知的事件進(jìn)行解耦,讓它們之間沒(méi)有相互依賴的關(guān)系,解耦合圖如下:
事件觸發(fā)者被抽象出來(lái),稱為消息發(fā)布者,即圖中的P。事件接受都被抽象出來(lái),稱為消息訂閱者,即圖中的S。P與S之間通過(guò)Broker(即訂閱器)連接。這樣就實(shí)現(xiàn)了P與S的解耦。首先,P就把消息發(fā)送到指定的訂閱器上,從始至終,它并不知道也不關(guān)心要把消息發(fā)向哪個(gè)S。S如果想接收消息,就要向訂閱器進(jìn)行訂閱,訂閱成功后,S就可以接收來(lái)自Broker的消息了,從始至終,S并不知道也不關(guān)心消息來(lái)源于哪個(gè)具體的P。同理,S還可以向Broker進(jìn)行退訂操作,成功退訂后,S就無(wú)法接收到來(lái)自指定Broker的消息了。這樣就完美的解決了P與S之間的解耦。
三、MQTT重要概念
3.1 MQTT Client
publisher 和 subscriber 都屬于 MQTT Client,之所以有發(fā)布者和訂閱者這個(gè)概念,其實(shí)是一種相對(duì)的概念,就是指當(dāng)前客戶端是在發(fā)布消息還是在接收消息,發(fā)布和訂閱的功能也可以由同一個(gè) MQTT Client 實(shí)現(xiàn)。
MQTT 客戶端是運(yùn)行 MQTT 庫(kù)并通過(guò)網(wǎng)絡(luò)連接到 MQTT 代理的任何設(shè)備(從微控制器到成熟的服務(wù)器)。例如,MQTT 客戶端可以是一個(gè)非常小的、資源受限的設(shè)備,它通過(guò)無(wú)線網(wǎng)絡(luò)進(jìn)行連接并具有一個(gè)最低限度的庫(kù)?;旧?,任何使用 TCP/IP 協(xié)議使用 MQTT 設(shè)備的都可以稱之為 MQTT Client。MQTT 協(xié)議的客戶端實(shí)現(xiàn)非常簡(jiǎn)單直接,易于實(shí)施是 MQTT 非常適合小型設(shè)備的原因之一。MQTT 客戶端庫(kù)可用于多種編程語(yǔ)言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。
3.2 MQTT Broker
與 MQTT Client 對(duì)應(yīng)的就是 MQTT Broker,Broker 是任何發(fā)布/訂閱協(xié)議的核心,根據(jù)實(shí)現(xiàn)的不同,代理可以處理多達(dá)數(shù)百萬(wàn)連接的 MQTT Client。
Broker 負(fù)責(zé)接收所有消息,過(guò)濾消息,確定是哪個(gè)Client 訂閱了每條消息,并將消息發(fā)送給對(duì)應(yīng)的 Client,Broker 還負(fù)責(zé)保存會(huì)話數(shù)據(jù),這些數(shù)據(jù)包括訂閱的和錯(cuò)過(guò)的消息。Broker 還負(fù)責(zé)客戶端的身份驗(yàn)證和授權(quán)。
3.3 MQTT Connection
MQTT 協(xié)議基于 TCP/IP。客戶端和代理都需要有一個(gè) TCP/IP 協(xié)議支持。
MQTT 連接始終位于一個(gè)客戶端和代理之間??蛻舳藦牟恢苯酉嗷ミB接。要發(fā)起連接,客戶端向代理發(fā)送 CONNECT 消息。代理使用 CONNACK 消息和狀態(tài)代碼進(jìn)行響應(yīng)。建立連接后,代理將保持打開狀態(tài),直到客戶端發(fā)送斷開連接命令或連接中斷。
3.4 MQTT主要參數(shù)
ClientId:ClientId 的長(zhǎng)度可以是 1-23 個(gè)字符,在一個(gè)服務(wù)器上 ClientId 不能重復(fù)。如果超過(guò) 23 個(gè)字符,則服務(wù)器返回 CONNACK 消息中的返回碼為 Identifier Rejected。在 MQTT 3.1.1 中,如果您不需要代理持有狀態(tài),您可以發(fā)送一個(gè)空的 ClientId。空的 ClientId 導(dǎo)致連接沒(méi)有任何狀態(tài)。在這種情況下,clean session 標(biāo)志必須設(shè)置為 true,否則代理將拒絕連接。
Clean Session:Clean Session 標(biāo)志告訴代理客戶端是否要建立持久會(huì)話。在持久會(huì)話 (CleanSession = false) 中,代理存儲(chǔ)客戶端的所有訂閱以及以服務(wù)質(zhì)量(QoS)級(jí)別 1 或 2 訂閱的客戶端的所有丟失消息。 如果會(huì)話不是持久的 (CleanSession = true ),代理不為客戶端存儲(chǔ)任何內(nèi)容,并清除任何先前持久會(huì)話中的所有信息。
Username/Password:MQTT 可以發(fā)送用戶名和密碼進(jìn)行客戶端認(rèn)證和授權(quán)。但是,如果此信息未加密或散列,則密碼將以純文本形式發(fā)送。我們強(qiáng)烈建議將用戶名和密碼與安全傳輸一起使用。像 HiveMQ 這樣的代理可以使用 SSL 證書對(duì)客戶端進(jìn)行身份驗(yàn)證,因此不需要用戶名和密碼。
Will Message:LastWillxxx 表示的是遺愿,client 在連接 broker 的時(shí)候?qū)?huì)設(shè)立一個(gè)遺愿,這個(gè)遺愿會(huì)保存在 broker 中,當(dāng) client 因?yàn)榉钦T驍嚅_與 broker 的連接時(shí),broker 會(huì)將遺愿發(fā)送給訂閱了這個(gè) topic(訂閱遺愿的 topic)的 client。
KeepAlive:keepAlive 是 client 在連接建立時(shí)與 broker 通信的時(shí)間間隔,通常以秒為單位。這個(gè)時(shí)間指的是 client 與 broker 在不發(fā)送消息下所能承受的最大時(shí)長(zhǎng)。
QOS:
此數(shù)字表示消息的服務(wù)質(zhì)量 (QoS)。有三個(gè)級(jí)別:0、1 和 2。服務(wù)級(jí)別決定了消息到達(dá)預(yù)期接收者(客戶端或代理)的保證類型。
Payload
:這個(gè)是每條消息的實(shí)際內(nèi)容。MQTT 是數(shù)據(jù)無(wú)關(guān)性的??梢园l(fā)送任何文本、圖像、加密數(shù)據(jù)以及二進(jìn)制數(shù)據(jù)。
timeout:MQTT會(huì)嘗試接收數(shù)據(jù),直到timeout時(shí)間到后才會(huì)退出。
四、軟件和Apollo
4.1 安裝Apollo
Apollo(阿波羅)是攜程框架部門研發(fā)的分布式配置中心,能夠集中化管理應(yīng)用不同環(huán)境、不同集群的配置,配置修改后能夠?qū)崟r(shí)推送到應(yīng)用端,并且具備規(guī)范的權(quán)限、流程治理等特性,適用于微服務(wù)配置管理場(chǎng)景。
服務(wù)端基于Spring Boot和Spring Cloud開發(fā),打包后可以直接運(yùn)行,不需要額外安裝Tomcat等應(yīng)用容器。
Java客戶端不依賴任何框架,能夠運(yùn)行于所有Java運(yùn)行時(shí)環(huán)境,同時(shí)對(duì)Spring/Spring Boot環(huán)境也有較好的支持。
Apollo下載地址
http://xn--apollo-np7ii83deeq211d/
相關(guān)鏈接:
Apollo 官方安裝教程:https://github.com/ctripcorp/apollo/wiki/Quick-Start
Apollo 分布式部署官方指南:https://github.com/ctripcorp/apollo/wiki/%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2%E6%8C%87%E5%8D%97
Apollo Github 地址:https://github.com/ctripcorp/apollo
4.1.1 解壓,進(jìn)入到D:\java\apache-apollo-1.7.1\bin 目錄下,執(zhí)行命令
.\apollo.cmd create mybroker2
4.1.2 進(jìn)入剛剛創(chuàng)
4.1.2 進(jìn)入剛剛創(chuàng)建好的mybroker/bin目錄,執(zhí)行:
.\apollo-broker.cmd run
4.1.3 瀏覽器打開地址http://127.0.0.1:61680/,默認(rèn)用戶名:admin,密碼:password,即可登錄主頁(yè)面
4.2 安裝Postman
4.3 安裝MQTTBox
Microsoft Store里面就有。
賬號(hào)密碼輸入即可
五、代碼實(shí)現(xiàn)
5.1 配置pom.xml
<dependencies> <!--導(dǎo)入起步依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <artifactId>spring-boot-starter-integration</artifactId> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <artifactId>spring-integration-mqtt</artifactId> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.0</version> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependencies>
5.2 配置MQTT服務(wù)器基本信息
在springBoot配置文件application.yml中配置,添加如下:
#mqtt配置 com: mqtt: url: tcp://127.0.0.1:61613 clientId: mqtt_test1234 topics: topic01,topic02 username: admin password: password timeout: 10 keepalive: 20 #指定服務(wù)端口 server: port: 8081 #一般沒(méi)改過(guò)tomcat服務(wù)器的端口不用修改
5.3 配置讀取yml文件的類MqttConfiguration
package com.vcarecity.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * 讀取yml */ @Component @ConfigurationProperties(prefix = "com.mqtt") //對(duì)應(yīng)yml文件中的com下的mqtt文件配置 public class MqttConfiguration { private String url; private String clientId; private String topics; private String username; private String password; private String timeout; private String keepalive; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; public String getUsername() { return username; public void setUsername(String username) { this.username = username; public String getPassword() { return password; public void setPassword(String password) { this.password = password; public String getClientId() { return clientId; public void setClientId(String clientId) { this.clientId = clientId; public String getTopics() { return topics; public void setTopics(String topics) { this.topics = topics; public String getTimeout() { return timeout; public void setTimeout(String timeout) { this.timeout = timeout; public String getKeepalive() { return keepalive; public void setKeepalive(String keepalive) { this.keepalive = keepalive; }
5.4 MQTT生產(chǎn)端的Handler處理
package com.vcarecity.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import com.vcarecity.config.MqttConfiguration; /** * MQTT生產(chǎn)端 * */ @Configuration public class MqttOutboundConfiguration { @Autowired private MqttConfiguration mqttProperties; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getUrl().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); // 接受離線消息 options.setCleanSession(false); //告訴代理客戶端是否要建立持久會(huì)話 false為建立持久會(huì)話 factory.setConnectionOptions(options); return factory; @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttProperties.getClientId()+"outbound", mqttClientFactory()); messageHandler.setAsync(true); return messageHandler; }
5.5 MQTT消費(fèi)端的Handler處理
實(shí)現(xiàn)了對(duì)inboundtopic中的主題監(jiān)聽(tīng),當(dāng)有消息推送到inboundtopic主題上時(shí)可以接受
package com.vcarecity.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import com.vcarecity.config.MqttConfiguration; /** * MQTT消費(fèi)端 * */ @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { @Autowired private MqttConfiguration mqttProperties; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getUrl().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setKeepAliveInterval(2); //接受離線消息 options.setCleanSession(false); factory.setConnectionOptions(options); return factory; //配置client,監(jiān)聽(tīng)的topic public MessageProducer inbound() { String[] inboundTopics = mqttProperties.getTopics().split(","); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics); //對(duì)inboundTopics主題進(jìn)行監(jiān)聽(tīng) adapter.setCompletionTimeout(5000); adapter.setQos(1); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(mqttInputChannel()); return adapter; //通過(guò)通道獲取數(shù)據(jù) @ServiceActivator(inputChannel = "mqttInputChannel") //異步處理 public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { // System.out.println("message:"+message); System.out.println("----------------------"); System.out.println("message:"+message.getPayload()); System.out.println("PacketId:"+message.getHeaders().getId()); System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS)); String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); System.out.println("topic:"+topic); } }; }
5.6 寫個(gè)Controller類來(lái)進(jìn)行訪問(wèn)控制測(cè)試
package com.vcarecity.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.vcarecity.mqtt.MqttGateway; @RestController public class MqttPubController { @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Autowired private MqttGateway mqttGateway; @RequestMapping("/hello") public String hello() { return "hello!"; } @RequestMapping("/sendMqtt") public String sendMqtt(String sendData){ System.out.println(sendData); System.out.println("進(jìn)入sendMqtt-------"+sendData); mqttGateway.sendToMqtt("topic01",(String) sendData); return "Test is OK"; } @RequestMapping("/sendMqttTopic") public String sendMqtt(String sendData,String topic){ //System.out.println(sendData+" "+topic); //System.out.println("進(jìn)入inbound發(fā)送:"+sendData); mqttGateway.sendToMqtt(topic,(String) sendData); return "Test is OK"; }
六、測(cè)試
直接調(diào)用Controller中的URL進(jìn)行調(diào)用測(cè)試:
6.1測(cè)試生產(chǎn)端的Handler
6.2 測(cè)試消費(fèi)端的Handler
使用Postman:
http://localhost:8081/sendMqttTopic?sendData=this is mq55555&topic=topic01
可以看見(jiàn)測(cè)試臺(tái)上會(huì)出現(xiàn)Message消息,這邊實(shí)現(xiàn)的是對(duì)inboundtopic中的主題監(jiān)聽(tīng)實(shí)現(xiàn):
剛開始沒(méi)有出現(xiàn)上圖效果,查了好久的bug。結(jié)果重啟Apollo就好了
如果我要配置多個(gè)client,應(yīng)該怎么處理呢?這個(gè)也簡(jiǎn)單
(1)我們只要配置多個(gè)通道即可,簡(jiǎn)單代碼如下:
//通道2 @Bean public MessageChannel mqttInputChannelTwo() { return new DirectChannel(); } //配置client2,監(jiān)聽(tīng)的topic:hell2,hello3 public MessageProducer inbound1() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inboundTwo", mqttClientFactory(), "hello2","hello3"); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannelTwo()); return adapter; //通過(guò)通道2獲取數(shù)據(jù) @ServiceActivator(inputChannel = "mqttInputChannelTwo") public MessageHandler handlerTwo() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { // System.out.println("message:"+message); System.out.println("----------------------"); System.out.println("message:"+message.getPayload()); System.out.println("PacketId:"+message.getHeaders().getId()); System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS)); String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); System.out.println("topic:"+topic); };
(2)因?yàn)槲疫@個(gè)項(xiàng)目用的是讀取yml文件的,所以只需要在yml文件中的topics即可,加自己想要的topic。
topics: topic03,topic04,topic01,topic02
以上測(cè)試都可以使用MQTTBox完成
后言:
資料參考:
Spring官網(wǎng)對(duì)MQTT的支持:MQTT Support (spring.io)
Tackoverflow上面關(guān)于MQTT的資料,需要翻閱墻體:
參考文章:
https://blog.csdn.net/tjvictor/article/details/5223309
https://blog.csdn.net/riemann_/article/details/118686072
到此這篇關(guān)于SpringBoot整合MQTT總結(jié)的文章就介紹到這了,更多相關(guān)SpringBoot整合MQTT內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java發(fā)送http請(qǐng)求時(shí)如何處理異步回調(diào)結(jié)果
這篇文章主要介紹了java發(fā)送http請(qǐng)求時(shí)如何處理異步回調(diào)結(jié)果問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Java 創(chuàng)建URL的常見(jiàn)問(wèn)題及解決方案
這篇文章主要介紹了Java 創(chuàng)建URL的常見(jiàn)問(wèn)題及解決方案的相關(guān)資料,需要的朋友可以參考下2016-10-10Java實(shí)現(xiàn)斷點(diǎn)下載服務(wù)端與客戶端的示例代碼
這篇文章主要為大家介紹了如何實(shí)現(xiàn)服務(wù)端(Spring Boot)與客戶端(Android)的斷點(diǎn)下載與下載續(xù)傳功能,文中的示例代碼講解詳細(xì),需要的可以參考一下2022-08-08Java刪除指定文件夾下的所有內(nèi)容的方法(包括此文件夾)
下面小編就為大家?guī)?lái)一篇Java刪除指定文件夾下的所有內(nèi)容的方法(包括此文件夾) 。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12MyBatis測(cè)試報(bào)錯(cuò):Cannot?determine?value?type?from?string?&a
這篇文章主要給大家介紹了關(guān)于MyBatis測(cè)試報(bào)錯(cuò):Cannot?determine?value?type?from?string?'xxx'的解決辦法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02Java 實(shí)戰(zhàn)項(xiàng)目之疫情防控管理系統(tǒng)詳解
讀萬(wàn)卷書不如行萬(wàn)里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java實(shí)現(xiàn)一個(gè)疫情防控管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11