Python如何實(shí)現(xiàn)Paramiko的二次封裝
Paramiko是一個(gè)用于執(zhí)行SSH命令的Python第三方庫,使用該庫可實(shí)現(xiàn)自動(dòng)化運(yùn)維的所有任務(wù),如下是一些常用代碼的封裝方式,多數(shù)代碼為半成品,只是敲代碼時(shí)的備份副本防止丟失,僅供參考。
目前本人巡檢百臺(tái)設(shè)備完全無壓力,如果要巡檢過千臺(tái)則需要多線程的支持,過萬臺(tái)則需要加入智能判斷等。
實(shí)現(xiàn)命令執(zhí)行: 直接使用過程化封裝,執(zhí)行CMD命令.
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def BatchCMD(address,username,password,port,command):
try:
ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
stdin , stdout , stderr = ssh.exec_command(command)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
except Exception:
return None
實(shí)現(xiàn)磁盤巡檢: 獲取磁盤空間并返回字典格式
def GetAllDiskSpace(address,username,password,port):
ref_dict = {}
cmd_dict = {"Linux\n" : "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
"AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
}
# 首先檢測(cè)系統(tǒng)版本
os_version = BatchCMD(address,username,password,port,"uname")
for version,run_cmd in cmd_dict.items():
if(version == os_version):
# 根據(jù)不同版本選擇不同的命令
os_ref = BatchCMD(address,username,password,port,run_cmd)
ref_list= os_ref.split("\n")
# 循環(huán)將其轉(zhuǎn)換為字典
for each in ref_list:
# 判斷最后是否為空,過濾最后一項(xiàng)
if each != "":
ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
return ref_dict
# 磁盤巡檢總函數(shù)
def DiskMain():
with open("db.json", "r", encoding="utf-8") as read_fp:
load_json = read_fp.read()
js = json.loads(load_json)
base = js.get("base")
count = len(base)
for each in range(0,count):
print("\033[37m-\033[0m" * 80)
print("\033[35m 檢測(cè)地址: {0:10} \t 用戶名: {1:10} \t 密碼: {2:10} \t 端口: {3:4}\033[0m".
format(base[each][1],base[each][2],base[each][3],base[each][4]))
print("\033[37m-\033[0m" * 80)
ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4])
for k,v in ref.items():
# 判斷是否存在空盤
if( v.split("%")[0] != "-"):
# 將占用百分比轉(zhuǎn)換為整數(shù)
space_ret = int(v.split("%")[0])
if space_ret >= 70:
print("\033[31m 磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} \033[0m".format(k,v))
continue
if space_ret >= 50:
print("\033[33m 磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} \033[0m".format(k, v))
continue
else:
print("\033[34m 磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} \033[0m".format(k, v))
continue
print()
# 組內(nèi)傳遞用戶名密碼時(shí)調(diào)用此方法
def GroupDiskMain(address,username,password,port):
ref = GetAllDiskSpace(address,username,password,port)
for k, v in ref.items():
if (v.split("%")[0] != "-"):
space_ret = int(v.split("%")[0])
if space_ret >= 70:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} -> [警告]".format(k, v))
continue
if space_ret >= 50:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} -> [警惕]".format(k, v))
continue
else:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} -> [正常]".format(k, v))
continue
print()
獲取系統(tǒng)內(nèi)存利用率: 獲取系統(tǒng)內(nèi)存利用率
def GetAllMemSpace(address,username,password,port):
cmd_dict = {"Linux\n" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
"AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
}
# 首先檢測(cè)系統(tǒng)版本
os_version = BatchCMD(address,username,password,port,"uname")
for version,run_cmd in cmd_dict.items():
if(version == os_version):
# 根據(jù)不同版本選擇不同的命令
os_ref = BatchCMD(address,username,password,port,run_cmd)
# 首先現(xiàn)將KB轉(zhuǎn)化為MB
mem_total = math.ceil( int(os_ref.split(":")[0].replace("\n","")) / 1024)
mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)
mem_used = str( int(mem_total) - int(mem_free))
# 計(jì)算占用空間百分比
percentage = 100 - int(mem_free / int(mem_total / 100))
print("內(nèi)存總計(jì)空間: {}".format(str(mem_total) + " MB"))
print("內(nèi)存剩余空間: {}".format(str(mem_free) + " MB"))
print("內(nèi)存已用空間: {}".format(str(mem_used) + " MB"))
print("計(jì)算百分比: {}".format(str(percentage) + " %"))
獲取系統(tǒng)進(jìn)程信息: 獲取系統(tǒng)進(jìn)程信息,并返回字典格式
def GetAllProcessSpace(address,username,password,port):
ref_dict = {}
cmd_dict = {"Linux\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
"AIX\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
}
os_version = BatchCMD(address,username,password,port,"uname")
for version,run_cmd in cmd_dict.items():
if(version == os_version):
os_ref = BatchCMD(address, username, password, port, run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
return ref_dict
# 巡檢進(jìn)程是否存在
def ProcessMain():
with open("db.json", "r", encoding="utf-8") as read_fp:
load_json = read_fp.read()
js = json.loads(load_json)
process = js.get("process")
process_count = len(process)
for x in range(0,process_count):
# 根據(jù)process中的值查詢base中的賬號(hào)密碼
base = js.get("base")
if( list(process[x].keys())[0] == base[x][0] ):
# 拿到賬號(hào)密碼之后再提取出他們的進(jìn)程ID于進(jìn)程名
print("\033[37m-\033[0m" * 80)
print("\033[35m 檢測(cè)地址: {0:10} \t 用戶名: {1:10} \t 密碼: {2:10} \t 端口: {3:4}\033[0m".
format(base[x][1], base[x][2], base[x][3], base[x][4]))
print("\033[37m-\033[0m" * 80)
ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4])
# ref_val = 全部進(jìn)程列表 proc_val = 需要檢測(cè)的進(jìn)程列表
ref_val = list(ref_dic.values())
proc_val = list(process[x].values())[0]
# 循環(huán)比較是否在列表中
for each in proc_val:
flag = each in ref_val
if(flag == True):
print("\033[34m 進(jìn)程: {0:50} 狀態(tài): √ \033[0m".format(each))
else:
print("\033[31m 進(jìn)程: {0:50} 狀態(tài): × \033[0m".format(each))
實(shí)現(xiàn)劇本運(yùn)行功能: 針對(duì)特定一臺(tái)主機(jī)運(yùn)行劇本功能,隨便寫的一個(gè)版本,僅供參考
def RunRule(address,username,password,port,playbook):
os_version = BatchCMD(address,username,password,port,"uname")
if(os_version == list(playbook.keys())[0]):
play = list(playbook.values())[0]
print()
print("\033[37m-\033[0m" * 130)
print("\033[35m 系統(tǒng)類型: {0:4} \t 地址: {1:10} \t 用戶名: {2:10} \t 密碼: {3:15} \t 端口: {4:4}\033[0m"
.format(os_version.replace("\n",""),address,username,password,port))
print("\033[37m-\033[0m" * 130)
for each in range(0,len(play)):
RunCmd = play[each] + " > /dev/null 2>&1 && echo $?"
print("\033[30m [>] 派發(fā)命令: {0:100} \t 狀態(tài): {1:5} \033[0m".format(
RunCmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派發(fā)"))
os_ref = BatchCMD(address, username, password, port, RunCmd)
if(os_ref == "0\n"):
print("\033[34m [√] 運(yùn)行命令: {0:100} \t 狀態(tài): {1:5} \033[0m".format(
RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派發(fā)完成"))
else:
print("\033[31m [×] 運(yùn)行命令: {0:100} \t 狀態(tài): {1:5} \033[0m".format(
RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派發(fā)失敗"))
# 既然失敗了,就把剩下的也打出來吧,按照失敗處理
for x in range(each+1,len(play)):
print("\033[31m [×] 運(yùn)行命令: {0:100} \t 狀態(tài): {1:5} \033[0m".format(
play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "終止執(zhí)行"))
break
else:
return 0
# 批量: 傳入主機(jī)組不同主機(jī)執(zhí)行不同劇本
def RunPlayBook(HostList,PlayBook):
count = len(HostList)
error = []
success = []
for each in range(0,count):
ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook)
if ref == 0:
error.append(HostList[each][0])
else:
success.append(HostList[each][0])
print("\n\n")
print("-" * 130)
print("執(zhí)行清單")
print("-" * 130)
for each in success:
print("成功主機(jī): {}".format(each))
for each in error:
print("失敗主機(jī): {}".format(each))
# 運(yùn)行測(cè)試
def PlayBookRun():
playbook = \
{
"Linux\n":
[
"ifconfig",
"vmstat",
"ls",
"netstat -an",
"ifconfis",
"cat /etc/passwd | grep 'root' | awk '{print $2}'"
]
}
addr_list = \
[
["192.168.1.127", "root", "1233", "22"],
["192.168.1.126", "root", "1203", "22"]
]
# 指定addr_list這幾臺(tái)機(jī)器執(zhí)行playbook劇本
RunPlayBook(addr_list,playbook)
過程化實(shí)現(xiàn)文件上傳下載: 文件傳輸功能 PUT上傳 GET下載
def BatchSFTP(address,username,password,port,soruce,target,flag):
transport = paramiko.Transport((address, int(port)))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
if flag == "PUT":
try:
ret = sftp.put(soruce, target)
if ret !="":
transport.close()
return 1
else:
transport.close()
return 0
transport.close()
except Exception:
transport.close()
return 0
elif flag == "GET":
try:
target = str(address + "_" + target)
os.chdir("./recv_file")
ret = sftp.get(soruce, target)
if ret != "":
transport.close()
return 1
else:
transport.close()
return 0
transport.close()
except Exception:
transport.close()
return 0
# 批量將本地文件 source 上傳到目標(biāo) target 中
def PutRemoteFile(source,target):
with open("db.json", "r", encoding="utf-8") as fp:
load_json = fp.read()
js = json.loads(load_json)
base = js.get("base")
print("-" * 130)
print("接收主機(jī) \t\t 登錄用戶 \t 登錄密碼 \t 登錄端口 \t 本地文件 \t\t 傳輸?shù)?\t\t\t 傳輸狀態(tài)")
print("-" * 130)
for each in range(0,len(base)):
# 先判斷主機(jī)是否可通信
ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],"uname")
if ref == None:
print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未連通\033[0m".format(
base[each][1],base[each][2],base[each][3],base[each][4],source,target))
continue
ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"PUT")
if(ref == 1):
print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸成功\033[0m".format(
base[each][1],base[each][2],base[each][3],base[each][4],source,target))
else:
print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸失敗\033[0m".format(
base[each][1], base[each][2], base[each][3], base[each][4], source, target))
# 批量將目標(biāo)文件拉取到本地特定目錄(存在缺陷)
def GetRemoteFile(source,target):
with open("db.json", "r", encoding="utf-8") as fp:
load_json = fp.read()
js = json.loads(load_json)
base = js.get("base")
print("-" * 130)
print("發(fā)送主機(jī) \t\t 登錄用戶 \t 登錄密碼 \t 登錄端口 \t\t 遠(yuǎn)程文件 \t\t 拉取到 \t\t\t 傳輸狀態(tài)")
print("-" * 130)
for each in range(0,len(base)):
ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], "uname")
if ref == None:
print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未連通\033[0m".format(
base[each][1], base[each][2], base[each][3], base[each][4], source, target))
continue
ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"GET")
if(ref == 1):
print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸成功\033[0m".format(
base[each][1],base[each][2],base[each][3],base[each][4],source,target))
else:
print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸失敗\033[0m".format(
base[each][1], base[each][2], base[each][3], base[each][4], source, target))
另一種命令執(zhí)行方法:
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def BatchCMD(address,username,password,port,command):
try:
ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
stdin , stdout , stderr = ssh.exec_command(command)
result = stdout.read()
if len(result) != 0:
return result
else:
return -1
except Exception:
return -1
# 通過獲取主機(jī)Ping狀態(tài)
def GetPing():
fp = open("unix_base.db", "r", encoding="utf-8")
count = len(open("unix_base.db", "r", encoding="utf-8").readlines())
print("-" * 100)
print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".format("IP地址","機(jī)器系統(tǒng)","設(shè)備SN","機(jī)房位置","存活狀態(tài)","主機(jī)作用"))
print("-" * 100)
for each in range(count):
ref = eval(fp.readline())
ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?")
if(int(ret)==0):
print("{0:20} \t {1:10} \t {2:11} \t {3:5} \t {4:9} \t {5:40}".
format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))
else:
print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".
format(ref[0],ref[1],ref[2],ref[3],"異常",ref[4]))
fp.close()
# ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print $2'}
def GetProcessStatus():
fp = open("unix_process.db", "r", encoding="utf-8")
count = len(open("unix_process.db", "r", encoding="utf-8").readlines())
for each in range(count):
proc = eval(fp.readline())
proc_len = len(proc)
print("-" * 70)
print("---> 巡檢地址: {0:10} \t 登錄用戶: {1:7} \t 登錄密碼: {2:10}".format(proc[0],proc[1],proc[2]))
print("-" * 70)
for process in range(3, proc_len):
command = "ps aux | grep \'{}\' | grep -v \'grep\' | awk '{}' | head -1".format(proc[process],"{print $2}")
try:
ref = BatchCMD(proc[0],proc[1],proc[2],22,command)
if(int(ref)!=-1):
print("進(jìn)程: {0:18} \t PID: {1:10} \t 狀態(tài): {2}".format(proc[process], int(ref),"√"))
else:
print("進(jìn)程: {0:18} \t PID:{1:10} \t 狀態(tài): {2}".format(proc[process], 0,"×"))
except Exception:
print("進(jìn)程: {0:18} \t PID:{1:10} \t 狀態(tài): {2}".format(proc[process], 0,"×"))
print()
fp.close()
def GetDiskStatus():
fp = open("unix_disk.db", "r", encoding="utf-8")
count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
for each in range(count):
proc = eval(fp.readline())
proc_len = len(proc)
print("-" * 100)
print("---> 巡檢地址: {0:10} \t 登錄系統(tǒng): {1:7} \t 登錄賬號(hào): {2:10} 登錄密碼: {3:10}".
format(proc[0],proc[1],proc[2],proc[3]))
print("-" * 100)
try:
ref = BatchCMD(proc[0], proc[2], proc[3], 22, "df | grep -v 'Filesystem'")
st = str(ref).replace("\\n", "\n")
print(st.replace("b'", "").replace("'", ""))
except Exception:
pass
print()
fp.close()
# 運(yùn)行命令
def RunCmd(command,system):
fp = open("unix_disk.db", "r", encoding="utf-8")
count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
for each in range(count):
proc = eval(fp.readline())
proc_len = len(proc)
if proc[1] == system:
print("-" * 100)
print("---> 巡檢地址: {0:10} \t 登錄系統(tǒng): {1:7} \t 登錄賬號(hào): {2:10} 登錄密碼: {3:10}".
format(proc[0],proc[1],proc[2],proc[3]))
print("-" * 100)
try:
ref = BatchCMD(proc[0], proc[2], proc[3], 22, command)
st = str(ref).replace("\\n", "\n")
print(st.replace("b'", "").replace("'", ""))
except Exception:
pass
fp.close()
面向?qū)ο蟮姆庋b方法: 使用面向?qū)ο蠓庋b,可極大的提高復(fù)用性。
import paramiko
class MySSH:
def __init__(self,address,username,password,default_port = 22):
self.address = address
self.default_port = default_port
self.username = username
self.password = password
self.obj = paramiko.SSHClient()
self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.obj.connect(self.address,self.default_port,self.username,self.password)
self.objsftp = self.obj.open_sftp()
def BatchCMD(self,command):
stdin , stdout , stderr = self.obj.exec_command(command)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
def GetRemoteFile(self,remotepath,localpath):
self.objsftp.get(remotepath,localpath)
def PutLocalFile(self,localpath,remotepath):
self.objsftp.put(localpath,remotepath)
def GetFileSize(self,file_path):
ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
return ref
def CloseSSH(self):
self.objsftp.close()
self.obj.close()
if __name__ == '__main__':
ssh = MySSH('192.168.191.3','root','1233',22)
ref = ssh.BatchCMD("ifconfig")
print(ref)
sz = ssh.GetFileSize("/etc/passwd")
print(sz)
ssh.CloseSSH()
第二次封裝完善。
import paramiko,os,json,re
class MySSH:
def __init__(self,address,username,password,default_port = 22):
self.address = address
self.default_port = default_port
self.username = username
self.password = password
try:
self.obj = paramiko.SSHClient()
self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)
self.objsftp = self.obj.open_sftp()
except Exception:
pass
def BatchCMD(self,command):
try:
stdin , stdout , stderr = self.obj.exec_command(command,timeout=3)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
except Exception:
return None
def GetRemoteFile(self,remote_path,local_path):
try:
self.objsftp.get(remote_path,local_path)
return True
except Exception:
return False
def PutLocalFile(self,localpath,remotepath):
try:
self.objsftp.put(localpath,remotepath)
return True
except Exception:
return False
def CloseSSH(self):
self.objsftp.close()
self.obj.close()
# 獲取文件大小
def GetFileSize(self,file_path):
ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
return ref.replace("\n","")
# 判斷文件是否存在
def IsFile(self,file_path):
return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))
通過Eval函數(shù)解析執(zhí)行: 自定義語法規(guī)則與函數(shù),通過Eval函數(shù)實(shí)現(xiàn)解析執(zhí)行. 沒寫完,僅供參考。
import json,os,sys,math
import argparse,time,re
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def BatchCMD(address,username,password,port,command):
try:
ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
stdin , stdout , stderr = ssh.exec_command(command)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
except Exception:
return None
# ------------------------------------------------------------------------
# 內(nèi)置解析方法
def GetDisk(x):
return str(x)
def GetCPULoad():
return str(10)
# 句式解析器,解析句子并執(zhí)行
def judge(string):
# 如果匹配到IF則執(zhí)行判斷條件解析
if re.findall(r'IF{ (.*?) }', string, re.M) != []:
# 則繼續(xù)提取出他的表達(dá)式
ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
subject = re.findall(ptr,string)[0]
subject_list = subject.split(" ")
# 拼接語句并執(zhí)行
sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2]
# 組合后執(zhí)行,返回結(jié)果
if eval(sentence):
return "IF"
else:
return False
# 如果匹配到put則執(zhí)行上傳解析
elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
print("put")
return False
# 獲取特定目錄下所有的劇本
def GetAllRule():
rootdir = os.getcwd() + "\\rule\\"
all_files = [f for f in os.listdir(rootdir)]
print("-" * 90)
print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("劇本名稱","應(yīng)用平臺(tái)","應(yīng)用端口","執(zhí)行主機(jī)數(shù)","命令條數(shù)"))
print("-" * 90)
for switch in all_files:
# 首先判斷文件結(jié)尾是否為Json
if( switch.endswith(".json") == True):
all_switch_dir = rootdir + switch
try:
# 判斷文件內(nèi)部是否符合JSON規(guī)范
with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
# 判斷是否存在指定字段來識(shí)別規(guī)范
load = json.loads(read_file.read())
if load.get("framework") != None and load.get("task_sequence") != None:
run_addr_count = len(load.get("address_list"))
command_count = len(load.get("task_sequence"))
print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
except ValueError:
pass
# 指定一個(gè)劇本并運(yùn)行
def RunPlayBook(rule_name):
rootdir = os.getcwd() + "\\rule\\"
all_files = [f for f in os.listdir(rootdir)]
for switch in all_files:
if( switch.endswith(".json") == True):
all_switch_dir = rootdir + switch
# 尋找到需要加載的劇本地址
if( switch == rule_name):
with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
data = json.loads(read_file.read())
address_list = data.get("address_list")
# 循環(huán)每個(gè)主機(jī)任務(wù)
for each in address_list:
# 得到劇本內(nèi)容
task_sequence = data.get("task_sequence")
default_port = data.get("default_port")
print("-" * 90)
print("地址: {0:15} 用戶名: {1:10} 密碼: {2:10}".format(each[0],each[1],each[2]))
print("-" * 90)
for task in task_sequence:
flag = judge(task[0])
if flag == "IF":
ref = BatchCMD(each[0],each[1],each[2],default_port,task[1])
print(ref)
elif flag == False:
ref = BatchCMD(each[0],each[1],each[2],default_port,task[0])
print(ref)
if __name__ == "__main__":
RunPlayBook("get_log.json")
定義劇本規(guī)范如下。
{
"framework": "Centos",
"version": "7.0",
"address_list":
[
["192.168.191.3","root","1233"]
],
"default_port": "22",
"task_sequence":
[
["ifconfig"],
["IF{ GetLastCmdFlag() == True }","uname"]
]
}
詞法分析: 詞法分析解析劇本內(nèi)容。
# 獲取特定目錄下所有的劇本
def GetAllRule():
rootdir = os.getcwd() + "\\rule\\"
all_files = [f for f in os.listdir(rootdir)]
print("-" * 90)
print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("劇本名稱","應(yīng)用平臺(tái)","應(yīng)用端口","執(zhí)行主機(jī)數(shù)","命令條數(shù)"))
print("-" * 90)
for switch in all_files:
# 首先判斷文件結(jié)尾是否為Json
if( switch.endswith(".json") == True):
all_switch_dir = rootdir + switch
try:
# 判斷文件內(nèi)部是否符合JSON規(guī)范
with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
# 判斷是否存在指定字段來識(shí)別規(guī)范
load = json.loads(read_file.read())
if load.get("framework") != None and load.get("task_sequence") != None:
run_addr_count = len(load.get("address_list"))
command_count = len(load.get("task_sequence"))
print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
except ValueError:
pass
# 句式解析器,解析句子并執(zhí)行
def judge(string):
# 如果匹配到IF則執(zhí)行判斷條件解析
if re.findall(r'IF{ (.*?) }', string, re.M) != []:
# 則繼續(xù)提取出他的表達(dá)式
ptr = re.compile(r'IF[{] (.*?) [}]',re.S)
subject = re.findall(ptr,string)[0]
subject_list = subject.split(" ")
# 公開接口,執(zhí)行命令
ssh = MySSH("192.168.191.3","root","1233","22")
# 組合命令并執(zhí)行
sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2])
if eval(sentence):
return "IF",ssh
else:
return False
# 如果匹配到put則執(zhí)行上傳解析
elif re.findall(r'PUT{ (.*?) }', string, re.M) != []:
print("put")
return False
# 指定一個(gè)劇本并運(yùn)行
def RunPlayBook(rule_name):
rootdir = os.getcwd() + "\\rule\\"
all_files = [f for f in os.listdir(rootdir)]
for switch in all_files:
if( switch.endswith(".json") == True):
all_switch_dir = rootdir + switch
# 尋找到需要加載的劇本地址
if( switch == rule_name):
with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
data = json.loads(read_file.read())
address_list = data.get("address_list")
# 循環(huán)每個(gè)主機(jī)任務(wù)
for each in address_list:
# 得到劇本內(nèi)容
task_sequence = data.get("task_sequence")
default_port = data.get("default_port")
print("-" * 90)
print("地址: {0:15} 用戶名: {1:10} 密碼: {2:10}".format(each[0],each[1],each[2]))
print("-" * 90)
for task in task_sequence:
flag,obj = judge(task[0])
if flag == "IF":
ret = obj.BatchCMD(task[1])
print(ret)
if __name__ == '__main__':
ret = judge("IF{ ssh.GetFileSize('/etc/passwd') >= 4 }")
print(ret)
MySSH類最終封裝: 通過面向?qū)ο髮?duì)其進(jìn)行封裝,實(shí)現(xiàn)了查詢CPU,負(fù)載,內(nèi)存利用率,磁盤容量,等通用數(shù)據(jù)的獲取。
import paramiko, math,json
class MySSH:
def __init__(self, address, username, password, default_port):
self.address = address
self.default_port = default_port
self.username = username
self.password = password
# 初始化,遠(yuǎn)程模塊
def Init(self):
try:
self.ssh_obj = paramiko.SSHClient()
self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
allow_agent=False, look_for_keys=False)
self.sftp_obj = self.ssh_obj.open_sftp()
except Exception:
return False
# 執(zhí)行非交互命令
def BatchCMD(self, command):
try:
stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
except Exception:
return None
# 將遠(yuǎn)程文件下載到本地
def GetRemoteFile(self, remote_path, local_path):
try:
self.sftp_obj.get(remote_path, local_path)
return True
except Exception:
return False
# 將本地文件上傳到遠(yuǎn)程
def PutLocalFile(self, localpath, remotepath):
try:
self.sftp_obj.put(localpath, remotepath)
return True
except Exception:
return False
# 關(guān)閉接口
def CloseSSH(self):
try:
self.sftp_obj.close()
self.ssh_obj.close()
except Exception:
pass
# 獲取文件大小
def GetFileSize(self, file_path):
ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
return ref.replace("\n", "")
# 判斷文件是否存在
def IsFile(self, file_path):
return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))
# 獲取系統(tǒng)型號(hào)
def GetSystemVersion(self):
return self.BatchCMD("uname")
# 檢測(cè)目標(biāo)主機(jī)存活狀態(tài)
def GetPing(self):
try:
if self.GetSystemVersion() != None:
return True
else:
return False
except Exception:
return False
# 獲取文件列表,并得到大小
def GetFileList(self, path):
try:
ref_list = []
self.sftp_obj.chdir(path)
file_list = self.sftp_obj.listdir("./")
for sub_path in file_list:
dict = {}
file_size = self.GetFileSize(path + sub_path)
dict[path + sub_path] = file_size
ref_list.append(dict)
return ref_list
except Exception:
return False
# 將遠(yuǎn)程文件全部打包后拉取到本地
def GetTarPackageAll(self, path):
try:
file_list = self.sftp_obj.listdir(path)
self.sftp_obj.chdir(path)
for packageName in file_list:
self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
return True
except Exception:
return True
# 獲取磁盤空間并返回字典
def GetAllDiskSpace(self):
ref_dict = {}
cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
"AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
# 根據(jù)不同版本選擇不同的命令
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
# 循環(huán)將其轉(zhuǎn)換為字典
for each in ref_list:
# 判斷最后是否為空,過濾最后一項(xiàng)
if each != "":
ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
return ref_dict
except Exception:
return False
# 獲取系統(tǒng)內(nèi)存利用率
def GetAllMemSpace(self):
cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
"AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
# 根據(jù)不同版本選擇不同的命令
os_ref = self.BatchCMD(run_cmd)
# 首先現(xiàn)將KB轉(zhuǎn)化為MB
mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)
# 計(jì)算占用空間百分比
percentage = 100 - int(mem_free / int(mem_total / 100))
# 拼接字典數(shù)據(jù)
return {"Total": str(mem_total), "Free": str(mem_free), "Percentage": str(percentage)}
except Exception:
return False
# 獲取系統(tǒng)進(jìn)程信息,并返回字典格式
def GetAllProcessSpace(self):
ref_dict = {}
cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
"AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
return ref_dict
except Exception:
return False
# 獲取CPU利用率
def GetCPUPercentage(self):
ref_dict = {}
cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
"AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
each = each.split(":")
ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]}
return ref_dict
except Exception:
return False
# 獲取機(jī)器的負(fù)載情況
def GetLoadAVG(self):
ref_dict = {}
cmd_dict = {"Linux\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'",
"AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
each = each.replace(",","").split(":")
ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
return ref_dict
return False
except Exception:
return False
# 修改當(dāng)前用戶密碼
def SetPasswd(self,username,password):
try:
os_id = self.BatchCMD("id | awk {'print $1'}")
print(os_id)
if(os_id == "uid=0(root)\n"):
self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
return True
except Exception:
return False
# 定義超類,集成基類MySSH
class SuperSSH(MySSH):
def __init__(self,address, username, password, default_port):
super(SuperSSH, self).__init__(address, username, password, default_port)
我們繼續(xù)為上面的代碼加上命令行,讓其可以直接使用,這里需要遵循一定的格式規(guī)范,我們使用JSON解析數(shù)據(jù),JSON格式如下.
{
"aix":
[
["192.168.1.1","root","123123"],
["192.168.1.1","root","2019"],
],
"suse":
[
["192.168.1.1","root","123123"],
],
"centos":
[
["192.168.1.1","root","123123"],
]
}
接著是主程序代碼,如下所示.
# -*- coding: utf-8 -*-
from MySSH import MySSH
import json,os,sys,argparse
class InitJson():
def __init__(self,db):
self.db_name = db
def GetPlatform(self,plat):
with open(self.db_name, "r", encoding="utf-8") as Read_Pointer:
load_json = json.loads(Read_Pointer.read())
for k,v in load_json.items():
try:
if k == plat:
return v
except Exception:
return None
return None
if __name__ == "__main__":
ptr = InitJson("database.json")
parser = argparse.ArgumentParser()
parser.add_argument("-G","--group",dest="group",help="指定主機(jī)組")
parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令")
parser.add_argument("--get",dest="get",help="指定獲取數(shù)據(jù)類型<ping>")
parser.add_argument("--dst", dest="dst_file",help="目標(biāo)位置")
parser.add_argument("--src",dest="src_file",help="原文件路徑")
args = parser.parse_args()
# 批量CMD --group=aix --cmd=ls
if args.group and args.cmd:
platform = ptr.GetPlatform(args.group)
success,error = [],[]
for each in platform:
ssh = MySSH(each[0], each[1], each[2], 22)
if ssh.Init() != False:
print("-" * 140)
print("主機(jī): {0:15} \t 賬號(hào): {1:10} \t 密碼: {2:10} \t 命令: {3:30}".
format(each[0], each[1], each[2], args.cmd))
print("-" * 140)
print(ssh.BatchCMD(args.cmd))
ssh.CloseSSH()
success.append(each[0])
else:
error.append(each[0])
ssh.CloseSSH()
print("\n\n","-" * 140,
"\n 執(zhí)行報(bào)告 \n",
"-" * 140,
"\n失敗主機(jī): {}\n".format(error),
"-" * 140)
# 批量獲取主機(jī)其他數(shù)據(jù) --group=centos --get=ping
if args.group and args.get:
platform = ptr.GetPlatform(args.group)
success, error = [], []
for each in platform:
ssh = MySSH(each[0], each[1], each[2], 22)
# 判斷是否為ping
if ssh.Init() != False:
if args.get == "ping":
ret = ssh.GetPing()
if ret == True:
print("[*] 主機(jī): {} 存活中.".format(each[0]))
# 收集磁盤數(shù)據(jù)
elif args.get == "disk":
print("-" * 140)
print("主機(jī): {0:15} \t 賬號(hào): {1:10} \t 密碼: {2:10}".
format(each[0], each[1], each[2]))
print("-" * 140)
ret = ssh.GetAllDiskSpace()
for k, v in ret.items():
if (v.split("%")[0] != "-"):
space_ret = int(v.split("%")[0])
if space_ret >= 70:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} -> [警告]".format(k, v))
continue
if space_ret >= 50:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5} -> [警惕]".format(k, v))
continue
else:
print("磁盤分區(qū): {0:30} \t 磁盤占用: {1:5}".format(k, v))
continue
print()
else:
error.append(each[0])
ssh.CloseSSH()
print("\n\n", "-" * 140,
"\n 執(zhí)行報(bào)告 \n",
"-" * 140,
"\n失敗主機(jī): {}\n".format(error),
"-" * 140)
# 實(shí)現(xiàn)文件上傳過程 --group=centos --src=./a.txt --dst=/tmp/test.txt
if args.group and args.src_file and args.dst_file:
platform = ptr.GetPlatform(args.group)
success, error = [], []
for each in platform:
ssh = MySSH(each[0], each[1], each[2], 22)
if ssh.Init() != False:
ret = ssh.PutLocalFile(args.src_file,args.dst_file)
if ret == True:
print("主機(jī): {} \t 本地文件: {} \t ---> 傳到: {}".format(each[0], args.src_file,args.dst_file))
else:
error.append(each[0])
ssh.CloseSSH()
print("\n\n", "-" * 140,
"\n 執(zhí)行報(bào)告 \n",
"-" * 140,
"\n失敗主機(jī): {}\n".format(error),
"-" * 140)
簡(jiǎn)單的使用命令:
遠(yuǎn)程CMD: python main.py --group=centos --cmd="free -h | grep -v 'total'"

判斷存活: python main.py --group=centos --get="ping"

拉取磁盤:python main.py --group=suse --get="disk"

批量上傳文件: python main.py --group=suse --src="./aaa" --dst="/tmp/bbb.txt"

由于我的設(shè)備少,所以沒開多線程,擔(dān)心開多線程對(duì)目標(biāo)造成過大壓力,也沒啥必要。
番外: 另外我研究了一個(gè)主機(jī)分組的小工具,加上命令執(zhí)行代碼量才800行,實(shí)現(xiàn)了一個(gè)分組數(shù)據(jù)庫,在這里記下使用方法。
默認(rèn)運(yùn)行進(jìn)入一個(gè)交互式shell環(huán)境。
Init = 初始化json文件,ShowHostList=顯示所有主機(jī),ShowGroup=顯示所有組,ShowAllGroup=顯示所有主機(jī)包括組。

添加修改與刪除記錄,命令如下。

添加刪除主機(jī)組。

通過UUID向主機(jī)組中添加或刪除主機(jī)記錄。

測(cè)試主機(jī)組連通性。

以上就是Python如何實(shí)現(xiàn)Paramiko的二次封裝的詳細(xì)內(nèi)容,更多關(guān)于Python實(shí)現(xiàn)Paramiko的二次封裝的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Python使用Paramiko庫實(shí)現(xiàn)SSH管理詳解
- python的paramiko模塊基本用法詳解
- Python通過paramiko庫實(shí)現(xiàn)遠(yuǎn)程執(zhí)行l(wèi)inux命令的方法
- Python運(yùn)維自動(dòng)化之paramiko模塊應(yīng)用實(shí)例
- Python中paramiko模塊的基礎(chǔ)操作與排錯(cuò)問題
- Python遠(yuǎn)程SSH庫Paramiko詳細(xì)操作
- python 第三方庫paramiko的常用方式
- python 使用paramiko模塊進(jìn)行封裝,遠(yuǎn)程操作linux主機(jī)的示例代碼
- Python paramiko使用方法代碼匯總
- Python Paramiko模塊中exec_command()和invoke_shell()兩種操作區(qū)別
相關(guān)文章
YOLOv5車牌識(shí)別實(shí)戰(zhàn)教程(四)模型優(yōu)化與部署
這篇文章主要介紹了YOLOv5車牌識(shí)別實(shí)戰(zhàn)教程(四)模型優(yōu)化與部署,在這個(gè)教程中,我們將一步步教你如何使用YOLOv5進(jìn)行車牌識(shí)別,幫助你快速掌握YOLOv5車牌識(shí)別技能,需要的朋友可以參考下2023-04-04
Python中類方法@classmethod和靜態(tài)方法@staticmethod解析
這篇文章主要介紹了Python中類方法@classmethod和靜態(tài)方法@staticmethod解析,python中存在三種方法,分別為常規(guī)方法(定義中傳入self)、@classmethod修飾的類方法、@staticmethod修飾的靜態(tài)方法,,需要的朋友可以參考下2023-08-08
Python實(shí)現(xiàn)如何根據(jù)文件后綴進(jìn)行分類
本文主要為大家詳細(xì)介紹了如何通過python實(shí)現(xiàn)根據(jù)文件后綴實(shí)現(xiàn)分類,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以關(guān)注一下2021-12-12
Python實(shí)現(xiàn)簡(jiǎn)單字典樹的方法
這篇文章主要介紹了Python實(shí)現(xiàn)簡(jiǎn)單字典樹的方法,實(shí)例分析了Python字典樹的定義、實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2016-04-04
淺析Python+OpenCV使用攝像頭追蹤人臉面部血液變化實(shí)現(xiàn)脈搏評(píng)估
這篇文章主要介紹了Python+OpenCV使用攝像頭追蹤人臉面部血液變化實(shí)現(xiàn)脈搏評(píng)估,本文通過一段代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-10-10

