Netty分布式ByteBuf中PooledByteBufAllocator剖析
前言
上一小節(jié)簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩沖區(qū)分類的邏輯, 這一小節(jié)開始帶大家剖析更為復(fù)雜的PooledByteBufAllocator, 我們知道PooledByteBufAllocator是通過自己取一塊連續(xù)的內(nèi)存進(jìn)行ByteBuf的封裝, 所以這里更為復(fù)雜, 在這一小節(jié)簡單講解有關(guān)PooledByteBufAllocator分配邏輯
友情提示: 從這一節(jié)開始難度開始加大, 請(qǐng)各位戰(zhàn)友做好心理準(zhǔn)備
PooledByteBufAllocator分配邏輯
PooledByteBufAllocator同樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個(gè)抽象方法, 我們這一小節(jié)以newDirectBuffer為例, 先簡述一下其邏輯
邏輯簡述
首先看UnPooledByteBufAllocator中newDirectBuffer這個(gè)方法
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個(gè)線程局部緩存對(duì)象, 線程局部緩存, 顧明思議, 就是同一個(gè)線程共享的一個(gè)緩存
threadCache是PooledByteBufAllocator類的一個(gè)成員變量, 類型是PoolThreadLocalCache(這兩個(gè)非常容易混淆, 切記):
private final PoolThreadLocalCache threadCache;
再看其類型PoolThreadLocalCache的定義:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略 }
這里繼承了一個(gè)FastThreadLocal類, 這個(gè)類相當(dāng)于jdk的ThreadLocal, 只是性能更快, 有關(guān)FastThreadLocal, 我們?cè)诤竺娴恼鹿?jié)會(huì)詳細(xì)剖析, 這里我們只要知道, 繼承FastThreadLocal類并且重寫了initialValue方法, 則通過其get方法就能獲得initialValue返回的對(duì)象, 并且這個(gè)對(duì)象是線程共享的
在這里我們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個(gè)屬性之后, 通過new PoolThreadCache()這種方式創(chuàng)建了PoolThreadCache對(duì)象
這里注意, PoolThreadLocalCache是一個(gè)FastThreadLocal, 而PoolThreadCache才是線程局部緩存, 這兩個(gè)類名非常非常像, 千萬別搞混了(我當(dāng)初讀這段代碼時(shí)因?yàn)楦慊焖糟卤屏?
其中heapArena和directArena是分別是用來分配堆和堆外內(nèi)存用的兩個(gè)對(duì)象, 以directArena為例, 我們看到是通過leastUsedArena(directArenas)這種方式獲得的, directArenas是一個(gè)directArena類型的數(shù)組, leastUsedArena(directArenas)這個(gè)方法是用來獲取數(shù)組中一個(gè)使用最少的directArena對(duì)象
directArenas是PooledByteBufAllocator的成員變量, 是在其構(gòu)造方法中初始化的:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { //代碼省略 if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } }
我們看到這里通過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 默認(rèn)是cpu核心數(shù)的2倍, 這點(diǎn)我們可以跟蹤構(gòu)造方法的調(diào)用鏈可以分析到
這樣保證了每一個(gè)線程會(huì)有一個(gè)獨(dú)享的arena
我們看newArenaArray(nDirectArena)這個(gè)方法:
private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }
這里只是創(chuàng)建了一個(gè)數(shù)組, 默認(rèn)長度為nDirectArena
繼續(xù)跟PooledByteBufAllocator的構(gòu)造方法, 創(chuàng)建完了數(shù)組, 后面在for循環(huán)中為數(shù)組賦值:
首先通過new PoolArena.DirectArena創(chuàng)建一個(gè)DirectArena實(shí)例, 然后再為新創(chuàng)建的directArenas數(shù)組賦值
再回到PoolThreadLocalCache的構(gòu)造方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略 }
方法最后, 創(chuàng)建PoolThreadCache的一個(gè)對(duì)象, 我們跟進(jìn)構(gòu)造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略 //保存成兩個(gè)成員變量 this.heapArena = heapArena; this.directArena = directArena; //代碼省略 }
這里省略了大段代碼, 只需要關(guān)注這里將兩個(gè)值保存在PoolThreadCache的成員變量中
我們回到newDirectBuffer中
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
簡單分析的線程局部緩存初始化相關(guān)邏輯, 我們?cè)偻赂?
PoolArena<ByteBuffer> directArena = cache.directArena;
通過上面的分析, 這步我們應(yīng)該不陌生, 在PoolThreadCache構(gòu)造方法中將directArena和heapArena中保存在成員變量中, 這樣就可以直接通過cache.directArena這種方式拿到其成員變量的內(nèi)容
從以上邏輯, 我們可以大概的分析一下流程, 通常會(huì)創(chuàng)建和線程數(shù)量相等的arena, 并以數(shù)組的形式存儲(chǔ)在PooledByteBufAllocator的成員變量中, 每一個(gè)PoolThreadCache創(chuàng)建的時(shí)候, 都會(huì)在當(dāng)前線程拿到一個(gè)arena, 并保存在自身的成員變量中
PoolThreadCache除了維護(hù)了一個(gè)arena之外, 還維護(hù)了一個(gè)緩存列表, 我們?cè)谥貜?fù)分配ByteBuf的時(shí)候, 并不需要每次都通過arena進(jìn)行分配, 可以直接從緩存列表中拿一個(gè)ByteBuf
有關(guān)緩存列表, 我們循序漸進(jìn)的往下看
在PooledByteBufAllocator中維護(hù)了三個(gè)值:
1. tinyCacheSize
2. smallCacheSize
3. normalCacheSize
tinyCacheSize代表tiny類型的ByteBuf能緩存多少個(gè)
smallCacheSize代表small類型的ByteBuf能緩存多少個(gè)
normalCacheSize代表normal類型的ByteBuf能緩存多少個(gè)
具體tiny類型, small類型, normal是什么意思, 我們會(huì)在后面講解
我們回到PoolThreadLocalCache類中看其構(gòu)造方法:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略 }
我們看到這三個(gè)屬性是在PoolThreadCache的構(gòu)造方法中傳入的
這三個(gè)屬性是通過PooledByteBufAllocator的構(gòu)造方法中初始化的, 跟隨構(gòu)造方法的調(diào)用鏈會(huì)走到這個(gè)構(gòu)造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); }
這里仍然調(diào)用了一個(gè)重載的構(gòu)造方法, 這里我們關(guān)注這幾個(gè)參數(shù):
DEFAULT_TINY_CACHE_SIZE,
DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE
這里對(duì)應(yīng)著幾個(gè)靜態(tài)的成員變量:
private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
我們?cè)趕tatic塊中看其初始化過程
static{ //代碼省略 DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); //代碼省略 }
在這里我們看到, 這三個(gè)屬性分別初始化的大小是512, 256, 64, 這三個(gè)屬性就對(duì)應(yīng)了PooledByteBufAllocator另外的幾個(gè)成員變量, tinyCacheSize, smallCacheSize, normalCacheSize
也就是說, tiny類型的ByteBuf在每個(gè)緩存中默認(rèn)緩存的數(shù)量是512個(gè), small類型的ByteBuf在每個(gè)緩存中默認(rèn)緩存的數(shù)量是256個(gè), normal類型的ByteBuf在每個(gè)緩存中默認(rèn)緩存的數(shù)量是64個(gè)
我們?cè)俚絇ooledByteBufAllocator中重載的構(gòu)造方法中:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { super(preferDirect); threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; //代碼省略 }
篇幅原因, 這里也省略了大段代碼, 大家可以通過構(gòu)造方法參數(shù)找到源碼中相對(duì)的位置進(jìn)行閱讀
我們關(guān)注這段代碼:
this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize;
在這里將將參數(shù)的
DEFAULT_TINY_CACHE_SIZE,
DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE
的三個(gè)值保存到了成員變量
tinyCacheSize,
smallCacheSize,
normalCacheSize
PooledByteBufAllocator中將這三個(gè)成員變量初始化之后, 在PoolThreadLocalCache的initialValue方法中就可以使用這三個(gè)成員變量的值了
我們?cè)俅胃絠nitialValue方法中
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略 }
這里就可以在創(chuàng)建PoolThreadCache對(duì)象的的構(gòu)造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個(gè)成員變量了
我們?cè)俑絇oolThreadCache的構(gòu)造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { //代碼省略 } //代碼省略 ThreadDeathWatcher.watch(thread, freeTask); }
其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三種類型的緩存數(shù)組, 數(shù)組元素是MemoryRegionCache類型的對(duì)象, MemoryRegionCache就代表一個(gè)ByeBuf的緩存
以tinySubPageDirectCaches為例, 我們看到tiny類型的緩存是通過createSubPageCaches這個(gè)方法創(chuàng)建的
這里傳入了三個(gè)參數(shù)tinyCacheSize我們之前分析過是512, PoolArena.numTinySubpagePools這里是32(這里不同類型的緩存大小不一樣, small類型是4, normal類型是3) , SizeClass.Tiny代表其類型是tiny類型
我們跟到createSubPageCaches這個(gè)方法中
private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { //創(chuàng)建數(shù)組, 長度為32 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //每一個(gè)節(jié)點(diǎn)是ubPageMemoryRegionCache對(duì)象 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }
這里首先創(chuàng)建了MemoryRegionCache, 長度是我們剛才分析過的32
然后通過for循環(huán), 為數(shù)組賦值, 賦值的對(duì)象是SubPageMemoryRegionCache類型的, SubPageMemoryRegionCache就是MemoryRegionCache類型的子類, 同樣也是一個(gè)緩存對(duì)象, 構(gòu)造方法中, cacheSize, 就是其中緩存對(duì)象的數(shù)量, 如果是tiny類型就是512, sizeClass, 代表其類型, 比如tiny, small或者normal
再簡單跟到其構(gòu)造方法:
SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); }
這里調(diào)用了父類的構(gòu)造方法, 我們繼續(xù)跟進(jìn)去:
MemoryRegionCache(int size, SizeClass sizeClass) { //size會(huì)進(jìn)行規(guī)格化 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //隊(duì)列大小 queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; }
首先會(huì)對(duì)其進(jìn)行規(guī)格化, 其實(shí)就是查找大于等于當(dāng)前size的2的冪次方的數(shù), 這里如果是512那么規(guī)格化之后還是512, 然后初始化一個(gè)隊(duì)列, 隊(duì)列大小就是傳入的大小, 如果是tiny, 這里大小就是512
最后并保存其類型
這里我們不難看出, 其實(shí)每個(gè)緩存的對(duì)象, 里面是通過一個(gè)隊(duì)列保存的, 有關(guān)緩存隊(duì)列和ByteBuf之間的邏輯, 后面的小節(jié)會(huì)進(jìn)行剖析
從上面剖析我們不難看出, PoolThreadCache中維護(hù)了三種類型的緩存數(shù)組, 每個(gè)緩存數(shù)組中的每個(gè)值中, 又通過一個(gè)隊(duì)列進(jìn)行對(duì)象的存儲(chǔ)
當(dāng)然這里只舉了Direct類型的對(duì)象關(guān)系, heap類型其實(shí)都是一樣的, 這里不再贅述
這一小節(jié)邏輯較為復(fù)雜, 同學(xué)們可以自己在源碼中跟蹤一遍加深印象
以上就是Netty分布式ByteBuf中PooledByteBufAllocator剖析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式ByteBuf PooledByteBufAllocato的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Dubbo?retries?超時(shí)重試機(jī)制的問題原因分析及解決方案
這篇文章主要介紹了Dubbo?retries?超時(shí)重試機(jī)制的問題,解決方案是通過修改dubbo服務(wù)提供方,將timeout超時(shí)設(shè)為20000ms或者設(shè)置retries=“0”,禁用超時(shí)重試機(jī)制,感興趣的朋友跟隨小編一起看看吧2022-04-04詳解Java的Hibernate框架中的set映射集與SortedSet映射
這篇文章主要介紹了詳解Java的Hibernate框架中的set映射集與SortedSet映射,Hibernate是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-12-12Java中Double、Float類型的NaN和Infinity的具體使用
Java在處理浮點(diǎn)數(shù)運(yùn)算時(shí),提供了NaN和Infinity兩個(gè)常量,本文主要介紹了Java中Double、Float類型的NaN和Infinity的具體使用,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06Java?中很好用的數(shù)據(jù)結(jié)構(gòu)EnumSet
這篇文章主要介紹了Java?中很好用的數(shù)據(jù)結(jié)構(gòu)EnumSet,EnumMap即屬于一個(gè)Map,下文圍繞主題展開詳細(xì)內(nèi)容,需要的小伙伴可以參考參考一下2022-05-05Java生產(chǎn)者消費(fèi)者模式實(shí)例分析
這篇文章主要介紹了Java生產(chǎn)者消費(fèi)者模式,結(jié)合實(shí)例形式分析了java生產(chǎn)者消費(fèi)者模式的相關(guān)組成、原理及實(shí)現(xiàn)方法,需要的朋友可以參考下2019-03-03