基于python實現(xiàn)usb熱插拔檢測
1 概述
定義一個名為USBMonitor的類,用于在Linux系統(tǒng)中監(jiān)測USB設備的熱插拔事件。
該類利用了pyudev庫來監(jiān)聽內(nèi)核級設備事件,并且可以為USB設備的插入和移除事件注冊回調(diào)函數(shù)。
主要功能特性
- 基于pyudev庫監(jiān)聽內(nèi)核級設備事件:pyudev是一個用于與Linux系統(tǒng)中的udev(設備管理器)交互的庫,可以用來監(jiān)聽 設備事件。
- 支持USB設備插入/移除事件回調(diào):用戶可以通過傳入回調(diào)函數(shù)來處理USB設備的插入和移除事件。
- 自動過濾非USB設備事件:類內(nèi)部使用filter_by(subsystem='usb')方法,確保只監(jiān)聽USB子系統(tǒng)的事件。
- 支持設備詳細信息獲?。禾峁┰O備的詳細信息,包括廠商ID、產(chǎn)品ID、序列號和名稱等。
pyudev概述
pyudev 是 Linux 系統(tǒng)下與 udev 設備管理子系統(tǒng)交互的 Python 綁定庫。它提供對底層硬件設備的細粒度控制能力,支持:
- 設備枚舉:遍歷系統(tǒng)中的所有設備
- 屬性查詢:獲取設備詳細信息(廠商ID、產(chǎn)品ID等)
- 事件監(jiān)控:實時監(jiān)聽 設備插拔事件
- 規(guī)則管理:與 udev 規(guī)則系統(tǒng)集成(需配合系統(tǒng)配置)
類的主要方法:
- __init__:初始化監(jiān)測器,設置回調(diào)函數(shù)。
- _parse_device_info:解析設備信息,將設備對象轉(zhuǎn)換為包含廠商ID、產(chǎn)品ID等信息的字典。
- _event_handler:統(tǒng)一處理設備事件,根據(jù)事件類型調(diào)用相應的回調(diào)函數(shù)。
- start_monitoring:啟動事件監(jiān)聽循環(huán),持續(xù)監(jiān)聽 設備事件并調(diào)用事件處理器。
- start:在單獨的線程中啟動監(jiān)測循環(huán),以確保主線程不受阻塞。
- stop:安全停止監(jiān)測,設置停止標志并等待監(jiān)測線程結(jié)束。
環(huán)境說明
| 環(huán)境 | 版本 |
|---|---|
| python | V3.10 |
| 系統(tǒng) | ubuntu22.04 |
| 依賴 | pyudev |
依賴安裝
pip install pyudev
2 實現(xiàn)代碼
import pyudev
import threading
import time
class USBMonitor:
"""
Linux系統(tǒng)USB設備熱插拔監(jiān)測
功能特性:
- 基于pyudev庫監(jiān)聽內(nèi)核級設備事件
- 支持USB設備插入/移除事件回調(diào)
- 自動過濾非USB設備事件
- 支持設備詳細信息獲取(廠商ID、產(chǎn)品ID等)
"""
def __init__(self, on_device_added=None, on_device_removed=None):
"""
初始化監(jiān)測器
:param on_device_added: 插入回調(diào)函數(shù) func(device_info_dict)
:param on_device_removed: 移除回調(diào)函數(shù) func(device_info_dict)
"""
self.context = pyudev.Context() # 創(chuàng)建pyudev上下文,用于與udev交互
self.monitor = pyudev.Monitor.from_netlink(self.context) # 創(chuàng)建監(jiān)測器,監(jiān)聽內(nèi)核設備事件
self.monitor.filter_by(subsystem='usb') # 只監(jiān)聽USB子系統(tǒng)的事件
self.on_device_added = on_device_added # USB設備插入事件的回調(diào)函數(shù)
self.on_device_removed = on_device_removed # USB設備移除事件的回調(diào)函數(shù)
self.monitor_thread = None # 監(jiān)測線程
self.running = False # 標志監(jiān)測是否正在運行
def _parse_device_info(self, device):
"""解析設備信息為字典
:param device: 設備對象
:return: 包含設備信息的字典
"""
return {
'action': device.action, # 設備事件類型(add/remove)
'devnode': device.device_node, # 設備節(jié)點路徑
'vendor_id': device.get('ID_VENDOR_ID'), # 廠商ID
'product_id': device.get('ID_MODEL_ID'), # 產(chǎn)品ID
'serial': device.get('ID_SERIAL_SHORT'), # 設備序列號
'name': device.get('ID_MODEL') # 設備名稱
}
def _event_handler(self, device):
"""統(tǒng)一事件處理器
根據(jù)設備事件類型調(diào)用相應的回調(diào)函數(shù)
:param device: 設備對象
"""
info = self._parse_device_info(device) # 解析設備信息
if device.action == 'add': # 設備插入事件
if callable(self.on_device_added): # 檢查回調(diào)函數(shù)是否可調(diào)用
self.on_device_added(info)
elif device.action == 'remove': # 設備移除事件
if callable(self.on_device_removed): # 檢查回調(diào)函數(shù)是否可調(diào)用
self.on_device_removed(info)
def start_monitoring(self):
"""啟動事件監(jiān)聽循環(huán)
在循環(huán)中監(jiān)聽 設備事件并調(diào)用事件處理器
"""
self.running = True # 設置運行標志
print("[USB Monitor] 開始監(jiān)測USB設備...")
for device in iter(self.monitor.poll, None): # 持續(xù)監(jiān)聽 設備事件
if not self.running: # 如果停止標志被設置,退出循環(huán)
break
self._event_handler(device) # 調(diào)用事件處理器
def start(self):
"""啟動監(jiān)測線程
在單獨的線程中運行監(jiān)測循環(huán)
"""
if not self.monitor_thread or not self.monitor_thread.is_alive(): # 檢查線程是否已啟動
self.monitor_thread = threading.Thread(target=self.start_monitoring) # 創(chuàng)建新線程
self.monitor_thread.daemon = True # 設置為守護線程
self.monitor_thread.start() # 啟動線程
def stop(self):
"""安全停止監(jiān)測
停止監(jiān)測線程并等待其結(jié)束
"""
self.running = False # 設置停止標志
if self.monitor_thread and self.monitor_thread.is_alive(): # 檢查線程是否仍在運行
self.monitor_thread.join(timeout=2) # 等待線程結(jié)束,超時時間為2秒
# 使用示例
if __name__ == "__main__":
def device_added_callback(device_info):
"""設備插入回調(diào)函數(shù)
:param device_info: 包含設備信息的字典
"""
print(f"[插入] 設備接入: {device_info['name']}")
print(f" 廠商ID: {device_info['vendor_id']}")
print(f" 產(chǎn)品ID: {device_info['product_id']}")
print(f" 設備路徑: {device_info['devnode']}")
def device_removed_callback(device_info):
"""設備移除回調(diào)函數(shù)
:param device_info: 包含設備信息的字典[README](USB熱插拔監(jiān)測.assets/README.md)
"""
print(f"[移除] 設備斷開: {device_info['name']}")
print(f" 設備路徑: {device_info['devnode']}")
monitor = LinuxUSBMonitor(
on_device_added=device_added_callback, # 設置插入回調(diào)函數(shù)
on_device_removed=device_removed_callback # 設置移除回調(diào)函數(shù)
)
try:
monitor.start() # 啟動USB監(jiān)測
print("監(jiān)測運行中,按Ctrl+C停止...")
while True:
time.sleep(1) # 主線程保持運行
except KeyboardInterrupt:
monitor.stop() # 停止USB監(jiān)測
print("\n監(jiān)測已停止")
3 演示效果

4.方法補充
下面我們來看看如何在Windows平臺實現(xiàn)USB設備實時監(jiān)控類
實現(xiàn)代碼
import wmi
import threading
import time
import pythoncom # 用于處理COM庫的線程安全初始化
class USBMonitor:
"""
USB設備實時監(jiān)控類(Windows平臺專用)
功能特性:
- 基于WMI實現(xiàn)設備熱插拔事件監(jiān)聽
- 支持設備插入/移除的雙向回調(diào)機制
- 使用獨立監(jiān)控線程避免阻塞主程序
- 自動處理COM庫線程初始化問題
"""
def __init__(self, on_device_added=None, on_device_removed=None):
"""
初始化監(jiān)控實例
:param on_device_added: 設備插入回調(diào)函數(shù),格式 func(device_obj)
:param on_device_removed: 設備移除回調(diào)函數(shù),格式 func(device_obj)
"""
self.on_device_added = on_device_added
self.on_device_removed = on_device_removed
self.monitor_thread = threading.Thread(target=self.start_monitoring)
self.monitor_thread.daemon = True # 設為守護線程確保主程序退出時自動終止
self.thread_running = False # 線程運行狀態(tài)標志
def get_all_devices(self):
"""獲取當前已連接的USB設備列表(快照)"""
c = wmi.WMI(moniker="root/cimv2")
return c.Win32_USBControllerDevice() # 返回Win32_USBControllerDevice對象集合
def device_added_callback(self, new_device):
"""內(nèi)部設備插入事件處理器"""
if callable(self.on_device_added): # 安全檢查回調(diào)函數(shù)
self.on_device_added(new_device)
def device_removed_callback(self, removed_device):
"""內(nèi)部設備移除事件處理器"""
if callable(self.on_device_removed):
self.on_device_removed(removed_device)
def start_monitoring(self):
"""監(jiān)控線程主循環(huán)(核心邏輯)"""
pythoncom.CoInitialize() # 每個線程必須獨立初始化COM庫
try:
c = wmi.WMI(moniker="root/cimv2")
# 配置插入事件監(jiān)聽器(1秒輪詢間隔)
arr_filter = c.Win32_PnPEntity.watch_for(
notification_type="creation",
delay_secs=1
)
# 配置移除事件監(jiān)聽器
rem_filter = c.Win32_PnPEntity.watch_for(
notification_type="deletion",
delay_secs=1
)
self.thread_running = True
print("[監(jiān)控系統(tǒng)] USB設備監(jiān)控服務啟動")
while self.thread_running:
# 非阻塞式檢測插入事件(0.5秒超時)
try:
new_device = arr_filter(0.5)
self.device_added_callback(new_device)
except wmi.x_wmi_timed_out:
pass # 正常超時,繼續(xù)循環(huán)
# 非阻塞式檢測移除事件
try:
removed_device = rem_filter(0.5)
self.device_removed_callback(removed_device)
except wmi.x_wmi_timed_out:
pass
except KeyboardInterrupt:
print("\n[監(jiān)控系統(tǒng)] 用戶主動終止監(jiān)控")
finally:
pythoncom.CoUninitialize() # 必須釋放COM資源
print("[監(jiān)控系統(tǒng)] 監(jiān)控服務已安全關閉")
def start(self):
"""啟動監(jiān)控服務"""
if not self.monitor_thread.is_alive():
self.monitor_thread.start()
def stop(self):
"""安全停止監(jiān)控服務"""
self.thread_running = False
if self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=2) # 等待線程結(jié)束
# 示例回調(diào)實現(xiàn)
def on_device_added(device):
"""設備插入事件處理示例"""
print(f"[事件] 新設備接入: {device.Description}")
def on_device_removed(device):
"""設備移除事件處理示例"""
print(f"[事件] 設備移除: {device.Description}")
# 使用示例
if __name__ == "__main__":
# 初始化帶回調(diào)的監(jiān)控實例
monitor = USBMonitor(on_device_added, on_device_removed)
try:
monitor.start()
print("主程序運行中,按Ctrl+C停止監(jiān)控...")
while True: # 主線程保持活動
time.sleep(1)
except KeyboardInterrupt:
monitor.stop()
print("程序正常退出")
演示效果

到此這篇關于基于python實現(xiàn)usb熱插拔檢測的文章就介紹到這了,更多相關python監(jiān)測usb熱插拔內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python標準庫內(nèi)置函數(shù)complex介紹
這篇文章主要介紹了Python標準庫內(nèi)置函數(shù)complex介紹,本文先是講解了complex的作用和使用注意,然后給出了使用示例,需要的朋友可以參考下2014-11-11
利用Python監(jiān)控設備電池電量并發(fā)送通知
在日常使用電子設備時,及時了解電池電量狀態(tài)并進行合理充電是非常重要的,本文將使用Python進行設備電池電量的監(jiān)控并發(fā)送通知,有需要的可以了解下2025-03-03
Python3.7 讀取音頻根據(jù)文件名生成腳本的代碼
這篇文章主要介紹了Python3.7 讀取音頻根據(jù)文件名生成字幕腳本的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04
在Python中處理字符串之isdecimal()方法的使用
這篇文章主要介紹了在Python中處理字符串之isdecimal()方法的使用,是Python入門學習的基礎知識,需要的朋友可以參考下2015-05-05

