使用Python實現(xiàn)WebSocket服務(wù)器與客戶端通信功能
簡介
WebSocket 是一種基于 TCP 協(xié)議的通信協(xié)議,能夠在客戶端與服務(wù)器之間進(jìn)行全雙工(雙向)通信。相比傳統(tǒng)的 HTTP 協(xié)議,WebSocket 可以實現(xiàn)實時數(shù)據(jù)的傳輸,尤其適合需要實時交互的應(yīng)用場景,如在線游戲、實時聊天、金融交易等。
我通過 Python 實現(xiàn)一個簡單的 WebSocket 服務(wù)器,并使其與客戶端進(jìn)行通信。我們將創(chuàng)建兩個 Python 文件:websocket.py
和 main.py
,websocket.py
負(fù)責(zé)實現(xiàn) WebSocket 服務(wù)器的功能,main.py
負(fù)責(zé)啟動和管理服務(wù)器,以及定時向客戶端發(fā)送消息。
安裝 WebSocket 庫
在開始之前,需要安裝 websockets
庫,它是 Python 中非常流行的 WebSocket 實現(xiàn)。
pip install websockets
websocket.py — WebSocket 服務(wù)器實現(xiàn)
在 websocket.py
文件中,我定義了一個 WebSocketServer
類,包含了 WebSocket 服務(wù)器的主要邏輯。
代碼解析
import asyncio import websockets class WebSocketServer: def __init__(self, host="localhost", port=8765): self.host = host self.port = port self.clients = set() # 存儲所有連接的客戶端
首先,我創(chuàng)建了一個 WebSocketServer
類,它的構(gòu)造方法初始化了服務(wù)器的主機(jī)地址 host
和端口號 port
,同時維護(hù)了一個客戶端集合 clients
來存儲當(dāng)前連接的 WebSocket 客戶端。
async def handle_client(self, websocket): # 新的客戶端連接 self.clients.add(websocket) try: async for message in websocket: print(f"收到消息: {message}") # 回顯消息給客戶端 await websocket.send(f"服務(wù)器已收到: {message}") except websockets.ConnectionClosed as e: print(f"客戶端斷開連接: {e}") finally: # 移除斷開的客戶端 self.clients.remove(websocket)
handle_client
是一個異步方法,用來處理與客戶端的連接。在客戶端發(fā)送消息時,服務(wù)器會接收到該消息,并通過 websocket.send()
方法將回顯消息發(fā)送給客戶端。如果客戶端斷開連接,捕獲到 ConnectionClosed
異常,并從 clients
集合中移除該客戶端。
async def send(self, message): """向所有連接的客戶端發(fā)送消息""" if not self.clients: print("沒有客戶端連接,無法發(fā)送消息") return disconnected_clients = set() for client in self.clients: try: await client.send(message) except websockets.ConnectionClosed: disconnected_clients.add(client) # 清理已斷開的客戶端 self.clients -= disconnected_clients
send
方法允許服務(wù)器向所有連接的客戶端發(fā)送消息。如果有客戶端斷開連接,服務(wù)器會將其從 clients
集合中移除。
async def start(self): print(f"啟動 WebSocket 服務(wù)器: ws://{self.host}:{self.port}") async with websockets.serve(self.handle_client, self.host, self.port): await asyncio.Future() # 持續(xù)運(yùn)行直到手動停止
start
方法啟動 WebSocket 服務(wù)器,通過 websockets.serve()
啟動一個 WebSocket 服務(wù),該服務(wù)會監(jiān)聽來自客戶端的連接請求,并調(diào)用 handle_client
方法處理這些請求。await asyncio.Future()
會讓服務(wù)器持續(xù)運(yùn)行,直到手動停止。
main.py — 啟動 WebSocket 服務(wù)器并定時發(fā)送消息
在 main.py
文件中,創(chuàng)建了一個 WebSocket 服務(wù)器的實例,并啟動了一個定時任務(wù),定期向連接的客戶端發(fā)送消息。
代碼解析
import asyncio from websocket import WebSocketServer async def main(): # 創(chuàng)建 WebSocket 服務(wù)器實例 websocket_server = WebSocketServer() websocket_task = asyncio.create_task(websocket_server.start()) # 啟動 WebSocket 服務(wù)器
在 main()
函數(shù)中,首先創(chuàng)建了 WebSocketServer
的實例,并啟動了 WebSocket 服務(wù)器。
# 定時向 WebSocket 客戶端發(fā)送數(shù)據(jù) async def send_data_periodically(): while True: await asyncio.sleep(5) # 每 5 秒發(fā)送一次數(shù)據(jù) await websocket_server.send("服務(wù)器定時發(fā)送消息:ping")
send_data_periodically
是一個定時任務(wù),每隔 5 秒就向所有連接的客戶端發(fā)送一次 "服務(wù)器定時發(fā)送消息:ping"
。
periodic_task = asyncio.create_task(send_data_periodically()) # 并發(fā)運(yùn)行所有任務(wù) await asyncio.gather(websocket_task, periodic_task)
通過 asyncio.create_task()
啟動了定時任務(wù),并通過 asyncio.gather()
并發(fā)運(yùn)行 WebSocket 服務(wù)器和定時發(fā)送消息的任務(wù)。
if __name__ == "__main__": asyncio.run(main())
最后,我們 asyncio.run(main())
啟動了整個程序。
總結(jié)
通過這兩個文件,實現(xiàn)了一個簡單的 WebSocket 服務(wù)器,該服務(wù)器能夠接收客戶端消息并進(jìn)行回顯。同時,服務(wù)器也能夠定時向所有連接的客戶端發(fā)送消息。
完整代碼
websocket.py
import asyncio import websockets class WebSocketServer: def __init__(self, host="localhost", port=8765): self.host = host self.port = port self.clients = set() # 存儲所有連接的客戶端 async def handle_client(self, websocket): # 新的客戶端連接 self.clients.add(websocket) try: async for message in websocket: print(f"收到消息: {message}") # 回顯消息給客戶端 await websocket.send(f"服務(wù)器已收到: {message}") except websockets.ConnectionClosed as e: print(f"客戶端斷開連接: {e}") finally: # 移除斷開的客戶端 self.clients.remove(websocket) async def send(self, message): """向所有連接的客戶端發(fā)送消息""" if not self.clients: print("沒有客戶端連接,無法發(fā)送消息") return disconnected_clients = set() for client in self.clients: try: await client.send(message) except websockets.ConnectionClosed: disconnected_clients.add(client) # 清理已斷開的客戶端 self.clients -= disconnected_clients async def start(self): print(f"啟動 WebSocket 服務(wù)器: ws://{self.host}:{self.port}") async with websockets.serve(self.handle_client, self.host, self.port): await asyncio.Future() # 持續(xù)運(yùn)行直到手動停止
main.py
import asyncio from websocket import WebSocketServer async def main(): # 創(chuàng)建 WebSocket 服務(wù)器實例 websocket_server = WebSocketServer() websocket_task = asyncio.create_task(websocket_server.start()) # 啟動 WebSocket 服務(wù)器 # 定時向 WebSocket 客戶端發(fā)送數(shù)據(jù) async def send_data_periodically(): while True: await asyncio.sleep(5) # 每 5 秒發(fā)送一次數(shù)據(jù) await websocket_server.send("服務(wù)器定時發(fā)送消息:ping") periodic_task = asyncio.create_task(send_data_periodically()) # 并發(fā)運(yùn)行所有任務(wù) await asyncio.gather(websocket_task, periodic_task) if __name__ == "__main__": asyncio.run(main())
以上就是使用Python實現(xiàn)WebSocket服務(wù)器與客戶端通信的詳細(xì)內(nèi)容,更多關(guān)于Python WebSocket通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas時間序列之pd.to_datetime()的實現(xiàn)
本文主要介紹了pandas時間序列之pd.to_datetime()的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧<BR>2022-06-06Python使用urllib模塊的urlopen超時問題解決方法
這篇文章主要介紹了Python使用urllib模塊的urlopen超時問題解決方法,本文使用socket模塊中的setdefaulttimeout函數(shù)解決了超時問題,需要的朋友可以參考下2014-11-11詳解win10下pytorch-gpu安裝以及CUDA詳細(xì)安裝過程
這篇文章主要介紹了win10下pytorch-gpu安裝以及CUDA詳細(xì)安裝過程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01PyTorch中的squeeze()和unsqueeze()解析與應(yīng)用案例
這篇文章主要介紹了PyTorch中的squeeze()和unsqueeze()解析與應(yīng)用案例,文章內(nèi)容介紹詳細(xì),需要的小伙伴可以參考一下,希望對你有所幫助2022-03-03