Node.js高級編程之UDP可靠性分析
前言
UDP 協(xié)議是我們平時較少接觸到的知識,不同于 TCP,它是“不可靠”的,今天我們就來實戰(zhàn)一下看下它到底怎么個不可靠法?
不可靠的 UDP
實驗前,我們先介紹一下需要用到的工具(Mac 環(huán)境,其他環(huán)境請自行搜索相關(guān)工具):
- Network Link Conditioner:模擬丟包場景,可以去蘋果開發(fā)者網(wǎng)站上下載
- Wireshark:抓包分析工具
- 云主機:因為實現(xiàn)發(fā)現(xiàn) Network Link Conditioner 對本地回環(huán)地址不起作用,如果有更好的方法求大佬指出
然后我們準備兩段代碼,一段作為 UDP Server,一段作為 UDP Client,Client 會向 Server 發(fā)送 26 個英文大寫字母,Server 會將他們存到文件:
// udp-server.js const udp = require('dgram') const server = udp.createSocket('udp4') const fs = require('fs') server.on('listening', function () { var address = server.address() var port = address.port console.log('Server is listening at port ' + port) }) server.on('message', function (msg, info) { console.log( `Data received from ${info.address}:${info.port}: ${msg.toString()}` ) fs.appendFileSync('./out', msg.toString()) }) server.on('error', function (error) { console.log('Error: ' + error) server.close() }) server.bind(7788) // udp-client.js const udp = require('dgram') const client = udp.createSocket('udp4') for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.send(Buffer.from(char), 7788, '********', function (error) { if (error) { console.log(error) } }) }
接著我們按照下面步驟開始實驗:
- 通過 Network Link Conditioner 把丟包率設置為 50%:
- 設置好 Wireshark 的抓包參數(shù):
- 在云主機上啟動 Server,在本地啟動 Client。
接著,我們來看一下實驗結(jié)果:
- 首先,我們可以看到服務端接收到的字母少了很多,只有 14 個:
- 服務端接收到的字母順序是亂序的,比如 U 跑到了 T 的前面:
為了進行對比,我們可以換成 TCP 試試,代碼如下,結(jié)果就不貼了:
// tcp-server.js const net = require('net') const server = net.createServer() const fs = require('fs') server.on('connection', function (conn) { conn.on('data', (msg) => { console.log( `Data received from ${conn.address().address}:${ conn.address().port }: ${msg.toString()}` ) fs.appendFileSync('./out', msg.toString()) }) }) server.listen(8899, () => { console.log('server listening to %j', server.address().port) }) // tcp-client.js var net = require('net') var client = new net.Socket() client.connect(8899, '********', function () { for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.write(char) } })
接下我們試試基于 UDP 來實現(xiàn)一個可靠的傳輸協(xié)議,主要解決上面的丟包和亂序問題。
基于 UDP 的簡單可靠傳輸協(xié)議
首先,需要設計一下我們的協(xié)議格式。為了簡單起見,我們只在原來 UDP 的數(shù)據(jù)部分分別新增 4 個字節(jié)的 SEQ 和 ACK:
+-------------------------------+ | 64 個字節(jié)的 UDP 首部 | +-------------------------------+ | SEQ(4 個字節(jié)) | ACK(4 個字節(jié)) | +-------------------------------+ | Data | +-------------------------------+
其中 SEQ 表示當前包的序號,ACK 表示回復序號。
接下來看看,我們?nèi)绾谓鉀Q前面的兩個問題。
亂序問題
接收方需要維護一個變量 expectedSeq
的變量表示期待接收到的包序號。為了簡單起見,我們制定如下規(guī)則:如果當前接收到的包序號等于 expectedSeq
,則把包交給應用層處理,并發(fā)送 ACK 給發(fā)送方;否則我們都直接丟棄。當然更好的做法是維護一個接收窗口,這樣可以批量的提交數(shù)據(jù)給應用層,也可以用來緩存大于 expectedSeq
的包。
假設現(xiàn)在發(fā)送方發(fā)送了 1 2 3 兩個包,但是到達接收方的順序是 3 2 1,按照我們的規(guī)則接收方會丟棄 3 和 2,接收 1。好家伙,順序倒是不亂了,但是包沒了。
所以還得把丟包問題也解決了才行。
丟包問題
發(fā)送方維護一個發(fā)送窗口用來存儲已發(fā)送但是還未被確認的包:
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+
發(fā)送方每發(fā)送一個包的同時還需要將包放入發(fā)送窗口,并設置一個定時器用來重發(fā)這個包。當發(fā)送方接收到來自接收方的 ACK 時,需要取消掉對應包的定時器,并將發(fā)送窗口中小于 ACK 的包都刪除。
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+ // ACK = 4,刪除 1 2 3,并取消掉他們的定時器 +---+ | 4 | +---+
完整代碼及使用 Demo 見文末,現(xiàn)在可以正常按順序輸出 26 個字母了,但是離“可靠”協(xié)議還差得遠。比如第一次輸出完 26 個字母后,我們再次啟動客戶端時發(fā)現(xiàn)就沒有任何輸出了。原因在于此時接收端的 expectedSeq
已經(jīng)是 20 多了,但是新啟動的 client 發(fā)送的 SEQ 還是從 1 開始的,結(jié)果就是接收端一直丟棄接收到的包,發(fā)送端一直重試。
要解決這個問題,可以參考 TCP 在傳輸兩端建立“連接”的概念,在開始發(fā)送前通過“三次握手”建立連接,也就是確定起始 SEQ,初始化窗口等工作,結(jié)束前通過“四次揮手”斷開連接,即清理窗口定時器等工作。這個就留到以后再說吧。
代碼
// packet.js class Packet { constructor({seq, ack, data = ''}) { this.seq = seq // 序列號 this.ack = ack // 確認號 this.data = data // 數(shù)據(jù) } // 將 Packet 轉(zhuǎn)換成 Buffer,以便通過網(wǎng)絡傳輸 toBuffer() { const seqBuffer = Buffer.alloc(4) seqBuffer.writeUInt32BE(this.seq) const ackBuffer = Buffer.alloc(4) ackBuffer.writeUInt32BE(this.ack) const dataBuffer = Buffer.from(this.data) return Buffer.concat([seqBuffer, ackBuffer, dataBuffer]) } // 從 Buffer 中解析出 Packet static fromBuffer(buffer) { const seq = buffer.readUInt32BE() const ack = buffer.readUInt32BE(4) const data = buffer.slice(8) return new Packet({seq, ack, data}) } } module.exports = Packet // reliableUDP.js const dgram = require('dgram') const Packet = require('./packet') class ReliableUDP { constructor() { this.socket = dgram.createSocket('udp4') this.socket.on('message', this.handleMessage.bind(this)) this.sendWindow = [] // 發(fā)送窗口,用于存放待確認的數(shù)據(jù)包 this.receiveWindow = [] // 接收窗口,用于存放已接收的數(shù)據(jù)包 this.expectedSeq = 1 // 期望接收的數(shù)據(jù)包序列號 this.nextSeq = 1 // 下一個要發(fā)送的數(shù)據(jù)包序列號 this.timeout = 100 // 超時時間,單位為毫秒 this.timeoutIds = {} // 用于存放定時器 ID } listen(port, address, fn) { this.socket.bind(port, address, fn) } // 發(fā)送數(shù)據(jù)包 sendPacket(packet, address, port) { const buffer = packet.toBuffer() this.socket.send(buffer, port, address, (err) => { if (err) { console.error(err) } }) if (packet.ack) return if (!this.sendWindow.includes((p) => p.seq === packet.seq)) this.sendWindow.push(packet) // 設置超時定時器 const timeoutId = setTimeout(() => { this.handleTimeout(packet.seq, address, port) }, this.timeout) this.timeoutIds[packet.seq] = timeoutId } // 處理接收到的數(shù)據(jù)包 handleMessage(msg, rinfo) { const {address, port} = rinfo const packet = Packet.fromBuffer(msg) // 收到的是應答的包 if (packet.ack) { const ackNum = packet.ack - 1 // 處理發(fā)送窗口中已經(jīng)確認的數(shù)據(jù)包 while (this.sendWindow.length > 0 && this.sendWindow[0].seq <= ackNum) { this.sendWindow.shift() } // 清除超時定時器 if (this.timeoutIds[ackNum]) { clearTimeout(this.timeoutIds[ackNum]) delete this.timeoutIds[ackNum] } } else { // 如果是重復的數(shù)據(jù)包,則忽略 if (packet.seq < this.expectedSeq) { return } // 如果是期望接收的數(shù)據(jù)包 if (packet.seq === this.expectedSeq) { this.receiveWindow.push(packet) this.expectedSeq++ // 處理接收窗口中已經(jīng)確認的數(shù)據(jù)包 while ( this.receiveWindow.length > 0 && this.receiveWindow[0].seq <= this.expectedSeq ) { const packet = this.receiveWindow.shift() this.onPacketReceived(packet.data) } const ackPacket = new Packet({ seq: this.nextSeq++, ack: this.expectedSeq, }) this.sendPacket(ackPacket, address, port) } else { // 如果是未來的數(shù)據(jù)包,暫不做處理,更好的做法是緩存起來 } } } // 應用層調(diào)用該方法發(fā)送數(shù)據(jù) send(data, address, port) { const packet = new Packet({ seq: this.nextSeq, ack: null, data, }) this.sendPacket(packet, address, port) this.nextSeq++ } // 應用層調(diào)用該方法注冊回調(diào)函數(shù),接收數(shù)據(jù) onReceive(callback) { this.onPacketReceived = callback } // 處理超時 handleTimeout(seq, address, port) { // 重傳超時的數(shù)據(jù)包 const packet = this.sendWindow.find((p) => p.seq === seq) if (packet) { this.sendPacket(packet, address, port) } } } module.exports = ReliableUDP // server.js const ReliableUDP = require('./reliableUDP') const server = new ReliableUDP() server.listen(7788, 'localhost') server.onReceive((data) => { console.log(data.toString()) }) // client.js const ReliableUDP = require('./reliableUDP') const client = new ReliableUDP() for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.send(char, 'localhost', 7788) }
以上就是Node.js高級編程之UDP可靠性分析的詳細內(nèi)容,更多關(guān)于Node.js高級編程UDP的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
nodejs16.15.0版本如何解決node-sass和sass-loader版本沖突問題
這篇文章主要介紹了nodejs16.15.0版本如何解決node-sass和sass-loader版本沖突問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08Node?文件查找優(yōu)先級及?Require?方法文件查找策略
這篇文章主要介紹了Node文件查找優(yōu)先級及Require方法文件查找策略。文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09nodejs結(jié)合Socket.IO實現(xiàn)websocket即時通訊
websocket 是一種網(wǎng)絡通信協(xié)議,一般用來進行實時通信會使用到。本文主要介紹了nodejs結(jié)合Socket.IO實現(xiàn)websocket即時通訊 ,感興趣的可以了解一下2021-11-11Node.js+jade抓取博客所有文章生成靜態(tài)html文件的實例
下面小編就為大家?guī)硪黄狽ode.js+jade抓取博客所有文章生成靜態(tài)html文件的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-09-09node.js中的querystring.unescape方法使用說明
這篇文章主要介紹了node.js中的querystring.unescape方法使用說明,本文介紹了querystring.unescape的方法說明、語法、接收參數(shù)、使用實例和實現(xiàn)源碼,需要的朋友可以參考下2014-12-12win系統(tǒng)下nodejs環(huán)境安裝配置
這篇文章主要介紹了win系統(tǒng)下nodejs環(huán)境安裝配置的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05