欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python Websockets庫的使用指南

 更新時(shí)間:2025年04月04日 08:58:19   作者:老胖閑聊  
python websockets庫是一個(gè)用于創(chuàng)建WebSocket服務(wù)器和客戶端的Python庫,它提供了一種簡單的方式來實(shí)現(xiàn)實(shí)時(shí)通信,支持異步和同步操作,并且易于使用,本文給大家介紹了Python Websockets庫的使用指南,需要的朋友可以參考下

一、WebSocket 簡介

WebSocket 是一種基于 TCP 的協(xié)議,支持全雙工通信(服務(wù)器和客戶端可以同時(shí)發(fā)送消息),適用于實(shí)時(shí)性要求高的場景(如聊天、實(shí)時(shí)數(shù)據(jù)監(jiān)控、在線游戲等)。與 HTTP 不同,WebSocket 連接一旦建立,會(huì)保持持久化,避免了 HTTP 輪詢的開銷。

二、Python 的 websockets 庫

websockets 是 Python 中用于構(gòu)建 WebSocket 服務(wù)器和客戶端的異步庫,基于 asyncio 實(shí)現(xiàn),支持高并發(fā)和低延遲通信。

安裝

pip install websockets

三、完整代碼示例

1. WebSocket 服務(wù)器

import asyncio
import websockets
from datetime import datetime

# 保存所有活躍的客戶端連接
connected_clients = set()

async def handle_client(websocket, path):
    """
    處理單個(gè)客戶端連接
    """
    # 將新客戶端添加到集合
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            # 廣播消息給所有客戶端
            timestamp = datetime.now().strftime("%H:%M:%S")
            broadcast_message = f"[{timestamp}] {message}"
            await broadcast(broadcast_message)
    except websockets.exceptions.ConnectionClosed:
        print("客戶端斷開連接")
    finally:
        # 客戶端斷開后移除
        connected_clients.remove(websocket)

async def broadcast(message):
    """
    向所有連接的客戶端廣播消息
    """
    if connected_clients:
        await asyncio.gather(
            *[client.send(message) for client in connected_clients]
        )

async def start_server():
    """
    啟動(dòng) WebSocket 服務(wù)器
    """
    async with websockets.serve(handle_client, "localhost", 8765):
        print("服務(wù)器已啟動(dòng),監(jiān)聽端口 8765...")
        await asyncio.Future()  # 永久運(yùn)行

if __name__ == "__main__":
    asyncio.run(start_server())

2. WebSocket 客戶端

import asyncio
import websockets

async def client():
    """
    WebSocket 客戶端實(shí)現(xiàn)
    """
    async with websockets.connect("ws://localhost:8765") as websocket:
        # 創(chuàng)建兩個(gè)任務(wù):一個(gè)接收消息,一個(gè)發(fā)送消息
        receive_task = asyncio.create_task(receive_messages(websocket))
        send_task = asyncio.create_task(send_messages(websocket))
        await asyncio.gather(receive_task, send_task)

async def receive_messages(websocket):
    """
    接收服務(wù)器消息
    """
    async for message in websocket:
        print(f"\n收到消息: {message}")

async def send_messages(websocket):
    """
    發(fā)送用戶輸入的消息
    """
    while True:
        message = await asyncio.get_event_loop().run_in_executor(None, input, "輸入消息 (輸入 'exit' 退出): ")
        if message.strip().lower() == 'exit':
            break
        await websocket.send(message)

if __name__ == "__main__":
    asyncio.run(client())

四、代碼詳解

服務(wù)器端

  1. connected_clients:使用集合保存所有活躍連接,便于廣播。
  2. handle_client
    • 接受新連接并添加到集合。
    • 循環(huán)接收客戶端消息,收到后調(diào)用 broadcast 廣播。
    • 處理連接斷開異常,確??蛻舳吮灰瞥?。
  3. broadcast:使用 asyncio.gather 并發(fā)向所有客戶端發(fā)送消息。
  4. start_server:啟動(dòng)服務(wù)器并永久運(yùn)行。

客戶端

  1. client:連接服務(wù)器并啟動(dòng)收發(fā)任務(wù)。
  2. receive_messages:循環(huán)接收服務(wù)器消息并打印。
  3. send_messages:讀取用戶輸入,發(fā)送到服務(wù)器(run_in_executor 用于異步處理阻塞的 input)。

五、運(yùn)行與測試

  • 啟動(dòng)服務(wù)器
python server.py
  • 啟動(dòng)多個(gè)客戶端
python client.py
  • 測試:在任一客戶端輸入消息,所有客戶端都會(huì)收到廣播。

六、高級用法

  1. 消息協(xié)議:可定義 JSON 格式的消息,添加類型字段(如 {"type": "chat", "content": "Hello"})。
  2. 認(rèn)證:在連接時(shí)驗(yàn)證 Token(通過 path 參數(shù)或首次握手消息)。
  3. 心跳機(jī)制:定期發(fā)送 Ping/Pong 保持連接活躍。
  4. 集成 FastAPI:結(jié)合 ASGI 框架提供 HTTP + WebSocket 混合服務(wù)。

七、應(yīng)用場景

  1. 實(shí)時(shí)聊天應(yīng)用
  2. 多人在線游戲
  3. 股票行情推送
  4. 物聯(lián)網(wǎng)設(shè)備監(jiān)控

八、總結(jié)

通過 websockets 庫,可以輕松構(gòu)建高性能的實(shí)時(shí)應(yīng)用。本文提供的代碼示例覆蓋了服務(wù)器和客戶端的基本實(shí)現(xiàn),注釋詳細(xì),可直接擴(kuò)展用于實(shí)際項(xiàng)目。

到此這篇關(guān)于Python Websockets庫的使用指南的文章就介紹到這了,更多相關(guān)Python Websockets庫使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • numpy中的log和ln函數(shù)解讀

    numpy中的log和ln函數(shù)解讀

    這篇文章主要介紹了numpy中的log和ln函數(shù)解讀,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Python入門教程(三十四)Python的文件處理

    Python入門教程(三十四)Python的文件處理

    這篇文章主要介紹了Python入門教程(三十四)Python的文件處理,在Python中處理文件的主要是open()函數(shù),接下來我們就來一起看看open()函數(shù)的用法吧,需要的朋友可以參考下
    2023-05-05
  • 詳解在python操作數(shù)據(jù)庫中游標(biāo)的使用方法

    詳解在python操作數(shù)據(jù)庫中游標(biāo)的使用方法

    這篇文章主要介紹了在python操作數(shù)據(jù)庫中游標(biāo)的使用方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-11-11
  • python?@property?裝飾器使用方法

    python?@property?裝飾器使用方法

    這篇文章主要介紹了python?@property?裝飾器使用詳細(xì),使用property可以講類的方法變成同名屬性,使用起來更加簡潔,下文最后舉例說明詳情說明需要的小伙伴可以參考一下
    2022-03-03
  • 如何利用Python快速繪制海報(bào)級別地圖詳解

    如何利用Python快速繪制海報(bào)級別地圖詳解

    Python之所以這么流行,是因?yàn)樗粌H能夠應(yīng)用于科技領(lǐng)域,還能用來做許多其他學(xué)科的研究工具,最常見的便是繪制地圖,這篇文章主要給大家介紹了關(guān)于如何利用Python快速繪制海報(bào)級別地圖的相關(guān)資料,需要的朋友可以參考下
    2021-09-09
  • Python?列表(list)的常用方法

    Python?列表(list)的常用方法

    這篇文章主要介紹了Python?列表(list)的常用方法,這節(jié)主要講列表,列表用于存儲任意數(shù)目、任意類型的數(shù)據(jù)集合,本文通過示例代碼給大家詳細(xì)講解,需要的朋友可以參考下
    2023-05-05
  • python插入數(shù)據(jù)到列表的方法

    python插入數(shù)據(jù)到列表的方法

    這篇文章主要介紹了python插入數(shù)據(jù)到列表的方法,涉及Python中insert方法的使用技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2015-04-04
  • flask開啟多線程的具體方法

    flask開啟多線程的具體方法

    在本篇內(nèi)容里小編給大家整理的是一篇關(guān)于flask開啟多線程的具體方法,對此有需求的可以學(xué)習(xí)參考下。
    2020-08-08
  • python用函數(shù)創(chuàng)造字典的實(shí)例講解

    python用函數(shù)創(chuàng)造字典的實(shí)例講解

    在本篇文章里小編給大家整理的是一篇關(guān)于python用函數(shù)創(chuàng)造字典的實(shí)例講解內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。
    2021-06-06
  • 一文詳解Python中生成器的原理與使用

    一文詳解Python中生成器的原理與使用

    生成器表達(dá)式本質(zhì)上就是一個(gè)迭代器,是定義迭代器的一種方式,是允許自定義邏輯的迭代器。本文將詳細(xì)講解一下Python中生成器的原理與使用,需要的可以參考一下
    2022-05-05

最新評論