Python實(shí)現(xiàn)MQTT通信的示例代碼
適用于物聯(lián)網(wǎng)設(shè)備、傳感器數(shù)據(jù)采集等場景
1. 安裝paho-mqtt庫
安裝Python的MQTT客戶端庫 paho-mqtt
,支持MQTT v3.1/v3.1.1協(xié)議,兼容性強(qiáng)
pip install paho-mqtt==1.6.1 # 推薦使用1.6.1版本避免兼容性問題
2. 搭建MQTT代理服務(wù)器(Broker)
選擇本地搭建或使用公共Broker:
本地搭建(以EMQX為例)
下載并解壓EMQX開源版。Directory listing for EMQX: /v5.3.2/ | EMQ
配置EMQX,EMQX的配置文件位于etc\emqx.conf
,可以根據(jù)需要修改配置文件,例如更改端口號、添加插件等:18083 EMQX Dashboard 管理控制臺端口
在命令行中進(jìn)入安裝目錄的 bin
文件夾,執(zhí)行以下命令啟動服務(wù)
檢查啟動狀態(tài)emqx_ctl status
訪問 http://localhost:18083
進(jìn)入管理界面(默認(rèn)賬號:admin/public
)
公共Broker(如HiveMQ)
使用免費(fèi)公共服務(wù)器:
broker_address = "broker.hivemq.com" broker_port = 1883
3. Python實(shí)現(xiàn)基礎(chǔ)通信功能
分為 發(fā)布者(Publisher) 和 訂閱者(Subscriber) 兩類客戶端:
3.1 發(fā)布者代碼示例publisher.py
# python 3.8 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 broker = '127.0.0.1' port = 1883 topic = "python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' username = 'emqx' password = '**********' def connect_mqtt(): def on_connect(client, userdata, flags, rc,properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # client = mqtt_client.Client(client_id) client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) # client.tls_set(ca_certs='./server-ca.crt') # client.username_pw_set(username, password) client.username_pw_set('admin', 'public') client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
3.2 訂閱者代碼示例subscriber.py
# python3.8 import random from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 broker = '127.0.0.1' port = 1883 topic = "python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'emqx' password = '**********' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc,properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # client = mqtt_client.Client(client_id) client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) # client.tls_set(ca_certs='./server-ca.crt') # client.username_pw_set(username, password) client.username_pw_set('admin', 'public') client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
3.3或者放同一個文件:
from paho.mqtt import client as mqtt_client from threading import Thread import time broker = '127.0.0.1' port = 1883 # broker = 'broker.emqx.io' # port = 1883 su_topic = [("python_mqtt_server", 2), ("python_mqtt_command", 2)] # 訂閱頻道,數(shù)組格式為(topic,qos) pu_topic = ["python_mqtt_server"] # 發(fā)布頻道 client_id = 'websocket_publisher_001' # 將連接與訂閱綁在一起,防止復(fù)用client導(dǎo)致連接沖突 def connect_mqtt(): # For paho-mqtt 2.0.0, you need to add the properties parameter. def on_connect(client, userdata, flags, rc, properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # For paho-mqtt 2.0.0, you need to set callback_api_version. client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) client.username_pw_set('admin', 'public') client.on_connect = on_connect client.connect(broker, port) subscribe_many(client) print("訂閱完成") return client # 訂閱多個頻道 def subscribe_many(client): # 根據(jù)client源碼可知,訂閱支持的數(shù)組格式是(topic,qos) client.subscribe(su_topic) # 收到信息的回調(diào)函數(shù) def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") # 這里可以編寫一些收到信息后的處理,比如收到什么信息開啟什么任務(wù),任務(wù)完成后再調(diào)publish發(fā)布信息給終端等等........ # ----------處理例子------------------ r_msg = msg.payload.decode() if int(r_msg) % 2 == 0: print("收到偶數(shù)") # ----------------------------------- # 發(fā)布信息函數(shù) def publish(client): for i in range(1, 11): time.sleep(1) msg = f"{i}" result = client.publish(pu_topic[0], msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{pu_topic[0]}`") else: print(f"Failed to send message to topic {pu_topic[0]}") if __name__ == '__main__': client = connect_mqtt() client.username_pw_set('admin','public') client.on_message = on_message # 重寫回調(diào)函數(shù),可以在方法體里,根據(jù)需求改收到信息后干什么 # 多線程,這里啟動一個發(fā)布任務(wù)的線程,模擬有一個發(fā)布消息的客戶端 thread1 = Thread(target=publish, args=(client,)) # --*如要要實(shí)現(xiàn)流水線式的收發(fā),則可以將線程內(nèi)方法改為任務(wù)隊列處理,用一個全局變量管理任務(wù)執(zhí)行,執(zhí)行后再publish thread1.start() client.loop_forever() # 這個要加,監(jiān)聽(on_message)與loop_forever配套
關(guān)鍵配置說明
訂閱所有傳感器子主題
client.subscribe("sensors/#", qos=1) ?
WebSocket協(xié)議支持
- 必須設(shè)置
transport="websockets"
,否則默認(rèn)使用TCP連接。 - 端口需與Broker配置匹配(例如EMQX默認(rèn)WebSocket端口為8083)。
QoS與消息保留
qos=1
:確保消息至少送達(dá)一次。retain=True
:Broker保留最后一條消息,新訂閱者立即收到。
異常處理
- 使用
try-except
捕獲網(wǎng)絡(luò)中斷或Broker不可用問題。 loop_start()
適用于需要異步處理的場景(如GUI應(yīng)用)。
4. 關(guān)鍵功能擴(kuò)展
異步通信,使用多線程實(shí)現(xiàn)非阻塞操作:
from threading import Thread def start_async(client): Thread(target=client.loop_start).start()
服務(wù)質(zhì)量(QoS)設(shè)置 支持至多一次(0)、至少一次(1)、恰好一次(2)三種級別:
client.publish(topic, payload, qos=1) # 設(shè)置QoS為1
5. 注意事項(xiàng)
- 連接安全性:若需SSL加密,可使用
client.tls_set()
方法。 - 主題設(shè)計:遵循分層結(jié)構(gòu)(如
sensor/room1/temp
),支持通配符+
(單層)和#
(多層)。 - 異常處理:添加
on_disconnect
回調(diào)函數(shù)實(shí)現(xiàn)斷線重連邏輯。
完整示例流程
- 啟動EMQX Broker服務(wù)。
- 運(yùn)行訂閱者代碼監(jiān)聽主題
home/sensor/temp
。 - 運(yùn)行發(fā)布者代碼發(fā)送溫度數(shù)據(jù)。
- 在EMQX管理界面驗(yàn)證消息收發(fā)狀態(tài)
到此這篇關(guān)于Python實(shí)現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關(guān)Python MQTT通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python中常見的幾種音頻數(shù)據(jù)讀取、保存方式總結(jié)
Python是一種非常適合進(jìn)行音頻處理和音頻分析的語言,因?yàn)樗性S多強(qiáng)大的庫可以使用,下面這篇文章主要給大家介紹了關(guān)于python中常見的幾種音頻數(shù)據(jù)讀取、保存方式,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06Python實(shí)現(xiàn)繪制圣誕樹和煙花的示例代碼
這不是圣誕節(jié)快到了,還不用Python繪制個圣誕樹和煙花讓女朋友開心開心,也算是親手做的,稍稍花了點(diǎn)心思,學(xué)會了趕緊畫給你的那個她吧2022-12-12tensorflow1.0學(xué)習(xí)之模型的保存與恢復(fù)(Saver)
這篇文章主要介紹了tensorflow1.0學(xué)習(xí)之模型的保存與恢復(fù)(Saver) ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04Python基于matplotlib繪制棧式直方圖的方法示例
這篇文章主要介紹了Python基于matplotlib繪制棧式直方圖的方法,涉及Python使用matplotlib進(jìn)行圖形繪制的相關(guān)操作技巧,需要的朋友可以參考下2017-08-08numpy如何刪除矩陣中的部分?jǐn)?shù)據(jù)numpy.delete
這篇文章主要介紹了numpy如何刪除矩陣中的部分?jǐn)?shù)據(jù)numpy.delete問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02聊聊通過celery_one避免Celery定時任務(wù)重復(fù)執(zhí)行的問題
Celery Once 也是利用 Redis 加鎖來實(shí)現(xiàn), Celery Once 在 Task 類基礎(chǔ)上實(shí)現(xiàn)了 QueueOnce 類,該類提供了任務(wù)去重的功能,今天通過本文給大家介紹通過celery_one避免Celery定時任務(wù)重復(fù)執(zhí)行的問題,感興趣的朋友一起看看吧2021-10-10python獲取的html中都是\\u003e實(shí)現(xiàn)轉(zhuǎn)成正確字符
這篇文章主要介紹了python獲取的html中都是\\u003e實(shí)現(xiàn)轉(zhuǎn)成正確字符方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07