python中的生成器實現(xiàn)周期性報文發(fā)送功能
使用python中的生成器實現(xiàn)周期性發(fā)送列表中數(shù)值的報文發(fā)送功能。
功能開發(fā)背景:提取cantest工具采集到的現(xiàn)場報文數(shù)據(jù),希望使用原始的現(xiàn)場數(shù)據(jù)模擬驗證程序現(xiàn)有邏輯,需要開發(fā)一個工具能夠自動按照報文發(fā)送周期依次發(fā)送采集到的報文數(shù)據(jù)中的一個數(shù)值。
功能開發(fā)需求:多個報文發(fā)送對象共用同一個報文發(fā)送線程,多個對象間的報文發(fā)送周期不同,多個對象間的總報文發(fā)送數(shù)據(jù)長度不同,能夠允許報文發(fā)送過程中斷及恢復某個對象的報文發(fā)送。
功能開發(fā)實現(xiàn)邏輯:在固定發(fā)送對象某個數(shù)值的基礎(chǔ)程序版本上增加新的功能,考慮使用python中生成器實現(xiàn)周期性提取對象數(shù)值發(fā)送報文的功能。
目前只需要發(fā)送兩個對象的報文數(shù)據(jù),先定義兩個使用yield生成器:
? ? def yield_item_value_1(self): ? ? ? ? item_value_list = self.item_value_dict[item_list[0]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i] ? ? def yield_item_value_2(self): ? ? ? ? item_value_list = self.item_value_dict[item_list[1]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i]
報文發(fā)送線程中的run()函數(shù):
def run(self): # 實時更新item的被選狀態(tài) self.get_checkbox_res_func() # 獲取每個對象的實際物理值 self.get_item_value_dict() self.item1_value_func = self.yield_item_value_1() self.item2_value_func = self.yield_item_value_2() while self.Flag: if any(msg_send_flag_dict.values()): # 每隔second秒執(zhí)行func函數(shù) timer = Timer(0.01, self.tick_10ms_func) timer.start() self.send_working_msg(self.working_can_device, self.working_can_channel) timer.join() else: mes_info = "Goodbye *** 自動發(fā)送所有報文數(shù)據(jù)結(jié)束?。。? toastone = wx.MessageDialog(None, mes_info, "信息提示", wx.YES_DEFAULT | wx.ICON_QUESTION) if toastone.ShowModal() == wx.ID_YES: # 如果點擊了提示框的確定按鈕 toastone.Destroy() # 則關(guān)閉提示框 break
報文周期性發(fā)送函數(shù):
def send_working_msg(self, can_device, device_id): for idx in range(len(item_list)): if msg_send_flag_dict[item_list[idx]] == 1: msg_id_idx = msg_operation_list.index("報文ID") - 1 msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip()) # 獲取報文發(fā)送幀類型 msg_type_idx = msg_operation_list.index("幀類型") - 1 msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx]) msg_type = 1 if msg_type == "擴展幀" else 0 # 獲取報文發(fā)送周期 msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1 msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx]) send_cycle = msg_cycle / 10 if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle: # 開始喂值 if idx == 0: try: item_phyValue = next(self.item1_value_func) except StopIteration: msg_send_flag_dict[item_list[idx]] = 0 continue else: try: item_phyValue = next(self.item2_value_func) except StopIteration: msg_send_flag_dict[item_list[idx]] = 0 continue msg_data = self.get_item_msg(item_list[idx], item_phyValue) if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0): print("發(fā)送報文成功") # print("msg_data", msg_data) msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0 else: pass # print("發(fā)送報文失敗") # mes_info = "發(fā)送報文失敗" # toastone = wx.MessageDialog(None, mes_info, "信息提示", # wx.YES_DEFAULT | wx.ICON_QUESTION) # if toastone.ShowModal() == wx.ID_YES: # 如果點擊了提示框的確定按鈕 # toastone.Destroy() # 則關(guān)閉提示框
功能實現(xiàn)邏輯的待優(yōu)化點:存在多個對象就需要定義多個存儲報文數(shù)據(jù)的生成器。
上述功能實現(xiàn)邏輯優(yōu)化如下:
? ? def set_yield_func(self): ? ? ? ? item_yield_func_dict = dict() ? ? ? ? for i in range(len(item_list)): ? ? ? ? ? ? item_yield_func_dict[item_list[i]] = self.yield_item_value(i) ? ? ? ? return item_yield_func_dict ? ? def yield_item_value(self, item_idx): ? ? ? ? item_value_list = self.item_value_dict[item_list[item_idx]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i]
報文發(fā)送線程的run()函數(shù)中調(diào)用這個存儲對象報文發(fā)送數(shù)據(jù)生成器的字典item_yield_func_dict:
def run(self): # 實時更新item的被選狀態(tài) self.get_checkbox_res_func() # 獲取每個對象的實際物理值 self.get_item_value_dict() self.item_yield_func_dict = self.set_yield_func() …………
從存儲每個對象生成器的字典item_yield_func_dict中獲取生成器對象:
try: item_phyValue = next(self.item_yield_func_dict[item_list[idx]]) except StopIteration: msg_send_flag_dict[item_list[idx]] = 0 continue
到此這篇關(guān)于python中的生成器實現(xiàn)周期性報文發(fā)送功能的文章就介紹到這了,更多相關(guān)python 周期性報文發(fā)送 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
你知道怎么改進Python 二分法和牛頓迭代法求算術(shù)平方根嗎
這篇文章主要介紹了Python編程實現(xiàn)二分法和牛頓迭代法求平方根代碼的改進,具有一定參考價值,需要的朋友可以了解下,希望能夠給你帶來幫助2021-08-08利用Python計算質(zhì)數(shù)與完全數(shù)的方法實例
這篇文章主要介紹了利用Python計算質(zhì)數(shù)與完全數(shù)的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03Python實現(xiàn)多維數(shù)據(jù)分析的示例詳解
多維數(shù)據(jù)分析是對數(shù)據(jù)的信息分析,它考慮了許多關(guān)系,這篇文章主要為大家詳細介紹了一些使用Python分析多維/多變量數(shù)據(jù)的基本技術(shù),希望對大家有所幫助2023-11-11利用Python實現(xiàn)K-Means聚類的方法實例(案例:用戶分類)
k-means是發(fā)現(xiàn)給定數(shù)據(jù)集的k個簇的算法,也就是將數(shù)據(jù)集聚合為k類的算法,下面這篇文章主要給大家介紹了關(guān)于利用Python實現(xiàn)K-Means聚類的相關(guān)資料,需要的朋友可以參考下2022-05-05