Netty分布式ByteBuf使用的底層實現(xiàn)方式源碼解析
概述
熟悉Nio的小伙伴應該對jdk底層byteBuffer不會陌生, 也就是字節(jié)緩沖區(qū), 主要用于對網(wǎng)絡(luò)底層io進行讀寫, 當channel中有數(shù)據(jù)時, 將channel中的數(shù)據(jù)讀取到字節(jié)緩沖區(qū), 當要往對方寫數(shù)據(jù)的時候, 將字節(jié)緩沖區(qū)的數(shù)據(jù)寫到channel中
但是jdk的byteBuffer是使用起來有諸多不便, 比如只有一個標記位置的指針position, 在進行讀寫操作時要頻繁的通過flip()方法進行指針位置的移動, 極易出錯, 并且byteBuffer的內(nèi)存一旦分配則不能改變, 不支持動態(tài)擴容, 當讀寫的內(nèi)容大于緩沖區(qū)內(nèi)存時, 則會發(fā)生索引越界異常
而Netty的ByteBuf對jdk的byteBuffer做了重新的定義, 同樣是字節(jié)緩沖區(qū)用于讀取網(wǎng)絡(luò)io中的數(shù)據(jù), 但是使用起來大大簡化, 并且支持了自動擴容, 不用擔心讀寫數(shù)據(jù)大小超過初始分配的大小
byteBuf根據(jù)其分類的不同底層實現(xiàn)方式有所不同, 有直接基于jdk底層byteBuffer實現(xiàn)的, 也有基于字節(jié)數(shù)組的實現(xiàn)的, 對于byteBuf的分類, 在后面的小節(jié)將會講到
byteBuf中維護了兩個指針, 一是讀指針, 二是寫指針, 兩個指針相互獨立, 在讀操作的時候, 只會移動讀指針, 通過指針位置記錄讀取的字節(jié)數(shù)
同樣在寫操作時, 也只會移動寫指針, 通過寫指針的位置記錄寫的字節(jié)數(shù)
在每次讀寫操作的過程中都會對指針的位置進行校驗, 讀指針的位置不能超過寫指針, 否則會拋出異常
同樣, 寫指針不能超過緩沖區(qū)分配的內(nèi)存, 則將對緩沖區(qū)做擴容操作
具體指針操作, 入下圖所示:

AbstractByteBuf屬性和構(gòu)造方法
在講AbstractByteBuf之前, 我們首先先了解一下ByteBuf這個類, 這是所有ByteBuf的最頂層抽象, 里面定義了大量對ByteBuf操作的抽象方法供子類實現(xiàn)
AbstractByteBuf同樣也緩沖區(qū)的抽象類, 定義了byteBuf的骨架操作, 比如參數(shù)校驗, 自動擴容, 以及一些讀寫操作的指針移動, 但具體的實現(xiàn), 不同的bytebuf實現(xiàn)起來是不同的, 這種情況則交給其子類實現(xiàn)
AbstractByteBuf繼承了這個類, 并實現(xiàn)了其大部分的方法
首先看這個類的屬性和構(gòu)造方法
//讀指針
int readerIndex;
//寫指針
int writerIndex;
//保存讀指針
private int markedReaderIndex;
//保存寫指針
private int markedWriterIndex;
//最大分配容量
private int maxCapacity;
protected AbstractByteBuf(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
}
this.maxCapacity = maxCapacity;
}我們可以看到在屬性中定義了讀寫指針的成員標量, 和讀寫指針位置的保存
在構(gòu)造方法中可以傳入可分配的最大內(nèi)存, 然后賦值到成員變量中
我們看幾個最簡單的方法
@Override
public int maxCapacity() {
return maxCapacity;
}
@Override
public int readerIndex() {
return readerIndex;
}
@Override
public int writerIndex() {
return writerIndex;
}獲取最大內(nèi)存, 獲取讀寫指針這些方法, 對所有的bytebuf都是通用的, 所以可以定義在AbstractByteBuf中
我們以一個writeBytes方法為例, 讓同學們熟悉AbstractByteBuf中哪些部分自己實現(xiàn), 哪些部分則交給了子類實現(xiàn):
@Override
public ByteBuf writeBytes(ByteBuf src) {
writeBytes(src, src.readableBytes());
return this;
}這個方法是將源的ByteBuf(參數(shù))中的字節(jié)寫入到自身ByteBuf中
首先這里調(diào)用了自身的writeBytes方法, 并傳入?yún)?shù)ByteBuf本身, 以及Bytebuf的可讀字節(jié)數(shù), 我們跟到readbleBytes()方法中, 其實就是調(diào)用了自身的方法:
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}我們看到, 這里可讀字節(jié)數(shù)就是返回了寫指針到讀指針之間的長度
我們再繼續(xù)跟到writeBytes(src, src.readableBytes())中:
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);
return this;
}這里同樣調(diào)用了自身的方法首先會對參數(shù)進行驗證, 就是寫入自身的長度不能超過源ByteBuf的可讀字節(jié)數(shù)
這里又調(diào)用了一個wirte方法, 參數(shù)傳入源Bytebuf, 其可讀字節(jié)數(shù), 寫入的長度, 這里寫入的長度我們知道就是源ByteBuf的可讀字節(jié)數(shù)
我們再跟到writeBytes(src, src.readerIndex(), length);
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}我們重點關(guān)注第二個校驗方法ensureWritable(length)
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
ensureWritable0(minWritableBytes);
return this;
}然后我們再跟到ensureWritable0(minWritableBytes)方法中:
private void ensureWritable0(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
//自動擴容
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
capacity(newCapacity);
}開始做了兩個參數(shù)校驗, 第一個表示當前ByteBuf寫入的長度如果要小于可寫字節(jié)數(shù), 則返回
第二個可以換種方式去看minWritableBytes+ writerIndex> maxCapacity 也就是需要寫入的長度+寫指針必須要小于最大分配的內(nèi)存, 否則報錯, 注意這里最大分配內(nèi)存不帶表當前內(nèi)存, 而是byteBuf所能分配的最大內(nèi)存
如果需要寫入的長度超過了可寫字節(jié)數(shù), 并且需要寫入的長度+寫指針不超過最大內(nèi)存, 則就開始了ByteBuf非常經(jīng)典也非常重要的操作, 也就是自動擴容
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
其中alloc()返回的是當前bytebuf返回的緩沖區(qū)分配器對象, 我們之后的小節(jié)會講到, 這里調(diào)用了其calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity)方法為其擴容, 其中傳入的參數(shù)writerIndex + minWritableBytes代表所需要的容量, maxCapacity為最大容量
我們跟到擴容的方法里面去
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
//合法性校驗
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
//閾值為4mb
final int threshold = 1048576 * 4;
//最小需要擴容內(nèi)存(總內(nèi)存) == 閾值
if (minNewCapacity == threshold) {
//返回閾值
return threshold;
}
//最小擴容內(nèi)存>閾值
if (minNewCapacity > threshold) {
//newCapacity為需要擴容內(nèi)存
int newCapacity = minNewCapacity / threshold * threshold;
//目標容量+閾值>最大容量
if (newCapacity > maxCapacity - threshold) {
//將最大容量作為新容量
newCapacity = maxCapacity;
} else {
//否則, 目標容量+閾值
newCapacity += threshold;
}
return newCapacity;
}
//如果小于閾值
int newCapacity = 64;
//目標容量<需要擴容的容量
while (newCapacity < minNewCapacity) {
//倍增
newCapacity <<= 1;
}
//目標容量和最大容量返回一個最小的
return Math.min(newCapacity, maxCapacity);
}擴容相關(guān)的邏輯注釋也寫的非常清楚, 如果小于閾值(4mb), 采用倍增的方式, 如果大于閾值(4mb), 采用平移4mb的方式
我們回到writeBytes(ByteBuf src, int srcIndex, int length):
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}再往下看setBytes(writerIndex, src, srcIndex, length), 這里的參數(shù)的意思是從當前byteBuf的writerIndex節(jié)點開始寫入, 將源緩沖區(qū)src的讀指針位置, 寫lenght個字節(jié), 這里的方法中AbstractByteBuf類并沒有提供實現(xiàn), 因為不同類型的BtyeBuf實現(xiàn)的方式是不一樣的, 所以這里交給了子類去實現(xiàn)
最后將寫指針后移length個字節(jié)
最后我們回到writeBytes(ByteBuf src, int length)方法中:
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);
return this;
}當writeBytes(src, src.readerIndex(), length)寫完之后, 通過src.readerIndex(src.readerIndex() + length)將源緩沖區(qū)的讀指針后移lenght個字節(jié)
以上對AbstractByteBuf的簡單介紹和其中寫操作的方法的簡單剖析
以上就是Netty分布式ByteBuf使用的底層實現(xiàn)方式源碼解析的詳細內(nèi)容,更多關(guān)于Netty分布式ByteBuf使用底層實現(xiàn)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot如何讀取application.yml文件
這篇文章主要介紹了springboot如何讀取application.yml文件的方法,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下2020-12-12
java案例實戰(zhàn)之字符串轉(zhuǎn)換為二進制
最近遇到個需求,要求編寫一個程序,從鍵盤錄入一個字符串,將字符串轉(zhuǎn)換為二進制數(shù),下面這篇文章主要給大家介紹了關(guān)于java字符串轉(zhuǎn)換為二進制的相關(guān)資料,需要的朋友可以參考下2023-06-06

