Java中多線程Reactor模式的實(shí)現(xiàn)
多線程Reactor模式旨在分配多個(gè)reactor每一個(gè)reactor獨(dú)立擁有一個(gè)selector,在網(wǎng)絡(luò)通信中大體設(shè)計(jì)為負(fù)責(zé)連接的主Reactor,其中在主Reactor的run函數(shù)中若selector檢測(cè)到了連接事件的發(fā)生則dispatch該事件。
讓負(fù)責(zé)管理連接的Handler處理連接,其中在這個(gè)負(fù)責(zé)連接的Handler處理器中創(chuàng)建子Handler用以處理IO請(qǐng)求。這樣一來連接請(qǐng)求與IO請(qǐng)求分開執(zhí)行提高通道的并發(fā)量。同時(shí)多個(gè)Reactor帶來的好處是多個(gè)selector可以提高通道的檢索速度
1、 主服務(wù)器
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; class MultiThreadEchoServerReactor { ServerSocketChannel serverSocket; AtomicInteger next = new AtomicInteger(0); Selector bossSelector = null; Reactor bossReactor = null; //selectors集合,引入多個(gè)selector選擇器 //多個(gè)選擇器可以更好的提高通道的并發(fā)量 Selector[] workSelectors = new Selector[2]; //引入多個(gè)子反應(yīng)器 //如果CPU是多核的可以開啟多個(gè)子Reactor反應(yīng)器,這樣每一個(gè)子Reactor反應(yīng)器還可以獨(dú)立分配一個(gè)線程。 //每一個(gè)線程可以單獨(dú)綁定一個(gè)單獨(dú)的Selector選擇器以提高通道并發(fā)量 Reactor[] workReactors = null; MultiThreadEchoServerReactor() throws IOException { bossSelector = Selector.open(); //初始化多個(gè)selector選擇器 workSelectors[0] = Selector.open(); workSelectors[1] = Selector.open(); serverSocket = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); serverSocket.socket().bind(address); //非阻塞 serverSocket.configureBlocking(false); //第一個(gè)selector,負(fù)責(zé)監(jiān)控新連接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新連接處理handler處理器到SelectionKey(選擇鍵) sk.attach(new AcceptorHandler()); //處理新連接的反應(yīng)器 bossReactor = new Reactor(bossSelector); //第一個(gè)子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器 Reactor subReactor1 = new Reactor(workSelectors[0]); //第二個(gè)子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器 Reactor subReactor2 = new Reactor(workSelectors[1]); workReactors = new Reactor[]{subReactor1, subReactor2}; } private void startService() { new Thread(bossReactor).start(); // 一子反應(yīng)器對(duì)應(yīng)一條線程 new Thread(workReactors[0]).start(); new Thread(workReactors[1]).start(); } //反應(yīng)器 class Reactor implements Runnable { //每條線程負(fù)責(zé)一個(gè)選擇器的查詢 final Selector selector; public Reactor(Selector selector) { this.selector = selector; } public void run() { try { while (!Thread.interrupted()) { //單位為毫秒 //每隔一秒列出選擇器感應(yīng)列表 selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (null == selectedKeys || selectedKeys.size() == 0) { //如果列表中的通道注冊(cè)事件沒有發(fā)生那就繼續(xù)執(zhí)行 continue; } Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { //Reactor負(fù)責(zé)dispatch收到的事件 SelectionKey sk = it.next(); dispatch(sk); } //清楚掉已經(jīng)處理過的感應(yīng)事件,防止重復(fù)處理 selectedKeys.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } void dispatch(SelectionKey sk) { Runnable handler = (Runnable) sk.attachment(); //調(diào)用之前attach綁定到選擇鍵的handler處理器對(duì)象 if (handler != null) { handler.run(); } } } // Handler:新連接處理器 class AcceptorHandler implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); Logger.info("接收到一個(gè)新的連接"); if (channel != null) { int index = next.get(); Logger.info("選擇器的編號(hào):" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } if (next.incrementAndGet() == workSelectors.length) { next.set(0); } } } public static void main(String[] args) throws IOException { MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor(); server.startService(); } }
按上述的設(shè)計(jì)思想,在主服務(wù)器中實(shí)際上設(shè)計(jì)了三個(gè)Reactor,一個(gè)主Reactor專門負(fù)責(zé)連接請(qǐng)求并配已單獨(dú)的selector,但是三個(gè)Reactor的線程Run函數(shù)是做的相同的功能,都是根據(jù)每個(gè)線程內(nèi)部的selector進(jìn)行檢索事件列表,若注冊(cè)的監(jiān)聽事件發(fā)生了則調(diào)用dispactch分發(fā)到每個(gè)Reactor對(duì)應(yīng)的Handler。
這里需要注意的一開始其實(shí)只有負(fù)責(zé)連接事件的主Reactor在注冊(cè)selector的時(shí)候給相應(yīng)的key配了一個(gè)AcceptorHandler()。
//第一個(gè)selector,負(fù)責(zé)監(jiān)控新連接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新連接處理handler處理器到SelectionKey(選擇鍵) sk.attach(new AcceptorHandler());
但是Reactor的run方法里若相應(yīng)的selector key發(fā)生了便要dispatch到一個(gè)Handler。這里其他兩個(gè)子Reactor的Handler在哪里賦值的呢?其實(shí)在處理連接請(qǐng)求的Reactor中便創(chuàng)建了各個(gè)子Handler,如下代碼所示:
主Handler中先是根據(jù)服務(wù)器channel創(chuàng)建出客服端channel,在進(jìn)行子selector與channel的綁定。
int index = next.get(); Logger.info("選擇器的編號(hào):" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel);
2、IO請(qǐng)求handler+線程池
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MultiThreadEchoHandler implements Runnable { final SocketChannel channel; final SelectionKey sk; final ByteBuffer byteBuffer = ByteBuffer.allocate(1024); static final int RECIEVING = 0, SENDING = 1; int state = RECIEVING; //引入線程池 static ExecutorService pool = Executors.newFixedThreadPool(4); MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException { channel = c; channel.configureBlocking(false); //喚醒選擇,防止register時(shí) boss線程被阻塞,netty 處理方式比較優(yōu)雅,會(huì)在同一個(gè)線程注冊(cè)事件,避免阻塞boss selector.wakeup(); //僅僅取得選擇鍵,后設(shè)置感興趣的IO事件 sk = channel.register(selector, 0); //將本Handler作為sk選擇鍵的附件,方便事件dispatch sk.attach(this); //向sk選擇鍵注冊(cè)Read就緒事件 sk.interestOps(SelectionKey.OP_READ); //喚醒選擇,是的OP_READ生效 selector.wakeup(); Logger.info("新的連接 注冊(cè)完成"); } public void run() { //異步任務(wù),在獨(dú)立的線程池中執(zhí)行 pool.execute(new AsyncTask()); } //異步任務(wù),不在Reactor線程中執(zhí)行 public synchronized void asyncRun() { try { if (state == SENDING) { //寫入通道 channel.write(byteBuffer); //寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫模式 byteBuffer.clear(); //寫完后,注冊(cè)read就緒事件 sk.interestOps(SelectionKey.OP_READ); //寫完后,進(jìn)入接收的狀態(tài) state = RECIEVING; } else if (state == RECIEVING) { //從通道讀 int length = 0; while ((length = channel.read(byteBuffer)) > 0) { Logger.info(new String(byteBuffer.array(), 0, length)); } //讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀模式 byteBuffer.flip(); //讀完后,注冊(cè)write就緒事件 sk.interestOps(SelectionKey.OP_WRITE); //讀完后,進(jìn)入發(fā)送的狀態(tài) state = SENDING; } //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用 //sk.cancel(); } catch (IOException ex) { ex.printStackTrace(); } } //異步任務(wù)的內(nèi)部類 class AsyncTask implements Runnable { public void run() { MultiThreadEchoHandler.this.asyncRun(); } } }
3、客戶端
在處理IO請(qǐng)求的Handler中采用了線程池,已達(dá)到異步處理的目的。
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Dateutil; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * create by 尼恩 @ 瘋狂創(chuàng)客圈 **/ public class EchoClient { public void start() throws IOException { InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); // 1、獲取通道(channel) SocketChannel socketChannel = SocketChannel.open(address); Logger.info("客戶端連接成功"); // 2、切換成非阻塞模式 socketChannel.configureBlocking(false); //不斷的自旋、等待連接完成,或者做一些其他的事情 while (!socketChannel.finishConnect()) { } Logger.tcfo("客戶端啟動(dòng)成功!"); //啟動(dòng)接受線程 Processer processer = new Processer(socketChannel); new Thread(processer).start(); } static class Processer implements Runnable { final Selector selector; final SocketChannel channel; Processer(SocketChannel channel) throws IOException { //Reactor初始化 selector = Selector.open(); this.channel = channel; channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); if (sk.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); Scanner scanner = new Scanner(System.in); Logger.tcfo("請(qǐng)輸入發(fā)送內(nèi)容:"); if (scanner.hasNext()) { SocketChannel socketChannel = (SocketChannel) sk.channel(); String next = scanner.next(); buffer.put((Dateutil.getNow() + " >>" + next).getBytes()); buffer.flip(); // 操作三:發(fā)送數(shù)據(jù) socketChannel.write(buffer); buffer.clear(); } } if (sk.isReadable()) { // 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù) SocketChannel socketChannel = (SocketChannel) sk.channel(); //讀取數(shù)據(jù) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0; while ((length = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); Logger.info("server echo:" + new String(byteBuffer.array(), 0, length)); byteBuffer.clear(); } } //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用 //selectionKey.cancel(); } selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new EchoClient().start(); } }
到此這篇關(guān)于Java中多線程Reactor模式的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java 多線程Reactor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容
這篇文章主要介紹了java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容的相關(guān)資料,需要的朋友可以參考下2017-04-04Spring線程池ThreadPoolTaskExecutor配置詳情
本篇文章主要介紹了Spring線程池ThreadPoolTaskExecutor配置詳情,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03spring MVC中傳遞對(duì)象參數(shù)示例詳解
這篇文章主要給大家介紹了在spring MVC中傳遞對(duì)象參數(shù)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看吧。2017-06-06Feign Client 超時(shí)時(shí)間配置不生效的解決
這篇文章主要介紹了Feign Client 超時(shí)時(shí)間配置不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09簡(jiǎn)單講解奇偶排序算法及在Java數(shù)組中的實(shí)現(xiàn)
這篇文章主要介紹了奇偶排序算法及Java數(shù)組的實(shí)現(xiàn),奇偶排序的時(shí)間復(fù)雜度為O(N^2),需要的朋友可以參考下2016-04-04使用javax.sound實(shí)現(xiàn)簡(jiǎn)單音頻播放
這篇文章主要為大家詳細(xì)介紹了使用javax.sound實(shí)現(xiàn)簡(jiǎn)單音頻播放,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03