欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python使用MQTT給硬件傳輸圖片的實現(xiàn)方法

 更新時間:2019年05月05日 09:43:48   作者:Young5566  
最近因需要用python寫一個微服務來用MQTT給硬件傳輸圖片,其中python用的是flask框架。這篇文章主要介紹了python使用MQTT給硬件傳輸圖片,需要的朋友可以參考下

最近因需要用python寫一個微服務來用MQTT給硬件傳輸圖片,其中python用的是flask框架,大概流程如下:


協(xié)議為:

需要將圖片數據封裝成多個消息進行傳輸,每個消息傳輸的數據字節(jié)數為1400Byte。
消息(MQTT Payload) 格式:Web服務器-------->BASE:

反饋:BASE---------> Web服務器:

如果Web服務器發(fā)送完一個“數據傳輸消息”后,5S內沒有收到MQTT“反饋消息”或者收到的反饋中顯示“數據包不完整”,則重發(fā)該“數據傳輸消息”。

程序流程圖

根據上面的協(xié)議,可以得到如下的流程圖:


代碼如下:

# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *
# 日志配置信息
logging.basicConfig(
  level=logging.INFO,
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
)
class Mqtt(object):
  def __init__(self, img_data, size):
    self.MQTTHOST = '*******'
    self.MQTTPORT = "******"
    # 訂閱和發(fā)送的主題
    self.topic_from_base = 'mqttTestSub'
    self.topic_to_base = 'mqttTestPub'
    self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    self.client = mqtt.Client(self.client_id)
    # 完成鏈接后的回掉函數
    self.client.on_connect = self.on_connect
    # 圖片大小
    self.size = size
    # 用于跳出死循環(huán),結束任務
    self.finished = None
    # 包的編號
    self.index = 0
    # 將收到的圖片數據按大小分成列表
    self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]
    # 記錄發(fā)布后的數據,用于監(jiān)控時延
    self.pub_time = 0
    self.header_to_base = 0xffffeeee
    self.header_from_base = 0xeeeeffff
    # 功能標識
    self.function_begin = 0x01
    self.function_doing = 0x02
    self.function_finished = 0x03
    # 包的完整和非完整狀態(tài)
    self.whole_package = 0x01
    self.bad_package = 0x00
    # 頭信息的格式,小端模式
    self.format_to_base = "<Lbhh"
    self.format_from_base = "<Lbhb"
    # 如果重發(fā)包時,用于檢查是否重發(fā)第一個包
    self.first = True
    # 如果重發(fā)包時,用于檢查是否重發(fā)最后一個包
    self.last = False
    self.begin_data = 'image.jpg;' + str(self.size)
  # 鏈接mqtt服務器函數
  def on_mqtt_connect(self):
    self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
    self.client.loop_start()
  # 鏈接完成后的回調函數
  def on_connect(self, client, userdata, flags, rc):
    logging.info("+++ Connected with result code {} +++".format(str(rc)))
    self.client.subscribe(self.topic_from_base)
  # 訂閱函數
  def subscribe(self):
    self.client.subscribe(self.topic_from_base, 1)
    # 消息到來處理函數
    self.client.on_message = self.on_message
  # 接收到信息后的回調函數
  def on_message(self, client, userdata, msg):
    # 如果接受第一個包則不需要重發(fā)第一個
    self.first = False
    # 將接受到的包進行解壓,得到一個元組
    base_tuple = struct.unpack(self.format_from_base, msg.payload)
    logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
    logging.info("+++ package_number is {}, package_status_from_base is {} +++"
           .format(base_tuple[2], base_tuple[3]))
    # 檢查接受到信息的頭部是否正確
    if base_tuple[0] == self.header_from_base:
      logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))
      # 是否完成傳輸,如果完成則退出
      if base_tuple[1] == self.function_finished:
        logging.info("+++ finish work +++")
        self.finished = 1
        self.client.disconnect()
      else:
        # 是否是最后一個包
        if self.index == len(self.image_data_list) - 1:
          self.publish('finished', self.function_finished)
          self.last = True
          logging.info("+++ finished_data_to_base is finished+++")
        else:
          # 如果接收到的包不是 0x03則進行傳送數據
          if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
            logging.info("+++ package_number is {}, package_status_from_base is {} +++"
                   .format(base_tuple[2],base_tuple[3]))
            # 如果數據的反饋中,包的狀態(tài)是1則繼續(xù)發(fā)下一個包
            if base_tuple[3] == self.whole_package:
              self.publish(self.index, self.function_doing)
              logging.info("+++ data_to_base is finished+++")
              self.index += 1
            # 如果數據的反饋中,包的狀態(tài)是0則重發(fā)數據包
            elif base_tuple[3] == self.bad_package:
              re_package_number = base_tuple[2]
              self.publish(re_package_number-1, self.function_doing)
              logging.info("+++ re_data_to_base is finished+++")
            else:
              logging.info("+++ package_status_from_base is not 0 or 1 +++")
              self.client.disconnect()
          else:
            logging.info("+++ function_identifier is illegal +++")
            self.client.disconnect()
    else:
      logging.info("+++ header_from_base is illegal +++")
      self.client.disconnect()
  # 數據發(fā)送函數
  def publish(self, index, fuc):
    # 看是否是最后一個包
    if index == 'finished':
      length = 0
      package_number = 0
      data = b''
    else:
      length = len(self.image_data_list[index])
      package_number = index
      data = self.image_data_list[index]
    # 打包數據頭信息
    buffer = struct.pack(
      self.format_to_base,
      self.header_to_base,
      fuc,
      package_number,
      length
    )
    to_base_data = buffer + data
    # mqtt發(fā)送
    self.client.publish(
      self.topic_to_base,
      to_base_data
    )
    self.pub_time = time.time()
  # 發(fā)送第一個包函數
  def publish_begin(self):
    buffer = struct.pack(
      self.format_to_base,
      self.header_to_base,
      self.function_begin,
      0,
      len(self.begin_data.encode('utf-8')),
    )
    begin_data = buffer + self.begin_data.encode('utf-8')
    self.client.publish(self.topic_to_base, begin_data)
  # 控制函數
  def control(self):
    self.on_mqtt_connect()
    self.publish_begin()
    begin_time = time.time()
    self.pub_time = time.time()
    self.subscribe()
    while True:
      time.sleep(1)
      # 超過5秒重傳
      date = time.time() - self.pub_time
      if date > 5:
        # 是否重傳第一個包
        if self.first == True:
          self.publish_begin()
          logging.info('+++ this is timeout first_data +++')
        # 是否重傳最后一個包
        elif self.last == True:
          self.publish('finished', self.function_finished)
          logging.info('+++ this is timeout last_data +++')
        else:
          self.publish(self.index-1, self.function_doing)
          logging.info('+++ this is timeout middle_data +++')
      if self.finished == 1:
        logging.info('+++ all works is finished+++')
        break
    print(str(time.time()-begin_time) + 'begin_time - end_time')
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)
# 接受參數
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)
class GetImage(Resource):
  # 得到參數并從圖床下載到本地
  def get(self):
    args = parser.parse_args()
    url = args.get('url')
    response = requests.get(url)
    # 獲取圖片
    image = Image.open(BytesIO(response.content))
    # 存取圖片
    add = os.path.join(os.path.abspath(''), 'image.jpg')
    image.save(add)
    # 得到圖片大小
    size = os.path.getsize(add)
    f = open(add, 'rb')
    imageData = f.read()
    f.close()
    # 進行mqtt傳輸
    mqtt = Mqtt(imageData, size)
    mqtt.control()
    # 刪除文件
    os.remove(add)
    logging.info('*** the result of control is {} ***'.format(1))
    return jsonify({
      "imageData": 1
    })
api.add_resource(GetImage, '/image')
if __name__ == '__main__':
  app.run(debug=True, host='0.0.0.0')

總結

以上所述是小編給大家介紹的python使用MQTT給硬件傳輸圖片的實現(xiàn)方法,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!

相關文章

  • 使用paramiko遠程執(zhí)行命令、下發(fā)文件的實例

    使用paramiko遠程執(zhí)行命令、下發(fā)文件的實例

    下面小編就為大家?guī)硪黄褂胮aramiko遠程執(zhí)行命令、下發(fā)文件的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-10-10
  • python接口測試對修改密碼接口進行壓測

    python接口測試對修改密碼接口進行壓測

    這篇文章主要為大家介紹了python接口測試對修改密碼接口進行壓測的腳本實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-07-07
  • python的getattr和getattribute攔截內置操作實現(xiàn)

    python的getattr和getattribute攔截內置操作實現(xiàn)

    在Python中,getattr和getattribute是用于動態(tài)屬性訪問和自定義屬性訪問行為的重要工具,本文主要介紹了python的getattr和getattribute攔截內置操作實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-01-01
  • 解決Pytorch在測試與訓練過程中的驗證結果不一致問題

    解決Pytorch在測試與訓練過程中的驗證結果不一致問題

    這篇文章主要介紹了解決Pytorch在測試與訓練過程中的驗證結果不一致問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Python遞歸實現(xiàn)猴子吃桃問題及解析

    Python遞歸實現(xiàn)猴子吃桃問題及解析

    這篇文章主要介紹了Python遞歸實現(xiàn)猴子吃桃問題及解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • python切片及sys.argv[]用法詳解

    python切片及sys.argv[]用法詳解

    Sys.argv[]是用來獲取命令行參數的,sys.argv[0]表示代碼本身文件路徑,所以參數從1開始。下面通過實例代碼給大家介紹python切片及sys.argv[]用法,需要的朋友參考下吧
    2018-05-05
  • Pandas DataFrame如何按照一列數據的特定順序進行排序

    Pandas DataFrame如何按照一列數據的特定順序進行排序

    這篇文章主要介紹了Pandas DataFrame如何按照一列數據的特定順序進行排序,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • python爬蟲反爬之圖片驗證功能實現(xiàn)

    python爬蟲反爬之圖片驗證功能實現(xiàn)

    這篇文章主要介紹了python爬蟲反爬之圖片驗證功能實現(xiàn),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-03-03
  • Pandas的AB BA類型數據框去重復

    Pandas的AB BA類型數據框去重復

    這篇文章主要為大家介紹了Pandas的AB BA類型數據框去重復實現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-05-05
  • Python將json文件寫入ES數據庫的方法

    Python將json文件寫入ES數據庫的方法

    這篇文章主要介紹了Python將json文件寫入ES數據庫的方法,本文給大家介紹的非常詳細,具有一定的參考借鑒價值 ,需要的朋友可以參考下
    2019-04-04

最新評論