Netty分布式ByteBuf使用命中緩存的分配解析
上一小節(jié)簡(jiǎn)單分析了directArena內(nèi)存分配大概流程 ,知道其先命中緩存, 如果命中不到, 則區(qū)分配一款連續(xù)內(nèi)存, 這一小節(jié)帶大家剖析命中緩存的相關(guān)邏輯
分析先關(guān)邏輯之前, 首先介紹緩存對(duì)象的數(shù)據(jù)結(jié)構(gòu)
回顧上一小節(jié)的內(nèi)容, 我們講到PoolThreadCache中維護(hù)了三個(gè)緩存數(shù)組(實(shí)際上是六個(gè), 這里僅僅以Direct為例, heap類型的邏輯是一樣的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分別代表tiny類型, small類型和normal類型的緩存數(shù)組
這三個(gè)數(shù)組保存在PoolThreadCache的成員變量中:
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
其中是在構(gòu)造方法中進(jìn)行了初始化:
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);我們以tiny類型為例跟到createSubPageCaches方法中
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}這里上面的小節(jié)已經(jīng)分析過(guò), 這里創(chuàng)建了一個(gè)緩存數(shù)組, 這個(gè)緩存數(shù)組的長(zhǎng)度,也就是numCaches, 在不同的類型, 這個(gè)長(zhǎng)度不一樣, tiny類型長(zhǎng)度是32, small類型長(zhǎng)度為4, normal類型長(zhǎng)度為3
我們知道, 緩存數(shù)組中每個(gè)節(jié)點(diǎn)代表一個(gè)緩存對(duì)象, 里面維護(hù)了一個(gè)隊(duì)列, 隊(duì)列大小由PooledByteBufAllocator類中的tinyCacheSize, smallCacheSize, normalCacheSize屬性決定的, 這里之前小節(jié)已經(jīng)剖析過(guò)
其中每個(gè)緩存對(duì)象, 隊(duì)列中緩存的ByteBuf大小是固定的, netty將每種緩沖區(qū)類型分成了不同長(zhǎng)度規(guī)格, 而每個(gè)緩存中的隊(duì)列緩存的ByteBuf的長(zhǎng)度, 都是同一個(gè)規(guī)格的長(zhǎng)度, 而緩沖區(qū)數(shù)組的長(zhǎng)度, 就是規(guī)格的數(shù)量
比如, 在tiny類型中, netty將其長(zhǎng)度分成32個(gè)規(guī)格, 每個(gè)規(guī)格都是16的整數(shù)倍, 也就是包含0B, 16B, 32B, 48B, 64B, 80B, 96B......496B總共32種規(guī)格, 而在其緩存數(shù)組tinySubPageDirectCaches中, 這每一種規(guī)格代表數(shù)組中的一個(gè)緩存對(duì)象緩存的ByteBuf的大小, 我們以tinySubPageDirectCaches[1]為例(這里下標(biāo)選擇1是因?yàn)橄聵?biāo)為0代表的規(guī)格是0B, 其實(shí)就代表一個(gè)空的緩存, 這里不進(jìn)行舉例), 在tinySubPageDirectCaches[1]的緩存對(duì)象中所緩存的ByteBuf的緩沖區(qū)長(zhǎng)度是16B, 在tinySubPageDirectCaches[2]中緩存的ByteBuf長(zhǎng)度都為32B, 以此類推, tinySubPageDirectCaches[31]中緩存的ByteBuf長(zhǎng)度為496B
有關(guān)類型規(guī)則的分配如下:
tiny:總共32個(gè)規(guī)格, 均是16的整數(shù)倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B
small:4種規(guī)格, 512b, 1k, 2k, 4k
nomal:3種規(guī)格, 8k, 16k, 32k
這樣, PoolThreadCache中緩存數(shù)組的數(shù)據(jù)結(jié)構(gòu)為

大概了解緩存數(shù)組的數(shù)據(jù)結(jié)構(gòu), 我們?cè)倮^續(xù)剖析在緩沖中分配內(nèi)存的邏輯
回到PoolArena的allocate方法中
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//規(guī)格化
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) {
int tableIdx;
PoolSubpage<T>[] table;
//判斷是不是tinty
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
//緩存分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
return;
}
//通過(guò)tinyIdx拿到tableIdx
tableIdx = tinyIdx(normCapacity);
//subpage的數(shù)組
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
//拿到對(duì)應(yīng)的節(jié)點(diǎn)
final PoolSubpage<T> head = table[tableIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;
//默認(rèn)情況下, head的next也是自身
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
if (normCapacity <= chunkSize) {
//首先在緩存上進(jìn)行內(nèi)存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
//分配成功, 返回
return;
}
//分配不成功, 做實(shí)際的內(nèi)存分配
allocateNormal(buf, reqCapacity, normCapacity);
} else {
//大于這個(gè)值, 就不在緩存上分配
allocateHuge(buf, reqCapacity);
}
}首先通過(guò)normalizeCapacity方法進(jìn)行內(nèi)存規(guī)格化
我們跟到normalizeCapacity方法中
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return reqCapacity;
}
//如果>tiny
if (!isTiny(reqCapacity)) { // >= 512
//找一個(gè)2的冪次方的數(shù)值, 確保數(shù)值大于等于reqCapacity
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
return normalizedCapacity;
}
//如果是16的倍數(shù)
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
//不是16的倍數(shù), 變成最大小于當(dāng)前值的值+16
return (reqCapacity & ~15) + 16;
}if (!isTiny(reqCapacity)) 代表如果大于tiny類型的大小, 也就是512, 則會(huì)找一個(gè)2的冪次方的數(shù)值, 確保這個(gè)數(shù)值大于等于reqCapacity
如果是tiny, 則繼續(xù)往下
if ((reqCapacity & 15) == 0) 這里判斷如果是16的倍數(shù), 則直接返回
如果不是16的倍數(shù), 則返回 (reqCapacity & ~15) + 16 , 也就是變成最小大于當(dāng)前值的16的倍數(shù)值
從上面規(guī)格化邏輯看出, 這里將緩存大小規(guī)格化成固定大小, 確保每個(gè)緩存對(duì)象緩存的ByteBuf容量統(tǒng)一
回到allocate方法中
if(isTinyOrSmall(normCapacity)) 這里是根據(jù)規(guī)格化后的大小判斷是否tiny或者small類型, 我們跟到方法中:
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}這里是判斷如果normCapacity小于一個(gè)page的大小, 也就是8k代表其實(shí)tiny或者small
繼續(xù)看allocate方法:
如果當(dāng)前大小是tiny或者small, 則isTiny(normCapacity)判斷是否是tiny類型, 跟進(jìn)去:
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}這里是判斷如果小于512, 則認(rèn)為是tiny
再繼續(xù)看allocate方法:
如果是tiny, 則通過(guò)cache.allocateTiny(this, buf, reqCapacity, normCapacity)在緩存上進(jìn)行分配
我們就以tiny類型為例, 分析在緩存上分配ByteBuf的流程
allocateTiny是緩存分配的入口
我們跟進(jìn)去, 進(jìn)入到了PoolThreadCache的allocateTiny方法中:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}這里有個(gè)方法cacheForTiny(area, normCapacity), 這個(gè)方法的作用是根據(jù)normCapacity找到tiny類型緩存數(shù)組中的一個(gè)緩存對(duì)象
我們跟進(jìn)cacheForTiny:
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}PoolArena.tinyIdx(normCapacity)是找到tiny類型緩存數(shù)組的下標(biāo)
繼續(xù)跟tinyIdx:
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}這里直接將normCapacity除以16, 通過(guò)前面的內(nèi)容我們知道, tiny類型緩存數(shù)組中每個(gè)元素規(guī)格化的數(shù)據(jù)都是16的倍數(shù), 所以通過(guò)這種方式可以找到其下標(biāo), 參考圖5-2, 如果是16B會(huì)拿到下標(biāo)為1的元素, 如果是32B則會(huì)拿到下標(biāo)為2的元素
回到acheForTiny方法中
if (area.isDirect()) 這里判斷是否是分配堆外內(nèi)存, 因?yàn)槲覀兪前凑斩淹鈨?nèi)存進(jìn)行舉例, 所以這里為true
再繼續(xù)跟到cache(tinySubPageDirectCaches, idx)方法中:
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}這里我們看到直接通過(guò)下標(biāo)的方式拿到了緩存數(shù)組中的對(duì)象
回到PoolThreadCache的allocateTiny方法中:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}拿到了緩存對(duì)象之后, 我們跟到allocate(cacheForTiny(area, normCapacity), buf, reqCapacity)方法中:
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}這里通過(guò)cache.allocate(buf, reqCapacity)進(jìn)行繼續(xù)進(jìn)行分配
再繼續(xù)往里跟, 跟到內(nèi)部類MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法中:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();
++ allocations;
return true;
}這里首先通過(guò)queue.poll()這種方式彈出一個(gè)entry, 我們之前的小節(jié)分析過(guò), MemoryRegionCache維護(hù)著一個(gè)隊(duì)列, 而隊(duì)列中的每一個(gè)值是一個(gè)entry
我們簡(jiǎn)單看下Entry這個(gè)類
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1;
//代碼省略
}這里重點(diǎn)關(guān)注chunk和handle的這兩個(gè)屬性, chunk代表一塊連續(xù)的內(nèi)存, 我們之前簡(jiǎn)單介紹過(guò), netty是通過(guò)chunk為單位進(jìn)行內(nèi)存分配的, 我們之后會(huì)對(duì)chunk進(jìn)行剖析
handle相當(dāng)于一個(gè)指針, 可以唯一定位到chunk里面的一塊連續(xù)的內(nèi)存, 之后也會(huì)詳細(xì)分析
這樣, 通過(guò)chunk和handle就可以定位ByteBuf中指定一塊連續(xù)內(nèi)存, 有關(guān)ByteBuf相關(guān)的讀寫, 都會(huì)在這塊內(nèi)存中進(jìn)行
我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();
++ allocations;
return true;
}彈出entry之后, 通過(guò)initBuf(entry.chunk, entry.handle, buf, reqCapacity)這種方式給ByteBuf初始化, 這里參數(shù)傳入我們剛才分析過(guò)的當(dāng)前Entry的chunk和hanle
因?yàn)槲覀兎治龅膖iny類型的緩存對(duì)象是SubPageMemoryRegionCache類型,所以我們繼續(xù)跟到SubPageMemoryRegionCache類的initBuf(entry.chunk, entry.handle, buf, reqCapacity)方法中:
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}這里的chunk調(diào)用了initBufWithSubpage(buf, handle, reqCapacity)方法, 其實(shí)就是PoolChunk類中的方法
我們繼續(xù)跟initBufWithSubpage:
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
}這里有關(guān)bitmapIdx(handle)相關(guān)的邏輯, 會(huì)在后續(xù)的章節(jié)進(jìn)行剖析, 這里繼續(xù)往里跟:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
}這里我們先關(guān)注init方法, 因?yàn)槲覀兪且訮ooledUnsafeDirectByteBuf為例, 所以這里走的是PooledUnsafeDirectByteBuf的init方法
跟進(jìn)init方法
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
initMemoryAddress();
}首先調(diào)用了父類的init方法, 再跟進(jìn)去:
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
//初始化
assert handle >= 0;
assert chunk != null;
//在哪一塊內(nèi)存上進(jìn)行分配的
this.chunk = chunk;
//這一塊內(nèi)存上的哪一塊連續(xù)內(nèi)存
this.handle = handle;
memory = chunk.memory;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
this.cache = cache;
}這里將PooledUnsafeDirectByteBuf的各個(gè)屬性進(jìn)行了初始化
this.chunk = chunk 這里初始化了chunk, 代表當(dāng)前的ByteBuf是在哪一塊內(nèi)存中分配的
this.handle = handle 這里初始化了handle, 代表當(dāng)前的ByteBuf是這塊內(nèi)存的哪個(gè)連續(xù)內(nèi)存
有關(guān)offset和length, 我們會(huì)在之后的小節(jié)進(jìn)行分析, 在這里我們只需要知道, 通過(guò)緩存分配ByteBuf, 我們只需要通過(guò)一個(gè)chunk和handle, 就可以確定一塊內(nèi)存
以上就是通過(guò)緩存分配ByteBuf對(duì)象的過(guò)程
我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();
++ allocations;
return true;
}分析完了initBuf方法, 再繼續(xù)往下看
entry.recycle()這步是將entry對(duì)象進(jìn)行回收, 因?yàn)閑ntry對(duì)象彈出之后沒有再被引用, 可能gc會(huì)將entry對(duì)象回收, netty為了將對(duì)象進(jìn)行循環(huán)利用, 就將其放在對(duì)象回收站進(jìn)行回收
我們跟進(jìn)recycle方法
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}chunk = null和handle = -1表示當(dāng)前Entry不指向任何一塊內(nèi)存
recyclerHandle.recycle(this) 將當(dāng)前entry回收, 有關(guān)對(duì)象回收站, 我們會(huì)在后面的章節(jié)詳細(xì)剖析
以上就是命中緩存的流程, 因?yàn)檫@里我們是假設(shè)緩中有值的情況下進(jìn)行分配的, 如果第一次分配, 緩存中是沒有值的, 那么在緩存中沒有值的情況下, netty是如何進(jìn)行分配的呢?我們?cè)僦蟮男」?jié)會(huì)進(jìn)行剖析
更多關(guān)于Netty分布式ByteBuf使用命中緩存分配的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot開發(fā)實(shí)戰(zhàn)系列之動(dòng)態(tài)定時(shí)任務(wù)
在我們?nèi)粘5拈_發(fā)中,很多時(shí)候,定時(shí)任務(wù)都不是寫死的,而是寫到數(shù)據(jù)庫(kù)中,從而實(shí)現(xiàn)定時(shí)任務(wù)的動(dòng)態(tài)配置,下面這篇文章主要給大家介紹了關(guān)于SpringBoot開發(fā)實(shí)戰(zhàn)系列之動(dòng)態(tài)定時(shí)任務(wù)的相關(guān)資料,需要的朋友可以參考下2021-08-08
Mybatis 如何批量刪除數(shù)據(jù)的實(shí)現(xiàn)示例
這篇文章主要介紹了Mybatis 如何批量刪除數(shù)據(jù)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Spring Security中用JWT退出登錄時(shí)遇到的坑
使用了JWT后,每次請(qǐng)求都要攜帶 Bearer Token 并且被專門的過(guò)濾器攔截解析之后才能將用戶認(rèn)證信息保存到 SecurityContext 中去,接下來(lái)通過(guò)本文給大家介紹Spring Security中用JWT退出登錄時(shí)遇到的坑,感興趣的朋友一起看看吧2021-10-10
MyBatis中傳入?yún)?shù)parameterType類型詳解
這篇文章主要給大家介紹了關(guān)于MyBatis中傳入?yún)?shù)parameterType類型的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2018-04-04
Java中雙重檢查鎖(double checked locking)的正確實(shí)現(xiàn)
雙重檢查鎖(Double-Check Locking),顧名思義,通過(guò)兩次檢查,并基于加鎖機(jī)制,實(shí)現(xiàn)某個(gè)功能,下面這篇文章主要給大家介紹了關(guān)于Java中雙重檢查鎖(double checked locking)的相關(guān)資料,需要的朋友可以參考下2021-09-09
java 用遞歸獲取一個(gè)目錄下的所有文件路徑的小例子
還是日志的問(wèn)題,log4j生成的日志文件,自動(dòng)保存到月份所在的文件夾中,需要獲取到所有的日志文件,包括文件夾2013-09-09
SpringBoot使用JUL實(shí)現(xiàn)日志記錄功能
在SpringBoot中,我們可以使用多種日志框架進(jìn)行日志記錄,其中,JUL(Java Util Logging)是Java平臺(tái)自帶的日志框架,它提供了簡(jiǎn)單的 API 和配置,可以輕松地進(jìn)行日志記錄,本文將介紹如何在 SpringBoot中使用JUL進(jìn)行日志記錄,并提供示例代碼2023-06-06
Java面向?qū)ο蠡A(chǔ)知識(shí)之委托和lambda
這篇文章主要介紹了Java面向?qū)ο蟮闹泻?lambda,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有很好的幫助,需要的朋友可以參考下2021-11-11

