python調(diào)用subprocess模塊實(shí)現(xiàn)命令行操作控制SVN的方法
使用python的subprocess模塊實(shí)現(xiàn)對SVN的相關(guān)操作。
設(shè)置GitSvn類,在該類下自定義執(zhí)行SVN常規(guī)操作的方法。
SVN的常規(guī)操作包括:
(1)獲取SVN當(dāng)前版本,通過方法get_version()實(shí)現(xiàn);
(2)下載SVN指定倉庫,通過方法download()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn checkout實(shí)現(xiàn)的下載整個(gè)倉庫功能;
(3)獲取SVN某個(gè)倉庫下的所有文件列表,通過方法search_dir()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn list實(shí)現(xiàn)的獲取倉庫文件列表功能;
(4)在SVN指定位置創(chuàng)建新的文件夾,通過方法mkdir_command()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn mkdir實(shí)現(xiàn)的創(chuàng)建文件夾或者目錄功能;
(5)將本地倉庫文件添加到SVN倉庫,通過方法upload_file()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn add實(shí)現(xiàn)的添加文件功能;
(6)將本地倉庫已添加的文件提交SVN指定倉庫,通過方法commit_command()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn commit實(shí)現(xiàn)的提交文件功能;
(7)刪除SVN倉庫的目錄,通過方法delete_url()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn delete實(shí)現(xiàn)的刪除目錄功能;
(8)鎖定SVN倉庫的文件,通過方法lock_file()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn lock實(shí)現(xiàn)的鎖定文件功能;
(9)將SVN倉庫的文件解除鎖定,通過方法unlock_file()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn unlock實(shí)現(xiàn)的解除文件鎖定功能;
(10)查看SVN倉庫文件當(dāng)前狀態(tài),通過方法check_status()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn status實(shí)現(xiàn)的查看文件狀態(tài)功能;
(11)更新SVN倉庫的某個(gè)文件,通過方法update_file()實(shí)現(xiàn),實(shí)際是通過調(diào)用SVN的命令行操作指令svn up實(shí)現(xiàn)的更新文件功能;
GitSvn類定義如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Logintern09
import os
import time
import subprocess
from log_manage import logger
rq = time.strftime('%Y%m%d', time.localtime(time.time()))
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files")
tempfile = os.path.join(file_path, rq + '.log')
class GitSvn(object):
def __init__(self, bill_addr):
self.bill_addr = bill_addr
def get_version(self):
cmd = "svn info %s" % self.bill_addr
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
logger.info(result)
error = error.decode(encoding="gbk")
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def download(self, dir_path):
with open(tempfile, "r") as f:
result = f.readlines()
for line in result:
if line.startswith("Revision"):
laster_version = int(line.split(":")[-1].strip())
logger.info(line)
break
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version)
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")):
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
cmd = "svn cleanup"
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
def search_dir(self):
cmd = "svn list %s" % self.bill_addr
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def mkdir_command(self, id_num):
cmd = "svn mkdir %s" % id_num
logger.info(cmd)
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def upload_file(self, file_path):
cmd = "svn add %s --force" % file_path
root_path, file_name = os.path.split(file_path)
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=root_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def commit_command(self, file):
# 執(zhí)行了鎖定的用戶執(zhí)行了提交操作(提交操作將自動解鎖)
if os.path.isfile(file):
file_path, file_name = os.path.split(file)
else:
file_path = file
cmd = "svn commit %s -m 'update_files'" % file
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
elif error.startswith("svn: E730053:"):
res = "數(shù)據(jù)上傳失敗,請重新提交數(shù)據(jù)?。?!"
raise SystemError(res)
else:
res = "數(shù)據(jù)上傳失敗,請重新提交數(shù)據(jù)!??!"
raise SystemError(res)
def delete_url(self, url):
cmd = "svn delete %s" % url
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def lock_file(self, file_name):
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn lock %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
elif error.startswith("svn: warning: W160042:"):
res = "系統(tǒng)資源已被其他用戶鎖定,請稍后重試!"
raise SystemError(res)
def unlock_file(self, file_name):
# 不使用--force 參數(shù) 可以解鎖被自己鎖定的文集 即普通的release lock
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn unlock %s --force" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
def check_status(self, file_name):
# ?:不在svn的控制中;M:內(nèi)容被修改;C:發(fā)生沖突;A:預(yù)定加入到版本庫;K:被鎖定
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn status -v %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
# 獲取狀態(tài)結(jié)果解析
status_list = result.split(" ")
status_flag_list = []
for i in status_list:
if i.isalpha():
status_flag_list.append(i)
else:
break
if result.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
elif "C" in status_flag_list:
return "C"
elif "K" in status_flag_list:
return "K"
def update_file(self, file_name):
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn up %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
with open(tempfile, "r") as f:
result = f.readlines()
count = -1
for line in result:
count += 1
if (line.strip() != "") and (count == 1):
if line.startswith("C"):
res = "更新系統(tǒng)資源時(shí)發(fā)現(xiàn)存在沖突,請稍后重試!"
raise SystemError(res)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請檢查網(wǎng)絡(luò)或其他配置!"
raise SystemError(res)
elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")):
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
cmd = "svn cleanup"
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
上述類GitSvn的應(yīng)用實(shí)例如下:
if __name__ == '__main__':
default_url = svn:*** # 用戶本地SVN客戶端的URL地址
git_class = GitSvn(default_url)
git_class.get_version()
git_class.download()
# 驗(yàn)證查看目錄文件列表功能
git_class.search_dir()
# 驗(yàn)證刪除目錄功能
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
id_path = os.path.join(file_path, 'SCR202202100002')
git_class.delete_url(id_path)
# 驗(yàn)證文件加鎖功能
git_class.lock_file(file_name="單號總表.xlsx")
# 驗(yàn)證文件解鎖功能
git_class.unlock_file(file_name="單號總表.xlsx")
# 檢查文件狀態(tài)
git_class.check_status(file_name="單號總表.xlsx")
# 驗(yàn)證創(chuàng)建目錄功能
git_class.mkdir_command("SCR202203280001")
# 驗(yàn)證提交文件功能
cur_path = os.path.dirname(os.path.realpath(__file__))
sys_path = os.path.join(cur_path, default_sys_name)
target_dir = os.path.join(sys_path, "SCR202203280001")
git_class.upload_file(target_dir)
git_class.commit_command(target_dir)
到此這篇關(guān)于python調(diào)用subprocess模塊實(shí)現(xiàn)命令行操作控制SVN的文章就介紹到這了,更多相關(guān)python subprocess模塊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python3 圖片referer防盜鏈的實(shí)現(xiàn)方法
本篇文章主要介紹了python3 圖片referer防盜鏈的實(shí)現(xiàn)方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03
python通過ssh-powershell監(jiān)控windows的方法
這篇文章主要介紹了python通過ssh-powershell監(jiān)控windows的方法,涉及Python操作ssh-powershell的相關(guān)技巧,需要的朋友可以參考下2015-06-06
python數(shù)字圖像處理數(shù)據(jù)類型及顏色空間轉(zhuǎn)換
這篇文章主要為大家介紹了python數(shù)字圖像處理數(shù)據(jù)類型及顏色空間轉(zhuǎn)換示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
Python爬取當(dāng)網(wǎng)書籍?dāng)?shù)據(jù)并數(shù)據(jù)可視化展示
Python使用Pandas庫實(shí)現(xiàn)MySQL數(shù)據(jù)庫的讀寫
PyHacker編寫指南引用Nmap模塊實(shí)現(xiàn)端口掃描器

