Python reques接口測(cè)試框架實(shí)現(xiàn)代碼
一、框架菜單

1.1 common模塊

1.2 其他

二、Excel接口測(cè)試案例編寫


三、讀取Excel測(cè)試封裝(核心封裝)
excel_utils.py 讀取Excel中的數(shù)據(jù)
import os
import xlrd #內(nèi)置模塊、第三方模塊pip install 自定義模塊
class ExcelUtils():
def __init__(self,file_path,sheet_name):
self.file_path = file_path
self.sheet_name = sheet_name
self.sheet = self.get_sheet() # 整個(gè)表格對(duì)象
def get_sheet(self):
wb = xlrd.open_workbook(self.file_path)
sheet = wb.sheet_by_name(self.sheet_name)
return sheet
def get_row_count(self):
row_count = self.sheet.nrows
return row_count
def get_col_count(self):
col_count = self.sheet.ncols
return col_count
def __get_cell_value(self,row_index, col_index):
cell_value = self.sheet.cell_value(row_index,col_index)
return cell_value
def get_merged_info(self):
merged_info = self.sheet.merged_cells
return merged_info
def get_merged_cell_value(self,row_index, col_index):
"""既能獲取普通單元格的數(shù)據(jù)又能獲取合并單元格數(shù)據(jù)"""
cell_value = None
for (rlow, rhigh, clow, chigh) in self.get_merged_info():
if (row_index >= rlow and row_index < rhigh):
if (col_index >= clow and col_index < chigh):
cell_value = self.__get_cell_value(rlow, clow)
break; # 防止循環(huán)去進(jìn)行判斷出現(xiàn)值覆蓋的情況
else:
cell_value = self.__get_cell_value(row_index, col_index)
else:
cell_value = self.__get_cell_value(row_index, col_index)
return cell_value
def get_sheet_data_by_dict(self):
all_data_list = []
first_row = self.sheet.row(0) #獲取首行數(shù)據(jù)
for row in range(1, self.get_row_count()):
row_dict = {}
for col in range(0, self.get_col_count()):
row_dict[first_row[col].value] = self.get_merged_cell_value(row, col)
all_data_list.append(row_dict)
return all_data_list
if __name__=='__main__':
current_path = os.path.dirname(__file__)
excel_path = os.path.join( current_path,'..','samples/data/test_case.xlsx' )
excelUtils = ExcelUtils(excel_path,"Sheet1")
for row in excelUtils.get_sheet_data_by_dict():
print( row )
import os
from common.excel_utils import ExcelUtils
from common.config_utils import config
current_path = os.path.dirname(__file__)
test_data_path = os.path.join( current_path,'..', config.CASE_DATA_PATH )
class TestdataUtils():
def __init__(self,test_data_path = test_data_path):
self.test_data_path = test_data_path
self.test_data = ExcelUtils(test_data_path,"Sheet1").get_sheet_data_by_dict()
self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()
def __get_testcase_data_dict(self):
testcase_dict = {}
for row_data in self.test_data:
testcase_dict.setdefault( row_data['測(cè)試用例編號(hào)'],[] ).append( row_data )
return testcase_dict
def def_testcase_data_list(self):
testcase_list = []
for k,v in self.__get_testcase_data_dict().items():
one_case_dict = {}
one_case_dict["case_id"] = k
one_case_dict["case_info"] = v
testcase_list.append( one_case_dict )
return testcase_list
if __name__=="__main__":
testdataUtils = TestdataUtils()
for i in testdataUtils.def_testcase_data_list():
print( i )
testdata_utils.py 讀取Excel中的數(shù)據(jù)后處理成需要的數(shù)據(jù)
四、request封裝(核心封裝)
requests_utils.py 包含post請(qǐng)求,get請(qǐng)求,異常,調(diào)用斷言
import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils
class RequestsUtils():
def __init__(self):
self.hosts = config.hosts
self.headers = {"ContentType":"application/json;charset=utf-8"}
self.session = requests.session()
self.temp_variables = {}
def __get(self,get_info):
try:
url = self.hosts + get_info["請(qǐng)求地址"]
response = self.session.get( url = url,
params = ast.literal_eval(get_info["請(qǐng)求參數(shù)(get)"])
)
response.encoding = response.apparent_encoding
if get_info["取值方式"] == "json取值":
value = jsonpath.jsonpath( response.json(),get_info["取值代碼"] )[0]
self.temp_variables[ get_info["傳值變量"] ] = value
elif get_info["取值方式"] == "正則取值":
value = re.findall(get_info["取值代碼"],response.text)[0]
self.temp_variables[get_info["傳值變量"]] = value
result = CheckUtils(response).run_check(get_info['期望結(jié)果類型'], get_info['期望結(jié)果'])
except ProxyError as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:代理錯(cuò)誤異常' % (get_info["接口名稱"])}
except ConnectionError as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:連接超時(shí)異常' % (get_info["接口名稱"])}
except RequestException as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:Request異常,原因:%s' % (get_info["接口名稱"], e.__str__())}
except Exception as e:
result = {'code':4,'result':'[%s]請(qǐng)求:系統(tǒng)異常,原因:%s'%(get_info["接口名稱"],e.__str__())}
return result
def __post(self,post_info):
try:
url = self.hosts + post_info["請(qǐng)求地址"]
response = self.session.post( url = url,
headers = self.headers,
params = ast.literal_eval(post_info["請(qǐng)求參數(shù)(get)"]),
# data = post_infos["提交數(shù)據(jù)(post)"],
json=ast.literal_eval(post_info["提交數(shù)據(jù)(post)"])
)
response.encoding = response.apparent_encoding
if post_info["取值方式"] == "json取值":
value = jsonpath.jsonpath( response.json(),post_info["取值代碼"] )[0]
self.temp_variables[ post_info["傳值變量"] ] = value
elif post_info["取值方式"] == "正則取值":
value = re.findall(post_info["取值代碼"],response.text)[0]
self.temp_variables[post_info["傳值變量"]] = value
#調(diào)用CheckUtils()
result = CheckUtils(response).run_check(post_info['期望結(jié)果類型'],post_info['期望結(jié)果'])
except ProxyError as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:代理錯(cuò)誤異常' % (post_info["接口名稱"])}
except ConnectionError as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:連接超時(shí)異常' % (post_info["接口名稱"])}
except RequestException as e:
result = {'code': 4, 'result': '[%s]請(qǐng)求:Request異常,原因:%s' % (post_info["接口名稱"], e.__str__())}
except Exception as e:
result = {'code':4,'result':'[%s]請(qǐng)求:系統(tǒng)異常,原因:%s'%(post_info["接口名稱"],e.__str__())}
return result
def request(self,step_info):
try:
request_type = step_info["請(qǐng)求方式"]
param_variable_list = re.findall('\\${\w+}', step_info["請(qǐng)求參數(shù)(get)"])
if param_variable_list:
for param_variable in param_variable_list:
step_info["請(qǐng)求參數(shù)(get)"] = step_info["請(qǐng)求參數(shù)(get)"]\
.replace(param_variable,'"%s"' % self.temp_variables.get(param_variable[2:-1]))
if request_type == "get":
result = self.__get( step_info )
elif request_type == "post":
data_variable_list = re.findall('\\${\w+}', step_info["提交數(shù)據(jù)(post)"])
if data_variable_list:
for param_variable in data_variable_list:
step_info["提交數(shù)據(jù)(post)"] = step_info["提交數(shù)據(jù)(post)"] \
.replace(param_variable, '"%s"' % self.temp_variables.get(param_variable[2:-1]))
result = self.__post( step_info )
else:
result = {'code':1,'result':'請(qǐng)求方式不支持'}
except Exception as e:
result = {'code':4,'result':'用例編號(hào)[%s]的[%s]步驟出現(xiàn)系統(tǒng)異常,原因:%s'%(step_info['測(cè)試用例編號(hào)'],step_info["測(cè)試用例步驟"],e.__str__())}
return result
def request_by_step(self,step_infos):
self.temp_variables = {}
for step_info in step_infos:
temp_result = self.request( step_info )
# print( temp_result )
if temp_result['code']!=0:
break
return temp_result
if __name__=="__main__":
case_info = [
{'請(qǐng)求方式': 'get', '請(qǐng)求地址': '/cgi-bin/token', '請(qǐng)求參數(shù)(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}', '提交數(shù)據(jù)(post)': '', '取值方式': 'json取值', '傳值變量': 'token', '取值代碼': '$.access_token', '期望結(jié)果類型': '正則匹配', '期望結(jié)果': '{"access_token":"(.+?)","expires_in":(.+?)}'},
{'請(qǐng)求方式': 'post', '請(qǐng)求地址': '/cgi-bin/tags/create', '請(qǐng)求參數(shù)(get)': '{"access_token":${token}}', '提交數(shù)據(jù)(post)': '{"tag" : {"name" : "衡東"}}','取值方式': '無', '傳值變量': '', '取值代碼': '', '期望結(jié)果類型': '正則匹配', '期望結(jié)果': '{"tag":{"id":(.+?),"name":"衡東"}}'}
]
RequestsUtils().request_by_step(case_info)
五、斷言封裝(核心封裝)
check_utils.py 斷言封裝,與實(shí)際結(jié)果核對(duì)
import re
import ast
class CheckUtils():
def __init__(self,check_response=None):
self.ck_response=check_response
self.ck_rules = {
'無': self.no_check,
'json鍵是否存在': self.check_key,
'json鍵值對(duì)': self.check_keyvalue,
'正則匹配': self.check_regexp
}
self.pass_result = {
'code': 0,
'response_reason': self.ck_response.reason,
'response_code': self.ck_response.status_code,
'response_headers': self.ck_response.headers,
'response_body': self.ck_response.text,
'check_result': True,
'message': '' # 擴(kuò)招作為日志輸出等
}
self.fail_result = {
'code': 2,
'response_reason': self.ck_response.reason,
'response_code': self.ck_response.status_code,
'response_headers': self.ck_response.headers,
'response_body': self.ck_response.text,
'check_result': False,
'message': '' # 擴(kuò)招作為日志輸出等
}
def no_check(self):
return self.pass_result
def check_key(self,check_data=None):
check_data_list = check_data.split(',') #把需要判斷的值做切割,取出鍵值
res_list = [] #存放每次比較的結(jié)果
wrong_key = [] #存放比較失敗key
for check_data in check_data_list: #把切割的鍵值和取出響應(yīng)結(jié)果中的所有的鍵一個(gè)一個(gè)對(duì)比
if check_data in self.ck_response.json().keys():
res_list.append(self.pass_result )
else:
res_list.append( self.fail_result )
wrong_key.append(check_data) #把失敗的鍵放進(jìn)來,便于后續(xù)日志輸出
# print(res_list)
# print(wrong_key)
if self.fail_result in res_list:
return self.fail_result
else:
return self.pass_result
def check_keyvalue(self,check_data=None):
res_list = [] # 存放每次比較的結(jié)果
wrong_items = [] # 存放比較失敗 items
for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串轉(zhuǎn)成字典,items()取出鍵值對(duì)
if check_item in self.ck_response.json().items():
res_list.append( self.pass_result )
else:
res_list.append( self.fail_result )
wrong_items.append(check_item)
# print( res_list )
# print( wrong_items )
if self.fail_result in res_list:
return self.fail_result
else:
return self.pass_result
def check_regexp(self,check_data=None):
pattern = re.compile(check_data)
if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不為空,為true
return self.pass_result
else:
return self.fail_result
def run_check(self,check_type=None,check_data=None):
code = self.ck_response.status_code
if code == 200:
if check_type in self.ck_rules.keys():
result=self.ck_rules[check_type](check_data)
return result
else:
self.fail_result['message'] = '不支持%s判斷方法'%check_type
return self.fail_result
else:
self.fail_result['message'] = '請(qǐng)求的響應(yīng)狀態(tài)碼非%s'%str(code)
return self.fail_result
if __name__=="__main__":
# 檢查鍵是否存在,{"access_token":"hello","expires_":7200} 設(shè)為響應(yīng)結(jié)果,"access_token,expires_in" 為檢查對(duì)象值
CheckUtils({"access_token":"hello","expires_":7200}).check_key("access_token,expires_in")
#檢查鍵值對(duì)是否存在
CheckUtils({"access_token":"hello","expires_i":7200}).check_keyvalue('{"expires_in": 7200}')
#正則對(duì)比
#TURE
print(CheckUtils('{"access_token":"hello","expires_in":7200}').check_regexp('"expires_in":(.+?)'))
#False
print(CheckUtils('{"access_token":"hello","expires":7200}').check_regexp('"expires_in":(.+?)'))
六、api_testcase下的api_test.py 封裝
import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils
#如果是mysql數(shù)據(jù)源的話切換成 def_testcase_data_list_by_mysql() exccel數(shù)據(jù)源:def_testcase_data_list()
case_infos = TestdataUtils().def_testcase_data_list_by_mysql()
@paramunittest.parametrized(
*case_infos
)
class APITest(paramunittest.ParametrizedTestCase):
def setUp(self) -> None:
warnings.simplefilter('ignore', ResourceWarning) #不會(huì)彈出警告提示
def setParameters(self, case_id, case_info):
self.case_id = case_id
self.case_info = case_info
def test_api_common_function(self):
'''測(cè)試描述'''
self._testMethodName = self.case_info[0].get("測(cè)試用例編號(hào)")
self._testMethodDoc = self.case_info[0].get("測(cè)試用例名稱")
actual_result = RequestsUtils().request_by_step(self.case_info)
self.assertTrue( actual_result.get('check_result'),actual_result.get('message') )
if __name__ == '__main__':
unittest.main()
七、common下的log_utils.py 封裝
import os
import logging
import time
from common.config_utils import config
current_path = os.path.dirname(__file__)
log_output_path = os.path.join( current_path,'..', config.LOG_PATH )
class LogUtils():
def __init__(self,log_path=log_output_path):
self.log_name = os.path.join( log_output_path ,'ApiTest_%s.log'%time.strftime('%Y_%m_%d') )
self.logger = logging.getLogger("ApiTestLog")
self.logger.setLevel( config.LOG_LEVEL )
console_handler = logging.StreamHandler() # 控制臺(tái)輸出
file_handler = logging.FileHandler(self.log_name,'a',encoding='utf-8') # 文件輸出
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
self.logger.addHandler( console_handler )
self.logger.addHandler( file_handler )
console_handler.close() # 防止打印日志重復(fù)
file_handler.close() # 防止打印日志重復(fù)
def get_logger(self):
return self.logger
logger = LogUtils().get_logger() # 防止打印日志重復(fù)
if __name__ == '__main__':
logger.info('hello')
八、common下的config_utils.py的封裝
配置文件的編寫:

對(duì)配置文件的讀取封裝:
import os
import configparser
current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, "../conf/local_config.ini")
print(cfgpath)
class ConfigUtils:
def __init__(self,config_path=cfgpath):
self.__conf=configparser.ConfigParser()
self.__conf.read(config_path, encoding="utf-8")
def read_ini(self,sec,option):
value=self.__conf.get(sec,option)
return value
@property
def hosts(self):
value=self.read_ini('default','hosts')
return value
@property
def LOG_PATH(self):
value = self.read_ini('path', 'LOG_PATH')
return value
@property
def CASE_DATA_PATH(self):
value = self.read_ini('path', 'CASE_DATA_PATH')
return value
@property
def REPORT_PATH(self):
value = self.read_ini('path', 'REPORT_PATH')
return value
@property
def LOG_LEVEL(self):
value = int(self.read_ini('log', 'LOG_LEVEL'))
return value
@property
def smtp_server(self):
smtp_server_value = self.read_ini('email', 'smtp_server')
return smtp_server_value
@property
def smtp_sender(self):
smtp_sender_value = self.read_ini('email', 'smtp_sender')
return smtp_sender_value
@property
def smtp_password(self):
smtp_password_value = self.read_ini('email', 'smtp_password')
return smtp_password_value
@property
def smtp_receiver(self):
smtp_receiver_value = self.read_ini('email', 'smtp_receiver')
return smtp_receiver_value
@property
def smtp_cc(self):
smtp_cc_value = self.read_ini('email', 'smtp_cc')
return smtp_cc_value
@property
def smtp_subject(self):
smtp_subject_value = self.read_ini('email', 'smtp_subject')
return smtp_subject_value
config=ConfigUtils()
if __name__=='__main__':
current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, "../conf/local_config.ini")
config_u=ConfigUtils()
print(config_u.hosts)
print(config_u.LOG_LEVEL)
九、test_runner下的run_case.py 封裝
class RunCase():
def __init__(self):
self.test_case_path = test_case_path
self.report_path = test_report_path
self.title = 'P1P2接口自動(dòng)化測(cè)試報(bào)告'
self.description = '自動(dòng)化接口測(cè)試框架'
self.tester = '測(cè)試開發(fā)組'
def load_test_suite(self):
discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
pattern='api_test.py',
top_level_dir=self.test_case_path)
all_suite = unittest.TestSuite()
all_suite.addTest( discover )
return all_suite
def run(self):
report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
report_dir.create_dir(self.title)
report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
fp = open( report_file_path ,'wb' )
runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
title=self.title,
description=self.description,
tester=self.tester)
runner.run( self.load_test_suite() )
fp.close()
return report_file_path
if __name__=='__main__':
report_path = RunCase().run()
EmailUtils(open(report_path, 'rb').read(), report_path).send_mail()
十、common下的email_utils.py 封裝
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from common.config_utils import config
class EmailUtils():
def __init__(self,smtp_body,smtp_attch_path=None):
self.smtp_server = config.smtp_server
self.smtp_sender = config.smtp_sender
self.smtp_password = config.smtp_password
self.smtp_receiver = config.smtp_receiver
self.smtp_cc = config.smtp_cc
self.smtp_subject = config.smtp_subject
self.smtp_body = smtp_body
self.smtp_attch = smtp_attch_path
def mail_message_body(self):
message = MIMEMultipart()
message['from'] = self.smtp_sender
message['to'] = self.smtp_receiver
message['Cc'] = self.smtp_cc
message['subject'] = self.smtp_subject
message.attach( MIMEText(self.smtp_body,'html','utf-8') )
if self.smtp_attch:
attach_file = MIMEText(open(self.smtp_attch, 'rb').read(), 'base64', 'utf-8')
attach_file['Content-Type'] = 'application/octet-stream'
attach_file.add_header('Content-Disposition', 'attachment', filename=('gbk', '', os.path.basename(self.smtp_attch)))
message.attach(attach_file)
return message
def send_mail(self):
smtp = smtplib.SMTP()
smtp.connect(self.smtp_server)
smtp.login(user=self.smtp_sender, password=self.smtp_password)
smtp.sendmail(self.smtp_sender,self.smtp_receiver.split(",")+ self.smtp_cc.split(","), self.mail_message_body().as_string())
if __name__=='__main__':
html_path = os.path.dirname(__file__) + '/../test_reports/接口自動(dòng)化測(cè)試報(bào)告V1.1/接口自動(dòng)化測(cè)試報(bào)告V1.1.html'
EmailUtils('<h3 align="center">自動(dòng)化測(cè)試報(bào)告</h3>',html_path).send_mail()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python計(jì)算機(jī)視覺opencv圖像金字塔輪廓及模板匹配
這篇文章主要為大家介紹了python計(jì)算機(jī)視覺opencv圖像金字塔圖像輪廓及模板匹配的學(xué)習(xí)講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-11-11
Python使用指定字符長(zhǎng)度切分?jǐn)?shù)據(jù)示例
今天小編就為大家分享一篇Python使用指定字符長(zhǎng)度切分?jǐn)?shù)據(jù)示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-12-12
Python3實(shí)現(xiàn)定時(shí)任務(wù)的四種方式
Python實(shí)現(xiàn)定點(diǎn)與定時(shí)任務(wù)方式比較多,找到下面四中實(shí)現(xiàn)方式,每個(gè)方式都有自己應(yīng)用場(chǎng)景;下面來快速介紹Python中常用的定時(shí)任務(wù)實(shí)現(xiàn)方式,一起看看吧2019-06-06
進(jìn)行數(shù)據(jù)處理的6個(gè)?Python?代碼塊分享
這篇文章主要介紹了進(jìn)行數(shù)據(jù)處理6個(gè)Python代碼塊的分享,分享內(nèi)容有選取有空值的行、快速替換列值、對(duì)列進(jìn)行分區(qū)、將一列分為多列等內(nèi)容,需要的朋友可以參考一下2022-04-04
python 利用matplotlib在3D空間繪制二次拋物面的案例
這篇文章主要介紹了python 利用matplotlib在3D空間繪制二次拋物面的案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02
關(guān)于sklearn包導(dǎo)入錯(cuò)誤:ImportError:?cannot?import?name Type解
這篇文章主要介紹了關(guān)于sklearn包導(dǎo)入錯(cuò)誤:ImportError:?cannot?import?name‘Type‘解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-02-02
python獲取文件版本信息、公司名和產(chǎn)品名的方法
這篇文章主要介紹了python獲取文件版本信息、公司名和產(chǎn)品名的方法,是Python程序設(shè)計(jì)中非常實(shí)用的技巧,需要的朋友可以參考下2014-10-10
使用 Python 實(shí)現(xiàn)微信消息的一鍵已讀的思路代碼
利用python可以實(shí)現(xiàn)微信消息的一鍵已讀功能,怎么實(shí)現(xiàn)呢?你肯定會(huì)想著很復(fù)雜,但是python的好處就是很多人已經(jīng)把接口打包做好了,只需要調(diào)用即可,今天通過本文給大家分享使用 Python 實(shí)現(xiàn)微信消息的一鍵已讀的思路代碼,一起看看吧2021-06-06

