基于python實現(xiàn)MQTT發(fā)布訂閱過程原理解析
MQTT簡介
MQTT 全稱為 Message Queuing Telemetry Transport(消息隊列遙測傳輸)是一種基于發(fā)布/訂閱范式的“輕量級”消息協(xié)議。該協(xié)議構(gòu)建于TCP/IP協(xié)議上。
MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)。
其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。
MQTT特點
1、使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。該協(xié)議需要客戶端和服務(wù)端,而協(xié)議中主要有三種身份:發(fā)布者(Publisher)、代理(Broker,服務(wù)器)、訂閱者(Subscriber)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,而消息發(fā)布者可以同時是訂閱者,實現(xiàn)了生產(chǎn)者與消費者的脫耦;
2、對負載內(nèi)容屏蔽的消息傳輸;
3、使用 TCP/IP 提供網(wǎng)絡(luò)連接;
4、有三種消息發(fā)布服務(wù)質(zhì)量:
- “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。
- “至少一次”,確保消息到達,但消息重復(fù)可能會發(fā)生。
- “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。
5、小型傳輸,開銷很?。ü潭ㄩL度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;
6、使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機制。
原理
MQTT協(xié)議有三種身份:發(fā)布者、代理、訂閱者,發(fā)布者和訂閱者都為客戶端,代理為服務(wù)器,同時消息的發(fā)布者也可以是訂閱者(為了節(jié)約內(nèi)存和流量發(fā)布者和訂閱者一般都會定義在一起)。
MQTT傳輸?shù)南⒎譃橹黝}(Topic,可理解為消息的類型,訂閱者訂閱后,就會收到該主題的消息內(nèi)容(payload))和負載(payload,可以理解為消息的內(nèi)容)兩部分。
1.MQTT協(xié)議實現(xiàn)方式
實現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時是訂閱者。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內(nèi)容(payload);
(2)payload,可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。
2.網(wǎng)絡(luò)傳輸與應(yīng)用消息
MQTT會構(gòu)建底層網(wǎng)絡(luò)傳輸:它將建立客戶端到服務(wù)器的連接,提供兩者之間的一個有序的、無損的、基于字節(jié)流的雙向傳輸。
當應(yīng)用數(shù)據(jù)通過MQTT網(wǎng)絡(luò)發(fā)送時,MQTT會把與之相關(guān)的服務(wù)質(zhì)量(QoS)和主題名(Topic)相關(guān)連。
3.MQTT客戶端
一個使用MQTT協(xié)議的應(yīng)用程序或者設(shè)備,它總是建立到服務(wù)器的網(wǎng)絡(luò)連接。客戶端可以:
(1)發(fā)布其他客戶端可能會訂閱的信息;
(2)訂閱其它客戶端發(fā)布的消息;
(3)退訂或刪除應(yīng)用程序的消息;
(4)斷開與服務(wù)器連接。
4.MQTT服務(wù)端
MQTT服務(wù)器以稱為"消息代理"(Broker),可以是一個應(yīng)用程序或一臺設(shè)備。它是位于消息發(fā)布者和訂閱者之間,它可以:
(1)接受來自客戶的網(wǎng)絡(luò)連接;
(2)接受客戶發(fā)布的應(yīng)用信息;
(3)處理來自客戶端的訂閱和退訂請求;
(4)向訂閱的客戶轉(zhuǎn)發(fā)應(yīng)用程序消息。
5.MQTT協(xié)議中的訂閱、主題、會話
一、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務(wù)質(zhì)量(QoS)。訂閱會與一個會話(Session)關(guān)聯(lián)。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
二、會話(Session)
每個客戶端與服務(wù)器建立連接后就是一個會話,客戶端和服務(wù)器之間有狀態(tài)交互。會話存在于一個網(wǎng)絡(luò)之間,也可能在客戶端和服務(wù)器之間跨越多個連續(xù)的網(wǎng)絡(luò)連接。
三、主題名(Topic Name)
連接到一個應(yīng)用程序消息的標簽,該標簽與服務(wù)器的訂閱相匹配。服務(wù)器會將消息發(fā)送給訂閱所匹配標簽的每個客戶端。
四、主題篩選器(Topic Filter)
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
五、負載(Payload)
消息訂閱者所具體接收的內(nèi)容。
6.MQTT協(xié)議中的方法
MQTT協(xié)議中定義了一些方法(也被稱為動作),來于表示對確定資源所進行操作。這個資源可以代表預(yù)先存在的數(shù)據(jù)或動態(tài)生成數(shù)據(jù),這取決于服務(wù)器的實現(xiàn)。通常來說,資源指服務(wù)器上的文件或輸出。主要方法有:
(1)Connect。等待與服務(wù)器建立連接。
(2)Disconnect。等待MQTT客戶端完成所做的工作,并與服務(wù)器斷開TCP/IP會話。
(3)Subscribe。等待完成訂閱。
(4)UnSubscribe。等待服務(wù)器取消客戶端的一個或多個topics訂閱。
(5)Publish。MQTT客戶端發(fā)送消息請求,發(fā)送完成后返回應(yīng)用程序線程。
7.應(yīng)用場景
因為它傳輸消息具有異步性(發(fā)布訂閱模式),同時該協(xié)議本身的輕量特定,因此可用于輕量級應(yīng)用,
可作為物聯(lián)網(wǎng)的通信組件使用,例如樹莓派上完全可以搭建一個mqtt服務(wù)器,當未來智能家居全面普及的時候,
家居中的消息通訊都可用此實現(xiàn),如智能冰箱溫度檢測,房間溫度檢測等信息都能通過mqtt去實現(xiàn),遙感數(shù)據(jù)、
汽車檢測數(shù)據(jù)、智能家居、智慧城市、醫(yī)療醫(yī)護都具有應(yīng)用場景。
客戶端
#!/usr/bin/env python # encoding: utf-8 """ @version: v1.0 @author: W_H_J @license: Apache Licence @contact: 415900617@qq.com @software: PyCharm @file: clicentMqttTest.py @time: 2019/2/22 14:19 @describe: mqtt客戶端 """ import json import sys import os import paho.mqtt.client as mqtt import time sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") TASK_TOPIC = 'test' # 客戶端發(fā)布消息主題 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) """ client_id是連接到代理。如果client_id的長度為零或為零,則行為為由使用的協(xié)議版本定義。如果使用MQTT v3.1.1, 那么一個零長度的客戶機id將被發(fā)送到代理,代理將被發(fā)送為客戶端生成一個隨機變量。如果使用MQTT v3.1,那么id將是 隨機生成的。在這兩種情況下,clean_session都必須為True。如果這在這種情況下不會產(chǎn)生ValueError。 注意:一般情況下如果客戶端服務(wù)端啟用兩個監(jiān)聽那么客戶端client_id 不能與服務(wù)器相同,如這里用時間"20190222142358"作為它的id, 如果與服務(wù)器id相同,則無法接收到消息 """ client = mqtt.Client(client_id, transport='tcp') client.connect("127.0.0.1", 1883, 60) # 此處端口默認為1883,通信端口期keepalive默認60 client.loop_start() def clicent_main(message: str): """ 客戶端發(fā)布消息 :param message: 消息主體 :return: """ time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time())) payload = {"msg": "%s" % message, "data": "%s" % time_now} # publish(主題:Topic; 消息內(nèi)容) client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False)) print("Successful send message!") return True if __name__ == '__main__': msg = "我是一條測試數(shù)據(jù)!" clicent_main(msg) client
服務(wù)端
#!/usr/bin/env python # encoding: utf-8 """ @version: v1.0 @author: W_H_J @license: Apache Licence @contact: 415900617@qq.com @software: PyCharm @file: serverMqttTest.py @time: 2019/2/22 14:35 @describe: mqtt 服務(wù)端 """ import json import sys import os import time import paho.mqtt.client as mqtt sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") REPORT_TOPIC = 'test' # 主題 def on_connect(client, userdata, flags, rc): print('connected to mqtt with resurt code ', rc) client.subscribe(REPORT_TOPIC) # 訂閱主題 def on_message(client, userdata, msg): """ 接收客戶端發(fā)送的消息 :param client: 連接信息 :param userdata: :param msg: 客戶端返回的消息 :return: """ print("Start server!") payload = json.loads(msg.payload.decode('utf-8')) print(payload) def server_conenet(client): client.on_connect = on_connect # 啟用訂閱模式 client.on_message = on_message # 接收消息 client.connect("127.0.0.1", 1883, 60) # 鏈接 # client.loop_start() # 以start方式運行,需要啟動一個守護線程,讓服務(wù)端運行,否則會隨主線程死亡 client.loop_forever() # 以forever方式阻塞運行。 def server_stop(client): client.loop_stop() # 停止服務(wù)端 sys.exit(0) def server_main(): client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') server_conenet(client) if __name__ == '__main__': # 啟動監(jiān)聽 server_main() server
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Pandas實現(xiàn)(pivot_table函數(shù))數(shù)據(jù)透視表方式
pandas的pivot_table()函數(shù)非常強大,主要用于創(chuàng)建數(shù)據(jù)透視表,重要參數(shù)包括index、values、columns和aggfunc,index用于設(shè)置行索引,類似于SQL中的group by,values用于進行聚合計算的數(shù)據(jù)選擇,columns參數(shù)可設(shè)置列層次,非必須2024-09-09Python實現(xiàn)簡易過濾刪除數(shù)字的方法小結(jié)
這篇文章主要介紹了Python實現(xiàn)簡易過濾刪除數(shù)字的方法,結(jié)合實例形式總結(jié)分析了Python基于正則及內(nèi)置函數(shù)過濾刪除數(shù)字的相關(guān)操作技巧,需要的朋友可以參考下2019-01-01python爬取微信公眾號文章圖片并轉(zhuǎn)為PDF
大家好,本篇文章主要講的是python爬取微信公眾號文章圖片并轉(zhuǎn)為PDF,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下2022-02-02Python中urllib與urllib2模塊的變化與使用詳解
urllib是python提供的一個用于操作URL的模塊,在python2.x中有URllib庫,也有Urllib2庫,在python3.x中Urllib2合并到了Urllib中,我們爬取網(wǎng)頁的時候需要經(jīng)常使用到這個庫,需要的朋友可以參考下2023-05-05TensorFlow tf.nn.conv2d_transpose是怎樣實現(xiàn)反卷積的
這篇文章主要介紹了TensorFlow tf.nn.conv2d_transpose是怎樣實現(xiàn)反卷積的,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04