Netty源碼分析NioEventLoop線程的啟動(dòng)
之前的小節(jié)我們學(xué)習(xí)了NioEventLoop的創(chuàng)建以及線程分配器的初始化, 那么NioEventLoop是如何開(kāi)啟的呢, 我們這一小節(jié)繼續(xù)學(xué)習(xí)
NioEventLoop開(kāi)啟方法
NioEventLoop的開(kāi)啟方法在其父類SingleThreadEventExecutor中的execute(Runnable task)方法中, 我們跟到這個(gè)方法:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //判斷當(dāng)前線程是不是eventLoop線程 boolean inEventLoop = inEventLoop(); //如果是eventLoop線程 if (inEventLoop) { addTask(task); } else { //不是eventLoop線程啟動(dòng)線程 startThread(); //添加task addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
這個(gè)方法傳入一個(gè)Runnble對(duì)象, 也就是一個(gè)任務(wù)
首先boolean inEventLoop = inEventLoop()方法會(huì)判斷是不是NioEventLoop線程
跟進(jìn) inEventLoop()方法
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
這里inEventLoop(Thread.currentThread())方法傳入了當(dāng)前線程對(duì)象, 這個(gè)方法會(huì)調(diào)用當(dāng)前類的inEventLoop(Thread thread)方法
跟進(jìn)inEventLoop(Thread thread)方法:
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
我們看到判斷的依據(jù)是當(dāng)前線程對(duì)象是不是NioEventLoop綁定的線程對(duì)象, 這里我們會(huì)想到開(kāi)啟線程肯定會(huì)為NioEventLoop綁定一個(gè)線程對(duì)象, 如果判斷當(dāng)前線程對(duì)象不是當(dāng)前NioEventLoop綁定的線程對(duì)象, 說(shuō)明執(zhí)行此方法的線程不是當(dāng)前NioEventLoop線程, 那么這個(gè)線程如何初始化的, 后面我們會(huì)講到, 我們繼續(xù)看execute(Runnable task)方法:
如果是NioEventLoop線程, 則會(huì)通過(guò)addTask(task)添加任務(wù), 通過(guò)NioEventLoop異步執(zhí)行, 那么這個(gè)task是什么時(shí)候執(zhí)行的, 同樣后面會(huì)講到
跟一下addTask(task)
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //如果添加不成功 if (!offerTask(task)) { reject(task); } }
這里offerTask(task)代表添加一個(gè)task, 跟進(jìn)去:
final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } //往taskQ中添加一個(gè)task return taskQueue.offer(task); }
我們看到taskQueue.offer(task)將一個(gè)task添加到任務(wù)隊(duì)列, 而這個(gè)任務(wù)隊(duì)列taskQueue就是我們NioEventLoop初始化的時(shí)候與NioEventLoop唯一綁定的任務(wù)隊(duì)列
回顧一下初始構(gòu)造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
在這里通過(guò) taskQueue = newTaskQueue(this.maxPendingTasks) 創(chuàng)建了taskQueue
回到execute(Runnable task)方法中, 我們繼續(xù)往下看:
如果不是NioEventLoop線程我們通過(guò)startThread()開(kāi)啟一個(gè)NioEventLoop線程
跟到startThread()之前, 我們先繼續(xù)往下走:
開(kāi)啟NioEventLoop線程之后, 又通過(guò)addTask(task)往taskQueue添加任務(wù)
最后我們注意有這么一段代碼:
if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }
addTaskWakesUp代表添加task之后, NioEventLoop的select()操作是不是要喚醒, 這個(gè)屬性是在初始化NioEventLoop的時(shí)候傳入的, 大家可以回顧下, 默認(rèn)是false, 這里!addTaskWakesUp就是需要喚醒, wakesUpForTask(task)與addTaskWakesUp意義相同, 默認(rèn)是true, 可以看代碼:
protected boolean wakesUpForTask(Runnable task) { return true; }
這里恒為true, 所以這段代碼就是添加task時(shí)需要通過(guò)wakeup(inEventLoop)喚醒, 這樣NioEventLoop在做select()操作時(shí)如果正在阻塞則立刻喚醒, 然后執(zhí)行任務(wù)隊(duì)列的task
回到execute(Runnable task)方法中我們跟進(jìn)開(kāi)啟線程的startThread()方法中:
private void startThread() { //判斷線程是否啟動(dòng), 未啟動(dòng)則啟動(dòng) if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //當(dāng)前線程未啟動(dòng), 則啟動(dòng) doStartThread(); } } }
前面的判斷是判斷當(dāng)前NioEventLoop線程是否啟動(dòng), 如果未啟動(dòng), 則通過(guò)doStartThread()方法啟動(dòng), 我們第一次執(zhí)行execute(Runnable task)線程是未啟動(dòng)的, 所以會(huì)執(zhí)行doStartThread(), 后續(xù)該線程則不會(huì)再執(zhí)行doStartThread()方法
我們跟進(jìn)doStartThread()方法中
private void doStartThread() { assert thread == null; //線程執(zhí)行器執(zhí)行線程(所有的eventLoop共用一個(gè)線程執(zhí)行器) executor.execute(new Runnable() { @Override public void run() { //將當(dāng)前線程復(fù)制給屬性 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //開(kāi)始輪詢 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //代碼省略 } } }); }
我們重點(diǎn)關(guān)注executor.execute()這個(gè)方法, 其中executor就是我們創(chuàng)建NioEventLoop的線程器, execute()就是開(kāi)啟一個(gè)線程
回顧下execute()方法
public void execute(Runnable command) { //起一個(gè)線程 threadFactory.newThread(command).start(); }
我們看到通過(guò)線程工廠開(kāi)啟一個(gè)線程, 由于前面的小節(jié)已經(jīng)剖析, 這里不再贅述
開(kāi)啟線程則執(zhí)行Runnble類中的run()方法, 我們看到在run()方法里通過(guò) thread = Thread.currentThread() 將新開(kāi)啟的線程對(duì)象賦值NioEventLoop的thread的屬性, 這樣就可以通過(guò)線程對(duì)象的判斷, 來(lái)確定是不是NioEventLoop線程了
后面我們看到 SingleThreadEventExecutor.this.run() , 這里this, 就是當(dāng)前NioEventLoop對(duì)象, 而這里的run()方法, 就是NioEventLoop中的run()方法, 在這個(gè)run()方法中, 真正開(kāi)始了selector的輪詢工作, 對(duì)于run()方法的詳細(xì)剖析, 我們會(huì)在之后的小節(jié)中進(jìn)行
剛才我們剖析了NioEventLoop的啟動(dòng)方法, 那么根據(jù)我們的分析, 就是第一次調(diào)用NioEventLoop的execute(Runnable task)方法的時(shí)候, 則會(huì)開(kāi)啟NioEventLoop線程, 之后的調(diào)用只是往taskQueue中添加任務(wù), 那么第一次是什么時(shí)候開(kāi)啟的呢?這里我們要回顧上一章講過(guò)的內(nèi)容
上一章中我們講過(guò)在AbstractServerBootstrap中有個(gè)initAndRegister()方法, 這個(gè)方法主要用于channel的初始化和注冊(cè), 其中注冊(cè)的代碼為:
ChannelFuture regFuture = config().group().register(channel);
其中g(shù)roup()我們剖析過(guò)是Boss線程的group, 我們剖析過(guò)其中的register(channel)方法:
public ChannelFuture register(Channel channel) { return next().register(channel); }
首先跟到next()方法:
public EventLoop next() { return (EventLoop) super.next(); }
首先調(diào)用了其父類MultithreadEventExecutorGroup的next方法, 跟進(jìn)去:
public EventExecutor next() { return chooser.next(); }
這里chooser, 就是初始化NioEventLoopGroup的線程選擇器, 為此分配了不同的策略, 這里不再贅述, 通過(guò)這個(gè)方法, 返回一個(gè)NioEventLoop線程
回到MultithreadEventLoopGroup類的register()方法中, next().register(channel)代表分配后的NioEventLoop的register()方法, 這里會(huì)調(diào)用NioEventLoop的父類SingleThreadEventLoop類中的register()方法
跟到SingleThreadEventLoop類中的register()方法:
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
DefaultChannelPromise是一個(gè)監(jiān)聽(tīng)器, 它會(huì)跟隨channel的讀寫進(jìn)行監(jiān)聽(tīng), 綁定傳入的channel和NioEventLoop, 有關(guān)Promise后面的章節(jié)會(huì)講到
這里我們繼續(xù)跟進(jìn)register(new DefaultChannelPromise(channel, this))
public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
unsafe()方法返回創(chuàng)建channel初始化的unsafe()對(duì)象, 如果是NioSeverSocketChannel, 則綁定NioMessageUnsafe對(duì)象, 上一小節(jié)進(jìn)行剖析過(guò)這里不再贅述
最終這個(gè)unsafe對(duì)象會(huì)調(diào)用到AbstractChannel的內(nèi)部類AbstractUnsafe中的register()方法, 這里register(), 無(wú)論是客戶端channel和服務(wù)器channel都會(huì)通過(guò)這個(gè)一個(gè)register注冊(cè), 在以后的客戶端接入章節(jié)中我們會(huì)看到
這里我們繼續(xù)看register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) { //代碼省略 //所有的復(fù)制操作, 都交給eventLoop處理(1) AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { //做實(shí)際主注冊(cè)(2) register0(promise); } }); } catch (Throwable t) { //代碼省略 } } }
這里我們上一小節(jié)分析過(guò), 不再陌生, 這里只分析有關(guān)NioEventLoop相關(guān)的內(nèi)容
我們首先看到 AbstractChannel.this.eventLoop = eventLoop , 獲取當(dāng)前channel的NioEventLoop, 通過(guò)上一章的學(xué)習(xí), 我們知道每個(gè)channel創(chuàng)建的時(shí)候會(huì)綁定一個(gè)NioEventLoop
這里通過(guò)eventLoop.inEventLoop()判斷當(dāng)前線程是否是NioEventLoop線程, inEventLoop()方法在前面的小節(jié)剖析過(guò), 這里不再贅述
如果是NioEventLoop線程則通過(guò)register0(promise)方法做實(shí)際的注冊(cè), 但是我們第一次執(zhí)行注冊(cè)方法的時(shí)候, 如果是服務(wù)器channel是則是由server的用戶線程執(zhí)行的, 如果是客戶端channel, 則是由Boss線程執(zhí)行的, 所以走到這里均不是當(dāng)前channel的NioEventLoop的線程, 于是會(huì)走到下面的eventLoop.execute()方法中
eventLoop.execute()上一小節(jié)剖析過(guò), 就是將task添加到taskQueue中并且開(kāi)啟器NioEventLoop線程, 所以, 在這里就開(kāi)啟了NioEventLoop線程, 有關(guān)開(kāi)啟步驟, 可以通過(guò)上一小節(jié)內(nèi)容進(jìn)行回顧
這里注意一點(diǎn), 有的資料會(huì)講第一次開(kāi)啟NioEventLoop線程是在AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法中開(kāi)啟的, 個(gè)人經(jīng)過(guò)debug和分析, 實(shí)際上并不是那樣的, 希望大家不要被誤導(dǎo)
簡(jiǎn)單看下doBind0(regFuture, channel, localAddress, promise)方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //綁定端口 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
這里雖然調(diào)用了eventLoop的execute()方法, 但是eventLoop線程在注冊(cè)期間已經(jīng)啟動(dòng), 所以這里不會(huì)重復(fù)啟動(dòng), 只會(huì)將任務(wù)添加到taskQueue中
其實(shí)這里我們也能夠看出, 其實(shí)綁定端口的相關(guān)操作, 同樣是也是eventLoop線程中執(zhí)行的
以上就是Netty源碼分析NioEventLoop線程的啟動(dòng)的詳細(xì)內(nèi)容,更多關(guān)于Netty NioEventLoop線程啟動(dòng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Netty分布式NioSocketChannel注冊(cè)到selector方法解析
- Netty客戶端接入流程N(yùn)ioSocketChannel創(chuàng)建解析
- Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行源碼分析
- Netty源碼分析NioEventLoop執(zhí)行select操作入口
- Netty分布式NioEventLoop優(yōu)化selector源碼解析
- Netty源碼分析NioEventLoop初始化線程選擇器創(chuàng)建
- Netty源碼解析NioEventLoop創(chuàng)建的構(gòu)造方法
- Netty實(shí)戰(zhàn)源碼解析NIO編程
相關(guān)文章
MyBatis異常java.sql.SQLSyntaxErrorException的問(wèn)題解決
使用mybatis插入數(shù)據(jù)時(shí)出現(xiàn)java.sql.SQLSyntaxErrorException異常,本文就來(lái)介紹一下MyBatis異常的問(wèn)題解決,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08Spring定時(shí)任務(wù)@scheduled多線程使用@Async注解示例
這篇文章主要為大家介紹了Spring定時(shí)任務(wù)@scheduled多線程使用@Async注解示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總
今天小編就為大家分享一篇關(guān)于hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03IDEA使用MyBatisCodeHelperPro來(lái)generator代碼的詳細(xì)教程
這篇文章主要介紹了IDEA使用MyBatisCodeHelperPro來(lái)generator代碼的詳細(xì)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09Java NIO Path接口和Files類配合操作文件的實(shí)例
下面小編就為大家分享一篇Java NIO Path接口和Files類配合操作文件的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-11-11Spring Boot集成MinIO對(duì)象存儲(chǔ)服務(wù)器操作步驟
通過(guò)Spring Boot集成MinIO,你可以在應(yīng)用中方便地進(jìn)行文件的存儲(chǔ)和管理,本文給大家分享Spring Boot集成MinIO對(duì)象存儲(chǔ)服務(wù)器詳細(xì)操作步驟,感興趣的朋友一起看看吧2024-01-01