Netty分布式Future與Promise執(zhí)行回調(diào)相關(guān)邏輯剖析
Future和Promise執(zhí)行回調(diào)
Netty中的Future, 其實(shí)類似于jdk的Future, 用于異步獲取執(zhí)行結(jié)果
Promise則相當(dāng)于一個(gè)被觀察者, 其中promise對(duì)象會(huì)一直跟隨著channel的讀寫(xiě)事件, 并跟蹤著事件狀態(tài), 然后執(zhí)行相應(yīng)的回調(diào)
這種設(shè)計(jì)思路也就是java設(shè)計(jì)模式的觀察者模式
首先我們看一段寫(xiě)在handler中的業(yè)務(wù)代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture future = ctx.writeAndFlush("test data");
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("寫(xiě)出成功");
}else{
System.out.println("寫(xiě)出失敗");
}
}
});
}熟悉netty的小伙伴估計(jì)對(duì)這段代碼并不陌生, 首先調(diào)用writeAndFlush方法將數(shù)據(jù)寫(xiě)出, 然后返回的future進(jìn)行添加Listener, 并且重寫(xiě)回調(diào)函數(shù)
這里舉一個(gè)最簡(jiǎn)單的示例, 在回調(diào)函數(shù)中判斷future的狀態(tài)成功與否, 成功的話就打印"寫(xiě)出成功", 否則節(jié)打印"寫(xiě)出失敗"
這里如果寫(xiě)在handler中通常是NioEventLoop線程執(zhí)行的, 在future返回之后才會(huì)執(zhí)行添加listener的操作, 如果在用戶線程中writeAndFlush是異步執(zhí)行的, 在添加監(jiān)聽(tīng)的時(shí)候有可能寫(xiě)出操作沒(méi)有執(zhí)行完畢, 等寫(xiě)出操作執(zhí)行完畢之后才會(huì)執(zhí)行回調(diào)
以上邏輯在代碼中如何體現(xiàn)的呢?我們首先跟到writeAndFlush的方法中去
這里會(huì)走到AbstractChannelHandlerContext中的writeAndFlush方法中:
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}這里的邏輯之前剖析過(guò), 想必大家并不陌生
這里關(guān)注newPromise()方法, 跟進(jìn)去
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}這里直接創(chuàng)建了DefaultChannelPromise這個(gè)對(duì)象并傳入了當(dāng)前channel和當(dāng)前channel綁定NioEventLoop對(duì)象
在DefaultChannelPromise構(gòu)造方法中, 也會(huì)將channel和NioEventLoop對(duì)象綁定在自身成員變量中
回到writeAndFlush方法繼續(xù)跟
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}這里的邏輯也不陌生, 注意這里最后返回了promise, 其實(shí)就是我們上一步創(chuàng)建DefaultChannelPromise對(duì)象
DefaultChannelPromise實(shí)現(xiàn)了ChannelFuture接口, 所以方法如果返回該對(duì)象可以被ChannelFuture類型接收
我們繼續(xù)跟write方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}這里的邏輯我們同樣不陌生, 如果nioEventLoop線程, 我們繼續(xù)調(diào)invokeWriteAndFlush方法, 如果不是nioEventLoop線程則將writeAndFlush事件封裝成task, 交給eventLoop線程異步
這里如果是異步執(zhí)行, 則到這一步之后, 我們的業(yè)務(wù)代碼中, writeAndFlush就會(huì)返回并添加監(jiān)聽(tīng), 有關(guān)添加監(jiān)聽(tīng)的邏輯稍后分析
走到這里, 無(wú)論同步異步, 都會(huì)執(zhí)行到invokeWriteAndFlush方法:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}這里也是我們熟悉的邏輯, 我們看到在invokeWrite0方法中傳入了我們剛才創(chuàng)建的DefaultChannelPromise
后續(xù)邏輯想必大家都比較熟悉, 通過(guò)事件傳播, 最終會(huì)調(diào)用head節(jié)點(diǎn)的write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}這里最終調(diào)用unsafe的write方法, 并傳入了promise對(duì)象
跟到AbstractUnsafe的write方法中:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//負(fù)責(zé)緩沖寫(xiě)進(jìn)來(lái)的byteBuf
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//插入寫(xiě)隊(duì)列
outboundBuffer.addMessage(msg, size, promise);
}這里的邏輯之前小節(jié)也剖析過(guò), 這里我們首先關(guān)注兩個(gè)部分, 首先看在catch中safeSetFailure這步
因?yàn)槭莄atch塊, 說(shuō)明發(fā)生了異常, 寫(xiě)到緩沖區(qū)不成功, safeSetFailure就是設(shè)置寫(xiě)出失敗的狀態(tài)
我們跟到safeSetFailure方法中:
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}
}這里看if判斷, 首先我們的promise是DefaultChannelPromise, 所以!(promise instanceof VoidChannelPromise)為true
重點(diǎn)分析promise.tryFailure(cause), 這里是設(shè)置失敗狀態(tài), 這里會(huì)調(diào)用DefaultPromise的tryFailure方法
跟進(jìn)tryFailure方法
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}再跟到setFailure0(cause)中:
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}這里在if塊中的cas操作, 會(huì)將參數(shù)objResult的值設(shè)置到DefaultPromise的成員變量result中, 表示當(dāng)前操作為異常狀態(tài)
回到tryFailure方法:
這里關(guān)注notifyListeners()這個(gè)方法, 這個(gè)方法是執(zhí)行添加監(jiān)聽(tīng)的回調(diào)函數(shù), 當(dāng)writeAndFlush和addListener是異步執(zhí)行的時(shí)候, 這里有可能添加已經(jīng)添加, 所以通過(guò)這個(gè)方法可以調(diào)用添加監(jiān)聽(tīng)后的回調(diào)
如果writeAndFlush和addListener是同步執(zhí)行的時(shí)候, 也就是都在NioEventLoop線程中執(zhí)行的時(shí)候, 那么走到這里addListener還沒(méi)執(zhí)行, 所以這里不能回調(diào)添加監(jiān)聽(tīng)的回調(diào)函數(shù), 那么回調(diào)是什么時(shí)候執(zhí)行的呢?我們?cè)谄饰鯽ddListener步驟的時(shí)候會(huì)給大家分析
具體執(zhí)行回調(diào)我們?cè)僦v解添加監(jiān)聽(tīng)的時(shí)候進(jìn)行剖析
以上就是記錄異常狀態(tài)的大概邏輯
回到AbstractUnsafe的write方法:
我們?cè)訇P(guān)注這一步:
outboundBuffer.addMessage(msg, size, promise);
跟到addMessage方法中
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
//代碼省略
}我們只需要關(guān)注包裝Entry的newInstance方法, 該方法傳入promise對(duì)象
跟到newInstance中:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size;
entry.total = total;
entry.promise = promise;
return entry;
}這里將promise設(shè)置到Entry的成員變量中了, 也就是說(shuō), 每個(gè)Entry都關(guān)聯(lián)了唯一的一個(gè)promise
我們回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}我們剛才分析了write操作中promise的傳遞以及狀態(tài)設(shè)置的大概過(guò)程, 我們繼續(xù)看在flush中promise的操作過(guò)程
這里invokeFlush0()并沒(méi)有傳入promise對(duì)象, 是因?yàn)槲覀儎偛欧治鲞^(guò), promise對(duì)象會(huì)綁定在緩沖區(qū)中entry的成員變量中, 可以通過(guò)其成員變量拿到promise對(duì)象
invokeFlush0()我們之前也分析過(guò), 通過(guò)事件傳遞, 最終會(huì)調(diào)用HeadContext的flush方法:
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}最后跟到AbstractUnsafe的flush方法
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}這塊邏輯之前已分析過(guò), 繼續(xù)看flush0方法:
protected void flush0() {
//代碼省略
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
//代碼省略
} finally {
inFlush0 = false;
}
}篇幅原因我們省略大段代碼
我們繼續(xù)跟進(jìn)doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}
if (msg instanceof ByteBuf) {
//代碼省略
boolean done = false;
//代碼省略
if (done) {
//移除當(dāng)前對(duì)象
in.remove();
} else {
break;
}
} else if (msg instanceof FileRegion) {
//代碼省略
} else {
throw new Error();
}
}
incompleteWrite(setOpWrite);
}這里也省略了大段代碼, 我們重點(diǎn)關(guān)注in.remove()這里, 之前介紹過(guò), 如果done為true, 說(shuō)明刷新事件已完成, 則移除當(dāng)前entry節(jié)點(diǎn)
我們跟到remove()方法中
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
e.recycle();
return true;
}這里我們看這一步:
ChannelPromise promise = e.promise;
之前我們剖析promise對(duì)象會(huì)綁定在entry中, 而這步就是從entry中獲取promise對(duì)象
等remove操作完成, 會(huì)執(zhí)行到這一步:
safeSuccess(promise);
這一步正好和我們剛才分析的safeSetFailure相反, 這里是設(shè)置成功狀態(tài)
跟到safeSuccess方法中:
private static void safeSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise)) {
PromiseNotificationUtil.trySuccess(promise, null, logger);
}
}再跟到trySuccess方法中
public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) {
if (!p.trySuccess(result) && logger != null) {
//代碼省略
}
}這里再繼續(xù)跟if中的trySuccess方法, 最后會(huì)走到DefaultPromise的trySuccess方法:
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}這里跟到setSuccess0方法中:
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}這里的邏輯我們剛才剖析過(guò)了, 這里參數(shù)傳入一個(gè)信號(hào)SUCCESS, 表示設(shè)置成功狀
再繼續(xù)跟setValue方法:
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}同樣, 在if判斷中, 通過(guò)cas操作將參數(shù)傳入的SUCCESS對(duì)象賦值到DefaultPromise的屬性result中, 我們看這個(gè)屬性:
private volatile Object result;
這里是Object類型, 也就是可以賦值成任何類型
SUCCESS是一個(gè)Signal類型的對(duì)象, 這里我們可以簡(jiǎn)單理解成一種狀態(tài), SUCCESS表示一種成功的狀態(tài)
通過(guò)上述cas操作, result的值將賦值成SUCCESS
我們回到trySuccess方法:
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}設(shè)置完成功狀態(tài)之后, 則會(huì)通過(guò)notifyListeners()執(zhí)行監(jiān)聽(tīng)中的回調(diào)
我們看用戶代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture future = ctx.writeAndFlush("test data");
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("寫(xiě)出成功");
}else{
System.out.println("寫(xiě)出失敗");
}
}
});
}在回調(diào)中會(huì)判斷future.isSuccess(), promise設(shè)置為成功狀態(tài)這里會(huì)返回true, 從而打印寫(xiě)出成功"
跟到isSuccess方法中, 這里會(huì)調(diào)用DefaultPromise的isSuccess方法:
public boolean isSuccess() {
Object result = this.result;
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}我們看到首先會(huì)拿到result對(duì)象, 然后判斷result不為空, 并且不是UNCANCELLABLE, 并且不屬于CauseHolder對(duì)象
我們剛才分析如果promise設(shè)置為成功裝載, 則result為SUCCESS, 所以這里條件成立, 可以執(zhí)行 if (future.isSuccess()) 中if塊的邏輯
和設(shè)置錯(cuò)誤狀態(tài)的邏輯一樣, 這里也有同樣的問(wèn)題, 如果writeAndFlush是和addListener是異步操作, 那么執(zhí)行到回調(diào)的時(shí)候, 可能addListener已經(jīng)添加完成, 所以可以正常的執(zhí)行回調(diào)
那么如果writeAndFlush是和addListener是同步操作, writeAndFlush在執(zhí)行回調(diào)的時(shí)候, addListener并沒(méi)有執(zhí)行, 所以無(wú)法執(zhí)行回調(diào)方法, 那么回調(diào)方法是如何執(zhí)行的呢, 我們看addListener這個(gè)方法:
addListener傳入ChannelFutureListener對(duì)象, 并重寫(xiě)了operationComplete方法, 也就是執(zhí)行回調(diào)的方法
這里會(huì)執(zhí)行到DefaultChannelPromise的addListener方法, 跟進(jìn)去
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}跟到父類的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}這里通過(guò)addListener0方法添加listener, 因?yàn)樘砑觢istener有可能會(huì)在不同的線程中操作, 比如用戶線程和NioEventLoop線程, 為了防止并發(fā)問(wèn)題, 這里簡(jiǎn)單粗暴的加了個(gè)synchronized關(guān)鍵字
跟到addListener0方法中
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}如果是第一次添加listener, 則成員變量listeners為null, 這樣就把參數(shù)傳入的GenericFutureListener賦值到成員變量listeners
如果是第二次添加listener, listeners不為空, 會(huì)走到else if判斷, 因?yàn)榈谝淮翁砑拥膌istener是GenericFutureListener類型, 并不是DefaultFutureListeners類型, 所以else if判斷返回false, 進(jìn)入到else塊中
else塊中, 通過(guò)new的方式創(chuàng)建一個(gè)DefaultFutureListeners對(duì)象并賦值到成員變量listeners中
DefaultFutureListeners的構(gòu)造方法中, 第一個(gè)參數(shù)傳入DefaultPromise中的成員變量listeners, 也就是第一次添加的GenericFutureListener對(duì)象, 第二個(gè)參數(shù)為第二次添加的GenericFutureListener對(duì)象, 這里通過(guò)兩個(gè)GenericFutureListener對(duì)象包裝成一個(gè)DefaultFutureListeners對(duì)象
我們看listeners的定義:
private Object listeners;
這里是個(gè)Object類型, 所以可以保存任何類型的對(duì)象
再看DefaultFutureListeners的構(gòu)造方法:
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
//第0個(gè)
listeners[0] = first;
//第1個(gè)
listeners[1] = second;
size = 2;
//代碼省略
}在DefaultFutureListeners類中也定義了一個(gè)成員變量listeners, 類型為GenericFutureListener數(shù)組
構(gòu)造方法中初始化listeners這個(gè)數(shù)組, 并且數(shù)組中第一個(gè)值賦值為我們第一次添加的GenericFutureListener, 第二個(gè)賦值為我們第二次添加的GenericFutureListener
回到addListener0方法中
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}經(jīng)過(guò)兩次添加listener, 屬性listeners的值就變成了DefaultFutureListeners類型的對(duì)象, 如果第三次添加listener, 則會(huì)走到else if塊中, DefaultFutureListeners對(duì)象通過(guò)調(diào)用add方法繼續(xù)添加listener
跟到add方法中:
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
//代碼省略
}這里的邏輯也比較簡(jiǎn)單, 就是為當(dāng)前的數(shù)組對(duì)象listeners中追加新的GenericFutureListener對(duì)象, 如果listeners容量不足則進(jìn)行擴(kuò)容操作
根據(jù)以上邏輯, 就完成了listener的添加邏輯
那么再看我們剛才遺留的問(wèn)題, 如果writeAndFlush和addListener是同步進(jìn)行的, writeAndFlush執(zhí)行回調(diào)時(shí)還沒(méi)有addListener還沒(méi)有執(zhí)行回調(diào), 那么回調(diào)是如何執(zhí)行的呢?
回到DefaultPromise的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}我們分析完了addListener0方法, 再往下看
這個(gè)會(huì)有if判斷isDone(), isDone方法, 就是程序執(zhí)行到這一步的時(shí)候, 判斷刷新事件是否執(zhí)行完成
跟到isDone方法中
public boolean isDone() {
return isDone0(result);
}繼續(xù)跟isDone0, 這里傳入了成員變量result
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}這里判斷result不為null并且不為UNCANCELLABLE, 則就表示完成
因?yàn)槌晒Φ臓顟B(tài)是SUCCESS, 所以flush成功這里會(huì)返回true
回到 addListener中:
如果執(zhí)行完成, 就通過(guò)notifyListeners()方法執(zhí)行回調(diào), 這也解釋剛才的問(wèn)題, 在同步操作中, writeAndFlush在執(zhí)行回調(diào)時(shí)并沒(méi)有添加listener, 所以添加listener的時(shí)候會(huì)判斷writeAndFlush的執(zhí)行狀態(tài), 如果狀態(tài)時(shí)完成, 則會(huì)這里執(zhí)行回調(diào)
同樣, 在異步操作中, 走到這里writeAndFlush可能還沒(méi)完成, 所以這里不會(huì)執(zhí)行回調(diào), 由writeAndFlush執(zhí)行回調(diào)
所以, 無(wú)論writeAndFlush和addListener誰(shuí)先完成, 都可以執(zhí)行到回調(diào)方法
跟到notifyListeners()方法中
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}這里首先判斷是否是eventLoop線程, 如果是eventLoop線程則執(zhí)行if塊中的邏輯, 如果不是eventLoop線程, 則把執(zhí)行回調(diào)的邏輯封裝成task丟到EventLoop的任務(wù)隊(duì)列中異步執(zhí)行
我們重點(diǎn)關(guān)注notifyListenersNow()方法, 跟進(jìn)去:
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
//代碼省略
}
}在無(wú)限for循環(huán)中, 首先首先判斷l(xiāng)isteners是不是DefaultFutureListeners類型, 根據(jù)我們之前的邏輯, 如果只添加了一個(gè)listener, 則listeners是GenericFutureListener類型
通常在添加的時(shí)候只會(huì)添加一個(gè)listener, 所以我們跟到else塊中的notifyListener0方法:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}我們看到, 這里執(zhí)行了GenericFutureListener的中我們重寫(xiě)的回調(diào)函數(shù)operationComplete
以上就是執(zhí)行回調(diào)的相關(guān)邏輯
章節(jié)小結(jié)
這一章講解了有關(guān)write和flush的相關(guān)邏輯, 并分析了有關(guān)添加監(jiān)聽(tīng)和異步寫(xiě)數(shù)據(jù)的相關(guān)步驟
經(jīng)過(guò)學(xué)習(xí), 同學(xué)們應(yīng)該掌握如下知識(shí):
write操作是如何將ByteBuf添加到發(fā)送緩沖區(qū)的
flush操作是如何將ByteBuf寫(xiě)出到chanel中的
抽象編碼器MessageToByteEncoder中如何定義了編碼器的骨架邏輯
writeAndFlush和addListener在同步和異步操作中是如何執(zhí)行回調(diào)的
更多關(guān)于Netty分布式Future和Promise執(zhí)行回調(diào)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天/群聊系統(tǒng)
這篇文章主要實(shí)現(xiàn)在好友添加、建群、聊天對(duì)話、群聊功能,使用Java作為后端語(yǔ)言進(jìn)行支持,界面友好,開(kāi)發(fā)簡(jiǎn)單,文章中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2023-08-08
SpringBoot加載應(yīng)用事件監(jiān)聽(tīng)器代碼實(shí)例
這篇文章主要介紹了SpringBoot加載應(yīng)用事件監(jiān)聽(tīng)器代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范
這篇文章主要介紹了Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范,IOC又名控制反轉(zhuǎn),把對(duì)象創(chuàng)建和對(duì)象之間的調(diào)用過(guò)程,交給Spring進(jìn)行管理,目的是為了降低耦合度,需要的朋友可以參考下2023-05-05
SpringBoot優(yōu)化接口響應(yīng)時(shí)間的九個(gè)技巧
在實(shí)際開(kāi)發(fā)中,提升接口響應(yīng)速度是一件挺重要的事,特別是在面臨大量用戶請(qǐng)求的時(shí)候,本文為大家整理了9個(gè)SpringBoot優(yōu)化接口響應(yīng)時(shí)間的技巧,希望對(duì)大家有所幫助2024-01-01
Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例
這篇文章主要介紹了Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。2017-12-12
Java 中的FileReader和FileWriter源碼分析_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
本文給大家分享一段示例程序,通過(guò)示例代碼可以看出FileReader是基于InputStreamReader實(shí)現(xiàn)的,FileWriter是基于OutputStreamWriter實(shí)現(xiàn)的,具體程序代碼大家通過(guò)本文了解下吧2017-05-05
maven中resource配置的實(shí)現(xiàn)示例
我們?cè)谑褂肕aven組件來(lái)構(gòu)建項(xiàng)目的時(shí)候,通常將配置文件放在資源文件目錄下,本文主要介紹了maven中resource配置的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09

