python中使用websocket方法實(shí)例詳解
WebSocket是一種網(wǎng)絡(luò)通信協(xié)議,它在單個(gè)TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實(shí)現(xiàn)實(shí)時(shí)通信。
websockets是Python中最常用的網(wǎng)絡(luò)庫(kù)之一,也是websocket協(xié)議的Python實(shí)現(xiàn)。它不僅作為基礎(chǔ)組件在眾多項(xiàng)目中發(fā)揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網(wǎng):https://github.com/python-websockets/websockets
1. 什么是WebSocket?
WebSocket協(xié)議是在2008年由Web應(yīng)用程序設(shè)計(jì)師和開發(fā)人員創(chuàng)建的,目的是為了在Web瀏覽器和服務(wù)器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務(wù)器在任何時(shí)候發(fā)送消息,無需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務(wù)器之間傳輸文本和二進(jìn)制數(shù)據(jù),使得構(gòu)建實(shí)時(shí)Web應(yīng)用程序變得更加簡(jiǎn)單。
2. 在Python中使用WebSocket
Python中有多個(gè)庫(kù)可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫(kù)來演示W(wǎng)ebSocket編程。
要安裝websockets庫(kù),你可以使用pip:
pip install websockets
3. 創(chuàng)建WebSocket服務(wù)器
使用websockets庫(kù),我們可以輕松地創(chuàng)建一個(gè)WebSocket服務(wù)器。以下是一個(gè)簡(jiǎn)單的示例:
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(f"Received message: {message}")
await websocket.send(f"Echo: {message}")
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()在這個(gè)示例中,我們定義了一個(gè)名為echo的協(xié)程函數(shù),它接收兩個(gè)參數(shù):websocket和path。該函數(shù)使用async for循環(huán)讀取客戶端發(fā)送的消息,并將消息發(fā)送回客戶端。
然后,我們使用websockets.serve()函數(shù)創(chuàng)建一個(gè)WebSocket服務(wù)器,監(jiān)聽本地主機(jī)的8765端口。最后,我們使用asyncio的事件循環(huán)啟動(dòng)服務(wù)器。
4. 創(chuàng)建WebSocket客戶端
要?jiǎng)?chuàng)建一個(gè)WebSocket客戶端,我們同樣可以使用websockets庫(kù)。以下是一個(gè)簡(jiǎn)單的客戶端示例:
import asyncio
import websockets
async def main():
async with websockets.connect("ws://localhost:8765") as websocket:
message = "Hello, server!"
await websocket.send(message)
print(f"Sent: {message}")
response = await websocket.recv()
print(f"Received: {response}")
asyncio.run(main())在這個(gè)示例中,我們使用websockets.connect()函數(shù)建立與WebSocket服務(wù)器的連接。然后,我們使用send()方法向服務(wù)器發(fā)送消息,并使用recv()方法接收服務(wù)器的響應(yīng)。
5. 總結(jié)
WebSocket協(xié)議為Web瀏覽器和服務(wù)器之間提供了實(shí)時(shí)雙向通信的能力,使得構(gòu)建實(shí)時(shí)Web應(yīng)用程序變得更加容易。在Python中,我們可以使用websockets庫(kù)輕松地實(shí)現(xiàn)WebSocket編程。
6. 通過websockets這個(gè)項(xiàng)目,從大型開源項(xiàng)目中學(xué)習(xí)asyncio庫(kù)。
一、asyncio.Transport
在官方文檔中,Transport被描述成對(duì)socket的抽象,它控制著如何傳輸數(shù)據(jù)。除了websockets,uvicorn、daphne等ASGI實(shí)現(xiàn)都會(huì)用到Transport。
Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼?zhèn)渥x和寫的功能,可以類比為讀寫socket對(duì)象。

Transport對(duì)象提供以下常用函數(shù)——
is_reading:判斷該Transport是否在讀。
set_write_buffer_limits:設(shè)置寫入Transport的高和低水位。考慮到網(wǎng)絡(luò)狀況,有時(shí)不希望寫入過多的數(shù)據(jù)。
write、write_eof、write_line:為當(dāng)前Transport寫入數(shù)據(jù),分別表示寫入二進(jìn)制數(shù)據(jù)、eof和二進(jìn)制行數(shù)據(jù)。其中eof寫入后不會(huì)關(guān)閉Transport,但會(huì)flush數(shù)據(jù)。
abort:立刻關(guān)閉Transport,不接受新的數(shù)據(jù)。留在緩沖的數(shù)據(jù)也會(huì)丟失,后續(xù)調(diào)用Protocol的connection_lost函數(shù)。
在websockets中,Transport使用場(chǎng)景不多,一般都是通過Protocol對(duì)象的回調(diào)參數(shù)使用的。在websocket的初始化過程中,會(huì)設(shè)置Transport的最高水位。同樣,在這種場(chǎng)景下,該對(duì)象也是作為回調(diào)參數(shù)使用的。

二、asyncio.Protocol
如果Transport是對(duì)socket的抽象,那么Protocol就是對(duì)協(xié)議的抽象。它提供了如何使用Transport的方式。

用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個(gè)Unimplemented函數(shù)需要用戶去實(shí)現(xiàn)——
connection_made:當(dāng)連接建立時(shí)會(huì)執(zhí)行該函數(shù),該函數(shù)包含一個(gè)Transport類型的參數(shù)。
connection_lost:當(dāng)連接丟失或者關(guān)閉時(shí)會(huì)執(zhí)行該函數(shù),該函數(shù)包含一個(gè)Exception類型的參數(shù)。
pause_writing:當(dāng)Transport對(duì)象寫入的數(shù)據(jù)高于之前設(shè)置的高水位時(shí)被調(diào)用,一般會(huì)暫停數(shù)據(jù)的寫入。
resume_writing:當(dāng)Transport對(duì)象寫入的數(shù)據(jù)低于之前設(shè)置的低水位時(shí)被調(diào)用,一般用于恢復(fù)數(shù)據(jù)寫入。
data_received:當(dāng)有數(shù)據(jù)被接受時(shí)回調(diào),該函數(shù)包含一個(gè)二進(jìn)制對(duì)象data,用來表示接受的數(shù)據(jù)。
eof_received:當(dāng)被Transport對(duì)象被調(diào)用write_eof時(shí)被調(diào)用。
在websockets中,server端的connection_made實(shí)現(xiàn)截圖如圖所示。在該函數(shù)中,websockets將用戶實(shí)現(xiàn)的handler封裝成task對(duì)象,并和websocket的server綁定。

而在client端中實(shí)現(xiàn)如第一節(jié)截圖所示,只是在reader中注冊(cè)該Transport對(duì)象。
websockets的connection_lost函數(shù)實(shí)現(xiàn)方式如下。主要操作即更新狀態(tài)、關(guān)閉pings、更新對(duì)應(yīng)的waiter狀態(tài),以及維護(hù)reader對(duì)象。

在其他函數(shù)的實(shí)現(xiàn)中,websockets也主要用到了reader對(duì)象完成數(shù)據(jù)流的暫停和恢復(fù),以及數(shù)據(jù)的寫入。
從上面代碼實(shí)現(xiàn)可以看出,websockets通過reader代理完成數(shù)據(jù)流的操作。這個(gè)reader是一個(gè)asyncio.StreamReader對(duì)象。這個(gè)對(duì)象具體如何使用將在下一篇介紹。
附錄:進(jìn)階版本:
python使用websockets庫(kù)
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個(gè)websocket。
connect: 在client端使用,用于建立連接。
send:發(fā)送數(shù)據(jù)
recv:接收數(shù)據(jù)
close:關(guān)閉連接
服務(wù)端
#!/usr/bin/python3
# 主要功能:創(chuàng)建1個(gè)基本的websocket server, 符合asyncio 開發(fā)要求
import asyncio
import websockets
from datetime import datetime
async def handler(websocket):
data = await websocket.recv()
reply = f"Data received as \"{data}\". time: {datetime.now()}"
print(reply)
await websocket.send(reply)
print("Send reply")
async def main():
async with websockets.serve(handler, "localhost", 9999):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())客戶端
import asyncio
import websockets
import time
async def ws_client(url):
for i in range(1, 40):
async with websockets.connect(url) as websocket:
await websocket.send("Hello, I am PyPy.")
response = await websocket.recv()
print(response)
time.sleep(1)
asyncio.run(ws_client('ws://localhost:9999'))服務(wù)端
import asyncio
import websockets
IP_ADDR = "127.0.0.1"
IP_PORT = "9090"
# 握手,通過接收Hi,發(fā)送"success"來進(jìn)行雙方的握手。
async def serverHands(websocket):
while True:
recv_text = await websocket.recv()
print("recv_text=" + recv_text)
if recv_text == "Hi":
print("connected success")
await websocket.send("success")
return True
else:
await websocket.send("connected fail")
# 接收從客戶端發(fā)來的消息并處理,再返給客戶端success
async def serverRecv(websocket):
while True:
recv_text = await websocket.recv()
print("recv:", recv_text)
await websocket.send("success,get mess:"+ recv_text)
# 握手并且接收數(shù)據(jù)
async def serverRun(websocket, path):
print(path)
await serverHands(websocket)
await serverRecv(websocket)
# main function
if __name__ == '__main__':
print("======server======")
server = websockets.serve(serverRun, IP_ADDR, IP_PORT)
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()客戶端
import asyncio
import websockets
IP_ADDR = "127.0.0.1"
IP_PORT = "9090"
async def clientHands(websocket):
while True:
# 通過發(fā)送hello握手
await websocket.send("Hi")
response_str = await websocket.recv()
# 接收"success"來進(jìn)行雙方的握手
if "success" in response_str:
print("握手成功")
return True
# 向服務(wù)器端發(fā)送消息
async def clientSend(websocket):
while True:
input_text = input("input text: ")
if input_text == "exit":
print(f'"exit", bye!')
await websocket.close(reason="exit")
return False
await websocket.send(input_text)
recv_text = await websocket.recv()
print(f"{recv_text}")
# 進(jìn)行websocket連接
async def clientRun():
ipaddress = IP_ADDR + ":" + IP_PORT
async with websockets.connect("ws://" + ipaddress) as websocket:
await clientHands(websocket)
await clientSend(websocket)
# main function
if __name__ == '__main__':
print("======client======")
asyncio.get_event_loop().run_until_complete(clientRun())服務(wù)端
# -*- coding:utf8 -*-
import json
import socket
import asyncio
import logging
import websockets
import multiprocessing
IP = '127.0.0.1'
PORT_CHAT = 9090
USERS ={}
#提供聊天的后臺(tái)
async def ServerWs(websocket,path):
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
filename="chat.log",
level=logging.INFO)
# 握手
await websocket.send(json.dumps({"type": "handshake"}))
async for message in websocket:
data = json.loads(message)
message = ''
# 用戶發(fā)信息
if data["type"] == 'send':
name = '404'
for k, v in USERS.items():
if v == websocket:
name = k
data["from"] = name
if len(USERS) != 0: # asyncio.wait doesn't accept an empty list
message = json.dumps(
{"type": "user", "content": data["content"], "from": name})
# 用戶注冊(cè)
elif data["type"] == 'register':
try:
USERS[data["uuid"]] = websocket
if len(USERS) != 0: # asyncio.wait doesn't accept an empty list
message = json.dumps(
{"type": "login", "content": data["content"], "user_list": list(USERS.keys())})
except Exception as exp:
print(exp)
# 用戶注銷
elif data["type"] == 'unregister':
del USERS[data["uuid"]]
if len(USERS) != 0: # asyncio.wait doesn't accept an empty list
message = json.dumps(
{"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})
#打印日志
logging.info(data)
# 群發(fā)
await asyncio.wait([user.send(message) for user in USERS.values()])
def server_run():
print("server")
start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
if __name__ == "__main__":
from multiprocessing import Process
multiprocessing.freeze_support()
server = Process(target=server_run, daemon=False)
server.start()服務(wù)端
import asyncio
import websockets
import time
import json
import threading
# 功能模塊
class OutputHandler():
async def run(self,message,send_ms,websocket):
# 用戶發(fā)信息
await send_ms(message, websocket)
# 單發(fā)消息
# await send_ms(message, websocket)
# 群發(fā)消息
#await s('hi起來')
# 存儲(chǔ)所有的客戶端
Clients = {}
# 服務(wù)端
class WS_Server():
def __init__(self):
self.ip = "127.0.0.1"
self.port = 9090
# 回調(diào)函數(shù)(發(fā)消息給客戶端)
async def callback_send(self, msg, websocket=None):
await self.sendMsg(msg, websocket)
# 發(fā)送消息
async def sendMsg(self, msg, websocket):
print('sendMsg:', msg)
# websocket不為空,單發(fā),為空,群發(fā)消息
if websocket != None:
await websocket.send(msg)
else:
# 群發(fā)消息
await self.broadcastMsg(msg)
# 避免被卡線程
await asyncio.sleep(0.2)
# 群發(fā)消息
async def broadcastMsg(self, msg):
for user in Clients:
await user.send(msg)
# 針對(duì)不同的信息進(jìn)行請(qǐng)求,可以考慮json文本
async def runCaseX(self,jsonMsg,websocket):
print('runCase')
op = OutputHandler()
# 參數(shù):消息、方法、socket
await op.run(jsonMsg,self.callback_send,websocket)
# 連接一個(gè)客戶端,起一個(gè)循環(huán)監(jiān)聽
async def echo(self,websocket, path):
# 添加到客戶端列表
# Clients.append(websocket)
# 握手
await websocket.send(json.dumps({"type": "handshake"}))
# 循環(huán)監(jiān)聽
while True:
# 接受信息
try:
# 接受文本
recv_text = await websocket.recv()
message = "Get message: {}".format(recv_text)
# 返回客戶端信息
await websocket.send(message)
# 轉(zhuǎn)json
data = json.loads(recv_text)
# 用戶發(fā)信息
if data["type"] == 'send':
name = '404'
for k, v in Clients.items():
if v == websocket:
name = k
data["from"] = name
if len(Clients) != 0: # asyncio.wait doesn't accept an empty list
message = json.dumps({"type": "send", "content": data["content"], "from": name})
await self.runCaseX(jsonMsg=message, websocket=websocket)
# 用戶注冊(cè)
elif data["type"] == 'register':
try:
Clients[data["uuid"]] = websocket
if len(Clients) != 0: # asyncio.wait doesn't accept an empty list
message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})
await self.runCaseX(jsonMsg=message, websocket=websocket)
except Exception as exp:
print(exp)
# 用戶注銷
elif data["type"] == 'unregister':
del Clients[data["uuid"]]
# 對(duì)message進(jìn)行解析,跳進(jìn)不同功能區(qū)
# await self.runCaseX(jsonMsg=data,websocket=websocket)
# 鏈接斷開
except websockets.ConnectionClosed:
print("ConnectionClosed...", path)
# del Clients
break
# 無效狀態(tài)
except websockets.InvalidState:
print("InvalidState...")
# del Clients
break
# 報(bào)錯(cuò)
except Exception as e:
print("ws連接報(bào)錯(cuò)",e)
# del Clients
break
# 啟動(dòng)服務(wù)器
async def runServer(self):
async with websockets.serve(self.echo, self.ip, self.port):
await asyncio.Future() # run forever
# 多協(xié)程模式,防止阻塞主線程無法做其他事情
def WebSocketServer(self):
asyncio.run(self.runServer())
# 多線程啟動(dòng)
def startServer(self):
# 多線程啟動(dòng),否則會(huì)堵塞
thread = threading.Thread(target=self.WebSocketServer)
thread.start()
# thread.join()
if __name__=='__main__':
print("server")
s = WS_Server()
s.startServer()到此這篇關(guān)于python的websocket方法教程的文章就介紹到這了,更多相關(guān)python的websocket方法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 使用python構(gòu)建WebSocket客戶端的教程詳解
- python使用websocket庫(kù)發(fā)送WSS請(qǐng)求
- Python中使用dwebsocket實(shí)現(xiàn)后端數(shù)據(jù)實(shí)時(shí)刷新
- 使用Python創(chuàng)建websocket服務(wù)端并給出不同客戶端的請(qǐng)求
- python開發(fā)實(shí)例之python使用Websocket庫(kù)開發(fā)簡(jiǎn)單聊天工具實(shí)例詳解(python+Websocket+JS)
- Python Websocket服務(wù)端通信的使用示例
- Python如何使用WebSocket實(shí)現(xiàn)實(shí)時(shí)Web應(yīng)用
相關(guān)文章
使用python進(jìn)行PostgreSQL數(shù)據(jù)庫(kù)連接全過程
這篇文章主要介紹了使用python進(jìn)行PostgreSQL數(shù)據(jù)庫(kù)連接的相關(guān)資料,包括安裝psycopg2模塊、使用PyCharm進(jìn)行圖形化連接、代碼連接數(shù)據(jù)庫(kù)的方法、以及如何執(zhí)行DML和DQL操作,需要的朋友可以參考下2025-03-03
python3使用libpcap庫(kù)進(jìn)行抓包及數(shù)據(jù)處理的操作方法
這篇文章主要介紹了python3使用libpcap庫(kù)進(jìn)行抓包及數(shù)據(jù)處理,需要的朋友可以參考下2022-10-10
Keras自動(dòng)下載的數(shù)據(jù)集/模型存放位置介紹
這篇文章主要介紹了Keras自動(dòng)下載的數(shù)據(jù)集/模型存放位置介紹,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-06-06
python對(duì)視頻畫框標(biāo)記后保存的方法
今天小編就為大家分享一篇python對(duì)視頻畫框標(biāo)記后保存的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-12-12
pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個(gè)單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-11-11
Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享
這篇文章主要介紹了Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01

