欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式Future與Promise執(zhí)行回調(diào)相關(guān)邏輯剖析

 更新時(shí)間:2022年03月29日 15:08:50   作者:向南是個(gè)萬(wàn)人迷  
這篇文章主要為大家介紹了Netty分布式Future與Promise執(zhí)行回調(diào)相關(guān)邏輯剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步

Future和Promise執(zhí)行回調(diào)

Netty中的Future, 其實(shí)類似于jdk的Future, 用于異步獲取執(zhí)行結(jié)果

Promise則相當(dāng)于一個(gè)被觀察者, 其中promise對(duì)象會(huì)一直跟隨著channel的讀寫事件, 并跟蹤著事件狀態(tài), 然后執(zhí)行相應(yīng)的回調(diào)

這種設(shè)計(jì)思路也就是java設(shè)計(jì)模式的觀察者模式

首先我們看一段寫在handler中的業(yè)務(wù)代碼

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ChannelFuture future = ctx.writeAndFlush("test data");
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()){
                System.out.println("寫出成功");
            }else{
                System.out.println("寫出失敗");
            }
        }
    });
}

熟悉netty的小伙伴估計(jì)對(duì)這段代碼并不陌生, 首先調(diào)用writeAndFlush方法將數(shù)據(jù)寫出, 然后返回的future進(jìn)行添加Listener, 并且重寫回調(diào)函數(shù)

這里舉一個(gè)最簡(jiǎn)單的示例, 在回調(diào)函數(shù)中判斷future的狀態(tài)成功與否, 成功的話就打印"寫出成功", 否則節(jié)打印"寫出失敗"

這里如果寫在handler中通常是NioEventLoop線程執(zhí)行的, 在future返回之后才會(huì)執(zhí)行添加listener的操作, 如果在用戶線程中writeAndFlush是異步執(zhí)行的, 在添加監(jiān)聽的時(shí)候有可能寫出操作沒(méi)有執(zhí)行完畢, 等寫出操作執(zhí)行完畢之后才會(huì)執(zhí)行回調(diào)

以上邏輯在代碼中如何體現(xiàn)的呢?我們首先跟到writeAndFlush的方法中去

這里會(huì)走到AbstractChannelHandlerContext中的writeAndFlush方法中:

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

這里的邏輯之前剖析過(guò), 想必大家并不陌生

這里關(guān)注newPromise()方法, 跟進(jìn)去

public ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel(), executor());
}

這里直接創(chuàng)建了DefaultChannelPromise這個(gè)對(duì)象并傳入了當(dāng)前channel和當(dāng)前channel綁定NioEventLoop對(duì)象

在DefaultChannelPromise構(gòu)造方法中, 也會(huì)將channel和NioEventLoop對(duì)象綁定在自身成員變量中

回到writeAndFlush方法繼續(xù)跟

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (!validatePromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        return promise;
    }
    write(msg, true, promise);
    return promise;
}

這里的邏輯也不陌生, 注意這里最后返回了promise, 其實(shí)就是我們上一步創(chuàng)建DefaultChannelPromise對(duì)象

DefaultChannelPromise實(shí)現(xiàn)了ChannelFuture接口, 所以方法如果返回該對(duì)象可以被ChannelFuture類型接收

我們繼續(xù)跟write方法

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這里的邏輯我們同樣不陌生, 如果nioEventLoop線程, 我們繼續(xù)調(diào)invokeWriteAndFlush方法, 如果不是nioEventLoop線程則將writeAndFlush事件封裝成task, 交給eventLoop線程異步

這里如果是異步執(zhí)行, 則到這一步之后, 我們的業(yè)務(wù)代碼中, writeAndFlush就會(huì)返回并添加監(jiān)聽, 有關(guān)添加監(jiān)聽的邏輯稍后分析

走到這里, 無(wú)論同步異步, 都會(huì)執(zhí)行到invokeWriteAndFlush方法:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) { 
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

這里也是我們熟悉的邏輯, 我們看到在invokeWrite0方法中傳入了我們剛才創(chuàng)建的DefaultChannelPromise

后續(xù)邏輯想必大家都比較熟悉, 通過(guò)事件傳播, 最終會(huì)調(diào)用head節(jié)點(diǎn)的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這里最終調(diào)用unsafe的write方法, 并傳入了promise對(duì)象

跟到AbstractUnsafe的write方法中:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    //負(fù)責(zé)緩沖寫進(jìn)來(lái)的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try { 
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //插入寫隊(duì)列
    outboundBuffer.addMessage(msg, size, promise);
}

這里的邏輯之前小節(jié)也剖析過(guò), 這里我們首先關(guān)注兩個(gè)部分, 首先看在catch中safeSetFailure這步

因?yàn)槭莄atch塊, 說(shuō)明發(fā)生了異常, 寫到緩沖區(qū)不成功, safeSetFailure就是設(shè)置寫出失敗的狀態(tài)

我們跟到safeSetFailure方法中:

protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
    if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
        logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
    }
}

這里看if判斷, 首先我們的promise是DefaultChannelPromise, 所以!(promise instanceof VoidChannelPromise)為true

重點(diǎn)分析promise.tryFailure(cause), 這里是設(shè)置失敗狀態(tài), 這里會(huì)調(diào)用DefaultPromise的tryFailure方法

跟進(jìn)tryFailure方法

public boolean tryFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return true;
    }
    return false;
}

再跟到setFailure0(cause)中:

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

這里在if塊中的cas操作, 會(huì)將參數(shù)objResult的值設(shè)置到DefaultPromise的成員變量result中, 表示當(dāng)前操作為異常狀態(tài)

回到tryFailure方法:

這里關(guān)注notifyListeners()這個(gè)方法, 這個(gè)方法是執(zhí)行添加監(jiān)聽的回調(diào)函數(shù), 當(dāng)writeAndFlush和addListener是異步執(zhí)行的時(shí)候, 這里有可能添加已經(jīng)添加, 所以通過(guò)這個(gè)方法可以調(diào)用添加監(jiān)聽后的回調(diào)

如果writeAndFlush和addListener是同步執(zhí)行的時(shí)候, 也就是都在NioEventLoop線程中執(zhí)行的時(shí)候, 那么走到這里addListener還沒(méi)執(zhí)行, 所以這里不能回調(diào)添加監(jiān)聽的回調(diào)函數(shù), 那么回調(diào)是什么時(shí)候執(zhí)行的呢?我們?cè)谄饰鯽ddListener步驟的時(shí)候會(huì)給大家分析

具體執(zhí)行回調(diào)我們?cè)僦v解添加監(jiān)聽的時(shí)候進(jìn)行剖析

以上就是記錄異常狀態(tài)的大概邏輯

回到AbstractUnsafe的write方法:

我們?cè)訇P(guān)注這一步:

outboundBuffer.addMessage(msg, size, promise);

跟到addMessage方法中

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    //代碼省略
}

我們只需要關(guān)注包裝Entry的newInstance方法, 該方法傳入promise對(duì)象

跟到newInstance中:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    Entry entry = RECYCLER.get();
    entry.msg = msg;
    entry.pendingSize = size;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

這里將promise設(shè)置到Entry的成員變量中了, 也就是說(shuō), 每個(gè)Entry都關(guān)聯(lián)了唯一的一個(gè)promise

我們回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) { 
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

我們剛才分析了write操作中promise的傳遞以及狀態(tài)設(shè)置的大概過(guò)程, 我們繼續(xù)看在flush中promise的操作過(guò)程

這里invokeFlush0()并沒(méi)有傳入promise對(duì)象, 是因?yàn)槲覀儎偛欧治鲞^(guò), promise對(duì)象會(huì)綁定在緩沖區(qū)中entry的成員變量中, 可以通過(guò)其成員變量拿到promise對(duì)象

invokeFlush0()我們之前也分析過(guò), 通過(guò)事件傳遞, 最終會(huì)調(diào)用HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

最后跟到AbstractUnsafe的flush方法

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

這塊邏輯之前已分析過(guò), 繼續(xù)看flush0方法:

protected void flush0() {
    //代碼省略
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        //代碼省略
    } finally {
        inFlush0 = false;
    }
}

篇幅原因我們省略大段代碼

我們繼續(xù)跟進(jìn)doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;
    boolean setOpWrite = false;
    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            clearOpWrite();
            return;
        }
        if (msg instanceof ByteBuf) {
            //代碼省略
            boolean done = false;
            //代碼省略
            if (done) {
                //移除當(dāng)前對(duì)象
                in.remove();
            } else {
                break;
            }
        } else if (msg instanceof FileRegion) {
            //代碼省略
        } else {
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

這里也省略了大段代碼, 我們重點(diǎn)關(guān)注in.remove()這里, 之前介紹過(guò), 如果done為true, 說(shuō)明刷新事件已完成, 則移除當(dāng)前entry節(jié)點(diǎn)

我們跟到remove()方法中

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    removeEntry(e);
    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    e.recycle();
    return true;
}

這里我們看這一步:

ChannelPromise promise = e.promise;

之前我們剖析promise對(duì)象會(huì)綁定在entry中, 而這步就是從entry中獲取promise對(duì)象

等remove操作完成, 會(huì)執(zhí)行到這一步:

safeSuccess(promise);

這一步正好和我們剛才分析的safeSetFailure相反, 這里是設(shè)置成功狀態(tài)

跟到safeSuccess方法中:

private static void safeSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise)) {
        PromiseNotificationUtil.trySuccess(promise, null, logger);
    }
}

再跟到trySuccess方法中

public static &lt;V&gt; void trySuccess(Promise&lt;? super V&gt; p, V result, InternalLogger logger) {
    if (!p.trySuccess(result) &amp;&amp; logger != null) {
        //代碼省略
    }
}

這里再繼續(xù)跟if中的trySuccess方法, 最后會(huì)走到DefaultPromise的trySuccess方法:

public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

這里跟到setSuccess0方法中:

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

這里的邏輯我們剛才剖析過(guò)了, 這里參數(shù)傳入一個(gè)信號(hào)SUCCESS, 表示設(shè)置成功狀

再繼續(xù)跟setValue方法:

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

同樣, 在if判斷中, 通過(guò)cas操作將參數(shù)傳入的SUCCESS對(duì)象賦值到DefaultPromise的屬性result中, 我們看這個(gè)屬性:

 private volatile Object result; 

這里是Object類型, 也就是可以賦值成任何類型

SUCCESS是一個(gè)Signal類型的對(duì)象, 這里我們可以簡(jiǎn)單理解成一種狀態(tài), SUCCESS表示一種成功的狀態(tài)

通過(guò)上述cas操作, result的值將賦值成SUCCESS

我們回到trySuccess方法:

public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

設(shè)置完成功狀態(tài)之后, 則會(huì)通過(guò)notifyListeners()執(zhí)行監(jiān)聽中的回調(diào)

我們看用戶代碼

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ChannelFuture future = ctx.writeAndFlush("test data");
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()){
                System.out.println("寫出成功");
            }else{
                System.out.println("寫出失敗");
            }
        }
    });
}

在回調(diào)中會(huì)判斷future.isSuccess(), promise設(shè)置為成功狀態(tài)這里會(huì)返回true, 從而打印寫出成功"

跟到isSuccess方法中, 這里會(huì)調(diào)用DefaultPromise的isSuccess方法:

public boolean isSuccess() {
    Object result = this.result;
    return result != null &amp;&amp; result != UNCANCELLABLE &amp;&amp; !(result instanceof CauseHolder);
}

我們看到首先會(huì)拿到result對(duì)象, 然后判斷result不為空, 并且不是UNCANCELLABLE, 并且不屬于CauseHolder對(duì)象

我們剛才分析如果promise設(shè)置為成功裝載, 則result為SUCCESS, 所以這里條件成立, 可以執(zhí)行 if (future.isSuccess()) 中if塊的邏輯

和設(shè)置錯(cuò)誤狀態(tài)的邏輯一樣, 這里也有同樣的問(wèn)題, 如果writeAndFlush是和addListener是異步操作, 那么執(zhí)行到回調(diào)的時(shí)候, 可能addListener已經(jīng)添加完成, 所以可以正常的執(zhí)行回調(diào)

那么如果writeAndFlush是和addListener是同步操作, writeAndFlush在執(zhí)行回調(diào)的時(shí)候, addListener并沒(méi)有執(zhí)行, 所以無(wú)法執(zhí)行回調(diào)方法, 那么回調(diào)方法是如何執(zhí)行的呢, 我們看addListener這個(gè)方法:

addListener傳入ChannelFutureListener對(duì)象, 并重寫了operationComplete方法, 也就是執(zhí)行回調(diào)的方法

這里會(huì)執(zhí)行到DefaultChannelPromise的addListener方法, 跟進(jìn)去

public ChannelPromise addListener(GenericFutureListener&lt;? extends Future&lt;? super Void&gt;&gt; listener) {
    super.addListener(listener);
    return this;
}

跟到父類的addListener中:

public Promise&lt;V&gt; addListener(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

這里通過(guò)addListener0方法添加listener, 因?yàn)樘砑觢istener有可能會(huì)在不同的線程中操作, 比如用戶線程和NioEventLoop線程, 為了防止并發(fā)問(wèn)題, 這里簡(jiǎn)單粗暴的加了個(gè)synchronized關(guān)鍵字

跟到addListener0方法中

private void addListener0(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners, listener);
    }
}

如果是第一次添加listener, 則成員變量listeners為null, 這樣就把參數(shù)傳入的GenericFutureListener賦值到成員變量listeners

如果是第二次添加listener, listeners不為空, 會(huì)走到else if判斷, 因?yàn)榈谝淮翁砑拥膌istener是GenericFutureListener類型, 并不是DefaultFutureListeners類型, 所以else if判斷返回false, 進(jìn)入到else塊中

else塊中, 通過(guò)new的方式創(chuàng)建一個(gè)DefaultFutureListeners對(duì)象并賦值到成員變量listeners中

DefaultFutureListeners的構(gòu)造方法中, 第一個(gè)參數(shù)傳入DefaultPromise中的成員變量listeners, 也就是第一次添加的GenericFutureListener對(duì)象, 第二個(gè)參數(shù)為第二次添加的GenericFutureListener對(duì)象, 這里通過(guò)兩個(gè)GenericFutureListener對(duì)象包裝成一個(gè)DefaultFutureListeners對(duì)象

我們看listeners的定義:

private Object listeners;

這里是個(gè)Object類型, 所以可以保存任何類型的對(duì)象

再看DefaultFutureListeners的構(gòu)造方法:

DefaultFutureListeners(
        GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; first, GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; second) {
    listeners = new GenericFutureListener[2];
    //第0個(gè)
    listeners[0] = first;
    //第1個(gè)
    listeners[1] = second;
    size = 2;
    //代碼省略
}

在DefaultFutureListeners類中也定義了一個(gè)成員變量listeners, 類型為GenericFutureListener數(shù)組

構(gòu)造方法中初始化listeners這個(gè)數(shù)組, 并且數(shù)組中第一個(gè)值賦值為我們第一次添加的GenericFutureListener, 第二個(gè)賦值為我們第二次添加的GenericFutureListener

回到addListener0方法中

private void addListener0(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners, listener);
    }
}

經(jīng)過(guò)兩次添加listener, 屬性listeners的值就變成了DefaultFutureListeners類型的對(duì)象, 如果第三次添加listener, 則會(huì)走到else if塊中, DefaultFutureListeners對(duì)象通過(guò)調(diào)用add方法繼續(xù)添加listener

跟到add方法中:

public void add(GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; l) {
    GenericFutureListener&lt;? extends Future&lt;?&gt;&gt;[] listeners = this.listeners;
    final int size = this.size;
    if (size == listeners.length) {
        this.listeners = listeners = Arrays.copyOf(listeners, size &lt;&lt; 1);
    }
    listeners[size] = l;
    this.size = size + 1;
    //代碼省略
}

這里的邏輯也比較簡(jiǎn)單, 就是為當(dāng)前的數(shù)組對(duì)象listeners中追加新的GenericFutureListener對(duì)象, 如果listeners容量不足則進(jìn)行擴(kuò)容操作

根據(jù)以上邏輯, 就完成了listener的添加邏輯

那么再看我們剛才遺留的問(wèn)題, 如果writeAndFlush和addListener是同步進(jìn)行的, writeAndFlush執(zhí)行回調(diào)時(shí)還沒(méi)有addListener還沒(méi)有執(zhí)行回調(diào), 那么回調(diào)是如何執(zhí)行的呢?

回到DefaultPromise的addListener中:

public Promise&lt;V&gt; addListener(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

我們分析完了addListener0方法, 再往下看

這個(gè)會(huì)有if判斷isDone(), isDone方法, 就是程序執(zhí)行到這一步的時(shí)候, 判斷刷新事件是否執(zhí)行完成

跟到isDone方法中

public boolean isDone() {
    return isDone0(result);
}

繼續(xù)跟isDone0, 這里傳入了成員變量result

private static boolean isDone0(Object result) {
    return result != null &amp;&amp; result != UNCANCELLABLE;
}

這里判斷result不為null并且不為UNCANCELLABLE, 則就表示完成

因?yàn)槌晒Φ臓顟B(tài)是SUCCESS, 所以flush成功這里會(huì)返回true

回到 addListener中:

如果執(zhí)行完成, 就通過(guò)notifyListeners()方法執(zhí)行回調(diào), 這也解釋剛才的問(wèn)題, 在同步操作中, writeAndFlush在執(zhí)行回調(diào)時(shí)并沒(méi)有添加listener, 所以添加listener的時(shí)候會(huì)判斷writeAndFlush的執(zhí)行狀態(tài), 如果狀態(tài)時(shí)完成, 則會(huì)這里執(zhí)行回調(diào)

同樣, 在異步操作中, 走到這里writeAndFlush可能還沒(méi)完成, 所以這里不會(huì)執(zhí)行回調(diào), 由writeAndFlush執(zhí)行回調(diào)

所以, 無(wú)論writeAndFlush和addListener誰(shuí)先完成, 都可以執(zhí)行到回調(diào)方法

跟到notifyListeners()方法中

private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

這里首先判斷是否是eventLoop線程, 如果是eventLoop線程則執(zhí)行if塊中的邏輯, 如果不是eventLoop線程, 則把執(zhí)行回調(diào)的邏輯封裝成task丟到EventLoop的任務(wù)隊(duì)列中異步執(zhí)行

我們重點(diǎn)關(guān)注notifyListenersNow()方法, 跟進(jìn)去:

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners);
        }
        //代碼省略
    }
}

在無(wú)限for循環(huán)中, 首先首先判斷l(xiāng)isteners是不是DefaultFutureListeners類型, 根據(jù)我們之前的邏輯, 如果只添加了一個(gè)listener, 則listeners是GenericFutureListener類型

通常在添加的時(shí)候只會(huì)添加一個(gè)listener, 所以我們跟到else塊中的notifyListener0方法:

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

我們看到, 這里執(zhí)行了GenericFutureListener的中我們重寫的回調(diào)函數(shù)operationComplete

以上就是執(zhí)行回調(diào)的相關(guān)邏輯

章節(jié)小結(jié)

        這一章講解了有關(guān)write和flush的相關(guān)邏輯, 并分析了有關(guān)添加監(jiān)聽和異步寫數(shù)據(jù)的相關(guān)步驟

        經(jīng)過(guò)學(xué)習(xí), 同學(xué)們應(yīng)該掌握如下知識(shí):

        write操作是如何將ByteBuf添加到發(fā)送緩沖區(qū)的

        flush操作是如何將ByteBuf寫出到chanel中的

        抽象編碼器MessageToByteEncoder中如何定義了編碼器的骨架邏輯

        writeAndFlush和addListener在同步和異步操作中是如何執(zhí)行回調(diào)的

更多關(guān)于Netty分布式Future和Promise執(zhí)行回調(diào)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天/群聊系統(tǒng)

    Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天/群聊系統(tǒng)

    這篇文章主要實(shí)現(xiàn)在好友添加、建群、聊天對(duì)話、群聊功能,使用Java作為后端語(yǔ)言進(jìn)行支持,界面友好,開發(fā)簡(jiǎn)單,文章中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下
    2023-08-08
  • SpringBoot加載應(yīng)用事件監(jiān)聽器代碼實(shí)例

    SpringBoot加載應(yīng)用事件監(jiān)聽器代碼實(shí)例

    這篇文章主要介紹了SpringBoot加載應(yīng)用事件監(jiān)聽器代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范

    Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范

    這篇文章主要介紹了Spring核心IoC容器的依賴注入接口和層級(jí)包命名規(guī)范,IOC又名控制反轉(zhuǎn),把對(duì)象創(chuàng)建和對(duì)象之間的調(diào)用過(guò)程,交給Spring進(jìn)行管理,目的是為了降低耦合度,需要的朋友可以參考下
    2023-05-05
  • Java截取字符串的幾種常用方法

    Java截取字符串的幾種常用方法

    這篇文章主要給大家介紹了關(guān)于Java截取字符串的幾種常用方法,在Java編程語(yǔ)言中,String類提供了用于操作字符串的豐富方法,文中通過(guò)代碼示例介紹的非常詳細(xì),需要的朋友可以參考下
    2023-09-09
  • SpringBoot優(yōu)化接口響應(yīng)時(shí)間的九個(gè)技巧

    SpringBoot優(yōu)化接口響應(yīng)時(shí)間的九個(gè)技巧

    在實(shí)際開發(fā)中,提升接口響應(yīng)速度是一件挺重要的事,特別是在面臨大量用戶請(qǐng)求的時(shí)候,本文為大家整理了9個(gè)SpringBoot優(yōu)化接口響應(yīng)時(shí)間的技巧,希望對(duì)大家有所幫助
    2024-01-01
  • Java中Socket下載一個(gè)文本文件

    Java中Socket下載一個(gè)文本文件

    這篇文章主要介紹了Socket下載一個(gè)文本文件的實(shí)例代碼,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2017-06-06
  • Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例

    Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例

    這篇文章主要介紹了Java語(yǔ)言實(shí)現(xiàn)最大堆代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-12-12
  • Java創(chuàng)建并運(yùn)行線程的方法

    Java創(chuàng)建并運(yùn)行線程的方法

    這篇文章主要介紹了Java創(chuàng)建并運(yùn)行線程的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01
  • Java 中的FileReader和FileWriter源碼分析_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java 中的FileReader和FileWriter源碼分析_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    本文給大家分享一段示例程序,通過(guò)示例代碼可以看出FileReader是基于InputStreamReader實(shí)現(xiàn)的,FileWriter是基于OutputStreamWriter實(shí)現(xiàn)的,具體程序代碼大家通過(guò)本文了解下吧
    2017-05-05
  • maven中resource配置的實(shí)現(xiàn)示例

    maven中resource配置的實(shí)現(xiàn)示例

    我們?cè)谑褂肕aven組件來(lái)構(gòu)建項(xiàng)目的時(shí)候,通常將配置文件放在資源文件目錄下,本文主要介紹了maven中resource配置的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-09-09

最新評(píng)論