springboot+rabbitmq實現(xiàn)智能家居實例詳解
引言
前一段有幸參與到一個智能家居項目的開發(fā),由于之前都沒有過這方面的開發(fā)經(jīng)驗,所以對智能硬件的開發(fā)模式和技術(shù)棧都頗為好奇。
智能可燃?xì)怏w報警器
產(chǎn)品是一款可燃?xì)怏w報警器,如果家中燃?xì)庑孤稘舛鹊竭_(dá)一定閾值,報警器檢測到并上傳氣體濃度值給后臺,后臺以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。
用戶還可能向報警器發(fā)一些關(guān)閉報警、調(diào)整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:
但當(dāng)我真正的參與其中開發(fā)時,其實有一點小小的失望,因為在整個研發(fā)過程中,并沒用到什么新的技術(shù),還是常規(guī)的幾種中間件,只不過換個用法而已。
技術(shù)選型用rabbitmq
來做核心的組件,主要考慮到運維成本低,組內(nèi)成員使用的熟練度比較高。
下面和小伙伴分享一下如何用 springboot
+ rabbitmq
搭建物聯(lián)網(wǎng)(IOT
)平臺,其實智能硬件也沒想象的那么高不可攀!
很多小伙伴可能有點懵?rabbitmq
不是消息隊列嗎?怎么又能做智能硬件了?
其實rabbitmq
有兩種協(xié)議,我們平時接觸的消息隊列是用的AMQP
協(xié)議,而用在智能硬件中的是MQTT
協(xié)議。
一、什么是 MQTT協(xié)議?
MQTT
全稱(Message Queue Telemetry Transport):一種基于發(fā)布/訂閱(publish
/subscribe
)模式的輕量級
通訊協(xié)議,通過訂閱相應(yīng)的主題來獲取消息,是物聯(lián)網(wǎng)(Internet of Thing
)中的一個標(biāo)準(zhǔn)傳輸協(xié)議。
該協(xié)議將消息的發(fā)布者(publisher
)與訂閱者(subscriber
)進(jìn)行分離,因此可以在不可靠的網(wǎng)絡(luò)環(huán)境中,為遠(yuǎn)程連接的設(shè)備提供可靠的消息服務(wù),使用方式與傳統(tǒng)的MQ有點類似。
TCP
協(xié)議位于傳輸層,MQTT
協(xié)議位于應(yīng)用層,MQTT
協(xié)議構(gòu)建于TCP/IP
協(xié)議上,也就是說只要支持TCP/IP
協(xié)議棧的地方,都可以使用MQTT
協(xié)議。
二、為什么要用 MQTT協(xié)議?
MQTT
協(xié)議為什么在物聯(lián)網(wǎng)(IOT)中如此受偏愛?而不是其它協(xié)議,比如我們更為熟悉的 HTTP
協(xié)議呢?
- 首先
HTTP
協(xié)議它是一種同步協(xié)議,客戶端請求后需要等待服務(wù)器的響應(yīng)。而在物聯(lián)網(wǎng)(IOT)環(huán)境中,設(shè)備會很受制于環(huán)境的影響,比如帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定等,顯然異步消息協(xié)議更為適合IOT
應(yīng)用程序。 HTTP
是單向的,如果要獲取消息客戶端必須發(fā)起連接,而在物聯(lián)網(wǎng)(IOT)應(yīng)用程序中,設(shè)備或傳感器往往都是客戶端,這意味著它們無法被動地接收來自網(wǎng)絡(luò)的命令。- 通常需要將一條命令或者消息,發(fā)送到網(wǎng)絡(luò)上的所有設(shè)備上。
HTTP
要實現(xiàn)這樣的功能不但很困難,而且成本極高。
三、MQTT協(xié)議介紹
前邊說過MQTT
是一種輕量級的協(xié)議,它只專注于發(fā)消息, 所以此協(xié)議的結(jié)構(gòu)也非常簡單。
MQTT數(shù)據(jù)包
在MQTT
協(xié)議中,一個MQTT
數(shù)據(jù)包由:固定頭
(Fixed header)、 可變頭
(Variable header)、 消息體
(payload)三部分構(gòu)成。
- 固定頭(Fixed header),所有數(shù)據(jù)包中都有固定頭,包含數(shù)據(jù)包類型及數(shù)據(jù)包的分組標(biāo)識。
- 可變頭(Variable header),部分?jǐn)?shù)據(jù)包類型中有可變頭。
- 內(nèi)容消息體(Payload),存在于部分?jǐn)?shù)據(jù)包類,是客戶端收到的具體消息內(nèi)容。
在這里插入圖片描述
1、固定頭
固定頭部,使用兩個字節(jié),共16位:
(4-7)位表示消息類型,使用4位二進(jìn)制表示,可代表如下的16種消息類型,不過 0 和 15位置屬于保留待用,所以共14種消息事件類型。
DUP Flag(重試標(biāo)識)
DUP Flag:保證消息可靠傳輸,消息是否已送達(dá)的標(biāo)識。默認(rèn)為0,只占用一個字節(jié),表示第一次發(fā)送,當(dāng)值為1時,表示當(dāng)前消息先前已經(jīng)被傳送過。
QoS Level(消息質(zhì)量等級)
QoS Level:消息的質(zhì)量等級,后邊會詳細(xì)介紹
RETAIN(持久化)
- 值為
1
:表示發(fā)送的消息需要一直持久保存,而且不受服務(wù)器重啟影響,不但要發(fā)送給當(dāng)前的訂閱者,且以后新加入的客戶端訂閱了此Topic
,訂閱者也會馬上得到推送。注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1
的消息推送。 - 值為
0
:僅為當(dāng)前訂閱者推送此消息。
Remaining Length(剩余長度)
在當(dāng)前消息中剩余的byte
(字節(jié))數(shù),包含可變頭部和消息體payload。
2、可變頭
固定頭部僅定義了消息類型和一些標(biāo)志位,一些消息的元數(shù)據(jù)需要放入可變頭部中。可變頭部內(nèi)容字節(jié)長度 + 消息體payload = 剩余長度。
可變頭部居于固定頭部和payload中間,包含了協(xié)議名稱,版本號,連接標(biāo)志,用戶授權(quán),心跳時間等內(nèi)容。
可變頭存在于這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
3、消息體payload
消息體payload只存在于CONNECT
、PUBLISH
、SUBSCRIBE
、SUBACK
、UNSUBSCRIBE
這幾種類型的消息:
CONNECT
:包含客戶端的ClientId
、訂閱的Topic
、Message
以及用戶名
和密碼
。PUBLISH
:向?qū)?yīng)主題發(fā)送消息。SUBSCRIBE
:要訂閱的主題以及QoS
。SUBACK
:服務(wù)器對于SUBSCRIBE
所申請的主題及QoS
進(jìn)行確認(rèn)和回復(fù)。UNSUBSCRIBE
:取消要訂閱的主題。
消息質(zhì)量(QoS )
消息質(zhì)量
(Quality of Service),即消息的發(fā)送質(zhì)量,發(fā)布者(publisher
)和訂閱者(subscriber
)都可以指定qos
等級,有QoS 0
、QoS 1
、QoS 2
三個等級。
下邊分別說明一下這三個等級的區(qū)別。
1、Qos 0
Qos 0:At most once
(至多一次)只發(fā)送一次消息,不保證消息是否成功送達(dá),沒有確認(rèn)機(jī)制,消息可能會丟失或重復(fù)。
圖片源于網(wǎng)絡(luò),如有侵權(quán)聯(lián)系刪除
2、Qos 1
Qos 1:At least once
(至少一次),相對于QoS 0
而言Qos 1
增加了ack
確認(rèn)機(jī)制,發(fā)送者(publisher
)推送消息到MQTT代理(broker
)時,兩者自身都會先持久化消息,只有當(dāng)publisher
或者 Broker
分別收到 PUBACK
確認(rèn)時,才會刪除自身持久化的消息,否則就會重發(fā)。
但有個問題,盡管我們可以通過確認(rèn)來保證一定收到客戶端 或 服務(wù)器的message
,可我們卻不能保證僅收到一次message
,也就是當(dāng)客戶端publisher
沒收到Broker
的puback
或者 Broker
沒有收到subscriber
的puback
,那么就會一直重發(fā)。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker (傳遞message)
broker -> puback -> publisher delete msg (確認(rèn)傳遞成功)
圖片源于網(wǎng)絡(luò),如有侵權(quán)聯(lián)系刪除
3、Qos 2
Qos 2:Exactly once
(只有一次),相對于QoS 1
,QoS 2
升級實現(xiàn)了僅接受一次message
,publisher
和 broker
同樣對消息進(jìn)行持久化,其中 publisher
緩存了message
和 對應(yīng)的msgID
,而 broker
緩存了 msgID
,可以保證消息不重復(fù),由于又增加了一個confirm
機(jī)制,整個流程變得復(fù)雜很多。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker -> broker store
msgID(傳遞message) broker -> puberc (確認(rèn)傳遞成功)
publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
LWT(最后遺囑)
LWT
全稱為 Last Will and Testament
,其實遺囑是一個由客戶端預(yù)先定義好的主題和對應(yīng)消息,附加在CONNECT
的數(shù)據(jù)包中,包括遺愿主題
、遺愿 QoS
、遺愿消息
等。
當(dāng)MQTT代理 Broker
檢測到有客戶端client
非正常斷開連接時,再由服務(wù)器主動發(fā)布此消息,然后相關(guān)的訂閱者會收到消息。
舉個栗子:聊天室中所有人都訂閱一個叫talk
的主題 ,但小富由于網(wǎng)絡(luò)抖動突然斷開了鏈接,這時聊天室中所有訂閱主題 talk
的客戶端都會收到一個 “小富離開聊天室
” 的遺愿消息。
遺囑的相關(guān)參數(shù):
Will Flag
:是否使用 LWT,1 開啟
Will Topic
:遺愿主題名,不可使用通配符
Will Qos
:發(fā)布遺愿消息時使用的 QoS
Will Retain
:遺愿消息的 Retain 標(biāo)識
Will Message
:遺愿消息內(nèi)容
那客戶端Client
有哪些場景是非正常斷開連接呢?
Broker
檢測到底層的 I/O 異常;- 客戶端 未能在心跳
Keep Alive
的間隔內(nèi)和Broker
進(jìn)行消息交互; - 客戶端 在關(guān)閉底層
TCP
連接前沒有發(fā)送DISCONNECT
數(shù)據(jù)包; - 客戶端 發(fā)送錯誤格式的數(shù)據(jù)包到
Broker
,導(dǎo)致關(guān)閉和客戶端的連接等。
注意:當(dāng)客戶端通過發(fā)布 DISCONNECT
數(shù)據(jù)包斷開連接時,屬于正常斷開連接,并不會觸發(fā) LWT
的機(jī)制,與此同時Broker
還會丟棄掉當(dāng)前客戶端在連接時指定的相關(guān) LWT
參數(shù)。
四、MQTT協(xié)議應(yīng)用場景
MQTT
協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng)、移動互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等領(lǐng)域。使用的場景也是非常非常多,下邊列舉一些:
- 物聯(lián)網(wǎng)M2M通信,物聯(lián)網(wǎng)大數(shù)據(jù)采集
- Android消息推送,WEB消息推送
- 移動即時消息,例如Facebook Messenger
- 智能硬件、智能家具、智能電器
- 車聯(lián)網(wǎng)通信,電動車站樁采集
- 智慧城市、遠(yuǎn)程醫(yī)療、遠(yuǎn)程教育
- 電力、石油與能源等行業(yè)市場
五、代碼實現(xiàn)
具體 rabbitmq
的環(huán)境搭建就不贅述了,網(wǎng)上教程比較多,有條件的用服務(wù)器,沒條件的像我搞個Windows
版的也很快樂嘛。
在這里插入圖片描述
1、啟用 rabbitmq的mqtt協(xié)議
我們先開啟 rabbitmq
的 mqtt
協(xié)議,因為默認(rèn)安裝下是關(guān)閉的,命令如下:
rabbitmq-plugins?enable?rabbitmq_mqtt
2、mqtt 客戶端依賴包
上一步中安裝rabbitmq
環(huán)境并開啟 mqtt
協(xié)議后,實際上mqtt
消息代理服務(wù)就搭建好了,接下來要做的就是實現(xiàn)客戶端消息的推送和訂閱。
這里使用spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
兩個工具包實現(xiàn)。
<!--mqtt依賴包--> <dependency> ????<groupId>org.springframework.integration</groupId> ????<artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> ????<groupId>org.eclipse.paho</groupId> ???????<artifactId>org.eclipse.paho.client.mqttv3</artifactId> ????<version>1.2.0</version> </dependency>
3、消息發(fā)送者
消息的發(fā)送比較簡單,主要是應(yīng)用到@ServiceActivator
注解,需要注意messageHandler.setAsync
屬性,如果設(shè)置成false
,關(guān)閉異步模式發(fā)送消息時可能會阻塞。
@Configuration public?class?IotMqttProducerConfig?{ ????@Autowired ????private?MqttConfig?mqttConfig; ????@Bean ????public?MqttPahoClientFactory?mqttClientFactory()?{ ????????DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory(); ????????factory.setServerURIs(mqttConfig.getServers()); ????????return?factory; ????} ????@Bean ????public?MessageChannel?mqttOutboundChannel()?{ ????????return?new?DirectChannel(); ????} ????@Bean ????@ServiceActivator(inputChannel?=?"iotMqttInputChannel") ????public?MessageHandler?mqttOutbound()?{ ????????MqttPahoMessageHandler?messageHandler?=?new?MqttPahoMessageHandler(mqttConfig.getServerClientId(),?mqttClientFactory()); ????????messageHandler.setAsync(false); ????????messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); ????????return?messageHandler; ????} }
MQTT
對外提供發(fā)送消息的API
時,需要使用@MessagingGateway
注解,去提供一個消息網(wǎng)關(guān)代理,參數(shù)defaultRequestChannel
指定發(fā)送消息綁定的channel
。
可以實現(xiàn)三種API
接口,payload
為發(fā)送的消息,topic
發(fā)送消息的主題,qos
消息質(zhì)量。
@MessagingGateway(defaultRequestChannel?=?"iotMqttInputChannel") public?interface?IotMqttGateway?{ ????//?向默認(rèn)的?topic?發(fā)送消息 ????void?sendMessage2Mqtt(String?payload); ????//?向指定的?topic?發(fā)送消息 ????void?sendMessage2Mqtt(String?payload,@Header(MqttHeaders.TOPIC)?String?topic); ????//?向指定的?topic?發(fā)送消息,并指定服務(wù)質(zhì)量參數(shù) ????void?sendMessage2Mqtt(@Header(MqttHeaders.TOPIC)?String?topic,?@Header(MqttHeaders.QOS)?int?qos,?String?payload); }
4、消息訂閱
消息訂閱和我們平時用的MQ消息監(jiān)聽實現(xiàn)思路基本相似,@ServiceActivator
注解表明當(dāng)前方法用于處理MQTT
消息,inputChannel
參數(shù)指定了用于接收消息的channel
。
/** ?*?@Author:?xiaofu ?*?@Description:?消息訂閱配置 ?*?@date?2020/6/8?18:24 ?*/ @Configuration public?class?IotMqttSubscriberConfig?{ ????@Autowired ????private?MqttConfig?mqttConfig; ????@Bean ????public?MqttPahoClientFactory?mqttClientFactory()?{ ????????DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory(); ????????factory.setServerURIs(mqttConfig.getServers()); ????????return?factory; ????} ????@Bean ????public?MessageChannel?iotMqttInputChannel()?{ ????????return?new?DirectChannel(); ????} ????@Bean ????public?MessageProducer?inbound()?{ ????????MqttPahoMessageDrivenChannelAdapter?adapter?=?new?MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),?mqttClientFactory(),?mqttConfig.getDefaultTopic()); ????????adapter.setCompletionTimeout(5000); ????????adapter.setConverter(new?DefaultPahoMessageConverter()); ????????adapter.setQos(1); ????????adapter.setOutputChannel(iotMqttInputChannel()); ????????return?adapter; ????} ????/** ?????*?@author?xiaofu ?????*?@description?消息訂閱 ?????*?@date?2020/6/8?18:20 ?????*/ ????@Bean ????@ServiceActivator(inputChannel?=?"iotMqttInputChannel") ????public?MessageHandler?handlerTest()?{ ????????return?message?->?{ ????????????try?{ ????????????????String?string?=?message.getPayload().toString(); ????????????????System.out.println("接收到消息:"?+?string); ????????????}?catch?(MessagingException?ex)?{ ????????????????//logger.info(ex.getMessage()); ????????????} ????????}; ????} }
六、測試消息
額~ 由于本渣渣對硬件一竅不通,為了模擬硬件的發(fā)送消息,只能借助一下工具,其實硬件端實現(xiàn)MQTT
協(xié)議,跟我們前邊的基本沒什么區(qū)別,只不過換種語言嵌入到硬件中而已。
這里選的測試工具為mqttbox
,下載地址:http://workswithweb.com/mqttbox.html
1、測試消息發(fā)送
我們用先用mqttbox
模擬向主題mqtt_test_topic
發(fā)送消息,看后臺是否能成功接收到。
看到后臺成功拿到了向主題mqtt_test_topic
發(fā)送的消息。
2、測試消息訂閱
用mqttbox
模擬訂閱主題mqtt_test_topic
,在后臺向主題mqtt_test_topic
發(fā)送一條消息,這里我簡單的寫了個controller
調(diào)用API發(fā)送消息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是后臺向主題 mqtt_test_topic 發(fā)送的消息
我們看mqttbox
的訂閱消息,已經(jīng)成功的接收到了后臺的消息,到此我們的MQTT
通信環(huán)境就算搭建成功了。如果把mqttbox
工具換成具體硬件設(shè)備,整個流程就是我們常說的智能家居了,其實真的沒那么難。
七、應(yīng)用注意事項
在我們實際的生產(chǎn)環(huán)境中遇到過的問題,這里分享一下讓大家少踩坑。
clientId 要唯一
在客戶端connect
連接的時,會有一個clientId
參數(shù),需要每個客戶端都保持唯一的。但我們在開發(fā)測試階段clientId
直接在代碼中寫死了,而且服務(wù)都是單實例部署,并沒有暴露出什么問題。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),?mqttClientFactory(),?mqttConfig.getDefaultTopic());
然而在生產(chǎn)環(huán)境內(nèi)側(cè)的時候,由于服務(wù)是多實例集群部署,結(jié)果出現(xiàn)了下邊的奇怪問題。同一時間內(nèi)只能有一個客戶端能拿到消息,其他客戶端不但不能消費消息,而且還在不斷的掉線重連:Lost connection: 已斷開連接; retrying...
。
這就是由于clientId
相同導(dǎo)致客戶端間相互競爭消費,最后將clientId
獲取方式換成從發(fā)號器中拿,問題就好了,所以這個地方是需要特別注意的。
平時程序在開發(fā)環(huán)境沒問題,可偏偏到了生產(chǎn)環(huán)境就一大堆問題,很多都是因為服務(wù)部署方式不同導(dǎo)致的。所以多學(xué)習(xí)分布式還是很有必要的。
八、其他中間件
MQTT
它只是一種協(xié)議,支持MQTT
協(xié)議的消息中間件產(chǎn)品非常多,下邊的也只是其中的一部分
- Mosquitto
- Eclipse Paho
- RabbitMQ
- Apache ActiveMQ
- HiveMQ
- JoramMQ
- ThingMQ
- VerneMQ
- Apache Apollo
- emqttd Xively
- IBM Websphere .....
總結(jié)
我也是第一次做和硬件相關(guān)的項目,之前聽到智能家居都會覺得好高大上,但實際上手開發(fā)后發(fā)現(xiàn),技術(shù)嘛萬變不離其宗,也只是換種用法而已。
以上就是springboot+rabbitmq實現(xiàn)智能家居實例詳解的詳細(xì)內(nèi)容,更多關(guān)于springboot rabbitmq智能家居的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java線程池ForkJoinPool(工作竊取算法)的使用
Fork就是把一個大任務(wù)切分為若干個子任務(wù)并行地執(zhí)行,Join就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。Fork/Join?框架使用的是工作竊取算法。本文主要介紹了ForkJoinPool的使用,需要的可以參考一下2022-11-11Java實現(xiàn)調(diào)用jython執(zhí)行python文件的方法
這篇文章主要介紹了Java實現(xiàn)調(diào)用jython執(zhí)行python文件的方法,結(jié)合實例形式分析了Java調(diào)用jython執(zhí)行python文件的常見操作技巧及相關(guān)問題解決方法,需要的朋友可以參考下2018-03-03Android開發(fā)中Socket通信的基本實現(xiàn)方法講解
這篇文章主要介紹了Android開發(fā)中Socket通信的基本實現(xiàn)方法講解,是安卓上移動互聯(lián)網(wǎng)程序開發(fā)的基礎(chǔ),需要的朋友可以參考下2015-12-12詳解SpringCloud Gateway之過濾器GatewayFilter
這篇文章主要介紹了詳解SpringCloud Gateway之過濾器GatewayFilter,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-10-10Java訪問者模式實現(xiàn)優(yōu)雅的對象結(jié)構(gòu)處理
Java訪問者模式是一種行為型設(shè)計模式,它通過將數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)操作分離,實現(xiàn)對復(fù)雜對象結(jié)構(gòu)的處理。它將數(shù)據(jù)結(jié)構(gòu)中的每個元素都轉(zhuǎn)換為訪問者能夠識別的形式,從而使得數(shù)據(jù)操作可以在不影響數(shù)據(jù)結(jié)構(gòu)的前提下進(jìn)行擴(kuò)展和變化2023-04-04使用JavaWeb webSocket實現(xiàn)簡易的點對點聊天功能實例代碼
這篇文章主要介紹了使用JavaWeb webSocket實現(xiàn)簡易的點對點聊天功能實例代碼的相關(guān)資料,內(nèi)容介紹的非常詳細(xì),具有參考借鑒價值,感興趣的朋友一起學(xué)習(xí)吧2016-05-05