使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器
需求背景
定時(shí)器的核心功能是能夠周期性地觸發(fā)回調(diào)函數(shù),同時(shí)需要支持啟動(dòng)、停止以及狀態(tài)檢查等操作。在多線程或異步編程場(chǎng)景中,希望定時(shí)器能夠:
- 支持異步操作,避免阻塞主線程;
- 單例化事件循環(huán),節(jié)省資源;
- 優(yōu)雅地管理定時(shí)器的生命周期;
- 提供簡(jiǎn)單的接口,易于使用。
為此,設(shè)計(jì)了一個(gè) Timer
類(lèi),結(jié)合 asyncio
和 threading
,實(shí)現(xiàn)了一個(gè)高效的定時(shí)器。
代碼
完整代碼
import asyncio import threading import time import sys class Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start() @classmethod def _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f"事件循環(huán)異常: {e}") finally: loop.close() @classmethod def _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop) # 不使用 join,因?yàn)槭刈o(hù)線程會(huì)在主線程退出時(shí)自動(dòng)結(jié)束 def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = None async def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass # 正常取消時(shí)忽略 except Exception as e: print(f"定時(shí)器循環(huán)異常: {e}") finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown() def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1 # print(f"定時(shí)器已啟動(dòng),每{interval}秒執(zhí)行一次") def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False # print("定時(shí)器已停止") def is_active(self): return self.is_running # 使用示例 def callback1(): print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}") def callback2(): print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}") if __name__ == "__main__": timer1 = Timer() timer2 = Timer() timer1.start(2, callback1) timer2.start(3, callback2) try: time.sleep(100) timer1.stop() time.sleep(2) timer2.stop() except KeyboardInterrupt: timer1.stop() timer2.stop() finally: # 確保在程序退出時(shí)清理 Timer._shutdown()
1. 單例事件循環(huán)的實(shí)現(xiàn)
為了避免每個(gè)定時(shí)器都創(chuàng)建一個(gè)獨(dú)立的事件循環(huán),在 Timer
類(lèi)中使用了類(lèi)變量和類(lèi)方法來(lái)管理全局唯一的事件循環(huán):
class Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start()
_loop
:存儲(chǔ)全局的asyncio
事件循環(huán)。_thread
:將事件循環(huán)運(yùn)行在一個(gè)獨(dú)立的守護(hù)線程中,避免阻塞主線程。_lock
:線程鎖,確保在多線程環(huán)境中創(chuàng)建事件循環(huán)時(shí)的線程安全。_ensure_loop
:在需要時(shí)創(chuàng)建或重用事件循環(huán),確保只有一個(gè)全局循環(huán)。
守護(hù)線程(daemon=True
)的設(shè)計(jì)使得程序退出時(shí)無(wú)需顯式關(guān)閉線程,簡(jiǎn)化了資源清理。
2. 事件循環(huán)的運(yùn)行與關(guān)閉
事件循環(huán)的運(yùn)行邏輯封裝在 _run_loop
中:
@classmethod def _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f"事件循環(huán)異常: {e}") finally: loop.close()
run_forever
:讓事件循環(huán)持續(xù)運(yùn)行,直到被外部停止。- 異常處理:捕獲可能的錯(cuò)誤并打印,便于調(diào)試。
finally
:確保循環(huán)關(guān)閉時(shí)資源被正確釋放。
關(guān)閉邏輯則由 _shutdown
方法控制:
@classmethod def _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop)
當(dāng)所有定時(shí)器都停止時(shí)(_running_timers == 0
),事件循環(huán)會(huì)被安全停止。
3. 定時(shí)器核心邏輯
每個(gè) Timer
實(shí)例負(fù)責(zé)管理一個(gè)獨(dú)立的定時(shí)任務(wù):
def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = None async def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass # 正常取消時(shí)忽略 finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown()
_stop_event
:一個(gè)asyncio.Event
對(duì)象,用于控制定時(shí)器的停止。_timer_loop
:異步協(xié)程,每隔interval
秒執(zhí)行一次回調(diào)函數(shù)callback
。run_in_executor
:將回調(diào)函數(shù)運(yùn)行在默認(rèn)的線程池中,避免阻塞事件循環(huán)。
4. 啟動(dòng)與停止
啟動(dòng)和停止方法是用戶(hù)的主要接口:
def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1 def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False
start
:?jiǎn)?dòng)定時(shí)器,確保事件循環(huán)可用,并記錄運(yùn)行中的定時(shí)器數(shù)量。stop
:通過(guò)設(shè)置_stop_event
并取消任務(wù)來(lái)停止定時(shí)器。
5. 使用示例
以下是一個(gè)簡(jiǎn)單的使用示例:
def callback1(): print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}") def callback2(): print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}") timer1 = Timer() timer2 = Timer() timer1.start(2, callback1) # 每2秒觸發(fā)一次 timer2.start(3, callback2) # 每3秒觸發(fā)一次 time.sleep(10) # 運(yùn)行10秒 timer1.stop() timer2.stop()
輸出可能如下:
回調(diào)1觸發(fā): 14:30:02 回調(diào)2觸發(fā): 14:30:03 回調(diào)1觸發(fā): 14:30:04 回調(diào)1觸發(fā): 14:30:06 回調(diào)2觸發(fā): 14:30:06 ...
設(shè)計(jì)亮點(diǎn)
- 異步與多線程結(jié)合:通過(guò)
asyncio
和threading
,實(shí)現(xiàn)了非阻塞的定時(shí)器,適合高并發(fā)場(chǎng)景。 - 資源高效利用:全局唯一的事件循環(huán)避免了重復(fù)創(chuàng)建的開(kāi)銷(xiāo)。
- 優(yōu)雅的生命周期管理:守護(hù)線程和自動(dòng)關(guān)閉機(jī)制簡(jiǎn)化了資源清理。
- 線程安全:使用鎖機(jī)制確保多線程環(huán)境下的穩(wěn)定性。
適用場(chǎng)景
- 周期性任務(wù)調(diào)度,如數(shù)據(jù)刷新、狀態(tài)檢查。
- 后臺(tái)服務(wù)中的定時(shí)監(jiān)控。
- 游戲或?qū)崟r(shí)應(yīng)用中的計(jì)時(shí)器需求。
總結(jié)
這個(gè)異步定時(shí)器實(shí)現(xiàn)結(jié)合了 Python 的異步編程和多線程特性,提供了一個(gè)輕量、靈活的解決方案。無(wú)論是簡(jiǎn)單的腳本還是復(fù)雜的后臺(tái)服務(wù),它都能勝任。如果你需要一個(gè)可靠的定時(shí)器,不妨試試這個(gè)實(shí)現(xiàn),或者根據(jù)需求進(jìn)一步優(yōu)化它!
以上就是使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器的詳細(xì)內(nèi)容,更多關(guān)于Python異步定時(shí)器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
利用Python實(shí)現(xiàn)去重聚合Excel數(shù)據(jù)并對(duì)比兩份數(shù)據(jù)的差異
在數(shù)據(jù)處理過(guò)程中,常常需要將多個(gè)數(shù)據(jù)表進(jìn)行合并,并進(jìn)行比對(duì),以便找出數(shù)據(jù)的差異和共同之處,本文將介紹如何使用 Pandas 庫(kù)對(duì)兩個(gè) Excel 數(shù)據(jù)表進(jìn)行合并與比對(duì),需要的可以參考下2023-11-11Python連接達(dá)夢(mèng)數(shù)據(jù)庫(kù)的實(shí)現(xiàn)示例
本文主要介紹了Python連接達(dá)夢(mèng)數(shù)據(jù)庫(kù)的實(shí)現(xiàn)示例,dmPython是DM提供的依據(jù)Python DB API version 2.0中API使用規(guī)定而開(kāi)發(fā)的數(shù)據(jù)庫(kù)訪問(wèn)接口,使Python應(yīng)用程序能夠?qū)M數(shù)據(jù)庫(kù)進(jìn)行訪問(wèn)2023-12-12python設(shè)置隨機(jī)種子實(shí)例講解
在本篇文章里小編給大家整理的是關(guān)于python設(shè)置隨機(jī)種子的相關(guān)知識(shí)點(diǎn)以及實(shí)例內(nèi)容,需要的朋友們學(xué)習(xí)下。2019-09-09python之線程通過(guò)信號(hào)pyqtSignal刷新ui的方法
今天小編就為大家分享一篇python之線程通過(guò)信號(hào)pyqtSignal刷新ui的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01Python OpenGL繪制一場(chǎng)煙花盛會(huì)
正值新春佳節(jié),小編今天為大家?guī)?lái)了用Python OpenGL繪制的一場(chǎng)煙花盛會(huì),文中的實(shí)現(xiàn)步驟講解詳細(xì),感興趣的小伙伴可以跟隨小編一起動(dòng)手試一試2022-02-02基于python的selenium兩種文件上傳操作實(shí)現(xiàn)詳解
這篇文章主要介紹了基于python的selenium兩種文件上傳操作實(shí)現(xiàn)詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09python爬蟲(chóng)實(shí)現(xiàn)教程轉(zhuǎn)換成 PDF 電子書(shū)
本文給大家分享的是使用python爬蟲(chóng)實(shí)現(xiàn)把《廖雪峰的 Python 教程》轉(zhuǎn)換成PDF的方法和代碼,有需要的小伙伴可以參考下2017-02-02django自定義Field實(shí)現(xiàn)一個(gè)字段存儲(chǔ)以逗號(hào)分隔的字符串
這篇文章主要介紹了django自定義Field實(shí)現(xiàn)一個(gè)字段存儲(chǔ)以逗號(hào)分隔的字符串的示例,需要的朋友可以參考下2014-04-04