Java中Reactor的反應(yīng)器模式詳解
前言
在Java的OIO編程中,最初和最原始的網(wǎng)絡(luò)服務(wù)器程序使用一個(gè)while循環(huán),不斷地監(jiān)聽(tīng)端口是否有新的連接,如果有就調(diào)用一個(gè)處理函數(shù)來(lái)處理。這種方法最大的問(wèn)題就是如果前一個(gè)網(wǎng)絡(luò)連接的處理沒(méi)有結(jié)束,那么后面的連接請(qǐng)求沒(méi)法被接收,于是后面的請(qǐng)求統(tǒng)統(tǒng)會(huì)被阻塞住,服務(wù)器的吞吐量就太低了。
為了解決這個(gè)嚴(yán)重的連接阻塞問(wèn)題,出現(xiàn)了一個(gè)即為經(jīng)典模式:Connection Per Thread。即對(duì)于每一個(gè)新的網(wǎng)絡(luò)連接都分配一個(gè)線程,每個(gè)線程都獨(dú)自處理自己負(fù)責(zé)的輸入和輸出,任何socket連接的輸入和輸出處理不會(huì)阻塞到后面新socket連接的監(jiān)聽(tīng)和建立。早期版本的Tomcat服務(wù)器就是這樣實(shí)現(xiàn)的。
這種模式的優(yōu)點(diǎn)是解決了前面的新連接被嚴(yán)重阻塞的問(wèn)題,在一定程度上極大地提高了服務(wù)器的吞吐量。但是對(duì)于大量的連接,需要消耗大量的現(xiàn)成資源,如果線程數(shù)太多,系統(tǒng)無(wú)法承受。而且線程的反復(fù)創(chuàng)建、銷(xiāo)毀、線程的切換也需要代價(jià)。因此高并發(fā)應(yīng)用場(chǎng)景下多線程O(píng)IO的缺陷是致命的,因此引入了Reactor反應(yīng)器模式。
反應(yīng)器模式由Reactor反應(yīng)器線程、Handlers處理器兩大角色組成:
- Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件,并且分發(fā)到Handlers處理器
- Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯
一、單線程Reactor反應(yīng)器模式
Reactor反應(yīng)器模式有點(diǎn)兒類(lèi)似事件驅(qū)動(dòng)模式,當(dāng)有事件觸發(fā)時(shí),事件源會(huì)將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理。反應(yīng)器模式中的反應(yīng)器角色類(lèi)似于事件驅(qū)動(dòng)模式中的dispatcher事件分發(fā)器角色。
- Reactor反應(yīng)器:負(fù)責(zé)查詢(xún)IO事件,當(dāng)檢測(cè)到一個(gè)IO時(shí)間,將其發(fā)送給對(duì)應(yīng)的Handler處理器處理,這里的IO事件就是NIO選擇器監(jiān)控的通道IO事件。
- Handler處理器:與IO事件綁定,負(fù)責(zé)IO事件的處理,完成真正的連接建立、通道的讀取、處理業(yè)務(wù)邏輯、負(fù)責(zé)將結(jié)果寫(xiě)出到通道等。
基于NIO實(shí)現(xiàn)單線程版本的反應(yīng)器模式需要用到SelectionKey選擇鍵的幾個(gè)重要的成員方法:
- void attach(Object o):將任何的Java對(duì)象作為附件添加到SelectionKey實(shí)例,主要是將Handler處理器實(shí)例作為附件添加到SelectionKey實(shí)例
- Object attachment():取出之前通過(guò)attach添加到SelectionKey選擇鍵實(shí)例的附件,一般用于取出綁定的Handler處理器實(shí)例。
Reactor實(shí)現(xiàn)示例:
package cn.ken.jredis; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; /** * <pre> * * </pre> * * @author <a rel="external nofollow" rel="external nofollow" rel="external nofollow" rel="external nofollow" >Ken-Chy129</a> * @since 2023/10/14 14:29 */ public class Reactor implements Runnable { final private Selector selector; final private ServerSocketChannel serverSocketChannel; public Reactor() { try { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8088)); // 注冊(cè)ServerSocket的accept事件 SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 為事件綁定處理器 sk.attach(new AcceptHandler()); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectedKey : selectionKeys) { dispatch(selectedKey); } selectionKeys.clear(); } } catch (Exception e) { throw new RuntimeException(e); } } private void dispatch(SelectionKey selectedKey) { Runnable handler = (Runnable) selectedKey.attachment(); // 此處返回的可能是AcceptHandler也可能是IOHandler handler.run(); } class AcceptHandler implements Runnable { @Override public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { new IOHandler(selector, socketChannel); // 注冊(cè)IO處理器,并將連接加入select列表 } } catch (IOException e) { throw new RuntimeException(e); } } } public static void main(String[] args) { new Reactor().run(); } }
Handler實(shí)現(xiàn)示例:
package cn.ken.jredis; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * <pre> * * </pre> * * @author <a rel="external nofollow" rel="external nofollow" rel="external nofollow" rel="external nofollow" >Ken-Chy129</a> * @since 2023/10/14 14:53 */ public class IOHandler implements Runnable { final private SocketChannel socketChannel; final private ByteBuffer buffer; public IOHandler(Selector selector, SocketChannel channel) { buffer = ByteBuffer.allocate(1024); socketChannel = channel; try { channel.configureBlocking(false); SelectionKey sk = channel.register(selector, 0); // 此處沒(méi)有注冊(cè)感興趣的事件 sk.attach(this); sk.interestOps(SelectionKey.OP_READ); // 注冊(cè)感興趣的事件,下一次調(diào)用select時(shí)才生效 selector.wakeup(); // 立即喚醒當(dāng)前阻塞select操作,使得迅速進(jìn)入下次select,從而讓上面注冊(cè)的讀事件監(jiān)聽(tīng)可以立即生效 } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { try { int length; while ((length = socketChannel.read(buffer)) > 0) { System.out.println(new String(buffer.array(), 0, length)); } } catch (IOException e) { throw new RuntimeException(e); } } }
在單線程反應(yīng)器模式中,Reactor反應(yīng)器和Handler處理器都執(zhí)行在同一條線程上(dispatch方法是直接調(diào)用run方法,沒(méi)有創(chuàng)建新的線程),因此當(dāng)其中某個(gè)Handler阻塞時(shí),會(huì)導(dǎo)致其他所有的Handler都得不到執(zhí)行。
二、多線程Reactor反應(yīng)器模式
既然Reactor反應(yīng)器和Handler處理器在一個(gè)線程會(huì)造成非常嚴(yán)重的性能缺陷,那么可以使用多線程對(duì)基礎(chǔ)的反應(yīng)器模式進(jìn)行改造。
- 將負(fù)責(zé)輸入輸出處理的IOHandler處理器的執(zhí)行,放入獨(dú)立的線程池中。這樣業(yè)務(wù)處理線程與負(fù)責(zé)服務(wù)監(jiān)聽(tīng)和IO時(shí)間查詢(xún)的反應(yīng)器線程相隔離,避免服務(wù)器的連接監(jiān)聽(tīng)收到阻塞。
- 如果服務(wù)器為多核的CPU,可以將反應(yīng)器線程拆分為多個(gè)子反應(yīng)器線程,同時(shí)引入多個(gè)選擇器,每一個(gè)SubReactor子線程負(fù)責(zé)一個(gè)選擇器。
MultiReactor:
package cn.ken.jredis; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** * <pre> * * </pre> * * @author <a rel="external nofollow" rel="external nofollow" rel="external nofollow" rel="external nofollow" >Ken-Chy129</a> * @since 2023/10/14 16:51 */ public class MultiReactor { private final ServerSocketChannel server; private final Selector[] selectors = new Selector[2]; private final SubReactor[] reactors = new SubReactor[2]; private final AtomicInteger index = new AtomicInteger(0); public MultiReactor() { try { server = ServerSocketChannel.open(); selectors[0] = Selector.open(); selectors[1] = Selector.open(); server.bind(new InetSocketAddress(8080)); server.configureBlocking(false); SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT); register.attach(new AcceptHandler()); reactors[0] = new SubReactor(selectors[0]); reactors[1] = new SubReactor(selectors[1]); } catch (IOException e) { throw new RuntimeException(e); } } private void startService() { new Thread(reactors[0]).start(); new Thread(reactors[1]).start(); } class SubReactor implements Runnable { final private Selector selector; public SubReactor(Selector selector) { this.selector = selector; } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { throw new RuntimeException(e); } } } } private void dispatch(SelectionKey selectionKey) { Runnable attachment = (Runnable) selectionKey.attachment(); if (attachment != null) { attachment.run(); } } class AcceptHandler implements Runnable { @Override public void run() { try { SocketChannel socketChannel = server.accept(); new MultiHandler(selectors[index.getAndIncrement()], socketChannel); if (index.get() == selectors.length) { index.set(0); } } catch (IOException e) { throw new RuntimeException(e); } } } }
MultiHandler:
package cn.ken.jredis; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * <pre> * * </pre> * * @author <a rel="external nofollow" rel="external nofollow" rel="external nofollow" rel="external nofollow" >Ken-Chy129</a> * @since 2023/10/14 17:28 */ public class MultiHandler implements Runnable { final private Selector selector; final private SocketChannel channel; final ByteBuffer buffer = ByteBuffer.allocate(1024); static ExecutorService pool = Executors.newFixedThreadPool(4); public MultiHandler(Selector selector, SocketChannel channel) { this.selector = selector; this.channel = channel; try { channel.configureBlocking(false); SelectionKey register = channel.register(selector, SelectionKey.OP_READ); register.attach(this); selector.wakeup(); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { pool.execute(() -> { synchronized (this) { int length; try { while ((length = channel.read(buffer)) > 0) { System.out.println(new String(buffer.array(), 0, length)); buffer.clear(); } } catch (IOException e) { throw new RuntimeException(e); } } }); } }
到此這篇關(guān)于Java中Reactor的反應(yīng)器模式詳解的文章就介紹到這了,更多相關(guān)Reactor反應(yīng)器模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Cloud?Gateway編碼實(shí)現(xiàn)任意地址跳轉(zhuǎn)的示例
本文主要介紹了Spring?Cloud?Gateway編碼實(shí)現(xiàn)任意地址跳轉(zhuǎn)的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12Jenkins自動(dòng)構(gòu)建部署項(xiàng)目到遠(yuǎn)程服務(wù)器上的方法步驟
這篇文章主要介紹了Jenkins自動(dòng)構(gòu)建部署項(xiàng)目到遠(yuǎn)程服務(wù)器上的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01SpringBoot整合EasyExcel?3.x的完整示例
EasyExcel 是一個(gè)基于 Java 的、快速、簡(jiǎn)潔、解決大文件內(nèi)存溢出的 Excel 處理工具,它能讓你在不用考慮性能、內(nèi)存的等因素的情況下,快速完成 Excel 的讀、寫(xiě)等功能,這篇文章主要介紹了SpringBoot整合EasyExcel3.x的過(guò)程,需要的朋友可以參考下2023-07-07Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解
這篇文章主要介紹了Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12帶你快速了解Java中類(lèi)和對(duì)象的關(guān)系
這篇文章主要給大家介紹了關(guān)于Java中類(lèi)和對(duì)象關(guān)系的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04SpringCloud Gateway的路由,過(guò)濾器和限流解讀
這篇文章主要介紹了SpringCloud Gateway的路由,過(guò)濾器和限流解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02