python ssh 執(zhí)行shell命令的示例
# -*- coding: utf-8 -*-
import paramiko
import threading
def run(host_ip, username, password, command):
ssh = paramiko.SSHClient()
try:
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host_ip, 22, username, password)
print('===================exec on [%s]=====================' % host_ip)
stdin, stdout, stderr = ssh.exec_command(command, timeout=300)
out = stdout.readlines()
for o in out:
print (o.strip('\n'))
except Exception as ex:
print('error, host is [%s], msg is [%s]' % (host_ip, ex.message))
finally:
ssh.close()
if __name__ == '__main__':
# 將需要批量執(zhí)行命令的host ip地址填到這里
# eg: host_ip_list = ['IP1', 'IP2']
host_ip_list = ['147.116.20.19']
for _host_ip in host_ip_list:
# 用戶名,密碼,執(zhí)行的命令填到這里
run(_host_ip, 'tzgame', 'tzgame@1234', 'df -h')
run(_host_ip, 'tzgame', 'tzgame@1234', 'ping -c 5 220.181.38.148')
pycrypto,由于 paramiko 模塊內(nèi)部依賴pycrypto,所以先下載安裝pycrypto
pip3 install pycrypto pip3 install paramiko
(1)基于用戶名和密碼的連接
import paramiko
# 創(chuàng)建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務(wù)器
ssh.connect(hostname='c1.salt.com', port=22, username='GSuser', password='123')
# 執(zhí)行命令
stdin, stdout, stderr = ssh.exec_command('ls')
# 獲取命令結(jié)果
result = stdout.read()
# 關(guān)閉連接
ssh.close()
(2)基于公鑰秘鑰連接
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 創(chuàng)建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務(wù)器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
# 執(zhí)行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結(jié)果
result = stdout.read()
# 關(guān)閉連接
ssh.close()
SFTPClient:
用于連接遠(yuǎn)程服務(wù)器并進行上傳下載功能。
(1)基于用戶名密碼上傳下載
import paramiko
transport = paramiko.Transport(('hostname',22))
transport.connect(username='GSuser',password='123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務(wù)器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
(2)基于公鑰秘鑰上傳下載
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='GSuser', pkey=private_key )
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務(wù)器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
下面是多線程執(zhí)行版本
#!/usr/bin/python
#coding:utf-8
import threading
import subprocess
import os
import sys
sshport = 13131
log_path = 'update_log'
output = {}
def execute(s, ip, cmd, log_path_today):
with s:
cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd)
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output[ip] = ret.stdout.readlines()
if __name__ == "__main__":
if len(sys.argv) != 3:
print "Usage: %s config.ini cmd" % sys.argv[0]
sys.exit(1)
if not os.path.isfile(sys.argv[1]):
print "Usage: %s is not file!" % sys.argv[1]
sys.exit(1)
cmd = sys.argv[2]
f = open(sys.argv[1],'r')
list = f.readlines()
f.close()
today = datetime.date.today()
log_path_today = '%s/%s' % (log_path,today)
if not os.path.isdir(log_path_today):
os.makedirs(log_path_today)
threading_num = 100
if threading_num > len(list):
threading_num = len(list)
s = threading.Semaphore(threading_num)
for line in list:
ip = line.strip()
t = threading.Thread(target=execute,args=(s, ip,cmd,log_path_today))
t.setDaemon(True)
t.start()
main_thread = threading.currentThread()
for t in threading.enumerate():
if t is main_thread:
continue
t.join()
for ip,result in output.items():
print "%s: " % ip
for line in result:
print " %s" % line.strip()
print "Done!"
以上腳本讀取兩個參數(shù),第一個為存放IP的文本,第二個為shell命令
執(zhí)行效果如下

# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading
lock = threading.Lock()
def get_html(url):
response = requests.get(url, timeout=10)
# print(response.status_code)
try:
if response.status_code == 200:
# print(response.text)
return response.text
else:
return None
except RequestException:
print("請求失敗")
# return None
def parse_html(html_text):
html = etree.HTML(html_text)
if len(html) > 0:
img_src = html.xpath("http://img[@class='photothumb lazy']/@data-original") # 元素提取方法
# print(img_src)
return img_src
else:
print("解析頁面元素失敗")
def get_image_pages(url):
html_text = get_html(url) # 獲取搜索url響應(yīng)內(nèi)容
# print(html_text)
if html_text is not None:
html = etree.HTML(html_text) # 生成XPath解析對象
last_page = html.xpath("http://div[@class='pages']//a[last()]/@href") # 提取最后一頁所在href鏈接
print(last_page)
if last_page:
max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() # 使用正則表達(dá)式提取鏈接中的頁碼數(shù)字
print(max_page)
print(type(max_page))
return int(max_page) # 將字符串頁碼轉(zhuǎn)為整數(shù)并返回
else:
print("暫無數(shù)據(jù)")
return None
else:
print("查詢結(jié)果失敗")
def get_all_image_url(page_number):
base_url = 'https://imgbin.com/free-png/naruto/'
image_urls = []
x = 1 # 定義一個標(biāo)識,用于給每個圖片url編號,從1遞增
for i in range(1, page_number):
url = base_url + str(i) # 根據(jù)頁碼遍歷請求url
try:
html = get_html(url) # 解析每個頁面的內(nèi)容
if html:
data = parse_html(html) # 提取頁面中的圖片url
# print(data)
# time.sleep(3)
if data:
for j in data:
image_urls.append({
'name': x,
'value': j
})
x += 1 # 每提取一個圖片url,標(biāo)識x增加1
except RequestException as f:
print("遇到錯誤:", f)
continue
# print(image_urls)
return image_urls
def get_image_content(url):
try:
r = requests.get(url, timeout=15)
if r.status_code == 200:
return r.content
return None
except RequestException:
return None
def main(url, image_name):
semaphore.acquire() # 加鎖,限制線程數(shù)
print('當(dāng)前子線程: {}'.format(threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
try:
file_path = '{0}/{1}.jpg'.format(save_path, image_name)
if not os.path.exists(file_path): # 判斷是否存在文件,不存在則爬取
with open(file_path, 'wb') as f:
f.write(get_image_content(url))
f.close()
print('第{}個文件保存成功'.format(image_name))
else:
print("第{}個文件已存在".format(image_name))
semaphore.release() # 解鎖imgbin-多線程-重寫run方法.py
except FileNotFoundError as f:
print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
print("報錯:", f)
raise
except TypeError as e:
print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url))
print("報錯:", e)
class MyThread(threading.Thread):
"""繼承Thread類重寫run方法創(chuàng)建新進程"""
def __init__(self, func, args):
"""
:param func: run方法中要調(diào)用的函數(shù)名
:param args: func函數(shù)所需的參數(shù)
"""
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
print('當(dāng)前子線程: {}'.format(threading.current_thread().name))
self.func(self.args[0], self.args[1])
# 調(diào)用func函數(shù)
# 因為這里的func函數(shù)其實是上述的main()函數(shù),它需要2個參數(shù);args傳入的是個參數(shù)元組,拆解開來傳入
if __name__ == '__main__':
start = time.time()
print('這是主線程:{}'.format(threading.current_thread().name))
urls = get_all_image_url(5) # 獲取所有圖片url列表
thread_list = [] # 定義一個列表,向里面追加線程
semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
for t in urls:
# print(i)
m = MyThread(main, (t["value"], t["name"])) # 調(diào)用MyThread類,得到一個實例
thread_list.append(m)
for m in thread_list:
m.start() # 調(diào)用start()方法,開始執(zhí)行
for m in thread_list:
m.join() # 子線程調(diào)用join()方法,使主線程等待子線程運行完畢之后才退出
end = time.time()
print(end-start)
# get_image_pages(https://imgbin.com/free-png/Naruto)
以上就是python ssh 執(zhí)行shell命令的示例的詳細(xì)內(nèi)容,更多關(guān)于python ssh 執(zhí)行shell命令的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python3安裝模塊報錯Microsoft Visual C++ 14.0 is required的解決方法
這篇文章主要介紹了Python3安裝模塊報錯Microsoft Visual C++ 14.0 is required的解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07
如何將DataFrame數(shù)據(jù)寫入csv文件及讀取
在Python中進行數(shù)據(jù)處理時,經(jīng)常會用到CSV文件的讀寫操作,當(dāng)需要將list數(shù)據(jù)保存到CSV文件時,可以使用內(nèi)置的csv模塊,若data是一個list,saveData函數(shù)能夠?qū)ist中每個元素存儲在CSV文件的一行,但需要注意的是,默認(rèn)情況下讀取出的CSV數(shù)據(jù)類型為str2024-09-09
昨晚我用python幫隔壁小姐姐P證件照然后發(fā)現(xiàn)
大家好,我是Lex 喜歡欺負(fù)超人那個Lex 建議大家收藏哦,以后幫小姐姐P自拍,證件照,調(diào)尺寸,背景,摳圖,直接10行代碼搞定,瞬間高大上2021-08-08
Python 刪除連續(xù)出現(xiàn)的指定字符的實例
今天小編就為大家分享一篇Python 刪除連續(xù)出現(xiàn)的指定字符的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06
Python中xml和dict格式轉(zhuǎn)換的示例代碼
最近在做APP的接口,遇到XML格式的請求數(shù)據(jù),費了很大勁來解決,下面小編給大家分享下Python中xml和dict格式轉(zhuǎn)換問題,感興趣的朋友跟隨小編一起看看吧2019-11-11

