基于Nodejs的Tcp封包和解包的理解
我們知道,TCP是面向連接流傳輸?shù)?,其采用Nagle算法,在緩沖區(qū)對(duì)上層數(shù)據(jù)進(jìn)行了處理。避免觸發(fā)自動(dòng)分片機(jī)制和網(wǎng)絡(luò)上大量小數(shù)據(jù)包的同時(shí)也造成了粘包(小包合并)和半包(大包拆分)問題,導(dǎo)致數(shù)據(jù)沒有消息保護(hù)邊界,接收端接收到一次數(shù)據(jù)無法判斷是否是一個(gè)完整數(shù)據(jù)包。那有什么方案可以解決這問題呢?
1、粘包問題解決方案及對(duì)比
很簡單,既然消息沒有邊界,那我們?cè)谙⑼聜髦敖o它加一個(gè)邊界識(shí)別就好了。
- 發(fā)送固定長度的消息
- 使用特殊標(biāo)記來區(qū)分消息間隔
- 把消息的尺寸與消息一塊發(fā)送
第一種方案不夠靈活;第二種有風(fēng)險(xiǎn),如果數(shù)據(jù)內(nèi)剛好有該特殊字符會(huì)出問題;第三種方案雖然要增加對(duì)消息頭的解析,不過相對(duì)而言還是要安全一些。
2、分包與拆包
既然使用第三種方案,就必然涉及到封包和拆包的問題。
首先肯定需要定義數(shù)據(jù)包的結(jié)構(gòu),這類似Http包一樣,有包頭和包體。包頭其實(shí)上是個(gè)大小固定的結(jié)構(gòu)體,其中有個(gè)結(jié)構(gòu)體成員變量表示包體的長度,其他的結(jié)構(gòu)體成員可根據(jù)需要自己定義。根據(jù)包頭長度固定以及包頭中含有包體長度的變量就能正確的拆分出一個(gè)完整的數(shù)據(jù)包。包體則存放數(shù)據(jù)內(nèi)容。
在發(fā)送端,需要進(jìn)行封包。封包就是給一段數(shù)據(jù)加上包頭,這樣一來數(shù)據(jù)包就分為包頭和包體兩部分內(nèi)容了。
在接受端,則需要進(jìn)行拆包。主要流程如下:
1. 為每一個(gè)連接動(dòng)態(tài)分配一個(gè)緩沖區(qū),同時(shí)把此緩沖區(qū)和SOCKET關(guān)聯(lián).
2. 當(dāng)接收到數(shù)據(jù)時(shí)首先把此段數(shù)據(jù)存放在緩沖區(qū)中.
3. 判斷緩存區(qū)中的數(shù)據(jù)長度是否夠一個(gè)包頭的長度,如不夠,則不進(jìn)行拆包操作.
4. 根據(jù)包頭數(shù)據(jù)解析出里面代表包體長度的變量.
5. 判斷緩存區(qū)中除包頭外的數(shù)據(jù)長度是否夠一個(gè)包體的長度,如不夠,則不進(jìn)行拆包操作.
6. 取出整個(gè)數(shù)據(jù)包.這里的"取"的意思是不光從緩沖區(qū)中拷貝出數(shù)據(jù)包,而且要把此數(shù)據(jù)包從緩存區(qū)中刪除掉.刪除的辦法就是把此包后面的數(shù)據(jù)移動(dòng)到緩沖區(qū)的起始地址.
其中對(duì)于緩沖區(qū)的設(shè)計(jì),主要由倆種:
2. 采用環(huán)形緩沖區(qū),定義兩個(gè)指針,分別指向有效數(shù)據(jù)的頭和尾.在存放數(shù)據(jù)和刪除數(shù)據(jù)時(shí)只是進(jìn)行頭尾指針的移動(dòng)
3、網(wǎng)絡(luò)字節(jié)序和本機(jī)字節(jié)序
定義了消息結(jié)構(gòu)之后,發(fā)送端和接收端還需要統(tǒng)一字節(jié)序。我們知道,不同機(jī)器的本機(jī)字節(jié)序不同,絕大多數(shù)X86機(jī)器都是小端字節(jié)序,然后還是由少數(shù)機(jī)器是大端存儲(chǔ)的。因此在數(shù)據(jù)流進(jìn)行傳輸時(shí),必須先統(tǒng)一字節(jié)序。一般約定在傳輸時(shí)采用網(wǎng)絡(luò)字節(jié)序(大端),統(tǒng)一用unicode編碼。
4、代碼實(shí)現(xiàn)
了解以上知識(shí)之后,我們現(xiàn)在之后要做什么了。發(fā)送端按定義的協(xié)議規(guī)則封包,接受端把接收到的buffer放入緩沖區(qū),當(dāng)緩沖區(qū)內(nèi)有完整包時(shí)開始拆包。封包拆包過程需要注意,讀寫超過一個(gè)字節(jié)的數(shù)據(jù)時(shí)需要按大端字節(jié)序讀取。下面看node的代碼實(shí)現(xiàn)(只提供核心實(shí)現(xiàn)片段):
1)發(fā)送端封包:
let head = new Buffer(4); let jsonStr = JSON.stringify(json); let body = new Buffer(jsonStr); //超過一字節(jié)的大端寫入 head.writeInt32BE(body.byteLength, 0); let buffer = Buffer.concat([head, body]);
2)接收端收到buffer入緩沖區(qū):
let dataReadStart = 0; //新數(shù)據(jù)的起始位置 let dataLength = buffer.length; // 要拷貝數(shù)據(jù)的長度 let availableLen = _bufferLength - _dataLen; // 緩沖區(qū)剩余可用空間 // buffer剩余空間不足夠存儲(chǔ)本次數(shù)據(jù) if (availableLen < dataLength) { let newLength = Math.ceil((_dataLen + dataLength) / _bufferLength) * _bufferLength; let _tempBuffer = Buffer.alloc(newLength); // 將舊數(shù)據(jù)復(fù)制到新buffer并且修正相關(guān)參數(shù) if (_writePointer < _readPointer) { // 數(shù)據(jù)存儲(chǔ)在舊buffer的尾部+頭部的順序 let dataTailLen = _bufferLength - _readPointer; _buffer.copy(_tempBuffer, 0, _readPointer, _readPointer + dataTailLen); _buffer.copy(_tempBuffer, dataTailLen, 0, _writePointer); } else { // 數(shù)據(jù)是按照順序進(jìn)行的完整存儲(chǔ) _buffer.copy(_tempBuffer, 0, _readPointer, _writePointer); } _bufferLength = newLength; _buffer = _tempBuffer; _tempBuffer = null; _readPointer = 0; _writePointer = _dataLen; //存儲(chǔ)新到來的buffer buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength); _dataLen += dataLength; _writePointer += dataLength; } else if (_writePointer + dataLength > _bufferLength) { // 空間夠用情況下,但是數(shù)據(jù)會(huì)沖破緩沖區(qū)尾部,部分存到緩沖區(qū)舊數(shù)據(jù)后,一部分存到緩沖區(qū)開始位置 // 緩沖區(qū)尾部剩余空間的長度 let bufferTailLength = _bufferLength - _writePointer; // 數(shù)據(jù)尾部位置 let dataEndPosition = dataReadStart + bufferTailLength; buffer.copy(_buffer, _writePointer, dataReadStart, dataEndPosition); // data剩余未拷貝進(jìn)緩存的長度 let restDataLen = dataLength - bufferTailLength; buffer.copy(_buffer, 0, dataEndPosition, dataLength); _dataLen = _dataLen + dataLength; _writePointer = restDataLen } else { // 剩余空間足夠存儲(chǔ)數(shù)據(jù),直接拷貝數(shù)據(jù)到緩沖區(qū) buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength); _dataLen = _dataLen + dataLength; _writePointer = _writePointer + dataLength }
3)取出緩沖區(qū)所有完整數(shù)據(jù)包(收到的buffer入緩沖區(qū)后)
let _dataHeadLen = 4; timer && clearInterval(timer); timer = setInterval(()=>{ // 緩沖區(qū)數(shù)據(jù)不夠解析出包頭 if (_dataLen < _dataHeadLen) { console.log('數(shù)據(jù)長度小于包頭規(guī)定長度,等待數(shù)據(jù)......') clearInterval(timer); } // 解析包頭長度 // 尾部最后剩余可讀字節(jié)長度 let restDataLen = _bufferLength - _readPointer; let dataLen = 0; let headBuffer = Buffer.alloc(_dataHeadLen); // 數(shù)據(jù)包為分段存儲(chǔ),不能直接解析出包頭,先拼接 if (restDataLen < _dataHeadLen) { // 取出第一部分頭部字節(jié) _buffer.copy(headBuffer, 0, _readPointer, _bufferLength) // 取出第二部分頭部字節(jié) let unReadHeadLen = _dataHeadLen - restDataLen; _buffer.copy(headBuffer, restDataLen, 0, unReadHeadLen) dataLen = headBuffer.readUInt32BE(0); } else { _buffer.copy(headBuffer, 0, _readPointer, _readPointer + _dataHeadLen); dataLen = headBuffer.readUInt32BE(0);; } // 數(shù)據(jù)長度不夠讀取,直接返回 if (_dataLen - _dataHeadLen < dataLen) { log.info("緩沖區(qū)已有body數(shù)據(jù)長度小于包頭定義body的長度,等待數(shù)據(jù)......") clearInterval(timer); } else { // 數(shù)據(jù)夠讀,讀取數(shù)據(jù)包 let package = Buffer.alloc(dataLen); // 數(shù)據(jù)是分段存儲(chǔ),需要分兩次讀取 if (_bufferLength - _readPointer < dataLen) { let firstPartLen = _bufferLength - _readPointer; // 讀取第一部分,直接到字符尾部的數(shù)據(jù) _buffer.copy(package, 0, _readPointer, firstPartLen + _readPointer); // 讀取第二部分,存儲(chǔ)在開頭的數(shù)據(jù) let secondPartLen = dataLen - firstPartLen; _buffer.copy(package, firstPartLen, 0, secondPartLen); _readPointer = secondPartLen; //更新可讀起點(diǎn) } else { // 直接讀取數(shù)據(jù) _buffer.copy(package, 0, _readPointer, _readPointer + dataLen); _readPointer += dataLen; //更新可讀起點(diǎn) } _dataLen -= readData.length; //更新數(shù)據(jù)長度 // 已經(jīng)讀取完所有數(shù)據(jù) if (_readPointer === _writePointer) { clearInterval(timer) } //開始解包 callback(package); } }, 50);
4)拆包得到數(shù)據(jù)
let headBytes = 4; let head = new Buffer(headBytes); buffer.copy(head, 0, 0, headBytes); let dataLen = head.readUInt32BE(); const body = new Buffer(dataLen); buffer.copy(body, 0, headBytes, headBytes + dataLen) let content = null; try { const str = body.toString('utf-8'); if(str === ''){ content = null; }else{ content = JSON.parse(body); } } catch (e) { log.error('head指定body長度有問題') } //傳遞給業(yè)務(wù)層 callback(content);
5、總結(jié)
從上面我們已經(jīng)了解到了封包解包的一個(gè)過程。TCP是可靠傳輸?shù)模粫r(shí)間在網(wǎng)絡(luò)上只會(huì)有一個(gè)數(shù)據(jù)包,并且丟包會(huì)重傳,因此不用擔(dān)心丟包或者數(shù)據(jù)包亂序問題。UDP有消息保護(hù)邊界,不需要進(jìn)行拆包解包,然后其是非可靠傳輸,也需要解決其他一些問題,譬如丟包和數(shù)據(jù)包排序問題。
上面進(jìn)行數(shù)據(jù)包結(jié)構(gòu)設(shè)計(jì)時(shí)只是簡單地加了一個(gè)包體長度,事實(shí)上在業(yè)務(wù)場(chǎng)景可以自由增加需要的字段,譬如協(xié)議版本,協(xié)議類型等等。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Node中node_modules文件夾及package.json文件的作用說明
這篇文章主要介紹了Node中node_modules文件夾及package.json文件的作用說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-09-09npm 更改默認(rèn)全局路徑以及國內(nèi)鏡像的方法
今天小編就為大家分享一篇npm 更改默認(rèn)全局路徑以及國內(nèi)鏡像的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-05-05淺談Node.js 子進(jìn)程與應(yīng)用場(chǎng)景
這篇文章主要介紹了淺談Node.js 子進(jìn)程與應(yīng)用場(chǎng)景,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-01-01nodejs基于WS模塊實(shí)現(xiàn)WebSocket聊天功能的方法
這篇文章主要介紹了nodejs基于WS模塊實(shí)現(xiàn)WebSocket聊天功能的方法,結(jié)合實(shí)例形式分析了nodejs使用WS模塊進(jìn)行WebSocket通信實(shí)現(xiàn)聊天功能的具體操作技巧,需要的朋友可以參考下2018-01-01Node.js操作mysql數(shù)據(jù)庫增刪改查
這篇文章主要介紹使用Node.js操作mysql數(shù)據(jù)庫增刪改查的相關(guān)資料,需要的朋友可以參考下2016-03-03