python使用watchdog實現(xiàn)文件資源監(jiān)控
Python文件監(jiān)控庫watchdogs簡介
隨著Python在各種應(yīng)用領(lǐng)域中的廣泛使用,其生態(tài)環(huán)境也在持續(xù)繁榮,特別是各種第三方庫的誕生,極大地豐富了開發(fā)者的工具箱。在眾多庫中, watchdog 作為一個高效的文件系統(tǒng)監(jiān)控庫,對于實時響應(yīng)文件系統(tǒng)的各種事件(如文件創(chuàng)建、修改、刪除等)提供了極大便利。無論是在需要監(jiān)控項目文件變化以觸發(fā)自動化構(gòu)建的開發(fā)場景,還是在需要日志記錄、文件備份等運維任務(wù)中, watchdog 都能發(fā)揮重要的作用。
文件資源監(jiān)控
安裝:pip install watchdog
watchdog 支持跨平臺文件資源監(jiān)控,可以檢測指定文件夾下文件及文件夾變動,當(dāng)出現(xiàn)不同變動(新增、刪除、修改)后,可進(jìn)行相應(yīng)處理。
from time import sleep, strftime from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class FileMonitoring(FileSystemEventHandler): # event.is_directory 判斷是否是目錄操作 def on_modified(self, event): # 文件或文件夾被修改時觸發(fā)(新增、刪除及修改文件或文件夾都會被監(jiān)測為文件夾的修改) tm = strftime("%Y-%m-%d %H:%M:%S") print(f'{tm} 修改文件夾{event.src_path}') def on_created(self, event): # 文件或目錄被創(chuàng)建時觸發(fā) tm = strftime("%Y-%m-%d %H:%M:%S") if event.is_directory: print(f'{tm} 創(chuàng)建文件夾{event.src_path}') else: print(f'{tm} 創(chuàng)建文件{event.src_path}') def on_deleted(self, event): # 文件或目錄被刪除時觸發(fā) tm = strftime("%Y-%m-%d %H:%M:%S") if event.is_directory: print(f'{tm} 刪除文件夾{event.src_path}') else: print(f'{tm} 刪除文件{event.src_path}') def on_moved(self, event): # 目錄的創(chuàng)建和刪除操作,被Watchdog視為移動操作,因此通過on_moved方法進(jìn)行捕捉,重命名也可用此捕獲。 tm = strftime("%Y-%m-%d %H:%M:%S") if event.is_directory: print(f'{tm} 重命名文件夾:文件夾{event.src_path} 重命名為 {event.dest_path}') else: print(f'{tm} 重命名文件:文件{event.src_path} 重命名為 {event.dest_path}') class MyFileMonitor(): def __init__(self): self.event_handler = FileMonitoring() # 初始化事件處理器實例 def sever(self, watched_dir, recursive: bool = True): ''' :param watched_dir: .schedule 方法設(shè)置要監(jiān)控的目錄路徑和事件處理器 :param recursive: recursive=True 參數(shù)以遞歸地監(jiān)控目錄內(nèi)的所有子目錄 :return: ''' self.observer = Observer() # 初始化觀察器 self.observer.schedule(self.event_handler, watched_dir, recursive=recursive) # 將事件處理器關(guān)聯(lián)到觀察器 return self.observer def run(self, watched_dir, recursive: bool = True): ''' 啟動觀察器并進(jìn)入無限循環(huán),直到用戶按下 Ctrl+C 中斷程序, observer.stop() 和 observer.join() 來正確停止和清理觀察器資源。 ''' observer = self.sever(watched_dir, recursive) observer.start() try: while True: sleep(.1) except KeyboardInterrupt: observer.stop() observer.join()
監(jiān)測文件夾新增、刪除、修改文件夾
在Python中,watchdog
庫確實提供了方便的方式來監(jiān)控文件夾及其內(nèi)部文件的新增、刪除、修改以及重命名等事件。下面是一個簡單的示例,展示如何使用 watchdog
來監(jiān)聽一個文件夾中的這些事件
import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 定義事件處理器類 class MyHandler(FileSystemEventHandler): def on_modified(self, event): print(f'File modified: {event.src_path}') def on_created(self, event): print(f'File created: {event.src_path}') def on_deleted(self, event): print(f'File deleted: {event.src_path}') # 對于文件夾的創(chuàng)建、刪除操作,請注意: # 目錄級別的事件可通過on_moved方法捕獲,因為創(chuàng)建和刪除目錄也表現(xiàn)為移動事件 def on_moved(self, event): if event.is_directory: what = "Directory" else: what = "File" print(f'{what} moved: {event.src_path} to {event.dest_path}') # 初始化事件處理器實例 event_handler = MyHandler() # 指定要監(jiān)控的文件夾路徑 watched_dir = '/path/to/watch' # 初始化觀察器 observer = Observer() # 將事件處理器關(guān)聯(lián)到觀察器 observer.schedule(event_handler, watched_dir, recursive=True) # 啟動觀察器 observer.start() print(f'Starting to watch {watched_dir} for file system events...') try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
在這個例子中,MyHandler 類繼承了 FileSystemEventHandler 并實現(xiàn)了幾個方法來處理不同的文件系統(tǒng)事件。on_modified 方法會在文件被修改時觸發(fā),on_created 在文件或目錄被創(chuàng)建時觸發(fā),而 on_deleted 在文件或目錄被刪除時觸發(fā)。對于目錄的創(chuàng)建和刪除操作,由于Watchdog將它們視為移動操作,因此通過 on_moved 方法進(jìn)行捕捉,并檢查 event.is_directory 屬性以區(qū)分是文件還是目錄。
通過 observer.schedule 方法設(shè)置要監(jiān)控的目錄路徑和事件處理器,并設(shè)置 recursive=True 參數(shù)以遞歸地監(jiān)控目錄內(nèi)的所有子目錄。
最后啟動觀察器并進(jìn)入無限循環(huán),直到用戶按下 Ctrl+C 中斷程序,此時會調(diào)用 observer.stop() 和 observer.join() 來正確停止和清理觀察器資源。
到此這篇關(guān)于python使用watchdog實現(xiàn)文件資源監(jiān)控的文章就介紹到這了,更多相關(guān)python watchdog文件資源監(jiān)控內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解python百行有效代碼實現(xiàn)漢諾塔小游戲(簡約版)
這篇文章主要介紹了詳解python百行有效代碼實現(xiàn)漢諾塔小游戲(簡約版),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10Keras 切換后端方式(Theano和TensorFlow)
這篇文章主要介紹了Keras 切換后端方式(Theano和TensorFlow),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06