欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中使用matplotlib繪制mqtt數(shù)據(jù)實時圖像功能

 更新時間:2021年09月09日 16:35:04   作者:A_Coding_man  
這篇文章主要介紹了Python中使用matplotlib繪制mqtt數(shù)據(jù)實時圖像,本代碼中publish是一個死循環(huán),數(shù)據(jù)一直往外發(fā)送,詳細代碼跟隨小編一起通過本文學(xué)習(xí)下吧

效果圖

請?zhí)砑訄D片描述

mqtt發(fā)布

本代碼中publish是一個死循環(huán),數(shù)據(jù)一直往外發(fā)送。

import random
import time
from paho.mqtt import client as mqtt_client
import json
from datetime import datetime

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
client_id = f'python-mqtt-{random.randint(0, 1000)}'  # 隨機生成客戶端id


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    while True:
        time.sleep(0.01)
        msg = json.dumps({"MAC": "0123456789",
                          "samplerate": 12,
                          "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),
                          "battery": 0.5,
                          "acc": [
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                          ]})
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

mqtt訂閱

from paho.mqtt import client as mqtt_client
import time
import os

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"

def connect_mqtt(client_id):
    """    MQTT 連接函數(shù)。    """
    def on_connect(client, userdata, flags, rc):
        """
        連接回調(diào)函數(shù)
        在客戶端連接后被調(diào)用,在該函數(shù)中可以依據(jù) rc 來判斷客戶端是否連接成功。
        """
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set('uname', 'upwd')  # 鏈接mqtt所需的用戶名和密碼,沒有可不寫
    client.on_connect = on_connect
    client.connect(broker , port)
    return client


def subscribe(client: mqtt_client, a_topic):
    """     訂閱消息       """
    def on_message(client, userdata, msg):
        """
        消息回調(diào)函數(shù)
        在客戶端從 MQTT Broker 收到消息后被調(diào)用,在該函數(shù)中我們將打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。
         * 這里可添加自定義數(shù)據(jù)處理程序
        """
        print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))

    client.subscribe(topic)
    client.on_message = on_message


def run(client_id, topic):
    client = connect_mqtt(client_id)
    subscribe(client, topic)
    client.loop_forever()

if __name__ == '__main__':
    run('test_eartag-003-python-li', 'zk100/gw/#')

matplotlib繪制動態(tài)圖

import matplotlib.pyplot as plt
import numpy as np

count = 100  # 圖中最多數(shù)據(jù)量

ax = list(range(count))  # 保存圖1數(shù)據(jù)
ay = [0] * 100
bx = list(range(count))  # 保存圖2數(shù)據(jù)
by = [0] * 100
num = count  # 計數(shù)

plt.ion()  # 開啟一個畫圖的窗口進入交互模式,用于實時更新數(shù)據(jù)
plt.rcParams['figure.figsize'] = (10, 10)  # 圖像顯示大小
plt.rcParams['font.sans-serif'] = ['SimHei']  # 防止中文標簽亂碼,還有通過導(dǎo)入字體文件的方法
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5  # 設(shè)置曲線線條寬度
plt.tight_layout()
while True:
    plt.clf()  # 清除刷新前的圖表,防止數(shù)據(jù)量過大消耗內(nèi)存
    plt.suptitle("總標題", fontsize=30)  # 添加總標題,并設(shè)置文字大小
    g1 = np.random.random()  # 生成隨機數(shù)畫圖
    # 圖表1
    ax.append(num)  # 追加x坐標值
    ay.append(g1)  # 追加y坐標值
    agraphic = plt.subplot(2, 1, 1)
    agraphic.set_title('子圖表標題1')  # 添加子標題
    agraphic.set_xlabel('x軸', fontsize=10)  # 添加軸標簽
    agraphic.set_ylabel('y軸', fontsize=20)
    plt.plot(ax[-count:], ay[-count:], 'g-')  # 等于agraghic.plot(ax,ay,'g-')
    # 圖表2
    bx.append(num)
    by.append(g1)
    bgraghic = plt.subplot(2, 1, 2)
    bgraghic.set_title('子圖表標題2')
    bgraghic.plot(bx[-count:], by[-count:], 'r^')

    plt.pause(0.001)  # 設(shè)置暫停時間,太快圖表無法正常顯示
    num = num + 1

matplotlib繪制mqtt數(shù)據(jù)實時圖像

  • 單線程

先啟動mqtt訂閱服務(wù)
mqtt訂閱中有阻塞,更新數(shù)據(jù)后因訂閱服務(wù)沒有結(jié)束,導(dǎo)致繪圖程序無法繪圖
先啟動繪圖程序
繪圖程序本身也是個循環(huán),拿不到mqtt的實時數(shù)據(jù),圖像無法更新

  • 兩個服務(wù)加入?yún)f(xié)程,也不行。具體原因還不知道,容后補充。
  • mqtt作為線程啟動,可解決上述問題
import json
import random
from paho.mqtt import client as mqtt_client
import time
import datetime
from math import ceil, floor
import matplotlib.pyplot as plt
import _thread

# 公共變量
broker = 'broker.emqx.io'
topic = "/python/mqtt/li"
port = 1883
client_id = f'python-mqtt-li-{random.randint(0, 100)}'

show_num = 300

x_num = [-1]  # 計數(shù)
acc1 = []
acc2 = []
acc3 = []
acc4 = []
acc5 = []
acc6 = []
stime = []


"""mqtt subscribe topic"""
def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime):
    """將字符串型【毫秒級】格式化時間 轉(zhuǎn)為 【13位】整型時間戳"""
    datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f")
    obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0
    return obj_stamp


def int2datetime(int_float_timestamp):
    """
    有小數(shù)點:分離小數(shù)點,整數(shù)轉(zhuǎn)為格式化時間,小數(shù)點直接跟在后面
    無小數(shù)點:從第10位進行分離,
    所以本函數(shù)只適用于時間戳整數(shù)位數(shù)大于9且小于11.
    """
    if '.' in str(int_float_timestamp):
        int_float = str(int_float_timestamp).split('.')
        date = time.localtime(int(int_float[0]))
        tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date)
        secondafter = '.' + str(int_float[1])
        return str(tempDate) + secondafter


def parse_mqttmsg(msg):
    """解析mqt頭數(shù)據(jù)   MAC samplerate sampletime battery acc"""
    content = json.loads(msg.payload.decode())
    span = 1000 / content['samplerate'] * 10
    time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000]
    sampletime = content['sampletime']
    sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime)
    acc = content['acc']
    for i in range(len(acc)):
        x_num.append(x_num[-1] + 1)
        acc1.append(acc[i][0])
        acc2.append(acc[i][1])
        acc3.append(acc[i][2])
        acc4.append(acc[i][3])
        acc5.append(acc[i][4])
        acc6.append(acc[i][5])
        if i != 0:
            sampletime_int += time_span[i % 2]
            stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000))
        else:
            stime.append(sampletime)
        print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
            pass

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        parse_mqttmsg(msg)

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


""" draw figures """
def draw_figure():
    plt.ion()  # 開啟一個畫圖的窗口進入交互模式,用于實時更新數(shù)據(jù)
    plt.rcParams['figure.figsize'] = (10, 10)  # 圖像顯示大小
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 防止中文標簽亂碼,還有通過導(dǎo)入字體文件的方法
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['lines.linewidth'] = 0.5  # 設(shè)置曲線線條寬度


    count = 0
    while True:
        plt.clf()  # 清除刷新前的圖表,防止數(shù)據(jù)量過大消耗內(nèi)存
        plt.suptitle("總標題", fontsize=30)  # 添加總標題,并設(shè)置文字大小
        plt.tight_layout()

        # 圖表1
        agraphic = plt.subplot(2, 1, 1)
        agraphic.set_title('子圖表標題1')  # 添加子標題
        agraphic.set_xlabel('x軸', fontsize=10)  # 添加軸標簽
        agraphic.set_ylabel('y軸', fontsize=20)
        plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-')
        try:
            xtricks = list(range(len(acc1) - show_num, len(acc1), 10))  # **1**
            xlabels = [stime[i] for i in xtricks]  # **2**
            plt.xticks(xtricks, xlabels, rotation=15)
        except:
            pass

        # 圖表2
        bgraghic = plt.subplot(2, 1, 2)
        bgraghic.set_title('子圖表標題2')
        bgraghic.set_xlabel('x軸', fontsize=10)  # 添加軸標簽
        bgraghic.set_ylabel('y軸', fontsize=20)
        bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^')

        plt.pause(0.001)  # 設(shè)置暫停時間,太快圖表無法正常顯示
        count = count + 1


if __name__ == '__main__':
    # 多線程
    _thread.start_new_thread(run, ())
    draw_figure()

到此這篇關(guān)于Python中使用matplotlib繪制mqtt數(shù)據(jù)實時圖像的文章就介紹到這了,更多相關(guān)matplotlib繪制mqtt數(shù)據(jù)實時圖像內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解Python中with語句的用法

    詳解Python中with語句的用法

    這篇文章主要介紹了Python中with語句的用法,with語句的使用是Python學(xué)習(xí)過程當(dāng)中的基礎(chǔ)知識,本文來自于IBM官方技術(shù)文檔,需要的朋友可以參考下
    2015-04-04
  • python命令行參數(shù)解析OptionParser類用法實例

    python命令行參數(shù)解析OptionParser類用法實例

    這篇文章主要介紹了python命令行參數(shù)解析OptionParser類用法實例,需要的朋友可以參考下
    2014-10-10
  • 使用tensorflow框架在Colab上跑通貓狗識別代碼

    使用tensorflow框架在Colab上跑通貓狗識別代碼

    這篇文章主要介紹了使用tensorflow框架在Colab上跑通貓狗識別代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • 10張動圖學(xué)會python循環(huán)與遞歸問題

    10張動圖學(xué)會python循環(huán)與遞歸問題

    今天為大家整理了十張動圖GIFS,有助于認識循環(huán)、遞歸、二分檢索等概念的具體運行情況。代碼實例以Python語言編寫,非常不錯,感興趣的朋友跟隨小編一起學(xué)習(xí)吧
    2021-02-02
  • Python中五種不同解析庫的應(yīng)用與提取速度對比

    Python中五種不同解析庫的應(yīng)用與提取速度對比

    這篇文章主要為大家詳細介紹了Python中五種不同解析庫的應(yīng)用與提取速度對比,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2025-05-05
  • 9個提高?Python?編程的小技巧

    9個提高?Python?編程的小技巧

    這篇文章主要介紹了9個提高?Python?編程的小技巧,下文分享python編程技巧,需要的小伙伴可以參考一下,希望對你的學(xué)習(xí)有所幫助
    2022-05-05
  • Python中如何優(yōu)雅的合并兩個字典(dict)方法示例

    Python中如何優(yōu)雅的合并兩個字典(dict)方法示例

    字典是Python語言中唯一的映射類型,在我們?nèi)粘9ぷ髦薪?jīng)常會遇到,下面這篇文章主要給大家介紹了關(guān)于Python中如何優(yōu)雅的合并兩個字典(dict)的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-08-08
  • 詳解Python函數(shù)中的幾種參數(shù)

    詳解Python函數(shù)中的幾種參數(shù)

    這篇文章主要為大家介紹了Python參數(shù)的使用,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2021-12-12
  • Python 類,對象,數(shù)據(jù)分類,函數(shù)參數(shù)傳遞詳解

    Python 類,對象,數(shù)據(jù)分類,函數(shù)參數(shù)傳遞詳解

    這篇文章主要介紹了深入理解Python 類,對象,數(shù)據(jù)分類,函數(shù)參數(shù)傳遞,涉及具體代碼示例,具有一定參考價值,需要的朋友可以了解下。
    2021-09-09
  • numpy.bincount用于復(fù)數(shù)權(quán)重的方法

    numpy.bincount用于復(fù)數(shù)權(quán)重的方法

    numpy.bincount是NumPy庫中的一個函數(shù),它用于計算整數(shù)數(shù)組中每個值的出現(xiàn)次數(shù),numpy.bincount函數(shù)在統(tǒng)計整數(shù)數(shù)組中每個值的出現(xiàn)次數(shù)或權(quán)重和時非常有用,本文給大家介紹numpy.bincount如何用于復(fù)數(shù)權(quán)重,感興趣的朋友跟隨小編一起看看吧
    2023-11-11

最新評論