Java ByteBuffer網(wǎng)絡(luò)編程用法實例解析
做tcp網(wǎng)絡(luò)編程,要解析一批批的數(shù)據(jù),可是數(shù)據(jù)是通過Socket連接的InputStream一次次讀取的,讀取到的不是需要轉(zhuǎn)換的對象,而是要直接根據(jù)字節(jié)流和協(xié)議來生成自己的數(shù)據(jù)對象。
按照之前的編程思維,總是請求然后響應(yīng),當(dāng)然Socket也是請求和響應(yīng),不過與單純的請求響應(yīng)是不同的。
這里Socket連接往往是要保持住的,也就是長連接,然后設(shè)置一個緩沖區(qū),網(wǎng)絡(luò)流不斷的追加到緩沖區(qū)。然后后臺去解析緩沖區(qū)的字節(jié)流。

如圖所示,網(wǎng)絡(luò)的流一直在傳遞,我們收到也許是完成的數(shù)據(jù)流,也可能是沒有傳遞完的。這里就需要監(jiān)視管道,不斷讀取管道中的流數(shù)據(jù),然后向緩沖區(qū)追加。程序從頭開始解析,如果目前緩沖區(qū)包含了數(shù)據(jù),則解析,沒有則放棄繼續(xù)讀取管道流。
就算管道中包含了數(shù)據(jù),也不一定包含了完成的數(shù)據(jù)。例如,100個字節(jié)是一個數(shù)據(jù)體,可是目前緩沖區(qū)內(nèi)包含了120個字節(jié),這就是說緩沖區(qū)包含了一條數(shù)據(jù),但是還有沒有傳遞完的字節(jié)流。那么就要把前100個字節(jié)拿出來解析,然后從緩沖區(qū)清除這100個字節(jié)。那緩沖區(qū)就剩下20個字節(jié)了,這些數(shù)據(jù)可能在下次流中補充完成。
如何建立緩沖?
/**
* 全局MVB數(shù)據(jù)緩沖區(qū) 占用 1M 內(nèi)存
*/
private static ByteBuffer bbuf = ByteBuffer.allocate(10240);
/**
* 線程安全的取得緩沖變量
*/
public static synchronized ByteBuffer getByteBuffer() {
return bbuf;
}
寫一個Socket客戶端,該客戶端得到Socket連接,然后讀取流,一直向緩沖中追加字節(jié)流,每次追加后調(diào)用一個方法來解析該流
public void run() {
Socket socket = GlobalClientKeep.mvbSocket;
if (null != socket) {
try {
// 獲得mvb連接引用
OutputStream ops = socket.getOutputStream();
InputStream ips = socket.getInputStream();
while (true) {
if (null != ops && null != ips) {
// 接收返回信息
byte[] bt = StreamTool.inputStreamToByte(ips);
ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer();
// 設(shè)置到緩沖區(qū)中
bbuf.put(bt);
// ////////////////////////////////////////////////////////////////////////
// 拆包解析方法
splitByte(ops);
ops.flush();
}
}
} catch (Exception e) {
e.printStackTrace();
}
} else {
// 如果連接存在問題,則必須重新建立
GlobalClientKeep.initMvbSocket();
}
}
關(guān)于如何讀取流,我有一篇博客專門講解了所以這里是直接調(diào)用方法
byte[] bt = StreamTool.inputStreamToByte(ips);
那么解析方法是如何做的?
解析方法首先獲得該緩沖中的所有可用字節(jié),然后判斷是否符合一條數(shù)據(jù)條件,符合就解析。如果符合兩條數(shù)據(jù)條件,則遞歸調(diào)用自己。其中每次解析一條數(shù)據(jù)以后,要從緩沖區(qū)中清除已經(jīng)讀取的字節(jié)信息。
/**
* @說明 拆包解析方法
*/
public static void splitByte(OutputStream ops) {
try {
ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer();
int p = bbuf.position();
int l = bbuf.limit();
// 回繞緩沖區(qū) 一是將 curPointer 移到 0, 二是將 endPointer 移到有效數(shù)據(jù)結(jié)尾
bbuf.flip();
byte[] byten = new byte[bbuf.limit()]; // 可用的字節(jié)數(shù)量
bbuf.get(byten, bbuf.position(), bbuf.limit()); // 得到目前為止緩沖區(qū)所有的數(shù)據(jù)
// 進行基本檢查,保證已經(jīng)包含了一組數(shù)據(jù)
if (checkByte(byten)) {
byte[] len = new byte[4];
// 數(shù)組源,數(shù)組源拷貝的開始位子,目標(biāo),目標(biāo)填寫的開始位子,拷貝的長度
System.arraycopy(byten, 0, len, 0, 4);
int length = StreamTool.bytesToInt(len); // 每個字節(jié)流的最開始肯定是定義本條數(shù)據(jù)的長度
byte[] deco = new byte[length]; // deco 就是這條數(shù)據(jù)體
System.arraycopy(byten, 0, deco, 0, length);
// 判斷消息類型,這個應(yīng)該是從 deco 中解析了,但是下面具體的解析內(nèi)容不再啰嗦
int type = 0;
// 判斷類型分類操作
if (type == 1) {
} else if (type == 2) {
} else if (type == 3) {
} else {
System.out.println("未知的消息類型,解析結(jié)束!");
// 清空緩存
bbuf.clear();
}
// 如果字節(jié)流是多余一組數(shù)據(jù)則遞歸
if (byten.length > length) {
byte[] temp = new byte[bbuf.limit() - length];
// 數(shù)組源,數(shù)組源拷貝的開始位子,目標(biāo),目標(biāo)填寫的開始位子,拷貝的長度
System.arraycopy(byten, length, temp, 0, bbuf.limit() - length);
// 情況緩存
bbuf.clear();
// 重新定義緩存
bbuf.put(temp);
// 遞歸回調(diào)
splitByte(ops);
}else if(byten.length == length){ // 如果只有一條數(shù)據(jù),則直接重置緩沖就可以了
// 清空緩存
bbuf.clear();
}
} else {
// 如果沒有符合格式包含數(shù)據(jù),則還原緩沖變量屬性
bbuf.position(p);
bbuf.limit(l);
}
} catch (Exception e) {
e.printStackTrace();
}
}
代碼只是一個參考,主要講解如何分解緩沖區(qū),和取得緩沖區(qū)的一條數(shù)據(jù),然后清除該數(shù)據(jù)原來站的空間。
至于緩沖區(qū)的屬性,如何得到緩沖區(qū)的數(shù)據(jù),為什么要清空,bbuf.flip();是什么意思。下面來說一下關(guān)于ByteBuffer 的一下事情。
ByteBuffer 中有幾個屬性,其中有兩個很重要。limit和 position。position開始在0,填充數(shù)據(jù)后等于數(shù)據(jù)的長度,而limit是整個緩沖可用的長度。bbuf.flip();之后,position直接變?yōu)?,而limit直接等于position。JDK源碼如下:
/**
* Flips this buffer. The limit is set to the current position and then
* the position is set to zero. If the mark is defined then it is
* discarded.
*
* <p> After a sequence of channel-read or <i>put</i> operations, invoke
* this method to prepare for a sequence of channel-write or relative
* <i>get</i> operations. For example:
*
* <blockquote><pre>
* buf.put(magic); // Prepend header
* in.read(buf); // Read data into rest of buffer
* buf.flip(); // Flip buffer
* out.write(buf); // Write header + data to channel</pre></blockquote>
*
* <p> This method is often used in conjunction with the {@link
* java.nio.ByteBuffer#compact compact} method when transferring data from
* one place to another. </p>
*
* @return This buffer
*/
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
這樣,在position和limit之間的數(shù)據(jù)就是我們要的可用數(shù)據(jù)。
但是position和limit是ByteBuffer在put和get時需要的屬性,所以在使用后要么還原,要么像上面代碼一樣,清除一些字節(jié)信息然后重置。
ByteBuffer 的get和put不是我們平常的取值和設(shè)值一樣,他會操縱一些屬性變化。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
springboot使用國產(chǎn)加密算法方式,sm2和sm3加解密demo
這篇文章主要介紹了springboot使用國產(chǎn)加密算法方式,sm2和sm3加解密demo,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
Java基于Netty實現(xiàn)Http server的實戰(zhàn)
本文主要介紹了Java基于Netty實現(xiàn)Http server的實戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
淺析java并發(fā)中的Synchronized關(guān)鍵詞
這篇文章主要介紹了java并發(fā)中的Synchronized關(guān)鍵詞,本文通過思路代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02

