Python封裝zabbix-get接口的代碼分享
Zabbix 是一款強(qiáng)大的開(kāi)源網(wǎng)管監(jiān)控工具,該工具的客戶端與服務(wù)端是分開(kāi)的,我們可以直接使用自帶的zabbix_get
命令來(lái)實(shí)現(xiàn)拉取客戶端上的各種數(shù)據(jù),在本地組裝參數(shù)并使用Popen開(kāi)子線程執(zhí)行該命令,即可實(shí)現(xiàn)批量監(jiān)測(cè)。
封裝Engine類: 該類的主要封裝了Zabbix接口的調(diào)用,包括最基本的參數(shù)收集.
import subprocess,datetime,time,math class Engine(): def __init__(self,address,port): self.address = address self.port = port def GetValue(self,key): try: command = "get.exe -s {0} -p {1} -k {2}".format(self.address,self.port,key).split(" ") start = datetime.datetime.now() process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) while process.poll() is None: time.sleep(1) now = datetime.datetime.now() if (now - start).seconds > 2: return 0 return str(process.stdout.readlines()[0].split()[0],"utf-8") except Exception: return 0 # ping檢測(cè) def GetPing(self): ref_dict = {"Address":0,"Ping":0} ref_dict["Address"] = self.address ref_dict["Ping"] = self.GetValue("agent.ping") if ref_dict["Ping"] == "1": return ref_dict else: ref_dict["Ping"] = "0" return ref_dict return ref_dict # 獲取主機(jī)組基本信息 def GetSystem(self): ref_dict = { "Address" : 0 ,"HostName" : 0,"Uname":0 } ref_dict["Address"] = self.address ref_dict["HostName"] = self.GetValue("system.hostname") ref_dict["Uname"] = self.GetValue("system.uname") return ref_dict # 獲取CPU利用率 def GetCPU(self): ref_dict = { "Address": 0 ,"Core": 0,"Active":0 , "Avg1": 0 ,"Avg5":0 , "Avg15":0 } ref_dict["Address"] = self.address ref_dict["Core"] = self.GetValue("system.cpu.num") ref_dict["Active"] = math.ceil(float(self.GetValue("system.cpu.util"))) ref_dict["Avg1"] = self.GetValue("system.cpu.load[,avg1]") ref_dict["Avg5"] = self.GetValue("system.cpu.load[,avg5]") ref_dict["Avg15"] = self.GetValue("system.cpu.load[,avg15]") return ref_dict # 獲取內(nèi)存利用率 def GetMemory(self): ref_dict = { "Address":"0","Total":"0","Free":0,"Percentage":"0" } ref_dict["Address"] = self.address fps = self.GetPing() if fps['Ping'] != "0": ref_dict["Total"] = self.GetValue("vm.memory.size[total]") ref_dict["Free"] = self.GetValue("vm.memory.size[free]") # 計(jì)算百分比: percentage = 100 - int(Free/int(Total/100)) ref_dict["Percentage"] = str( 100 - int( int(ref_dict.get("Free")) / (int(ref_dict.get("Total"))/100)) ) + "%" return ref_dict else: return ref_dict # 獲取磁盤(pán)數(shù)據(jù) def GetDisk(self): ref_list = [] fps = self.GetPing() if fps['Ping'] != "0": disk_ = eval( self.GetValue("vfs.fs.discovery")) for x in range(len(disk_)): dict_ = {"Address": 0, "Name": 0, "Type": 0, "Free": 0} dict_["Address"] = self.address dict_["Name"] = disk_[x].get("{#FSNAME}") dict_["Type"] = disk_[x].get("{#FSTYPE}") if dict_["Type"] != "UNKNOWN": pfree = self.GetValue("vfs.fs.size[\"{0}\",pfree]".format(dict_["Name"])) dict_["Free"] = str(math.ceil(float(pfree))) else: dict_["Free"] = -1 ref_list.append(dict_) return ref_list return ref_list # 獲取進(jìn)程狀態(tài) def GetProcessStatus(self,process_name): fps = self.GetPing() dict_ = {"Address": '0', "ProcessName": '0', "ProcessCount": '0', "Status": '0'} if fps['Ping'] != "0": proc_id = self.GetValue("proc.num[\"{}\"]".format(process_name)) dict_['Address'] = self.address dict_['ProcessName'] = process_name if proc_id != "0": dict_['ProcessCount'] = proc_id dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_ # 獲取端口開(kāi)放狀態(tài) def GetNetworkPort(self,port): dict_ = {"Address": '0', "Status": 'False'} dict_['Address'] = self.address fps = self.GetPing() if fps['Ping'] != "0": port_ = self.GetValue("net.tcp.listen[{}]".format(port)) if port_ == "1": dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_ # 檢測(cè)Web服務(wù)器狀態(tài) 通過(guò)本地地址:端口 => 檢測(cè)目標(biāo)地址:端口 def CheckWebServerStatus(self,check_addr,check_port): dict_ = {"local_address": "0", "remote_address": "0", "remote_port": "0", "Status":"False"} fps = self.GetPing() dict_['local_address'] = self.address dict_['remote_address'] = check_addr dict_['remote_port'] = check_port if fps['Ping'] != "0": check_ = self.GetValue("net.tcp.port[\"{}\",\"{}\"]".format(check_addr,check_port)) if check_ == "1": dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_
當(dāng)我們需要使用時(shí),只需要定義變量調(diào)用即可,其調(diào)用代碼如下。
from engine import Engine if __name__ == "__main__": ptr_windows = Engine("127.0.0.1","10050") ret = ptr_windows.GetDisk() if len(ret) != 0: for item in ret: addr = item.get("Address") name = item.get("Name") type = item.get("Type") space = item.get("Free") if type != "UNKNOWN" and space != -1: print("地址: {} --> 盤(pán)符: {} --> 格式: {} --> 剩余空間: {}".format(addr,name,type,space))
到此這篇關(guān)于Python封裝zabbix-get接口的代碼分享的文章就介紹到這了,更多相關(guān)Python封裝zabbix-get接口內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pandas 數(shù)據(jù)實(shí)現(xiàn)行間計(jì)算的方法
今天小編就為大家分享一篇pandas 數(shù)據(jù)實(shí)現(xiàn)行間計(jì)算的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06python web框架Flask實(shí)現(xiàn)圖形驗(yàn)證碼及驗(yàn)證碼的動(dòng)態(tài)刷新實(shí)例
在本篇文章里小編給大家整理的是關(guān)于python web框架Flask實(shí)現(xiàn)圖形驗(yàn)證碼的相關(guān)知識(shí)點(diǎn),有需要的朋友們參考下。2019-10-10使用pyqt 實(shí)現(xiàn)重復(fù)打開(kāi)多個(gè)相同界面
今天小編就為大家分享一篇使用pyqt 實(shí)現(xiàn)重復(fù)打開(kāi)多個(gè)相同界面,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12django+xadmin+djcelery實(shí)現(xiàn)后臺(tái)管理定時(shí)任務(wù)
這篇文章主要介紹了django+xadmin+djcelery實(shí)現(xiàn)后臺(tái)管理定時(shí)任務(wù),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-08-08如何徹底解決python?NameError:name?'__file__'?is?not?
這篇文章主要給大家介紹了關(guān)于如何徹底解決python?NameError:name?'__file__'?is?not?defined的相關(guān)資料,文中通過(guò)圖文將解決的辦法介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02