Netty源碼分析NioEventLoop線程的啟動(dòng)
之前的小節(jié)我們學(xué)習(xí)了NioEventLoop的創(chuàng)建以及線程分配器的初始化, 那么NioEventLoop是如何開啟的呢, 我們這一小節(jié)繼續(xù)學(xué)習(xí)
NioEventLoop開啟方法
NioEventLoop的開啟方法在其父類SingleThreadEventExecutor中的execute(Runnable task)方法中, 我們跟到這個(gè)方法:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//判斷當(dāng)前線程是不是eventLoop線程
boolean inEventLoop = inEventLoop();
//如果是eventLoop線程
if (inEventLoop) {
addTask(task);
} else {
//不是eventLoop線程啟動(dòng)線程
startThread();
//添加task
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}這個(gè)方法傳入一個(gè)Runnble對(duì)象, 也就是一個(gè)任務(wù)
首先boolean inEventLoop = inEventLoop()方法會(huì)判斷是不是NioEventLoop線程
跟進(jìn) inEventLoop()方法
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}這里inEventLoop(Thread.currentThread())方法傳入了當(dāng)前線程對(duì)象, 這個(gè)方法會(huì)調(diào)用當(dāng)前類的inEventLoop(Thread thread)方法
跟進(jìn)inEventLoop(Thread thread)方法:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}我們看到判斷的依據(jù)是當(dāng)前線程對(duì)象是不是NioEventLoop綁定的線程對(duì)象, 這里我們會(huì)想到開啟線程肯定會(huì)為NioEventLoop綁定一個(gè)線程對(duì)象, 如果判斷當(dāng)前線程對(duì)象不是當(dāng)前NioEventLoop綁定的線程對(duì)象, 說明執(zhí)行此方法的線程不是當(dāng)前NioEventLoop線程, 那么這個(gè)線程如何初始化的, 后面我們會(huì)講到, 我們繼續(xù)看execute(Runnable task)方法:
如果是NioEventLoop線程, 則會(huì)通過addTask(task)添加任務(wù), 通過NioEventLoop異步執(zhí)行, 那么這個(gè)task是什么時(shí)候執(zhí)行的, 同樣后面會(huì)講到
跟一下addTask(task)
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//如果添加不成功
if (!offerTask(task)) {
reject(task);
}
}這里offerTask(task)代表添加一個(gè)task, 跟進(jìn)去:
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
//往taskQ中添加一個(gè)task
return taskQueue.offer(task);
}我們看到taskQueue.offer(task)將一個(gè)task添加到任務(wù)隊(duì)列, 而這個(gè)任務(wù)隊(duì)列taskQueue就是我們NioEventLoop初始化的時(shí)候與NioEventLoop唯一綁定的任務(wù)隊(duì)列
回顧一下初始構(gòu)造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}在這里通過 taskQueue = newTaskQueue(this.maxPendingTasks) 創(chuàng)建了taskQueue
回到execute(Runnable task)方法中, 我們繼續(xù)往下看:
如果不是NioEventLoop線程我們通過startThread()開啟一個(gè)NioEventLoop線程
跟到startThread()之前, 我們先繼續(xù)往下走:
開啟NioEventLoop線程之后, 又通過addTask(task)往taskQueue添加任務(wù)
最后我們注意有這么一段代碼:
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}addTaskWakesUp代表添加task之后, NioEventLoop的select()操作是不是要喚醒, 這個(gè)屬性是在初始化NioEventLoop的時(shí)候傳入的, 大家可以回顧下, 默認(rèn)是false, 這里!addTaskWakesUp就是需要喚醒, wakesUpForTask(task)與addTaskWakesUp意義相同, 默認(rèn)是true, 可以看代碼:
protected boolean wakesUpForTask(Runnable task) {
return true;
}這里恒為true, 所以這段代碼就是添加task時(shí)需要通過wakeup(inEventLoop)喚醒, 這樣NioEventLoop在做select()操作時(shí)如果正在阻塞則立刻喚醒, 然后執(zhí)行任務(wù)隊(duì)列的task
回到execute(Runnable task)方法中我們跟進(jìn)開啟線程的startThread()方法中:
private void startThread() {
//判斷線程是否啟動(dòng), 未啟動(dòng)則啟動(dòng)
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//當(dāng)前線程未啟動(dòng), 則啟動(dòng)
doStartThread();
}
}
}前面的判斷是判斷當(dāng)前NioEventLoop線程是否啟動(dòng), 如果未啟動(dòng), 則通過doStartThread()方法啟動(dòng), 我們第一次執(zhí)行execute(Runnable task)線程是未啟動(dòng)的, 所以會(huì)執(zhí)行doStartThread(), 后續(xù)該線程則不會(huì)再執(zhí)行doStartThread()方法
我們跟進(jìn)doStartThread()方法中
private void doStartThread() {
assert thread == null;
//線程執(zhí)行器執(zhí)行線程(所有的eventLoop共用一個(gè)線程執(zhí)行器)
executor.execute(new Runnable() {
@Override
public void run() {
//將當(dāng)前線程復(fù)制給屬性
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//開始輪詢
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//代碼省略
}
}
});
}我們重點(diǎn)關(guān)注executor.execute()這個(gè)方法, 其中executor就是我們創(chuàng)建NioEventLoop的線程器, execute()就是開啟一個(gè)線程
回顧下execute()方法
public void execute(Runnable command) {
//起一個(gè)線程
threadFactory.newThread(command).start();
}我們看到通過線程工廠開啟一個(gè)線程, 由于前面的小節(jié)已經(jīng)剖析, 這里不再贅述
開啟線程則執(zhí)行Runnble類中的run()方法, 我們看到在run()方法里通過 thread = Thread.currentThread() 將新開啟的線程對(duì)象賦值NioEventLoop的thread的屬性, 這樣就可以通過線程對(duì)象的判斷, 來確定是不是NioEventLoop線程了
后面我們看到 SingleThreadEventExecutor.this.run() , 這里this, 就是當(dāng)前NioEventLoop對(duì)象, 而這里的run()方法, 就是NioEventLoop中的run()方法, 在這個(gè)run()方法中, 真正開始了selector的輪詢工作, 對(duì)于run()方法的詳細(xì)剖析, 我們會(huì)在之后的小節(jié)中進(jìn)行
剛才我們剖析了NioEventLoop的啟動(dòng)方法, 那么根據(jù)我們的分析, 就是第一次調(diào)用NioEventLoop的execute(Runnable task)方法的時(shí)候, 則會(huì)開啟NioEventLoop線程, 之后的調(diào)用只是往taskQueue中添加任務(wù), 那么第一次是什么時(shí)候開啟的呢?這里我們要回顧上一章講過的內(nèi)容
上一章中我們講過在AbstractServerBootstrap中有個(gè)initAndRegister()方法, 這個(gè)方法主要用于channel的初始化和注冊(cè), 其中注冊(cè)的代碼為:
ChannelFuture regFuture = config().group().register(channel);
其中g(shù)roup()我們剖析過是Boss線程的group, 我們剖析過其中的register(channel)方法:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}首先跟到next()方法:
public EventLoop next() {
return (EventLoop) super.next();
}首先調(diào)用了其父類MultithreadEventExecutorGroup的next方法, 跟進(jìn)去:
public EventExecutor next() {
return chooser.next();
}這里chooser, 就是初始化NioEventLoopGroup的線程選擇器, 為此分配了不同的策略, 這里不再贅述, 通過這個(gè)方法, 返回一個(gè)NioEventLoop線程
回到MultithreadEventLoopGroup類的register()方法中, next().register(channel)代表分配后的NioEventLoop的register()方法, 這里會(huì)調(diào)用NioEventLoop的父類SingleThreadEventLoop類中的register()方法
跟到SingleThreadEventLoop類中的register()方法:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}DefaultChannelPromise是一個(gè)監(jiān)聽器, 它會(huì)跟隨channel的讀寫進(jìn)行監(jiān)聽, 綁定傳入的channel和NioEventLoop, 有關(guān)Promise后面的章節(jié)會(huì)講到
這里我們繼續(xù)跟進(jìn)register(new DefaultChannelPromise(channel, this))
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}unsafe()方法返回創(chuàng)建channel初始化的unsafe()對(duì)象, 如果是NioSeverSocketChannel, 則綁定NioMessageUnsafe對(duì)象, 上一小節(jié)進(jìn)行剖析過這里不再贅述
最終這個(gè)unsafe對(duì)象會(huì)調(diào)用到AbstractChannel的內(nèi)部類AbstractUnsafe中的register()方法, 這里register(), 無論是客戶端channel和服務(wù)器channel都會(huì)通過這個(gè)一個(gè)register注冊(cè), 在以后的客戶端接入章節(jié)中我們會(huì)看到
這里我們繼續(xù)看register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//代碼省略
//所有的復(fù)制操作, 都交給eventLoop處理(1)
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//做實(shí)際主注冊(cè)(2)
register0(promise);
}
});
} catch (Throwable t) {
//代碼省略
}
}
}這里我們上一小節(jié)分析過, 不再陌生, 這里只分析有關(guān)NioEventLoop相關(guān)的內(nèi)容
我們首先看到 AbstractChannel.this.eventLoop = eventLoop , 獲取當(dāng)前channel的NioEventLoop, 通過上一章的學(xué)習(xí), 我們知道每個(gè)channel創(chuàng)建的時(shí)候會(huì)綁定一個(gè)NioEventLoop
這里通過eventLoop.inEventLoop()判斷當(dāng)前線程是否是NioEventLoop線程, inEventLoop()方法在前面的小節(jié)剖析過, 這里不再贅述
如果是NioEventLoop線程則通過register0(promise)方法做實(shí)際的注冊(cè), 但是我們第一次執(zhí)行注冊(cè)方法的時(shí)候, 如果是服務(wù)器channel是則是由server的用戶線程執(zhí)行的, 如果是客戶端channel, 則是由Boss線程執(zhí)行的, 所以走到這里均不是當(dāng)前channel的NioEventLoop的線程, 于是會(huì)走到下面的eventLoop.execute()方法中
eventLoop.execute()上一小節(jié)剖析過, 就是將task添加到taskQueue中并且開啟器NioEventLoop線程, 所以, 在這里就開啟了NioEventLoop線程, 有關(guān)開啟步驟, 可以通過上一小節(jié)內(nèi)容進(jìn)行回顧
這里注意一點(diǎn), 有的資料會(huì)講第一次開啟NioEventLoop線程是在AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法中開啟的, 個(gè)人經(jīng)過debug和分析, 實(shí)際上并不是那樣的, 希望大家不要被誤導(dǎo)
簡單看下doBind0(regFuture, channel, localAddress, promise)方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//綁定端口
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}這里雖然調(diào)用了eventLoop的execute()方法, 但是eventLoop線程在注冊(cè)期間已經(jīng)啟動(dòng), 所以這里不會(huì)重復(fù)啟動(dòng), 只會(huì)將任務(wù)添加到taskQueue中
其實(shí)這里我們也能夠看出, 其實(shí)綁定端口的相關(guān)操作, 同樣是也是eventLoop線程中執(zhí)行的
以上就是Netty源碼分析NioEventLoop線程的啟動(dòng)的詳細(xì)內(nèi)容,更多關(guān)于Netty NioEventLoop線程啟動(dòng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Netty分布式NioSocketChannel注冊(cè)到selector方法解析
- Netty客戶端接入流程N(yùn)ioSocketChannel創(chuàng)建解析
- Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行源碼分析
- Netty源碼分析NioEventLoop執(zhí)行select操作入口
- Netty分布式NioEventLoop優(yōu)化selector源碼解析
- Netty源碼分析NioEventLoop初始化線程選擇器創(chuàng)建
- Netty源碼解析NioEventLoop創(chuàng)建的構(gòu)造方法
- Netty實(shí)戰(zhàn)源碼解析NIO編程
相關(guān)文章
不喜歡羅里吧嗦,講的很精簡易懂。從基礎(chǔ)開始講,后續(xù)會(huì)講到JAVA高級(jí),中間會(huì)穿插面試題和項(xiàng)目實(shí)戰(zhàn),希望能給大家?guī)韼椭?/div> 2022-03-03
MyBatis異常java.sql.SQLSyntaxErrorException的問題解決
使用mybatis插入數(shù)據(jù)時(shí)出現(xiàn)java.sql.SQLSyntaxErrorException異常,本文就來介紹一下MyBatis異常的問題解決,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08
Spring定時(shí)任務(wù)@scheduled多線程使用@Async注解示例
這篇文章主要為大家介紹了Spring定時(shí)任務(wù)@scheduled多線程使用@Async注解示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11
hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總
今天小編就為大家分享一篇關(guān)于hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-03-03
IDEA使用MyBatisCodeHelperPro來generator代碼的詳細(xì)教程
這篇文章主要介紹了IDEA使用MyBatisCodeHelperPro來generator代碼的詳細(xì)教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09
Java NIO Path接口和Files類配合操作文件的實(shí)例
下面小編就為大家分享一篇Java NIO Path接口和Files類配合操作文件的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2017-11-11
Spring Boot集成MinIO對(duì)象存儲(chǔ)服務(wù)器操作步驟
通過Spring Boot集成MinIO,你可以在應(yīng)用中方便地進(jìn)行文件的存儲(chǔ)和管理,本文給大家分享Spring Boot集成MinIO對(duì)象存儲(chǔ)服務(wù)器詳細(xì)操作步驟,感興趣的朋友一起看看吧2024-01-01最新評(píng)論

