Python Tornado 實(shí)現(xiàn)SSE服務(wù)端主動(dòng)推送方案
一、SSE 服務(wù)端消息推送
SSE
是 Server-Sent Events
的簡(jiǎn)稱, 是一種服務(wù)器端到客戶端(瀏覽器)的單項(xiàng)消息推送。對(duì)應(yīng)的瀏覽器端實(shí)現(xiàn) Event Source
接口被制定為HTML5
的一部分。相比于 WebSocket
,服務(wù)器端和客戶端工作量都要小很多、簡(jiǎn)單很多,而 Tornado
又是Python
中的一款優(yōu)秀的高性能web
框架,本文帶領(lǐng)大家一起實(shí)踐下 Tornado
SSE
的實(shí)現(xiàn)。
本文主要探索兩個(gè)方面的實(shí)踐:一個(gè)是客戶端發(fā)送請(qǐng)求,服務(wù)端的返回是分多次進(jìn)行傳輸?shù)?,直到傳輸完成,這種情況下請(qǐng)求結(jié)束后,就可以考慮關(guān)閉 SSE了,所以這種連接可以認(rèn)為是暫時(shí)的。另一種是由服務(wù)端在特定的時(shí)機(jī)下主動(dòng)推送消息給到客戶端,推送的時(shí)機(jī)具有不確定性,隨時(shí)性,所以這種情況下需要客戶端和服務(wù)端保持長(zhǎng)久連接。
本次使用的 Tornado
版本:
tornado==6.3.2
二、短暫性場(chǎng)景下的 SSE 實(shí)現(xiàn)
短暫性場(chǎng)景下就是對(duì)應(yīng)上面的第一點(diǎn),客戶端主動(dòng)發(fā)送請(qǐng)求后,服務(wù)端分多次傳輸,直到完成,數(shù)據(jù)獲取完成后連接就可以斷開(kāi)了,適用于一些接口復(fù)雜,操作步驟多的場(chǎng)景,可以提前告訴客戶端現(xiàn)在進(jìn)行到了哪一步了,并且這種方式也有利于服務(wù)端的橫向擴(kuò)展。
在 Tornado
中實(shí)現(xiàn),需要注意的是要關(guān)閉 _auto_finish
,這樣的話就不會(huì)被框架自己主動(dòng)停止連接了,下面是一個(gè)實(shí)現(xiàn)的案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor class SSE(RequestHandler): def initialize(self): # 關(guān)閉自動(dòng)結(jié)束 self._auto_finish = False print("initialize") def set_default_headers(self): # 設(shè)置為事件驅(qū)動(dòng)模式 self.set_header('Content-Type', "text/event-stream") # 不使用緩存 self.set_header('Content-Control', "no-cache") # 保持長(zhǎng)連接 self.set_header('Connection', "keep-alive") # 允許跨域 self.set_header('Access-Control-Allow-Origin', "*") def prepare(self): # 準(zhǔn)備線程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): result = yield self.doHandle() self.write(result) # 結(jié)束 self.finish() @run_on_executor def doHandle(self): tornado.ioloop.IOLoop.current() # 分十次推送信息 for i in range(10): time.sleep(1) self.flush() self.callback(f"current: {i}") return f"data: end\n\n" def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
運(yùn)行后可以到瀏覽器訪問(wèn):http://localhost:8020/sse
,此時(shí)就可以看到服務(wù)端在不斷地推送數(shù)據(jù)過(guò)來(lái)了:
那如何在前端用 JS
獲取數(shù)據(jù)呢,前面提到在 JS
層面,有封裝好的 Event Source
組件可以直接拿來(lái)使用,例如:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>測(cè)試服務(wù)器推送技術(shù)</title> </head> <body> <div id="messages"></div> </body> <script> const eventSource = new EventSource('http://localhost:8020/sse'); // 事件回調(diào) eventSource.onmessage = (event) => { console.log(event.data) const messagesDiv = document.getElementById('messages'); messagesDiv.innerHTML += '<p>' + event.data + '</p>'; }; // 異常 eventSource.onerror = (error) => { console.error('EventSource failed:', error); eventSource.close(); }; eventSource.onopen = ()=>{ console.log("開(kāi)啟") } </script> </html>
運(yùn)行后可以看到服務(wù)端分階段推送過(guò)來(lái)的數(shù)據(jù):
三、長(zhǎng)連接場(chǎng)景下的 SSE 實(shí)現(xiàn)
上面實(shí)現(xiàn)了客戶端請(qǐng)求后,分批次返回,但是有些情況下是客戶端連接后沒(méi)有東西返回,而是在某個(gè)特定的時(shí)機(jī)下返回給某幾個(gè)客戶端,所以這種情況,我們需要和客戶端保持長(zhǎng)久的連接,同時(shí)進(jìn)行客戶端連接的緩存,因?yàn)橥瑫r(shí)有可能有 100
個(gè)用戶,但是推送時(shí)可能只需要給 10
個(gè)用戶推送,這種方式相當(dāng)于將一個(gè)客戶端和一個(gè)服務(wù)端進(jìn)行了綁定,一定程度上不利于服務(wù)端的橫向擴(kuò)展,但也可以通過(guò)一些消息訂閱的方式解決類似問(wèn)題。
下面是一個(gè)實(shí)現(xiàn)案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor # 單例 def singleton(cls): instances = {} def wrapper(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return wrapper # 訂閱推送工具類 @singleton class Pusher(): def __init__(self): self.clients = {} def add_client(self, client_id, callback): if client_id not in self.clients: self.clients[client_id] = callback print(f"{client_id} 連接") def send_all(self, message): for client_id in self.clients: callback = self.clients[client_id] print("發(fā)送消息給:", client_id) callback(message) def send(self, client_id, message): callback = self.clients[client_id] print("發(fā)送消息給:", client_id) callback(message) class SSE(RequestHandler): # 定義推送者 pusher = Pusher() def initialize(self): # 關(guān)閉自動(dòng)結(jié)束 self._auto_finish = False print("initialize") def set_default_headers(self): # 設(shè)置為事件驅(qū)動(dòng)模式 self.set_header('Content-Type', "text/event-stream") # 不使用緩存 self.set_header('Content-Control', "no-cache") # 保持長(zhǎng)連接 self.set_header('Connection', "keep-alive") # 允許跨域 self.set_header('Access-Control-Allow-Origin', "*") @tornado.gen.coroutine def get(self): # 客戶端唯一標(biāo)識(shí) client_id = self.get_argument("client_id") self.pusher.add_client(client_id, self.callback) def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() # 定義推送接口,模擬推送 class Push(RequestHandler): # 定義推送者 pusher = Pusher() def prepare(self): # 準(zhǔn)備線程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): # 客戶端標(biāo)識(shí) client_id = self.get_argument("client_id") # 推送的消息 message = self.get_argument("message") result = yield self.doHandle(client_id, message) self.write(result) @run_on_executor def doHandle(self, client_id, message): tornado.ioloop.IOLoop.current() self.pusher.send(client_id, message) return "success" class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/push", Push), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
這里我定義了一個(gè) Pusher
訂閱推送工具類,用來(lái)存儲(chǔ)客戶端的連接,以及給指定客戶端或全部客戶端發(fā)送消息,然后我又定義 Push 接口,模擬不定時(shí)的指定客戶端發(fā)送信息的場(chǎng)景。
同樣前端也要修改,需要給自己定義 client_id
,例如:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>測(cè)試服務(wù)器推送技術(shù)</title> </head> <body> <div id="client"></div> <div id="messages"></div> </body> <script> function generateUUID() { let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { const r = Math.random() * 16 | 0; const v = c === 'x' ? r : (r & 0x3 | 0x8); return v.toString(16); }); return uuid; } // 利用uuid 模擬生成唯一的客戶端ID let client_id = generateUUID(); document.getElementById('client').innerHTML = "當(dāng)前 client_id = "+client_id; const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id); // 事件回調(diào) eventSource.onmessage = (event) => { console.log(event.data) const messagesDiv = document.getElementById('messages'); messagesDiv.innerHTML += '<p>' + event.data + '</p>'; }; // 異常 eventSource.onerror = (error) => { console.error('EventSource failed:', error); eventSource.close(); }; eventSource.onopen = ()=>{ console.log("開(kāi)啟") } </script> </html>
這里我用 uuid
模擬客戶端的唯一ID
,在真實(shí)使用時(shí)可不要這么做。
下面使用瀏覽器打開(kāi)三個(gè)頁(yè)面,可以看到三個(gè)不同的 client_id
:
在服務(wù)端的日志中也能看到這三個(gè)客戶端的連接:
下面調(diào)用 push
接口來(lái)給任意一個(gè)客戶端發(fā)送消息,例如這里發(fā)給client_id = 2493045e-84dd-4118-8d96-0735c4ac186b
的用戶 :
下面看到 client_id
是 2493045e-84dd-4118-8d96-0735c4ac186b
的頁(yè)面:
已經(jīng)成功收到推送的消息,反之看另外兩個(gè):
都沒(méi)有消息,到這里就實(shí)現(xiàn)了長(zhǎng)連接下不定時(shí)的服務(wù)端消息推送方案。
到此這篇關(guān)于Python Tornado 實(shí)現(xiàn)SSE服務(wù)端主動(dòng)推送方案的文章就介紹到這了,更多相關(guān)Python SSE服務(wù)端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Pandas如何對(duì)Categorical類型字段數(shù)據(jù)統(tǒng)計(jì)實(shí)戰(zhàn)案例
這篇文章主要介紹了Pandas如何對(duì)Categorical類型字段數(shù)據(jù)統(tǒng)計(jì)實(shí)戰(zhàn)案例,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-08-08解決python 在for循環(huán)并且pop數(shù)組的時(shí)候會(huì)跳過(guò)某些元素的問(wèn)題
這篇文章主要介紹了解決python 在for循環(huán)并且pop數(shù)組的時(shí)候會(huì)跳過(guò)某些元素的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12python?包實(shí)現(xiàn)?time?時(shí)間管理操作
這篇文章主要介紹了python包實(shí)現(xiàn)time時(shí)間管理操作,文章通過(guò)獲取當(dāng)前時(shí)間戳,即當(dāng)前系統(tǒng)內(nèi)表示時(shí)間的一個(gè)浮點(diǎn)數(shù),下文更多相關(guān)內(nèi)容需要的小伙伴可以參考一下2022-04-04python創(chuàng)建只讀屬性對(duì)象的方法(ReadOnlyObject)
有時(shí)需要?jiǎng)?chuàng)建一個(gè)帶只讀屬性的對(duì)象,大家可以參考下如下的方法進(jìn)行創(chuàng)建,稍加改造,可以得到很特殊的效果2013-02-02Django前端BootCSS實(shí)現(xiàn)分頁(yè)的方法
本文主要介紹了Django前端BootCSS實(shí)現(xiàn)分頁(yè)的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11python內(nèi)置函數(shù)compile(),complex()的使用
這篇文章主要為大家詳細(xì)介紹了python內(nèi)置函數(shù)compile(),complex()的使用,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06Python Request爬取seo.chinaz.com百度權(quán)重網(wǎng)站的查詢結(jié)果過(guò)程解析
這篇文章主要介紹了Request爬取網(wǎng)站(seo.chinaz.com)百度權(quán)重的查詢結(jié)果過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08將本地Python項(xiàng)目打包成docker鏡像上傳到服務(wù)器并在docker中運(yùn)行
Docker是一個(gè)開(kāi)源項(xiàng)目,為開(kāi)發(fā)人員和系統(tǒng)管理員提供了一個(gè)開(kāi)放平臺(tái),可以將應(yīng)用程序構(gòu)建、打包為一個(gè)輕量級(jí)容器,并在任何地方運(yùn)行,這篇文章主要給大家介紹了關(guān)于將本地Python項(xiàng)目打包成docker鏡像上傳到服務(wù)器并在docker中運(yùn)行的相關(guān)資料,需要的朋友可以參考下2023-12-12