Netty分布式ByteBuf使用subPage級別內(nèi)存分配剖析
上一小節(jié)我們剖析了page級別的內(nèi)存分配邏輯, 這一小節(jié)帶大家剖析有關(guān)subPage級別的內(nèi)存分配
通過之前的學(xué)習(xí)我們知道, 如果我們分配一個緩沖區(qū)大小遠(yuǎn)小于page, 則直接在一個page上進(jìn)行分配則會造成內(nèi)存浪費(fèi), 所以需要將page繼續(xù)進(jìn)行切分成多個子塊進(jìn)行分配, 子塊分配的個數(shù)根據(jù)你要分配的緩沖區(qū)大小而定, 比如只需要分配1k的內(nèi)存, 就會將一個page分成8等分。
subPage級別內(nèi)存分配
簡單起見, 我們這里僅僅以16字節(jié)為例, 講解其分配邏輯
在分析其邏輯前, 首先看PoolArean的一個屬性:
private final PoolSubpage<T>[] tinySubpagePools;
這個屬性是一個PoolSubpage的數(shù)組, 有點(diǎn)類似于一個subpage的緩存, 我們創(chuàng)建一個subpage之后, 會將創(chuàng)建的subpage與該屬性其中每個關(guān)聯(lián), 下次在分配的時候可以直接通過該屬性的元素去找關(guān)聯(lián)的subpage
我們其中是在構(gòu)造方法中初始化的, 看構(gòu)造方法中其初始化代碼
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
這里為numTinySubpagePools為32
跟到newSubpagePoolArray(numTinySubpagePools)方法里:
private PoolSubpage<T>[] newSubpagePoolArray(int size) { return new PoolSubpage[size]; }
這里直接創(chuàng)建了一個PoolSubpage數(shù)組, 長度為32
在構(gòu)造方法中創(chuàng)建完畢之后, 會通過循環(huán)為其賦值
for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); }
我們跟到newSubpagePoolHead中:
private PoolSubpage<T> newSubpagePoolHead(int pageSize) { PoolSubpage<T> head = new PoolSubpage<T>(pageSize); head.prev = head; head.next = head; return head; }
這里創(chuàng)建了一個PoolSubpage對象head
head.prev = head; head.next = head;
這種寫法我們知道Subpage其實(shí)也是個雙向鏈表, 這里的將head的上一個節(jié)點(diǎn)和下一個節(jié)點(diǎn)都設(shè)置為自身, 有關(guān)PoolSubpage的關(guān)聯(lián)關(guān)系, 我們稍后會看到
這樣通過循環(huán)創(chuàng)建PoolSubpage, 總共會創(chuàng)建出32個subpage, 其中每個subpage實(shí)際代表一塊內(nèi)存大小:
5-8-1
這里就有點(diǎn)類之前小節(jié)的緩存數(shù)組tinySubPageDirectCaches的結(jié)構(gòu)
了解了tinySubpagePools屬性, 我們看PoolArean的allocate方法, 也就是緩沖區(qū)的入口方法:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //規(guī)格化 final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判斷是不是tinty boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //緩存分配 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //通過tinyIdx拿到tableIdx tableIdx = tinyIdx(normCapacity); //subpage的數(shù)組 table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到對應(yīng)的節(jié)點(diǎn) final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默認(rèn)情況下, head的next也是自身 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在緩存上進(jìn)行內(nèi)存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回 return; } //分配不成功, 做實(shí)際的內(nèi)存分配 allocateNormal(buf, reqCapacity, normCapacity); } else { //大于這個值, 就不在緩存上分配 allocateHuge(buf, reqCapacity); } }
之前我們最這個方法剖析過在page級別相關(guān)內(nèi)存分配邏輯, 這一小節(jié)看subpage級別分配的相關(guān)邏輯
假設(shè)我們分配16字節(jié)的緩沖區(qū), isTinyOrSmall(normCapacity)就會返回true, 進(jìn)入if塊
同樣if (tiny)這里會返回true, 繼續(xù)跟到if (tiny)中:
首先會在緩存中分配緩沖區(qū), 如果分配不到, 就開辟一塊內(nèi)存進(jìn)行內(nèi)存分配
首先看這一步:
tableIdx = tinyIdx(normCapacity);
這里通過normCapacity拿到tableIdx
我們跟進(jìn)去:
static int tinyIdx(int normCapacity) { return normCapacity >>> 4; }
這里將normCapacity除以16, 其實(shí)也就是1
我們回到PoolArena的allocate方法繼續(xù)看:
table = tinySubpagePools
這里將tinySubpagePools賦值到局部變量table中, 繼續(xù)往下看
final PoolSubpage<T> head = table[tableIdx]
這步時通過下標(biāo)拿到一個PoolSubpage, 因?yàn)槲覀円?6字節(jié)為例, 所以我們拿到下標(biāo)為1的PoolSubpage, 對應(yīng)的內(nèi)存大小也就是16B
再看 final PoolSubpage<T> s = head.next 這一步, 跟我們剛才了解的的tinySubpagePools屬性, 默認(rèn)情況下head.next也是自身, 所以if (s != head)會返回false, 我們繼續(xù)往下看:
下面, 會走到allocateNormal(buf, reqCapacity, normCapacity)這個方法:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進(jìn)行內(nèi)存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //創(chuàng)建chunk進(jìn)行內(nèi)存分配(2) PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
這里的邏輯我們之前的小節(jié)已經(jīng)剖析過, 首先在原來的chunk中分配, 如果分配不成功, 則會創(chuàng)建chunk進(jìn)行分配
我們看這一步
long handle = c.allocate(normCapacity)
跟到allocate(normCapacity)方法中
long allocate(int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }
上一小節(jié)我們分析page級別分配的時候, 剖析的是allocateRun(normCapacity)方法
因?yàn)檫@里我們是以16字節(jié)舉例, 所以這次我們剖析allocateSubpage(normCapacity)方法, 也就是在subpage級別進(jìn)行內(nèi)存分配
private long allocateSubpage(int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; //表示在第11層分配節(jié)點(diǎn) int id = allocateNode(d); if (id < 0) { return id; } //獲取初始化的subpage final PoolSubpage<T>[] subpages = this.subpages; final int pageSize = this.pageSize; freeBytes -= pageSize; //表示第幾個subpageIdx int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null) { //如果subpage為空 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); //則將當(dāng)前的下標(biāo)賦值為subpage subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } //取出一個子page return subpage.allocate(); } }
首先, 通過 PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity) 這種方式找到head節(jié)點(diǎn), 實(shí)際上這里head, 就是我們剛才分析的tinySubpagePools屬性的第一個節(jié)點(diǎn), 也就是對應(yīng)16B的那個節(jié)點(diǎn)
int d = maxOrder 是將11賦值給d, 也就是在內(nèi)存樹的第11層取節(jié)點(diǎn), 這部分上一小節(jié)剖析過了, 可以回顧圖5-8-5部分
int id = allocateNode(d) 這里獲取的是上一小節(jié)我們分析過的, 字節(jié)數(shù)組memoryMap的下標(biāo), 這里指向一個page, 如果第一次分配, 指向的是0-8k的那個page, 上一小節(jié)對此進(jìn)行詳細(xì)的剖析這里不再贅述
final PoolSubpage<T>[] subpages = this.subpages 這一步, 是拿到PoolChunk中成員變量subpages的值, 也是個PoolSubpage的數(shù)組, 在PoolChunk進(jìn)行初始化的時候, 也會初始化該數(shù)組, 長度為2048
也就是說每個chunk都維護(hù)著一個subpage的列表, 如果每一個page級別的內(nèi)存都需要被切分成子page, 則會將這個這個page放入該列表中, 專門用于分配子page, 所以這個列表中的subpage, 其實(shí)就是一個用于切分的page
5-8-2
int subpageIdx = subpageIdx(id) 這一步是通過id拿到這個PoolSubpage數(shù)組的下標(biāo), 如果id對應(yīng)的page是0-8k的節(jié)點(diǎn), 這里拿到的下標(biāo)就是0
在 if (subpage == null) 中, 因?yàn)槟J(rèn)subpages只是創(chuàng)建一個數(shù)組, 并沒有往數(shù)組中賦值, 所以第一次走到這里會返回true, 跟到if塊中:
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
這里通過new PoolSubpage創(chuàng)建一個新的subpage之后, 通過 subpages[subpageIdx] = subpage 這種方式將新創(chuàng)建的subpage根據(jù)下標(biāo)賦值到subpages中的元素中
在new PoolSubpage的構(gòu)造方法中, 傳入head, 就是我們剛才提到過的tinySubpagePools屬性中的節(jié)點(diǎn), 如果我們分配的16字節(jié)的緩沖區(qū), 則這里對應(yīng)的就是第一個節(jié)點(diǎn)
我們跟到PoolSubpage的構(gòu)造方法中
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { this.chunk = chunk; this.memoryMapIdx = memoryMapIdx; this.runOffset = runOffset; this.pageSize = pageSize; bitmap = new long[pageSize >>> 10]; init(head, elemSize); }
這里重點(diǎn)關(guān)注屬性bitmap, 這是一個long類型的數(shù)組, 初始大小為8, 這里只是初始化的大小, 真正的大小要根據(jù)將page切分多少塊而確定
這里將屬性進(jìn)行了賦值, 我們跟到init方法中:
void init(PoolSubpage<T> head, int elemSize) { doNotDestroy = true; this.elemSize = elemSize; if (elemSize != 0) { maxNumElems = numAvail = pageSize / elemSize; nextAvail = 0; bitmapLength = maxNumElems >>> 6; if ((maxNumElems & 63) != 0) { bitmapLength ++; } for (int i = 0; i < bitmapLength; i ++) { //bitmap標(biāo)識哪個子page被分配 //0標(biāo)識未分配, 1表示已分配 bitmap [i] = 0; } } //加到arena里面 addToPool(head); }
this.elemSize = elemSize 表示保存當(dāng)前分配的緩沖區(qū)大小, 這里我們以16字節(jié)舉例, 所以這里是16
maxNumElems = numAvail = pageSize / elemSize
這里初始化了兩個屬性maxNumElems, numAvail, 值都為pageSize / elemSize, 表示一個page大小除以分配的緩沖區(qū)大小, 也就是表示當(dāng)前page被劃分了多少分
numAvail則表示剩余可用的塊數(shù), 由于第一次分配都是可用的, 所以 numAvail=maxNumElems
bitmapLength表示bitmap的實(shí)際大小, 剛才我們分析過, bitmap初始化的大小為8, 但實(shí)際上并不一定需要8個元素, 元素個數(shù)要根據(jù)page切分的子塊而定, 這里的大小是所切分的子塊數(shù)除以64
再往下看, if ((maxNumElems & 63) != 0) 判斷maxNumElems也就是當(dāng)前配置所切分的子塊是不是64的倍數(shù), 如果不是, 則bitmapLength加1,
最后通過循環(huán), 將其分配的大小中的元素賦值為0
這里詳細(xì)介紹一下有關(guān)bitmap, 這里是個long類型的數(shù)組, long數(shù)組中的每一個值, 也就是long類型的數(shù)字, 其中的每一個比特位, 都標(biāo)記著page中每一個子塊的內(nèi)存是否已分配, 如果比特位是1, 表示該子塊已分配, 如果比特位是0, 表示該子塊未分配, 標(biāo)記順序是其二進(jìn)制數(shù)從低位到高位進(jìn)行排列
這里, 我們應(yīng)該知道為什么bitmap大小要設(shè)置為子塊數(shù)量除以, 64, 因?yàn)閘ong類型的數(shù)字是64位, 每一個元素能記錄64個子塊的數(shù)量, 這樣就可以通過子page個數(shù)除以64的方式?jīng)Q定bitmap中元素的數(shù)量
如果子塊不能整除64, 則通過元素?cái)?shù)量+1方式, 除以64之后剩余的子塊通過long中比特位由低到高進(jìn)行排列記錄
這里的邏輯結(jié)構(gòu)如下所示:
5-8-3
我們跟到addToPool(head)中
private void addToPool(PoolSubpage<T> head) { assert prev == null && next == null; prev = head; next = head.next; next.prev = this; head.next = this; }
這里的head我們剛才講過, 是Arena中數(shù)組tinySubpagePools中的元素, 通過以上邏輯, 就會將新創(chuàng)建的Subpage通過雙向鏈表的方式關(guān)聯(lián)到tinySubpagePools中的元素, 我們以16字節(jié)為例, 關(guān)聯(lián)關(guān)系如圖所示:
5-8-4
這樣, 下次如果還需要分配16字節(jié)的內(nèi)存, 就可以通過tinySubpagePools找到其元素關(guān)聯(lián)的subpage進(jìn)行分配了
我們再回到PoolChunk的allocateSubpage方法中:
private long allocateSubpage(int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; //表示在第11層分配節(jié)點(diǎn) int id = allocateNode(d); if (id < 0) { return id; } //獲取初始化的subpage final PoolSubpage<T>[] subpages = this.subpages; final int pageSize = this.pageSize; freeBytes -= pageSize; //表示第幾個subpageIdx int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null) { //如果subpage為空 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); //則將當(dāng)前的下標(biāo)賦值為subpage subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } //取出一個子page return subpage.allocate(); } }
創(chuàng)建完了一個subpage, 我們就可以通過subpage.allocate()方法進(jìn)行內(nèi)存分配了
我們跟到allocate()方法中
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; } //取一個bitmap中可用的id(絕對id) final int bitmapIdx = getNextAvail(); //除以64(bitmap的相對下標(biāo)) int q = bitmapIdx >>> 6; //除以64取余, 其實(shí)就是當(dāng)前絕對id的偏移量 int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當(dāng)前位標(biāo)記為1 bitmap[q] |= 1L << r; //如果可用的子page為0 //可用的子page-1 if (-- numAvail == 0) { //則移除相關(guān)子page removeFromPool(); } //bitmapIdx轉(zhuǎn)換成handler return toHandle(bitmapIdx); }
這里的邏輯看起來比較復(fù)雜, 這里帶著大家一點(diǎn)點(diǎn)剖析:
首先看:
final int bitmapIdx = getNextAvail();
其中bitmapIdx表示從bitmap中找到一個可用的bit位的下標(biāo), 注意, 這里是bit的下標(biāo), 并不是數(shù)組的下標(biāo), 我們之前分析過, 因?yàn)槊恳槐忍匚淮硪粋€子塊的內(nèi)存分配情況, 通過這個下標(biāo)就可以知道那個比特位是未分配狀態(tài)
我們跟進(jìn)這個方法:
private int getNextAvail() { //nextAvail=0 int nextAvail = this.nextAvail; if (nextAvail >= 0) { //一個子page被釋放之后, 會記錄當(dāng)前子page的bitmapIdx的位置, 下次分配可以直接通過bitmapIdx拿到一個子page this.nextAvail = -1; return nextAvail; } return findNextAvail(); }
這里nextAvail, 表示下一個可用的bitmapIdx, 在釋放的時候的會被標(biāo)記, 標(biāo)記被釋放的子塊對應(yīng)bitmapIdx的下標(biāo), 如果<0則代表沒有被釋放的子塊, 則通過findNextAvail方法進(jìn)行查找
我們繼續(xù)跟進(jìn)findNextAvail方法
private int findNextAvail() { //當(dāng)前l(fā)ong數(shù)組 final long[] bitmap = this.bitmap; //獲取其長度 final int bitmapLength = this.bitmapLength; for (int i = 0; i < bitmapLength; i ++) { //第i個 long bits = bitmap[i]; //!=-1 說明64位沒有全部占滿 if (~bits != 0) { //找下一個節(jié)點(diǎn) return findNextAvail0(i, bits); } } return -1; }
這里會遍歷bitmap中的每一個元素, 如果當(dāng)前元素中所有的比特位并沒有全部標(biāo)記被使用, 則通過findNextAvail0(i, bits)方法挨個往后找標(biāo)記未使用的比特位
再繼續(xù)跟findNextAvail0:
private int findNextAvail0(int i, long bits) { //多少份 final int maxNumElems = this.maxNumElems; //乘以64, 代表當(dāng)前l(fā)ong的第一個下標(biāo) final int baseVal = i << 6; //循環(huán)64次(指代當(dāng)前的下標(biāo)) for (int j = 0; j < 64; j ++) { //第一位為0(如果是2的倍數(shù), 則第一位就是0) if ((bits & 1) == 0) { //這里相當(dāng)于加, 將i*64之后加上j, 獲取絕對下標(biāo) int val = baseVal | j; //小于塊數(shù)(不能越界) if (val < maxNumElems) { return val; } else { break; } } //當(dāng)前下標(biāo)不為0 //右移一位 bits >>>= 1; } return -1; }
這里從當(dāng)前元素的第一個比特位開始找, 直到找到一個標(biāo)記為0的比特位, 并返回當(dāng)前比特位的下標(biāo), 大概流程如下圖所示:
5-8-5
我們回到allocate()方法中
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; } //取一個bitmap中可用的id(絕對id) final int bitmapIdx = getNextAvail(); //除以64(bitmap的相對下標(biāo)) int q = bitmapIdx >>> 6; //除以64取余, 其實(shí)就是當(dāng)前絕對id的偏移量 int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當(dāng)前位標(biāo)記為1 bitmap[q] |= 1L << r; //如果可用的子page為0 //可用的子page-1 if (-- numAvail == 0) { //則移除相關(guān)子page removeFromPool(); } //bitmapIdx轉(zhuǎn)換成handler return toHandle(bitmapIdx); }
找到可用的bitmapIdx之后, 通過 int q = bitmapIdx >>> 6 獲取bitmap中bitmapIdx所屬元素的數(shù)組下標(biāo)
int r = bitmapIdx & 63 表示獲取bitmapIdx的位置是從當(dāng)前元素最低位開始的第幾個比特位
bitmap[q] |= 1L << r 是將bitmap的位置設(shè)置為不可用, 也就是比特位設(shè)置為1, 表示已占用
然后將可用子配置的數(shù)量numAvail減一
如果沒有可用子page的數(shù)量, 則會將PoolArena中的數(shù)組tinySubpagePools所關(guān)聯(lián)的subpage進(jìn)行移除, 移除之后參考圖5-8-1
最后通過toHandle(bitmapIdx)獲取當(dāng)前子塊的handle, 上一小節(jié)我們知道handle指向的是當(dāng)前chunk中的唯一的一塊內(nèi)存, 我們跟進(jìn)toHandle(bitmapIdx)中:
private long toHandle(int bitmapIdx) { return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; }
(long) bitmapIdx << 32 是將bitmapIdx右移32位, 而32位正好是一個int的長度, 這樣, 通過 (long) bitmapIdx << 32 | memoryMapIdx 計(jì)算, 就可以將memoryMapIdx, 也就是page所屬的下標(biāo)的二進(jìn)制數(shù)保存在 (long) bitmapIdx << 32 的低32位中
0x4000000000000000L是一個最高位是1并且所有低位都是0的二進(jìn)制數(shù), 這樣通過按位或的方式可以將 (long) bitmapIdx << 32 | memoryMapIdx 計(jì)算出來的結(jié)果保存在0x4000000000000000L的所有低位中, 這樣, 返回對的數(shù)字就可以指向chunk中唯一的一塊內(nèi)存
我們回到PoolArena的allocateNormal方法中:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進(jìn)行內(nèi)存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //創(chuàng)建chunk進(jìn)行內(nèi)存分配(2) PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
我們分析完了long handle = c.allocate(normCapacity)這步, 這里返回的handle就指向chunk中的某個page中的某個子塊所對應(yīng)的連續(xù)內(nèi)存
最后, 通過iniBuf初始化之后, 將創(chuàng)建的chunk加到ChunkList里面
我們跟到initBuf方法中
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); //bitmapIdx是后面分配subpage時候使用到的 int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); //runOffset(memoryMapIdx):偏移量 //runLength(memoryMapIdx):當(dāng)前節(jié)點(diǎn)的長度 buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } }
這部分在之前的小節(jié)我們剖析過, 相信大家不會陌生, 這里有區(qū)別的是 if (bitmapIdx == 0) 的判斷, 這里的bitmapIdx不會是0, 這樣, 就會走到initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity)方法中
跟到initBufWithSubpage方法:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { assert bitmapIdx != 0; int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache()); }
首先拿到memoryMapIdx, 這里會將我們之前計(jì)算handle傳入, 跟進(jìn)去:
private static int memoryMapIdx(long handle) { return (int) handle; }
這里將其強(qiáng)制轉(zhuǎn)化為int類型, 也就是去掉高32位, 這樣就得到memoryMapIdx
回到initBufWithSubpage方法中
我們注意在buf調(diào)用init方法中的一個參數(shù): runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
這里的偏移量就是, 原來page的偏移量+子塊的偏移量
bitmapIdx & 0x3FFFFFFF 代表當(dāng)前分配的子page是屬于第幾個子page
(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize 表示在當(dāng)前page的偏移量
這樣, 分配的ByteBuf在內(nèi)存讀寫的時候, 就會根據(jù)偏移量進(jìn)行讀寫
最后我們跟到init方法中
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化 assert handle >= 0; assert chunk != null; //在哪一塊內(nèi)存上進(jìn)行分配的 this.chunk = chunk; //這一塊內(nèi)存上的哪一塊連續(xù)內(nèi)存 this.handle = handle; memory = chunk.memory; //偏移量 this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }
這里又是我們熟悉的邏輯, 初始化了屬性之后, 一個緩沖區(qū)分配完成
以上就是Subpage級別的緩沖區(qū)分配邏輯,更多關(guān)于Netty分布式ByteBuf使用subPage內(nèi)存分配的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java Date與String的相互轉(zhuǎn)換詳解
這篇文章主要介紹了Java Date與String的相互轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下2017-02-02mybatis 字段名自動轉(zhuǎn)小寫的實(shí)現(xiàn)
這篇文章主要介紹了mybatis 字段名自動轉(zhuǎn)小寫的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Spring?Boot接口支持高并發(fā)具體實(shí)現(xiàn)代碼
這篇文章主要給大家介紹了關(guān)于Spring?Boot接口支持高并發(fā)具體實(shí)現(xiàn)的相關(guān)資料,在SpringBoot項(xiàng)目中通常我們沒有處理并發(fā)問題,但是使用項(xiàng)目本身還是支持一定的并發(fā)量,需要的朋友可以參考下2023-08-08java編程兩種樹形菜單結(jié)構(gòu)的轉(zhuǎn)換代碼
這篇文章主要介紹了java編程兩種樹形菜單結(jié)構(gòu)的轉(zhuǎn)換代碼,首先介紹了兩種樹形菜單結(jié)構(gòu)的代碼,然后展示了轉(zhuǎn)換器實(shí)例代碼,最后分享了相關(guān)實(shí)例及結(jié)果演示,具有一定借鑒價值,需要的朋友可以了解下。2017-12-12Java形參和實(shí)參的實(shí)例之Integer類型與Int類型用法說明
這篇文章主要介紹了Java形參和實(shí)參的實(shí)例之Integer類型與Int類型用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10MyBatis中XML 映射文件中常見的標(biāo)簽說明
這篇文章主要介紹了MyBatis中XML 映射文件中常見的標(biāo)簽說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Java8中List轉(zhuǎn)換String字符串幾種方式
這篇文章主要給大家介紹了關(guān)于Java8中List轉(zhuǎn)換String字符串的幾種方式,在實(shí)際開發(fā)中經(jīng)常遇到List轉(zhuǎn)為String字符串的情況,文中給出了幾種方法的示例代碼,需要的朋友可以參考下2023-07-07java控制臺實(shí)現(xiàn)學(xué)生管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java控制臺實(shí)現(xiàn)簡單的學(xué)生管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02