如何使用Python連接?SSH?服務(wù)器并執(zhí)行命令
實(shí)際開發(fā)中,有時候經(jīng)常需要查看日志,有時候使用ssh工具打開就為了看一下錯誤日志又比較麻煩,所以今天帶來一個簡單的基于python的小工具.
首先需要先安裝一個庫 paramiko
使用命令直接安裝
pip install paramiko
paramiko庫是一個開源的、基于SSH2協(xié)議的庫,可以實(shí)現(xiàn)SSH連接以及數(shù)據(jù)的傳輸。
paramiko是用純Python實(shí)現(xiàn)的SSH2客戶端,支持身份驗(yàn)證、SFTP客戶端以及遠(yuǎn)程執(zhí)行命令等功能。paramiko庫提供了豐富的類和方法,幫助用戶快速實(shí)現(xiàn)SSH通信功能。
在實(shí)際應(yīng)用中,paramiko庫常用于構(gòu)建自動化運(yùn)維系統(tǒng)、遠(yuǎn)程部署、多機(jī)協(xié)作等場景.
定義MySshClient 類
class MySshClient:
def __init__(self, ssh_client):
self.ssh_client = ssh_client
def exec_command(self, cmd):
try:
stdin, stdout, stderr = self.ssh_client.exec_command(cmd)
return stdin, stdout, stderr
except Exception as e:
print(f"Error executing command {cmd}: {e}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.ssh_client.close()
在此代碼中,我們定義了一個 MySshClient 類,用于連接 SSH 服務(wù)器并執(zhí)行命令。您可以使用該類創(chuàng)建一個實(shí)例,并通過 connect() 方法連接到 SSH 服務(wù)器。
def connect(host, port, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.WarningPolicy())
try:
ssh.load_system_host_keys()
ssh.connect(host, port, username, password,timeout=3)
except paramiko.AuthenticationException:
raise Exception(f"在主機(jī) {host}連接失敗,請檢查你的參數(shù)")
except paramiko.SSHException as e:
raise Exception(f"在 {host}連接出錯: {e}")
except paramiko.BadHostKeyException as e:
raise Exception(f" {host} 無法驗(yàn)證通過: {e}")
except Exception as e:
raise Exception(f" 連接到{host}:{port}: {e}超時")
return ssh這個是給函數(shù)加上異常處理,方便代碼的調(diào)試
執(zhí)行命令
在連接到 SSH 服務(wù)器后,您可以使用 exec_command() 方法執(zhí)行命令。該方法返回一個元組,包含服務(wù)器的輸出、錯誤和輸入。您可以將此元組提供給 with 關(guān)鍵字,以便在需要時進(jìn)行處理。
2.連接到主機(jī)
在連接到 SSH 服務(wù)器之前,您需要指定主機(jī)名、端口、用戶名和密碼。例如:
host = input("Host: 請輸入主機(jī)名")
port = int(input("Port: 默認(rèn)為22")or 22)
username = input("Username: 請輸入用戶名")
password = getpass.getpass("Password: 請輸入密碼")port = int(input(“Port: 默認(rèn)為22”)or 22) 這段代碼的意思是
默認(rèn)端口22,如果你的ssh端口不是22,可以自行修改
密碼的話,如果不想被別人看見,使用getpass函數(shù)進(jìn)行加密
創(chuàng)建并連接到 MySshClient 實(shí)例
ssh = connect(host, port, username, password)
連接成功后,執(zhí)行命令并處理輸出
連接成功后,您可以使用 exec_command() 方法執(zhí)行命令,并使用 with 關(guān)鍵字處理服務(wù)器的輸出:
with MySshClient(ssh) as client:
stdin, stdout, stderr = client.exec_command("ls -l")
with stdout:
print(stdout.read().decode())
with stderr:
print(stderr.read().decode())這將在連接到 SSH 服務(wù)器后,執(zhí)行 “ls -l” 命令,并輸出結(jié)果。請注意,此示例僅顯示輸出,如果有錯誤,錯誤信息將顯示在控制臺。
完整代碼如下:
import paramiko
import os
import getpass
class MySshClient:
def __init__(self, ssh_client):
self.ssh_client = ssh_client
def exec_command(self, cmd):
try:
stdin, stdout, stderr = self.ssh_client.exec_command(cmd)
return stdin, stdout, stderr
except Exception as e:
print(f"Error executing command {cmd}: {e}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.ssh_client.close()
def connect(host, port, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.WarningPolicy())
try:
ssh.load_system_host_keys()
ssh.connect(host, port, username, password,timeout=3)
except paramiko.AuthenticationException:
raise Exception(f"在主機(jī) {host}連接失敗,請檢查你的參數(shù)")
except paramiko.SSHException as e:
raise Exception(f"Error connecting to {host}: {e}")
except paramiko.BadHostKeyException as e:
raise Exception(f"Host key for {host} could not be verified: {e}")
except Exception as e:
raise Exception(f" 連接到{host}:{port}: {e}超時")
return ssh
if __name__ == '__main__':
host = input("Host: 請輸入主機(jī)名")
port = int(input("Port: 默認(rèn)為22")or 22)
username = input("Username: 請輸入用戶名")
password = getpass.getpass("Password: 請輸入密碼")
print('連接中...........')
ssh = connect(host, port, username, password,)
with MySshClient(ssh) as client:
stdin, stdout, stderr = client.exec_command("ls -l")
with stdout:
print(stdout.read().decode())
with stderr:
print(stderr.read().decode())當(dāng)然你也可以使用sftp進(jìn)行傳輸
# 上傳
current_path = os.getcwd() #獲取當(dāng)前路徑
print(current_path)
filename = '\\main.py' # 文件名
file = current_path + f'{filename}' # 當(dāng)前文件的路徑
sftp = client.open_sftp() # 使用sftp連接
sftp.put(file, '/root/main.py') # 上傳文件
sftp.get('/root/server.sh', current_path+'/code.sh') #下載文件到此這篇關(guān)于使用 Python 連接 SSH 服務(wù)器并執(zhí)行命令的文章就介紹到這了,更多相關(guān)Python 連接 SSH 服務(wù)器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python對ElasticSearch獲取數(shù)據(jù)及操作
這篇文章主要為大家詳細(xì)介紹了Python對ElasticSearch獲取數(shù)據(jù)及操作,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04
Python報錯SyntaxError:unexpected?EOF?while?parsing的解決辦法
在運(yùn)行或編寫一個程序時常會遇到錯誤異常,這時python會給你一個錯誤提示類名,告訴出現(xiàn)了什么樣的問題,下面這篇文章主要給大家介紹了關(guān)于Python報錯SyntaxError:unexpected?EOF?while?parsing的解決辦法,需要的朋友可以參考下2022-07-07

