Java中多線程Reactor模式的實現(xiàn)
多線程Reactor模式旨在分配多個reactor每一個reactor獨立擁有一個selector,在網(wǎng)絡(luò)通信中大體設(shè)計為負責(zé)連接的主Reactor,其中在主Reactor的run函數(shù)中若selector檢測到了連接事件的發(fā)生則dispatch該事件。
讓負責(zé)管理連接的Handler處理連接,其中在這個負責(zé)連接的Handler處理器中創(chuàng)建子Handler用以處理IO請求。這樣一來連接請求與IO請求分開執(zhí)行提高通道的并發(fā)量。同時多個Reactor帶來的好處是多個selector可以提高通道的檢索速度
1、 主服務(wù)器
package com.crazymakercircle.ReactorModel;
import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
Selector bossSelector = null;
Reactor bossReactor = null;
//selectors集合,引入多個selector選擇器
//多個選擇器可以更好的提高通道的并發(fā)量
Selector[] workSelectors = new Selector[2];
//引入多個子反應(yīng)器
//如果CPU是多核的可以開啟多個子Reactor反應(yīng)器,這樣每一個子Reactor反應(yīng)器還可以獨立分配一個線程。
//每一個線程可以單獨綁定一個單獨的Selector選擇器以提高通道并發(fā)量
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException {
bossSelector = Selector.open();
//初始化多個selector選擇器
workSelectors[0] = Selector.open();
workSelectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//第一個selector,負責(zé)監(jiān)控新連接事件
SelectionKey sk =
serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//附加新連接處理handler處理器到SelectionKey(選擇鍵)
sk.attach(new AcceptorHandler());
//處理新連接的反應(yīng)器
bossReactor = new Reactor(bossSelector);
//第一個子反應(yīng)器,一子反應(yīng)器負責(zé)一個選擇器
Reactor subReactor1 = new Reactor(workSelectors[0]);
//第二個子反應(yīng)器,一子反應(yīng)器負責(zé)一個選擇器
Reactor subReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{subReactor1, subReactor2};
}
private void startService() {
new Thread(bossReactor).start();
// 一子反應(yīng)器對應(yīng)一條線程
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反應(yīng)器
class Reactor implements Runnable {
//每條線程負責(zé)一個選擇器的查詢
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
//單位為毫秒
//每隔一秒列出選擇器感應(yīng)列表
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
//如果列表中的通道注冊事件沒有發(fā)生那就繼續(xù)執(zhí)行
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor負責(zé)dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
//清楚掉已經(jīng)處理過的感應(yīng)事件,防止重復(fù)處理
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//調(diào)用之前attach綁定到選擇鍵的handler處理器對象
if (handler != null) {
handler.run();
}
}
}
// Handler:新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一個新的連接");
if (channel != null) {
int index = next.get();
Logger.info("選擇器的編號:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
按上述的設(shè)計思想,在主服務(wù)器中實際上設(shè)計了三個Reactor,一個主Reactor專門負責(zé)連接請求并配已單獨的selector,但是三個Reactor的線程Run函數(shù)是做的相同的功能,都是根據(jù)每個線程內(nèi)部的selector進行檢索事件列表,若注冊的監(jiān)聽事件發(fā)生了則調(diào)用dispactch分發(fā)到每個Reactor對應(yīng)的Handler。
這里需要注意的一開始其實只有負責(zé)連接事件的主Reactor在注冊selector的時候給相應(yīng)的key配了一個AcceptorHandler()。
//第一個selector,負責(zé)監(jiān)控新連接事件
SelectionKey sk =
serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//附加新連接處理handler處理器到SelectionKey(選擇鍵)
sk.attach(new AcceptorHandler());
但是Reactor的run方法里若相應(yīng)的selector key發(fā)生了便要dispatch到一個Handler。這里其他兩個子Reactor的Handler在哪里賦值的呢?其實在處理連接請求的Reactor中便創(chuàng)建了各個子Handler,如下代碼所示:
主Handler中先是根據(jù)服務(wù)器channel創(chuàng)建出客服端channel,在進行子selector與channel的綁定。
int index = next.get();
Logger.info("選擇器的編號:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector, channel);
2、IO請求handler+線程池
package com.crazymakercircle.ReactorModel;
import com.crazymakercircle.util.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入線程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
//喚醒選擇,防止register時 boss線程被阻塞,netty 處理方式比較優(yōu)雅,會在同一個線程注冊事件,避免阻塞boss
selector.wakeup();
//僅僅取得選擇鍵,后設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將本Handler作為sk選擇鍵的附件,方便事件dispatch
sk.attach(this);
//向sk選擇鍵注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//喚醒選擇,是的OP_READ生效
selector.wakeup();
Logger.info("新的連接 注冊完成");
}
public void run() {
//異步任務(wù),在獨立的線程池中執(zhí)行
pool.execute(new AsyncTask());
}
//異步任務(wù),不在Reactor線程中執(zhí)行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//寫入通道
channel.write(byteBuffer);
//寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫模式
byteBuffer.clear();
//寫完后,注冊read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//寫完后,進入接收的狀態(tài)
state = RECIEVING;
} else if (state == RECIEVING) {
//從通道讀
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀模式
byteBuffer.flip();
//讀完后,注冊write就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
//讀完后,進入發(fā)送的狀態(tài)
state = SENDING;
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//異步任務(wù)的內(nèi)部類
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
3、客戶端
在處理IO請求的Handler中采用了線程池,已達到異步處理的目的。
package com.crazymakercircle.ReactorModel;
import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Dateutil;
import com.crazymakercircle.util.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* create by 尼恩 @ 瘋狂創(chuàng)客圈
**/
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1、獲取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
Logger.info("客戶端連接成功");
// 2、切換成非阻塞模式
socketChannel.configureBlocking(false);
//不斷的自旋、等待連接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Logger.tcfo("客戶端啟動成功!");
//啟動接受線程
Processer processer = new Processer(socketChannel);
new Thread(processer).start();
}
static class Processer implements Runnable {
final Selector selector;
final SocketChannel channel;
Processer(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Logger.tcfo("請輸入發(fā)送內(nèi)容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
buffer.flip();
// 操作三:發(fā)送數(shù)據(jù)
socketChannel.write(buffer);
buffer.clear();
}
}
if (sk.isReadable()) {
// 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù)
SocketChannel socketChannel = (SocketChannel) sk.channel();
//讀取數(shù)據(jù)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
到此這篇關(guān)于Java中多線程Reactor模式的實現(xiàn)的文章就介紹到這了,更多相關(guān)Java 多線程Reactor內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容
這篇文章主要介紹了java 中如何獲取字節(jié)碼文件的相關(guān)內(nèi)容的相關(guān)資料,需要的朋友可以參考下2017-04-04
Spring線程池ThreadPoolTaskExecutor配置詳情
本篇文章主要介紹了Spring線程池ThreadPoolTaskExecutor配置詳情,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03
簡單講解奇偶排序算法及在Java數(shù)組中的實現(xiàn)
這篇文章主要介紹了奇偶排序算法及Java數(shù)組的實現(xiàn),奇偶排序的時間復(fù)雜度為O(N^2),需要的朋友可以參考下2016-04-04

