在python中寫個自定義數(shù)據(jù)包協(xié)議的打包和解包測試
寫個自定義數(shù)據(jù)包協(xié)議的打包和解包測試
打包代碼
''' 幀數(shù)據(jù)結(jié)構(gòu)創(chuàng)建 ''' import struct class FrameData(): def __init__(self,pixStyle=0x14,width=1,height=1): self.pixStyle = pixStyle self.frameWidth = width self.frameHeight = height self.pkgLen =self.frameWidth *self.frameHeight *(pixStyle & 0x0f)+6 self.buf = [0 for i in range(self.pkgLen)] #數(shù)據(jù)頭尾定義 self.buf[0]=0xff self.buf[1] = self.pixStyle self.buf[2] = self.frameWidth #補丁 if(width>255): self.buf[1]=0x40+(pixStyle & 0x0f) self.buf[2] = 128 self.buf[3] = self.frameHeight self.buf[self.pkgLen - 2] = (self.pixStyle+self.frameWidth+self.frameHeight)&0xff self.buf[self.pkgLen - 1]=0xfe def setCrc(self): r=0 for i in range(1, self.pkgLen-2): r+=self.buf[i] self.buf[self.pkgLen - 2] = r& 0xff def setDataToArray(self,arr): alen= len(arr) dlen= self.pkgLen - 6 rlen=alen if(alen<dlen) else dlen for i in range(4, rlen+4): #print(i,arr[i-4]) self.buf[i]=arr[i-4] self.setCrc() def setDataToOff(self): for i in range(4, self.pkgLen - 2): self.buf[i]=0 self.buf[self.pkgLen - 2]=(self.pixStyle + self.frameWidth + self.frameHeight) & 0xff #self.setCrc() def setDataToOn(self): for i in range(4, self.pkgLen - 2): self.buf[i]=255 self.setCrc() def setDataToRGBW(self,r=0,g=0,b=0,w=0): sty=self.pixStyle & 0x0f if (sty == 1): for i in range(0, int((self.pkgLen - 6) / sty)): self.buf[i * sty + 4] = r if (sty == 2): for i in range(0, int((self.pkgLen - 6) / sty)): self.buf[i * sty + 4] = r self.buf[i * sty + 5] = g if(sty==3): for i in range(0, int((self.pkgLen - 6)/sty)): self.buf[i * sty + 4] = r self.buf[i * sty + 5] = g self.buf[i * sty + 6] = b if (sty == 4): for i in range(0, int((self.pkgLen - 6) / sty)): self.buf[i * sty + 4] = r self.buf[i * sty + 5] = g self.buf[i * sty + 6] = b self.buf[i * sty + 7] = w self.setCrc() def packBytes(self): packstyle = str( self.pkgLen) + 'B' # B 0-255 b 0-127 req = struct.pack(packstyle, *self.buf) return req if __name__ == '__main__': d=FrameData(0x14,10,2) print("init",d.buf) d.setDataToOn() print("on",d.buf) d.setDataToRGBW(255) print("r",d.buf) d.setDataToRGBW(0,255) print("g",d.buf) d.setDataToRGBW(0,0,255) print("b",d.buf) d.setDataToRGBW(0,0,0,255) print("w",d.buf) d.setDataToRGBW(255,255, 255, 255) print("rgbw", d.buf) d.setDataToArray([0,0,0]) print("000", d.buf) import time while True: time.sleep(1)
輸出
init [255, 20, 10, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 254]
on [255, 20, 10, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 208, 254]
r [255, 20, 10, 2, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 12, 254]
g [255, 20, 10, 2, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 12, 254]
b [255, 20, 10, 2, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 12, 254]
w [255, 20, 10, 2, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 12, 254]
rgbw [255, 20, 10, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 208, 254]
000 [255, 20, 10, 2, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 211, 254]
解包代碼
''' 幀數(shù)據(jù)結(jié)構(gòu)解析 ''' from event import * class FramePkg(EventDispatcher): def __init__(self,pixStyle=0x14,width=1,height=1): EventDispatcher.__init__(self) self.pixStyle = pixStyle self.frameWidth = width self.frameHeight = height self.pkgLen =self.frameWidth *self.frameHeight *(pixStyle & 0x0f)+6 self.bufLen = self.pkgLen * 3 self.buf = [0 for i in range(self.bufLen)] self.dataLen = self.pkgLen - 6 self.linedata = [0 for i in range(self.dataLen)] self.startIdx = 0 self.endIdx = 0 #獲取數(shù)據(jù)長度 int def getDatLen(self): if(self.endIdx>=self.startIdx): return self.endIdx-self.startIdx return self.bufLen+self.endIdx-self.startIdx #判斷數(shù)據(jù)長度已經(jīng)到達(dá)一個包長bool def pkgIsOk(self): return self.getDatLen()>=self.pkgLen #檢測包頭為0xff并且有一個包長的數(shù)據(jù)bool def chkStart(self): while(self.buf[self.startIdx]!=0xff and self.pkgIsOk()): self.nextByte() return self.pkgIsOk() #檢測索引超出buf末尾,從頭開始 def chkMaxIdx(self,idx): if (idx > self.bufLen - 1): idx %= self.bufLen # ?? return idx #獲取包尾索引int def getPkgEndIdx(self): return self.chkMaxIdx(self.startIdx+self.pkgLen-1) #檢查包尾為0xfe bool def chkEnd(self): return self.buf[self.getPkgEndIdx()]==0xfe #相對起始索引包數(shù)據(jù)的每個字節(jié)索引的映射int def getIdxFromStart(self,i): return self.chkMaxIdx(self.startIdx+i) #檢測校驗和bool def chkCrc(self): crcidx=self.getPkgEndIdx()-1#包尾之前的一個字節(jié) if(crcidx<0): crcidx= self.bufLen+crcidx#包尾索引在緩沖區(qū)前面,包頭在后面 d= self.buf[crcidx] #print(d) r=0 #計算和 for i in range(1, self.pkgLen-2): #print(self.getIdxFromStart(i),":",self.buf[self.getIdxFromStart(i)]) r+=self.buf[self.getIdxFromStart(i)] r=r&0xff #print(r) return d==r #寫數(shù)據(jù)到顯示區(qū) def writeData(self): r=0 for i in range(4,self.pkgLen-2): r=self.buf[self.getIdxFromStart(i)] #print(r) self.linedata[i-4]=r def nextPkg(self): self.startIdx = self.chkMaxIdx(self.getPkgEndIdx()+1) def nextByte(self): self.startIdx=self.chkMaxIdx(self.startIdx+1) #解析包 def parsePKG(self): while(self.pkgIsOk()): if(self.chkStart() and self.chkEnd() and self.chkCrc()): self.writeData() self.dispatch_event(PKGEvent(PKGEvent.PKG_DATA_OK,self.linedata )) self.nextPkg() else: self.nextByte() def writeToBuf(self,byt): self.buf[self.endIdx]=byt self.endIdx=self.chkMaxIdx(self.endIdx+1) def writeArrayToBuf(self,arr): for i in range(len(arr)): self.writeToBuf(arr[i]) self.parsePKG() #''' if __name__ == '__main__': import time buf=[0 for i in range(30)] buf[0]=0xff buf[1]=0x14 buf[2]=0x01 buf[3]=0x01 buf[4]=0xff buf[5]=0xff buf[6]=0xff buf[7]=0xff buf[8]=0x11 buf[9]=0xfe buf[10]=0xff buf[11]=0x14 buf[12]=0x01 buf[13]=0x01 buf[14]=0xff buf[15]=0xff buf[16]=0xff buf[17]=0xff buf[18]=0x12 buf[19]=0xfe buf[22] = 0xff buf[23] = 0x14 buf[24] = 0x01 buf[25] = 0x01 buf[26] = 0xff buf[27] = 0xff buf[28] = 0xff buf[29] = 0xff buf[0] = 0x12 buf[1] = 0xfe #startIdx=5 #endIdx=2 #dataLen=pkgLen-6 #linedata=[0 for i in range(dataLen)] def on_data_ok(e): print("event ",e.data) pkg= FramePkg() pkg.add_event_listener(PKGEvent.PKG_DATA_OK,on_data_ok) c=0 while True: #print(1) #print(pkg.buf) #print(pkg.linedata ) '''print("getDatLen",pkg.getDatLen()) print("pkgIsOk",pkg.pkgIsOk()) print("chkStart",pkg.chkStart()) print("chkEnd",pkg.chkEnd()) print("chkCrc",pkg.chkCrc()) print("getIdxFromStart",pkg.getIdxFromStart(30)) pkg.writeData () print("getPkgEndIdx-------------",pkg.getPkgEndIdx()) pkg.nextPkg() ''' #print("startIdx",pkg.startIdx) pkg.parsePKG() if (c < 30): pkg.writeToBuf(buf[c]) else: pkg.endIdx=pkg.chkMaxIdx(pkg.endIdx+1) c += 1 if(c%100==0): print(c) time.sleep(0.01)
參考前輩的event類
class Event(object): ''' 事件初始化的一個方式 ''' def __init__(self,event_type,data=None): self._type = event_type self._data = data @property def type(self): return self._type @property def data(self): return self._data class EventDispatcher(object): """ event分發(fā)類 監(jiān)聽和分發(fā)event事件 """ def __init__(self): #初始化事件 self._events = dict() def __del__(self): self._events = None def has_listener(self,event_type,listener): if event_type in self._events.keys(): return listener in self._events[event_type] else: return False def dispatch_event(self,event): """ Dispatch an instance of Event class """ # 分發(fā)event到所有關(guān)聯(lián)的listener if event.type in self._events.keys(): listeners = self._events[event.type] for listener in listeners: listener(event) def add_event_listener(self,event_type,listener): #給某種事件類型添加listner if not self.has_listener(event_type,listener): listeners = self._events.get(event_type,[]) listeners.append(listener) self._events[event_type] = listeners def remove_event_listener(self,event_type,listener): if self.has_listener(event_type,listener): listeners = self._events[event_type] if len(listeners) == 1: del self._events[event_type] else: listeners.remove(listener) self._events[event_type] = listeners class PKGEvent(Event): PKG_DATA_OK = "PKG_DATA_OK" if __name__ == '__main__': class Test(EventDispatcher): def __init__(self, a=1, b=1): EventDispatcher.__init__(self) self.a = a self.b = b def testDispEvent(self): self.dispatch_event(PKGEvent(PKGEvent.PKG_DATA_OK, 124)) t=Test() def on_ok(e): print(e.data) t.add_event_listener(PKGEvent.PKG_DATA_OK,on_ok) import time while True: print(1) t.testDispEvent() time.sleep(1)
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
python 動態(tài)渲染 mysql 配置文件的示例
這篇文章主要介紹了python 動態(tài)渲染 mysql 配置文件的示例,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-11-11Python面向?qū)ο笏枷肱c應(yīng)用入門教程【類與對象】
這篇文章主要介紹了Python面向?qū)ο笏枷肱c應(yīng)用,較為詳細(xì)的分析了Python面向?qū)ο笏枷肱c原理,并結(jié)合實例形式分析了類與對象相關(guān)定義、用法及操作注意事項,需要的朋友可以參考下2019-04-04Conda中環(huán)境遷移到另一個服務(wù)器的實現(xiàn)
本文主要介紹了Conda中的環(huán)境遷移到另一個服務(wù)器,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03Python中函數(shù)調(diào)用9大方法小結(jié)
在Python中,函數(shù)是一種非常重要的編程概念,它們使得代碼模塊化、可重用,并且能夠提高代碼的可讀性,本文將深入探討Python函數(shù)調(diào)用的9種方法,需要的可以參考下2024-01-01Python基于pygame實現(xiàn)的彈力球效果(附源碼)
這篇文章主要介紹了Python基于pygame實現(xiàn)的彈力球效果,涉及pygame圖形動態(tài)操作的相關(guān)的技巧,并附帶了完整的源碼供讀者下載參考,需要的朋友可以參考下2015-11-11Python中文分詞庫jieba(結(jié)巴分詞)詳細(xì)使用介紹
這篇文章主要介紹了Python中文分詞庫jieba(結(jié)巴分詞)提取詞,加載詞,修改詞頻,定義詞庫詳細(xì)使用介紹,需要的朋友可以參考下2022-04-04