Python Tornado 實現(xiàn)SSE服務(wù)端主動推送方案
一、SSE 服務(wù)端消息推送
SSE
是 Server-Sent Events
的簡稱, 是一種服務(wù)器端到客戶端(瀏覽器)的單項消息推送。對應(yīng)的瀏覽器端實現(xiàn) Event Source
接口被制定為HTML5
的一部分。相比于 WebSocket
,服務(wù)器端和客戶端工作量都要小很多、簡單很多,而 Tornado
又是Python
中的一款優(yōu)秀的高性能web
框架,本文帶領(lǐng)大家一起實踐下 Tornado
SSE
的實現(xiàn)。
本文主要探索兩個方面的實踐:一個是客戶端發(fā)送請求,服務(wù)端的返回是分多次進行傳輸?shù)模钡絺鬏斖瓿?,這種情況下請求結(jié)束后,就可以考慮關(guān)閉 SSE了,所以這種連接可以認為是暫時的。另一種是由服務(wù)端在特定的時機下主動推送消息給到客戶端,推送的時機具有不確定性,隨時性,所以這種情況下需要客戶端和服務(wù)端保持長久連接。
本次使用的 Tornado
版本:
tornado==6.3.2
二、短暫性場景下的 SSE 實現(xiàn)
短暫性場景下就是對應(yīng)上面的第一點,客戶端主動發(fā)送請求后,服務(wù)端分多次傳輸,直到完成,數(shù)據(jù)獲取完成后連接就可以斷開了,適用于一些接口復(fù)雜,操作步驟多的場景,可以提前告訴客戶端現(xiàn)在進行到了哪一步了,并且這種方式也有利于服務(wù)端的橫向擴展。
在 Tornado
中實現(xiàn),需要注意的是要關(guān)閉 _auto_finish
,這樣的話就不會被框架自己主動停止連接了,下面是一個實現(xiàn)的案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor class SSE(RequestHandler): def initialize(self): # 關(guān)閉自動結(jié)束 self._auto_finish = False print("initialize") def set_default_headers(self): # 設(shè)置為事件驅(qū)動模式 self.set_header('Content-Type', "text/event-stream") # 不使用緩存 self.set_header('Content-Control', "no-cache") # 保持長連接 self.set_header('Connection', "keep-alive") # 允許跨域 self.set_header('Access-Control-Allow-Origin', "*") def prepare(self): # 準備線程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): result = yield self.doHandle() self.write(result) # 結(jié)束 self.finish() @run_on_executor def doHandle(self): tornado.ioloop.IOLoop.current() # 分十次推送信息 for i in range(10): time.sleep(1) self.flush() self.callback(f"current: {i}") return f"data: end\n\n" def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
運行后可以到瀏覽器訪問:http://localhost:8020/sse
,此時就可以看到服務(wù)端在不斷地推送數(shù)據(jù)過來了:
那如何在前端用 JS
獲取數(shù)據(jù)呢,前面提到在 JS
層面,有封裝好的 Event Source
組件可以直接拿來使用,例如:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>測試服務(wù)器推送技術(shù)</title> </head> <body> <div id="messages"></div> </body> <script> const eventSource = new EventSource('http://localhost:8020/sse'); // 事件回調(diào) eventSource.onmessage = (event) => { console.log(event.data) const messagesDiv = document.getElementById('messages'); messagesDiv.innerHTML += '<p>' + event.data + '</p>'; }; // 異常 eventSource.onerror = (error) => { console.error('EventSource failed:', error); eventSource.close(); }; eventSource.onopen = ()=>{ console.log("開啟") } </script> </html>
運行后可以看到服務(wù)端分階段推送過來的數(shù)據(jù):
三、長連接場景下的 SSE 實現(xiàn)
上面實現(xiàn)了客戶端請求后,分批次返回,但是有些情況下是客戶端連接后沒有東西返回,而是在某個特定的時機下返回給某幾個客戶端,所以這種情況,我們需要和客戶端保持長久的連接,同時進行客戶端連接的緩存,因為同時有可能有 100
個用戶,但是推送時可能只需要給 10
個用戶推送,這種方式相當于將一個客戶端和一個服務(wù)端進行了綁定,一定程度上不利于服務(wù)端的橫向擴展,但也可以通過一些消息訂閱的方式解決類似問題。
下面是一個實現(xiàn)案例:
import time from tornado.concurrent import run_on_executor from tornado.web import RequestHandler import tornado.gen from concurrent.futures.thread import ThreadPoolExecutor # 單例 def singleton(cls): instances = {} def wrapper(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return wrapper # 訂閱推送工具類 @singleton class Pusher(): def __init__(self): self.clients = {} def add_client(self, client_id, callback): if client_id not in self.clients: self.clients[client_id] = callback print(f"{client_id} 連接") def send_all(self, message): for client_id in self.clients: callback = self.clients[client_id] print("發(fā)送消息給:", client_id) callback(message) def send(self, client_id, message): callback = self.clients[client_id] print("發(fā)送消息給:", client_id) callback(message) class SSE(RequestHandler): # 定義推送者 pusher = Pusher() def initialize(self): # 關(guān)閉自動結(jié)束 self._auto_finish = False print("initialize") def set_default_headers(self): # 設(shè)置為事件驅(qū)動模式 self.set_header('Content-Type', "text/event-stream") # 不使用緩存 self.set_header('Content-Control', "no-cache") # 保持長連接 self.set_header('Connection', "keep-alive") # 允許跨域 self.set_header('Access-Control-Allow-Origin', "*") @tornado.gen.coroutine def get(self): # 客戶端唯一標識 client_id = self.get_argument("client_id") self.pusher.add_client(client_id, self.callback) def callback(self, message): # 事件推送 message = f"data: {message}\n\n" self.write(message) self.flush() # 定義推送接口,模擬推送 class Push(RequestHandler): # 定義推送者 pusher = Pusher() def prepare(self): # 準備線程池 self.executor = self.application.pool @tornado.gen.coroutine def get(self): # 客戶端標識 client_id = self.get_argument("client_id") # 推送的消息 message = self.get_argument("message") result = yield self.doHandle(client_id, message) self.write(result) @run_on_executor def doHandle(self, client_id, message): tornado.ioloop.IOLoop.current() self.pusher.send(client_id, message) return "success" class Application(tornado.web.Application): def __init__(self): handlers = [ ("/sse", SSE), ("/push", Push), ("/(.*)$", tornado.web.StaticFileHandler, { "path": "resources/static", "default_filename": "index.html" }) ] super(Application, self).__init__(handlers) self.pool = ThreadPoolExecutor(200) def startServer(port): app = Application() httpserver = tornado.httpserver.HTTPServer(app) httpserver.listen(port) print(f"Start server success", f"The prot = {port}") tornado.ioloop.IOLoop.current().start() if __name__ == '__main__': startServer(8020)
這里我定義了一個 Pusher
訂閱推送工具類,用來存儲客戶端的連接,以及給指定客戶端或全部客戶端發(fā)送消息,然后我又定義 Push 接口,模擬不定時的指定客戶端發(fā)送信息的場景。
同樣前端也要修改,需要給自己定義 client_id
,例如:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>測試服務(wù)器推送技術(shù)</title> </head> <body> <div id="client"></div> <div id="messages"></div> </body> <script> function generateUUID() { let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { const r = Math.random() * 16 | 0; const v = c === 'x' ? r : (r & 0x3 | 0x8); return v.toString(16); }); return uuid; } // 利用uuid 模擬生成唯一的客戶端ID let client_id = generateUUID(); document.getElementById('client').innerHTML = "當前 client_id = "+client_id; const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id); // 事件回調(diào) eventSource.onmessage = (event) => { console.log(event.data) const messagesDiv = document.getElementById('messages'); messagesDiv.innerHTML += '<p>' + event.data + '</p>'; }; // 異常 eventSource.onerror = (error) => { console.error('EventSource failed:', error); eventSource.close(); }; eventSource.onopen = ()=>{ console.log("開啟") } </script> </html>
這里我用 uuid
模擬客戶端的唯一ID
,在真實使用時可不要這么做。
下面使用瀏覽器打開三個頁面,可以看到三個不同的 client_id
:
在服務(wù)端的日志中也能看到這三個客戶端的連接:
下面調(diào)用 push
接口來給任意一個客戶端發(fā)送消息,例如這里發(fā)給client_id = 2493045e-84dd-4118-8d96-0735c4ac186b
的用戶 :
下面看到 client_id
是 2493045e-84dd-4118-8d96-0735c4ac186b
的頁面:
已經(jīng)成功收到推送的消息,反之看另外兩個:
都沒有消息,到這里就實現(xiàn)了長連接下不定時的服務(wù)端消息推送方案。
到此這篇關(guān)于Python Tornado 實現(xiàn)SSE服務(wù)端主動推送方案的文章就介紹到這了,更多相關(guān)Python SSE服務(wù)端內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Pandas如何對Categorical類型字段數(shù)據(jù)統(tǒng)計實戰(zhàn)案例
這篇文章主要介紹了Pandas如何對Categorical類型字段數(shù)據(jù)統(tǒng)計實戰(zhàn)案例,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08解決python 在for循環(huán)并且pop數(shù)組的時候會跳過某些元素的問題
這篇文章主要介紹了解決python 在for循環(huán)并且pop數(shù)組的時候會跳過某些元素的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12python創(chuàng)建只讀屬性對象的方法(ReadOnlyObject)
有時需要創(chuàng)建一個帶只讀屬性的對象,大家可以參考下如下的方法進行創(chuàng)建,稍加改造,可以得到很特殊的效果2013-02-02python內(nèi)置函數(shù)compile(),complex()的使用
這篇文章主要為大家詳細介紹了python內(nèi)置函數(shù)compile(),complex()的使用,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-06-06Python Request爬取seo.chinaz.com百度權(quán)重網(wǎng)站的查詢結(jié)果過程解析
這篇文章主要介紹了Request爬取網(wǎng)站(seo.chinaz.com)百度權(quán)重的查詢結(jié)果過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-08-08將本地Python項目打包成docker鏡像上傳到服務(wù)器并在docker中運行
Docker是一個開源項目,為開發(fā)人員和系統(tǒng)管理員提供了一個開放平臺,可以將應(yīng)用程序構(gòu)建、打包為一個輕量級容器,并在任何地方運行,這篇文章主要給大家介紹了關(guān)于將本地Python項目打包成docker鏡像上傳到服務(wù)器并在docker中運行的相關(guān)資料,需要的朋友可以參考下2023-12-12