欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java 高并發(fā)八:NIO和AIO詳解

 更新時間:2016年09月12日 08:56:35   作者:Hosee  
本文主要介紹Java 高并發(fā)NIO和AIO 的知識,這里整理了詳細的資料,并詳細介紹了 1. 什么是NIO 2. Buffer 3. Channel 4. 網(wǎng)絡(luò)編程 5. AIO的知識,有需要的小伙伴可以參考下

IO感覺上和多線程并沒有多大關(guān)系,但是NIO改變了線程在應(yīng)用層面使用的方式,也解決了一些實際的困難。而AIO是異步IO和前面的系列也有點關(guān)系。在此,為了學(xué)習(xí)和記錄,也寫一篇文章來介紹NIO和AIO。

1. 什么是NIO

NIO是New I/O的簡稱,與舊式的基于流的I/O方法相對,從名字看,它表示新的一套Java I/O標(biāo) 準(zhǔn)。它是在Java 1.4中被納入到JDK中的,并具有以下特性:

  1. NIO是基于塊(Block)的,它以塊為基本單位處理數(shù)據(jù) (硬盤上存儲的單位也是按Block來存儲,這樣性能上比基于流的方式要好一些)
  2. 為所有的原始類型提供(Buffer)緩存支持
  3. 增加通道(Channel)對象,作為新的原始 I/O 抽象
  4. 支持鎖(我們在平時使用時經(jīng)常能看到會出現(xiàn)一些.lock的文件,這說明有線程正在使用這把鎖,當(dāng)線程釋放鎖時,會把這個文件刪除掉,這樣其他線程才能繼續(xù)拿到這把鎖)和內(nèi)存映射文件的文件訪問接口
  5. 提供了基于Selector的異步網(wǎng)絡(luò)I/O


所有的從通道中的讀寫操作,都要經(jīng)過Buffer,而通道就是io的抽象,通道的另一端就是操縱的文件。

2. Buffer

Java中Buffer的實現(xiàn)?;镜臄?shù)據(jù)類型都有它對應(yīng)的Buffer

Buffer的簡單使用例子:

package test;
 
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
 
public class Test {
 public static void main(String[] args) throws Exception {
  FileInputStream fin = new FileInputStream(new File(
    "d:\\temp_buffer.tmp"));
  FileChannel fc = fin.getChannel();
  ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  fc.read(byteBuffer);
  fc.close();
  byteBuffer.flip();//讀寫轉(zhuǎn)換
 }
}

總結(jié)下使用的步驟是:

1. 得到Channel

2. 申請Buffer

3. 建立Channel和Buffer的讀/寫關(guān)系

4. 關(guān)閉

下面的例子是使用NIO來復(fù)制文件:

public static void nioCopyFile(String resource, String destination)
   throws IOException {
  FileInputStream fis = new FileInputStream(resource);
  FileOutputStream fos = new FileOutputStream(destination);
  FileChannel readChannel = fis.getChannel(); // 讀文件通道
  FileChannel writeChannel = fos.getChannel(); // 寫文件通道
  ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數(shù)據(jù)緩存
  while (true) {
   buffer.clear();
   int len = readChannel.read(buffer); // 讀入數(shù)據(jù)
   if (len == -1) {
    break; // 讀取完畢
   }
   buffer.flip();
   writeChannel.write(buffer); // 寫入文件
  }
  readChannel.close();
  writeChannel.close();
 }

Buffer中有3個重要的參數(shù):位置(position)、容量(capactiy)和上限(limit)

這里要區(qū)別下容量和上限,比如一個Buffer有10KB,那么10KB就是容量,我將5KB的文件讀到Buffer中,那么上限就是5KB。

下面舉個例子來理解下這3個重要的參數(shù):

public static void main(String[] args) throws Exception {
  ByteBuffer b = ByteBuffer.allocate(15); // 15個字節(jié)大小的緩沖區(qū)
  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  for (int i = 0; i < 10; i++) {
   // 存入10個字節(jié)數(shù)據(jù)
   b.put((byte) i);
  }
  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  b.flip(); // 重置position
  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  for (int i = 0; i < 5; i++) {
   System.out.print(b.get());
  }
  System.out.println();
  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  b.flip();
  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
 
 }

整個過程如圖:

此時position從0到10,capactiy和limit不變。

該操作會重置position,通常,將buffer從寫模式轉(zhuǎn)換為讀 模式時需要執(zhí)行此方法 flip()操作不僅重置了當(dāng)前的position為0,還將limit設(shè)置到當(dāng)前position的位置 。

limit的意義在于,來確定哪些數(shù)據(jù)是有意義的,換句話說,從position到limit之間的數(shù)據(jù)才是有意義的數(shù)據(jù),因為是上次操作的數(shù)據(jù)。所以flip操作往往是讀寫轉(zhuǎn)換的意思。

意義同上。

而Buffer中大多數(shù)的方法都是去改變這3個參數(shù)來達到某些功能的:

public final Buffer rewind()

將position置零,并清除標(biāo)志位(mark)

public final Buffer clear()

將position置零,同時將limit設(shè)置為capacity的大小,并清除了標(biāo)志mark

public final Buffer flip()

先將limit設(shè)置到position所在位置,然后將position置零,并清除標(biāo)志位mark,通常在讀寫轉(zhuǎn)換時使用

文件映射到內(nèi)存

public static void main(String[] args) throws Exception {
  RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
  FileChannel fc = raf.getChannel();
  // 將文件映射到內(nèi)存中
  MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,
    raf.length());
  while (mbb.hasRemaining()) {
   System.out.print((char) mbb.get());
  }
  mbb.put(0, (byte) 98); // 修改文件
  raf.close();
 }

對MappedByteBuffer的修改就相當(dāng)于修改文件本身,這樣操作的速度是很快的。

3. Channel

多線程網(wǎng)絡(luò)服務(wù)器的一般結(jié)構(gòu):

簡單的多線程服務(wù)器:

public static void main(String[] args) throws Exception {
  ServerSocket echoServer = null;
  Socket clientSocket = null;
  try {
   echoServer = new ServerSocket(8000);
  } catch (IOException e) {
   System.out.println(e);
  }
  while (true) {
   try {
    clientSocket = echoServer.accept();
    System.out.println(clientSocket.getRemoteSocketAddress()
      + " connect!");
    tp.execute(new HandleMsg(clientSocket));
   } catch (IOException e) {
    System.out.println(e);
   }
  }
 }

功能就是服務(wù)器端讀到什么數(shù)據(jù),就向客戶端回寫什么數(shù)據(jù)。

這里的tp是一個線程池,HandleMsg是處理消息的類。

static class HandleMsg implements Runnable{ 
   省略部分信息     
   public void run(){   
    try {   
     is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 
     os = new PrintWriter(clientSocket.getOutputStream(), true); 
     // 從InputStream當(dāng)中讀取客戶端所發(fā)送的數(shù)據(jù)    
     String inputLine = null;     
     long b=System. currentTimeMillis ();     
     while ((inputLine = is.readLine()) != null)
     {   
      os.println(inputLine);     
     }     
     long e=System. currentTimeMillis ();     
     System. out.println ("spend:"+(e - b)+" ms ");    
   } catch (IOException e) {     
    e.printStackTrace();    
   }finally
   { 
    關(guān)閉資源 
   }  
  } 
  }

客戶端:

public static void main(String[] args) throws Exception {
  Socket client = null;
  PrintWriter writer = null;
  BufferedReader reader = null;
  try {
   client = new Socket();
   client.connect(new InetSocketAddress("localhost", 8000));
   writer = new PrintWriter(client.getOutputStream(), true);
   writer.println("Hello!");
   writer.flush();
   reader = new BufferedReader(new InputStreamReader(
     client.getInputStream()));
   System.out.println("from server: " + reader.readLine());
  } catch (Exception e) {
  } finally {
   // 省略資源關(guān)閉
  }
 }

以上的網(wǎng)絡(luò)編程是很基本的,使用這種方式,會有一些問題:

為每一個客戶端使用一個線程,如果客戶端出現(xiàn)延時等異常,線程可能會被占用很長時間。因為數(shù)據(jù)的準(zhǔn)備和讀取都在這個線程中。此時,如果客戶端數(shù)量眾多,可能會消耗大量的系統(tǒng)資源。

解決方案:

使用非阻塞的NIO (讀取數(shù)據(jù)不等待,數(shù)據(jù)準(zhǔn)備好了再工作)

為了體現(xiàn)NIO使用的高效。

這里先模擬一個低效的客戶端來模擬因網(wǎng)絡(luò)而延時的情況:

private static ExecutorService tp= Executors.newCachedThreadPool(); 
  private static final int sleep_time=1000*1000*1000; 
  public static class EchoClient implements Runnable{ 
   public void run(){   
    try {    
     client = new Socket();    
     client.connect(new InetSocketAddress("localhost", 8000)); 
     writer = new PrintWriter(client.getOutputStream(), true); 
     writer.print("H");    
     LockSupport.parkNanos(sleep_time);  
     writer.print("e");   
     LockSupport.parkNanos(sleep_time);  
     writer.print("l");  
     LockSupport.parkNanos(sleep_time); 
     writer.print("l");  
     LockSupport.parkNanos(sleep_time); 
     writer.print("o");  
     LockSupport.parkNanos(sleep_time); 
     writer.print("!");   
     LockSupport.parkNanos(sleep_time); 
     writer.println();  
     writer.flush(); 
    }catch(Exception e)
    {
    }
   }
  }

服務(wù)器端輸出:

spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms

因為

while ((inputLine = is.readLine()) != null)

是阻塞的,所以時間都花在等待中。

如果用NIO來處理這個問題會怎么做呢?

NIO有一個很大的特點就是:把數(shù)據(jù)準(zhǔn)備好了再通知我

而Channel有點類似于流,一個Channel可以和文件或者網(wǎng)絡(luò)Socket對應(yīng) 。

selector是一個選擇器,它可以選擇某一個Channel,然后做些事情。

一個線程可以對應(yīng)一個selector,而一個selector可以輪詢多個Channel,而每個Channel對應(yīng)了一個Socket。

與上面一個線程對應(yīng)一個Socket相比,使用NIO后,一個線程可以輪詢多個Socket。

當(dāng)selector調(diào)用select()時,會查看是否有客戶端準(zhǔn)備好了數(shù)據(jù)。當(dāng)沒有數(shù)據(jù)被準(zhǔn)備好時,select()會阻塞。平時都說NIO是非阻塞的,但是如果沒有數(shù)據(jù)被準(zhǔn)備好還是會有阻塞現(xiàn)象。

當(dāng)有數(shù)據(jù)被準(zhǔn)備好時,調(diào)用完select()后,會返回一個SelectionKey,SelectionKey表示在某個selector上的某個Channel的數(shù)據(jù)已經(jīng)被準(zhǔn)備好了。

只有在數(shù)據(jù)準(zhǔn)備好時,這個Channel才會被選擇。

這樣NIO實現(xiàn)了一個線程來監(jiān)控多個客戶端。

而剛剛模擬的網(wǎng)絡(luò)延遲的客戶端將不會影響NIO下的線程,因為某個Socket網(wǎng)絡(luò)延遲時,數(shù)據(jù)還未被準(zhǔn)備好,selector是不會選擇它的,而會選擇其他準(zhǔn)備好的客戶端。

selectNow()與select()的區(qū)別在于,selectNow()是不阻塞的,當(dāng)沒有客戶端準(zhǔn)備好數(shù)據(jù)時,selectNow()不會阻塞,將返回0,有客戶端準(zhǔn)備好數(shù)據(jù)時,selectNow()返回準(zhǔn)備好的客戶端的個數(shù)。

主要代碼:

package test;
 
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class MultiThreadNIOEchoServer {
 public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();
 
 class EchoClient {
  private LinkedList<ByteBuffer> outq;
 
  EchoClient() {
   outq = new LinkedList<ByteBuffer>();
  }
 
  public LinkedList<ByteBuffer> getOutputQueue() {
   return outq;
  }
 
  public void enqueue(ByteBuffer bb) {
   outq.addFirst(bb);
  }
 }
 
 class HandleMsg implements Runnable {
  SelectionKey sk;
  ByteBuffer bb;
 
  public HandleMsg(SelectionKey sk, ByteBuffer bb) {
   super();
   this.sk = sk;
   this.bb = bb;
  }
 
  @Override
  public void run() {
   // TODO Auto-generated method stub
   EchoClient echoClient = (EchoClient) sk.attachment();
   echoClient.enqueue(bb);
   sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
   selector.wakeup();
  }
 
 }
 
 private Selector selector;
 private ExecutorService tp = Executors.newCachedThreadPool();
 
 private void startServer() throws Exception {
  selector = SelectorProvider.provider().openSelector();
  ServerSocketChannel ssc = ServerSocketChannel.open();
  ssc.configureBlocking(false);
  InetSocketAddress isa = new InetSocketAddress(8000);
  ssc.socket().bind(isa);
  // 注冊感興趣的事件,此處對accpet事件感興趣
  SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
  for (;;) {
   selector.select();
   Set readyKeys = selector.selectedKeys();
   Iterator i = readyKeys.iterator();
   long e = 0;
   while (i.hasNext()) {
    SelectionKey sk = (SelectionKey) i.next();
    i.remove();
    if (sk.isAcceptable()) {
     doAccept(sk);
    } else if (sk.isValid() && sk.isReadable()) {
     if (!geym_time_stat.containsKey(((SocketChannel) sk
       .channel()).socket())) {
      geym_time_stat.put(
        ((SocketChannel) sk.channel()).socket(),
        System.currentTimeMillis());
     }
     doRead(sk);
    } else if (sk.isValid() && sk.isWritable()) {
     doWrite(sk);
     e = System.currentTimeMillis();
     long b = geym_time_stat.remove(((SocketChannel) sk
       .channel()).socket());
     System.out.println("spend:" + (e - b) + "ms");
    }
   }
  }
 }
 
 private void doWrite(SelectionKey sk) {
  // TODO Auto-generated method stub
  SocketChannel channel = (SocketChannel) sk.channel();
  EchoClient echoClient = (EchoClient) sk.attachment();
  LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
  ByteBuffer bb = outq.getLast();
  try {
   int len = channel.write(bb);
   if (len == -1) {
    disconnect(sk);
    return;
   }
   if (bb.remaining() == 0) {
    outq.removeLast();
   }
  } catch (Exception e) {
   // TODO: handle exception
   disconnect(sk);
  }
  if (outq.size() == 0) {
   sk.interestOps(SelectionKey.OP_READ);
  }
 }
 
 private void doRead(SelectionKey sk) {
  // TODO Auto-generated method stub
  SocketChannel channel = (SocketChannel) sk.channel();
  ByteBuffer bb = ByteBuffer.allocate(8192);
  int len;
  try {
   len = channel.read(bb);
   if (len < 0) {
    disconnect(sk);
    return;
   }
  } catch (Exception e) {
   // TODO: handle exception
   disconnect(sk);
   return;
  }
  bb.flip();
  tp.execute(new HandleMsg(sk, bb));
 }
 
 private void disconnect(SelectionKey sk) {
  // TODO Auto-generated method stub
  //省略略干關(guān)閉操作
 }
 
 private void doAccept(SelectionKey sk) {
  // TODO Auto-generated method stub
  ServerSocketChannel server = (ServerSocketChannel) sk.channel();
  SocketChannel clientChannel;
  try {
   clientChannel = server.accept();
   clientChannel.configureBlocking(false);
   SelectionKey clientKey = clientChannel.register(selector,
     SelectionKey.OP_READ);
   EchoClient echoClinet = new EchoClient();
   clientKey.attach(echoClinet);
   InetAddress clientAddress = clientChannel.socket().getInetAddress();
   System.out.println("Accepted connection from "
     + clientAddress.getHostAddress());
  } catch (Exception e) {
   // TODO: handle exception
  }
 }
 
 public static void main(String[] args) {
  // TODO Auto-generated method stub
  MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
  try {
   echoServer.startServer();
  } catch (Exception e) {
   // TODO: handle exception
  }
 
 }
 
}

代碼僅作參考,主要的特點是,對不同事件的感興趣來做不同的事。

當(dāng)用之前模擬的那個延遲的客戶端時,這次的時間消耗就在2ms到11ms之間了。性能提升是很明顯的。

總結(jié):

1. NIO會將數(shù)據(jù)準(zhǔn)備好后,再交由應(yīng)用進行處理,數(shù)據(jù)的讀取/寫入過程依然在應(yīng)用線程中完成,只是將等待的時間剝離到單獨的線程中去。

2. 節(jié)省數(shù)據(jù)準(zhǔn)備時間(因為Selector可以復(fù)用)

5. AIO

AIO的特點:

1. 讀完了再通知我

2. 不會加快IO,只是在讀完后進行通知

3. 使用回調(diào)函數(shù),進行業(yè)務(wù)處理

AIO的相關(guān)代碼:

AsynchronousServerSocketChannel

server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));
使用server上的accept方法

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
CompletionHandler為回調(diào)接口,當(dāng)有客戶端accept之后,就做handler中的事情。

示例代碼:

server.accept(null,
    new CompletionHandler<AsynchronousSocketChannel, Object>() {
     final ByteBuffer buffer = ByteBuffer.allocate(1024);
 
     public void completed(AsynchronousSocketChannel result,
       Object attachment) {
      System.out.println(Thread.currentThread().getName());
      Future<Integer> writeResult = null;
      try {
       buffer.clear();
       result.read(buffer).get(100, TimeUnit.SECONDS);
       buffer.flip();
       writeResult = result.write(buffer);
      } catch (InterruptedException | ExecutionException e) {
       e.printStackTrace();
      } catch (TimeoutException e) {
       e.printStackTrace();
      } finally {
       try {
        server.accept(null, this);
        writeResult.get();
        result.close();
       } catch (Exception e) {
        System.out.println(e.toString());
       }
      }
     }
 
     @Override
     public void failed(Throwable exc, Object attachment) {
      System.out.println("failed: " + exc);
     }
    });

這里使用了Future來實現(xiàn)即時返回,關(guān)于Future請參考上一篇

在理解了NIO的基礎(chǔ)上,看AIO,區(qū)別在于AIO是等讀寫過程完成后再去調(diào)用回調(diào)函數(shù)。

NIO是同步非阻塞的

AIO是異步非阻塞的

由于NIO的讀寫過程依然在應(yīng)用線程里完成,所以對于那些讀寫過程時間長的,NIO就不太適合。

而AIO的讀寫過程完成后才被通知,所以AIO能夠勝任那些重量級,讀寫過程長的任務(wù)。

相關(guān)文章

  • 詳解SpringBoot 快速整合Mybatis(去XML化+注解進階)

    詳解SpringBoot 快速整合Mybatis(去XML化+注解進階)

    本篇文章主要介紹了詳解SpringBoot 快速整合Mybatis(去XML化+注解進階),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-11-11
  • Spring Security OAuth2集成短信驗證碼登錄以及第三方登錄

    Spring Security OAuth2集成短信驗證碼登錄以及第三方登錄

    這篇文章主要介紹了Spring Security OAuth2集成短信驗證碼登錄以及第三方登錄,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • springboot的logback配置源碼解讀

    springboot的logback配置源碼解讀

    這篇文章主要為大家介紹了springboot的logback配置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-11-11
  • Java中如何用Stream分組并求各組數(shù)量

    Java中如何用Stream分組并求各組數(shù)量

    這篇文章主要給大家介紹了關(guān)于Java中如何用Stream分組并求各組數(shù)量的相關(guān)資料,文中通過實例代碼,對大家學(xué)習(xí)或者Java具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-07-07
  • MyBatis中OGNL的使用教程詳解

    MyBatis中OGNL的使用教程詳解

    有些人可能不知道MyBatis中使用了OGNL,有些人知道用到了OGNL卻不知道在MyBatis中如何使用,下面這篇文章主要介紹了MyBatis中OGNL的使用教程,文中介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-06-06
  • 淺談Java中向上造型向下造型和接口回調(diào)中的問題

    淺談Java中向上造型向下造型和接口回調(diào)中的問題

    這篇文章主要介紹了淺談Java中向上造型向下造型和接口回調(diào)中的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • Springboot使用Security實現(xiàn)OAuth2授權(quán)驗證完整過程

    Springboot使用Security實現(xiàn)OAuth2授權(quán)驗證完整過程

    安全管理是軟件系統(tǒng)必不可少的的功能。根據(jù)經(jīng)典的“墨菲定律”——凡是可能,總會發(fā)生。如果系統(tǒng)存在安全隱患,最終必然會出現(xiàn)問題,這篇文章主要介紹了SpringBoot使用Security實現(xiàn)OAuth2授權(quán)驗證完整過程
    2022-12-12
  • Java zookeeper圖形化工具ZooInspector用法詳解

    Java zookeeper圖形化工具ZooInspector用法詳解

    這篇文章主要介紹了Java zookeeper圖形化工具ZooInspector用法詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-07-07
  • java讀寫excel文件實現(xiàn)POI解析Excel的方法

    java讀寫excel文件實現(xiàn)POI解析Excel的方法

    在日常工作中,我們常常會進行Excel文件讀寫操作,這篇文章主要介紹了java讀寫excel文件實現(xiàn)POI解析Excel的方法,實例分析了java讀寫excel的技巧,非常具有實用價值,需要的朋友可以參考下
    2018-10-10
  • Java 8 lambda表達式引入詳解及實例

    Java 8 lambda表達式引入詳解及實例

    這篇文章主要介紹了Java 8 lambda表達式引入詳解及實例的相關(guān)資料,需要的朋友可以參考下
    2017-05-05

最新評論