Netty分布式客戶端接入流程初始化源碼分析
前文概述:
前文傳送門(mén):NioEventLoop任務(wù)隊(duì)列執(zhí)行
之前的章節(jié)學(xué)習(xí)了server啟動(dòng)以及eventLoop相關(guān)的邏輯, eventLoop輪詢到客戶端接入事件之后是如何處理的?這一章我們循序漸進(jìn), 帶大家繼續(xù)剖析客戶端接入之后的相關(guān)邏輯
第一節(jié):初始化NioSockectChannelConfig
創(chuàng)建channel
在剖析接入流程之前我們首先補(bǔ)充下第一章有關(guān)創(chuàng)建channel的知識(shí):
我們?cè)诘谝徽缕饰鲞^(guò)channel的創(chuàng)建, 其中NioServerSocketChannel中有個(gè)構(gòu)造方法:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
當(dāng)時(shí)我們并沒(méi)有剖析config相關(guān)知識(shí), 在這一章首先對(duì)此做一個(gè)補(bǔ)充, 這里我們看到每一個(gè)NioServerSocketChannel都擁有一個(gè)config屬性, 這個(gè)屬性存放著NioServerSocketChannel的相關(guān)配置, 這里創(chuàng)建一個(gè)NioServerSocketChannelConfig對(duì)象, 并將當(dāng)前channel, 和channel對(duì)應(yīng)的java底層的socket對(duì)象進(jìn)行了傳入, NioServerSocketChannelConfig其實(shí)是NioServerSocketChannel的內(nèi)部類
我們跟到NioServerSocketChannelConfig類的構(gòu)造方法中:
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { super(channel, javaSocket); }
我們繼續(xù)跟入其父類DefaultServerSocketChannelConfig的構(gòu)造方法中:
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; }
這里繼續(xù)調(diào)用了其父類的構(gòu)造方法, 并保存了jdk底層的socket對(duì)象, 并且調(diào)用其父類DefaultChannelConfig的構(gòu)造方法
跟到其父類DefaultChannelConfig的構(gòu)造方法中
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }
這里調(diào)用了自身的構(gòu)造方法, 傳入了channel和一個(gè)AdaptiveRecvByteBufAllocator對(duì)象
AdaptiveRecvByteBufAllocator是一個(gè)緩沖區(qū)分配器, 用于分配一個(gè)緩沖區(qū)Bytebuf的, 有關(guān)Bytebuf的相關(guān)內(nèi)容會(huì)在后面的章節(jié)詳細(xì)講解, 這里可以簡(jiǎn)單介紹作為了解, 就當(dāng)對(duì)于之后知識(shí)的預(yù)習(xí)
Bytebuf相當(dāng)于jdk的ByetBuffer, Netty對(duì)其做了重新的封裝, 用于讀寫(xiě)channel中的字節(jié)流, 熟悉Nio的同學(xué)對(duì)此應(yīng)該并不陌生, AdaptiveRecvByteBufAllocator就是用于分配netty中ByetBuff的緩沖區(qū)分配器, 根據(jù)名字, 我們不難看出這個(gè)緩沖區(qū)是一個(gè)可變大小的字節(jié)緩沖區(qū)
我們跟到AdaptiveRecvByteBufAllocator的構(gòu)造方法中:
public AdaptiveRecvByteBufAllocator() { //DEFAULT_MINIMUM:最小緩沖區(qū)長(zhǎng)度64字節(jié) //DEFAULT_INITIAL:初始容量1024字節(jié) //最大容量65536字節(jié) this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); }
這里調(diào)用自身的構(gòu)造方法并且傳入了三個(gè)屬性, 這三個(gè)屬性的含義分別為:
DEFAULT_MINIMUM
:代表要分配的緩沖區(qū)長(zhǎng)度最少為64個(gè)字節(jié)
DEFAULT_INITIAL
:代表要分配的緩沖區(qū)的初始容量為1024個(gè)字節(jié)
DEFAULT_MAXIMUM
:代表要分配的緩沖區(qū)最大容量為65536個(gè)字節(jié)
我們跟到this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM)方法中
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略驗(yàn)證代碼 //最小容量在table中的下標(biāo) int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下標(biāo) int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }
其中這里初始化了三個(gè)屬性, 分別是:
minIndex
:最小容量在size_table中的下標(biāo)
maxIndex
:最大容量在table中的下標(biāo)
initial
:初始容量1024個(gè)字節(jié)
這里的size_table就是一個(gè)數(shù)組, 里面盛放著byteBuf可分配的內(nèi)存大小的集合, 分配的bytebuf無(wú)論是擴(kuò)容還是收縮, 內(nèi)存大小都屬于size_table中的元素, 那么這個(gè)數(shù)組是如何初始化的, 我們跟到這個(gè)屬性中:
private static final int[] SIZE_TABLE;
我們看到是一個(gè)final修飾的靜態(tài)成員變量, 我們跟到static塊中看它的初始化過(guò)程:
static { //List集合 List<Integer> sizeTable = new ArrayList<Integer>(); //從16開(kāi)始, 每遞增16添加到List中, 直到大于等于512 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } //從512開(kāi)始, 倍增添加到List中, 直到內(nèi)存溢出 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } //初始化數(shù)組 SIZE_TABLE = new int[sizeTable.size()]; //將list的內(nèi)容放入數(shù)組中 for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }
首先創(chuàng)建一個(gè)Integer類型的list用于盛放內(nèi)存元素
這里通過(guò)兩組循環(huán)為list添加元素
首先看第一組循環(huán):
for (int i = 16; i < 512; i += 16) { sizeTable.add(i); }
這里是通過(guò)16平移的方式, 直到512個(gè)字節(jié), 將每次平移之后的內(nèi)存大小添加到list中
再看第二組循環(huán)
for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); }
超過(guò)512之后, 再通過(guò)倍增的方式循環(huán), 直到int類型內(nèi)存溢出, 將每次倍增之后大小添加到list中
最后初始化SIZE_TABLE數(shù)組, 將list中的元素按下表存放到數(shù)組中
這樣就初始化了內(nèi)存數(shù)組
再回到AdaptiveRecvByteBufAllocator的構(gòu)造方法中
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略驗(yàn)證代碼 //最小容量在table中的下標(biāo) int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下標(biāo) int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }
這里分別根據(jù)傳入的最小和最大容量去SIZE_TABLE中獲取其下標(biāo)
我們跟到getSizeTableIndex(minimum)中:
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }
這里是通過(guò)二分查找去獲取其下表
if (SIZE_TABLE[minIndex] < minimum)這里判斷最小容量下標(biāo)所屬的內(nèi)存大小是否小于最小值, 如果小于最小值則下標(biāo)+1
最大容量的下標(biāo)獲取原理同上, 判斷最大容量下標(biāo)所屬內(nèi)存大小是否大于最大值, 如果是則下標(biāo)-1
我們回到DefaultChannelConfig的構(gòu)造方法:
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }
剛才我們剖析過(guò)了AdaptiveRecvByteBufAllocator()的創(chuàng)建過(guò)程, 我們繼續(xù)跟到this()中:
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }
我們看到這里初始化了channel, 在channel初始化之前, 調(diào)用了setRecvByteBufAllocator(allocator, channel.metadata())方法, 顧名思義, 這是用于設(shè)置緩沖區(qū)分配器的方法, 第一個(gè)參數(shù)是我們剛剛分析過(guò)的新建的AdaptiveRecvByteBufAllocator對(duì)象, 第二個(gè)傳入的是與channel綁定的ChannelMetadata對(duì)象, ChannelMetadata對(duì)象是什么?
我們跟進(jìn)到metadata()方法當(dāng)中, 由于是channel是NioServerSocketChannel, 所以調(diào)用到了NioServerSocketChannel的metadata()方法:
public ChannelMetadata metadata() { return METADATA; }
這里返回了一個(gè)成員變量METADATA, 跟到這個(gè)成員變量中:
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
這里創(chuàng)建了一個(gè)ChannelMetadata對(duì)象, 并在構(gòu)造方法中傳入false和16
繼續(xù)跟到ChannelMetadata的構(gòu)造方法中
public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) { //省略驗(yàn)證代碼 //false this.hasDisconnect = hasDisconnect; //16 this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead; }
這里做的事情非常簡(jiǎn)單, 只初始化了兩個(gè)屬性:
hasDisconnect=false
defaultMaxMessagesPerRead=16
defaultMaxMessagesPerRead=16代表在讀取對(duì)方的鏈接或者channel的字節(jié)流時(shí)(無(wú)論server還是client), 最多只循環(huán)16次, 后面的講解將會(huì)看到
剖析完了ChannelMetadata對(duì)象的創(chuàng)建, 我們回到DefaultChannelConfig的構(gòu)造方法:
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }
跟到setRecvByteBufAllocator(allocator, channel.metadata())方法中:
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator; }
首先會(huì)判斷傳入的緩沖區(qū)分配器是不是MaxMessagesRecvByteBufAllocator類型的, 因?yàn)锳daptiveRecvByteBufAllocator實(shí)現(xiàn)了MaxMessagesRecvByteBufAllocator接口, 所以此條件成立
之后將其轉(zhuǎn)換成MaxMessagesRecvByteBufAllocator類型,
然后調(diào)用其maxMessagesPerRead(metadata.defaultMaxMessagesPerRead())方法,
這里會(huì)走到其子類DefaultMaxMessagesRecvByteBufAllocator的maxMessagesPerRead(int maxMessagesPerRead)方法中,
其中參數(shù)metadata.defaultMaxMessagesPerRead()返回就是ChannelMetadata的屬性defaultMaxMessagesPerRead,
也就是16
跟到maxMessagesPerRead(int maxMessagesPerRead)方法中:
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { //忽略驗(yàn)證代碼 //初始化為16 this.maxMessagesPerRead = maxMessagesPerRead; return this; }
這里將自身屬性maxMessagesPerRead設(shè)置為16, 然后返回自身
回到DefaultChannelConfig的構(gòu)造方法
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator; }
設(shè)置完了內(nèi)存分配器的maxMessagesPerRead屬性, 最后將DefaultChannelConfig自身的成員變量rcvBufAllocator設(shè)置成我們初始化完畢的allocator對(duì)象
至此, 有關(guān)channelConfig有關(guān)的初始化過(guò)程剖析完成
以上就是Netty分布式客戶端接入流程初始化源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式客戶端接入流程初始化的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java EE項(xiàng)目中的異常處理總結(jié)(一篇不得不看的文章)
什么是異常?運(yùn)行時(shí)發(fā)生的可被捕獲和處理的錯(cuò)誤。這篇文章主要介紹了Java EE項(xiàng)目中的異常處理總結(jié),有需要的可以了解一下。2016-11-11JetBrains IntelliJ IDEA 2020安裝與使用教程詳解
這篇文章主要介紹了JetBrains IntelliJ IDEA 2020安裝與使用教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06淺談Spring中的循環(huán)依賴問(wèn)題與解決方案
這篇文章主要介紹了淺談Spring中的循環(huán)依賴問(wèn)題與解決方案,循環(huán)依賴就是兩個(gè)或則兩個(gè)以上的bean互相持有對(duì)方,最終形成閉環(huán),比如A依賴于B,B依賴于C,C又依賴于A,需要的朋友可以參考下2023-12-12使用SpringBoot中的Schedule定時(shí)發(fā)送郵件的方法
在SpringBoot中,你可以使用@Scheduled注解來(lái)創(chuàng)建定時(shí)任務(wù),@Scheduled注解可以應(yīng)用于方法上,表示這個(gè)方法是一個(gè)定時(shí)任務(wù),可以根據(jù)指定的時(shí)間間隔或固定時(shí)間執(zhí)行,本文就給大家介紹一下如何使用SpringBoot中的Schedule定時(shí)發(fā)送郵件,需要的朋友可以參考下2023-08-08