用Python進(jìn)行websocket接口測(cè)試
我們?cè)谧鼋涌跍y(cè)試時(shí),除了常見的http接口,還有一種比較多見,就是socket接口,今天講解下怎么用Python進(jìn)行websocket接口測(cè)試。
現(xiàn)在大多數(shù)用的都是websocket,那我們就先來安裝一下websocket的安裝包。
pip install websocket-client
安裝完之后,我們就開始我們的websocket之旅了。
我們先來看個(gè)炒雞簡(jiǎn)單的栗子:
import websocket ws = websocket.WebSocket() ws.connect("ws://example.com/websocket", http_proxy_host="proxy_host_name", http_proxy_port=3128)
這個(gè)栗子就是創(chuàng)建一個(gè)websocket連接,這個(gè)模塊支持通過http代理訪問websocket。代理服務(wù)器允許使用connect方法連接到websocket端口。默認(rèn)的squid設(shè)置是“只允許連接HTTPS端口”。
在websocket里,我們有常用的這幾個(gè)方法:
on_message方法:
def on_message(ws, message): print(message)
on_message是用來接受消息的,server發(fā)送的所有消息都可以用on_message這個(gè)方法來收取。
on_error方法:
def on_error(ws, error): print(error)
這個(gè)方法是用來處理錯(cuò)誤異常的,如果一旦socket的程序出現(xiàn)了通信的問題,就可以被這個(gè)方法捕捉到。
on_open方法:
def on_open(ws): def run(*args): for i in range(30): # send the message, then wait # so thread doesn't exit and socket # isn't closed ws.send("Hello %d" % i) time.sleep(1) time.sleep(1) ws.close() print("Thread terminating...") Thread(target=run).start()
on_open方法是用來保持連接的,上面這樣的一個(gè)例子,就是保持連接的一個(gè)過程,每隔一段時(shí)間就會(huì)來做一件事,他會(huì)在30s內(nèi)一直發(fā)送hello。最后停止。
on_close方法:
def on_close(ws): print("### closed ###")
onclose主要就是關(guān)閉socket連接的。
如何創(chuàng)建一個(gè)websocket應(yīng)用:
ws = websocket.WebSocketApp("wss://echo.websocket.org")
括號(hào)里面就是你要連接的socket的地址,在WebSocketApp這個(gè)實(shí)例化的方法里面還可以有其他參數(shù),這些參數(shù)就是我們剛剛介紹的這些方法。
ws = websocket.WebSocketApp("ws://echo.websocket.org/", on_message=on_message, on_error=on_error, on_close=on_close)
指定了這些參數(shù)之后就可以直接進(jìn)行調(diào)用了,例如:
ws.on_open = on_open
這樣就是調(diào)用了on_open方法
如果我們想讓我們的socket保持長(zhǎng)連接,一直連接著,就可以使用run_forever方法:
ws.run_forever()
完整代碼:
import websocket from threading import Thread import time import sys def on_message(ws, message): print(message) def on_error(ws, error): print(error) def on_close(ws): print("### closed ###") def on_open(ws): def run(*args): for i in range(3): # send the message, then wait # so thread doesn't exit and socket # isn't closed ws.send("Hello %d" % i) time.sleep(1) time.sleep(1) ws.close() print("Thread terminating...") Thread(target=run).start() if __name__ == "__main__": websocket.enableTrace(True) host = "ws://echo.websocket.org/" ws = websocket.WebSocketApp(host, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever()
如果想要通信一條短消息,并在完成后立即斷開連接,我們可以使用短連接:
from websocket import create_connection ws = create_connection("ws://echo.websocket.org/") print("Sending 'Hello, World'...") ws.send("Hello, World") print("Sent") print("Receiving...") result = ws.recv() print("Received '%s'" % result) ws.close()
關(guān)于websocket的介紹就到這兒了。
以上就是用Python進(jìn)行websocket接口測(cè)試的詳細(xì)內(nèi)容,更多關(guān)于python 接口測(cè)試的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Django和Flask框架優(yōu)缺點(diǎn)對(duì)比
這篇文章主要介紹了Django和Flask框架相關(guān)對(duì)比,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10使用Django框架中ORM系統(tǒng)實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)增刪改查
這篇文章主要介紹了使用Django的ORM實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)數(shù)據(jù)增刪改查方法,文中附含詳細(xì)示例代碼以及過程詳解,有需要的朋友可以借鑒參考下2021-09-09python 實(shí)現(xiàn)批量圖片識(shí)別并翻譯
這篇文章主要介紹了python 實(shí)現(xiàn)批量圖片識(shí)別并翻譯,幫助大家利用python處理圖片,感興趣的朋友可以了解下2020-11-11pygame實(shí)現(xiàn)方塊動(dòng)畫實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于pygame實(shí)現(xiàn)方塊動(dòng)畫實(shí)例講解內(nèi)容,以后需要的朋友們可以學(xué)習(xí)參考下。2021-12-12python+POP3實(shí)現(xiàn)批量下載郵件附件
這篇文章主要為大家詳細(xì)介紹了python+POP3實(shí)現(xiàn)批量下載郵件附件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06如何利用python寫GUI及生成.exe可執(zhí)行文件
工作中需要開發(fā)一個(gè)小工具,簡(jiǎn)單的UI界面可以很好的提高工具的實(shí)用性,由此開啟了我的第一次GUI開發(fā)之旅,這篇文章主要給大家介紹了關(guān)于如何利用python寫GUI及生成.exe可執(zhí)行文件的相關(guān)資料,需要的朋友可以參考下2021-12-12Python遍歷目錄下文件、讀取、千萬條數(shù)據(jù)合并詳情
這篇文章主要介紹了Python遍歷目錄下文件、讀取、千萬條數(shù)據(jù)合并詳情,對(duì)文件夾和文件進(jìn)行屬性判斷,首先對(duì)文件夾進(jìn)行遍歷,看文件夾里有什么樣的文件,讀取出文件夾中的所有文件,下面文章將詳細(xì)介紹該內(nèi)容,需要的小伙伴可以參考一下2022-01-01