使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器
需求背景
定時(shí)器的核心功能是能夠周期性地觸發(fā)回調(diào)函數(shù),同時(shí)需要支持啟動(dòng)、停止以及狀態(tài)檢查等操作。在多線程或異步編程場景中,希望定時(shí)器能夠:
- 支持異步操作,避免阻塞主線程;
- 單例化事件循環(huán),節(jié)省資源;
- 優(yōu)雅地管理定時(shí)器的生命周期;
- 提供簡單的接口,易于使用。
為此,設(shè)計(jì)了一個(gè) Timer
類,結(jié)合 asyncio
和 threading
,實(shí)現(xiàn)了一個(gè)高效的定時(shí)器。
代碼
完整代碼
import asyncio import threading import time import sys class Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start() @classmethod def _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f"事件循環(huán)異常: {e}") finally: loop.close() @classmethod def _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop) # 不使用 join,因?yàn)槭刈o(hù)線程會(huì)在主線程退出時(shí)自動(dòng)結(jié)束 def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = None async def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass # 正常取消時(shí)忽略 except Exception as e: print(f"定時(shí)器循環(huán)異常: {e}") finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown() def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1 # print(f"定時(shí)器已啟動(dòng),每{interval}秒執(zhí)行一次") def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False # print("定時(shí)器已停止") def is_active(self): return self.is_running # 使用示例 def callback1(): print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}") def callback2(): print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}") if __name__ == "__main__": timer1 = Timer() timer2 = Timer() timer1.start(2, callback1) timer2.start(3, callback2) try: time.sleep(100) timer1.stop() time.sleep(2) timer2.stop() except KeyboardInterrupt: timer1.stop() timer2.stop() finally: # 確保在程序退出時(shí)清理 Timer._shutdown()
1. 單例事件循環(huán)的實(shí)現(xiàn)
為了避免每個(gè)定時(shí)器都創(chuàng)建一個(gè)獨(dú)立的事件循環(huán),在 Timer
類中使用了類變量和類方法來管理全局唯一的事件循環(huán):
class Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start()
_loop
:存儲(chǔ)全局的asyncio
事件循環(huán)。_thread
:將事件循環(huán)運(yùn)行在一個(gè)獨(dú)立的守護(hù)線程中,避免阻塞主線程。_lock
:線程鎖,確保在多線程環(huán)境中創(chuàng)建事件循環(huán)時(shí)的線程安全。_ensure_loop
:在需要時(shí)創(chuàng)建或重用事件循環(huán),確保只有一個(gè)全局循環(huán)。
守護(hù)線程(daemon=True
)的設(shè)計(jì)使得程序退出時(shí)無需顯式關(guān)閉線程,簡化了資源清理。
2. 事件循環(huán)的運(yùn)行與關(guān)閉
事件循環(huán)的運(yùn)行邏輯封裝在 _run_loop
中:
@classmethod def _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f"事件循環(huán)異常: {e}") finally: loop.close()
run_forever
:讓事件循環(huán)持續(xù)運(yùn)行,直到被外部停止。- 異常處理:捕獲可能的錯(cuò)誤并打印,便于調(diào)試。
finally
:確保循環(huán)關(guān)閉時(shí)資源被正確釋放。
關(guān)閉邏輯則由 _shutdown
方法控制:
@classmethod def _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop)
當(dāng)所有定時(shí)器都停止時(shí)(_running_timers == 0
),事件循環(huán)會(huì)被安全停止。
3. 定時(shí)器核心邏輯
每個(gè) Timer
實(shí)例負(fù)責(zé)管理一個(gè)獨(dú)立的定時(shí)任務(wù):
def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = None async def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass # 正常取消時(shí)忽略 finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown()
_stop_event
:一個(gè)asyncio.Event
對(duì)象,用于控制定時(shí)器的停止。_timer_loop
:異步協(xié)程,每隔interval
秒執(zhí)行一次回調(diào)函數(shù)callback
。run_in_executor
:將回調(diào)函數(shù)運(yùn)行在默認(rèn)的線程池中,避免阻塞事件循環(huán)。
4. 啟動(dòng)與停止
啟動(dòng)和停止方法是用戶的主要接口:
def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1 def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False
start
:啟動(dòng)定時(shí)器,確保事件循環(huán)可用,并記錄運(yùn)行中的定時(shí)器數(shù)量。stop
:通過設(shè)置_stop_event
并取消任務(wù)來停止定時(shí)器。
5. 使用示例
以下是一個(gè)簡單的使用示例:
def callback1(): print(f"回調(diào)1觸發(fā): {time.strftime('%H:%M:%S')}") def callback2(): print(f"回調(diào)2觸發(fā): {time.strftime('%H:%M:%S')}") timer1 = Timer() timer2 = Timer() timer1.start(2, callback1) # 每2秒觸發(fā)一次 timer2.start(3, callback2) # 每3秒觸發(fā)一次 time.sleep(10) # 運(yùn)行10秒 timer1.stop() timer2.stop()
輸出可能如下:
回調(diào)1觸發(fā): 14:30:02 回調(diào)2觸發(fā): 14:30:03 回調(diào)1觸發(fā): 14:30:04 回調(diào)1觸發(fā): 14:30:06 回調(diào)2觸發(fā): 14:30:06 ...
設(shè)計(jì)亮點(diǎn)
- 異步與多線程結(jié)合:通過
asyncio
和threading
,實(shí)現(xiàn)了非阻塞的定時(shí)器,適合高并發(fā)場景。 - 資源高效利用:全局唯一的事件循環(huán)避免了重復(fù)創(chuàng)建的開銷。
- 優(yōu)雅的生命周期管理:守護(hù)線程和自動(dòng)關(guān)閉機(jī)制簡化了資源清理。
- 線程安全:使用鎖機(jī)制確保多線程環(huán)境下的穩(wěn)定性。
適用場景
- 周期性任務(wù)調(diào)度,如數(shù)據(jù)刷新、狀態(tài)檢查。
- 后臺(tái)服務(wù)中的定時(shí)監(jiān)控。
- 游戲或?qū)崟r(shí)應(yīng)用中的計(jì)時(shí)器需求。
總結(jié)
這個(gè)異步定時(shí)器實(shí)現(xiàn)結(jié)合了 Python 的異步編程和多線程特性,提供了一個(gè)輕量、靈活的解決方案。無論是簡單的腳本還是復(fù)雜的后臺(tái)服務(wù),它都能勝任。如果你需要一個(gè)可靠的定時(shí)器,不妨試試這個(gè)實(shí)現(xiàn),或者根據(jù)需求進(jìn)一步優(yōu)化它!
以上就是使用Python實(shí)現(xiàn)一個(gè)優(yōu)雅的異步定時(shí)器的詳細(xì)內(nèi)容,更多關(guān)于Python異步定時(shí)器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在linux下實(shí)現(xiàn) python 監(jiān)控usb設(shè)備信號(hào)
今天小編就為大家分享一篇在linux下實(shí)現(xiàn) python 監(jiān)控usb設(shè)備信號(hào),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-07-07關(guān)于torch.flatten()函數(shù)及x=x.view()函數(shù)的理解
這篇文章主要介紹了關(guān)于torch.flatten()函數(shù)及x=x.view()函數(shù)的理解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04Python pyecharts Line折線圖的具體實(shí)現(xiàn)
折線圖在很多圖標(biāo)中都有使用,本文主要介紹了Python pyecharts Line折線圖的具體實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Python報(bào)錯(cuò)TypeError: ‘dict‘ object is not&
在Python開發(fā)的旅程中,報(bào)錯(cuò)信息就像是一個(gè)個(gè)路障,阻礙著我們前進(jìn)的步伐,而“TypeError: ‘dict’ object is not iterable”這個(gè)報(bào)錯(cuò),常常讓開發(fā)者們陷入困惑,那么,這個(gè)報(bào)錯(cuò)究竟是怎么產(chǎn)生的呢?又該如何有效地解決它呢?讓我們一起深入探討,找到解決問題的方法2024-10-10TensorFlow安裝并在Pycharm搭建環(huán)境的詳細(xì)圖文教程
今天動(dòng)手開始搭建TensorFlow開發(fā)環(huán)境,所以下面這篇文章主要給大家介紹了關(guān)于TensorFlow安裝并在Pycharm搭建環(huán)境的詳細(xì)圖文教程,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11