欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python中使用websocket方法實例詳解

 更新時間:2024年03月15日 14:20:31   作者:別出BUG求求了  
WebSocket是一種網(wǎng)絡(luò)通信協(xié)議,它在單個TCP連接上提供全雙工的通信信道,本文我們將探討如何在Python中使用WebSocket實現(xiàn)實時通信,感興趣的朋友跟隨小編一起看看吧

WebSocket是一種網(wǎng)絡(luò)通信協(xié)議,它在單個TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實現(xiàn)實時通信。

websockets是Python中最常用的網(wǎng)絡(luò)庫之一,也是websocket協(xié)議的Python實現(xiàn)。它不僅作為基礎(chǔ)組件在眾多項目中發(fā)揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網(wǎng):https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket協(xié)議是在2008年由Web應(yīng)用程序設(shè)計師和開發(fā)人員創(chuàng)建的,目的是為了在Web瀏覽器和服務(wù)器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務(wù)器在任何時候發(fā)送消息,無需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務(wù)器之間傳輸文本和二進(jìn)制數(shù)據(jù),使得構(gòu)建實時Web應(yīng)用程序變得更加簡單。

2. 在Python中使用WebSocket

Python中有多個庫可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫來演示W(wǎng)ebSocket編程。

要安裝websockets庫,你可以使用pip:

pip install websockets

3. 創(chuàng)建WebSocket服務(wù)器

使用websockets庫,我們可以輕松地創(chuàng)建一個WebSocket服務(wù)器。以下是一個簡單的示例:

import asyncio
import websockets
async def echo(websocket, path):
    async for message in websocket:
        print(f"Received message: {message}")
        await websocket.send(f"Echo: {message}")
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在這個示例中,我們定義了一個名為echo的協(xié)程函數(shù),它接收兩個參數(shù):websocket和path。該函數(shù)使用async for循環(huán)讀取客戶端發(fā)送的消息,并將消息發(fā)送回客戶端。

然后,我們使用websockets.serve()函數(shù)創(chuàng)建一個WebSocket服務(wù)器,監(jiān)聽本地主機的8765端口。最后,我們使用asyncio的事件循環(huán)啟動服務(wù)器。

4. 創(chuàng)建WebSocket客戶端

要創(chuàng)建一個WebSocket客戶端,我們同樣可以使用websockets庫。以下是一個簡單的客戶端示例:

import asyncio
import websockets
async def main():
    async with websockets.connect("ws://localhost:8765") as websocket:
        message = "Hello, server!"
        await websocket.send(message)
        print(f"Sent: {message}")
        response = await websocket.recv()
        print(f"Received: {response}")
asyncio.run(main())

在這個示例中,我們使用websockets.connect()函數(shù)建立與WebSocket服務(wù)器的連接。然后,我們使用send()方法向服務(wù)器發(fā)送消息,并使用recv()方法接收服務(wù)器的響應(yīng)。

5. 總結(jié)

WebSocket協(xié)議為Web瀏覽器和服務(wù)器之間提供了實時雙向通信的能力,使得構(gòu)建實時Web應(yīng)用程序變得更加容易。在Python中,我們可以使用websockets庫輕松地實現(xiàn)WebSocket編程。

6. 通過websockets這個項目,從大型開源項目中學(xué)習(xí)asyncio庫。

一、asyncio.Transport
在官方文檔中,Transport被描述成對socket的抽象,它控制著如何傳輸數(shù)據(jù)。除了websockets,uvicorn、daphne等ASGI實現(xiàn)都會用到Transport。

Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼?zhèn)渥x和寫的功能,可以類比為讀寫socket對象。

Transport對象提供以下常用函數(shù)——

is_reading:判斷該Transport是否在讀。

set_write_buffer_limits:設(shè)置寫入Transport的高和低水位??紤]到網(wǎng)絡(luò)狀況,有時不希望寫入過多的數(shù)據(jù)。

write、write_eof、write_line:為當(dāng)前Transport寫入數(shù)據(jù),分別表示寫入二進(jìn)制數(shù)據(jù)、eof和二進(jìn)制行數(shù)據(jù)。其中eof寫入后不會關(guān)閉Transport,但會flush數(shù)據(jù)。

abort:立刻關(guān)閉Transport,不接受新的數(shù)據(jù)。留在緩沖的數(shù)據(jù)也會丟失,后續(xù)調(diào)用Protocol的connection_lost函數(shù)。

在websockets中,Transport使用場景不多,一般都是通過Protocol對象的回調(diào)參數(shù)使用的。在websocket的初始化過程中,會設(shè)置Transport的最高水位。同樣,在這種場景下,該對象也是作為回調(diào)參數(shù)使用的。

二、asyncio.Protocol
如果Transport是對socket的抽象,那么Protocol就是對協(xié)議的抽象。它提供了如何使用Transport的方式。

用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個Unimplemented函數(shù)需要用戶去實現(xiàn)——

connection_made:當(dāng)連接建立時會執(zhí)行該函數(shù),該函數(shù)包含一個Transport類型的參數(shù)。

connection_lost:當(dāng)連接丟失或者關(guān)閉時會執(zhí)行該函數(shù),該函數(shù)包含一個Exception類型的參數(shù)。

pause_writing:當(dāng)Transport對象寫入的數(shù)據(jù)高于之前設(shè)置的高水位時被調(diào)用,一般會暫停數(shù)據(jù)的寫入。

resume_writing:當(dāng)Transport對象寫入的數(shù)據(jù)低于之前設(shè)置的低水位時被調(diào)用,一般用于恢復(fù)數(shù)據(jù)寫入。

data_received:當(dāng)有數(shù)據(jù)被接受時回調(diào),該函數(shù)包含一個二進(jìn)制對象data,用來表示接受的數(shù)據(jù)。

eof_received:當(dāng)被Transport對象被調(diào)用write_eof時被調(diào)用。

在websockets中,server端的connection_made實現(xiàn)截圖如圖所示。在該函數(shù)中,websockets將用戶實現(xiàn)的handler封裝成task對象,并和websocket的server綁定。

而在client端中實現(xiàn)如第一節(jié)截圖所示,只是在reader中注冊該Transport對象。

websockets的connection_lost函數(shù)實現(xiàn)方式如下。主要操作即更新狀態(tài)、關(guān)閉pings、更新對應(yīng)的waiter狀態(tài),以及維護(hù)reader對象。

在其他函數(shù)的實現(xiàn)中,websockets也主要用到了reader對象完成數(shù)據(jù)流的暫停和恢復(fù),以及數(shù)據(jù)的寫入。

從上面代碼實現(xiàn)可以看出,websockets通過reader代理完成數(shù)據(jù)流的操作。這個reader是一個asyncio.StreamReader對象。這個對象具體如何使用將在下一篇介紹。

附錄:進(jìn)階版本:

python使用websockets庫
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個websocket。

connect: 在client端使用,用于建立連接。

send:發(fā)送數(shù)據(jù)

recv:接收數(shù)據(jù)

close:關(guān)閉連接

服務(wù)端

#!/usr/bin/python3
# 主要功能:創(chuàng)建1個基本的websocket server, 符合asyncio 開發(fā)要求
import asyncio
import websockets
from datetime import datetime
async def handler(websocket):
    data = await websocket.recv()
    reply = f"Data received as \"{data}\".  time: {datetime.now()}"
    print(reply)
    await websocket.send(reply)
    print("Send reply")
async def main():
    async with websockets.serve(handler, "localhost", 9999):
        await asyncio.Future()  # run forever
if __name__ == "__main__":
    asyncio.run(main())

客戶端

import asyncio
import websockets
import time
async def ws_client(url):
    for i in range(1, 40):
        async with websockets.connect(url) as websocket:
            await websocket.send("Hello, I am PyPy.")
            response = await websocket.recv()
        print(response)
        time.sleep(1)
asyncio.run(ws_client('ws://localhost:9999'))

服務(wù)端

import asyncio
import websockets
IP_ADDR = "127.0.0.1"
IP_PORT = "9090"
# 握手,通過接收Hi,發(fā)送"success"來進(jìn)行雙方的握手。
async def serverHands(websocket):
    while True:
        recv_text = await websocket.recv()
        print("recv_text=" + recv_text)
        if recv_text == "Hi":
            print("connected success")
            await websocket.send("success")
            return True
        else:
            await websocket.send("connected fail")
# 接收從客戶端發(fā)來的消息并處理,再返給客戶端success
async def serverRecv(websocket):
    while True:
        recv_text = await websocket.recv()
        print("recv:", recv_text)
        await websocket.send("success,get mess:"+ recv_text)
# 握手并且接收數(shù)據(jù)
async def serverRun(websocket, path):
    print(path)
    await serverHands(websocket)
    await serverRecv(websocket)
# main function
if __name__ == '__main__':
    print("======server======")
    server = websockets.serve(serverRun, IP_ADDR, IP_PORT)
    asyncio.get_event_loop().run_until_complete(server)
    asyncio.get_event_loop().run_forever()

客戶端

import asyncio
import websockets
IP_ADDR = "127.0.0.1"
IP_PORT = "9090"
async def clientHands(websocket):
    while True:
        # 通過發(fā)送hello握手
        await websocket.send("Hi")
        response_str = await websocket.recv()
        # 接收"success"來進(jìn)行雙方的握手
        if "success" in response_str:
            print("握手成功")
            return True
# 向服務(wù)器端發(fā)送消息
async def clientSend(websocket):
    while True:
        input_text = input("input text: ")
        if input_text == "exit":
            print(f'"exit", bye!')
            await websocket.close(reason="exit")
            return False
        await websocket.send(input_text)
        recv_text = await websocket.recv()
        print(f"{recv_text}")
# 進(jìn)行websocket連接
async def clientRun():
    ipaddress = IP_ADDR + ":" + IP_PORT
    async with websockets.connect("ws://" + ipaddress) as websocket:
        await clientHands(websocket)
        await clientSend(websocket)
# main function
if __name__ == '__main__':
    print("======client======")
    asyncio.get_event_loop().run_until_complete(clientRun())

服務(wù)端

# -*- coding:utf8 -*-
import json
import socket
import asyncio
import logging
import websockets
import multiprocessing
IP = '127.0.0.1'
PORT_CHAT = 9090
USERS ={}
#提供聊天的后臺
async def ServerWs(websocket,path):
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                        filename="chat.log",
                        level=logging.INFO)
    # 握手
    await websocket.send(json.dumps({"type": "handshake"}))
    async for message in websocket:
        data = json.loads(message)
        message = ''
        # 用戶發(fā)信息
        if data["type"] == 'send':
            name = '404'
            for k, v in USERS.items():
                if v == websocket:
                    name = k
            data["from"] = name
            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                message = json.dumps(
                    {"type": "user", "content": data["content"], "from": name})
        # 用戶注冊
        elif data["type"] == 'register':
            try:
                USERS[data["uuid"]] = websocket
                if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                    message = json.dumps(
                        {"type": "login", "content": data["content"], "user_list": list(USERS.keys())})
            except Exception as exp:
                print(exp)
        # 用戶注銷
        elif data["type"] == 'unregister':
            del USERS[data["uuid"]]
            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                message = json.dumps(
                    {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})
        #打印日志
        logging.info(data)
        # 群發(fā)
        await asyncio.wait([user.send(message) for user in USERS.values()])
def server_run():
    print("server")
    start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
if __name__ == "__main__":
    from multiprocessing import Process
    multiprocessing.freeze_support()
    server = Process(target=server_run, daemon=False)
    server.start()

服務(wù)端

import asyncio
import websockets
import time
import json
import threading
# 功能模塊
class OutputHandler():
    async def run(self,message,send_ms,websocket):
        # 用戶發(fā)信息
        await send_ms(message, websocket)
        # 單發(fā)消息
        # await send_ms(message, websocket)
        # 群發(fā)消息
        #await s('hi起來')
# 存儲所有的客戶端
Clients = {}
# 服務(wù)端
class WS_Server():
    def __init__(self):
        self.ip = "127.0.0.1"
        self.port = 9090
    # 回調(diào)函數(shù)(發(fā)消息給客戶端)
    async def callback_send(self, msg, websocket=None):
        await self.sendMsg(msg, websocket)
    # 發(fā)送消息
    async def sendMsg(self, msg, websocket):
        print('sendMsg:', msg)
        # websocket不為空,單發(fā),為空,群發(fā)消息
        if websocket != None:
            await websocket.send(msg)
        else:
            # 群發(fā)消息
            await self.broadcastMsg(msg)
        # 避免被卡線程
        await asyncio.sleep(0.2)
    # 群發(fā)消息
    async def broadcastMsg(self, msg):
        for user in Clients:
            await user.send(msg)
    # 針對不同的信息進(jìn)行請求,可以考慮json文本
    async def runCaseX(self,jsonMsg,websocket):
        print('runCase')
        op = OutputHandler()
        # 參數(shù):消息、方法、socket
        await op.run(jsonMsg,self.callback_send,websocket)
    # 連接一個客戶端,起一個循環(huán)監(jiān)聽
    async def echo(self,websocket, path):
        # 添加到客戶端列表
        # Clients.append(websocket)
        # 握手
        await websocket.send(json.dumps({"type": "handshake"}))
        # 循環(huán)監(jiān)聽
        while True:
            # 接受信息
            try:
                # 接受文本
                recv_text = await websocket.recv()
                message = "Get message: {}".format(recv_text)
                # 返回客戶端信息
                await websocket.send(message)
                # 轉(zhuǎn)json
                data = json.loads(recv_text)
                # 用戶發(fā)信息
                if data["type"] == 'send':
                    name = '404'
                    for k, v in Clients.items():
                        if v == websocket:
                            name = k
                    data["from"] = name
                    if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list
                        message = json.dumps({"type": "send", "content": data["content"], "from": name})
                        await self.runCaseX(jsonMsg=message, websocket=websocket)
                # 用戶注冊
                elif data["type"] == 'register':
                    try:
                        Clients[data["uuid"]] = websocket
                        if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list
                            message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})
                            await self.runCaseX(jsonMsg=message, websocket=websocket)
                    except Exception as exp:
                        print(exp)
                # 用戶注銷
                elif data["type"] == 'unregister':
                    del Clients[data["uuid"]]
                # 對message進(jìn)行解析,跳進(jìn)不同功能區(qū)
                # await self.runCaseX(jsonMsg=data,websocket=websocket)
            # 鏈接斷開
            except websockets.ConnectionClosed:
                print("ConnectionClosed...", path)
                # del Clients
                break
            # 無效狀態(tài)
            except websockets.InvalidState:
                print("InvalidState...")
                # del Clients
                break
            # 報錯
            except Exception as e:
                print("ws連接報錯",e)
                # del Clients
                break
    # 啟動服務(wù)器
    async def runServer(self):
        async with websockets.serve(self.echo, self.ip, self.port):
            await asyncio.Future()  # run forever
	# 多協(xié)程模式,防止阻塞主線程無法做其他事情
    def WebSocketServer(self):
        asyncio.run(self.runServer())
    # 多線程啟動
    def startServer(self):
        # 多線程啟動,否則會堵塞
        thread = threading.Thread(target=self.WebSocketServer)
        thread.start()
        # thread.join()
if __name__=='__main__':
    print("server")
    s = WS_Server()
    s.startServer()

到此這篇關(guān)于python的websocket方法教程的文章就介紹到這了,更多相關(guān)python的websocket方法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 使用python進(jìn)行PostgreSQL數(shù)據(jù)庫連接全過程

    使用python進(jìn)行PostgreSQL數(shù)據(jù)庫連接全過程

    這篇文章主要介紹了使用python進(jìn)行PostgreSQL數(shù)據(jù)庫連接的相關(guān)資料,包括安裝psycopg2模塊、使用PyCharm進(jìn)行圖形化連接、代碼連接數(shù)據(jù)庫的方法、以及如何執(zhí)行DML和DQL操作,需要的朋友可以參考下
    2025-03-03
  • 使用Qt?QSS繪制簡單美化界面功能

    使用Qt?QSS繪制簡單美化界面功能

    這篇文章主要介紹了使用Qt?QSS繪制簡單美化界面,本文以繪制登錄界面為例,創(chuàng)建一個繼承自Qwidget的設(shè)計師界面類,為了使得控件排放整齊有序,可以使用layout布局進(jìn)行輔助,感興趣的朋友跟隨小編一起看看吧
    2022-10-10
  • 用yum安裝MySQLdb模塊的步驟方法

    用yum安裝MySQLdb模塊的步驟方法

    在python2.7版本中,MySQLdb模塊還不是python的內(nèi)置模塊,但是MySQLdb模塊又是Python與MySQL連接的橋梁,對于作為MySQL DBA又很喜歡Python語言的我來說,MySQLdb真的是必需品呢。所以就需要自己進(jìn)行安裝了,這篇文章就給大家詳細(xì)介紹了關(guān)于用yum安裝MySQLdb模塊的步驟。
    2016-12-12
  • python3使用libpcap庫進(jìn)行抓包及數(shù)據(jù)處理的操作方法

    python3使用libpcap庫進(jìn)行抓包及數(shù)據(jù)處理的操作方法

    這篇文章主要介紹了python3使用libpcap庫進(jìn)行抓包及數(shù)據(jù)處理,需要的朋友可以參考下
    2022-10-10
  • python boto和boto3操作bucket的示例

    python boto和boto3操作bucket的示例

    這篇文章主要介紹了python boto和boto3操作bucket的示例,幫助大家更好的理解和使用python,感興趣的朋友可以了解下
    2020-10-10
  • Keras自動下載的數(shù)據(jù)集/模型存放位置介紹

    Keras自動下載的數(shù)據(jù)集/模型存放位置介紹

    這篇文章主要介紹了Keras自動下載的數(shù)據(jù)集/模型存放位置介紹,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-06-06
  • keras用auc做metrics以及早停實例

    keras用auc做metrics以及早停實例

    這篇文章主要介紹了keras用auc做metrics以及早停實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-07-07
  • python對視頻畫框標(biāo)記后保存的方法

    python對視頻畫框標(biāo)記后保存的方法

    今天小編就為大家分享一篇python對視頻畫框標(biāo)記后保存的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-12-12
  • pycharm中選中一個單詞替換所有重復(fù)單詞的實現(xiàn)方法

    pycharm中選中一個單詞替換所有重復(fù)單詞的實現(xiàn)方法

    這篇文章主要介紹了pycharm中選中一個單詞替換所有重復(fù)單詞的實現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2020-11-11
  • Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享

    Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享

    這篇文章主要介紹了Python字典操作詳細(xì)介紹及字典內(nèi)建方法分享,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01

最新評論