Java中的異步非阻塞AIO模型詳解
1、AIO:異步非阻塞簡介
AIO需要操作系統(tǒng)的支持,在linux內(nèi)核2.6版本中加入了對真正異步IO的支持,java從jdk1.7開始支持AIO
核心類有AsynchronousSocketChannel 、AsynchronousServerSocketChannel、AsynchronousChannelGroup
AsynchronousChannelGroup是異步Channel的分組管理器,它可以實現(xiàn)資源共享。
創(chuàng)建AsynchronousChannelGroup時,需要傳入一個ExecutorService,也就是綁定一個線程池,該線程池負責(zé)兩個任務(wù):處理IO事件和觸發(fā)CompletionHandler回調(diào)接口。
2、AsynchronousServerSocketChannel:AIO中網(wǎng)絡(luò)通信服務(wù)端Socket
accept() 方法: AsynchronousServerSocketChannel創(chuàng)建成功后,類似于ServerSocket,也是調(diào)用 accept() 方法來接受來自客戶端的連接, 由于異步IO實際的IO操作是交給操作系統(tǒng)來做的,用戶進程只負責(zé)通知操作系統(tǒng)進行IO和接受操作系統(tǒng)IO完成的通知。 所以異步的ServerChannel調(diào)用 accept() 方法后,當前線程不會阻塞, 程序也不知道accept()方法什么時候能夠接收到客戶端請求并且操作系統(tǒng)完成網(wǎng)絡(luò)IO, 為解決這個問題,AIO為accept()方法提供兩個版本:
Future<AsynchronousSocketChannel> accept() :
開始接收客戶端請求,如果當前線程需要進行網(wǎng)絡(luò)IO(即獲得AsynchronousSocketChannel),則應(yīng)該調(diào)用該方法返回的Future對象的get()方法,但是get()方法會阻塞該線程,所以這種方式是阻塞式的異步IO。
< A > void accept (Aattachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler):
開始接受來自客戶端請求,連接成功或失敗都會觸發(fā)CompletionHandler對象的相應(yīng)方法。
其中AsynchronousSocketChannel就代表該CompletionHandler處理器在處理連接成功時的result是AsynchronousSocketChannel的實例。
而CompletionHandler接口中定義了兩個方法,
completed(V result , A attachment):當IO完成時觸發(fā)該方法,該方法的第一個參數(shù)代表IO操作返回的對象,
第二個參數(shù)代表發(fā)起IO操作時傳入的附加參數(shù)。
faild(Throwable exc, A attachment):當IO失敗時觸發(fā)該方法,第一個參數(shù)代表IO操作失敗引發(fā)的異?;蝈e誤。
3、AIO編程
服務(wù)端
public class AioServer { private static int DEFAULT_PORT = 12345; private static ServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new ServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args) { AioServer.start(); } }
public class ServerHandler implements Runnable{ private AsynchronousServerSocketChannel channel; public ServerHandler(int port) { try { //創(chuàng)建服務(wù)端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("服務(wù)端已啟動,端口號:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { channel.accept(this, new AcceptHandler()); // Future <AsynchronousSocketChannel> accept = channel.accept(); //該步操作是異步操作 防止當前線程直接執(zhí)行結(jié)束 //方案1: while(true)+sleep while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // //方案2 CountDownLatch 作用:在完成一組正在執(zhí)行的操作之前,允許當前的現(xiàn)場一直阻塞 此處,讓現(xiàn)場在此阻塞,防止服務(wù)端執(zhí)行完成后退出 // // CountDownLatch count = new CountDownLatch(1); // channel.accept(this, new AcceptHandler()); // try { // count.await(); // } catch (InterruptedException e) { // e.printStackTrace(); // } } // CompletionHandler<V,A> // V-IO操作的結(jié)果,這里是成功建立的連接,AsynchronousSocketChannel // A-IO操作附件,這里傳入AsynchronousServerSocketChannel便于繼續(xù)接收請求建立新連接 class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> { @Override public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) { //創(chuàng)建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //異步讀 第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler // channel.read(buffer, buffer, new ReadHandler(channel)); //繼續(xù)接受其他客戶端請求 serverHandler.channel.accept(null, this); } @Override public void failed(Throwable exc, ServerHandler serverHandler) { exc.printStackTrace(); } } class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> { //用戶讀取或者發(fā)送消息的channel private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(ByteBuffer result, ByteBuffer attachment) { result.flip(); byte[] msg = new byte[result.remaining()]; result.get(msg); try { String expression = new String(msg, "UTF-8"); System.out.println("服務(wù)器收到消息: " + expression); // String result1 = "服務(wù)端收到消息\n"; result.clear(); //向客戶端發(fā)送消息 doWrite(expression); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //發(fā)送消息 private void doWrite(String msg) { byte[] bytes = msg.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(bytes); buffer.flip(); //異步寫數(shù)據(jù) channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { //如果沒有發(fā)送完,繼續(xù)發(fā)送 if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); } else { //創(chuàng)建新的Buffer ByteBuffer allocate = ByteBuffer.allocate(1024); //異步讀 第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler // channel.read(allocate, attachment, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } }
客戶端
public class AioClient { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static ClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new ClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向服務(wù)器發(fā)送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("exit")) return false; clientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception{ AioClient.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } }
public class ClientHandler implements Runnable{ private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public ClientHandler(String host, int port) { this.host = host; this.port = port; try { //創(chuàng)建異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //創(chuàng)建CountDownLatch等待 // latch = new CountDownLatch(1); //發(fā)起異步連接操作,回調(diào)參數(shù)就是這個類本身,如果連接成功會回調(diào)completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler()); while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // try { // latch.await(); // } catch (InterruptedException e1) { // e1.printStackTrace(); // } // try { // clientChannel.close(); // } catch (IOException e) { // e.printStackTrace(); // } } //向服務(wù)器發(fā)送消息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); System.out.println(">>>>>>msg:"+msg); writeBuffer.put(req); writeBuffer.flip(); //異步寫 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel)); } /** * 接收類 */ class AcceptHandler implements CompletionHandler<Void, ClientHandler> { public AcceptHandler() {} @Override public void completed(Void result, ClientHandler attachment) { System.out.println("連接服務(wù)器成功"); } @Override public void failed(Throwable exc, ClientHandler attachment) { exc.printStackTrace(); try { attachment.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public WriteHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { //完成全部數(shù)據(jù)的寫入 if (attachment.hasRemaining()) { //數(shù)據(jù)沒有寫完,繼續(xù)寫 System.out.println("WriteHandler.hasRemaining>>>>>"); clientChannel.write(attachment, attachment, this); } else { //讀取數(shù)據(jù) ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; public ReadHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客戶端收到結(jié)果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數(shù)據(jù)讀取失敗..."); try { clientChannel.close(); } catch (IOException e) { } } } }
到此這篇關(guān)于Java中的異步非阻塞AIO模型詳解的文章就介紹到這了,更多相關(guān)Java的AIO模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot和Vue2項目配置https協(xié)議過程
本文詳細介紹了SpringBoot項目和Vue2項目的部署流程及SSL證書配置,對于SpringBoot項目,需將.pfx文件放入resources目錄并配置server,然后打包部署,Vue2項目中,涉及檢查nginx的SSL模塊、編譯新的nginx文件2024-10-10Java 重命名 Excel 工作表并設(shè)置工作表標簽顏色的示例代碼
這篇文章主要介紹了Java 重命名 Excel 工作表并設(shè)置工作表標簽顏色的示例代碼,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10Spring?Boot?中的?@HystrixCommand?注解原理及使用方法
通過使用 @HystrixCommand 注解,我們可以輕松地實現(xiàn)對方法的隔離和監(jiān)控,從而提高系統(tǒng)的可靠性和穩(wěn)定性,本文介紹了Spring Boot 中的@HystrixCommand注解是什么,其原理以及如何使用,感興趣的朋友跟隨小編一起看看吧2023-07-07Springboot基于Redisson實現(xiàn)Redis分布式可重入鎖源碼解析
這篇文章主要介紹了Springboot基于Redisson實現(xiàn)Redis分布式可重入鎖,本文通過案例源碼分析給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-03-03