python中使用websocket方法實(shí)例詳解
WebSocket是一種網(wǎng)絡(luò)通信協(xié)議,它在單個(gè)TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實(shí)現(xiàn)實(shí)時(shí)通信。
websockets是Python中最常用的網(wǎng)絡(luò)庫(kù)之一,也是websocket協(xié)議的Python實(shí)現(xiàn)。它不僅作為基礎(chǔ)組件在眾多項(xiàng)目中發(fā)揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網(wǎng):https://github.com/python-websockets/websockets
1. 什么是WebSocket?
WebSocket協(xié)議是在2008年由Web應(yīng)用程序設(shè)計(jì)師和開(kāi)發(fā)人員創(chuàng)建的,目的是為了在Web瀏覽器和服務(wù)器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務(wù)器在任何時(shí)候發(fā)送消息,無(wú)需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務(wù)器之間傳輸文本和二進(jìn)制數(shù)據(jù),使得構(gòu)建實(shí)時(shí)Web應(yīng)用程序變得更加簡(jiǎn)單。
2. 在Python中使用WebSocket
Python中有多個(gè)庫(kù)可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫(kù)來(lái)演示W(wǎng)ebSocket編程。
要安裝websockets庫(kù),你可以使用pip:
pip install websockets
3. 創(chuàng)建WebSocket服務(wù)器
使用websockets庫(kù),我們可以輕松地創(chuàng)建一個(gè)WebSocket服務(wù)器。以下是一個(gè)簡(jiǎn)單的示例:
import asyncio import websockets async def echo(websocket, path): async for message in websocket: print(f"Received message: {message}") await websocket.send(f"Echo: {message}") start_server = websockets.serve(echo, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
在這個(gè)示例中,我們定義了一個(gè)名為echo的協(xié)程函數(shù),它接收兩個(gè)參數(shù):websocket和path。該函數(shù)使用async for循環(huán)讀取客戶端發(fā)送的消息,并將消息發(fā)送回客戶端。
然后,我們使用websockets.serve()函數(shù)創(chuàng)建一個(gè)WebSocket服務(wù)器,監(jiān)聽(tīng)本地主機(jī)的8765端口。最后,我們使用asyncio的事件循環(huán)啟動(dòng)服務(wù)器。
4. 創(chuàng)建WebSocket客戶端
要?jiǎng)?chuàng)建一個(gè)WebSocket客戶端,我們同樣可以使用websockets庫(kù)。以下是一個(gè)簡(jiǎn)單的客戶端示例:
import asyncio import websockets async def main(): async with websockets.connect("ws://localhost:8765") as websocket: message = "Hello, server!" await websocket.send(message) print(f"Sent: {message}") response = await websocket.recv() print(f"Received: {response}") asyncio.run(main())
在這個(gè)示例中,我們使用websockets.connect()函數(shù)建立與WebSocket服務(wù)器的連接。然后,我們使用send()方法向服務(wù)器發(fā)送消息,并使用recv()方法接收服務(wù)器的響應(yīng)。
5. 總結(jié)
WebSocket協(xié)議為Web瀏覽器和服務(wù)器之間提供了實(shí)時(shí)雙向通信的能力,使得構(gòu)建實(shí)時(shí)Web應(yīng)用程序變得更加容易。在Python中,我們可以使用websockets庫(kù)輕松地實(shí)現(xiàn)WebSocket編程。
6. 通過(guò)websockets這個(gè)項(xiàng)目,從大型開(kāi)源項(xiàng)目中學(xué)習(xí)asyncio庫(kù)。
一、asyncio.Transport
在官方文檔中,Transport被描述成對(duì)socket的抽象,它控制著如何傳輸數(shù)據(jù)。除了websockets,uvicorn、daphne等ASGI實(shí)現(xiàn)都會(huì)用到Transport。
Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼?zhèn)渥x和寫的功能,可以類比為讀寫socket對(duì)象。
Transport對(duì)象提供以下常用函數(shù)——
is_reading:判斷該Transport是否在讀。
set_write_buffer_limits:設(shè)置寫入Transport的高和低水位。考慮到網(wǎng)絡(luò)狀況,有時(shí)不希望寫入過(guò)多的數(shù)據(jù)。
write、write_eof、write_line:為當(dāng)前Transport寫入數(shù)據(jù),分別表示寫入二進(jìn)制數(shù)據(jù)、eof和二進(jìn)制行數(shù)據(jù)。其中eof寫入后不會(huì)關(guān)閉Transport,但會(huì)flush數(shù)據(jù)。
abort:立刻關(guān)閉Transport,不接受新的數(shù)據(jù)。留在緩沖的數(shù)據(jù)也會(huì)丟失,后續(xù)調(diào)用Protocol的connection_lost函數(shù)。
在websockets中,Transport使用場(chǎng)景不多,一般都是通過(guò)Protocol對(duì)象的回調(diào)參數(shù)使用的。在websocket的初始化過(guò)程中,會(huì)設(shè)置Transport的最高水位。同樣,在這種場(chǎng)景下,該對(duì)象也是作為回調(diào)參數(shù)使用的。
二、asyncio.Protocol
如果Transport是對(duì)socket的抽象,那么Protocol就是對(duì)協(xié)議的抽象。它提供了如何使用Transport的方式。
用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個(gè)Unimplemented函數(shù)需要用戶去實(shí)現(xiàn)——
connection_made:當(dāng)連接建立時(shí)會(huì)執(zhí)行該函數(shù),該函數(shù)包含一個(gè)Transport類型的參數(shù)。
connection_lost:當(dāng)連接丟失或者關(guān)閉時(shí)會(huì)執(zhí)行該函數(shù),該函數(shù)包含一個(gè)Exception類型的參數(shù)。
pause_writing:當(dāng)Transport對(duì)象寫入的數(shù)據(jù)高于之前設(shè)置的高水位時(shí)被調(diào)用,一般會(huì)暫停數(shù)據(jù)的寫入。
resume_writing:當(dāng)Transport對(duì)象寫入的數(shù)據(jù)低于之前設(shè)置的低水位時(shí)被調(diào)用,一般用于恢復(fù)數(shù)據(jù)寫入。
data_received:當(dāng)有數(shù)據(jù)被接受時(shí)回調(diào),該函數(shù)包含一個(gè)二進(jìn)制對(duì)象data,用來(lái)表示接受的數(shù)據(jù)。
eof_received:當(dāng)被Transport對(duì)象被調(diào)用write_eof時(shí)被調(diào)用。
在websockets中,server端的connection_made實(shí)現(xiàn)截圖如圖所示。在該函數(shù)中,websockets將用戶實(shí)現(xiàn)的handler封裝成task對(duì)象,并和websocket的server綁定。
而在client端中實(shí)現(xiàn)如第一節(jié)截圖所示,只是在reader中注冊(cè)該Transport對(duì)象。
websockets的connection_lost函數(shù)實(shí)現(xiàn)方式如下。主要操作即更新?tīng)顟B(tài)、關(guān)閉pings、更新對(duì)應(yīng)的waiter狀態(tài),以及維護(hù)reader對(duì)象。
在其他函數(shù)的實(shí)現(xiàn)中,websockets也主要用到了reader對(duì)象完成數(shù)據(jù)流的暫停和恢復(fù),以及數(shù)據(jù)的寫入。
從上面代碼實(shí)現(xiàn)可以看出,websockets通過(guò)reader代理完成數(shù)據(jù)流的操作。這個(gè)reader是一個(gè)asyncio.StreamReader對(duì)象。這個(gè)對(duì)象具體如何使用將在下一篇介紹。
附錄:進(jìn)階版本:
python使用websockets庫(kù)
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個(gè)websocket。
connect: 在client端使用,用于建立連接。
send:發(fā)送數(shù)據(jù)
recv:接收數(shù)據(jù)
close:關(guān)閉連接
服務(wù)端
#!/usr/bin/python3 # 主要功能:創(chuàng)建1個(gè)基本的websocket server, 符合asyncio 開(kāi)發(fā)要求 import asyncio import websockets from datetime import datetime async def handler(websocket): data = await websocket.recv() reply = f"Data received as \"{data}\". time: {datetime.now()}" print(reply) await websocket.send(reply) print("Send reply") async def main(): async with websockets.serve(handler, "localhost", 9999): await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())
客戶端
import asyncio import websockets import time async def ws_client(url): for i in range(1, 40): async with websockets.connect(url) as websocket: await websocket.send("Hello, I am PyPy.") response = await websocket.recv() print(response) time.sleep(1) asyncio.run(ws_client('ws://localhost:9999'))
服務(wù)端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" # 握手,通過(guò)接收Hi,發(fā)送"success"來(lái)進(jìn)行雙方的握手。 async def serverHands(websocket): while True: recv_text = await websocket.recv() print("recv_text=" + recv_text) if recv_text == "Hi": print("connected success") await websocket.send("success") return True else: await websocket.send("connected fail") # 接收從客戶端發(fā)來(lái)的消息并處理,再返給客戶端success async def serverRecv(websocket): while True: recv_text = await websocket.recv() print("recv:", recv_text) await websocket.send("success,get mess:"+ recv_text) # 握手并且接收數(shù)據(jù) async def serverRun(websocket, path): print(path) await serverHands(websocket) await serverRecv(websocket) # main function if __name__ == '__main__': print("======server======") server = websockets.serve(serverRun, IP_ADDR, IP_PORT) asyncio.get_event_loop().run_until_complete(server) asyncio.get_event_loop().run_forever()
客戶端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" async def clientHands(websocket): while True: # 通過(guò)發(fā)送hello握手 await websocket.send("Hi") response_str = await websocket.recv() # 接收"success"來(lái)進(jìn)行雙方的握手 if "success" in response_str: print("握手成功") return True # 向服務(wù)器端發(fā)送消息 async def clientSend(websocket): while True: input_text = input("input text: ") if input_text == "exit": print(f'"exit", bye!') await websocket.close(reason="exit") return False await websocket.send(input_text) recv_text = await websocket.recv() print(f"{recv_text}") # 進(jìn)行websocket連接 async def clientRun(): ipaddress = IP_ADDR + ":" + IP_PORT async with websockets.connect("ws://" + ipaddress) as websocket: await clientHands(websocket) await clientSend(websocket) # main function if __name__ == '__main__': print("======client======") asyncio.get_event_loop().run_until_complete(clientRun())
服務(wù)端
# -*- coding:utf8 -*- import json import socket import asyncio import logging import websockets import multiprocessing IP = '127.0.0.1' PORT_CHAT = 9090 USERS ={} #提供聊天的后臺(tái) async def ServerWs(websocket,path): logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', filename="chat.log", level=logging.INFO) # 握手 await websocket.send(json.dumps({"type": "handshake"})) async for message in websocket: data = json.loads(message) message = '' # 用戶發(fā)信息 if data["type"] == 'send': name = '404' for k, v in USERS.items(): if v == websocket: name = k data["from"] = name if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "user", "content": data["content"], "from": name}) # 用戶注冊(cè) elif data["type"] == 'register': try: USERS[data["uuid"]] = websocket if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "login", "content": data["content"], "user_list": list(USERS.keys())}) except Exception as exp: print(exp) # 用戶注銷 elif data["type"] == 'unregister': del USERS[data["uuid"]] if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())}) #打印日志 logging.info(data) # 群發(fā) await asyncio.wait([user.send(message) for user in USERS.values()]) def server_run(): print("server") start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() if __name__ == "__main__": from multiprocessing import Process multiprocessing.freeze_support() server = Process(target=server_run, daemon=False) server.start()
服務(wù)端
import asyncio import websockets import time import json import threading # 功能模塊 class OutputHandler(): async def run(self,message,send_ms,websocket): # 用戶發(fā)信息 await send_ms(message, websocket) # 單發(fā)消息 # await send_ms(message, websocket) # 群發(fā)消息 #await s('hi起來(lái)') # 存儲(chǔ)所有的客戶端 Clients = {} # 服務(wù)端 class WS_Server(): def __init__(self): self.ip = "127.0.0.1" self.port = 9090 # 回調(diào)函數(shù)(發(fā)消息給客戶端) async def callback_send(self, msg, websocket=None): await self.sendMsg(msg, websocket) # 發(fā)送消息 async def sendMsg(self, msg, websocket): print('sendMsg:', msg) # websocket不為空,單發(fā),為空,群發(fā)消息 if websocket != None: await websocket.send(msg) else: # 群發(fā)消息 await self.broadcastMsg(msg) # 避免被卡線程 await asyncio.sleep(0.2) # 群發(fā)消息 async def broadcastMsg(self, msg): for user in Clients: await user.send(msg) # 針對(duì)不同的信息進(jìn)行請(qǐng)求,可以考慮json文本 async def runCaseX(self,jsonMsg,websocket): print('runCase') op = OutputHandler() # 參數(shù):消息、方法、socket await op.run(jsonMsg,self.callback_send,websocket) # 連接一個(gè)客戶端,起一個(gè)循環(huán)監(jiān)聽(tīng) async def echo(self,websocket, path): # 添加到客戶端列表 # Clients.append(websocket) # 握手 await websocket.send(json.dumps({"type": "handshake"})) # 循環(huán)監(jiān)聽(tīng) while True: # 接受信息 try: # 接受文本 recv_text = await websocket.recv() message = "Get message: {}".format(recv_text) # 返回客戶端信息 await websocket.send(message) # 轉(zhuǎn)json data = json.loads(recv_text) # 用戶發(fā)信息 if data["type"] == 'send': name = '404' for k, v in Clients.items(): if v == websocket: name = k data["from"] = name if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "send", "content": data["content"], "from": name}) await self.runCaseX(jsonMsg=message, websocket=websocket) # 用戶注冊(cè) elif data["type"] == 'register': try: Clients[data["uuid"]] = websocket if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())}) await self.runCaseX(jsonMsg=message, websocket=websocket) except Exception as exp: print(exp) # 用戶注銷 elif data["type"] == 'unregister': del Clients[data["uuid"]] # 對(duì)message進(jìn)行解析,跳進(jìn)不同功能區(qū) # await self.runCaseX(jsonMsg=data,websocket=websocket) # 鏈接斷開(kāi) except websockets.ConnectionClosed: print("ConnectionClosed...", path) # del Clients break # 無(wú)效狀態(tài) except websockets.InvalidState: print("InvalidState...") # del Clients break # 報(bào)錯(cuò) except Exception as e: print("ws連接報(bào)錯(cuò)",e) # del Clients break # 啟動(dòng)服務(wù)器 async def runServer(self): async with websockets.serve(self.echo, self.ip, self.port): await asyncio.Future() # run forever # 多協(xié)程模式,防止阻塞主線程無(wú)法做其他事情 def WebSocketServer(self): asyncio.run(self.runServer()) # 多線程啟動(dòng) def startServer(self): # 多線程啟動(dòng),否則會(huì)堵塞 thread = threading.Thread(target=self.WebSocketServer) thread.start() # thread.join() if __name__=='__main__': print("server") s = WS_Server() s.startServer()
到此這篇關(guān)于python的websocket方法教程的文章就介紹到這了,更多相關(guān)python的websocket方法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 使用python構(gòu)建WebSocket客戶端的教程詳解
- python使用websocket庫(kù)發(fā)送WSS請(qǐng)求
- Python中使用dwebsocket實(shí)現(xiàn)后端數(shù)據(jù)實(shí)時(shí)刷新
- 使用Python創(chuàng)建websocket服務(wù)端并給出不同客戶端的請(qǐng)求
- python開(kāi)發(fā)實(shí)例之python使用Websocket庫(kù)開(kāi)發(fā)簡(jiǎn)單聊天工具實(shí)例詳解(python+Websocket+JS)
- Python Websocket服務(wù)端通信的使用示例
- Python如何使用WebSocket實(shí)現(xiàn)實(shí)時(shí)Web應(yīng)用
相關(guān)文章
使用python進(jìn)行PostgreSQL數(shù)據(jù)庫(kù)連接全過(guò)程
這篇文章主要介紹了使用python進(jìn)行PostgreSQL數(shù)據(jù)庫(kù)連接的相關(guān)資料,包括安裝psycopg2模塊、使用PyCharm進(jìn)行圖形化連接、代碼連接數(shù)據(jù)庫(kù)的方法、以及如何執(zhí)行DML和DQL操作,需要的朋友可以參考下2025-03-03python3使用libpcap庫(kù)進(jìn)行抓包及數(shù)據(jù)處理的操作方法
這篇文章主要介紹了python3使用libpcap庫(kù)進(jìn)行抓包及數(shù)據(jù)處理,需要的朋友可以參考下2022-10-10Keras自動(dòng)下載的數(shù)據(jù)集/模型存放位置介紹
這篇文章主要介紹了Keras自動(dòng)下載的數(shù)據(jù)集/模型存放位置介紹,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06python對(duì)視頻畫框標(biāo)記后保存的方法
今天小編就為大家分享一篇python對(duì)視頻畫框標(biāo)記后保存的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-11-11Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享
這篇文章主要介紹了Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01