欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python調(diào)用subprocess模塊實(shí)現(xiàn)命令行操作控制SVN的方法

 更新時(shí)間:2022年09月08日 10:52:43   作者:Logintern09  
這篇文章主要介紹了使用python的subprocess模塊實(shí)現(xiàn)對(duì)SVN的相關(guān)操作,通過(guò)設(shè)置GitSvn類(lèi),在該類(lèi)下自定義執(zhí)行SVN常規(guī)操作的方法,需要的朋友跟隨小編一起看看吧

使用python的subprocess模塊實(shí)現(xiàn)對(duì)SVN的相關(guān)操作。

設(shè)置GitSvn類(lèi),在該類(lèi)下自定義執(zhí)行SVN常規(guī)操作的方法。

SVN的常規(guī)操作包括:
(1)獲取SVN當(dāng)前版本,通過(guò)方法get_version()實(shí)現(xiàn);

(2)下載SVN指定倉(cāng)庫(kù),通過(guò)方法download()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn checkout實(shí)現(xiàn)的下載整個(gè)倉(cāng)庫(kù)功能;

(3)獲取SVN某個(gè)倉(cāng)庫(kù)下的所有文件列表,通過(guò)方法search_dir()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn list實(shí)現(xiàn)的獲取倉(cāng)庫(kù)文件列表功能;

(4)在SVN指定位置創(chuàng)建新的文件夾,通過(guò)方法mkdir_command()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn mkdir實(shí)現(xiàn)的創(chuàng)建文件夾或者目錄功能;

(5)將本地倉(cāng)庫(kù)文件添加到SVN倉(cāng)庫(kù),通過(guò)方法upload_file()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn add實(shí)現(xiàn)的添加文件功能;

(6)將本地倉(cāng)庫(kù)已添加的文件提交SVN指定倉(cāng)庫(kù),通過(guò)方法commit_command()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn commit實(shí)現(xiàn)的提交文件功能;

(7)刪除SVN倉(cāng)庫(kù)的目錄,通過(guò)方法delete_url()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn delete實(shí)現(xiàn)的刪除目錄功能;

(8)鎖定SVN倉(cāng)庫(kù)的文件,通過(guò)方法lock_file()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn lock實(shí)現(xiàn)的鎖定文件功能;

(9)將SVN倉(cāng)庫(kù)的文件解除鎖定,通過(guò)方法unlock_file()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn unlock實(shí)現(xiàn)的解除文件鎖定功能;

(10)查看SVN倉(cāng)庫(kù)文件當(dāng)前狀態(tài),通過(guò)方法check_status()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn status實(shí)現(xiàn)的查看文件狀態(tài)功能;

(11)更新SVN倉(cāng)庫(kù)的某個(gè)文件,通過(guò)方法update_file()實(shí)現(xiàn),實(shí)際是通過(guò)調(diào)用SVN的命令行操作指令svn up實(shí)現(xiàn)的更新文件功能;

GitSvn類(lèi)定義如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Logintern09


import os
import time
import subprocess
from log_manage import logger

rq = time.strftime('%Y%m%d', time.localtime(time.time()))
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files")
tempfile = os.path.join(file_path, rq + '.log')

class GitSvn(object):
    def __init__(self, bill_addr):
        self.bill_addr = bill_addr

    def get_version(self):
        cmd = "svn info %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        logger.info(result)
        error = error.decode(encoding="gbk")
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def download(self, dir_path):
        with open(tempfile, "r") as f:
            result = f.readlines()
            for line in result:
                if line.startswith("Revision"):
                    laster_version = int(line.split(":")[-1].strip())
                    logger.info(line)
                    break
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)
            if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

    def search_dir(self):
        cmd = "svn list %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def mkdir_command(self, id_num):
        cmd = "svn mkdir %s" % id_num
        logger.info(cmd)
        cur_path = os.path.dirname(os.path.realpath(__file__))
        file_path = os.path.join(cur_path, default_sys_name)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def upload_file(self, file_path):
        cmd = "svn add %s --force" % file_path
        root_path, file_name = os.path.split(file_path)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=root_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def commit_command(self, file):
        # 執(zhí)行了鎖定的用戶執(zhí)行了提交操作(提交操作將自動(dòng)解鎖)
        if os.path.isfile(file):
            file_path, file_name = os.path.split(file)
        else:
            file_path = file
        cmd = "svn commit %s -m 'update_files'" % file
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: E730053:"):
                res = "數(shù)據(jù)上傳失敗,請(qǐng)重新提交數(shù)據(jù)?。?!"
                raise SystemError(res)
            else:
                res = "數(shù)據(jù)上傳失敗,請(qǐng)重新提交數(shù)據(jù)?。?!"
                raise SystemError(res)

    def delete_url(self, url):
        cmd = "svn delete %s" % url
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def lock_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn lock %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: warning: W160042:"):
                res = "系統(tǒng)資源已被其他用戶鎖定,請(qǐng)稍后重試!"
                raise SystemError(res)

    def unlock_file(self, file_name):
        # 不使用--force 參數(shù) 可以解鎖被自己鎖定的文集 即普通的release lock
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn unlock %s --force" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)

    def check_status(self, file_name):
        # ?:不在svn的控制中;M:內(nèi)容被修改;C:發(fā)生沖突;A:預(yù)定加入到版本庫(kù);K:被鎖定
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn status -v %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        # 獲取狀態(tài)結(jié)果解析
        status_list = result.split(" ")
        status_flag_list = []
        for i in status_list:
            if i.isalpha():
                status_flag_list.append(i)
            else:
                break
        if result.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)
        elif "C" in status_flag_list:
            return "C"
        elif "K" in status_flag_list:
            return "K"

    def update_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn up %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        with open(tempfile, "r") as f:
            result = f.readlines()
            count = -1
            for line in result:
                count += 1
                if (line.strip() != "") and (count == 1):
                    if line.startswith("C"):
                        res = "更新系統(tǒng)資源時(shí)發(fā)現(xiàn)存在沖突,請(qǐng)稍后重試!"
                        raise SystemError(res)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡(luò)異常,暫時(shí)連接不上系統(tǒng),請(qǐng)檢查網(wǎng)絡(luò)或其他配置!"
                raise SystemError(res)
            elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

上述類(lèi)GitSvn的應(yīng)用實(shí)例如下:

if __name__ == '__main__':
    default_url = svn:***  # 用戶本地SVN客戶端的URL地址
    git_class = GitSvn(default_url)
    git_class.get_version()
    git_class.download()
    # 驗(yàn)證查看目錄文件列表功能
    git_class.search_dir()
    # 驗(yàn)證刪除目錄功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    file_path = os.path.join(cur_path, default_sys_name)
    id_path = os.path.join(file_path, 'SCR202202100002')
    git_class.delete_url(id_path)
    # 驗(yàn)證文件加鎖功能
    git_class.lock_file(file_name="單號(hào)總表.xlsx")
    # 驗(yàn)證文件解鎖功能
    git_class.unlock_file(file_name="單號(hào)總表.xlsx")
    # 檢查文件狀態(tài)
    git_class.check_status(file_name="單號(hào)總表.xlsx")
    # 驗(yàn)證創(chuàng)建目錄功能
    git_class.mkdir_command("SCR202203280001")
    # 驗(yàn)證提交文件功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    sys_path = os.path.join(cur_path, default_sys_name)
    target_dir = os.path.join(sys_path, "SCR202203280001")
    git_class.upload_file(target_dir)
    git_class.commit_command(target_dir)

到此這篇關(guān)于python調(diào)用subprocess模塊實(shí)現(xiàn)命令行操作控制SVN的文章就介紹到這了,更多相關(guān)python subprocess模塊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python爬取當(dāng)網(wǎng)書(shū)籍?dāng)?shù)據(jù)并數(shù)據(jù)可視化展示

    Python爬取當(dāng)網(wǎng)書(shū)籍?dāng)?shù)據(jù)并數(shù)據(jù)可視化展示

    這篇文章主要介紹了Python爬取當(dāng)網(wǎng)書(shū)籍?dāng)?shù)據(jù)并數(shù)據(jù)可視化展示,下面文章圍繞Python爬蟲(chóng)的相關(guān)資料展開(kāi)對(duì)爬取當(dāng)網(wǎng)書(shū)籍?dāng)?shù)據(jù)的詳細(xì)介紹,需要的小伙伴可以參考一下,希望對(duì)你有所幫助
    2022-01-01
  • Python使用Pandas庫(kù)實(shí)現(xiàn)MySQL數(shù)據(jù)庫(kù)的讀寫(xiě)

    Python使用Pandas庫(kù)實(shí)現(xiàn)MySQL數(shù)據(jù)庫(kù)的讀寫(xiě)

    這篇文章主要介紹了Python使用Pandas庫(kù)實(shí)現(xiàn)MySQL數(shù)據(jù)庫(kù)的讀寫(xiě) ,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • PyHacker編寫(xiě)指南引用Nmap模塊實(shí)現(xiàn)端口掃描器

    PyHacker編寫(xiě)指南引用Nmap模塊實(shí)現(xiàn)端口掃描器

    這篇文章主要為大家介紹了PyHacker編寫(xiě)指南Nmap模塊實(shí)現(xiàn)端口掃描,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • python實(shí)現(xiàn)自動(dòng)登錄

    python實(shí)現(xiàn)自動(dòng)登錄

    這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)自動(dòng)登錄,填充網(wǎng)頁(yè)表單,從而自動(dòng)登錄WEB門(mén)戶,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-09-09
  • 最新評(píng)論