欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式客戶端處理接入事件handle源碼解析

 更新時間:2022年03月25日 17:27:07   作者:向南是個萬人迷  
這篇文章主要為大家介紹了Netty源碼分析客戶端流程處理接入事件handle創(chuàng)建,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前文傳送門 :客戶端接入流程初始化源碼分析

上一小節(jié)我們剖析完成了與channel綁定的ChannelConfig初始化相關的流程,

這一小節(jié)繼續(xù)剖析客戶端連接事件的處理

處理接入事件創(chuàng)建handle

回到上一章NioEventLoop的processSelectedKey ()方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //獲取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //如果這個key不是合法的, 說明這個channel可能有問題
    if (!k.isValid()) {
        //代碼省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyOps = k.readyOps();
        //鏈接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        //寫事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        //讀事件和接受鏈接事件
        //如果當前NioEventLoop是work線程的話, 這里就是op_read事件
        //如果是當前NioEventLoop是boss線程的話, 這里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

我們看其中的if判斷:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)

上一小節(jié)我們分析過, 如果當前NioEventLoop是work線程的話, 這里就是op_read事件, 如果是當前NioEventLoop是boss線程的話, 這里就是op_accept事件, 這里我們以boss線程為例進行分析

之前我們講過, 無論處理op_read事件還是op_accept事件, 都走的unsafe的read()方法, 這里unsafe是通過channel拿到, 我們知道如果是處理accept事件, 這里的channel是NioServerSocketChannel, 這里與之綁定的unsafe是NioMessageUnsafe

我們跟到NioMessageUnsafe的read()方法:

public void read() {
    //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
    assert eventLoop().inEventLoop();
    //服務端channel的config
    final ChannelConfig config = config();
    //服務端channel的pipeline
    final ChannelPipeline pipeline = pipeline();
    //處理服務端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //設置配置
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //創(chuàng)建jdk底層的channel
                //readBuf用于臨時承載讀到鏈接
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //分配器將讀到的鏈接進行計數(shù)
                allocHandle.incMessagesRead(localRead);
                //連接數(shù)是否超過最大值
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        //遍歷每一條客戶端連接
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //傳遞事件, 將創(chuàng)建NioSokectChannel進行傳遞
            //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        //代碼省略
    } finally {
        //代碼省略
    }
}

首先獲取與NioServerSocketChannel綁定config和pipeline, config我們上一小節(jié)進行分析過, pipeline我們將在下一章進行剖析

我們看這一句:

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

這里通過RecvByteBufAllocator接口調(diào)用了其內(nèi)部接口Handler

我們看其RecvByteBufAllocator接口

public interface RecvByteBufAllocator {
    Handle newHandle();
    interface Handle {
        int guess();
        void reset(ChannelConfig config);
        void incMessagesRead(int numMessages);
        void lastBytesRead(int bytes);
        int lastBytesRead();
        void attemptedBytesRead(int bytes);
        int attemptedBytesRead();
        boolean continueReading();
        void readComplete();    
    }
}

我們看到RecvByteBufAllocator接口只有一個方法newHandle(), 顧名思義就是用于創(chuàng)建Handle對象的方法, 而Handle中的方法, 才是實際用于操作的方法

在RecvByteBufAllocator實現(xiàn)類中包含Handle的子類, 具體實現(xiàn)關系如下:

回到read()方法中再看這段代碼:

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

unsafe()返回當前channel綁定的unsafe對象, recvBufAllocHandle()最終會調(diào)用AbstractChannel內(nèi)部類AbstractUnsafe的recvBufAllocHandle()方法

跟進AbstractUnsafe的recvBufAllocHandle()方法:

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    //如果不存在, 則創(chuàng)建一個recvHandle的實例
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

如果如果是第一次執(zhí)行到這里, 自身屬性recvHandle為空, 會創(chuàng)建一個recvHandle實例, config()返回NioServerSocketChannel綁定的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator對象, 這兩部分上一小節(jié)剖析過了, 這里通過newHandle()創(chuàng)建一個Handle, 這里會走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中

跟進newHandle()方法中

public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

這里創(chuàng)建HandleImpl傳入了三個參數(shù), 這三個參數(shù)我們上一小節(jié)剖析過, minIndex為最小內(nèi)存在SIZE_TABLE中的下標, maxIndex為最大內(nèi)存在SEIZE_TABEL中的下標, initial是初始內(nèi)存, 我們跟到HandleImpl的構造方法中:

public HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;
    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

初始化minIndex和maxIndex, 根據(jù)initial找到當前的下標, nextReceiveBufferSize是根據(jù)當前的下標找到對應的內(nèi)存

這樣, 我們就創(chuàng)建了個Handle對象

在這里我們需要知道, 這個handle, 是和channel唯一綁定的屬性, 而AdaptiveRecvByteBufAllocator對象是和ChannelConfig對象唯一綁定的, 間接也是和channel進行唯一綁定

繼續(xù)回到read()方法

public void read() {
    //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
    assert eventLoop().inEventLoop();
    //服務端channel的config
    final ChannelConfig config = config();
    //服務端channel的pipeline
    final ChannelPipeline pipeline = pipeline();
    //處理服務端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //設置配置
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //創(chuàng)建jdk底層的channel
                //readBuf用于臨時承載讀到鏈接
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //分配器將讀到的鏈接進行計數(shù)
                allocHandle.incMessagesRead(localRead);
                //連接數(shù)是否超過最大值
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        //遍歷每一條客戶端連接
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //傳遞事件, 將創(chuàng)建NioSokectChannel進行傳遞
            //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        //代碼省略
    } finally {
        //代碼省略
    }
}

繼續(xù)往下跟:

allocHandle.reset(config);

這個段代碼是重新設置配置, 也就是將之前的配置信息進行初始化, 最終會走到, DefaultMaxMessagesRecvByteBufAllocator中的內(nèi)部類MaxMessageHandle的reet中

我們跟進reset中

public void reset(ChannelConfig config) {
    this.config = config;
    maxMessagePerRead = maxMessagesPerRead();
    totalMessages = totalBytesRead = 0;
}

這里僅僅對幾個屬性做了賦值, 簡單介紹下這幾個屬性:

config:當前channelConfig對象

maxMessagePerRead:表示讀取消息的時候可以讀取幾次(循環(huán)次數(shù)), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節(jié)已經(jīng)做過剖析

totalMessages:代表目前讀循環(huán)已經(jīng)讀取的消息個數(shù), 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里初始化為0

totalBytesRead:代表目前已經(jīng)讀取到的消息字節(jié)總數(shù), 這里同樣也初始化為0

我們繼續(xù)往下走, 這里首先是一個do-while循環(huán), 循環(huán)體里通過int localRead = doReadMessages(readBuf)這種方式將讀取到的連接數(shù)放入到一個List集合中, 這一步我們下一小節(jié)再分析, 我們繼續(xù)往下走:

我們首先看allocHandle.incMessagesRead(localRead)這一步, 這里的localRead表示這次循環(huán)往readBuf中放入的連接數(shù), 在Nio模式下這, 如果讀取到一條連接會返回1

跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:

public final void incMessagesRead(int amt) {
    totalMessages += amt;
}

這里將totalMessages增加amt, 也就是+1

這里totalMessage, 剛才已經(jīng)剖析過, 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里每次執(zhí)行一次循環(huán)都會加一

再去看循環(huán)終止條件allocHandle.continueReading()

跟到MaxMessageHandle的continueReading()方法中:

public boolean continueReading() {
    //config.isAutoRead()默認返回true
    // totalMessages < maxMessagePerRead
    //totalMessages代表當前讀到的鏈接, 默認是1
    //maxMessagePerRead每一次最大讀多少鏈接(默認16)
    return config.isAutoRead() &&
           attemptedBytesRead == lastBytesRead &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead < Integer.MAX_VALUE;
}

我們逐個分析判斷條件:

config.isAutoRead(): 這里默認為true

attemptedBytesRead == lastBytesRead: 表示本次讀取的字節(jié)數(shù)和最后一次讀取的字節(jié)數(shù)相等, 因為到這里都沒有進行字節(jié)數(shù)組的讀取操作, 所以默認都為0, 這里也返回true

totalMessages < maxMessagePerRead

表示當前讀取的次數(shù)是否小于最大讀取次數(shù), 我們知道totalMessages每次循環(huán)都會自增, 而maxMessagePerRead默認值為16, 所以這里會限制循環(huán)不能超過16次, 也就是最多一次只能讀取16條連接

totalBytesRead < Integer.MAX_VALUE

表示讀取的字節(jié)數(shù)不能超過int類型的最大值

這里就剖析完了Handle的創(chuàng)建和初始化過程, 并且剖析了循環(huán)終止條件等相關的邏輯

以上就是Netty分布式客戶端處理接入事件handle源碼解析的詳細內(nèi)容,更多關于Netty分布式客戶端接入事件handle的資料請關注腳本之家其它相關文章!

相關文章

  • 簡單了解Java日志脫敏框架sensitive

    簡單了解Java日志脫敏框架sensitive

    這篇文章主要介紹了簡單了解Java日志脫敏框架sensitive,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • java身份證合法性校驗并提取身份證有效信息

    java身份證合法性校驗并提取身份證有效信息

    這篇文章主要為大家詳細介紹了java身份證合法性校驗,并獲取并根據(jù)身份證號提取身份證相關信息,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-08-08
  • Java中的形式參數(shù)和實際參數(shù)案例詳解

    Java中的形式參數(shù)和實際參數(shù)案例詳解

    這篇文章主要介紹了Java中的形式參數(shù)和實際參數(shù),形參和實參間的關系,兩者是在調(diào)用的時候進行結合的,通常實參會將取值傳遞給形參,形參去之后進行函數(shù)過程運算,然后可能將某些值經(jīng)過參數(shù)或函數(shù)符號返回給調(diào)用者,需要的朋友可以參考下
    2023-10-10
  • 深入理解Java中HashCode方法

    深入理解Java中HashCode方法

    這篇文章主要介紹了深入理解Java中HashCode方法,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • 簡單了解Java的默認和靜態(tài)方法

    簡單了解Java的默認和靜態(tài)方法

    這篇文章主要介紹了簡單了解Java的默認和靜態(tài)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • IDEA調(diào)試源碼小技巧之辨別抽象類或接口多種實現(xiàn)類的正確路徑

    IDEA調(diào)試源碼小技巧之辨別抽象類或接口多種實現(xiàn)類的正確路徑

    這篇文章主要介紹了IDEA調(diào)試源碼小技巧之辨別抽象類或接口多種實現(xiàn)類的正確路徑,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • SpringBoot?AnnotationUtils工具類的使用實例詳解

    SpringBoot?AnnotationUtils工具類的使用實例詳解

    這篇文章主要介紹了SpringBoot?AnnotationUtils工具類的使用,使用自定義注解標記業(yè)務方法,原生Java獲取注解及AnnotationUtils工具類獲取方法,本文通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下
    2022-09-09
  • SpringBoot整合MongoDB實現(xiàn)文件上傳下載刪除

    SpringBoot整合MongoDB實現(xiàn)文件上傳下載刪除

    這篇文章主要介紹了SpringBoot整合MongoDB實現(xiàn)文件上傳下載刪除的方法,幫助大家更好的理解和學習使用SpringBoot框架,感興趣的朋友可以了解下
    2021-05-05
  • Java實現(xiàn)定時備份文件

    Java實現(xiàn)定時備份文件

    這篇文章主要為大家詳細介紹了Java實現(xiàn)定時備份文件,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • springboot實現(xiàn)在工具類(util)中調(diào)用注入service層方法

    springboot實現(xiàn)在工具類(util)中調(diào)用注入service層方法

    這篇文章主要介紹了springboot實現(xiàn)在工具類(util)中調(diào)用注入service層方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06

最新評論