在 Python 中使用 MQTT的方法
Python 是一種廣泛使用的解釋型、高級編程、通用型編程語言。Python 的設計哲學強調(diào)代碼的可讀性和簡潔的語法(尤其是使用空格縮進劃分代碼塊,而非使用大括號或者關鍵詞)。Python 讓開發(fā)者能夠用更少的代碼表達想法,不管是小型還是大型程序,該語言都試圖讓程序的結構清晰明了。
MQTT 是一種基于發(fā)布/訂閱模式的 輕量級物聯(lián)網(wǎng)消息傳輸協(xié)議 ,可以用極少的代碼和帶寬為聯(lián)網(wǎng)設備提供實時可靠的消息服務,它廣泛應用于物聯(lián)網(wǎng)、移動互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等行業(yè)。
本文主要介紹如何在 Python 項目中使用 paho-mqtt 客戶端庫 ,實現(xiàn)客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發(fā)消息等功能。
項目初始化
本項目使用 Python 3.6 進行開發(fā)測試,讀者可用如下命令確認 Python 的版本。
➜ ~ python3 --version Python 3.6.7
選擇 MQTT 客戶端庫
paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持。它還提供了一些幫助程序功能,使將消息發(fā)布到 MQTT 服務器變得非常簡單。
Pip 安裝 Paho MQTT 客戶端
Pip 是 Python 包管理工具,該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
Python MQTT 使用
連接 MQTT 服務器
本文將使用 EMQ X 提供的 免費公共 MQTT 服務器 ,該服務基于 EMQ X 的 MQTT 物聯(lián)網(wǎng)云平臺 創(chuàng)建。服務器接入信息如下:
- Broker: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
導入 Paho MQTT客戶端
from paho.mqtt import client as mqtt_client
設置 MQTT Broker 連接參數(shù)
設置 MQTT Broker 連接地址,端口以及 topic,同時我們調(diào)用 Python random.randint 函數(shù)隨機生成 MQTT 客戶端 id。
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
編寫 MQTT 連接函數(shù)
編寫連接回調(diào)函數(shù) on_connect ,該函數(shù)將在客戶端連接后被調(diào)用,在該函數(shù)中可以依據(jù) rc 來判斷客戶端是否連接成功。通常同時我們將創(chuàng)建一個 MQTT 客戶端,該客戶端將連接到 broker.emqx.io 。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
發(fā)布消息
首先定義一個 while 循環(huán)語句,在循環(huán)中我們將設置每秒調(diào)用 MQTT 客戶端 publish 函數(shù)向 /python/mqtt 主題發(fā)送消息。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
訂閱消息
編寫消息回調(diào)函數(shù) on_message ,該函數(shù)將在客戶端從 MQTT Broker 收到消息后被調(diào)用,在該函數(shù)中我們將打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
完整代碼
消息發(fā)布代碼
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息訂閱代碼
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息訂閱代碼
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
測試
消息發(fā)布
運行 MQTT 消息發(fā)布代碼,我們將看到客戶端連接成功,并且成功將消息發(fā)布。
python3 pub.py

消息訂閱
運行 MQTT 消息訂閱代碼,我們將看到客戶端連接成功,并且成功接收到發(fā)布的消息。
python3 sub.py

總結
至此,我們完成了使用 paho-mqtt 客戶端連接到 公共 MQTT 服務器 ,并實現(xiàn)了測試客戶端與 MQTT 服務器的連接、消息發(fā)布和訂閱。
與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設備側的業(yè)務邏輯實現(xiàn),使用 Python 您可以減少代碼上的邏輯復雜度,降低與設備的交互成本。我們相信在物聯(lián)網(wǎng)領域 Python 將會有更廣泛的應用。
接下來我們將會陸續(xù)發(fā)布更多關于物聯(lián)網(wǎng)開發(fā)及 Python 的相關文章,敬請關注。
以上就是在 Python 中使用 MQTT的方法的詳細內(nèi)容,更多關于Python 中使用 MQTT的資料請關注腳本之家其它相關文章!
相關文章
Python2.7實現(xiàn)多進程下開發(fā)多線程示例
這篇文章主要為大家詳細介紹了Python2.7實現(xiàn)多進程下開發(fā)多線程示例,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-05-05
將string類型的數(shù)據(jù)類型轉換為spark rdd時報錯的解決方法
今天小編就為大家分享一篇關于將string類型的數(shù)據(jù)類型轉換為spark rdd時報錯的解決方法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-02-02
PyTorch實現(xiàn)FedProx聯(lián)邦學習算法
這篇文章主要為大家介紹了PyTorch實現(xiàn)FedProx的聯(lián)邦學習算法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05

