欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中Reactor的反應(yīng)器模式詳解

 更新時(shí)間:2023年12月13日 09:41:21   作者:得過(guò)且過(guò)的勇者y  
這篇文章主要介紹了Java中Reactor的反應(yīng)器模式詳解,Reactor反應(yīng)器模式有點(diǎn)兒類(lèi)似事件驅(qū)動(dòng)模式,當(dāng)有事件觸發(fā)時(shí),事件源會(huì)將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理,反應(yīng)器模式中的反應(yīng)器角色類(lèi)似于事件驅(qū)動(dòng)模式中的dispatcher事件分發(fā)器角色,需要的朋友可以參考下

前言

在Java的OIO編程中,最初和最原始的網(wǎng)絡(luò)服務(wù)器程序使用一個(gè)while循環(huán),不斷地監(jiān)聽(tīng)端口是否有新的連接,如果有就調(diào)用一個(gè)處理函數(shù)來(lái)處理。這種方法最大的問(wèn)題就是如果前一個(gè)網(wǎng)絡(luò)連接的處理沒(méi)有結(jié)束,那么后面的連接請(qǐng)求沒(méi)法被接收,于是后面的請(qǐng)求統(tǒng)統(tǒng)會(huì)被阻塞住,服務(wù)器的吞吐量就太低了。

為了解決這個(gè)嚴(yán)重的連接阻塞問(wèn)題,出現(xiàn)了一個(gè)即為經(jīng)典模式:Connection Per Thread。即對(duì)于每一個(gè)新的網(wǎng)絡(luò)連接都分配一個(gè)線程,每個(gè)線程都獨(dú)自處理自己負(fù)責(zé)的輸入和輸出,任何socket連接的輸入和輸出處理不會(huì)阻塞到后面新socket連接的監(jiān)聽(tīng)和建立。早期版本的Tomcat服務(wù)器就是這樣實(shí)現(xiàn)的。

這種模式的優(yōu)點(diǎn)是解決了前面的新連接被嚴(yán)重阻塞的問(wèn)題,在一定程度上極大地提高了服務(wù)器的吞吐量。但是對(duì)于大量的連接,需要消耗大量的現(xiàn)成資源,如果線程數(shù)太多,系統(tǒng)無(wú)法承受。而且線程的反復(fù)創(chuàng)建、銷(xiāo)毀、線程的切換也需要代價(jià)。因此高并發(fā)應(yīng)用場(chǎng)景下多線程O(píng)IO的缺陷是致命的,因此引入了Reactor反應(yīng)器模式。

反應(yīng)器模式由Reactor反應(yīng)器線程、Handlers處理器兩大角色組成:

  • Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件,并且分發(fā)到Handlers處理器
  • Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯

一、單線程Reactor反應(yīng)器模式

Reactor反應(yīng)器模式有點(diǎn)兒類(lèi)似事件驅(qū)動(dòng)模式,當(dāng)有事件觸發(fā)時(shí),事件源會(huì)將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理。反應(yīng)器模式中的反應(yīng)器角色類(lèi)似于事件驅(qū)動(dòng)模式中的dispatcher事件分發(fā)器角色。

  • Reactor反應(yīng)器:負(fù)責(zé)查詢(xún)IO事件,當(dāng)檢測(cè)到一個(gè)IO時(shí)間,將其發(fā)送給對(duì)應(yīng)的Handler處理器處理,這里的IO事件就是NIO選擇器監(jiān)控的通道IO事件。
  • Handler處理器:與IO事件綁定,負(fù)責(zé)IO事件的處理,完成真正的連接建立、通道的讀取、處理業(yè)務(wù)邏輯、負(fù)責(zé)將結(jié)果寫(xiě)出到通道等。

基于NIO實(shí)現(xiàn)單線程版本的反應(yīng)器模式需要用到SelectionKey選擇鍵的幾個(gè)重要的成員方法:

  1. void attach(Object o):將任何的Java對(duì)象作為附件添加到SelectionKey實(shí)例,主要是將Handler處理器實(shí)例作為附件添加到SelectionKey實(shí)例
  2. Object attachment():取出之前通過(guò)attach添加到SelectionKey選擇鍵實(shí)例的附件,一般用于取出綁定的Handler處理器實(shí)例。

Reactor實(shí)現(xiàn)示例:

package cn.ken.jredis;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a  rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 14:29
 */
public class Reactor implements Runnable {
    final private Selector selector;
    final private ServerSocketChannel serverSocketChannel;
    public Reactor() {
        try {
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8088));
            // 注冊(cè)ServerSocket的accept事件
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 為事件綁定處理器
            sk.attach(new AcceptHandler());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectedKey : selectionKeys) {
                    dispatch(selectedKey);
                }
                selectionKeys.clear();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void dispatch(SelectionKey selectedKey) {
        Runnable handler = (Runnable) selectedKey.attachment();
        // 此處返回的可能是AcceptHandler也可能是IOHandler
        handler.run();
    }
    class AcceptHandler implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    new IOHandler(selector, socketChannel); // 注冊(cè)IO處理器,并將連接加入select列表
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    public static void main(String[] args) {
        new Reactor().run();
    }
}

Handler實(shí)現(xiàn)示例:

package cn.ken.jredis;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a  rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 14:53
 */
public class IOHandler implements Runnable {
    final private SocketChannel socketChannel;
    final private ByteBuffer buffer;
    public IOHandler(Selector selector, SocketChannel channel) {
        buffer = ByteBuffer.allocate(1024);
        socketChannel = channel;
        try {
            channel.configureBlocking(false);
            SelectionKey sk = channel.register(selector, 0); // 此處沒(méi)有注冊(cè)感興趣的事件
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ); // 注冊(cè)感興趣的事件,下一次調(diào)用select時(shí)才生效
            selector.wakeup(); // 立即喚醒當(dāng)前阻塞select操作,使得迅速進(jìn)入下次select,從而讓上面注冊(cè)的讀事件監(jiān)聽(tīng)可以立即生效
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        try {
            int length;
            while ((length = socketChannel.read(buffer)) > 0) {
                System.out.println(new String(buffer.array(), 0, length));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

在單線程反應(yīng)器模式中,Reactor反應(yīng)器和Handler處理器都執(zhí)行在同一條線程上(dispatch方法是直接調(diào)用run方法,沒(méi)有創(chuàng)建新的線程),因此當(dāng)其中某個(gè)Handler阻塞時(shí),會(huì)導(dǎo)致其他所有的Handler都得不到執(zhí)行。

二、多線程Reactor反應(yīng)器模式

既然Reactor反應(yīng)器和Handler處理器在一個(gè)線程會(huì)造成非常嚴(yán)重的性能缺陷,那么可以使用多線程對(duì)基礎(chǔ)的反應(yīng)器模式進(jìn)行改造。

  1. 將負(fù)責(zé)輸入輸出處理的IOHandler處理器的執(zhí)行,放入獨(dú)立的線程池中。這樣業(yè)務(wù)處理線程與負(fù)責(zé)服務(wù)監(jiān)聽(tīng)和IO時(shí)間查詢(xún)的反應(yīng)器線程相隔離,避免服務(wù)器的連接監(jiān)聽(tīng)收到阻塞。
  2. 如果服務(wù)器為多核的CPU,可以將反應(yīng)器線程拆分為多個(gè)子反應(yīng)器線程,同時(shí)引入多個(gè)選擇器,每一個(gè)SubReactor子線程負(fù)責(zé)一個(gè)選擇器。

MultiReactor:

package cn.ken.jredis;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a  rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 16:51
 */
public class MultiReactor {
    private final ServerSocketChannel server;
    private final Selector[] selectors = new Selector[2];
    private final SubReactor[] reactors = new SubReactor[2];
    private final AtomicInteger index = new AtomicInteger(0);
    public MultiReactor() {
        try {
            server = ServerSocketChannel.open();
            selectors[0] = Selector.open();
            selectors[1] = Selector.open();
            server.bind(new InetSocketAddress(8080));
            server.configureBlocking(false);
            SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);
            register.attach(new AcceptHandler());
            reactors[0] = new SubReactor(selectors[0]);
            reactors[1] = new SubReactor(selectors[1]);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    private void startService() {
        new Thread(reactors[0]).start();
        new Thread(reactors[1]).start();
    }
    class SubReactor implements Runnable {
        final private Selector selector;
        public SubReactor(Selector selector) {
            this.selector = selector;
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        dispatch(selectionKey);
                    }
                    selectionKeys.clear();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        Runnable attachment = (Runnable) selectionKey.attachment();
        if (attachment != null) {
            attachment.run();
        }
    }
    class AcceptHandler implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = server.accept();
                new MultiHandler(selectors[index.getAndIncrement()], socketChannel);
                if (index.get() == selectors.length) {
                    index.set(0);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

MultiHandler:

package cn.ken.jredis;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a  rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 17:28
 */
public class MultiHandler implements Runnable {
    final private Selector selector;
    final private SocketChannel channel;
    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    static ExecutorService pool = Executors.newFixedThreadPool(4);
    public MultiHandler(Selector selector, SocketChannel channel) {
        this.selector = selector;
        this.channel = channel;
        try {
            channel.configureBlocking(false);
            SelectionKey register = channel.register(selector, SelectionKey.OP_READ);
            register.attach(this);
            selector.wakeup();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        pool.execute(() -> {
            synchronized (this) {
                int length;
                try {
                    while ((length = channel.read(buffer)) > 0) {
                        System.out.println(new String(buffer.array(), 0, length));
                        buffer.clear();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }   
        });
    }
}

到此這篇關(guān)于Java中Reactor的反應(yīng)器模式詳解的文章就介紹到這了,更多相關(guān)Reactor反應(yīng)器模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring?Cloud?Gateway編碼實(shí)現(xiàn)任意地址跳轉(zhuǎn)的示例

    Spring?Cloud?Gateway編碼實(shí)現(xiàn)任意地址跳轉(zhuǎn)的示例

    本文主要介紹了Spring?Cloud?Gateway編碼實(shí)現(xiàn)任意地址跳轉(zhuǎn)的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Jenkins自動(dòng)構(gòu)建部署項(xiàng)目到遠(yuǎn)程服務(wù)器上的方法步驟

    Jenkins自動(dòng)構(gòu)建部署項(xiàng)目到遠(yuǎn)程服務(wù)器上的方法步驟

    這篇文章主要介紹了Jenkins自動(dòng)構(gòu)建部署項(xiàng)目到遠(yuǎn)程服務(wù)器上的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • Java是自學(xué)好還是參加培訓(xùn)班好?

    Java是自學(xué)好還是參加培訓(xùn)班好?

    這篇文章主要介紹了Java是自學(xué)好還是參加培訓(xùn)班好這一問(wèn)題,給大家介紹了哪些人適合自學(xué)java,哪些人適合java培訓(xùn)班學(xué)習(xí),大家可以看看內(nèi)容詳情
    2018-04-04
  • SpringBoot整合EasyExcel?3.x的完整示例

    SpringBoot整合EasyExcel?3.x的完整示例

    EasyExcel 是一個(gè)基于 Java 的、快速、簡(jiǎn)潔、解決大文件內(nèi)存溢出的 Excel 處理工具,它能讓你在不用考慮性能、內(nèi)存的等因素的情況下,快速完成 Excel 的讀、寫(xiě)等功能,這篇文章主要介紹了SpringBoot整合EasyExcel3.x的過(guò)程,需要的朋友可以參考下
    2023-07-07
  • 關(guān)于Maven如何構(gòu)建生命周期

    關(guān)于Maven如何構(gòu)建生命周期

    這篇文章主要介紹了關(guān)于Maven如何構(gòu)建生命周期,Maven構(gòu)建生命周期描述的是一次構(gòu)建過(guò)程經(jīng)歷經(jīng)歷了多少個(gè)事件,需要的朋友可以參考下
    2023-04-04
  • Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解

    Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解

    這篇文章主要介紹了Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • JDK安裝配置教程

    JDK安裝配置教程

    這篇文章主要為大家詳細(xì)介紹了JDK安裝配置教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-01-01
  • SpringBoot JPA使用配置過(guò)程詳解

    SpringBoot JPA使用配置過(guò)程詳解

    這篇文章主要介紹了SpringBoot JPA使用配置過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • 帶你快速了解Java中類(lèi)和對(duì)象的關(guān)系

    帶你快速了解Java中類(lèi)和對(duì)象的關(guān)系

    這篇文章主要給大家介紹了關(guān)于Java中類(lèi)和對(duì)象關(guān)系的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • SpringCloud Gateway的路由,過(guò)濾器和限流解讀

    SpringCloud Gateway的路由,過(guò)濾器和限流解讀

    這篇文章主要介紹了SpringCloud Gateway的路由,過(guò)濾器和限流解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-02-02

最新評(píng)論