欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中多線程Reactor模式的實(shí)現(xiàn)

 更新時(shí)間:2021年12月03日 10:07:54   作者:西伯利亞愛學(xué)習(xí)的狼  
多線程Reactor模式旨在分配多個(gè)reactor每一個(gè)reactor獨(dú)立擁有一個(gè)selector,本文就詳細(xì)的來介紹一下Java中多線程Reactor模式的實(shí)現(xiàn),需要的朋友可以參考下

多線程Reactor模式旨在分配多個(gè)reactor每一個(gè)reactor獨(dú)立擁有一個(gè)selector,在網(wǎng)絡(luò)通信中大體設(shè)計(jì)為負(fù)責(zé)連接的主Reactor,其中在主Reactor的run函數(shù)中若selector檢測(cè)到了連接事件的發(fā)生則dispatch該事件。

讓負(fù)責(zé)管理連接的Handler處理連接,其中在這個(gè)負(fù)責(zé)連接的Handler處理器中創(chuàng)建子Handler用以處理IO請(qǐng)求。這樣一來連接請(qǐng)求與IO請(qǐng)求分開執(zhí)行提高通道的并發(fā)量。同時(shí)多個(gè)Reactor帶來的好處是多個(gè)selector可以提高通道的檢索速度

1、 主服務(wù)器

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;


class MultiThreadEchoServerReactor {
    ServerSocketChannel serverSocket;
    AtomicInteger next = new AtomicInteger(0);
    Selector bossSelector = null;
    Reactor bossReactor = null;
    //selectors集合,引入多個(gè)selector選擇器
    //多個(gè)選擇器可以更好的提高通道的并發(fā)量
    Selector[] workSelectors = new Selector[2];
    //引入多個(gè)子反應(yīng)器
    //如果CPU是多核的可以開啟多個(gè)子Reactor反應(yīng)器,這樣每一個(gè)子Reactor反應(yīng)器還可以獨(dú)立分配一個(gè)線程。
    //每一個(gè)線程可以單獨(dú)綁定一個(gè)單獨(dú)的Selector選擇器以提高通道并發(fā)量
    Reactor[] workReactors = null;

    MultiThreadEchoServerReactor() throws IOException {

        bossSelector = Selector.open();
        //初始化多個(gè)selector選擇器
        workSelectors[0] = Selector.open();
        workSelectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //第一個(gè)selector,負(fù)責(zé)監(jiān)控新連接事件
        SelectionKey sk =
                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //附加新連接處理handler處理器到SelectionKey(選擇鍵)
        sk.attach(new AcceptorHandler());

        //處理新連接的反應(yīng)器
        bossReactor = new Reactor(bossSelector);

        //第一個(gè)子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器
        Reactor subReactor1 = new Reactor(workSelectors[0]);
        //第二個(gè)子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器
        Reactor subReactor2 = new Reactor(workSelectors[1]);
        workReactors = new Reactor[]{subReactor1, subReactor2};
    }

    private void startService() {
        new Thread(bossReactor).start();
        // 一子反應(yīng)器對(duì)應(yīng)一條線程
        new Thread(workReactors[0]).start();
        new Thread(workReactors[1]).start();
    }

    //反應(yīng)器
    class Reactor implements Runnable {
        //每條線程負(fù)責(zé)一個(gè)選擇器的查詢
        final Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    //單位為毫秒
                    //每隔一秒列出選擇器感應(yīng)列表
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    if (null == selectedKeys || selectedKeys.size() == 0) {
                        //如果列表中的通道注冊(cè)事件沒有發(fā)生那就繼續(xù)執(zhí)行
                        continue;
                    }
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        //Reactor負(fù)責(zé)dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    //清楚掉已經(jīng)處理過的感應(yīng)事件,防止重復(fù)處理
                    selectedKeys.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }


        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            //調(diào)用之前attach綁定到選擇鍵的handler處理器對(duì)象
            if (handler != null) {
                handler.run();
            }
        }
    }


    // Handler:新連接處理器
    class AcceptorHandler implements Runnable {
        public void run() {
        try {
                SocketChannel channel = serverSocket.accept();
                Logger.info("接收到一個(gè)新的連接");

                if (channel != null) {
                    int index = next.get();
                    Logger.info("選擇器的編號(hào):" + index);
                    Selector selector = workSelectors[index];
                    new MultiThreadEchoHandler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == workSelectors.length) {
                next.set(0);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startService();
    }

}

按上述的設(shè)計(jì)思想,在主服務(wù)器中實(shí)際上設(shè)計(jì)了三個(gè)Reactor,一個(gè)主Reactor專門負(fù)責(zé)連接請(qǐng)求并配已單獨(dú)的selector,但是三個(gè)Reactor的線程Run函數(shù)是做的相同的功能,都是根據(jù)每個(gè)線程內(nèi)部的selector進(jìn)行檢索事件列表,若注冊(cè)的監(jiān)聽事件發(fā)生了則調(diào)用dispactch分發(fā)到每個(gè)Reactor對(duì)應(yīng)的Handler。

這里需要注意的一開始其實(shí)只有負(fù)責(zé)連接事件的主Reactor在注冊(cè)selector的時(shí)候給相應(yīng)的key配了一個(gè)AcceptorHandler()。

 //第一個(gè)selector,負(fù)責(zé)監(jiān)控新連接事件
        SelectionKey sk =
                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //附加新連接處理handler處理器到SelectionKey(選擇鍵)
        sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相應(yīng)的selector key發(fā)生了便要dispatch到一個(gè)Handler。這里其他兩個(gè)子Reactor的Handler在哪里賦值的呢?其實(shí)在處理連接請(qǐng)求的Reactor中便創(chuàng)建了各個(gè)子Handler,如下代碼所示:
主Handler中先是根據(jù)服務(wù)器channel創(chuàng)建出客服端channel,在進(jìn)行子selector與channel的綁定。

                   int index = next.get();
                   Logger.info("選擇器的編號(hào):" + index);
                   Selector selector = workSelectors[index];
                   new MultiThreadEchoHandler(selector, channel);

2、IO請(qǐng)求handler+線程池

package com.crazymakercircle.ReactorModel;


import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入線程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        channel.configureBlocking(false);

        //喚醒選擇,防止register時(shí) boss線程被阻塞,netty 處理方式比較優(yōu)雅,會(huì)在同一個(gè)線程注冊(cè)事件,避免阻塞boss
        selector.wakeup();

        //僅僅取得選擇鍵,后設(shè)置感興趣的IO事件
        sk = channel.register(selector, 0);
        //將本Handler作為sk選擇鍵的附件,方便事件dispatch
        sk.attach(this);
        //向sk選擇鍵注冊(cè)Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);

        //喚醒選擇,是的OP_READ生效
        selector.wakeup();
        Logger.info("新的連接 注冊(cè)完成");

    }

    public void run() {
        //異步任務(wù),在獨(dú)立的線程池中執(zhí)行
        pool.execute(new AsyncTask());
    }

    //異步任務(wù),不在Reactor線程中執(zhí)行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //寫入通道
                channel.write(byteBuffer);

                //寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫模式
                byteBuffer.clear();
                //寫完后,注冊(cè)read就緒事件
                sk.interestOps(SelectionKey.OP_READ);
                //寫完后,進(jìn)入接收的狀態(tài)
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //從通道讀
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀模式
                byteBuffer.flip();
                //讀完后,注冊(cè)write就緒事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //讀完后,進(jìn)入發(fā)送的狀態(tài)
                state = SENDING;
            }
            //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //異步任務(wù)的內(nèi)部類
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }
}

3、客戶端

在處理IO請(qǐng)求的Handler中采用了線程池,已達(dá)到異步處理的目的。

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Dateutil;
import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * create by 尼恩 @ 瘋狂創(chuàng)客圈
 **/
public class EchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、獲取通道(channel)
        SocketChannel socketChannel = SocketChannel.open(address);
        Logger.info("客戶端連接成功");
        // 2、切換成非阻塞模式
        socketChannel.configureBlocking(false);
        //不斷的自旋、等待連接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
        Logger.tcfo("客戶端啟動(dòng)成功!");

        //啟動(dòng)接受線程
        Processer processer = new Processer(socketChannel);
        new Thread(processer).start();

    }

    static class Processer implements Runnable {
        final Selector selector;
        final SocketChannel channel;

        Processer(SocketChannel channel) throws IOException {
            //Reactor初始化
            selector = Selector.open();
            this.channel = channel;
            channel.register(selector,
                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

                            Scanner scanner = new Scanner(System.in);
                            Logger.tcfo("請(qǐng)輸入發(fā)送內(nèi)容:");
                            if (scanner.hasNext()) {
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                String next = scanner.next();
                                buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
                                buffer.flip();
                                // 操作三:發(fā)送數(shù)據(jù)
                                socketChannel.write(buffer);
                                buffer.clear();
                            }

                        }
                        if (sk.isReadable()) {
                            // 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù)
                            SocketChannel socketChannel = (SocketChannel) sk.channel();

                            //讀取數(shù)據(jù)
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int length = 0;
                            while ((length = socketChannel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
                                byteBuffer.clear();
                            }

                        }
                        //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
                        //selectionKey.cancel();
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new EchoClient().start();
    }
}

到此這篇關(guān)于Java中多線程Reactor模式的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java 多線程Reactor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 簡(jiǎn)單了解java等待喚醒機(jī)制原理及使用

    簡(jiǎn)單了解java等待喚醒機(jī)制原理及使用

    這篇文章主要介紹了簡(jiǎn)單了解java等待喚醒機(jī)制原理及使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容

    java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容

    這篇文章主要介紹了java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容的相關(guān)資料,需要的朋友可以參考下
    2017-04-04
  • Spring線程池ThreadPoolTaskExecutor配置詳情

    Spring線程池ThreadPoolTaskExecutor配置詳情

    本篇文章主要介紹了Spring線程池ThreadPoolTaskExecutor配置詳情,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-03-03
  • spring MVC中傳遞對(duì)象參數(shù)示例詳解

    spring MVC中傳遞對(duì)象參數(shù)示例詳解

    這篇文章主要給大家介紹了在spring MVC中傳遞對(duì)象參數(shù)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看吧。
    2017-06-06
  • Java中Calendar日期類常用方法演示

    Java中Calendar日期類常用方法演示

    這篇文章主要給大家介紹了關(guān)于Java中Calendar日期類用法詳細(xì)介紹的相關(guān)資料,Calendar類是?Java?中用于處理日期和時(shí)間的抽象類,它提供了一種獨(dú)立于特定日歷系統(tǒng)的方式來處理日期和時(shí)間,需要的朋友可以參考下
    2023-12-12
  • 詳解SPI在Dubbo中的應(yīng)用

    詳解SPI在Dubbo中的應(yīng)用

    通過本文的學(xué)習(xí),可以了解 Dubbo SPI 的特性及實(shí)現(xiàn)原理,希望對(duì)大家的開發(fā)設(shè)計(jì)有一定的啟發(fā)性
    2021-06-06
  • Feign Client 超時(shí)時(shí)間配置不生效的解決

    Feign Client 超時(shí)時(shí)間配置不生效的解決

    這篇文章主要介紹了Feign Client 超時(shí)時(shí)間配置不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 簡(jiǎn)單講解奇偶排序算法及在Java數(shù)組中的實(shí)現(xiàn)

    簡(jiǎn)單講解奇偶排序算法及在Java數(shù)組中的實(shí)現(xiàn)

    這篇文章主要介紹了奇偶排序算法及Java數(shù)組的實(shí)現(xiàn),奇偶排序的時(shí)間復(fù)雜度為O(N^2),需要的朋友可以參考下
    2016-04-04
  • 使用javax.sound實(shí)現(xiàn)簡(jiǎn)單音頻播放

    使用javax.sound實(shí)現(xiàn)簡(jiǎn)單音頻播放

    這篇文章主要為大家詳細(xì)介紹了使用javax.sound實(shí)現(xiàn)簡(jiǎn)單音頻播放,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • Spring中如何使用Comparator接口

    Spring中如何使用Comparator接口

    Comparator比較器接口可以將自身傳遞給排序方法(比如Collections.sort或Arrays.sort),以便對(duì)排序順序進(jìn)行精確控制。本文講述Spring中如何使用Comparator接口
    2021-06-06

最新評(píng)論