python實(shí)現(xiàn)簡(jiǎn)易內(nèi)存監(jiān)控
本例主要功能:每隔3秒獲取系統(tǒng)內(nèi)存,當(dāng)內(nèi)存超過設(shè)定的警報(bào)值時(shí),獲取所有進(jìn)程占用內(nèi)存并發(fā)出警報(bào)聲。內(nèi)存值和所有進(jìn)程占用內(nèi)存記入log,log文件按天命名。
1 獲取cpu、內(nèi)存、進(jìn)程信息
利用WMI
簡(jiǎn)單說明下,WMI的全稱是Windows Management Instrumentation,即Windows管理規(guī)范。它是Windows操作系統(tǒng)上管理數(shù)據(jù)和操作的基礎(chǔ)設(shè)施。我們可以使用WMI腳本或者應(yīng)用自動(dòng)化管理任務(wù)等。
安裝模塊
WMI下載地址
win32com下載地址:
學(xué)會(huì)使用WMI
不錯(cuò)的教程
獲取cpu、內(nèi)存、磁盤
def getSysInfo(wmiService = None): result = {} if wmiService == None: wmiService = wmi.WMI() # cpu for cpu in wmiService.Win32_Processor(): timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime()) result['cpuPercent'] = cpu.loadPercentage # memory cs = wmiService.Win32_ComputerSystem() os = wmiService.Win32_OperatingSystem() result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024) result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024) result['memPercent']=result['memFree'] * 100 /result['memTotal'] #disk result['diskTotal'] = 0 result['diskFree'] = 0 for disk in wmiService.Win32_LogicalDisk(DriveType=3): result['diskTotal'] += int(disk.Size) result['diskFree'] += int(disk.FreeSpace) result['diskTotal'] = int(result['diskTotal']/1024/1024) result['diskFree'] = int(result['diskFree']/1024/1024) return result
獲取所有進(jìn)程占用內(nèi)存
def getAllProcessInfo(mywmi = None): """取出全部進(jìn)程的進(jìn)程名,進(jìn)程ID,進(jìn)程實(shí)際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ allProcessList = [] allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process") #print (allProcess.count) for j in allProcess: #print j.Properties_("PercentPrivilegedTime").__int__() ##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__() #for pro in j.Properties_: # print (pro.name) #break name = j.Properties_("name").__str__() if name != "_Total" and name !="Idle": pid = j.Properties_("IDProcess").__str__() PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__() WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024 WorkingSet = j.Properties_("WorkingSet").__int__()/1024 allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime]) return allProcessList
也可以用psutil
import psutil,time from operator import itemgetter, attrgetter def getProcessInfo(p): """取出指定進(jìn)程占用的進(jìn)程名,進(jìn)程ID,進(jìn)程實(shí)際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ try: cpu = int(p.cpu_percent(interval=0)) memory = p.memory_info() rss = memory.rss/1024 vms = memory.vms/1024 name = p.name() pid = p.pid except psutil.Error: name = "Closed_Process" pid = 0 rss = 0 vms = 0 cpu = 0 #return [name.upper(), pid, rss, vms] return [name, pid, vms, rss, cpu] def getAllProcessInfo(): """取出全部進(jìn)程的進(jìn)程名,進(jìn)程ID,進(jìn)程實(shí)際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ instances = [] all_processes = list(psutil.process_iter()) for proc in all_processes: proc.cpu_percent(interval=0) #此處sleep1秒是取正確取出CPU使用率的重點(diǎn) time.sleep(1) for proc in all_processes: instances.append(getProcessInfo(proc)) return instances if __name__ == '__main__': processInfoList = getAllProcessInfo() processInfoList.sort(key=itemgetter(2), reverse=True) for p in processInfoList: print(p)
2. 保存log
配置config文件
[loggers] keys=example01 [logger_example01] handlers=hand04 [handlers] keys=hand04 [handler_hand04] class=handlers.TimedRotatingFileHandler level=DEBUG formatter=form01 args=('./logs/monitor.log', 'd', 1, 7) [formatters] keys=form01,form02 [formatter_form01] format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s datefmt=%Y-%m-%d %H:%M:%S
記錄log
import logging import logging.config logger.info("message") logger.warning("message") logger.error("message")
3. 完整代碼
文件夾結(jié)構(gòu):
maintain
–monitor
—-logs
—-logger.conf
—-monitor.py
–packages
—-init.py
—-processinfo.py
—-sysinfo.py
monitor
import wmi import time import winsound import logging import logging.config from operator import itemgetter, attrgetter from os import path import packages.sysinfo #使用wmi #import packages.ProcessInfo #使用 #def ShowProcessInfo(): # processInfoList = packages.ProcessInfo.getAllProcessInfo() # processInfoList.sort(key=itemgetter(2), reverse=True) # for p in processInfoList: # logger.info(p) def ShowProcessInfo(wmiService = None): processInfoList = packages.sysinfo.getAllProcessInfo(wmiService) processInfoList.sort(key=itemgetter(2), reverse=True) for p in processInfoList: logger.info(p) if __name__ == '__main__': MemPerWorningLine = 50 MemPerErrorLine = 20 ErrorAlertCount = 10 ProcessInfoCount = 10 counterProcessInfo = ProcessInfoCount print("Memory monitor start!") log_file_path = path.join(path.dirname(path.abspath(__file__)), 'logger.conf') #print(log_file_path) logging.config.fileConfig(log_file_path) logger = logging.getLogger("example01") wmiService = wmi.WMI() while True: memPercent = int(packages.sysinfo.getSysInfo(wmiService)['memPercent']) strMemPercent = 'FreeMemory: ' + str(memPercent) + '%' if(memPercent < MemPerErrorLine): logger.error(strMemPercent) #ProcessInfoList counterProcessInfo+=1 if(counterProcessInfo >= ProcessInfoCount): ShowProcessInfo(wmiService) counterProcessInfo = 0 #ALert counter = 1 while counter <= ErrorAlertCount: winsound.Beep(2080, 100) time.sleep(0.1) counter += 1 elif(memPercent < MemPerWorningLine): logger.warning(strMemPercent) #ProcessInfoList counterProcessInfo+=1 if(counterProcessInfo >= ProcessInfoCount): ShowProcessInfo(wmiService) counterProcessInfo = 0 #ALert winsound.Beep(2015, 2000) else: logger.info(strMemPercent) time.sleep(3)
sysinfo
# -*- coding: utf-8 -*- import wmi import os import sys import platform import time import win32api import win32com from win32com.client import GetObject from operator import itemgetter, attrgetter def getSysInfo(wmiService = None): result = {} if wmiService == None: wmiService = wmi.WMI() # cpu for cpu in wmiService.Win32_Processor(): timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime()) result['cpuPercent'] = cpu.loadPercentage # memory cs = wmiService.Win32_ComputerSystem() os = wmiService.Win32_OperatingSystem() result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024) result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024) result['memPercent']=result['memFree'] * 100 /result['memTotal'] #disk result['diskTotal'] = 0 result['diskFree'] = 0 for disk in wmiService.Win32_LogicalDisk(DriveType=3): result['diskTotal'] += int(disk.Size) result['diskFree'] += int(disk.FreeSpace) result['diskTotal'] = int(result['diskTotal']/1024/1024) result['diskFree'] = int(result['diskFree']/1024/1024) return result def sys_version(): c = wmi.WMI () #獲取操作系統(tǒng)版本 for sys in c.Win32_OperatingSystem(): print ("Version:%s" % sys.Caption.encode("UTF8"),"Vernum:%s" % sys.BuildNumber) print (sys.OSArchitecture.encode("UTF8"))#系統(tǒng)是32位還是64位的 print (sys.NumberOfProcesses) #當(dāng)前系統(tǒng)運(yùn)行的進(jìn)程總數(shù) def cpu_mem(): c = wmi.WMI () #CPU類型和內(nèi)存 for processor in c.Win32_Processor(): #print "Processor ID: %s" % processor.DeviceID print ("Process Name: %s" % processor.Name.strip() ) for Memory in c.Win32_PhysicalMemory(): print ("Memory Capacity: %.fMB" %(int(Memory.Capacity)/1048576)) def cpu_use(): #5s取一次CPU的使用率 c = wmi.WMI() while True: for cpu in c.Win32_Processor(): timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime()) print ('%s | Utilization: %s: %d %%' % (timestamp, cpu.DeviceID, cpu.LoadPercentage)) time.sleep(5) def disk(): c = wmi.WMI () #獲取硬盤分區(qū) for physical_disk in c.Win32_DiskDrive (): for partition in physical_disk.associators ("Win32_DiskDriveToDiskPartition"): for logical_disk in partition.associators ("Win32_LogicalDiskToPartition"): print (physical_disk.Caption.encode("UTF8"), partition.Caption.encode("UTF8"), logical_disk.Caption) #獲取硬盤使用百分情況 for disk in c.Win32_LogicalDisk (DriveType=3): print (disk.Caption, "%0.2f%% free" % (100.0 * long (disk.FreeSpace) / long (disk.Size))) def network(): c = wmi.WMI () #獲取MAC和IP地址 for interface in c.Win32_NetworkAdapterConfiguration (IPEnabled=1): print ("MAC: %s" % interface.MACAddress ) for ip_address in interface.IPAddress: print ("ip_add: %s" % ip_address ) print #獲取自啟動(dòng)程序的位置 for s in c.Win32_StartupCommand (): print ("[%s] %s <%s>" % (s.Location.encode("UTF8"), s.Caption.encode("UTF8"), s.Command.encode("UTF8"))) #獲取當(dāng)前運(yùn)行的進(jìn)程 for process in c.Win32_Process (): print (process.ProcessId, process.Name) def getAllProcessInfo(mywmi = None): """取出全部進(jìn)程的進(jìn)程名,進(jìn)程ID,內(nèi)存(專有工作集), 工作集 """ allProcessList = [] allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process") #print (allProcess.count) for j in allProcess: #print j.Properties_("PercentPrivilegedTime").__int__() ##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__() #for pro in j.Properties_: # print (pro.name) #break name = j.Properties_("name").__str__() if name != "_Total" and name !="Idle": pid = j.Properties_("IDProcess").__str__() PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__() WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024 WorkingSet = j.Properties_("WorkingSet").__int__()/1024 allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime]) # allProcess = mywmi.ExecQuery("select * from Win32_Process") # for i in allProcess: # Name = str(i.Properties_("Name")) # ProcessID = int(i.Properties_("ProcessID")) # WorkingSetSize = int(i.Properties_("WorkingSetSize"))/1024 # #VirtualSize = int(i.Properties_("VirtualSize"))/1024 # PeakWorkingSetSize = int(i.Properties_("PeakWorkingSetSize"))/1024 # CreationDate = str(i.Properties_("CreationDate")) # allProcessList.append([Name, ProcessID, WorkingSetSize, PeakWorkingSetSize, CreationDate]) return allProcessList #def main(): #sys_version() #cpu_mem() #disk() #network() #cpu_use() if __name__ == '__main__': #mywmi = GetObject("winmgmts:") mywmi = wmi.WMI() processInfoList = getAllProcessInfo(mywmi) processInfoList.sort(key=itemgetter(2), reverse=True) for processinfo in processInfoList: print(processinfo)
processinfo
import psutil,time from operator import itemgetter, attrgetter def getProcessInfo(p): """取出指定進(jìn)程占用的進(jìn)程名,進(jìn)程ID,進(jìn)程實(shí)際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ try: cpu = int(p.cpu_percent(interval=0)) memory = p.memory_info() rss = memory.rss/1024 vms = memory.vms/1024 name = p.name() pid = p.pid except psutil.Error: name = "Closed_Process" pid = 0 rss = 0 vms = 0 cpu = 0 #return [name.upper(), pid, rss, vms] return [name, pid, vms, rss, cpu] def getAllProcessInfo(): """取出全部進(jìn)程的進(jìn)程名,進(jìn)程ID,進(jìn)程實(shí)際內(nèi)存, 虛擬內(nèi)存,CPU使用率 """ instances = [] all_processes = list(psutil.process_iter()) for proc in all_processes: proc.cpu_percent(interval=0) #此處sleep1秒是取正確取出CPU使用率的重點(diǎn) time.sleep(1) for proc in all_processes: instances.append(getProcessInfo(proc)) return instances if __name__ == '__main__': processInfoList = getAllProcessInfo() processInfoList.sort(key=itemgetter(2), reverse=True) for p in processInfoList: print(p)
logger
#logger.conf ############################################### [loggers] keys=root,example01,example02 [logger_root] level=DEBUG handlers=hand01,hand02 [logger_example01] handlers=hand01,hand04 qualname=example01 propagate=0 [logger_example02] handlers=hand01,hand03 qualname=example02 propagate=0 ############################################### [handlers] keys=hand01,hand02,hand03,hand04 [handler_hand01] class=StreamHandler level=INFO formatter=form02 args=(sys.stderr,) [handler_hand02] class=FileHandler level=DEBUG formatter=form01 args=('myapp.log', 'a') [handler_hand03] class=handlers.RotatingFileHandler level=INFO formatter=form02 args=('myapp.log', 'a', 10*1024*1024, 5) [handler_hand04] class=handlers.TimedRotatingFileHandler level=DEBUG formatter=form01 args=('./logs/monitor.log', 'd', 1, 7) ############################################### [formatters] keys=form01,form02 [formatter_form01] format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s datefmt=%Y-%m-%d %H:%M:%S [formatter_form02] format=%(asctime)-12s: %(levelname)-8s %(message)s datefmt=%Y-%m-%d %H:%M:%S
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
tensorflow 動(dòng)態(tài)獲取 BatchSzie 的大小實(shí)例
這篇文章主要介紹了tensorflow 動(dòng)態(tài)獲取 BatchSzie 的大小實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-06-06python游戲?qū)崙?zhàn)項(xiàng)目之童年經(jīng)典超級(jí)瑪麗
史上十大最經(jīng)典小霸王游戲中魂斗羅只能排在第二,那么第一是誰?最經(jīng)典最風(fēng)靡的當(dāng)屬超級(jí)瑪麗,那個(gè)戴帽子的大胡子穿著背帶褲的馬里奧哪個(gè)不認(rèn)得,小編帶你用python實(shí)現(xiàn)超級(jí)瑪麗緬懷童年2021-09-09selenium+python實(shí)現(xiàn)自動(dòng)登錄腳本
下面小編就為大家分享一篇selenium+python實(shí)現(xiàn)自動(dòng)登錄腳本,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-04-04python簡(jiǎn)單圖片操作:打開\顯示\保存圖像方法介紹
這篇文章主要介紹了python簡(jiǎn)單圖片操作:打開\顯示\保存圖像方法介紹,還涉及將圖片保存為灰度圖的簡(jiǎn)單方法示例,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11一文詳解如何在Python中實(shí)現(xiàn)switch語句
這篇文章主要給大家介紹了關(guān)于如何在Python中實(shí)現(xiàn)switch語句的相關(guān)資料,今天在學(xué)習(xí)python的過程中,發(fā)現(xiàn)python沒有switch這個(gè)語法,所以這里給大家總結(jié)下,需要的朋友可以參考下2023-09-09在 Windows 下搭建高效的 django 開發(fā)環(huán)境的詳細(xì)教程
這篇文章主要介紹了如何在 Windows 下搭建高效的 django 開發(fā)環(huán)境,本文通過一篇詳細(xì)教程實(shí)例代碼相結(jié)合給大家講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07