Netty分布式Future與Promise執(zhí)行回調(diào)相關(guān)邏輯剖析
Future和Promise執(zhí)行回調(diào)
Netty中的Future, 其實(shí)類似于jdk的Future, 用于異步獲取執(zhí)行結(jié)果
Promise則相當(dāng)于一個(gè)被觀察者, 其中promise對(duì)象會(huì)一直跟隨著channel的讀寫事件, 并跟蹤著事件狀態(tài), 然后執(zhí)行相應(yīng)的回調(diào)
這種設(shè)計(jì)思路也就是java設(shè)計(jì)模式的觀察者模式
首先我們看一段寫在handler中的業(yè)務(wù)代碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }
熟悉netty的小伙伴估計(jì)對(duì)這段代碼并不陌生, 首先調(diào)用writeAndFlush方法將數(shù)據(jù)寫出, 然后返回的future進(jìn)行添加Listener, 并且重寫回調(diào)函數(shù)
這里舉一個(gè)最簡(jiǎn)單的示例, 在回調(diào)函數(shù)中判斷future的狀態(tài)成功與否, 成功的話就打印"寫出成功", 否則節(jié)打印"寫出失敗"
這里如果寫在handler中通常是NioEventLoop線程執(zhí)行的, 在future返回之后才會(huì)執(zhí)行添加listener的操作, 如果在用戶線程中writeAndFlush是異步執(zhí)行的, 在添加監(jiān)聽的時(shí)候有可能寫出操作沒(méi)有執(zhí)行完畢, 等寫出操作執(zhí)行完畢之后才會(huì)執(zhí)行回調(diào)
以上邏輯在代碼中如何體現(xiàn)的呢?我們首先跟到writeAndFlush的方法中去
這里會(huì)走到AbstractChannelHandlerContext中的writeAndFlush方法中:
public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
這里的邏輯之前剖析過(guò), 想必大家并不陌生
這里關(guān)注newPromise()方法, 跟進(jìn)去
public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }
這里直接創(chuàng)建了DefaultChannelPromise這個(gè)對(duì)象并傳入了當(dāng)前channel和當(dāng)前channel綁定NioEventLoop對(duì)象
在DefaultChannelPromise構(gòu)造方法中, 也會(huì)將channel和NioEventLoop對(duì)象綁定在自身成員變量中
回到writeAndFlush方法繼續(xù)跟
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); return promise; } write(msg, true, promise); return promise; }
這里的邏輯也不陌生, 注意這里最后返回了promise, 其實(shí)就是我們上一步創(chuàng)建DefaultChannelPromise對(duì)象
DefaultChannelPromise實(shí)現(xiàn)了ChannelFuture接口, 所以方法如果返回該對(duì)象可以被ChannelFuture類型接收
我們繼續(xù)跟write方法
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里的邏輯我們同樣不陌生, 如果nioEventLoop線程, 我們繼續(xù)調(diào)invokeWriteAndFlush方法, 如果不是nioEventLoop線程則將writeAndFlush事件封裝成task, 交給eventLoop線程異步
這里如果是異步執(zhí)行, 則到這一步之后, 我們的業(yè)務(wù)代碼中, writeAndFlush就會(huì)返回并添加監(jiān)聽, 有關(guān)添加監(jiān)聽的邏輯稍后分析
走到這里, 無(wú)論同步異步, 都會(huì)執(zhí)行到invokeWriteAndFlush方法:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
這里也是我們熟悉的邏輯, 我們看到在invokeWrite0方法中傳入了我們剛才創(chuàng)建的DefaultChannelPromise
后續(xù)邏輯想必大家都比較熟悉, 通過(guò)事件傳播, 最終會(huì)調(diào)用head節(jié)點(diǎn)的write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這里最終調(diào)用unsafe的write方法, 并傳入了promise對(duì)象
跟到AbstractUnsafe的write方法中:
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //負(fù)責(zé)緩沖寫進(jìn)來(lái)的byteBuf ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入寫隊(duì)列 outboundBuffer.addMessage(msg, size, promise); }
這里的邏輯之前小節(jié)也剖析過(guò), 這里我們首先關(guān)注兩個(gè)部分, 首先看在catch中safeSetFailure這步
因?yàn)槭莄atch塊, 說(shuō)明發(fā)生了異常, 寫到緩沖區(qū)不成功, safeSetFailure就是設(shè)置寫出失敗的狀態(tài)
我們跟到safeSetFailure方法中:
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }
這里看if判斷, 首先我們的promise是DefaultChannelPromise, 所以!(promise instanceof VoidChannelPromise)為true
重點(diǎn)分析promise.tryFailure(cause), 這里是設(shè)置失敗狀態(tài), 這里會(huì)調(diào)用DefaultPromise的tryFailure方法
跟進(jìn)tryFailure方法
public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }
再跟到setFailure0(cause)中:
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
這里在if塊中的cas操作, 會(huì)將參數(shù)objResult的值設(shè)置到DefaultPromise的成員變量result中, 表示當(dāng)前操作為異常狀態(tài)
回到tryFailure方法:
這里關(guān)注notifyListeners()這個(gè)方法, 這個(gè)方法是執(zhí)行添加監(jiān)聽的回調(diào)函數(shù), 當(dāng)writeAndFlush和addListener是異步執(zhí)行的時(shí)候, 這里有可能添加已經(jīng)添加, 所以通過(guò)這個(gè)方法可以調(diào)用添加監(jiān)聽后的回調(diào)
如果writeAndFlush和addListener是同步執(zhí)行的時(shí)候, 也就是都在NioEventLoop線程中執(zhí)行的時(shí)候, 那么走到這里addListener還沒(méi)執(zhí)行, 所以這里不能回調(diào)添加監(jiān)聽的回調(diào)函數(shù), 那么回調(diào)是什么時(shí)候執(zhí)行的呢?我們?cè)谄饰鯽ddListener步驟的時(shí)候會(huì)給大家分析
具體執(zhí)行回調(diào)我們?cè)僦v解添加監(jiān)聽的時(shí)候進(jìn)行剖析
以上就是記錄異常狀態(tài)的大概邏輯
回到AbstractUnsafe的write方法:
我們?cè)訇P(guān)注這一步:
outboundBuffer.addMessage(msg, size, promise);
跟到addMessage方法中
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); //代碼省略 }
我們只需要關(guān)注包裝Entry的newInstance方法, 該方法傳入promise對(duì)象
跟到newInstance中:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; }
這里將promise設(shè)置到Entry的成員變量中了, 也就是說(shuō), 每個(gè)Entry都關(guān)聯(lián)了唯一的一個(gè)promise
我們回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
我們剛才分析了write操作中promise的傳遞以及狀態(tài)設(shè)置的大概過(guò)程, 我們繼續(xù)看在flush中promise的操作過(guò)程
這里invokeFlush0()并沒(méi)有傳入promise對(duì)象, 是因?yàn)槲覀儎偛欧治鲞^(guò), promise對(duì)象會(huì)綁定在緩沖區(qū)中entry的成員變量中, 可以通過(guò)其成員變量拿到promise對(duì)象
invokeFlush0()我們之前也分析過(guò), 通過(guò)事件傳遞, 最終會(huì)調(diào)用HeadContext的flush方法:
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
最后跟到AbstractUnsafe的flush方法
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
這塊邏輯之前已分析過(guò), 繼續(xù)看flush0方法:
protected void flush0() { //代碼省略 try { doWrite(outboundBuffer); } catch (Throwable t) { //代碼省略 } finally { inFlush0 = false; } }
篇幅原因我們省略大段代碼
我們繼續(xù)跟進(jìn)doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //代碼省略 boolean done = false; //代碼省略 if (done) { //移除當(dāng)前對(duì)象 in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略 } else { throw new Error(); } } incompleteWrite(setOpWrite); }
這里也省略了大段代碼, 我們重點(diǎn)關(guān)注in.remove()這里, 之前介紹過(guò), 如果done為true, 說(shuō)明刷新事件已完成, 則移除當(dāng)前entry節(jié)點(diǎn)
我們跟到remove()方法中
public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }
這里我們看這一步:
ChannelPromise promise = e.promise;
之前我們剖析promise對(duì)象會(huì)綁定在entry中, 而這步就是從entry中獲取promise對(duì)象
等remove操作完成, 會(huì)執(zhí)行到這一步:
safeSuccess(promise);
這一步正好和我們剛才分析的safeSetFailure相反, 這里是設(shè)置成功狀態(tài)
跟到safeSuccess方法中:
private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise)) { PromiseNotificationUtil.trySuccess(promise, null, logger); } }
再跟到trySuccess方法中
public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { //代碼省略 } }
這里再繼續(xù)跟if中的trySuccess方法, 最后會(huì)走到DefaultPromise的trySuccess方法:
public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
這里跟到setSuccess0方法中:
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
這里的邏輯我們剛才剖析過(guò)了, 這里參數(shù)傳入一個(gè)信號(hào)SUCCESS, 表示設(shè)置成功狀
再繼續(xù)跟setValue方法:
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
同樣, 在if判斷中, 通過(guò)cas操作將參數(shù)傳入的SUCCESS對(duì)象賦值到DefaultPromise的屬性result中, 我們看這個(gè)屬性:
private volatile Object result;
這里是Object類型, 也就是可以賦值成任何類型
SUCCESS是一個(gè)Signal類型的對(duì)象, 這里我們可以簡(jiǎn)單理解成一種狀態(tài), SUCCESS表示一種成功的狀態(tài)
通過(guò)上述cas操作, result的值將賦值成SUCCESS
我們回到trySuccess方法:
public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; }
設(shè)置完成功狀態(tài)之后, 則會(huì)通過(guò)notifyListeners()執(zhí)行監(jiān)聽中的回調(diào)
我們看用戶代碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()){ System.out.println("寫出成功"); }else{ System.out.println("寫出失敗"); } } }); }
在回調(diào)中會(huì)判斷future.isSuccess(), promise設(shè)置為成功狀態(tài)這里會(huì)返回true, 從而打印寫出成功"
跟到isSuccess方法中, 這里會(huì)調(diào)用DefaultPromise的isSuccess方法:
public boolean isSuccess() { Object result = this.result; return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); }
我們看到首先會(huì)拿到result對(duì)象, 然后判斷result不為空, 并且不是UNCANCELLABLE, 并且不屬于CauseHolder對(duì)象
我們剛才分析如果promise設(shè)置為成功裝載, 則result為SUCCESS, 所以這里條件成立, 可以執(zhí)行 if (future.isSuccess()) 中if塊的邏輯
和設(shè)置錯(cuò)誤狀態(tài)的邏輯一樣, 這里也有同樣的問(wèn)題, 如果writeAndFlush是和addListener是異步操作, 那么執(zhí)行到回調(diào)的時(shí)候, 可能addListener已經(jīng)添加完成, 所以可以正常的執(zhí)行回調(diào)
那么如果writeAndFlush是和addListener是同步操作, writeAndFlush在執(zhí)行回調(diào)的時(shí)候, addListener并沒(méi)有執(zhí)行, 所以無(wú)法執(zhí)行回調(diào)方法, 那么回調(diào)方法是如何執(zhí)行的呢, 我們看addListener這個(gè)方法:
addListener傳入ChannelFutureListener對(duì)象, 并重寫了operationComplete方法, 也就是執(zhí)行回調(diào)的方法
這里會(huì)執(zhí)行到DefaultChannelPromise的addListener方法, 跟進(jìn)去
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; }
跟到父類的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
這里通過(guò)addListener0方法添加listener, 因?yàn)樘砑觢istener有可能會(huì)在不同的線程中操作, 比如用戶線程和NioEventLoop線程, 為了防止并發(fā)問(wèn)題, 這里簡(jiǎn)單粗暴的加了個(gè)synchronized關(guān)鍵字
跟到addListener0方法中
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }
如果是第一次添加listener, 則成員變量listeners為null, 這樣就把參數(shù)傳入的GenericFutureListener賦值到成員變量listeners
如果是第二次添加listener, listeners不為空, 會(huì)走到else if判斷, 因?yàn)榈谝淮翁砑拥膌istener是GenericFutureListener類型, 并不是DefaultFutureListeners類型, 所以else if判斷返回false, 進(jìn)入到else塊中
else塊中, 通過(guò)new的方式創(chuàng)建一個(gè)DefaultFutureListeners對(duì)象并賦值到成員變量listeners中
DefaultFutureListeners的構(gòu)造方法中, 第一個(gè)參數(shù)傳入DefaultPromise中的成員變量listeners, 也就是第一次添加的GenericFutureListener對(duì)象, 第二個(gè)參數(shù)為第二次添加的GenericFutureListener對(duì)象, 這里通過(guò)兩個(gè)GenericFutureListener對(duì)象包裝成一個(gè)DefaultFutureListeners對(duì)象
我們看listeners的定義:
private Object listeners;
這里是個(gè)Object類型, 所以可以保存任何類型的對(duì)象
再看DefaultFutureListeners的構(gòu)造方法:
DefaultFutureListeners( GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { listeners = new GenericFutureListener[2]; //第0個(gè) listeners[0] = first; //第1個(gè) listeners[1] = second; size = 2; //代碼省略 }
在DefaultFutureListeners類中也定義了一個(gè)成員變量listeners, 類型為GenericFutureListener數(shù)組
構(gòu)造方法中初始化listeners這個(gè)數(shù)組, 并且數(shù)組中第一個(gè)值賦值為我們第一次添加的GenericFutureListener, 第二個(gè)賦值為我們第二次添加的GenericFutureListener
回到addListener0方法中
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } }
經(jīng)過(guò)兩次添加listener, 屬性listeners的值就變成了DefaultFutureListeners類型的對(duì)象, 如果第三次添加listener, 則會(huì)走到else if塊中, DefaultFutureListeners對(duì)象通過(guò)調(diào)用add方法繼續(xù)添加listener
跟到add方法中:
public void add(GenericFutureListener<? extends Future<?>> l) { GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } listeners[size] = l; this.size = size + 1; //代碼省略 }
這里的邏輯也比較簡(jiǎn)單, 就是為當(dāng)前的數(shù)組對(duì)象listeners中追加新的GenericFutureListener對(duì)象, 如果listeners容量不足則進(jìn)行擴(kuò)容操作
根據(jù)以上邏輯, 就完成了listener的添加邏輯
那么再看我們剛才遺留的問(wèn)題, 如果writeAndFlush和addListener是同步進(jìn)行的, writeAndFlush執(zhí)行回調(diào)時(shí)還沒(méi)有addListener還沒(méi)有執(zhí)行回調(diào), 那么回調(diào)是如何執(zhí)行的呢?
回到DefaultPromise的addListener中:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
我們分析完了addListener0方法, 再往下看
這個(gè)會(huì)有if判斷isDone(), isDone方法, 就是程序執(zhí)行到這一步的時(shí)候, 判斷刷新事件是否執(zhí)行完成
跟到isDone方法中
public boolean isDone() { return isDone0(result); }
繼續(xù)跟isDone0, 這里傳入了成員變量result
private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }
這里判斷result不為null并且不為UNCANCELLABLE, 則就表示完成
因?yàn)槌晒Φ臓顟B(tài)是SUCCESS, 所以flush成功這里會(huì)返回true
回到 addListener中:
如果執(zhí)行完成, 就通過(guò)notifyListeners()方法執(zhí)行回調(diào), 這也解釋剛才的問(wèn)題, 在同步操作中, writeAndFlush在執(zhí)行回調(diào)時(shí)并沒(méi)有添加listener, 所以添加listener的時(shí)候會(huì)判斷writeAndFlush的執(zhí)行狀態(tài), 如果狀態(tài)時(shí)完成, 則會(huì)這里執(zhí)行回調(diào)
同樣, 在異步操作中, 走到這里writeAndFlush可能還沒(méi)完成, 所以這里不會(huì)執(zhí)行回調(diào), 由writeAndFlush執(zhí)行回調(diào)
所以, 無(wú)論writeAndFlush和addListener誰(shuí)先完成, 都可以執(zhí)行到回調(diào)方法
跟到notifyListeners()方法中
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
這里首先判斷是否是eventLoop線程, 如果是eventLoop線程則執(zhí)行if塊中的邏輯, 如果不是eventLoop線程, 則把執(zhí)行回調(diào)的邏輯封裝成task丟到EventLoop的任務(wù)隊(duì)列中異步執(zhí)行
我們重點(diǎn)關(guān)注notifyListenersNow()方法, 跟進(jìn)去:
private void notifyListenersNow() { Object listeners; synchronized (this) { if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners); } //代碼省略 } }
在無(wú)限for循環(huán)中, 首先首先判斷l(xiāng)isteners是不是DefaultFutureListeners類型, 根據(jù)我們之前的邏輯, 如果只添加了一個(gè)listener, 則listeners是GenericFutureListener類型
通常在添加的時(shí)候只會(huì)添加一個(gè)listener, 所以我們跟到else塊中的notifyListener0方法:
private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }
我們看到, 這里執(zhí)行了GenericFutureListener的中我們重寫的回調(diào)函數(shù)operationComplete
以上就是執(zhí)行回調(diào)的相關(guān)邏輯
章節(jié)小結(jié)
這一章講解了有關(guān)write和flush的相關(guān)邏輯, 并分析了有關(guān)添加監(jiān)聽和異步寫數(shù)據(jù)的相關(guān)步驟
經(jīng)過(guò)學(xué)習(xí), 同學(xué)們應(yīng)該掌握如下知識(shí):
write操作是如何將ByteBuf添加到發(fā)送緩沖區(qū)的
flush操作是如何將ByteBuf寫出到chanel中的
抽象編碼器MessageToByteEncoder中如何定義了編碼器的骨架邏輯
writeAndFlush和addListener在同步和異步操作中是如何執(zhí)行回調(diào)的
更多關(guān)于Netty分布式Future和Promise執(zhí)行回調(diào)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天/群聊系統(tǒng)
這篇文章主要實(shí)現(xiàn)在好友添加、建群、聊天對(duì)話、群聊功能,使用Java作為后端語(yǔ)言進(jìn)行支持,界面友好,開發(fā)簡(jiǎn)單,文章中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2023-08-08SpringBoot加載應(yīng)用事件監(jiān)聽器代碼實(shí)例
這篇文章主要介紹了SpringBoot加載應(yīng)用事件監(jiān)聽器代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范
這篇文章主要介紹了Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范,IOC又名控制反轉(zhuǎn),把對(duì)象創(chuàng)建和對(duì)象之間的調(diào)用過(guò)程,交給Spring進(jìn)行管理,目的是為了降低耦合度,需要的朋友可以參考下2023-05-05SpringBoot優(yōu)化接口響應(yīng)時(shí)間的九個(gè)技巧
在實(shí)際開發(fā)中,提升接口響應(yīng)速度是一件挺重要的事,特別是在面臨大量用戶請(qǐng)求的時(shí)候,本文為大家整理了9個(gè)SpringBoot優(yōu)化接口響應(yīng)時(shí)間的技巧,希望對(duì)大家有所幫助2024-01-01Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例
這篇文章主要介紹了Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。2017-12-12Java 中的FileReader和FileWriter源碼分析_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
本文給大家分享一段示例程序,通過(guò)示例代碼可以看出FileReader是基于InputStreamReader實(shí)現(xiàn)的,FileWriter是基于OutputStreamWriter實(shí)現(xiàn)的,具體程序代碼大家通過(guò)本文了解下吧2017-05-05maven中resource配置的實(shí)現(xiàn)示例
我們?cè)谑褂肕aven組件來(lái)構(gòu)建項(xiàng)目的時(shí)候,通常將配置文件放在資源文件目錄下,本文主要介紹了maven中resource配置的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09