netty服務(wù)端處理請求聯(lián)合pipeline分析
兩個問題
- 在客戶端接入的時候,
NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法。 - 客戶端
handler是什么時候被添加的?
先分析第一個問題?;氐絥etty處理客戶端請求分析_1中服務(wù)端接收到accpet事件后,進行讀取的方法NioMessageUnsafe.read()
NioMessageUnsafe.read()
public void read() {
//必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
assert eventLoop().inEventLoop();
//服務(wù)端channel的config
final ChannelConfig config = config();
//服務(wù)端channel的pipeline
final ChannelPipeline pipeline = pipeline();
//處理服務(wù)端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//設(shè)置配置
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//創(chuàng)建jdk底層的channel
//readBuf用于臨時承載讀到鏈接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//分配器將讀到的鏈接進行計數(shù)
allocHandle.incMessagesRead(localRead);
//連接數(shù)是否超過最大值
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
//遍歷每一條客戶端連接
for (int i = 0; i < size; i ++) {
readPending = false;
//傳遞事件, 將創(chuàng)建NioSokectChannel進行傳遞
//最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//代碼省略
} finally {
//代碼省略
}
}
重點看pipeline.fireChannelRead(readBuf.get(i))
首先, 這里pipeline是服務(wù)端channel的pipeline, 也就是NioServerSocketChannel的pipeline
我們學(xué)習(xí)過pipeline之后, 對這種寫法并不陌生, 就是傳遞channelRead事件, 這里通過傳遞channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 說明在這步之前, ServerBootstrapAcceptor作為一個handler添加到了服務(wù)端channel的pipeline中, 那么這個handler什么時候添加的呢?
我們回顧下第一章, 初始化NioServerSocketChannel的時候, 調(diào)用了ServerBootstrap的init方法 回顧下ServerBootstrap.init的調(diào)用鏈路:
ServerBootstrap.bind(8899) ---> AbstractBootstrap.doBind(final SocketAddress localAddress) ---> AbstractBootstrap.initAndRegister() ---> ServerBootstrap.init(Channel channel)
ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception {
//獲取用戶定義的選項(1)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
//獲取用戶定義的屬性(2)
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//獲取channel的pipline(3)
ChannelPipeline p = channel.pipeline();
//work線程組(4)
final EventLoopGroup currentChildGroup = childGroup;
//用戶設(shè)置的Handler(5)
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//選項轉(zhuǎn)化為Entry對象(6)
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
//屬性轉(zhuǎn)化為Entry對象(7)
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//添加服務(wù)端handler(8)
p.addLast(new ChannelInitializer<Channel>() {
//初始化channel
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
我們重點關(guān)注第8步, 添加服務(wù)端channel, 這里的pipeline, 是服務(wù)服務(wù)端channel的pipeline, 也就是NioServerSocketChannel綁定的pipeline, 這里添加了一個ChannelInitializer類型的handler
ChannelInitializer的繼承關(guān)系
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
//省略類體
}
我們看到其繼承了ChannelInboundHandlerAdapter, 說明是一個inbound類型的handler
這里我們可能會想到, 添加完handler會執(zhí)行handlerAdded, 然后在handlerAdded方法中做了添加ServerBootstrapAcceptor這個handler
但是, 實際上并不是這樣的, 當(dāng)程序執(zhí)行到這里, 并沒有馬上執(zhí)行handlerAdded, 我們緊跟addLast方法
最后執(zhí)行到DefualtChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//判斷handler是否被重復(fù)添加(1)
checkMultiplicity(handler);
//創(chuàng)建一個HandlerContext并添加到列表(2)
newCtx = newContext(group, filterName(name, handler), handler);
//添加HandlerContext(3)
addLast0(newCtx);
//是否已注冊
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
//回調(diào)用戶事件
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//回調(diào)添加事件(4)
callHandlerAdded0(newCtx);
return this;
}
首先完成了handler的添加, 但是并沒有馬上執(zhí)行回調(diào)
這里我們重點關(guān)注if (!registered)這個條件判斷, 其實在注冊完成, registered會變成true, 但是走到這一步的時候NioServerSockeChannel并沒有完成注冊(可以回顧第一章看注冊在哪一步), 所以會進到if里并返回自身
DefualtChannelPipeline.callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
//判斷是否已添加, 未添加, 進行添加, 已添加進行刪除
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
//獲取第一個Callback任務(wù)
PendingHandlerCallback pending = pendingHandlerCallbackHead;
//如果第一個Callback任務(wù)為空
if (pending == null) {
//將第一個任務(wù)設(shè)置為剛創(chuàng)建的任務(wù)
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
因我們調(diào)用這個方法的時候added傳的true, 所以PendingHandlerCallback task賦值為new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask這個類, 我們從名字可以看出, 這是一個handler添加的延遲任務(wù), 用于執(zhí)行handler延遲添加的操作, 同樣也對應(yīng)一個名字為PendingHandlerRemovedTask的類, 用于執(zhí)行延遲刪除handler的操作, 這兩個類都繼承抽象類PendingHandlerCallback
PendingHandlerAddedTask構(gòu)造方法
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
進入super(ctx)
PendingHandlerCallback構(gòu)造方法
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
在父類中, 保存了要添加的context, 也就是ChannelInitializer類型的包裝類
回到callHandlerCallbackLater方法
PendingHandlerCallback pending = pendingHandlerCallbackHead;
這表示獲取第一個PendingHandlerCallback的任務(wù), 其實PendingHandlerCallback是一個單向鏈表, 自身維護一個PendingHandlerCallback類型的next, 指向下一個任務(wù), 在DefaultChannelPipeline這個類中, 定義了個PendingHandlerCallback類型的引用pendingHandlerCallbackHead, 用來指向延遲回調(diào)任務(wù)的中的第一個任務(wù)。
之后判斷這個任務(wù)是為空, 如果是第一次添加handler, 那么這里就是空, 所以將第一個任務(wù)賦值為我們剛創(chuàng)建的添加任務(wù)。
如果不是第一次添加handler, 則將我們新創(chuàng)建的任務(wù)添加到鏈表的尾部, 因為這里我們是第一次添加, 所以第一個回調(diào)任務(wù)就指向了我們創(chuàng)建的添加handler的任務(wù)。
完成這一系列操作之后, addLast方法返歸, 此時并沒有完成添加操作。
而什么時候完成添加操作的呢?
回到在服務(wù)端channel注冊時候的會走到AbstractChannel.register0方法 回顧下AbstractChannel.register0的調(diào)用鏈路:
ServerBootstrap.bind(8899) ---> AbstractBootstrap.doBind(final SocketAddress localAddress) ---> AbstractBootstrap.initAndRegister() ---> config().group().register(channel) ---> SingleThreadEventLoop.register(final ChannelPromise promise) ---> AbstractChannel.register(EventLoop eventLoop, final ChannelPromise promise) ---> AbstractChannel.register0(ChannelPromise promise)
AbstractChannel.register0(ChannelPromise promise)
private void register0(ChannelPromise promise) {
try {
//做實際的注冊(1)
doRegister();
neverRegistered = false;
registered = true;
//觸發(fā)事件(2)
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//觸發(fā)注冊成功事件(3)
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//傳播active事件(4)
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
//省略代碼
}
}
重點關(guān)注第二步pipeline.invokeHandlerAddedIfNeeded(), 這里已經(jīng)通過doRegister()方法完成了實際的注冊, 我們跟到該方法中
pipeline.invokeHandlerAddedIfNeeded()
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
這里會判斷是否第一次注冊, 這里返回true, 然后會執(zhí)行callHandlerAddedForAllHandlers()方法, 我們跟進去
DefaultChannelPipeline.callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
//獲取task
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
//執(zhí)行添加handler方法
task.execute();
task = task.next;
}
}
這里拿到第一個延遲執(zhí)行handler添加的task其實就是我們之前剖析過的, 延遲執(zhí)行handler添加的task, 就是PendingHandlerAddedTask對象
在while循環(huán)中, 通過執(zhí)行execute()方法將handler添加
進入PendingHandlerAddedTask.execute()
void execute() {
//獲取當(dāng)前eventLoop線程
EventExecutor executor = ctx.executor();
//是當(dāng)前執(zhí)行的線程
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
//添加到隊列
executor.execute(this);
} catch (RejectedExecutionException e) {
//代碼省略
}
}
}
再進入callHandlerAdded0方法
callHandlerAdded0(final AbstractChannelHandlerContext ctx)
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
//省略...
}
}
終于在這里, 我們看到了執(zhí)行回調(diào)的方法
再回到ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception {
//獲取用戶定義的選項(1)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
//獲取用戶定義的屬性(2)
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//獲取channel的pipline(3)
ChannelPipeline p = channel.pipeline();
//work線程組(4)
final EventLoopGroup currentChildGroup = childGroup;
//用戶設(shè)置的Handler(5)
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//選項轉(zhuǎn)化為Entry對象(6)
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
//屬性轉(zhuǎn)化為Entry對象(7)
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//添加服務(wù)端handler(8)
p.addLast(new ChannelInitializer<Channel>() {
//初始化channel
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
我們繼續(xù)看第8步添加服務(wù)端handler
因為這里的handler是ChannelInitializer, 所以完成添加之后會調(diào)用ChannelInitializer的handlerAdded方法
跟到handlerAdded方法
ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//默認情況下, 會返回true
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
因為執(zhí)行到這步服務(wù)端channel已經(jīng)完成注冊, 所以會執(zhí)行到initChannel方法
ChannelInitializer.initChannel(ChannelHandlerContext ctx)
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
//這段代碼是否被執(zhí)行過
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
//調(diào)用之后會刪除當(dāng)前節(jié)點
remove(ctx);
}
return true;
}
return false;
}
我們關(guān)注initChannel這個方法, 這個方法是在ChannelInitializer的匿名內(nèi)部來實現(xiàn)的, 這里我們注意, 在initChannel方法執(zhí)行完畢之后會調(diào)用remove(ctx)刪除當(dāng)前節(jié)點
繼續(xù)跟進initChannel方法
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
這里首先添加用戶自定義的handler, 這里如果用戶沒有定義, 則添加不成功, 然后, 會調(diào)用addLast將ServerBootstrapAcceptor這個handler添加了進去, 同樣這個handler也繼承了ChannelInboundHandlerAdapter, 在這個handler中, 重寫了channelRead方法, 所以, 這就是第一個問題的答案
緊接著我們看第二個問題:客戶端handler是什么時候被添加的?
看ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//添加channelHadler, 這個channelHandler, 就是用戶代碼添加的ChannelInitializer
child.pipeline().addLast(childHandler);
//代碼省略
try {
//work線程注冊channel
childGroup.register(child).addListener(new ChannelFutureListener() {
//代碼省略
});
} catch (Throwable t) {
forceClose(child, t);
}
}
這里真相可以大白了, 服務(wù)端再創(chuàng)建完客戶端channel之后, 將新創(chuàng)建的NioSocketChannel作為參數(shù)觸發(fā)channelRead事件(可以回顧NioMessageUnsafe.read方法, 代碼這里就不貼了), 所以這里的參數(shù)msg就是NioSocketChannel
拿到channel時候再將客戶端的handler添加進去, 我們回顧客戶端handler的添加過程:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
和服務(wù)端channel的邏輯一樣, 首先會添加ChannelInitializer這個handler但是沒有注冊所以沒有執(zhí)行添加handler的回調(diào), 將任務(wù)保存到一個延遲回調(diào)的task中
等客戶端channel注冊完畢, 會將執(zhí)行添加handler的回調(diào), 也就是handlerAdded方法, 在回調(diào)中執(zhí)行initChannel方法將客戶端handler添加進去, 然后刪除ChannelInitializer這個handler
因為在服務(wù)端channel中這塊邏輯已經(jīng)進行了詳細的剖析, 所以這邊就不在贅述, 同學(xué)們可以自己跟進去走一遍流程
這里注意, 因為每創(chuàng)建一個NioSoeketChannel都會調(diào)用服務(wù)端ServerBootstrapAcceptor的channelRead方法, 所以這里會將每一個NioSocketChannel的handler進行添加
總結(jié)
本文章分析了事件傳輸?shù)南嚓P(guān)邏輯, 包括handler的添加, 刪除, inbound和outbound以及異常事件的傳輸, 最后結(jié)合第一章和第三章, 剖析了服務(wù)端channel和客戶端channel的添加過程,更多關(guān)于netty服務(wù)端請求聯(lián)合pipeline的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java程序圖形用戶界面設(shè)計之標(biāo)簽組件
圖形界面(簡稱GUI)是指采用圖形方式顯示的計算機操作用戶界面。與早期計算機使用的命令行界面相比,圖形界面對于用戶來說在視覺上更易于接受,本篇精講Java語言中關(guān)于圖形用戶界面的標(biāo)簽組件部分2022-02-02
java使用TimerTask定時器獲取指定網(wǎng)絡(luò)數(shù)據(jù)
java.util.Timer定時器,實際上是個線程,定時調(diào)度所擁有的TimerTasks。一個TimerTask實際上就是一個擁有run方法的類,需要定時執(zhí)行的代碼放到run方法體內(nèi),TimerTask一般是以匿名類的方式創(chuàng)建,下面的就用示例來學(xué)習(xí)他的使用方法2014-01-01
Java Code Cache滿導(dǎo)致應(yīng)用性能降低問題解決
這篇文章主要介紹了Java Code Cache滿導(dǎo)致應(yīng)用性能降低問題解決,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08
Deepin系統(tǒng)安裝eclipse2021-03及CDT插件的安裝教程
本教程教大家deepin20.1操作系統(tǒng)上安裝eclipse_2021-03版的詳細步驟及CDT插件的安裝方法,通過圖文展示的非常明了,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-06-06

