欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python 自動(dòng)識(shí)別并連接串口的實(shí)現(xiàn)

 更新時(shí)間:2021年01月19日 11:52:40   作者:菜雞學(xué)長(zhǎng)  
這篇文章主要介紹了python 自動(dòng)識(shí)別并連接串口的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

這個(gè)屬于我項(xiàng)目中一個(gè)函數(shù),跟大家分享一下我的思路及最終實(shí)現(xiàn)

在編寫串口通信工具中,需要實(shí)現(xiàn)一個(gè)函數(shù),自動(dòng)找到對(duì)應(yīng)com 口,并且連接該com口,保證后續(xù)通信正常
作為初始化過程的一部分。

思路

在win 設(shè)備管理器中,經(jīng)常會(huì)出現(xiàn)多個(gè)com 口,但并不是每個(gè)com 口都是目標(biāo)設(shè)備所鏈接的。
嘗試打開每個(gè)com 口,輸入enter 按鍵, 正確的com 口,會(huì)有ack log 返回,表明通信 正常

否則,沒有任何log 返回,則判斷為非目標(biāo)設(shè)備所連接的com 口。

實(shí)現(xiàn)

在這里插入圖片描述

嘗試去打開所有com 口,然后發(fā)送enter, 如果在一段時(shí)間內(nèi)有返回值,檢查com 口收到的字節(jié)數(shù),如果非零,則表明找到了對(duì)應(yīng)的com 口。

完整測(cè)試代碼如下:

import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime

# default value
baunRate = 115200
is_read = False
is_write = False
write_buff = []
sys_buff = []
mSerial = None
callback = None
is_opened = 0
is_registed = 0


class SerialPort:

  def __init__(self,port,buand):

    self.port = serial.Serial(port,buand)
    self.port.close()
    if not self.port.isOpen():
      self.port.open()

    #the index of data_bytes for read operation,私有屬性
    #only used in read lines
    self.__read_ptr = 0
    self.__read_head = 0
    #store all read bytes
    # used in read date, read lines
    self.__data_bytes = bytearray()

  def port_open(self):
    if not self.port.isOpen():
      self.port.open()

  def port_close(self):
    self.port.close()

  def send(self):
    global is_write
    global write_buff

    while is_write:
      if len(write_buff):
        msg = write_buff.pop(0)
        msg = msg+"\n"
        cmd = msg.encode()
        try:

          self.port.write(cmd)
        except:
          write_buff.clear()
          is_write = False
    write_buff.clear()

  def read_data(self):
    global is_read
    global is_opened
    byte_cnt = 0
    while is_read:
      try:
        count = self.port.inWaiting()
        if count > 0:
          rec_str = self.port.read(count)
          self.__data_bytes = self.__data_bytes+rec_str
        #print("receive:",rec_str.decode())
          #print(rec_str)
          byte_cnt += count
          if not is_opened:
            is_opened = 1
      #print("累計(jì)收到:",byte_cnt)
      #time.sleep(0.5)
        self.read_lines()
      except:
        deinit()


  #將當(dāng)前所有的數(shù)據(jù)都讀出,讀取位置不變,每次讀取指針依次移動(dòng),不漏數(shù)據(jù), 讀取行為一直在進(jìn)行
  def read_lines(self):
    #reset
    line_cnt = 0
    data_len = len(self.__data_bytes)
    #print ("")
    #print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len)
    if self.__read_ptr >=data_len:
      return
    #get all lines in current data_bytes
    while(self.__read_ptr < data_len-1):
      if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d):
        tmp = bytearray()
        tmp = self.__data_bytes[self.__read_head:self.__read_ptr]

        try:
          line = tmp.decode()
        except:
          self.__read_head = self.__read_ptr + 2
          self.__read_ptr = self.__read_head
          continue
        iprint(line)
        line_cnt += 1
        self.__read_head = self.__read_ptr + 2
        self.__read_ptr = self.__read_head
      else:
        self.__read_ptr = self.__read_ptr + 1

def auto_open_serial():
  global baunRate
  global mSerial
  global callback
  global is_registed
  global is_opened
  #reset
  deinit()
  # 列出所有當(dāng)前的com口
  port_list = list(serial.tools.list_ports.comports())
  port_list_name = []
  #get all com
  if len(port_list) <= 0:
    iprint("the serial port can't find!")
    return False
  else:
    for itms in port_list:
      port_list_name.append(itms.device)
  #try open
  #print(port_list_name)
  for i in port_list_name:
    try:
      mSerial = SerialPort(i,baunRate)
      iprint("try open %s"%i)
      start_task()
      send("")
      #return True
      time.sleep(1)
      if is_opened:
        iprint("connect %s successfully"%i)
        return True
      else:
        deinit()
        if i == port_list_name[len(port_list_name)-1]:
          iprint("uart don't open")
          break
        continue
    except:
      iprint(" uart don't open")
  deinit()
  return False

def deinit():
  global mSerial
  global is_write
  global is_read
  global write_buff
  global is_opened

  if mSerial:
    mSerial.port_close()

  is_opened = 0
  is_read = False
  is_write = False
  write_buff = []

  mSerial = None
  time.sleep(1)

def init():
  global mSerial
  global callback
  global is_registed
  global is_opened
  global is_read
  global is_write
  #retry
  retry_time = 0
  while not auto_open_serial():
    if not is_opened:
      iprint("wait for uart connect, retry %s"%str(retry_time))
    else:
      return True
    retry_time += 1
    time.sleep(2)
    if retry_time == 10:
      iprint(" open uart fail")
      return False

def send(msg):
  global mSerial
  global is_write
  global write_buff
  if is_write:
    write_buff.append(msg)

def start_task():
  global mSerial
  global is_write
  global is_read

  if mSerial:
    is_write = True
    t1 = threading.Thread (target=mSerial.send)
    t1.setDaemon (False)
    t1.start ()

    is_read = True
    t2 = threading.Thread (target=mSerial.read_data)
    t2.setDaemon (False)
    t2.start ()

def iprint(msg):
  global callback
  global is_registed

  msg = "[Uart] "+str(msg)
  if is_registed:
    callback.append(msg)
  else:
    print(msg)

def start_sys_cmd():
  global is_registed
  if is_registed:
    t3 = threading.Thread (target=process_receive_sys_cmd)
    t3.setDaemon (False)
    t3.start()

def process_receive_sys_cmd():
  global sys_buff
  global is_registed
  global callback
  #print("process_receive_sys_cmd")
  while is_registed:
    #print ("wait,process_receive_sys_cmd")
    if len(sys_buff):
      #print ("receive,process_receive_sys_cmd")
      line = sys_buff.pop(0)
      if "init" in line:
        if is_opened and is_read and is_write:
          iprint("already open uart")
          break
        iprint("start init")
        init()
    if is_opened:
      break
  iprint("Eixt uart sys thread")

def register_cback(list):
  global callback
  global is_registed

  callback = list
  is_registed = 1


def unregister_cback():
  global callback
  callback.clear()

if __name__ == '__main__':

  receive = []
  register_cback(receive)
  sys_buff.append("init")
  start_sys_cmd()

  def process_receive_msg():
    global receive
    while True:
      #print("wait")
      if len(receive):
        #print("receive")
        print(receive.pop(0))

  t = threading.Thread(target=process_receive_msg)
  t.setDaemon(False)
  t.start()

到此這篇關(guān)于python 自動(dòng)識(shí)別并連接串口的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)python 自動(dòng)識(shí)別并連接串口內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • pycharm必知的一些簡(jiǎn)單設(shè)置方法

    pycharm必知的一些簡(jiǎn)單設(shè)置方法

    這篇文章主要介紹了pycharm必知的一些簡(jiǎn)單設(shè)置方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • Python基于pygame實(shí)現(xiàn)單機(jī)版五子棋對(duì)戰(zhàn)

    Python基于pygame實(shí)現(xiàn)單機(jī)版五子棋對(duì)戰(zhàn)

    這篇文章主要為大家詳細(xì)介紹了Python基于pygame實(shí)現(xiàn)單機(jī)版五子棋對(duì)戰(zhàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • Python視頻爬蟲實(shí)現(xiàn)下載頭條視頻功能示例

    Python視頻爬蟲實(shí)現(xiàn)下載頭條視頻功能示例

    這篇文章主要介紹了Python視頻爬蟲實(shí)現(xiàn)下載頭條視頻功能,涉及Python正則匹配、網(wǎng)絡(luò)傳輸及文件讀寫等相關(guān)操作技巧,需要的朋友可以參考下
    2018-05-05
  • Python集成C#實(shí)現(xiàn)界面操作下載文件功能的全過程

    Python集成C#實(shí)現(xiàn)界面操作下載文件功能的全過程

    使用腳本進(jìn)行下載的需求很常見,下面這篇文章主要給大家介紹了關(guān)于Python集成C#實(shí)現(xiàn)界面操作下載文件功能的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-03-03
  • python 從csv讀數(shù)據(jù)到mysql的實(shí)例

    python 從csv讀數(shù)據(jù)到mysql的實(shí)例

    今天小編就為大家分享一篇python 從csv讀數(shù)據(jù)到mysql的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-06-06
  • 在Python的Django框架中編寫錯(cuò)誤提示頁面

    在Python的Django框架中編寫錯(cuò)誤提示頁面

    這篇文章主要介紹了在Python的Django框架中編寫錯(cuò)誤提示頁面,包括傳統(tǒng)的404頁面和設(shè)置連接中斷警告等,需要的朋友可以參考下
    2015-07-07
  • 用來將對(duì)象持久化的python pickle模塊

    用來將對(duì)象持久化的python pickle模塊

    這篇文章主要為大家介紹了用來將對(duì)象持久化的python pickle模塊的用例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • python構(gòu)建自定義回調(diào)函數(shù)詳解

    python構(gòu)建自定義回調(diào)函數(shù)詳解

    在工作中,回調(diào)函數(shù)使用的場(chǎng)景是非常多的,下面我們就來通過例子程序來詳細(xì)了解利用了Python的屬性機(jī)制構(gòu)建了一個(gè)自定義回調(diào)函數(shù)的使用
    2017-06-06
  • 詳細(xì)一文帶你分清Python中的模塊、包和庫

    詳細(xì)一文帶你分清Python中的模塊、包和庫

    這篇文章主要介紹了詳細(xì)一文帶你分清Python中的模塊、包和庫,Python?模塊(Module),是一個(gè)?Python?文件,以?.py?結(jié)尾,包含了?Python?對(duì)象定義和Python語句,模塊能定義函數(shù),類和變量,模塊也能包含可執(zhí)行的代碼,需要的朋友可以參考下
    2023-08-08
  • Django多個(gè)app urls配置代碼實(shí)例

    Django多個(gè)app urls配置代碼實(shí)例

    這篇文章主要介紹了Django多個(gè)app urls配置代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11

最新評(píng)論