欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Python與MQTT實現(xiàn)異步通信功能

 更新時間:2024年12月19日 17:21:12   作者:小噔小咚什么東東  
物聯(lián)網(IoT)和實時通信的世界中,消息隊列遙測傳輸(MQTT)協(xié)議因其輕量級、可靠性和實時性成為廣受歡迎的選擇,本文給大家介紹了使用Python與MQTT實現(xiàn)異步通信功能,需要的朋友可以參考下

什么是MQTT協(xié)議?

MQTT是一種輕量級的發(fā)布/訂閱消息傳輸協(xié)議,設計用于低帶寬和高延遲的網絡環(huán)境,非常適合物聯(lián)網設備之間的通信。其主要特點包括:

  • 發(fā)布/訂閱模型:支持多對多的消息傳遞。
  • 輕量級設計:較低的網絡開銷。
  • 支持QoS等級:提供不同的消息傳遞可靠性。

項目背景

本文的示例代碼實現(xiàn)了一個基于Python的MQTT客戶端。以下功能涵蓋在代碼中:

  • 通過SSL安全連接到MQTT代理。
  • 支持動態(tài)訂閱多個主題。
  • 異步處理消息,提高性能和擴展性。
  • 提供自定義消息處理功能。

核心代碼解析

以下是代碼中的主要功能與模塊解析:

MQTT 客戶端類

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

  • tls_set:啟用SSL/TLS以確保通信安全。

  • 主題訂閱:在連接成功時,自動訂閱指定的主題。

自定義消息處理

def set_message_handler(self, handler):
    self.custom_message_handler = handler

用戶可通過該方法傳入自定義的回調函數,從而根據業(yè)務邏輯處理消息。

異步啟動客戶端

async def start_async(self):
    self.connect()
    await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)

通過異步事件循環(huán)確保消息的高效處理,同時避免阻塞主線程。

示例代碼集成

在主文件main.py中,定義了如下流程:

  • 初始化MQTT客戶端并傳入必要的參數。
  • 注冊一個自定義的消息處理函數。
  • 利用asyncio實現(xiàn)消息處理和其他任務的并發(fā)執(zhí)行。
async def on_mqtt_message(topic, payload):
    print(f"Custom handler: {topic} -> {payload}")

mqtt_client.set_message_handler(on_mqtt_message)
await mqtt_client.start_async()

使用指南

安裝依賴

確保安裝了paho-mqtt庫:

pip install paho-mqtt

配置MQTT代理

更新代碼中的代理地址、端口、用戶名、密碼和證書路徑。

運行程序

使用以下命令運行程序:

python main.py

總結

快速搭建一個基于MQTT協(xié)議的實時通信系統(tǒng)。這種架構不僅適用于物聯(lián)網場景,也可以在各種需要實時數據推送的應用中發(fā)揮作用,例如聊天應用和實時監(jiān)控系統(tǒng)。

示例代碼

mqtt.py

import paho.mqtt.client as mqtt
from datetime import datetime
import asyncio

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        """
        初始化 MQTT 客戶端
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.ca_cert = ca_cert
        self.topics = topics
        self.client = mqtt.Client()

        # 配置 MQTT 客戶端
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.custom_message_handler = None  # 自定義消息處理器

    def set_message_handler(self, handler):
        """
        設置自定義消息處理回調函數
        """
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
        """
        連接成功時的回調
        """
        if rc == 0:
            print("SSL連接成功")
            for topic in self.topics:
                client.subscribe(topic)
                print(f"已訂閱主題: {topic}")
        else:
            print(f"連接失敗,返回碼: {rc}")

    def on_message(self, client, userdata, msg):
        """
        收到消息時的回調
        """
        current_time = datetime.now()
        payload = msg.payload.decode()
        print(f"收到消息: {msg.topic} -> {payload} 時間: {current_time}")

        if self.custom_message_handler and self.event_loop:
            asyncio.run_coroutine_threadsafe(
                self.custom_message_handler(msg.topic, payload),
                self.event_loop
            )

    def connect(self):
        """
        連接到 MQTT 服務器
        """
        self.client.connect(self.broker, self.port, keepalive=60)

    async def start_async(self):
        """
        異步運行 MQTT 客戶端
        """
        self.connect()  # 確保連接到 MQTT 服務器
        print("Starting MQTT client loop...")

        # 異步運行 MQTT 客戶端的事件循環(huán)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.client.loop_forever)

main.py

import asyncio
from mqtt import MQTTClient

# MQTT 配置
MQTT_BROKER = "你的服務器地址"
MQTT_PORT = 8883  # 使用 SSL 的端口
MQTT_USERNAME = "用戶名"
MQTT_PASSWORD = "密碼"
CA_CERT = "./emqxsl-ca.crt"  # CA 證書路徑
TOPICS = ["clients/disconnect", "uhome/esp32"]  # 訂閱的主題列表


async def main():
    loop = asyncio.get_running_loop()

    mqtt_client = MQTTClient(
        broker=MQTT_BROKER,
        port=MQTT_PORT,
        username=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        ca_cert=CA_CERT,
        topics=TOPICS
    )

    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.set_message_handler(on_mqtt_message)
    mqtt_client.event_loop = loop  # 將事件循環(huán)傳遞給 MQTT 客戶端
    await mqtt_client.start_async()


    await asyncio.gather(websocket_task, periodic_task)


if __name__ == "__main__":
    asyncio.run(main())

到此這篇關于使用Python與MQTT實現(xiàn)異步通信功能的文章就介紹到這了,更多相關Python MQTT異步通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 如何快速一次性卸載所有python包(第三方庫)呢

    如何快速一次性卸載所有python包(第三方庫)呢

    這篇文章主要介紹了如何快速一次性卸載所有python包(第三方庫)呢,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-10-10
  • Python MongoDB 插入數據時已存在則不執(zhí)行,不存在則插入的解決方法

    Python MongoDB 插入數據時已存在則不執(zhí)行,不存在則插入的解決方法

    這篇文章主要介紹了Python MongoDB 插入數據時已存在則不執(zhí)行,不存在則插入的解決方法,結合實例形式分析了Python基于日志判斷數據是否已經插入的相關操作技巧,需要的朋友可以參考下
    2019-09-09
  • 關于python中密碼加鹽的學習體會小結

    關于python中密碼加鹽的學習體會小結

    這篇文章主要介紹了關于python中密碼加鹽的學習體會小結,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-07-07
  • python之singledispatch單分派問題

    python之singledispatch單分派問題

    這篇文章主要介紹了python之singledispatch單分派問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • 用python + openpyxl處理excel2007文檔思路以及心得

    用python + openpyxl處理excel2007文檔思路以及心得

    最近要幫做RA的老姐寫個合并excel工作表的腳本……源數據是4000+個excel 工作表,分布在9個xlsm文件里,文件內容是中英文混雜的一些數據,需要從每張表中提取需要的部分,分門別類合并到多個大的表里。
    2014-07-07
  • 發(fā)工資啦!教你用Python實現(xiàn)郵箱自動群發(fā)工資條

    發(fā)工資啦!教你用Python實現(xiàn)郵箱自動群發(fā)工資條

    這篇文章主要介紹了發(fā)工資啦!教你用Python實現(xiàn)郵箱自動群發(fā)工資條,文中有非常詳細的代碼示例,對正在學習python的小伙伴們有很好地幫助,需要的朋友可以參考下
    2021-05-05
  • Python用5行代碼寫一個自定義簡單二維碼

    Python用5行代碼寫一個自定義簡單二維碼

    今天小編就為大家分享一篇關于Python用5行代碼寫一個自定義簡單二維碼的文章,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-10-10
  • Python通過30秒就能學會的漂亮短程序代碼(過程全解)

    Python通過30秒就能學會的漂亮短程序代碼(過程全解)

    這篇文章主要介紹了Python之30秒就能學會的漂亮短程序代碼,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-10-10
  • Tesserocr庫的正確安裝方式

    Tesserocr庫的正確安裝方式

    今天小編就為大家分享一篇關于Tesserocr庫的正確安裝方式,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-10-10
  • Python之Pygame的Draw繪圖

    Python之Pygame的Draw繪圖

    Pygame 中提供了一個draw模塊用來繪制一些簡單的圖形狀,比如矩形、多邊形、圓形、直線、弧線等。本文主要介紹Pygame中的Draw繪圖,感興趣的同學可以參考閱讀
    2023-04-04

最新評論