欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Java?NIO編寫一個(gè)簡單版Netty服務(wù)端

 更新時(shí)間:2024年04月03日 08:22:09   作者:gg12138  
基于?NIO?實(shí)現(xiàn)的網(wǎng)絡(luò)框架,可以用少量的線程,處理大量的連接,更適用于高并發(fā)場景,所以被就將利用NIO編寫一個(gè)簡單版Netty服務(wù)端,需要的可以參考下

前置知識

NIO

  • NIO 一般指 同步非阻塞 IO,同樣用于**描述程序訪問數(shù)據(jù)方式 **的還有BIO(同步阻塞)、AIO(異步非阻塞)
  • 同步異步指獲取結(jié)果的方式,同步為主動(dòng)去獲取結(jié)果,不管結(jié)果是否準(zhǔn)備好,異步為等待結(jié)果準(zhǔn)備好的通知
  • 阻塞非阻塞是線程在結(jié)果沒有到來之前,是否進(jìn)行等待,阻塞為進(jìn)行等待,非阻塞則不進(jìn)行等待
  • NIO 主動(dòng)地去獲取結(jié)果,但是在結(jié)果沒有準(zhǔn)備好之前,不會進(jìn)行等待。而是通過一個(gè) 多路復(fù)用器 管理多個(gè)通道,由一個(gè)線程輪訓(xùn)地去檢查是否準(zhǔn)備好即可。在網(wǎng)絡(luò)編程中,多路復(fù)用器通常由操作系統(tǒng)提供,Linux中主要有 select、poll、epoll。同步非阻塞指線程不等待數(shù)據(jù)的傳輸,而是完成后由多路復(fù)用器通知,線程再將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間內(nèi)存進(jìn)行處理。

Java NIO

基于 NIO 實(shí)現(xiàn)的網(wǎng)絡(luò)框架,可以用少量的線程,處理大量的連接,更適用于高并發(fā)場景。于是,Java提供了NIO包提供相關(guān)組件,用于實(shí)現(xiàn)同步非阻塞IO

核心三個(gè)類Channel、Buffer、Selector。Channel代表一個(gè)數(shù)據(jù)傳輸通道,但不進(jìn)行數(shù)據(jù)存取,有Buffer類進(jìn)行數(shù)據(jù)管理,Selector為一個(gè)復(fù)用器,管理多個(gè)通道

Bytebuffer

  • 該類為NIO 包中用于操作內(nèi)存的抽象類,具體實(shí)現(xiàn)由HeapByteBuffer、DirectByteBuffer兩種
  • HeapByteBuffer為堆內(nèi)內(nèi)存,底層通過 byte[ ] 存取數(shù)據(jù)
  • DirectByteBuffer 為堆外內(nèi)存,通過JDK提供的 Unsafe類去存??;同時(shí)創(chuàng)建對象會關(guān)聯(lián)的一個(gè)Cleaner對象,當(dāng)對象被GC時(shí),通過cleaner對象去釋放堆外內(nèi)存

各核心組件介紹

NioServer

為啟動(dòng)程序類,監(jiān)聽端口,初始化Channel

下面為NIO模式下簡單服務(wù)端處理代碼

// 1、創(chuàng)建服務(wù)端Channel,綁定端口并配置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);

// 2、創(chuàng)建多路復(fù)用器selector,并將channel注冊到多路復(fù)用器上
// 不能直接調(diào)用channel的accept方法,因?yàn)閷儆诜亲枞?,直接調(diào)用沒有新連接會直接返回
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 3、循環(huán)處理多路復(fù)用器的IO事件
while(true){

    // 3.1、select屬于阻塞的方法,這里阻塞等待1秒
    // 如果返回0,說明沒有事件處理
    if (selector.select(1000) == 0){
        System.out.println("服務(wù)器等待了1秒,無IO事件");
        continue;
    }
    // 3.2、遍歷事件進(jìn)行處理
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while(iterator.hasNext()){
        SelectionKey key = iterator.next();
        // accept事件,說明有新的客戶端連接
        if (key.isAcceptable()){
            // 新建一個(gè)socketChannel,注冊到selector,并關(guān)聯(lián)buffer
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
            System.out.println("客戶端連接:"+socketChannel.getRemoteAddress());
        }
        // read事件 (內(nèi)核緩沖區(qū)的數(shù)據(jù)準(zhǔn)備好了)
        if(key.isReadable()){
            SocketChannel channel = (SocketChannel)key.channel();
            ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
            try {
              // 將數(shù)據(jù)寫進(jìn)buffer
                int readNum = channel.read(byteBuffer);
                if (readNum == -1){
                    System.out.println("讀取-1時(shí),表示IO流已結(jié)束");
                    channel.close();
                    break;
                }
                // 打印buffer
                byteBuffer.flip();
                byte[] bytes = new byte[readNum];
                byteBuffer.get(bytes, 0, readNum);
                System.out.println("讀取到數(shù)據(jù):" + new String(bytes));
            } catch (IOException e) {
                System.out.println("讀取發(fā)生異常,廣播socket");
                channel.close();
            }

        }
        // write事件 (操作系統(tǒng)有內(nèi)存寫出了)
        if (key.isWritable()){
            SocketChannel channel = (SocketChannel)key.channel();
            // 讀取read時(shí)暫存數(shù)據(jù)
            byte[] bytes = (byte[])key.attachment();
            if (bytes != null){
                System.out.println("可寫事件發(fā)生,寫入數(shù)據(jù): " + new String(bytes));
                channel.write(ByteBuffer.wrap(bytes));
            }
            // 清空暫存數(shù)據(jù),并切換成關(guān)注讀事件
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
        iterator.remove();
    }
}

EventLoop

處理 Channel 中數(shù)據(jù)的讀寫

  • 在上面的Server中,大量并發(fā)時(shí)單線程地處理讀寫事件會導(dǎo)致延遲,因此將讀寫處理抽取出來,可利用多線程實(shí)現(xiàn)高并發(fā)
  • 一個(gè)EventLoop會關(guān)聯(lián)一個(gè)selector,只會處理這個(gè)selector上的Channel
public class EventLoop2 implements Runnable{


    private final Thread thread;
    /**
     * 復(fù)用器,當(dāng)前線程只處理這個(gè)復(fù)用器上的channel
     */
    public Selector selector;
    /**
     * 待處理的注冊任務(wù)
     */
    private final Queue<Runnable> queue = new LinkedBlockingQueue<>();

    /**
     * 初始化復(fù)用器,線程啟動(dòng)
     * @throws IOException
     */
    public EventLoop2() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        this.thread = new Thread(this);
        thread.start();
    }

    /**
     * 將通道注冊給當(dāng)前的線程處理
     * @param socketChannel
     * @param keyOps
     */
    public void register(SocketChannel socketChannel,int keyOps){
        // 將注冊新的socketChannel到當(dāng)前selector封裝成一個(gè)任務(wù)
        queue.add(()->{
            try {
                MyChannel myChannel = new MyChannel(socketChannel, this);
                SelectionKey key = socketChannel.register(selector, keyOps);
                key.attach(myChannel);
            } catch (Exception e){
                e.printStackTrace();
            }
        });
        // 喚醒阻塞等待的selector線程
        selector.wakeup();
    }

    /**
     * 循環(huán)地處理 注冊事件、讀寫事件
     */
    @Override
    public void run() {
        while (!thread.isInterrupted()){
            try {
                int select = selector.select(1000);
                // 處理注冊到當(dāng)前selector的事件
                if (select == 0){
                    Runnable task;
                    while ((task = queue.poll()) != null){
                        task.run();
                    }
                    continue;
                }
                // 處理讀寫事件
                System.out.println("服務(wù)器收到讀寫事件,select:" + select);
                processReadWrite();

            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 處理讀寫事件
     * @throws Exception
     */
    private void processReadWrite() throws Exception{
        System.out.println(Thread.currentThread() + "開始監(jiān)聽讀寫事件");
        // 3.2、遍歷事件進(jìn)行處理
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            MyChannel myChannel = (MyChannel)key.attachment();
            if(key.isReadable()){
                // 將數(shù)據(jù)讀進(jìn)buffer
                myChannel.doRead(key);
            }
            if (key.isWritable()){
                myChannel.doWrite(key);
            }
            iterator.remove();
        }
    }
}

EventloopGroup

一組EventLoop,輪訓(xùn)地為eventLoop分配Channel

public class EventLoopGroup {
    private EventLoop2[] children = new EventLoop2[1];

    private AtomicInteger idx = new AtomicInteger(0);

    public EventLoopGroup() throws IOException {
        for (int i = 0; i < children.length; i++){
            children[i] = new EventLoop2();
        }
    }

    public EventLoop2 next(){
        // 輪訓(xùn)每一個(gè)children
        return children[idx.getAndIncrement() & (children.length - 1)];
    }

    public void register(SocketChannel channel,int ops){
        next().register(channel,ops);
    }
}

Channel

封裝了SocketChannel 和 Pipline,將從Channel讀寫的消息,沿著Pipline上的節(jié)點(diǎn)進(jìn)行處理

  • 在上面EventLoop中,注冊Channel到對應(yīng)的Selector前,會進(jìn)行封裝,將自定義的Channel放在讀寫事件觸發(fā)時(shí)會返回的SelectionKey里面
  • 同時(shí)提供了數(shù)據(jù)讀寫處理方法,讀寫事件觸發(fā)時(shí)調(diào)用該方法,數(shù)據(jù)會沿著pipline上去處理
public class MyChannel {

    private SocketChannel channel;

    private EventLoop2 eventLoop;

    private Queue<ByteBuffer> writeQueue;

    private PipLine pipLine;

    /**
     * 一個(gè)channel關(guān)聯(lián)一個(gè)eventLoop、一個(gè)pipLine、一個(gè)socketChannel、一個(gè)writeQueue
     * @param channel
     * @param eventLoop
     */
    public MyChannel(SocketChannel channel, EventLoop2 eventLoop) {
        this.channel = channel;
        this.eventLoop = eventLoop;
        this.writeQueue = new ArrayDeque<>();
        this.pipLine = new PipLine(this,eventLoop);
        this.pipLine.addLast(new MyHandler1());
        this.pipLine.addLast(new MyHandler2());
    }

    /**
     * 讀事件處理
     * @param key
     * @throws Exception
     */
    public void doRead(SelectionKey key) throws Exception{
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int readNum = channel.read(buffer);
            if (readNum == -1){
                System.out.println("讀取-1時(shí),表示IO流已結(jié)束");
                channel.close();
                return;
            }
            // 轉(zhuǎn)成可讀狀態(tài)
            buffer.flip();
            // 消息放入pipLine,交給頭節(jié)點(diǎn), 頭節(jié)點(diǎn)開始傳遞
            pipLine.headContext.fireChannelRead(buffer);

        } catch (IOException e) {
            System.out.println("讀取發(fā)生異常,廣播socket");
            channel.close();
        }
    }

    /**
     * 真正地寫出數(shù)據(jù),關(guān)注寫事件后,會觸發(fā)
     * @param key
     * @throws IOException
     */
    public void doWrite(SelectionKey key) throws IOException{
        ByteBuffer buffer;
        while ((buffer =writeQueue.poll()) != null){
            channel.write(buffer);
        }
        // 回復(fù)讀取狀態(tài)
        key.interestOps(SelectionKey.OP_READ);

    }

    /**
     * 寫出到隊(duì)列
     * @param msg
     */
    public void doWriteQueue(ByteBuffer msg){
        writeQueue.add(msg);
    }

    /**
     * 從最后一個(gè)節(jié)點(diǎn)進(jìn)行寫出,寫出到頭節(jié)點(diǎn)是調(diào)用doWriteQueue
     * @param msg
     */
    public void write(Object msg){
        this.pipLine.tailContext.write(msg);
    }

    /**
     * 從最后一個(gè)節(jié)點(diǎn)進(jìn)行flush,寫出到頭節(jié)點(diǎn)時(shí)調(diào)用doFlush
     */
    public void flush(){
        this.pipLine.tailContext.flush();
    }

    /**
     * 關(guān)注寫事件,才能進(jìn)行真正地寫出
     */
    public void doFlush(){
        this.channel.keyFor(eventLoop.selector).interestOps(SelectionKey.OP_WRITE);
    }

}

Handler 和 HandlerContext

handler 接口定義了可以擴(kuò)展處理的消息,由開發(fā)人員實(shí)現(xiàn)具體的處理

handlerContext 類封裝了handler的實(shí)現(xiàn)類,將handler的上一個(gè)節(jié)點(diǎn)和下一個(gè)節(jié)點(diǎn),讓消息可以延者鏈表傳遞

public interface Handler {

    /**
     * 讀取數(shù)據(jù)處理
     * @param ctx
     * @param msg
     */
    void channelRead(HandlerContext ctx,Object msg);

    /**
     * 寫出數(shù)據(jù)
     * @param ctx
     * @param msg
     */
    void write(HandlerContext ctx,Object msg);

    /**
     * 刷下數(shù)據(jù)
     * @param ctx
     */
    void flush(HandlerContext ctx);
}
public class HandlerContext {

    private Handler handler;

    MyChannel channel;

    HandlerContext prev;

    HandlerContext next;

    public HandlerContext(Handler handler, MyChannel channel) {
        this.handler = handler;
        this.channel = channel;
    }

    /**
     * 讀消息的傳遞,從頭節(jié)點(diǎn)開始往后傳
     * @param msg
     */
    public void fireChannelRead(Object msg){
        HandlerContext next = this.next;
        if (next != null){
            next.handler.channelRead(next,msg);
        }
    }

    /**
     * 從尾節(jié)點(diǎn)開始往前傳
     * @param msg
     */
    public void write(Object msg){
        HandlerContext prev = this.prev;
        if (prev != null){
            prev.handler.write(prev,msg);
        }
    }

    /**
     * 從尾節(jié)點(diǎn)開始往前傳
     */
    public void flush(){
        HandlerContext prev = this.prev;
        if (prev != null){
            prev.handler.flush(prev);
        }
    }
}

Pipline

本質(zhì)是鏈表,包含了頭尾節(jié)點(diǎn)的HandlerContext,提供方法給開發(fā)人員加節(jié)點(diǎn)

public class PipLine {

    private MyChannel channel;

    private EventLoop2 eventLoop;

    public HandlerContext headContext;

    public HandlerContext tailContext;

    public PipLine(MyChannel channel, EventLoop2 eventLoop) {
        this.channel = channel;
        this.eventLoop = eventLoop;
        PipHandler headHandler = new PipHandler();
        this.headContext = new HandlerContext(headHandler,channel);
        PipHandler tailHandler = new PipHandler();
        this.tailContext = new HandlerContext(tailHandler,channel);
        // 構(gòu)建鏈表
        this.headContext.next = this.tailContext;
        this.tailContext.prev = this.headContext;
    }

    public void addLast(Handler handler){
        HandlerContext curr = new HandlerContext(handler, channel);

        // 連接在倒數(shù)第二個(gè)后面
        HandlerContext lastButOne = this.tailContext.prev;
        lastButOne.next = curr;
        curr.prev = lastButOne;

        // 連接在最后一個(gè)前面
        curr.next = tailContext;
        tailContext.prev = curr;

    }

    public static class PipHandler implements Handler{

        @Override
        public void channelRead(HandlerContext ctx, Object msg) {
            System.out.println("接收"+(String) msg +"進(jìn)行資源釋放");
        }

        @Override
        public void write(HandlerContext ctx, Object msg) {
            System.out.println("寫出"+msg.toString());
        }

        @Override
        public void flush(HandlerContext ctx) {
            System.out.println("flush");
        }
    }
}

到此這篇關(guān)于基于Java NIO編寫一個(gè)簡單版Netty服務(wù)端的文章就介紹到這了,更多相關(guān)Java NIO編寫Netty服務(wù)端內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java ConcurrentHashMap用法案例詳解

    Java ConcurrentHashMap用法案例詳解

    這篇文章主要介紹了Java ConcurrentHashMap用法案例詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • Java獲取時(shí)間打印到控制臺代碼實(shí)例

    Java獲取時(shí)間打印到控制臺代碼實(shí)例

    這篇文章主要介紹了Java獲取時(shí)間打印到控制臺代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • JSON 格式的弊端與解決方法(真實(shí)示例)

    JSON 格式的弊端與解決方法(真實(shí)示例)

    JSON 格式是目前最流行的數(shù)據(jù)交互格式,廣泛應(yīng)用于前后端分離的系統(tǒng)。但也有一些場合不適合使用 JSON 格式,這篇文章主要介紹了JSON 格式的弊端與解決方法,需要的朋友可以參考下
    2022-09-09
  • Java集合ArrayList與LinkedList詳解

    Java集合ArrayList與LinkedList詳解

    這篇文章主要介紹了Java集合ArrayList與LinkedList詳解,對于ArrayList和LinkedList,他們都是List接口的一個(gè)實(shí)現(xiàn)類,并且我們知道他們的實(shí)現(xiàn)方式各不相同,例如ArrayList底層實(shí)現(xiàn)是一個(gè)數(shù)組
    2022-08-08
  • idea導(dǎo)入maven工程的三種方法

    idea導(dǎo)入maven工程的三種方法

    這篇文章主要介紹了idea導(dǎo)入maven工程的三種方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • 在CentOS系統(tǒng)中檢測Java安裝及運(yùn)行jar應(yīng)用的方法

    在CentOS系統(tǒng)中檢測Java安裝及運(yùn)行jar應(yīng)用的方法

    這篇文章主要介紹了在CentOS系統(tǒng)中檢測Java安裝及運(yùn)行jar應(yīng)用的方法,同樣適用于Fedora等其他RedHat系的Linux系統(tǒng),需要的朋友可以參考下
    2015-06-06
  • Java多線程的用法詳細(xì)介紹

    Java多線程的用法詳細(xì)介紹

    這篇文章主要介紹了Java多線程的用法詳細(xì)介紹的相關(guān)資料,希望通過本文能幫助到大家,需要的朋友可以參考下
    2017-09-09
  • JAVA實(shí)現(xiàn)異步調(diào)用實(shí)例代碼

    JAVA實(shí)現(xiàn)異步調(diào)用實(shí)例代碼

    在java平臺,實(shí)現(xiàn)異步調(diào)用的角色主要三種角色:調(diào)用者、取貨憑證、真實(shí)數(shù)據(jù)。本篇文章給大家介紹java實(shí)現(xiàn)異步調(diào)用實(shí)例代碼,需要的朋友可以參考下
    2015-09-09
  • java 替換docx文件中的字符串方法實(shí)現(xiàn)

    java 替換docx文件中的字符串方法實(shí)現(xiàn)

    這篇文章主要介紹了java 替換docx文件中的字符串方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • Java實(shí)現(xiàn)AES加密算法的簡單示例分享

    Java實(shí)現(xiàn)AES加密算法的簡單示例分享

    這篇文章主要介紹了Java實(shí)現(xiàn)AES加密算法的簡單示例分享,AES算法是基于對密碼值的置換和替代,需要的朋友可以參考下
    2016-04-04

最新評論