欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式pipeline管道傳播事件的邏輯總結(jié)分析

 更新時間:2022年03月28日 14:12:09   作者:向南是個萬人迷  
這篇文章主要為大家介紹了Netty分布式pipeline管道傳播事件總結(jié)分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

我們在第一章和第三章中, 遺留了很多有關(guān)事件傳輸?shù)南嚓P(guān)邏輯, 這里帶大家一一回顧

問題分析

首先看兩個問題:

1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法

2.客戶端handler是什么時候被添加的?

首先看第一個問題

1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法?

我們首先看這段代碼:

public void read() {
    //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
    assert eventLoop().inEventLoop();
    //服務(wù)端channel的config
    final ChannelConfig config = config();
    //服務(wù)端channel的pipeline
    final ChannelPipeline pipeline = pipeline();
    //處理服務(wù)端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //設(shè)置配置
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //創(chuàng)建jdk底層的channel
                //readBuf用于臨時承載讀到鏈接
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //分配器將讀到的鏈接進(jìn)行計數(shù)
                allocHandle.incMessagesRead(localRead);
                //連接數(shù)是否超過最大值
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        //遍歷每一條客戶端連接
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞
            //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        //代碼省略
    } finally {
        //代碼省略
    }
}

重點看pipeline.fireChannelRead(readBuf.get(i))

首先, 這里pipeline是服務(wù)端channel的pipeline, 也就是NioServerSocketChannel的pipeline

我們學(xué)習(xí)過pipeline之后, 對這種寫法并不陌生, 就是傳遞channelRead事件, 這里通過傳遞channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 說明在這步之前, ServerBootstrapAcceptor作為一個handler添加到了服務(wù)端channel的pipeline中, 那么這個handler什么時候添加的呢?

我們回顧下第一章, 初始化NioServerSocketChannel的時候, 調(diào)用了ServerBootstrap的init方法:

void init(Channel channel) throws Exception {
    //獲取用戶定義的選項(1)
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    //獲取用戶定義的屬性(2)
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    //獲取channel的pipline(3)
    ChannelPipeline p = channel.pipeline();
    //work線程組(4)
    final EventLoopGroup currentChildGroup = childGroup;
    //用戶設(shè)置的Handler(5)
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    //選項轉(zhuǎn)化為Entry對象(6)
    synchronized (childOptions) { 
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    //屬性轉(zhuǎn)化為Entry對象(7)
    synchronized (childAttrs) { 
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    //添加服務(wù)端handler(8)
    p.addLast(new ChannelInitializer<Channel>() {
        //初始化channel
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) { 
                pipeline.addLast(handler);
            } 
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() { 
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

這個方法比較長, 我們重點關(guān)注第8步, 添加服務(wù)端channel, 這里的pipeline, 是服務(wù)服務(wù)端channel的pipeline, 也就是NioServerSocketChannel綁定的pipeline, 這里添加了一個ChannelInitializer類型的handler

我們看一下ChannelInitializer這個類的繼承關(guān)系

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    //省略類體
}

我們看到其繼承了ChannelInboundHandlerAdapter, 說明是一個inbound類型的handler

這里我們可能會想到, 添加完handler會執(zhí)行handlerAdded, 然后再handlerAdded方法中做了添加ServerBootstrapAcceptor這個handler

但是, 實際上并不是這樣的, 當(dāng)程序執(zhí)行到這里, 并沒有馬上執(zhí)行handlerAdded, 我們緊跟addLast方法

最后會跟到DefualtChannelPipeline的一個addLast方法中去:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //判斷handler是否被重復(fù)添加(1)
        checkMultiplicity(handler);
        //創(chuàng)建一個HandlerContext并添加到列表(2)
        newCtx = newContext(group, filterName(name, handler), handler);
        //添加HandlerContext(3)
        addLast0(newCtx);
        //是否已注冊
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            //回調(diào)用戶事件
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    //回調(diào)添加事件(4)
    callHandlerAdded0(newCtx);
    return this;
}

首先完成了handler的添加, 但是并沒有馬上執(zhí)行回調(diào)

這里我們重點關(guān)注if (!registered)這個條件判斷, 其實在注冊完成, registered會變成true, 但是走到這一步的時候NioServerSockeChannel并沒有完成注冊(可以回顧第一章看注冊在哪一步), 所以會進(jìn)到if里并返回自身

我們重點關(guān)注callHandlerCallbackLater這個方法, 我們跟進(jìn)去:

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;
    //判斷是否已添加, 未添加, 進(jìn)行添加, 已添加進(jìn)行刪除
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    //獲取第一個Callback任務(wù)
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    //如果第一個Callback任務(wù)為空
    if (pending == null) {
        //將第一個任務(wù)設(shè)置為剛創(chuàng)建的任務(wù)
        pendingHandlerCallbackHead = task;
    } else {
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

因我們調(diào)用這個方法的時候added傳的true, 所以PendingHandlerCallback task賦值為new PendingHandlerAddedTask(ctx)

PendingHandlerAddedTask這個類, 我們從名字可以看出, 這是一個handler添加的延遲任務(wù), 用于執(zhí)行handler延遲添加的操作, 同樣也對應(yīng)一個名字為PendingHandlerRemovedTask的類, 用于執(zhí)行延遲刪除handler的操作, 這兩個類都繼承抽象類PendingHandlerCallback

我們看PendingHandlerAddedTask類構(gòu)造方法:

PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
    super(ctx);
}

這里調(diào)用了父類的構(gòu)造方法, 再跟進(jìn)去:

PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
    this.ctx = ctx;
}

在父類中, 保存了要添加的context, 也就是ChannelInitializer類型的包裝類

回到callHandlerCallbackLater方法中

PendingHandlerCallback pending = pendingHandlerCallbackHead;

這表示獲取第一個PendingHandlerCallback的任務(wù), 其實PendingHandlerCallback是一個單向鏈表, 自身維護(hù)一個PendingHandlerCallback類型的next, 指向下一個任務(wù), 在DefaultChannelPipeline這個類中, 定義了個PendingHandlerCallback類型的引用pendingHandlerCallbackHead, 用來指向延遲回調(diào)任務(wù)的中的第一個任務(wù)

之后判斷這個任務(wù)是為空, 如果是第一次添加handler, 那么這里就是空, 所以將第一個任務(wù)賦值為我們剛創(chuàng)建的添加任務(wù)

如果不是第一次添加handler, 則將我們新創(chuàng)建的任務(wù)添加到鏈表的尾部, 因為這里我們是第一次添加, 所以第一個回調(diào)任務(wù)就指向了我們創(chuàng)建的添加handler的任務(wù)

完成這一系列操作之后, addLast方法返歸, 此時并沒有完成添加操作

而什么時候完成添加操作的呢?

在服務(wù)端channel注冊時候的會走到AbstractChannel的register0方法:

private void register0(ChannelPromise promise) {
    try {
        //做實際的注冊(1)
        doRegister();
        neverRegistered = false;
        registered = true;
        //觸發(fā)事件(2)
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        //觸發(fā)注冊成功事件(3)
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                //傳播active事件(4)
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //省略代碼
    }
}

重點關(guān)注第二步pipeline.invokeHandlerAddedIfNeeded(), 這里已經(jīng)通過doRegister()方法完成了實際的注冊, 我們跟到該方法中:

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        callHandlerAddedForAllHandlers();
    }
}

這里會判斷是否第一次注冊, 這里返回true, 然后會執(zhí)行callHandlerAddedForAllHandlers()方法, 我們跟進(jìn)去:

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        this.pendingHandlerCallbackHead = null;
    }
    //獲取task
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        //執(zhí)行添加handler方法
        task.execute();
        task = task.next;
    }
}

這里拿到第一個延遲執(zhí)行handler添加的task其實就是我們之前剖析過的, 延遲執(zhí)行handler添加的task, 就是PendingHandlerAddedTask對象

在while循環(huán)中, 通過執(zhí)行execute()方法將handler添加

我們跟到PendingHandlerAddedTask的execute()方法中:

void execute() {
    //獲取當(dāng)前eventLoop線程
    EventExecutor executor = ctx.executor();
    //是當(dāng)前執(zhí)行的線程
    if (executor.inEventLoop()) {
        callHandlerAdded0(ctx);
    } else {
        try {
            //添加到隊列
            executor.execute(this);
        } catch (RejectedExecutionException e) {
            //代碼省略
        }
    }
}

終于在這里, 我們看到了執(zhí)行回調(diào)的方法

再回到init方法中:

void init(Channel channel) throws Exception {
    //獲取用戶定義的選項(1)
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    //獲取用戶定義的屬性(2)
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    //獲取channel的pipline(3)
    ChannelPipeline p = channel.pipeline();
    //work線程組(4)
    final EventLoopGroup currentChildGroup = childGroup;
    //用戶設(shè)置的Handler(5)
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    //選項轉(zhuǎn)化為Entry對象(6)
    synchronized (childOptions) { 
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    //屬性轉(zhuǎn)化為Entry對象(7)
    synchronized (childAttrs) { 
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    //添加服務(wù)端handler(8)
    p.addLast(new ChannelInitializer<Channel>() {
        //初始化channel
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) { 
                pipeline.addLast(handler);
            } 
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() { 
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

我們繼續(xù)看第8步添加服務(wù)端handler

因為這里的handler是ChannelInitializer, 所以完成添加之后會調(diào)用ChannelInitializer的handlerAdded方法

跟到handlerAdded方法:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    //默認(rèn)情況下, 會返回true
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

因為執(zhí)行到這步服務(wù)端channel已經(jīng)完成注冊, 所以會執(zhí)行到initChannel方法

跟到initChannel方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    //這段代碼是否被執(zhí)行過
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            //調(diào)用之后會刪除當(dāng)前節(jié)點
            remove(ctx);
        }
        return true;
    }
    return false;
}

我們關(guān)注initChannel這個方法, 這個方法是在ChannelInitializer的匿名內(nèi)部來實現(xiàn)的, 這里我們注意, 在initChannel方法執(zhí)行完畢之后會調(diào)用remove(ctx)刪除當(dāng)前節(jié)點

我們繼續(xù)跟進(jìn)initChannel方法:

@Override
public void initChannel(Channel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = config.handler();
    if (handler != null) { 
        pipeline.addLast(handler);
    } 
    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() { 
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

這里首先添加用戶自定義的handler, 這里如果用戶沒有定義, 則添加不成功, 然后, 會調(diào)用addLast將ServerBootstrapAcceptor這個handler添加了進(jìn)去, 同樣這個handler也繼承了ChannelInboundHandlerAdapter, 在這個handler中, 重寫了channelRead方法, 所以, 這就是第一個問題的答案

緊接著我們看第二個問題

2.客戶端handler是什么時候被添加的?

我們這里看ServerBootstrapAcceptor的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    //添加channelHadler, 這個channelHandler, 就是用戶代碼添加的ChannelInitializer
    child.pipeline().addLast(childHandler);
    //代碼省略
    try {
        //work線程注冊channel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            //代碼省略
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

這里真相可以大白了, 服務(wù)端再創(chuàng)建完客戶端channel之后, 將新創(chuàng)建的NioSocketChannel作為參數(shù)觸發(fā)channelRead事件(可以回顧NioMessageUnsafe的read方法, 代碼這里就不貼了), 所以這里的參數(shù)msg就是NioSocketChannel

拿到channel時候再將客戶端的handler添加進(jìn)去, 我們回顧客戶端handler的添加過程:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new StringEncoder());
        ch.pipeline().addLast(new ServerHandler());
    }
});

和服務(wù)端channel的邏輯一樣, 首先會添加ChannelInitializer這個handler但是沒有注冊所以沒有執(zhí)行添加handler的回調(diào), 將任務(wù)保存到一個延遲回調(diào)的task中

等客戶端channel注冊完畢, 會將執(zhí)行添加handler的回調(diào), 也就是handlerAdded方法, 在回調(diào)中執(zhí)行initChannel方法將客戶端handler添加進(jìn)去, 然后刪除ChannelInitializer這個handler

因為在服務(wù)端channel中這塊邏輯已經(jīng)進(jìn)行了詳細(xì)的剖析, 所以這邊就不在贅述, 同學(xué)們可以自己跟進(jìn)去走一遍流程

這里注意, 因為每創(chuàng)建一個NioSoeketChannel都會調(diào)用服務(wù)端ServerBootstrapAcceptor的channelRead方法, 所以這里會將每一個NioSocketChannel的handler進(jìn)行添加

章節(jié)總結(jié)

本章剖析了事件傳輸?shù)南嚓P(guān)邏輯, 包括handler的添加, 刪除, inbound和outbound以及異常事件的傳輸, 最后結(jié)合第一章和第三章, 剖析了服務(wù)端channel和客戶端channel的添加過程, 同學(xué)們可以課后跟進(jìn)源碼, 將這些功能自己再走一遍以加深印象.其他的有關(guān)事件傳輸?shù)倪壿? 可以結(jié)合這一章的知識點進(jìn)行自行剖析

更多關(guān)于Netty分布式pipeline管道傳播事件的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 如何使用Spring工具類動態(tài)匹配url

    如何使用Spring工具類動態(tài)匹配url

    這篇文章主要介紹了如何使用Spring工具類動態(tài)匹配url,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12
  • Java日常練習(xí)題,每天進(jìn)步一點點(25)

    Java日常練習(xí)題,每天進(jìn)步一點點(25)

    下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • 淺析RxJava處理復(fù)雜表單驗證問題的方法

    淺析RxJava處理復(fù)雜表單驗證問題的方法

    這篇文章主要介紹了RxJava處理復(fù)雜表單驗證問題的相關(guān)資料,非常不錯具有參考借鑒價值,需要的朋友可以參考下
    2016-06-06
  • java中拼接字符串的5種方法效率對比

    java中拼接字符串的5種方法效率對比

    這篇文章主要給大家介紹了關(guān)于java中拼接字符串的5種方法效率對比的相關(guān)資料,文中通過示例代碼和圖片介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-01-01
  • springBoot項目如何實現(xiàn)啟動多個實例

    springBoot項目如何實現(xiàn)啟動多個實例

    這篇文章主要介紹了springBoot項目如何實現(xiàn)啟動多個實例的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • java讀取XML文件的四種方法總結(jié)(必看篇)

    java讀取XML文件的四種方法總結(jié)(必看篇)

    下面小編就為大家?guī)硪黄猨ava讀取XML文件的四種方法總結(jié)(必看篇)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • Java 使用openoffice進(jìn)行word轉(zhuǎn)換為pdf的方法步驟

    Java 使用openoffice進(jìn)行word轉(zhuǎn)換為pdf的方法步驟

    這篇文章主要介紹了Java 使用openoffice進(jìn)行word轉(zhuǎn)換為pdf的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Java自定義異常簡單示例

    Java自定義異常簡單示例

    使用Java內(nèi)置的異常類可以描述在編程時出現(xiàn)的大部分異常情況,除此之外用戶還可以自定義異常,下面這篇文章主要給大家介紹了關(guān)于Java自定義異常的相關(guān)資料,需要的朋友可以參考下
    2023-04-04
  • JAVA中的Launcher類解析

    JAVA中的Launcher類解析

    這篇文章主要介紹了JAVA中的Launcher類解析,Launcher作為JAVA應(yīng)用的入口,根據(jù)雙親委派模型,Laucher是由JVM創(chuàng)建的,它類加載器應(yīng)該是BootStrapClassLoader, 這是一個C++編寫的類加載器,是java應(yīng)用體系中最頂層的類加載器,需要的朋友可以參考下
    2023-09-09
  • windows java.exe內(nèi)存暴漲解決、idea跑java\ tomcat內(nèi)存無限增長

    windows java.exe內(nèi)存暴漲解決、idea跑java\ tomcat內(nèi)存無限增長

    這篇文章主要介紹了windows java.exe內(nèi)存暴漲解決、idea跑 java\ tomcat內(nèi)存無限增長,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評論