python中使用websocket方法實例詳解
WebSocket是一種網(wǎng)絡(luò)通信協(xié)議,它在單個TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實現(xiàn)實時通信。
websockets是Python中最常用的網(wǎng)絡(luò)庫之一,也是websocket協(xié)議的Python實現(xiàn)。它不僅作為基礎(chǔ)組件在眾多項目中發(fā)揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網(wǎng):https://github.com/python-websockets/websockets
1. 什么是WebSocket?
WebSocket協(xié)議是在2008年由Web應(yīng)用程序設(shè)計師和開發(fā)人員創(chuàng)建的,目的是為了在Web瀏覽器和服務(wù)器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務(wù)器在任何時候發(fā)送消息,無需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務(wù)器之間傳輸文本和二進(jìn)制數(shù)據(jù),使得構(gòu)建實時Web應(yīng)用程序變得更加簡單。
2. 在Python中使用WebSocket
Python中有多個庫可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫來演示W(wǎng)ebSocket編程。
要安裝websockets庫,你可以使用pip:
pip install websockets
3. 創(chuàng)建WebSocket服務(wù)器
使用websockets庫,我們可以輕松地創(chuàng)建一個WebSocket服務(wù)器。以下是一個簡單的示例:
import asyncio import websockets async def echo(websocket, path): async for message in websocket: print(f"Received message: {message}") await websocket.send(f"Echo: {message}") start_server = websockets.serve(echo, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
在這個示例中,我們定義了一個名為echo的協(xié)程函數(shù),它接收兩個參數(shù):websocket和path。該函數(shù)使用async for循環(huán)讀取客戶端發(fā)送的消息,并將消息發(fā)送回客戶端。
然后,我們使用websockets.serve()函數(shù)創(chuàng)建一個WebSocket服務(wù)器,監(jiān)聽本地主機的8765端口。最后,我們使用asyncio的事件循環(huán)啟動服務(wù)器。
4. 創(chuàng)建WebSocket客戶端
要創(chuàng)建一個WebSocket客戶端,我們同樣可以使用websockets庫。以下是一個簡單的客戶端示例:
import asyncio import websockets async def main(): async with websockets.connect("ws://localhost:8765") as websocket: message = "Hello, server!" await websocket.send(message) print(f"Sent: {message}") response = await websocket.recv() print(f"Received: {response}") asyncio.run(main())
在這個示例中,我們使用websockets.connect()函數(shù)建立與WebSocket服務(wù)器的連接。然后,我們使用send()方法向服務(wù)器發(fā)送消息,并使用recv()方法接收服務(wù)器的響應(yīng)。
5. 總結(jié)
WebSocket協(xié)議為Web瀏覽器和服務(wù)器之間提供了實時雙向通信的能力,使得構(gòu)建實時Web應(yīng)用程序變得更加容易。在Python中,我們可以使用websockets庫輕松地實現(xiàn)WebSocket編程。
6. 通過websockets這個項目,從大型開源項目中學(xué)習(xí)asyncio庫。
一、asyncio.Transport
在官方文檔中,Transport被描述成對socket的抽象,它控制著如何傳輸數(shù)據(jù)。除了websockets,uvicorn、daphne等ASGI實現(xiàn)都會用到Transport。
Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼?zhèn)渥x和寫的功能,可以類比為讀寫socket對象。
Transport對象提供以下常用函數(shù)——
is_reading:判斷該Transport是否在讀。
set_write_buffer_limits:設(shè)置寫入Transport的高和低水位??紤]到網(wǎng)絡(luò)狀況,有時不希望寫入過多的數(shù)據(jù)。
write、write_eof、write_line:為當(dāng)前Transport寫入數(shù)據(jù),分別表示寫入二進(jìn)制數(shù)據(jù)、eof和二進(jìn)制行數(shù)據(jù)。其中eof寫入后不會關(guān)閉Transport,但會flush數(shù)據(jù)。
abort:立刻關(guān)閉Transport,不接受新的數(shù)據(jù)。留在緩沖的數(shù)據(jù)也會丟失,后續(xù)調(diào)用Protocol的connection_lost函數(shù)。
在websockets中,Transport使用場景不多,一般都是通過Protocol對象的回調(diào)參數(shù)使用的。在websocket的初始化過程中,會設(shè)置Transport的最高水位。同樣,在這種場景下,該對象也是作為回調(diào)參數(shù)使用的。
二、asyncio.Protocol
如果Transport是對socket的抽象,那么Protocol就是對協(xié)議的抽象。它提供了如何使用Transport的方式。
用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個Unimplemented函數(shù)需要用戶去實現(xiàn)——
connection_made:當(dāng)連接建立時會執(zhí)行該函數(shù),該函數(shù)包含一個Transport類型的參數(shù)。
connection_lost:當(dāng)連接丟失或者關(guān)閉時會執(zhí)行該函數(shù),該函數(shù)包含一個Exception類型的參數(shù)。
pause_writing:當(dāng)Transport對象寫入的數(shù)據(jù)高于之前設(shè)置的高水位時被調(diào)用,一般會暫停數(shù)據(jù)的寫入。
resume_writing:當(dāng)Transport對象寫入的數(shù)據(jù)低于之前設(shè)置的低水位時被調(diào)用,一般用于恢復(fù)數(shù)據(jù)寫入。
data_received:當(dāng)有數(shù)據(jù)被接受時回調(diào),該函數(shù)包含一個二進(jìn)制對象data,用來表示接受的數(shù)據(jù)。
eof_received:當(dāng)被Transport對象被調(diào)用write_eof時被調(diào)用。
在websockets中,server端的connection_made實現(xiàn)截圖如圖所示。在該函數(shù)中,websockets將用戶實現(xiàn)的handler封裝成task對象,并和websocket的server綁定。
而在client端中實現(xiàn)如第一節(jié)截圖所示,只是在reader中注冊該Transport對象。
websockets的connection_lost函數(shù)實現(xiàn)方式如下。主要操作即更新狀態(tài)、關(guān)閉pings、更新對應(yīng)的waiter狀態(tài),以及維護(hù)reader對象。
在其他函數(shù)的實現(xiàn)中,websockets也主要用到了reader對象完成數(shù)據(jù)流的暫停和恢復(fù),以及數(shù)據(jù)的寫入。
從上面代碼實現(xiàn)可以看出,websockets通過reader代理完成數(shù)據(jù)流的操作。這個reader是一個asyncio.StreamReader對象。這個對象具體如何使用將在下一篇介紹。
附錄:進(jìn)階版本:
python使用websockets庫
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個websocket。
connect: 在client端使用,用于建立連接。
send:發(fā)送數(shù)據(jù)
recv:接收數(shù)據(jù)
close:關(guān)閉連接
服務(wù)端
#!/usr/bin/python3 # 主要功能:創(chuàng)建1個基本的websocket server, 符合asyncio 開發(fā)要求 import asyncio import websockets from datetime import datetime async def handler(websocket): data = await websocket.recv() reply = f"Data received as \"{data}\". time: {datetime.now()}" print(reply) await websocket.send(reply) print("Send reply") async def main(): async with websockets.serve(handler, "localhost", 9999): await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())
客戶端
import asyncio import websockets import time async def ws_client(url): for i in range(1, 40): async with websockets.connect(url) as websocket: await websocket.send("Hello, I am PyPy.") response = await websocket.recv() print(response) time.sleep(1) asyncio.run(ws_client('ws://localhost:9999'))
服務(wù)端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" # 握手,通過接收Hi,發(fā)送"success"來進(jìn)行雙方的握手。 async def serverHands(websocket): while True: recv_text = await websocket.recv() print("recv_text=" + recv_text) if recv_text == "Hi": print("connected success") await websocket.send("success") return True else: await websocket.send("connected fail") # 接收從客戶端發(fā)來的消息并處理,再返給客戶端success async def serverRecv(websocket): while True: recv_text = await websocket.recv() print("recv:", recv_text) await websocket.send("success,get mess:"+ recv_text) # 握手并且接收數(shù)據(jù) async def serverRun(websocket, path): print(path) await serverHands(websocket) await serverRecv(websocket) # main function if __name__ == '__main__': print("======server======") server = websockets.serve(serverRun, IP_ADDR, IP_PORT) asyncio.get_event_loop().run_until_complete(server) asyncio.get_event_loop().run_forever()
客戶端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" async def clientHands(websocket): while True: # 通過發(fā)送hello握手 await websocket.send("Hi") response_str = await websocket.recv() # 接收"success"來進(jìn)行雙方的握手 if "success" in response_str: print("握手成功") return True # 向服務(wù)器端發(fā)送消息 async def clientSend(websocket): while True: input_text = input("input text: ") if input_text == "exit": print(f'"exit", bye!') await websocket.close(reason="exit") return False await websocket.send(input_text) recv_text = await websocket.recv() print(f"{recv_text}") # 進(jìn)行websocket連接 async def clientRun(): ipaddress = IP_ADDR + ":" + IP_PORT async with websockets.connect("ws://" + ipaddress) as websocket: await clientHands(websocket) await clientSend(websocket) # main function if __name__ == '__main__': print("======client======") asyncio.get_event_loop().run_until_complete(clientRun())
服務(wù)端
# -*- coding:utf8 -*- import json import socket import asyncio import logging import websockets import multiprocessing IP = '127.0.0.1' PORT_CHAT = 9090 USERS ={} #提供聊天的后臺 async def ServerWs(websocket,path): logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', filename="chat.log", level=logging.INFO) # 握手 await websocket.send(json.dumps({"type": "handshake"})) async for message in websocket: data = json.loads(message) message = '' # 用戶發(fā)信息 if data["type"] == 'send': name = '404' for k, v in USERS.items(): if v == websocket: name = k data["from"] = name if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "user", "content": data["content"], "from": name}) # 用戶注冊 elif data["type"] == 'register': try: USERS[data["uuid"]] = websocket if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "login", "content": data["content"], "user_list": list(USERS.keys())}) except Exception as exp: print(exp) # 用戶注銷 elif data["type"] == 'unregister': del USERS[data["uuid"]] if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())}) #打印日志 logging.info(data) # 群發(fā) await asyncio.wait([user.send(message) for user in USERS.values()]) def server_run(): print("server") start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() if __name__ == "__main__": from multiprocessing import Process multiprocessing.freeze_support() server = Process(target=server_run, daemon=False) server.start()
服務(wù)端
import asyncio import websockets import time import json import threading # 功能模塊 class OutputHandler(): async def run(self,message,send_ms,websocket): # 用戶發(fā)信息 await send_ms(message, websocket) # 單發(fā)消息 # await send_ms(message, websocket) # 群發(fā)消息 #await s('hi起來') # 存儲所有的客戶端 Clients = {} # 服務(wù)端 class WS_Server(): def __init__(self): self.ip = "127.0.0.1" self.port = 9090 # 回調(diào)函數(shù)(發(fā)消息給客戶端) async def callback_send(self, msg, websocket=None): await self.sendMsg(msg, websocket) # 發(fā)送消息 async def sendMsg(self, msg, websocket): print('sendMsg:', msg) # websocket不為空,單發(fā),為空,群發(fā)消息 if websocket != None: await websocket.send(msg) else: # 群發(fā)消息 await self.broadcastMsg(msg) # 避免被卡線程 await asyncio.sleep(0.2) # 群發(fā)消息 async def broadcastMsg(self, msg): for user in Clients: await user.send(msg) # 針對不同的信息進(jìn)行請求,可以考慮json文本 async def runCaseX(self,jsonMsg,websocket): print('runCase') op = OutputHandler() # 參數(shù):消息、方法、socket await op.run(jsonMsg,self.callback_send,websocket) # 連接一個客戶端,起一個循環(huán)監(jiān)聽 async def echo(self,websocket, path): # 添加到客戶端列表 # Clients.append(websocket) # 握手 await websocket.send(json.dumps({"type": "handshake"})) # 循環(huán)監(jiān)聽 while True: # 接受信息 try: # 接受文本 recv_text = await websocket.recv() message = "Get message: {}".format(recv_text) # 返回客戶端信息 await websocket.send(message) # 轉(zhuǎn)json data = json.loads(recv_text) # 用戶發(fā)信息 if data["type"] == 'send': name = '404' for k, v in Clients.items(): if v == websocket: name = k data["from"] = name if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "send", "content": data["content"], "from": name}) await self.runCaseX(jsonMsg=message, websocket=websocket) # 用戶注冊 elif data["type"] == 'register': try: Clients[data["uuid"]] = websocket if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())}) await self.runCaseX(jsonMsg=message, websocket=websocket) except Exception as exp: print(exp) # 用戶注銷 elif data["type"] == 'unregister': del Clients[data["uuid"]] # 對message進(jìn)行解析,跳進(jìn)不同功能區(qū) # await self.runCaseX(jsonMsg=data,websocket=websocket) # 鏈接斷開 except websockets.ConnectionClosed: print("ConnectionClosed...", path) # del Clients break # 無效狀態(tài) except websockets.InvalidState: print("InvalidState...") # del Clients break # 報錯 except Exception as e: print("ws連接報錯",e) # del Clients break # 啟動服務(wù)器 async def runServer(self): async with websockets.serve(self.echo, self.ip, self.port): await asyncio.Future() # run forever # 多協(xié)程模式,防止阻塞主線程無法做其他事情 def WebSocketServer(self): asyncio.run(self.runServer()) # 多線程啟動 def startServer(self): # 多線程啟動,否則會堵塞 thread = threading.Thread(target=self.WebSocketServer) thread.start() # thread.join() if __name__=='__main__': print("server") s = WS_Server() s.startServer()
到此這篇關(guān)于python的websocket方法教程的文章就介紹到這了,更多相關(guān)python的websocket方法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用python進(jìn)行PostgreSQL數(shù)據(jù)庫連接全過程
這篇文章主要介紹了使用python進(jìn)行PostgreSQL數(shù)據(jù)庫連接的相關(guān)資料,包括安裝psycopg2模塊、使用PyCharm進(jìn)行圖形化連接、代碼連接數(shù)據(jù)庫的方法、以及如何執(zhí)行DML和DQL操作,需要的朋友可以參考下2025-03-03python3使用libpcap庫進(jìn)行抓包及數(shù)據(jù)處理的操作方法
這篇文章主要介紹了python3使用libpcap庫進(jìn)行抓包及數(shù)據(jù)處理,需要的朋友可以參考下2022-10-10Keras自動下載的數(shù)據(jù)集/模型存放位置介紹
這篇文章主要介紹了Keras自動下載的數(shù)據(jù)集/模型存放位置介紹,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06pycharm中選中一個單詞替換所有重復(fù)單詞的實現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個單詞替換所有重復(fù)單詞的實現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-11-11Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享
這篇文章主要介紹了Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享,具有一定借鑒價值,需要的朋友可以參考下2018-01-01