Python使用WebSocket和SSE實(shí)現(xiàn)HTTP服務(wù)器消息推送方式
很多時(shí)候我們需要實(shí)時(shí)獲取最新數(shù)據(jù),但是傳統(tǒng)意義上的HTTP請(qǐng)求,必須由客戶端向服務(wù)端發(fā)起請(qǐng)求,服務(wù)端再返回相應(yīng)的數(shù)據(jù)。
那如果我們需要獲取實(shí)時(shí)數(shù)據(jù),就要通過HTTP輪詢,客戶端不間斷的向服務(wù)器發(fā)起請(qǐng)求。
這樣不斷的的請(qǐng)求不但嚴(yán)重加大服務(wù)器的壓力,還可能因?yàn)榫W(wǎng)絡(luò)延遲而影響數(shù)據(jù)的時(shí)效性。
下面介紹兩種方法能夠很好的滿足業(yè)務(wù)的需求。
一、WebSocket
WebSocket是HTML5開始提供的一種在單個(gè) TCP 鏈接上進(jìn)行全雙工通信的協(xié)議。
- 優(yōu)點(diǎn):雙工通信
- 缺點(diǎn):需專門定義數(shù)據(jù)協(xié)議,解析數(shù)據(jù)流,且部分服務(wù)器支持不完善,后臺(tái)例如java spring boot 2.1.2 僅支持websocket 1.0(最高已達(dá)1.3)
1.客戶端代碼
python 3+ 代碼
#python 3+
import threading
import websocket
class Client:
def __init__(self,url):
self.url = url
self.ws = None
self.enable = True
def on_message(self,response):
self.enable = False
print(response)
def on_error(self,error):
# print(ws)
print(error)
def on_close(self):
self.enable = True
print(ws)
def on_open(self):
print('open')
def start_func(self):
while self.enable:
websocket.enableTrace(True)
self.ws = websocket.WebSocketApp(self.url,
on_message=self.on_message,
# on_data=on_data,
on_error=self.on_error,
on_open=self.on_open,
on_close=self.on_close, )
self.ws.run_forever(ping_interval=60, ping_timeout=5)
if __name__ == "__main__":
cli = Client(url = 'wss://api.zb.live/websocket' )
t1 = threading.Thread(target=cli.start_func_zb)
t1.start()javascript 代碼
var ws = new WebSocket("wss://echo.websocket.org");
ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
ws.close();
};
ws.onclose = function(evt) {
console.log("Connection closed.");
}; 2.服務(wù)端代碼
from websocket_server import WebsocketServer
class WSSocketObj:
def __init__(self,host=None,port = 8131):
self.host = host if host else '127.0.0.1'
self.port = port
# 當(dāng)新的客戶端連接時(shí)會(huì)提示
def new_client(self,client, server,):
print("當(dāng)新的客戶端連接時(shí)會(huì)提示:%s" % client['id'])
dd = 122
server.send_message_to_all("Hey all, a new client has joined us")
# 當(dāng)舊的客戶端離開
def client_left(self,client, server):
print("客戶端%s斷開" % client['id'])
# 接收客戶端的信息。
def message_received(self,client, server, message):
print("Client(%d) said: %s" % (client['id'], message))
# server.send_message_to_all(message) #發(fā)送消息給 全部客戶端
server.send_message(client, 'hello,client') # 發(fā)送消息給指定客戶端
def run_server(self):
server = WebsocketServer(self.port, self.host)
server.set_fn_new_client(self.new_client)
server.set_fn_client_left(self.client_left)
server.set_fn_message_received(self.message_received)
server.run_forever()
if __name__ == '__main__':
WSSocketObj().run_server()
二、SSE(Server-Sent Events,服務(wù)器發(fā)送事件)
SSE ( Server-sent Events )通俗解釋起來就是一種基于HTTP的,以流的形式由服務(wù)端持續(xù)向客戶端發(fā)送數(shù)據(jù)的技術(shù),是 WebSocket 的一種輕量代替方案。
- 優(yōu)點(diǎn):開發(fā)簡(jiǎn)單,和傳統(tǒng)的http開發(fā)幾乎無任何差別,客戶端開發(fā)簡(jiǎn)單,有標(biāo)準(zhǔn)支持(EventSource)
- 缺點(diǎn):和websocket相比,只能單工通信,建立連接后,只能由服務(wù)端發(fā)往客戶端,且占用一個(gè)連接,如需客戶端向服務(wù)端通信,需額外打開一個(gè)連接
1.客戶端代碼
python
# 第一種方式
def sse_client():
"""
pip install sseclient-py
只對(duì)于返回標(biāo)準(zhǔn)SSE格式的請(qǐng)求可用 格式:event: {EVENT}\nretry: 10000\ndata: {DATA}\n\n
:return:
"""
import requests
# res = requests.request('get', url, json=data, stream=True, headers={'Accept': 'text/event-stream'})
client = requests.post(url, json=data, stream=True, headers={'Accept': 'text/event-stream'})
client = sseclient.SSEClient(client)
for i in client.events():
print(i.data)
# 第二種方式
def sse_with_requests():
headers = {"Accept": "text/event-stream"}
r = requests.post(url, headers=headers, json=data, stream=True)
r.encoding = 'utf-8'
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
# 處理接收到的數(shù)據(jù)塊
print("Received:", chunk)javascript
第一種方式:
//判斷是否支持SSE
if('EventSource' in window){
//初始化SSE
var url="http:localhost:8000/stream";
var source=new EventSource(url);
// 連接成功后會(huì)觸發(fā) open 事件
source.onopen=(event)=>{
console.log("開啟SSE");
}
// 服務(wù)器發(fā)送信息到客戶端時(shí),如果沒有 event 字段,默認(rèn)會(huì)觸發(fā) message 事件
source.onmessage=(event)=>{
var data=event.data;
$("body").append($("<p>").text(data));
}
//監(jiān)聽like事件
source.addEventListener('like',function(event){
var data=event.data;
$("body").append($("<p>").text(data));
},false);
// 連接異常時(shí)會(huì)觸發(fā) error 事件并自動(dòng)重連
source.onerror=(event)=>{
console.log(event);
}第二種方式:使用 addEventListener 方法來添加相應(yīng)的事件處理方法
if (window.EventSource) {
// 創(chuàng)建 EventSource 對(duì)象連接服務(wù)器
const source = new EventSource('http://localhost:2000');
// 連接成功后會(huì)觸發(fā) open 事件
source.addEventListener('open', () => {
console.log('Connected');
}, false);
// 服務(wù)器發(fā)送信息到客戶端時(shí),如果沒有 event 字段,默認(rèn)會(huì)觸發(fā) message 事件
source.addEventListener('message', e => {
console.log(`data: ${e.data}`);
}, false);
// 自定義 EventHandler,在收到 event 字段為 slide 的消息時(shí)觸發(fā)
source.addEventListener('slide', e => {
console.log(`data: ${e.data}`); // => data: 7
}, false);
// 連接異常時(shí)會(huì)觸發(fā) error 事件并自動(dòng)重連
source.addEventListener('error', e => {
if (e.target.readyState === EventSource.CLOSED) {
console.log('Disconnected');
} else if (e.target.readyState === EventSource.CONNECTING) {
console.log('Connecting...');
}
}, false);
} else {
console.error('Your browser doesn\'t support SSE');
}EventSource從父接口 EventTarget 中繼承了屬性和方法,其內(nèi)置了 3 個(gè) EventHandler 屬性、2 個(gè)只讀屬性和 1 個(gè)方法:
EventHandler 屬性
EventSource.onopen在連接打開時(shí)被調(diào)用。EventSource.onmessage在收到一個(gè)沒有 event 屬性的消息時(shí)被調(diào)用。EventSource.onerror在連接異常時(shí)被調(diào)用。 只讀屬性EventSource.readyState一個(gè) unsigned short 值,代表連接狀態(tài)??赡苤凳?CONNECTING (0), OPEN (1), 或者 CLOSED (2)。EventSource.url連接的 URL。 方法EventSource.close()關(guān)閉連接
EventSource 對(duì)象的 onmessage 屬性的作用類似于 addEventListener( ‘ message ’ )
2.服務(wù)端代碼(基于Flask)
import json
import time
from flask import Flask, request
from flask import Response
from flask import render_template
app = Flask(__name__)
def get_message():
"""this could be any function that blocks until data is ready"""
time.sleep(1)
s = time.ctime(time.time())
return json.dumps(['當(dāng)前時(shí)間:' + s , 'a'], ensure_ascii=False)
@app.route('/')
def hello_world():
return render_template('index.html')
@app.route('/stream')
def stream():
user_id = request.args.get('user_id')
print(user_id)
def eventStream():
id = 0
while True:
id +=1
# wait for source data to be available, then push it
yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message())
return Response(eventStream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run()
因?yàn)镾SE是http請(qǐng)求,可是又限定是一個(gè)長(zhǎng)鏈接,因此要設(shè)置MIME類型為text/event-stream。返回的為字符串。
消息的格式
服務(wù)器向?yàn)g覽器發(fā)送的 SSE 數(shù)據(jù),必須是 UTF-8 編碼的文本;
每一次發(fā)送的信息,由若干個(gè)message組成,每一個(gè)message之間用\n\n分隔。每一個(gè)message內(nèi)部由若干行組成
- 格式
[field]:value\n
其中在規(guī)范中為消息定義了 4 個(gè)字段
- id 表明id
- event 表明消息的事件類型
- data 消息的數(shù)據(jù)字段
- retry 客戶端重連的時(shí)間。只接受整數(shù),單位是毫秒。如果這個(gè)值不是整數(shù)則會(huì)被自動(dòng)忽略
需要注意的是,id字段不是必須的,服務(wù)器有可能不會(huì)在消息中帶上 id 字段,這樣子客戶端就不會(huì)存在 Last-Event-ID這個(gè)屬性。所以為了保證數(shù)據(jù)可靠,我們需要在每條消息上帶上 id 字段。
一個(gè)很有意思的地方是,規(guī)范中規(guī)定以冒號(hào)開頭的消息都會(huì)被當(dāng)作注釋,一條普通的注釋(:\n\n)對(duì)于服務(wù)器來說只占 5個(gè)字符,但是發(fā)送到客戶端上的時(shí)候不會(huì)觸發(fā)任何事件,這對(duì)客戶端來說是非常友好的。所以注釋一般被用于維持服務(wù)器和客戶端的長(zhǎng)連接。
3.SSE使用注意事項(xiàng)
1、SSE 如何保證數(shù)據(jù)完整性
客戶端在每次接收到消息時(shí),會(huì)把消息的 id 字段作為內(nèi)部屬性 Last-Event-ID 儲(chǔ)存起來。
SSE 默認(rèn)支持?jǐn)嗑€重連機(jī)制,在連接斷開時(shí)會(huì) 觸發(fā) EventSource 的 error 事件,同時(shí)自動(dòng)重連。再次連接成功時(shí)EventSource 會(huì)把 Last-Event-ID 屬性作為請(qǐng)求頭發(fā)送給服務(wù)器,這樣服務(wù)器就可以根據(jù)這個(gè) Last-Event-ID作出相應(yīng)的處理。
這里需要注意的是,id 字段不是必須的,服務(wù)器有可能不會(huì)在消息中帶上 id 字段,這樣子客戶端就不會(huì)存在 Last-Event-ID這個(gè)屬性。所以為了保證數(shù)據(jù)可靠,我們需要在每條消息上帶上 id 字段。
2、減少開銷
在 SSE 的草案中提到,“text/event-stream” 的 MIME 類型傳輸應(yīng)當(dāng)在靜置 15秒后自動(dòng)斷開。在實(shí)際的項(xiàng)目中也會(huì)有這個(gè)機(jī)制,但是斷開的時(shí)間沒有被列入標(biāo)準(zhǔn)中。
為了減少服務(wù)器的開銷,我們也可以有目的的斷開和重連。
簡(jiǎn)單的辦法是服務(wù)器發(fā)送一個(gè) 關(guān)閉消息并指定一個(gè)重連的時(shí)間戳,客戶端在觸發(fā)關(guān)閉事件時(shí)關(guān)閉當(dāng)前連接并創(chuàng)建 一個(gè)計(jì)時(shí)器,在重連時(shí)把計(jì)時(shí)器銷毀。
function connectSSE() {
if (window.EventSource) {
const source = new EventSource('http://localhost:2000');
let reconnectTimeout;
source.addEventListener('open', () => {
console.log('Connected');
clearTimeout(reconnectTimeout);
}, false);
source.addEventListener('pause', e => {
source.close();
const reconnectTime = +e.data;
const currentTime = +new Date();
reconnectTimeout = setTimeout(() => {
connectSSE();
}, reconnectTime - currentTime);
}, false);
} else {
console.error('Your browser doesn\'t support SSE');
}
}
connectSSE();總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python3數(shù)據(jù)庫(kù)操作包pymysql的操作方法
這篇文章主要介紹了Python3數(shù)據(jù)庫(kù)操作包pymysql的操作方法,文章通過實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2018-07-07
Python使用os.listdir()和os.walk()獲取文件路徑與文件下所有目錄的方法
今天小編就為大家分享一篇關(guān)于Python使用os.listdir()和os.walk()獲取文件路徑與文件下所有目錄的方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-04-04
Pandas中DataFrame對(duì)象轉(zhuǎn)置(交換行列)
本文主要介紹了Pandas中DataFrame對(duì)象轉(zhuǎn)置(交換行列),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02
windows下pycharm搭建spark環(huán)境并成功運(yùn)行 附源碼
這篇文章主要介紹了windows下pycharm搭建spark環(huán)境并成功運(yùn)行 附源碼,本文分步驟給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04
使用python BeautifulSoup庫(kù)抓取58手機(jī)維修信息
這篇文章主要介紹了一個(gè)使用python抓取58手機(jī)的精準(zhǔn)商家信息,使用BeautifulSoup API的方法2013-11-11

