使用Python與MQTT實現(xiàn)異步通信功能
什么是MQTT協(xié)議?
MQTT是一種輕量級的發(fā)布/訂閱消息傳輸協(xié)議,設計用于低帶寬和高延遲的網絡環(huán)境,非常適合物聯(lián)網設備之間的通信。其主要特點包括:
- 發(fā)布/訂閱模型:支持多對多的消息傳遞。
- 輕量級設計:較低的網絡開銷。
- 支持QoS等級:提供不同的消息傳遞可靠性。
項目背景
本文的示例代碼實現(xiàn)了一個基于Python的MQTT客戶端。以下功能涵蓋在代碼中:
- 通過SSL安全連接到MQTT代理。
- 支持動態(tài)訂閱多個主題。
- 異步處理消息,提高性能和擴展性。
- 提供自定義消息處理功能。
核心代碼解析
以下是代碼中的主要功能與模塊解析:
MQTT 客戶端類
class MQTTClient: def __init__(self, broker, port, username, password, ca_cert, topics): self.client = mqtt.Client() self.client.username_pw_set(self.username, self.password) self.client.tls_set(ca_certs=self.ca_cert) self.client.on_connect = self.on_connect self.client.on_message = self.on_message
tls_set
:啟用SSL/TLS以確保通信安全。主題訂閱:在連接成功時,自動訂閱指定的主題。
自定義消息處理
def set_message_handler(self, handler): self.custom_message_handler = handler
用戶可通過該方法傳入自定義的回調函數,從而根據業(yè)務邏輯處理消息。
異步啟動客戶端
async def start_async(self): self.connect() await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)
通過異步事件循環(huán)確保消息的高效處理,同時避免阻塞主線程。
示例代碼集成
在主文件main.py
中,定義了如下流程:
- 初始化MQTT客戶端并傳入必要的參數。
- 注冊一個自定義的消息處理函數。
- 利用
asyncio
實現(xiàn)消息處理和其他任務的并發(fā)執(zhí)行。
async def on_mqtt_message(topic, payload): print(f"Custom handler: {topic} -> {payload}") mqtt_client.set_message_handler(on_mqtt_message) await mqtt_client.start_async()
使用指南
安裝依賴
確保安裝了paho-mqtt
庫:
pip install paho-mqtt
配置MQTT代理
更新代碼中的代理地址、端口、用戶名、密碼和證書路徑。
運行程序
使用以下命令運行程序:
python main.py
總結
快速搭建一個基于MQTT協(xié)議的實時通信系統(tǒng)。這種架構不僅適用于物聯(lián)網場景,也可以在各種需要實時數據推送的應用中發(fā)揮作用,例如聊天應用和實時監(jiān)控系統(tǒng)。
示例代碼
mqtt.py
import paho.mqtt.client as mqtt from datetime import datetime import asyncio class MQTTClient: def __init__(self, broker, port, username, password, ca_cert, topics): """ 初始化 MQTT 客戶端 """ self.broker = broker self.port = port self.username = username self.password = password self.ca_cert = ca_cert self.topics = topics self.client = mqtt.Client() # 配置 MQTT 客戶端 self.client.username_pw_set(self.username, self.password) self.client.tls_set(ca_certs=self.ca_cert) self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.custom_message_handler = None # 自定義消息處理器 def set_message_handler(self, handler): """ 設置自定義消息處理回調函數 """ self.custom_message_handler = handler def on_connect(self, client, userdata, flags, rc): """ 連接成功時的回調 """ if rc == 0: print("SSL連接成功") for topic in self.topics: client.subscribe(topic) print(f"已訂閱主題: {topic}") else: print(f"連接失敗,返回碼: {rc}") def on_message(self, client, userdata, msg): """ 收到消息時的回調 """ current_time = datetime.now() payload = msg.payload.decode() print(f"收到消息: {msg.topic} -> {payload} 時間: {current_time}") if self.custom_message_handler and self.event_loop: asyncio.run_coroutine_threadsafe( self.custom_message_handler(msg.topic, payload), self.event_loop ) def connect(self): """ 連接到 MQTT 服務器 """ self.client.connect(self.broker, self.port, keepalive=60) async def start_async(self): """ 異步運行 MQTT 客戶端 """ self.connect() # 確保連接到 MQTT 服務器 print("Starting MQTT client loop...") # 異步運行 MQTT 客戶端的事件循環(huán) loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.client.loop_forever)
main.py
import asyncio from mqtt import MQTTClient # MQTT 配置 MQTT_BROKER = "你的服務器地址" MQTT_PORT = 8883 # 使用 SSL 的端口 MQTT_USERNAME = "用戶名" MQTT_PASSWORD = "密碼" CA_CERT = "./emqxsl-ca.crt" # CA 證書路徑 TOPICS = ["clients/disconnect", "uhome/esp32"] # 訂閱的主題列表 async def main(): loop = asyncio.get_running_loop() mqtt_client = MQTTClient( broker=MQTT_BROKER, port=MQTT_PORT, username=MQTT_USERNAME, password=MQTT_PASSWORD, ca_cert=CA_CERT, topics=TOPICS ) async def on_mqtt_message(topic, payload): print(f"Custom handler: {topic} -> {payload}") mqtt_client.set_message_handler(on_mqtt_message) mqtt_client.event_loop = loop # 將事件循環(huán)傳遞給 MQTT 客戶端 await mqtt_client.start_async() await asyncio.gather(websocket_task, periodic_task) if __name__ == "__main__": asyncio.run(main())
到此這篇關于使用Python與MQTT實現(xiàn)異步通信功能的文章就介紹到這了,更多相關Python MQTT異步通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python MongoDB 插入數據時已存在則不執(zhí)行,不存在則插入的解決方法
這篇文章主要介紹了Python MongoDB 插入數據時已存在則不執(zhí)行,不存在則插入的解決方法,結合實例形式分析了Python基于日志判斷數據是否已經插入的相關操作技巧,需要的朋友可以參考下2019-09-09用python + openpyxl處理excel2007文檔思路以及心得
最近要幫做RA的老姐寫個合并excel工作表的腳本……源數據是4000+個excel 工作表,分布在9個xlsm文件里,文件內容是中英文混雜的一些數據,需要從每張表中提取需要的部分,分門別類合并到多個大的表里。2014-07-07發(fā)工資啦!教你用Python實現(xiàn)郵箱自動群發(fā)工資條
這篇文章主要介紹了發(fā)工資啦!教你用Python實現(xiàn)郵箱自動群發(fā)工資條,文中有非常詳細的代碼示例,對正在學習python的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05