欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的異步非阻塞AIO模型詳解

 更新時間:2023年09月19日 10:20:34   作者:超大充電寶  
這篇文章主要介紹了Java中的異步非阻塞AIO模型詳解,AIO需要操作系統(tǒng)的支持,在linux內(nèi)核2.6版本中加入了對真正異步IO的支持,java從jdk1.7開始支持AIO,本文提供了部分實現(xiàn)代碼,需要的朋友可以參考下

1、AIO:異步非阻塞簡介

AIO需要操作系統(tǒng)的支持,在linux內(nèi)核2.6版本中加入了對真正異步IO的支持,java從jdk1.7開始支持AIO

核心類有AsynchronousSocketChannel 、AsynchronousServerSocketChannel、AsynchronousChannelGroup

AsynchronousChannelGroup是異步Channel的分組管理器,它可以實現(xiàn)資源共享。

創(chuàng)建AsynchronousChannelGroup時,需要傳入一個ExecutorService,也就是綁定一個線程池,該線程池負責(zé)兩個任務(wù):處理IO事件和觸發(fā)CompletionHandler回調(diào)接口。

2、AsynchronousServerSocketChannel:AIO中網(wǎng)絡(luò)通信服務(wù)端Socket

accept() 方法: AsynchronousServerSocketChannel創(chuàng)建成功后,類似于ServerSocket,也是調(diào)用 accept() 方法來接受來自客戶端的連接, 由于異步IO實際的IO操作是交給操作系統(tǒng)來做的,用戶進程只負責(zé)通知操作系統(tǒng)進行IO和接受操作系統(tǒng)IO完成的通知。 所以異步的ServerChannel調(diào)用 accept() 方法后,當前線程不會阻塞, 程序也不知道accept()方法什么時候能夠接收到客戶端請求并且操作系統(tǒng)完成網(wǎng)絡(luò)IO, 為解決這個問題,AIO為accept()方法提供兩個版本:

Future<AsynchronousSocketChannel> accept() :
開始接收客戶端請求,如果當前線程需要進行網(wǎng)絡(luò)IO(即獲得AsynchronousSocketChannel),則應(yīng)該調(diào)用該方法返回的Future對象的get()方法,但是get()方法會阻塞該線程,所以這種方式是阻塞式的異步IO。

<&nbsp;A&nbsp;> void accept (Aattachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler):
開始接受來自客戶端請求,連接成功或失敗都會觸發(fā)CompletionHandler對象的相應(yīng)方法。
其中AsynchronousSocketChannel就代表該CompletionHandler處理器在處理連接成功時的result是AsynchronousSocketChannel的實例。
而CompletionHandler接口中定義了兩個方法,
completed(V result , A attachment):當IO完成時觸發(fā)該方法,該方法的第一個參數(shù)代表IO操作返回的對象,
第二個參數(shù)代表發(fā)起IO操作時傳入的附加參數(shù)。
faild(Throwable exc, A attachment):當IO失敗時觸發(fā)該方法,第一個參數(shù)代表IO操作失敗引發(fā)的異?;蝈e誤。

3、AIO編程

服務(wù)端

public class AioServer {
    private static int DEFAULT_PORT = 12345;
    private static ServerHandler serverHandle;
    public volatile static long clientCount = 0;
    public static void start(){
        start(DEFAULT_PORT);
    }
    public static synchronized void start(int port){
        if(serverHandle!=null)
            return;
        serverHandle = new ServerHandler(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args) {
        AioServer.start();
    }
}
public class ServerHandler implements Runnable{
    private AsynchronousServerSocketChannel channel;
    public ServerHandler(int port) {
        try {
            //創(chuàng)建服務(wù)端通道
            channel = AsynchronousServerSocketChannel.open();
            //綁定端口
            channel.bind(new InetSocketAddress(port));
            System.out.println("服務(wù)端已啟動,端口號:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        channel.accept(this, new AcceptHandler());
//        Future <AsynchronousSocketChannel> accept = channel.accept();
        //該步操作是異步操作 防止當前線程直接執(zhí)行結(jié)束
        //方案1: while(true)+sleep
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        //方案2 CountDownLatch 作用:在完成一組正在執(zhí)行的操作之前,允許當前的現(xiàn)場一直阻塞 此處,讓現(xiàn)場在此阻塞,防止服務(wù)端執(zhí)行完成后退出
//
//        CountDownLatch count = new CountDownLatch(1);
//        channel.accept(this, new AcceptHandler());
//        try {
//            count.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }
    // CompletionHandler<V,A>
    // V-IO操作的結(jié)果,這里是成功建立的連接,AsynchronousSocketChannel
   // A-IO操作附件,這里傳入AsynchronousServerSocketChannel便于繼續(xù)接收請求建立新連接
    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> {
        @Override
        public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
            //創(chuàng)建新的Buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //異步讀  第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
//            channel.read(buffer, buffer, new ReadHandler(channel));
            //繼續(xù)接受其他客戶端請求
            serverHandler.channel.accept(null, this);
        }
        @Override
        public void failed(Throwable exc, ServerHandler serverHandler) {
            exc.printStackTrace();
        }
    }
    class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> {
        //用戶讀取或者發(fā)送消息的channel
        private AsynchronousSocketChannel channel;
        public ReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        @Override
        public void completed(ByteBuffer result, ByteBuffer attachment) {
            result.flip();
            byte[] msg = new byte[result.remaining()];
            result.get(msg);
            try {
                String expression = new String(msg, "UTF-8");
                System.out.println("服務(wù)器收到消息: " + expression);
//                String result1 = "服務(wù)端收到消息\n";
                result.clear();
                //向客戶端發(fā)送消息
                doWrite(expression);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //發(fā)送消息
        private void doWrite(String msg) {
            byte[] bytes = msg.getBytes();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put(bytes);
            buffer.flip();
            //異步寫數(shù)據(jù)
            channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    //如果沒有發(fā)送完,繼續(xù)發(fā)送
                    if (attachment.hasRemaining()) {
                        channel.write(attachment, attachment, this);
                    } else {
                        //創(chuàng)建新的Buffer
                        ByteBuffer allocate = ByteBuffer.allocate(1024);
                        //異步讀 第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
//                        channel.read(allocate, attachment, new ReadHandler(channel));
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客戶端

public class AioClient {
    private static String DEFAULT_HOST = "127.0.0.1";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandler clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(clientHandle!=null)
            return;
        clientHandle = new ClientHandler(ip,port);
        new Thread(clientHandle,"Client").start();
    }
    //向服務(wù)器發(fā)送消息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("exit")) return false;
        clientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception{
        AioClient.start();
        System.out.println("請輸入請求消息:");
        Scanner scanner = new Scanner(System.in);
        while(AioClient.sendMsg(scanner.nextLine()));
    }
}
public class ClientHandler implements Runnable{
    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;
    public ClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //創(chuàng)建異步的客戶端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //創(chuàng)建CountDownLatch等待
//        latch = new CountDownLatch(1);
        //發(fā)起異步連接操作,回調(diào)參數(shù)就是這個類本身,如果連接成功會回調(diào)completed方法
        clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler());
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        try {
//            latch.await();
//        } catch (InterruptedException e1) {
//            e1.printStackTrace();
//        }
//        try {
//            clientChannel.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
    }
    //向服務(wù)器發(fā)送消息
    public void sendMsg(String msg){
        byte[] req = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        System.out.println(">>>>>>msg:"+msg);
        writeBuffer.put(req);
        writeBuffer.flip();
        //異步寫
        clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel));
    }
    /**
     * 接收類
     */
    class AcceptHandler implements CompletionHandler<Void, ClientHandler> {
        public AcceptHandler() {}
        @Override
        public void completed(Void result, ClientHandler attachment) {
            System.out.println("連接服務(wù)器成功");
    }
        @Override
        public void failed(Throwable exc, ClientHandler attachment) {
            exc.printStackTrace();
            try {
                attachment.clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel channel;
        public WriteHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            //完成全部數(shù)據(jù)的寫入
            if (attachment.hasRemaining()) {
                //數(shù)據(jù)沒有寫完,繼續(xù)寫
                System.out.println("WriteHandler.hasRemaining>>>>>");
                clientChannel.write(attachment, attachment, this);
            } else {
                //讀取數(shù)據(jù)
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel));
            }
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel clientChannel;
        public ReadHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }
        @Override
        public void completed(Integer result,ByteBuffer buffer) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String body;
            try {
                body = new String(bytes,"UTF-8");
                System.out.println("客戶端收到結(jié)果:"+ body);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void failed(Throwable exc,ByteBuffer attachment) {
            System.err.println("數(shù)據(jù)讀取失敗...");
            try {
                clientChannel.close();
            } catch (IOException e) {
            }
        }
    }
}

到此這篇關(guān)于Java中的異步非阻塞AIO模型詳解的文章就介紹到這了,更多相關(guān)Java的AIO模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 分享40個Java多線程問題小結(jié)

    分享40個Java多線程問題小結(jié)

    多個線程共存于同一JVM進程里面,所以共用相同的內(nèi)存空間,較之多進程,多線程之間的通信更輕量級,本文給大家分享40個Java多線程問題小結(jié) 的相關(guān)資料,需要的朋友可以參考下
    2015-12-12
  • SpringBoot和Vue2項目配置https協(xié)議過程

    SpringBoot和Vue2項目配置https協(xié)議過程

    本文詳細介紹了SpringBoot項目和Vue2項目的部署流程及SSL證書配置,對于SpringBoot項目,需將.pfx文件放入resources目錄并配置server,然后打包部署,Vue2項目中,涉及檢查nginx的SSL模塊、編譯新的nginx文件
    2024-10-10
  • Java 重命名 Excel 工作表并設(shè)置工作表標簽顏色的示例代碼

    Java 重命名 Excel 工作表并設(shè)置工作表標簽顏色的示例代碼

    這篇文章主要介紹了Java 重命名 Excel 工作表并設(shè)置工作表標簽顏色的示例代碼,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10
  • 如何在Eclipse中設(shè)置Oracle的JDBC

    如何在Eclipse中設(shè)置Oracle的JDBC

    以下是對在Eclipse中設(shè)置Oracle的JDBC的具體操作方法進行了詳細的分析介紹,需要的朋友可以過來參考下
    2013-08-08
  • Java通俗易懂系列設(shè)計模式之裝飾模式

    Java通俗易懂系列設(shè)計模式之裝飾模式

    這篇文章主要介紹了Java通俗易懂系列設(shè)計模式之裝飾模式,對設(shè)計模式感興趣的同學(xué),一定要看一下
    2021-04-04
  • SpringBoot配置web訪問H2的方法

    SpringBoot配置web訪問H2的方法

    這篇文章主要介紹了SpringBoot配置web訪問H2的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Java超詳細講解三大特性之一的多態(tài)

    Java超詳細講解三大特性之一的多態(tài)

    多態(tài)就是指程序中定義的引用變量所指向的具體類型和通過該引用變量發(fā)出的方法調(diào)用在編程時并不確定,而是在程序運行期間才確定,即一個引用變量到底會指向哪個類的實例對象,該引用變量發(fā)出的方法調(diào)用到底是哪個類中實現(xiàn)的方法,必須在由程序運行期間才能決定
    2022-05-05
  • Spring?Boot?中的?@HystrixCommand?注解原理及使用方法

    Spring?Boot?中的?@HystrixCommand?注解原理及使用方法

    通過使用 @HystrixCommand 注解,我們可以輕松地實現(xiàn)對方法的隔離和監(jiān)控,從而提高系統(tǒng)的可靠性和穩(wěn)定性,本文介紹了Spring Boot 中的@HystrixCommand注解是什么,其原理以及如何使用,感興趣的朋友跟隨小編一起看看吧
    2023-07-07
  • java實現(xiàn)簡單聊天室單人版

    java實現(xiàn)簡單聊天室單人版

    這篇文章主要為大家詳細介紹了java實現(xiàn)簡單聊天室的單人版,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • Springboot基于Redisson實現(xiàn)Redis分布式可重入鎖源碼解析

    Springboot基于Redisson實現(xiàn)Redis分布式可重入鎖源碼解析

    這篇文章主要介紹了Springboot基于Redisson實現(xiàn)Redis分布式可重入鎖,本文通過案例源碼分析給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-03-03

最新評論