Python中使用matplotlib繪制mqtt數(shù)據(jù)實(shí)時(shí)圖像功能
效果圖

mqtt發(fā)布
本代碼中publish是一個(gè)死循環(huán),數(shù)據(jù)一直往外發(fā)送。
import random
import time
from paho.mqtt import client as mqtt_client
import json
from datetime import datetime
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
client_id = f'python-mqtt-{random.randint(0, 1000)}' # 隨機(jī)生成客戶端id
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
while True:
time.sleep(0.01)
msg = json.dumps({"MAC": "0123456789",
"samplerate": 12,
"sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),
"battery": 0.5,
"acc": [
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
]})
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
mqtt訂閱
from paho.mqtt import client as mqtt_client
import time
import os
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
def connect_mqtt(client_id):
""" MQTT 連接函數(shù)。 """
def on_connect(client, userdata, flags, rc):
"""
連接回調(diào)函數(shù)
在客戶端連接后被調(diào)用,在該函數(shù)中可以依據(jù) rc 來(lái)判斷客戶端是否連接成功。
"""
if rc == 0:
print("Connected to MQTT Broker! return code %d" % rc)
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
# client.username_pw_set('uname', 'upwd') # 鏈接mqtt所需的用戶名和密碼,沒(méi)有可不寫(xiě)
client.on_connect = on_connect
client.connect(broker , port)
return client
def subscribe(client: mqtt_client, a_topic):
""" 訂閱消息 """
def on_message(client, userdata, msg):
"""
消息回調(diào)函數(shù)
在客戶端從 MQTT Broker 收到消息后被調(diào)用,在該函數(shù)中我們將打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。
* 這里可添加自定義數(shù)據(jù)處理程序
"""
print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))
client.subscribe(topic)
client.on_message = on_message
def run(client_id, topic):
client = connect_mqtt(client_id)
subscribe(client, topic)
client.loop_forever()
if __name__ == '__main__':
run('test_eartag-003-python-li', 'zk100/gw/#')
matplotlib繪制動(dòng)態(tài)圖
import matplotlib.pyplot as plt
import numpy as np
count = 100 # 圖中最多數(shù)據(jù)量
ax = list(range(count)) # 保存圖1數(shù)據(jù)
ay = [0] * 100
bx = list(range(count)) # 保存圖2數(shù)據(jù)
by = [0] * 100
num = count # 計(jì)數(shù)
plt.ion() # 開(kāi)啟一個(gè)畫(huà)圖的窗口進(jìn)入交互模式,用于實(shí)時(shí)更新數(shù)據(jù)
plt.rcParams['figure.figsize'] = (10, 10) # 圖像顯示大小
plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文標(biāo)簽亂碼,還有通過(guò)導(dǎo)入字體文件的方法
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5 # 設(shè)置曲線線條寬度
plt.tight_layout()
while True:
plt.clf() # 清除刷新前的圖表,防止數(shù)據(jù)量過(guò)大消耗內(nèi)存
plt.suptitle("總標(biāo)題", fontsize=30) # 添加總標(biāo)題,并設(shè)置文字大小
g1 = np.random.random() # 生成隨機(jī)數(shù)畫(huà)圖
# 圖表1
ax.append(num) # 追加x坐標(biāo)值
ay.append(g1) # 追加y坐標(biāo)值
agraphic = plt.subplot(2, 1, 1)
agraphic.set_title('子圖表標(biāo)題1') # 添加子標(biāo)題
agraphic.set_xlabel('x軸', fontsize=10) # 添加軸標(biāo)簽
agraphic.set_ylabel('y軸', fontsize=20)
plt.plot(ax[-count:], ay[-count:], 'g-') # 等于agraghic.plot(ax,ay,'g-')
# 圖表2
bx.append(num)
by.append(g1)
bgraghic = plt.subplot(2, 1, 2)
bgraghic.set_title('子圖表標(biāo)題2')
bgraghic.plot(bx[-count:], by[-count:], 'r^')
plt.pause(0.001) # 設(shè)置暫停時(shí)間,太快圖表無(wú)法正常顯示
num = num + 1
matplotlib繪制mqtt數(shù)據(jù)實(shí)時(shí)圖像
- 單線程
先啟動(dòng)mqtt訂閱服務(wù)
mqtt訂閱中有阻塞,更新數(shù)據(jù)后因訂閱服務(wù)沒(méi)有結(jié)束,導(dǎo)致繪圖程序無(wú)法繪圖
先啟動(dòng)繪圖程序
繪圖程序本身也是個(gè)循環(huán),拿不到mqtt的實(shí)時(shí)數(shù)據(jù),圖像無(wú)法更新
- 兩個(gè)服務(wù)加入?yún)f(xié)程,也不行。具體原因還不知道,容后補(bǔ)充。
- mqtt作為線程啟動(dòng),可解決上述問(wèn)題
import json
import random
from paho.mqtt import client as mqtt_client
import time
import datetime
from math import ceil, floor
import matplotlib.pyplot as plt
import _thread
# 公共變量
broker = 'broker.emqx.io'
topic = "/python/mqtt/li"
port = 1883
client_id = f'python-mqtt-li-{random.randint(0, 100)}'
show_num = 300
x_num = [-1] # 計(jì)數(shù)
acc1 = []
acc2 = []
acc3 = []
acc4 = []
acc5 = []
acc6 = []
stime = []
"""mqtt subscribe topic"""
def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime):
"""將字符串型【毫秒級(jí)】格式化時(shí)間 轉(zhuǎn)為 【13位】整型時(shí)間戳"""
datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f")
obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0
return obj_stamp
def int2datetime(int_float_timestamp):
"""
有小數(shù)點(diǎn):分離小數(shù)點(diǎn),整數(shù)轉(zhuǎn)為格式化時(shí)間,小數(shù)點(diǎn)直接跟在后面
無(wú)小數(shù)點(diǎn):從第10位進(jìn)行分離,
所以本函數(shù)只適用于時(shí)間戳整數(shù)位數(shù)大于9且小于11.
"""
if '.' in str(int_float_timestamp):
int_float = str(int_float_timestamp).split('.')
date = time.localtime(int(int_float[0]))
tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date)
secondafter = '.' + str(int_float[1])
return str(tempDate) + secondafter
def parse_mqttmsg(msg):
"""解析mqt頭數(shù)據(jù) MAC samplerate sampletime battery acc"""
content = json.loads(msg.payload.decode())
span = 1000 / content['samplerate'] * 10
time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000]
sampletime = content['sampletime']
sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime)
acc = content['acc']
for i in range(len(acc)):
x_num.append(x_num[-1] + 1)
acc1.append(acc[i][0])
acc2.append(acc[i][1])
acc3.append(acc[i][2])
acc4.append(acc[i][3])
acc5.append(acc[i][4])
acc6.append(acc[i][5])
if i != 0:
sampletime_int += time_span[i % 2]
stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000))
else:
stime.append(sampletime)
print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
pass
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
# print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
parse_mqttmsg(msg)
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
""" draw figures """
def draw_figure():
plt.ion() # 開(kāi)啟一個(gè)畫(huà)圖的窗口進(jìn)入交互模式,用于實(shí)時(shí)更新數(shù)據(jù)
plt.rcParams['figure.figsize'] = (10, 10) # 圖像顯示大小
plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文標(biāo)簽亂碼,還有通過(guò)導(dǎo)入字體文件的方法
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5 # 設(shè)置曲線線條寬度
count = 0
while True:
plt.clf() # 清除刷新前的圖表,防止數(shù)據(jù)量過(guò)大消耗內(nèi)存
plt.suptitle("總標(biāo)題", fontsize=30) # 添加總標(biāo)題,并設(shè)置文字大小
plt.tight_layout()
# 圖表1
agraphic = plt.subplot(2, 1, 1)
agraphic.set_title('子圖表標(biāo)題1') # 添加子標(biāo)題
agraphic.set_xlabel('x軸', fontsize=10) # 添加軸標(biāo)簽
agraphic.set_ylabel('y軸', fontsize=20)
plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-')
try:
xtricks = list(range(len(acc1) - show_num, len(acc1), 10)) # **1**
xlabels = [stime[i] for i in xtricks] # **2**
plt.xticks(xtricks, xlabels, rotation=15)
except:
pass
# 圖表2
bgraghic = plt.subplot(2, 1, 2)
bgraghic.set_title('子圖表標(biāo)題2')
bgraghic.set_xlabel('x軸', fontsize=10) # 添加軸標(biāo)簽
bgraghic.set_ylabel('y軸', fontsize=20)
bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^')
plt.pause(0.001) # 設(shè)置暫停時(shí)間,太快圖表無(wú)法正常顯示
count = count + 1
if __name__ == '__main__':
# 多線程
_thread.start_new_thread(run, ())
draw_figure()
到此這篇關(guān)于Python中使用matplotlib繪制mqtt數(shù)據(jù)實(shí)時(shí)圖像的文章就介紹到這了,更多相關(guān)matplotlib繪制mqtt數(shù)據(jù)實(shí)時(shí)圖像內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python命令行參數(shù)解析OptionParser類用法實(shí)例
這篇文章主要介紹了python命令行參數(shù)解析OptionParser類用法實(shí)例,需要的朋友可以參考下2014-10-10
使用tensorflow框架在Colab上跑通貓狗識(shí)別代碼
這篇文章主要介紹了使用tensorflow框架在Colab上跑通貓狗識(shí)別代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04
10張動(dòng)圖學(xué)會(huì)python循環(huán)與遞歸問(wèn)題
今天為大家整理了十張動(dòng)圖GIFS,有助于認(rèn)識(shí)循環(huán)、遞歸、二分檢索等概念的具體運(yùn)行情況。代碼實(shí)例以Python語(yǔ)言編寫(xiě),非常不錯(cuò),感興趣的朋友跟隨小編一起學(xué)習(xí)吧2021-02-02
Python中五種不同解析庫(kù)的應(yīng)用與提取速度對(duì)比
這篇文章主要為大家詳細(xì)介紹了Python中五種不同解析庫(kù)的應(yīng)用與提取速度對(duì)比,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-05-05
Python中如何優(yōu)雅的合并兩個(gè)字典(dict)方法示例
字典是Python語(yǔ)言中唯一的映射類型,在我們?nèi)粘9ぷ髦薪?jīng)常會(huì)遇到,下面這篇文章主要給大家介紹了關(guān)于Python中如何優(yōu)雅的合并兩個(gè)字典(dict)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-08-08
Python 類,對(duì)象,數(shù)據(jù)分類,函數(shù)參數(shù)傳遞詳解
這篇文章主要介紹了深入理解Python 類,對(duì)象,數(shù)據(jù)分類,函數(shù)參數(shù)傳遞,涉及具體代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。2021-09-09
numpy.bincount用于復(fù)數(shù)權(quán)重的方法
numpy.bincount是NumPy庫(kù)中的一個(gè)函數(shù),它用于計(jì)算整數(shù)數(shù)組中每個(gè)值的出現(xiàn)次數(shù),numpy.bincount函數(shù)在統(tǒng)計(jì)整數(shù)數(shù)組中每個(gè)值的出現(xiàn)次數(shù)或權(quán)重和時(shí)非常有用,本文給大家介紹numpy.bincount如何用于復(fù)數(shù)權(quán)重,感興趣的朋友跟隨小編一起看看吧2023-11-11

