python使用MQTT給硬件傳輸圖片的實(shí)現(xiàn)方法
最近因需要用python寫一個(gè)微服務(wù)來用MQTT給硬件傳輸圖片,其中python用的是flask框架,大概流程如下:

協(xié)議為:
需要將圖片數(shù)據(jù)封裝成多個(gè)消息進(jìn)行傳輸,每個(gè)消息傳輸?shù)臄?shù)據(jù)字節(jié)數(shù)為1400Byte。
消息(MQTT Payload) 格式:Web服務(wù)器-------->BASE:

反饋:BASE---------> Web服務(wù)器:

如果Web服務(wù)器發(fā)送完一個(gè)“數(shù)據(jù)傳輸消息”后,5S內(nèi)沒有收到MQTT“反饋消息”或者收到的反饋中顯示“數(shù)據(jù)包不完整”,則重發(fā)該“數(shù)據(jù)傳輸消息”。
程序流程圖
根據(jù)上面的協(xié)議,可以得到如下的流程圖:

代碼如下:
# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *
# 日志配置信息
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
)
class Mqtt(object):
def __init__(self, img_data, size):
self.MQTTHOST = '*******'
self.MQTTPORT = "******"
# 訂閱和發(fā)送的主題
self.topic_from_base = 'mqttTestSub'
self.topic_to_base = 'mqttTestPub'
self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
self.client = mqtt.Client(self.client_id)
# 完成鏈接后的回掉函數(shù)
self.client.on_connect = self.on_connect
# 圖片大小
self.size = size
# 用于跳出死循環(huán),結(jié)束任務(wù)
self.finished = None
# 包的編號(hào)
self.index = 0
# 將收到的圖片數(shù)據(jù)按大小分成列表
self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]
# 記錄發(fā)布后的數(shù)據(jù),用于監(jiān)控時(shí)延
self.pub_time = 0
self.header_to_base = 0xffffeeee
self.header_from_base = 0xeeeeffff
# 功能標(biāo)識(shí)
self.function_begin = 0x01
self.function_doing = 0x02
self.function_finished = 0x03
# 包的完整和非完整狀態(tài)
self.whole_package = 0x01
self.bad_package = 0x00
# 頭信息的格式,小端模式
self.format_to_base = "<Lbhh"
self.format_from_base = "<Lbhb"
# 如果重發(fā)包時(shí),用于檢查是否重發(fā)第一個(gè)包
self.first = True
# 如果重發(fā)包時(shí),用于檢查是否重發(fā)最后一個(gè)包
self.last = False
self.begin_data = 'image.jpg;' + str(self.size)
# 鏈接mqtt服務(wù)器函數(shù)
def on_mqtt_connect(self):
self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
self.client.loop_start()
# 鏈接完成后的回調(diào)函數(shù)
def on_connect(self, client, userdata, flags, rc):
logging.info("+++ Connected with result code {} +++".format(str(rc)))
self.client.subscribe(self.topic_from_base)
# 訂閱函數(shù)
def subscribe(self):
self.client.subscribe(self.topic_from_base, 1)
# 消息到來處理函數(shù)
self.client.on_message = self.on_message
# 接收到信息后的回調(diào)函數(shù)
def on_message(self, client, userdata, msg):
# 如果接受第一個(gè)包則不需要重發(fā)第一個(gè)
self.first = False
# 將接受到的包進(jìn)行解壓,得到一個(gè)元組
base_tuple = struct.unpack(self.format_from_base, msg.payload)
logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
logging.info("+++ package_number is {}, package_status_from_base is {} +++"
.format(base_tuple[2], base_tuple[3]))
# 檢查接受到信息的頭部是否正確
if base_tuple[0] == self.header_from_base:
logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))
# 是否完成傳輸,如果完成則退出
if base_tuple[1] == self.function_finished:
logging.info("+++ finish work +++")
self.finished = 1
self.client.disconnect()
else:
# 是否是最后一個(gè)包
if self.index == len(self.image_data_list) - 1:
self.publish('finished', self.function_finished)
self.last = True
logging.info("+++ finished_data_to_base is finished+++")
else:
# 如果接收到的包不是 0x03則進(jìn)行傳送數(shù)據(jù)
if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
logging.info("+++ package_number is {}, package_status_from_base is {} +++"
.format(base_tuple[2],base_tuple[3]))
# 如果數(shù)據(jù)的反饋中,包的狀態(tài)是1則繼續(xù)發(fā)下一個(gè)包
if base_tuple[3] == self.whole_package:
self.publish(self.index, self.function_doing)
logging.info("+++ data_to_base is finished+++")
self.index += 1
# 如果數(shù)據(jù)的反饋中,包的狀態(tài)是0則重發(fā)數(shù)據(jù)包
elif base_tuple[3] == self.bad_package:
re_package_number = base_tuple[2]
self.publish(re_package_number-1, self.function_doing)
logging.info("+++ re_data_to_base is finished+++")
else:
logging.info("+++ package_status_from_base is not 0 or 1 +++")
self.client.disconnect()
else:
logging.info("+++ function_identifier is illegal +++")
self.client.disconnect()
else:
logging.info("+++ header_from_base is illegal +++")
self.client.disconnect()
# 數(shù)據(jù)發(fā)送函數(shù)
def publish(self, index, fuc):
# 看是否是最后一個(gè)包
if index == 'finished':
length = 0
package_number = 0
data = b''
else:
length = len(self.image_data_list[index])
package_number = index
data = self.image_data_list[index]
# 打包數(shù)據(jù)頭信息
buffer = struct.pack(
self.format_to_base,
self.header_to_base,
fuc,
package_number,
length
)
to_base_data = buffer + data
# mqtt發(fā)送
self.client.publish(
self.topic_to_base,
to_base_data
)
self.pub_time = time.time()
# 發(fā)送第一個(gè)包函數(shù)
def publish_begin(self):
buffer = struct.pack(
self.format_to_base,
self.header_to_base,
self.function_begin,
0,
len(self.begin_data.encode('utf-8')),
)
begin_data = buffer + self.begin_data.encode('utf-8')
self.client.publish(self.topic_to_base, begin_data)
# 控制函數(shù)
def control(self):
self.on_mqtt_connect()
self.publish_begin()
begin_time = time.time()
self.pub_time = time.time()
self.subscribe()
while True:
time.sleep(1)
# 超過5秒重傳
date = time.time() - self.pub_time
if date > 5:
# 是否重傳第一個(gè)包
if self.first == True:
self.publish_begin()
logging.info('+++ this is timeout first_data +++')
# 是否重傳最后一個(gè)包
elif self.last == True:
self.publish('finished', self.function_finished)
logging.info('+++ this is timeout last_data +++')
else:
self.publish(self.index-1, self.function_doing)
logging.info('+++ this is timeout middle_data +++')
if self.finished == 1:
logging.info('+++ all works is finished+++')
break
print(str(time.time()-begin_time) + 'begin_time - end_time')
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)
# 接受參數(shù)
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)
class GetImage(Resource):
# 得到參數(shù)并從圖床下載到本地
def get(self):
args = parser.parse_args()
url = args.get('url')
response = requests.get(url)
# 獲取圖片
image = Image.open(BytesIO(response.content))
# 存取圖片
add = os.path.join(os.path.abspath(''), 'image.jpg')
image.save(add)
# 得到圖片大小
size = os.path.getsize(add)
f = open(add, 'rb')
imageData = f.read()
f.close()
# 進(jìn)行mqtt傳輸
mqtt = Mqtt(imageData, size)
mqtt.control()
# 刪除文件
os.remove(add)
logging.info('*** the result of control is {} ***'.format(1))
return jsonify({
"imageData": 1
})
api.add_resource(GetImage, '/image')
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0')
總結(jié)
以上所述是小編給大家介紹的python使用MQTT給硬件傳輸圖片的實(shí)現(xiàn)方法,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
如果你覺得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!
相關(guān)文章
Python 基于隊(duì)列實(shí)現(xiàn) tcp socket 連接池的方法
這篇文章主要介紹了Python 基于隊(duì)列實(shí)現(xiàn) tcp socket 連接池的方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2025-05-05
Python?數(shù)據(jù)分析教程探索性數(shù)據(jù)分析
這篇文章主要介紹了Python?數(shù)據(jù)分析教程探索性數(shù)據(jù)分析,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-08-08
淺談Keras中shuffle和validation_split的順序
這篇文章主要介紹了淺談Keras中shuffle和validation_split的順序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-06-06
使用Python中OpenCV和深度學(xué)習(xí)進(jìn)行全面嵌套邊緣檢測(cè)
這篇文章主要介紹了使用Python中OpenCV和深度學(xué)習(xí)進(jìn)行全面嵌套邊緣檢測(cè),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
Python接口自動(dòng)化淺析Token應(yīng)用原理
本文主要介紹token基本概念、運(yùn)行原理及在自動(dòng)化中接口如何攜帶token進(jìn)行訪問,附含源碼,內(nèi)容非常詳細(xì)易理解,有需要的朋友可以參考下2021-08-08

