Java中的異步非阻塞AIO模型詳解
1、AIO:異步非阻塞簡(jiǎn)介
AIO需要操作系統(tǒng)的支持,在linux內(nèi)核2.6版本中加入了對(duì)真正異步IO的支持,java從jdk1.7開始支持AIO
核心類有AsynchronousSocketChannel 、AsynchronousServerSocketChannel、AsynchronousChannelGroup
AsynchronousChannelGroup是異步Channel的分組管理器,它可以實(shí)現(xiàn)資源共享。
創(chuàng)建AsynchronousChannelGroup時(shí),需要傳入一個(gè)ExecutorService,也就是綁定一個(gè)線程池,該線程池負(fù)責(zé)兩個(gè)任務(wù):處理IO事件和觸發(fā)CompletionHandler回調(diào)接口。
2、AsynchronousServerSocketChannel:AIO中網(wǎng)絡(luò)通信服務(wù)端Socket
accept() 方法: AsynchronousServerSocketChannel創(chuàng)建成功后,類似于ServerSocket,也是調(diào)用 accept() 方法來接受來自客戶端的連接, 由于異步IO實(shí)際的IO操作是交給操作系統(tǒng)來做的,用戶進(jìn)程只負(fù)責(zé)通知操作系統(tǒng)進(jìn)行IO和接受操作系統(tǒng)IO完成的通知。 所以異步的ServerChannel調(diào)用 accept() 方法后,當(dāng)前線程不會(huì)阻塞, 程序也不知道accept()方法什么時(shí)候能夠接收到客戶端請(qǐng)求并且操作系統(tǒng)完成網(wǎng)絡(luò)IO, 為解決這個(gè)問題,AIO為accept()方法提供兩個(gè)版本:
Future<AsynchronousSocketChannel> accept() :
開始接收客戶端請(qǐng)求,如果當(dāng)前線程需要進(jìn)行網(wǎng)絡(luò)IO(即獲得AsynchronousSocketChannel),則應(yīng)該調(diào)用該方法返回的Future對(duì)象的get()方法,但是get()方法會(huì)阻塞該線程,所以這種方式是阻塞式的異步IO。
< A > void accept (Aattachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler):
開始接受來自客戶端請(qǐng)求,連接成功或失敗都會(huì)觸發(fā)CompletionHandler對(duì)象的相應(yīng)方法。
其中AsynchronousSocketChannel就代表該CompletionHandler處理器在處理連接成功時(shí)的result是AsynchronousSocketChannel的實(shí)例。
而CompletionHandler接口中定義了兩個(gè)方法,
completed(V result , A attachment):當(dāng)IO完成時(shí)觸發(fā)該方法,該方法的第一個(gè)參數(shù)代表IO操作返回的對(duì)象,
第二個(gè)參數(shù)代表發(fā)起IO操作時(shí)傳入的附加參數(shù)。
faild(Throwable exc, A attachment):當(dāng)IO失敗時(shí)觸發(fā)該方法,第一個(gè)參數(shù)代表IO操作失敗引發(fā)的異常或錯(cuò)誤。
3、AIO編程
服務(wù)端
public class AioServer {
private static int DEFAULT_PORT = 12345;
private static ServerHandler serverHandle;
public volatile static long clientCount = 0;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null)
return;
serverHandle = new ServerHandler(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args) {
AioServer.start();
}
}public class ServerHandler implements Runnable{
private AsynchronousServerSocketChannel channel;
public ServerHandler(int port) {
try {
//創(chuàng)建服務(wù)端通道
channel = AsynchronousServerSocketChannel.open();
//綁定端口
channel.bind(new InetSocketAddress(port));
System.out.println("服務(wù)端已啟動(dòng),端口號(hào):"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
channel.accept(this, new AcceptHandler());
// Future <AsynchronousSocketChannel> accept = channel.accept();
//該步操作是異步操作 防止當(dāng)前線程直接執(zhí)行結(jié)束
//方案1: while(true)+sleep
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// //方案2 CountDownLatch 作用:在完成一組正在執(zhí)行的操作之前,允許當(dāng)前的現(xiàn)場(chǎng)一直阻塞 此處,讓現(xiàn)場(chǎng)在此阻塞,防止服務(wù)端執(zhí)行完成后退出
//
// CountDownLatch count = new CountDownLatch(1);
// channel.accept(this, new AcceptHandler());
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
// CompletionHandler<V,A>
// V-IO操作的結(jié)果,這里是成功建立的連接,AsynchronousSocketChannel
// A-IO操作附件,這里傳入AsynchronousServerSocketChannel便于繼續(xù)接收請(qǐng)求建立新連接
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
//創(chuàng)建新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//異步讀 第三個(gè)參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
// channel.read(buffer, buffer, new ReadHandler(channel));
//繼續(xù)接受其他客戶端請(qǐng)求
serverHandler.channel.accept(null, this);
}
@Override
public void failed(Throwable exc, ServerHandler serverHandler) {
exc.printStackTrace();
}
}
class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> {
//用戶讀取或者發(fā)送消息的channel
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(ByteBuffer result, ByteBuffer attachment) {
result.flip();
byte[] msg = new byte[result.remaining()];
result.get(msg);
try {
String expression = new String(msg, "UTF-8");
System.out.println("服務(wù)器收到消息: " + expression);
// String result1 = "服務(wù)端收到消息\n";
result.clear();
//向客戶端發(fā)送消息
doWrite(expression);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//發(fā)送消息
private void doWrite(String msg) {
byte[] bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(bytes);
buffer.flip();
//異步寫數(shù)據(jù)
channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//如果沒有發(fā)送完,繼續(xù)發(fā)送
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
} else {
//創(chuàng)建新的Buffer
ByteBuffer allocate = ByteBuffer.allocate(1024);
//異步讀 第三個(gè)參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
// channel.read(allocate, attachment, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}客戶端
public class AioClient {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandler clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
return;
clientHandle = new ClientHandler(ip,port);
new Thread(clientHandle,"Client").start();
}
//向服務(wù)器發(fā)送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("exit")) return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) throws Exception{
AioClient.start();
System.out.println("請(qǐng)輸入請(qǐng)求消息:");
Scanner scanner = new Scanner(System.in);
while(AioClient.sendMsg(scanner.nextLine()));
}
}public class ClientHandler implements Runnable{
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public ClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//創(chuàng)建異步的客戶端通道
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//創(chuàng)建CountDownLatch等待
// latch = new CountDownLatch(1);
//發(fā)起異步連接操作,回調(diào)參數(shù)就是這個(gè)類本身,如果連接成功會(huì)回調(diào)completed方法
clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler());
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// try {
// latch.await();
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// try {
// clientChannel.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
}
//向服務(wù)器發(fā)送消息
public void sendMsg(String msg){
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
System.out.println(">>>>>>msg:"+msg);
writeBuffer.put(req);
writeBuffer.flip();
//異步寫
clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel));
}
/**
* 接收類
*/
class AcceptHandler implements CompletionHandler<Void, ClientHandler> {
public AcceptHandler() {}
@Override
public void completed(Void result, ClientHandler attachment) {
System.out.println("連接服務(wù)器成功");
}
@Override
public void failed(Throwable exc, ClientHandler attachment) {
exc.printStackTrace();
try {
attachment.clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public WriteHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//完成全部數(shù)據(jù)的寫入
if (attachment.hasRemaining()) {
//數(shù)據(jù)沒有寫完,繼續(xù)寫
System.out.println("WriteHandler.hasRemaining>>>>>");
clientChannel.write(attachment, attachment, this);
} else {
//讀取數(shù)據(jù)
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
public ReadHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("客戶端收到結(jié)果:"+ body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
System.err.println("數(shù)據(jù)讀取失敗...");
try {
clientChannel.close();
} catch (IOException e) {
}
}
}
}到此這篇關(guān)于Java中的異步非阻塞AIO模型詳解的文章就介紹到這了,更多相關(guān)Java的AIO模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot和Vue2項(xiàng)目配置https協(xié)議過程
本文詳細(xì)介紹了SpringBoot項(xiàng)目和Vue2項(xiàng)目的部署流程及SSL證書配置,對(duì)于SpringBoot項(xiàng)目,需將.pfx文件放入resources目錄并配置server,然后打包部署,Vue2項(xiàng)目中,涉及檢查nginx的SSL模塊、編譯新的nginx文件2024-10-10
Java 重命名 Excel 工作表并設(shè)置工作表標(biāo)簽顏色的示例代碼
這篇文章主要介紹了Java 重命名 Excel 工作表并設(shè)置工作表標(biāo)簽顏色的示例代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Spring?Boot?中的?@HystrixCommand?注解原理及使用方法
通過使用 @HystrixCommand 注解,我們可以輕松地實(shí)現(xiàn)對(duì)方法的隔離和監(jiān)控,從而提高系統(tǒng)的可靠性和穩(wěn)定性,本文介紹了Spring Boot 中的@HystrixCommand注解是什么,其原理以及如何使用,感興趣的朋友跟隨小編一起看看吧2023-07-07
java實(shí)現(xiàn)簡(jiǎn)單聊天室單人版
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)單聊天室的單人版,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07
Springboot基于Redisson實(shí)現(xiàn)Redis分布式可重入鎖源碼解析
這篇文章主要介紹了Springboot基于Redisson實(shí)現(xiàn)Redis分布式可重入鎖,本文通過案例源碼分析給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-03-03

