欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python Tornado 實(shí)現(xiàn)SSE服務(wù)端主動(dòng)推送方案

 更新時(shí)間:2024年01月23日 11:13:03   作者:小畢超  
SSE是Server-Sent Events 的簡(jiǎn)稱,是一種服務(wù)器端到客戶端(瀏覽器)的單項(xiàng)消息推送,本文主要探索兩個(gè)方面的實(shí)踐一個(gè)是客戶端發(fā)送請(qǐng)求,服務(wù)端的返回是分多次進(jìn)行傳輸?shù)?直到傳輸完成,這種情況下請(qǐng)求結(jié)束后,考慮關(guān)閉SSE,所以這種連接可以認(rèn)為是暫時(shí)的,感興趣的朋友一起看看吧

一、SSE 服務(wù)端消息推送

SSEServer-Sent Events 的簡(jiǎn)稱, 是一種服務(wù)器端到客戶端(瀏覽器)的單項(xiàng)消息推送。對(duì)應(yīng)的瀏覽器端實(shí)現(xiàn) Event Source 接口被制定為HTML5 的一部分。相比于 WebSocket,服務(wù)器端和客戶端工作量都要小很多、簡(jiǎn)單很多,而 Tornado 又是Python中的一款優(yōu)秀的高性能web框架,本文帶領(lǐng)大家一起實(shí)踐下 Tornado SSE 的實(shí)現(xiàn)。

本文主要探索兩個(gè)方面的實(shí)踐:一個(gè)是客戶端發(fā)送請(qǐng)求,服務(wù)端的返回是分多次進(jìn)行傳輸?shù)?,直到傳輸完成,這種情況下請(qǐng)求結(jié)束后,就可以考慮關(guān)閉 SSE了,所以這種連接可以認(rèn)為是暫時(shí)的。另一種是由服務(wù)端在特定的時(shí)機(jī)下主動(dòng)推送消息給到客戶端,推送的時(shí)機(jī)具有不確定性,隨時(shí)性,所以這種情況下需要客戶端和服務(wù)端保持長(zhǎng)久連接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暫性場(chǎng)景下的 SSE 實(shí)現(xiàn)

短暫性場(chǎng)景下就是對(duì)應(yīng)上面的第一點(diǎn),客戶端主動(dòng)發(fā)送請(qǐng)求后,服務(wù)端分多次傳輸,直到完成,數(shù)據(jù)獲取完成后連接就可以斷開(kāi)了,適用于一些接口復(fù)雜,操作步驟多的場(chǎng)景,可以提前告訴客戶端現(xiàn)在進(jìn)行到了哪一步了,并且這種方式也有利于服務(wù)端的橫向擴(kuò)展。

Tornado 中實(shí)現(xiàn),需要注意的是要關(guān)閉 _auto_finish ,這樣的話就不會(huì)被框架自己主動(dòng)停止連接了,下面是一個(gè)實(shí)現(xiàn)的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
class SSE(RequestHandler):
    def initialize(self):
        # 關(guān)閉自動(dòng)結(jié)束
        self._auto_finish = False
        print("initialize")
    def set_default_headers(self):
        # 設(shè)置為事件驅(qū)動(dòng)模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用緩存
        self.set_header('Content-Control', "no-cache")
        # 保持長(zhǎng)連接
        self.set_header('Connection', "keep-alive")
        # 允許跨域
        self.set_header('Access-Control-Allow-Origin', "*")
    def prepare(self):
        # 準(zhǔn)備線程池
        self.executor = self.application.pool
    @tornado.gen.coroutine
    def get(self):
        result = yield self.doHandle()
        self.write(result)
        # 結(jié)束
        self.finish()
    @run_on_executor
    def doHandle(self):
        tornado.ioloop.IOLoop.current()
        # 分十次推送信息
        for i in range(10):
            time.sleep(1)
            self.flush()
            self.callback(f"current: {i}")
        return f"data: end\n\n"
    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()
class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)
def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)
    print(f"Start server success", f"The prot = {port}")
    tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
    startServer(8020)

運(yùn)行后可以到瀏覽器訪問(wèn):http://localhost:8020/sse,此時(shí)就可以看到服務(wù)端在不斷地推送數(shù)據(jù)過(guò)來(lái)了:

那如何在前端用 JS 獲取數(shù)據(jù)呢,前面提到在 JS 層面,有封裝好的 Event Source 組件可以直接拿來(lái)使用,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>測(cè)試服務(wù)器推送技術(shù)</title>
</head>
<body>
    <div id="messages"></div>
</body>
<script>
    const eventSource = new EventSource('http://localhost:8020/sse');
    // 事件回調(diào)
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };
    // 異常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };
    eventSource.onopen = ()=>{
        console.log("開(kāi)啟")
    }
  </script>
</html>

運(yùn)行后可以看到服務(wù)端分階段推送過(guò)來(lái)的數(shù)據(jù):

三、長(zhǎng)連接場(chǎng)景下的 SSE 實(shí)現(xiàn)

上面實(shí)現(xiàn)了客戶端請(qǐng)求后,分批次返回,但是有些情況下是客戶端連接后沒(méi)有東西返回,而是在某個(gè)特定的時(shí)機(jī)下返回給某幾個(gè)客戶端,所以這種情況,我們需要和客戶端保持長(zhǎng)久的連接,同時(shí)進(jìn)行客戶端連接的緩存,因?yàn)橥瑫r(shí)有可能有 100 個(gè)用戶,但是推送時(shí)可能只需要給 10 個(gè)用戶推送,這種方式相當(dāng)于將一個(gè)客戶端和一個(gè)服務(wù)端進(jìn)行了綁定,一定程度上不利于服務(wù)端的橫向擴(kuò)展,但也可以通過(guò)一些消息訂閱的方式解決類似問(wèn)題。

下面是一個(gè)實(shí)現(xiàn)案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
# 單例
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper
# 訂閱推送工具類
@singleton
class Pusher():
    def __init__(self):
        self.clients = {}
    def add_client(self, client_id, callback):
        if client_id not in self.clients:
            self.clients[client_id] = callback
            print(f"{client_id} 連接")
    def send_all(self, message):
        for client_id in self.clients:
            callback = self.clients[client_id]
            print("發(fā)送消息給:", client_id)
            callback(message)
    def send(self, client_id, message):
        callback = self.clients[client_id]
        print("發(fā)送消息給:", client_id)
        callback(message)
class SSE(RequestHandler):
    # 定義推送者
    pusher = Pusher()
    def initialize(self):
        # 關(guān)閉自動(dòng)結(jié)束
        self._auto_finish = False
        print("initialize")
    def set_default_headers(self):
        # 設(shè)置為事件驅(qū)動(dòng)模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用緩存
        self.set_header('Content-Control', "no-cache")
        # 保持長(zhǎng)連接
        self.set_header('Connection', "keep-alive")
        # 允許跨域
        self.set_header('Access-Control-Allow-Origin', "*")
    @tornado.gen.coroutine
    def get(self):
        # 客戶端唯一標(biāo)識(shí)
        client_id = self.get_argument("client_id")
        self.pusher.add_client(client_id, self.callback)
    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()
# 定義推送接口,模擬推送
class Push(RequestHandler):
    # 定義推送者
    pusher = Pusher()
    def prepare(self):
        # 準(zhǔn)備線程池
        self.executor = self.application.pool
    @tornado.gen.coroutine
    def get(self):
        # 客戶端標(biāo)識(shí)
        client_id = self.get_argument("client_id")
        # 推送的消息
        message = self.get_argument("message")
        result = yield self.doHandle(client_id, message)
        self.write(result)
    @run_on_executor
    def doHandle(self, client_id, message):
        tornado.ioloop.IOLoop.current()
        self.pusher.send(client_id, message)
        return "success"
class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/push", Push),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)
def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)
    print(f"Start server success", f"The prot = {port}")
    tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
    startServer(8020)

這里我定義了一個(gè) Pusher 訂閱推送工具類,用來(lái)存儲(chǔ)客戶端的連接,以及給指定客戶端或全部客戶端發(fā)送消息,然后我又定義 Push 接口,模擬不定時(shí)的指定客戶端發(fā)送信息的場(chǎng)景。

同樣前端也要修改,需要給自己定義 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>測(cè)試服務(wù)器推送技術(shù)</title>
</head>
<body>
    <div id="client"></div>
    <div id="messages"></div>
</body>
<script>
    function generateUUID() {
      let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        const r = Math.random() * 16 | 0;
        const v = c === 'x' ? r : (r & 0x3 | 0x8);
        return v.toString(16);
      });
      return uuid;
    }
    // 利用uuid 模擬生成唯一的客戶端ID
    let client_id = generateUUID();
    document.getElementById('client').innerHTML = "當(dāng)前 client_id = "+client_id;
    const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);
    // 事件回調(diào)
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };
    // 異常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };
    eventSource.onopen = ()=>{
        console.log("開(kāi)啟")
    }
  </script>
</html>

這里我用 uuid 模擬客戶端的唯一ID,在真實(shí)使用時(shí)可不要這么做。

下面使用瀏覽器打開(kāi)三個(gè)頁(yè)面,可以看到三個(gè)不同的 client_id :


在服務(wù)端的日志中也能看到這三個(gè)客戶端的連接:

下面調(diào)用 push 接口來(lái)給任意一個(gè)客戶端發(fā)送消息,例如這里發(fā)給client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用戶 :

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的頁(yè)面:


已經(jīng)成功收到推送的消息,反之看另外兩個(gè):

都沒(méi)有消息,到這里就實(shí)現(xiàn)了長(zhǎng)連接下不定時(shí)的服務(wù)端消息推送方案。

到此這篇關(guān)于Python Tornado 實(shí)現(xiàn)SSE服務(wù)端主動(dòng)推送方案的文章就介紹到這了,更多相關(guān)Python SSE服務(wù)端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論