欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Python與MQTT實(shí)現(xiàn)異步通信功能

 更新時(shí)間:2024年12月19日 17:21:12   作者:小噔小咚什么東東  
物聯(lián)網(wǎng)(IoT)和實(shí)時(shí)通信的世界中,消息隊(duì)列遙測(cè)傳輸(MQTT)協(xié)議因其輕量級(jí)、可靠性和實(shí)時(shí)性成為廣受歡迎的選擇,本文給大家介紹了使用Python與MQTT實(shí)現(xiàn)異步通信功能,需要的朋友可以參考下

什么是MQTT協(xié)議?

MQTT是一種輕量級(jí)的發(fā)布/訂閱消息傳輸協(xié)議,設(shè)計(jì)用于低帶寬和高延遲的網(wǎng)絡(luò)環(huán)境,非常適合物聯(lián)網(wǎng)設(shè)備之間的通信。其主要特點(diǎn)包括:

  • 發(fā)布/訂閱模型:支持多對(duì)多的消息傳遞。
  • 輕量級(jí)設(shè)計(jì):較低的網(wǎng)絡(luò)開(kāi)銷。
  • 支持QoS等級(jí):提供不同的消息傳遞可靠性。

項(xiàng)目背景

本文的示例代碼實(shí)現(xiàn)了一個(gè)基于Python的MQTT客戶端。以下功能涵蓋在代碼中:

  • 通過(guò)SSL安全連接到MQTT代理。
  • 支持動(dòng)態(tài)訂閱多個(gè)主題。
  • 異步處理消息,提高性能和擴(kuò)展性。
  • 提供自定義消息處理功能。

核心代碼解析

以下是代碼中的主要功能與模塊解析:

MQTT 客戶端類

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

  • tls_set:?jiǎn)⒂肧SL/TLS以確保通信安全。

  • 主題訂閱:在連接成功時(shí),自動(dòng)訂閱指定的主題。

自定義消息處理

def set_message_handler(self, handler):
    self.custom_message_handler = handler

用戶可通過(guò)該方法傳入自定義的回調(diào)函數(shù),從而根據(jù)業(yè)務(wù)邏輯處理消息。

異步啟動(dòng)客戶端

async def start_async(self):
    self.connect()
    await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)

通過(guò)異步事件循環(huán)確保消息的高效處理,同時(shí)避免阻塞主線程。

示例代碼集成

在主文件main.py中,定義了如下流程:

  • 初始化MQTT客戶端并傳入必要的參數(shù)。
  • 注冊(cè)一個(gè)自定義的消息處理函數(shù)。
  • 利用asyncio實(shí)現(xiàn)消息處理和其他任務(wù)的并發(fā)執(zhí)行。
async def on_mqtt_message(topic, payload):
    print(f"Custom handler: {topic} -> {payload}")

mqtt_client.set_message_handler(on_mqtt_message)
await mqtt_client.start_async()

使用指南

安裝依賴

確保安裝了paho-mqtt庫(kù):

pip install paho-mqtt

配置MQTT代理

更新代碼中的代理地址、端口、用戶名、密碼和證書(shū)路徑。

運(yùn)行程序

使用以下命令運(yùn)行程序:

python main.py

總結(jié)

快速搭建一個(gè)基于MQTT協(xié)議的實(shí)時(shí)通信系統(tǒng)。這種架構(gòu)不僅適用于物聯(lián)網(wǎng)場(chǎng)景,也可以在各種需要實(shí)時(shí)數(shù)據(jù)推送的應(yīng)用中發(fā)揮作用,例如聊天應(yīng)用和實(shí)時(shí)監(jiān)控系統(tǒng)。

示例代碼

mqtt.py

import paho.mqtt.client as mqtt
from datetime import datetime
import asyncio

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        """
        初始化 MQTT 客戶端
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.ca_cert = ca_cert
        self.topics = topics
        self.client = mqtt.Client()

        # 配置 MQTT 客戶端
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.custom_message_handler = None  # 自定義消息處理器

    def set_message_handler(self, handler):
        """
        設(shè)置自定義消息處理回調(diào)函數(shù)
        """
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
        """
        連接成功時(shí)的回調(diào)
        """
        if rc == 0:
            print("SSL連接成功")
            for topic in self.topics:
                client.subscribe(topic)
                print(f"已訂閱主題: {topic}")
        else:
            print(f"連接失敗,返回碼: {rc}")

    def on_message(self, client, userdata, msg):
        """
        收到消息時(shí)的回調(diào)
        """
        current_time = datetime.now()
        payload = msg.payload.decode()
        print(f"收到消息: {msg.topic} -> {payload} 時(shí)間: {current_time}")

        if self.custom_message_handler and self.event_loop:
            asyncio.run_coroutine_threadsafe(
                self.custom_message_handler(msg.topic, payload),
                self.event_loop
            )

    def connect(self):
        """
        連接到 MQTT 服務(wù)器
        """
        self.client.connect(self.broker, self.port, keepalive=60)

    async def start_async(self):
        """
        異步運(yùn)行 MQTT 客戶端
        """
        self.connect()  # 確保連接到 MQTT 服務(wù)器
        print("Starting MQTT client loop...")

        # 異步運(yùn)行 MQTT 客戶端的事件循環(huán)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.client.loop_forever)

main.py

import asyncio
from mqtt import MQTTClient

# MQTT 配置
MQTT_BROKER = "你的服務(wù)器地址"
MQTT_PORT = 8883  # 使用 SSL 的端口
MQTT_USERNAME = "用戶名"
MQTT_PASSWORD = "密碼"
CA_CERT = "./emqxsl-ca.crt"  # CA 證書(shū)路徑
TOPICS = ["clients/disconnect", "uhome/esp32"]  # 訂閱的主題列表


async def main():
    loop = asyncio.get_running_loop()

    mqtt_client = MQTTClient(
        broker=MQTT_BROKER,
        port=MQTT_PORT,
        username=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        ca_cert=CA_CERT,
        topics=TOPICS
    )

    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.set_message_handler(on_mqtt_message)
    mqtt_client.event_loop = loop  # 將事件循環(huán)傳遞給 MQTT 客戶端
    await mqtt_client.start_async()


    await asyncio.gather(websocket_task, periodic_task)


if __name__ == "__main__":
    asyncio.run(main())

到此這篇關(guān)于使用Python與MQTT實(shí)現(xiàn)異步通信功能的文章就介紹到這了,更多相關(guān)Python MQTT異步通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python 算法題——快樂(lè)數(shù)的多種解法

    python 算法題——快樂(lè)數(shù)的多種解法

    看書(shū),看視頻都可以幫助你學(xué)習(xí)代碼,但都只是輔助作用,學(xué)好 Python,最重要的還是 多敲代碼,多刷題。本文講述算法題快樂(lè)數(shù)的多種解法,幫你打開(kāi)思路
    2021-05-05
  • Appium自動(dòng)化測(cè)試中獲取Toast信息操作

    Appium自動(dòng)化測(cè)試中獲取Toast信息操作

    本文主要介紹了Appium自動(dòng)化測(cè)試中獲取Toast信息操作,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • python之當(dāng)你發(fā)現(xiàn)QTimer不能用時(shí)的解決方法

    python之當(dāng)你發(fā)現(xiàn)QTimer不能用時(shí)的解決方法

    今天小編就為大家分享一篇python之當(dāng)你發(fā)現(xiàn)QTimer不能用時(shí)的解決方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • python中常用的內(nèi)置模塊匯總

    python中常用的內(nèi)置模塊匯總

    Python內(nèi)置的模塊有很多,我們也已經(jīng)接觸了不少相關(guān)模塊,接下來(lái)咱們就來(lái)做一些匯總和介紹,在此我會(huì)整理出項(xiàng)目開(kāi)發(fā)最常用的來(lái)進(jìn)行講解,感興趣的朋友跟隨小編一起看看吧
    2022-01-01
  • Python自動(dòng)化神器Playwright的用法詳解

    Python自動(dòng)化神器Playwright的用法詳解

    python Playwright 是一個(gè) Python 庫(kù),它提供了一個(gè)高級(jí) API,用于自動(dòng)化 Web 瀏覽器,它支持 chrome、firefox 和 webkit 瀏覽器,并提供了一種簡(jiǎn)單易用的方法來(lái)模擬用戶在瀏覽器中的行為,本文小編將詳細(xì)的給大家介紹一下Python自動(dòng)化神器Playwright的用法
    2025-04-04
  • python 基于AioHttp 異步抓取火星圖片

    python 基于AioHttp 異步抓取火星圖片

    這篇文章主要介紹了python 基于AioHttp 異步抓取火星圖片的方法,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下
    2021-03-03
  • python處理DICOM并計(jì)算三維模型體積

    python處理DICOM并計(jì)算三維模型體積

    這篇文章主要為大家詳細(xì)介紹了python處理DICOM,并計(jì)算三維模型體積,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-02-02
  • python內(nèi)置模塊datetime.timedelta計(jì)算時(shí)間間隔示例代碼

    python內(nèi)置模塊datetime.timedelta計(jì)算時(shí)間間隔示例代碼

    Python的datetime模塊提供了處理日期和時(shí)間的功能,包括datetime.date、datetime.time、datetime.datetime、datetime.timedelta等類,timedelta用于表示時(shí)間間隔,支持日期和時(shí)間的加減運(yùn)算,文中給出了詳細(xì)的代碼示例,需要的朋友可以參考下
    2025-04-04
  • Python實(shí)現(xiàn)數(shù)據(jù)的序列化操作詳解

    Python實(shí)現(xiàn)數(shù)據(jù)的序列化操作詳解

    在日常開(kāi)發(fā)中,對(duì)數(shù)據(jù)進(jìn)行序列化和反序列化是常見(jiàn)的數(shù)據(jù)操作,Python提供了兩個(gè)模塊方便開(kāi)發(fā)者實(shí)現(xiàn)數(shù)據(jù)的序列化操作,即?json?模塊和?pickle?模塊。本文就為大家詳細(xì)講解這兩個(gè)模塊的使用,需要的可以參考一下
    2022-07-07
  • python的random模塊及加權(quán)隨機(jī)算法的python實(shí)現(xiàn)方法

    python的random模塊及加權(quán)隨機(jī)算法的python實(shí)現(xiàn)方法

    下面小編就為大家?guī)?lái)一篇python的random模塊及加權(quán)隨機(jī)算法的python實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-01-01

最新評(píng)論