Netty分布式客戶端處理接入事件handle源碼解析
前文傳送門 :客戶端接入流程初始化源碼分析
上一小節(jié)我們剖析完成了與channel綁定的ChannelConfig初始化相關(guān)的流程,
這一小節(jié)繼續(xù)剖析客戶端連接事件的處理
處理接入事件創(chuàng)建handle
回到上一章NioEventLoop的processSelectedKey ()方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲取到channel中的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//如果這個(gè)key不是合法的, 說明這個(gè)channel可能有問題
if (!k.isValid()) {
//代碼省略
}
try {
//如果是合法的, 拿到key的io事件
int readyOps = k.readyOps();
//鏈接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//寫事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//讀事件和接受鏈接事件
//如果當(dāng)前NioEventLoop是work線程的話, 這里就是op_read事件
//如果是當(dāng)前NioEventLoop是boss線程的話, 這里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}我們看其中的if判斷:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
上一小節(jié)我們分析過, 如果當(dāng)前NioEventLoop是work線程的話, 這里就是op_read事件, 如果是當(dāng)前NioEventLoop是boss線程的話, 這里就是op_accept事件, 這里我們以boss線程為例進(jìn)行分析
之前我們講過, 無論處理op_read事件還是op_accept事件, 都走的unsafe的read()方法, 這里unsafe是通過channel拿到, 我們知道如果是處理accept事件, 這里的channel是NioServerSocketChannel, 這里與之綁定的unsafe是NioMessageUnsafe
我們跟到NioMessageUnsafe的read()方法:
public void read() {
//必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
assert eventLoop().inEventLoop();
//服務(wù)端channel的config
final ChannelConfig config = config();
//服務(wù)端channel的pipeline
final ChannelPipeline pipeline = pipeline();
//處理服務(wù)端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//設(shè)置配置
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//創(chuàng)建jdk底層的channel
//readBuf用于臨時(shí)承載讀到鏈接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//分配器將讀到的鏈接進(jìn)行計(jì)數(shù)
allocHandle.incMessagesRead(localRead);
//連接數(shù)是否超過最大值
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
//遍歷每一條客戶端連接
for (int i = 0; i < size; i ++) {
readPending = false;
//傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞
//最終會(huì)調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//代碼省略
} finally {
//代碼省略
}
}首先獲取與NioServerSocketChannel綁定config和pipeline, config我們上一小節(jié)進(jìn)行分析過, pipeline我們將在下一章進(jìn)行剖析
我們看這一句:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
這里通過RecvByteBufAllocator接口調(diào)用了其內(nèi)部接口Handler
我們看其RecvByteBufAllocator接口
public interface RecvByteBufAllocator {
Handle newHandle();
interface Handle {
int guess();
void reset(ChannelConfig config);
void incMessagesRead(int numMessages);
void lastBytesRead(int bytes);
int lastBytesRead();
void attemptedBytesRead(int bytes);
int attemptedBytesRead();
boolean continueReading();
void readComplete();
}
}我們看到RecvByteBufAllocator接口只有一個(gè)方法newHandle(), 顧名思義就是用于創(chuàng)建Handle對(duì)象的方法, 而Handle中的方法, 才是實(shí)際用于操作的方法
在RecvByteBufAllocator實(shí)現(xiàn)類中包含Handle的子類, 具體實(shí)現(xiàn)關(guān)系如下:

回到read()方法中再看這段代碼:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
unsafe()返回當(dāng)前channel綁定的unsafe對(duì)象, recvBufAllocHandle()最終會(huì)調(diào)用AbstractChannel內(nèi)部類AbstractUnsafe的recvBufAllocHandle()方法
跟進(jìn)AbstractUnsafe的recvBufAllocHandle()方法:
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
//如果不存在, 則創(chuàng)建一個(gè)recvHandle的實(shí)例
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}如果如果是第一次執(zhí)行到這里, 自身屬性recvHandle為空, 會(huì)創(chuàng)建一個(gè)recvHandle實(shí)例, config()返回NioServerSocketChannel綁定的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator對(duì)象, 這兩部分上一小節(jié)剖析過了, 這里通過newHandle()創(chuàng)建一個(gè)Handle, 這里會(huì)走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中
跟進(jìn)newHandle()方法中
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}這里創(chuàng)建HandleImpl傳入了三個(gè)參數(shù), 這三個(gè)參數(shù)我們上一小節(jié)剖析過, minIndex為最小內(nèi)存在SIZE_TABLE中的下標(biāo), maxIndex為最大內(nèi)存在SEIZE_TABEL中的下標(biāo), initial是初始內(nèi)存, 我們跟到HandleImpl的構(gòu)造方法中:
public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}初始化minIndex和maxIndex, 根據(jù)initial找到當(dāng)前的下標(biāo), nextReceiveBufferSize是根據(jù)當(dāng)前的下標(biāo)找到對(duì)應(yīng)的內(nèi)存
這樣, 我們就創(chuàng)建了個(gè)Handle對(duì)象
在這里我們需要知道, 這個(gè)handle, 是和channel唯一綁定的屬性, 而AdaptiveRecvByteBufAllocator對(duì)象是和ChannelConfig對(duì)象唯一綁定的, 間接也是和channel進(jìn)行唯一綁定
繼續(xù)回到read()方法
public void read() {
//必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
assert eventLoop().inEventLoop();
//服務(wù)端channel的config
final ChannelConfig config = config();
//服務(wù)端channel的pipeline
final ChannelPipeline pipeline = pipeline();
//處理服務(wù)端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//設(shè)置配置
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//創(chuàng)建jdk底層的channel
//readBuf用于臨時(shí)承載讀到鏈接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//分配器將讀到的鏈接進(jìn)行計(jì)數(shù)
allocHandle.incMessagesRead(localRead);
//連接數(shù)是否超過最大值
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
//遍歷每一條客戶端連接
for (int i = 0; i < size; i ++) {
readPending = false;
//傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞
//最終會(huì)調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//代碼省略
} finally {
//代碼省略
}
}繼續(xù)往下跟:
allocHandle.reset(config);
這個(gè)段代碼是重新設(shè)置配置, 也就是將之前的配置信息進(jìn)行初始化, 最終會(huì)走到, DefaultMaxMessagesRecvByteBufAllocator中的內(nèi)部類MaxMessageHandle的reet中
我們跟進(jìn)reset中
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}這里僅僅對(duì)幾個(gè)屬性做了賦值, 簡(jiǎn)單介紹下這幾個(gè)屬性:
config:當(dāng)前channelConfig對(duì)象
maxMessagePerRead:表示讀取消息的時(shí)候可以讀取幾次(循環(huán)次數(shù)), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節(jié)已經(jīng)做過剖析
totalMessages:代表目前讀循環(huán)已經(jīng)讀取的消息個(gè)數(shù), 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里初始化為0
totalBytesRead:代表目前已經(jīng)讀取到的消息字節(jié)總數(shù), 這里同樣也初始化為0
我們繼續(xù)往下走, 這里首先是一個(gè)do-while循環(huán), 循環(huán)體里通過int localRead = doReadMessages(readBuf)這種方式將讀取到的連接數(shù)放入到一個(gè)List集合中, 這一步我們下一小節(jié)再分析, 我們繼續(xù)往下走:
我們首先看allocHandle.incMessagesRead(localRead)這一步, 這里的localRead表示這次循環(huán)往readBuf中放入的連接數(shù), 在Nio模式下這, 如果讀取到一條連接會(huì)返回1
跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:
public final void incMessagesRead(int amt) {
totalMessages += amt;
}這里將totalMessages增加amt, 也就是+1
這里totalMessage, 剛才已經(jīng)剖析過, 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里每次執(zhí)行一次循環(huán)都會(huì)加一
再去看循環(huán)終止條件allocHandle.continueReading()
跟到MaxMessageHandle的continueReading()方法中:
public boolean continueReading() {
//config.isAutoRead()默認(rèn)返回true
// totalMessages < maxMessagePerRead
//totalMessages代表當(dāng)前讀到的鏈接, 默認(rèn)是1
//maxMessagePerRead每一次最大讀多少鏈接(默認(rèn)16)
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
totalMessages < maxMessagePerRead &&
totalBytesRead < Integer.MAX_VALUE;
}我們逐個(gè)分析判斷條件:
config.isAutoRead(): 這里默認(rèn)為true
attemptedBytesRead == lastBytesRead: 表示本次讀取的字節(jié)數(shù)和最后一次讀取的字節(jié)數(shù)相等, 因?yàn)榈竭@里都沒有進(jìn)行字節(jié)數(shù)組的讀取操作, 所以默認(rèn)都為0, 這里也返回true
totalMessages < maxMessagePerRead
表示當(dāng)前讀取的次數(shù)是否小于最大讀取次數(shù), 我們知道totalMessages每次循環(huán)都會(huì)自增, 而maxMessagePerRead默認(rèn)值為16, 所以這里會(huì)限制循環(huán)不能超過16次, 也就是最多一次只能讀取16條連接
totalBytesRead < Integer.MAX_VALUE
表示讀取的字節(jié)數(shù)不能超過int類型的最大值
這里就剖析完了Handle的創(chuàng)建和初始化過程, 并且剖析了循環(huán)終止條件等相關(guān)的邏輯
以上就是Netty分布式客戶端處理接入事件handle源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式客戶端接入事件handle的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中的形式參數(shù)和實(shí)際參數(shù)案例詳解
這篇文章主要介紹了Java中的形式參數(shù)和實(shí)際參數(shù),形參和實(shí)參間的關(guān)系,兩者是在調(diào)用的時(shí)候進(jìn)行結(jié)合的,通常實(shí)參會(huì)將取值傳遞給形參,形參去之后進(jìn)行函數(shù)過程運(yùn)算,然后可能將某些值經(jīng)過參數(shù)或函數(shù)符號(hào)返回給調(diào)用者,需要的朋友可以參考下2023-10-10
簡(jiǎn)單了解Java的默認(rèn)和靜態(tài)方法
這篇文章主要介紹了簡(jiǎn)單了解Java的默認(rèn)和靜態(tài)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01
IDEA調(diào)試源碼小技巧之辨別抽象類或接口多種實(shí)現(xiàn)類的正確路徑
這篇文章主要介紹了IDEA調(diào)試源碼小技巧之辨別抽象類或接口多種實(shí)現(xiàn)類的正確路徑,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
SpringBoot?AnnotationUtils工具類的使用實(shí)例詳解
這篇文章主要介紹了SpringBoot?AnnotationUtils工具類的使用,使用自定義注解標(biāo)記業(yè)務(wù)方法,原生Java獲取注解及AnnotationUtils工具類獲取方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-09-09
SpringBoot整合MongoDB實(shí)現(xiàn)文件上傳下載刪除
這篇文章主要介紹了SpringBoot整合MongoDB實(shí)現(xiàn)文件上傳下載刪除的方法,幫助大家更好的理解和學(xué)習(xí)使用SpringBoot框架,感興趣的朋友可以了解下2021-05-05
springboot實(shí)現(xiàn)在工具類(util)中調(diào)用注入service層方法
這篇文章主要介紹了springboot實(shí)現(xiàn)在工具類(util)中調(diào)用注入service層方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06

