python實(shí)現(xiàn)事件驅(qū)動(dòng)
本文實(shí)例為大家分享了python實(shí)現(xiàn)事件驅(qū)動(dòng)的具體代碼,供大家參考,具體內(nèi)容如下
EventManager事件管理類實(shí)現(xiàn),大概就百來(lái)行代碼左右。
# encoding: UTF-8 # 系統(tǒng)模塊 from Queue import Queue, Empty from threading import * ################################################# class EventManager: #---------------------------------------------------------------------- def __init__(self): """初始化事件管理器""" # 事件對(duì)象列表 self.__eventQueue = Queue() # 事件管理器開關(guān) self.__active = False # 事件處理線程 self.__thread = Thread(target = self.__Run) # 這里的__handlers是一個(gè)字典,用來(lái)保存對(duì)應(yīng)的事件的響應(yīng)函數(shù) # 其中每個(gè)鍵對(duì)應(yīng)的值是一個(gè)列表,列表中保存了對(duì)該事件監(jiān)聽的響應(yīng)函數(shù),一對(duì)多 self.__handlers = {} #---------------------------------------------------------------------- def __Run(self): """引擎運(yùn)行""" while self.__active == True: try: # 獲取事件的阻塞時(shí)間設(shè)為1秒 event = self.__eventQueue.get(block = True, timeout = 1) self.__EventProcess(event) except Empty: pass #---------------------------------------------------------------------- def __EventProcess(self, event): """處理事件""" # 檢查是否存在對(duì)該事件進(jìn)行監(jiān)聽的處理函數(shù) if event.type_ in self.__handlers: # 若存在,則按順序?qū)⑹录鬟f給處理函數(shù)執(zhí)行 for handler in self.__handlers[event.type_]: handler(event) #---------------------------------------------------------------------- def Start(self): """啟動(dòng)""" # 將事件管理器設(shè)為啟動(dòng) self.__active = True # 啟動(dòng)事件處理線程 self.__thread.start() #---------------------------------------------------------------------- def Stop(self): """停止""" # 將事件管理器設(shè)為停止 self.__active = False # 等待事件處理線程退出 self.__thread.join() #---------------------------------------------------------------------- def AddEventListener(self, type_, handler): """綁定事件和監(jiān)聽器處理函數(shù)""" # 嘗試獲取該事件類型對(duì)應(yīng)的處理函數(shù)列表,若無(wú)則創(chuàng)建 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要注冊(cè)的處理器不在該事件的處理器列表中,則注冊(cè)該事件 if handler not in handlerList: handlerList.append(handler) #---------------------------------------------------------------------- def RemoveEventListener(self, type_, handler): """移除監(jiān)聽器的處理函數(shù)""" #讀者自己試著實(shí)現(xiàn) #---------------------------------------------------------------------- def SendEvent(self, event): """發(fā)送事件,向事件隊(duì)列中存入事件""" self.__eventQueue.put(event) ######################################################################## """事件對(duì)象""" class Event: def __init__(self, type_=None): self.type_ = type_ # 事件類型 self.dict = {} # 字典用于保存具體的事件數(shù)據(jù)
測(cè)試代碼
#------------------------------------------------------------------- # encoding: UTF-8 import sys from datetime import datetime from threading import * from EventManager import * #事件名稱 新文章 EVENT_ARTICAL = "Event_Artical" #事件源 公眾號(hào) class PublicAccounts: def __init__(self,eventManager): self.__eventManager = eventManager def WriteNewArtical(self): #事件對(duì)象,寫了新文章 event = Event(type_=EVENT_ARTICAL) event.dict["artical"] = u'如何寫出更優(yōu)雅的代碼\n' #發(fā)送事件 self.__eventManager.SendEvent(event) print u'公眾號(hào)發(fā)送新文章\n' #監(jiān)聽器 訂閱者 class Listener: def __init__(self,username): self.__username = username #監(jiān)聽器的處理函數(shù) 讀文章 def ReadArtical(self,event): print(u'%s 收到新文章' % self.__username) print(u'正在閱讀新文章內(nèi)容:%s' % event.dict["artical"]) """測(cè)試函數(shù)""" #-------------------------------------------------------------------- def test(): listner1 = Listener("thinkroom") #訂閱者1 listner2 = Listener("steve")#訂閱者2 eventManager = EventManager() #綁定事件和監(jiān)聽器響應(yīng)函數(shù)(新文章) eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical) eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical) eventManager.Start() publicAcc = PublicAccounts(eventManager) timer = Timer(2, publicAcc.WriteNewArtical) timer.start() if __name__ == '__main__': test()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
tensorflow dataset.shuffle、dataset.batch、dataset.repeat順序區(qū)別詳
這篇文章主要介紹了tensorflow dataset.shuffle、dataset.batch、dataset.repeat順序區(qū)別詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2020-06-06python修改字典內(nèi)key對(duì)應(yīng)值的方法
這篇文章主要介紹了python修改字典內(nèi)key對(duì)應(yīng)值的方法,涉及Python中字典賦值的相關(guān)實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07深入理解Python虛擬機(jī)中列表(list)的實(shí)現(xiàn)原理及源碼剖析
在本篇文章當(dāng)中主要給大家介紹?cpython?虛擬機(jī)當(dāng)中針對(duì)列表的實(shí)現(xiàn),在?Python?中,List?是一種非常常用的數(shù)據(jù)類型,可以存儲(chǔ)任何類型的數(shù)據(jù),并且支持各種操作,如添加、刪除、查找、切片等,在本篇文章當(dāng)中將深入去分析這一點(diǎn)是如何實(shí)現(xiàn)的2023-03-03Python編程利用Numpy和PIL庫(kù)將圖片轉(zhuǎn)化為手繪
這篇文章主要介紹了Python編程利用Numpy和PIL庫(kù)將一張圖片轉(zhuǎn)化為手繪風(fēng)格,文中附含詳細(xì)實(shí)現(xiàn)的示例代碼,有需要的朋友可以借鑒參考下2021-09-09Python實(shí)現(xiàn)串口通信(pyserial)過(guò)程解析
這篇文章主要介紹了Python實(shí)現(xiàn)串口通信(pyserial)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09Python數(shù)據(jù)結(jié)構(gòu)與算法之字典樹實(shí)現(xiàn)方法示例
這篇文章主要介紹了Python數(shù)據(jù)結(jié)構(gòu)與算法之字典樹實(shí)現(xiàn)方法,可實(shí)現(xiàn)針對(duì)單詞出現(xiàn)次數(shù)的統(tǒng)計(jì)功能,涉及Python樹結(jié)構(gòu)的定義、遍歷及統(tǒng)計(jì)等相關(guān)操作技巧,需要的朋友可以參考下2017-12-12