Java 高并發(fā)八:NIO和AIO詳解
IO感覺上和多線程并沒有多大關(guān)系,但是NIO改變了線程在應(yīng)用層面使用的方式,也解決了一些實(shí)際的困難。而AIO是異步IO和前面的系列也有點(diǎn)關(guān)系。在此,為了學(xué)習(xí)和記錄,也寫一篇文章來介紹NIO和AIO。
1. 什么是NIO
NIO是New I/O的簡(jiǎn)稱,與舊式的基于流的I/O方法相對(duì),從名字看,它表示新的一套Java I/O標(biāo) 準(zhǔn)。它是在Java 1.4中被納入到JDK中的,并具有以下特性:
- NIO是基于塊(Block)的,它以塊為基本單位處理數(shù)據(jù) (硬盤上存儲(chǔ)的單位也是按Block來存儲(chǔ),這樣性能上比基于流的方式要好一些)
- 為所有的原始類型提供(Buffer)緩存支持
- 增加通道(Channel)對(duì)象,作為新的原始 I/O 抽象
- 支持鎖(我們?cè)谄綍r(shí)使用時(shí)經(jīng)常能看到會(huì)出現(xiàn)一些.lock的文件,這說明有線程正在使用這把鎖,當(dāng)線程釋放鎖時(shí),會(huì)把這個(gè)文件刪除掉,這樣其他線程才能繼續(xù)拿到這把鎖)和內(nèi)存映射文件的文件訪問接口
- 提供了基于Selector的異步網(wǎng)絡(luò)I/O

所有的從通道中的讀寫操作,都要經(jīng)過Buffer,而通道就是io的抽象,通道的另一端就是操縱的文件。
2. Buffer

Java中Buffer的實(shí)現(xiàn)?;镜臄?shù)據(jù)類型都有它對(duì)應(yīng)的Buffer
Buffer的簡(jiǎn)單使用例子:
package test;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class Test {
public static void main(String[] args) throws Exception {
FileInputStream fin = new FileInputStream(new File(
"d:\\temp_buffer.tmp"));
FileChannel fc = fin.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fc.read(byteBuffer);
fc.close();
byteBuffer.flip();//讀寫轉(zhuǎn)換
}
}
總結(jié)下使用的步驟是:
1. 得到Channel
2. 申請(qǐng)Buffer
3. 建立Channel和Buffer的讀/寫關(guān)系
4. 關(guān)閉
下面的例子是使用NIO來復(fù)制文件:
public static void nioCopyFile(String resource, String destination)
throws IOException {
FileInputStream fis = new FileInputStream(resource);
FileOutputStream fos = new FileOutputStream(destination);
FileChannel readChannel = fis.getChannel(); // 讀文件通道
FileChannel writeChannel = fos.getChannel(); // 寫文件通道
ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數(shù)據(jù)緩存
while (true) {
buffer.clear();
int len = readChannel.read(buffer); // 讀入數(shù)據(jù)
if (len == -1) {
break; // 讀取完畢
}
buffer.flip();
writeChannel.write(buffer); // 寫入文件
}
readChannel.close();
writeChannel.close();
}
Buffer中有3個(gè)重要的參數(shù):位置(position)、容量(capactiy)和上限(limit)
這里要區(qū)別下容量和上限,比如一個(gè)Buffer有10KB,那么10KB就是容量,我將5KB的文件讀到Buffer中,那么上限就是5KB。
下面舉個(gè)例子來理解下這3個(gè)重要的參數(shù):
public static void main(String[] args) throws Exception {
ByteBuffer b = ByteBuffer.allocate(15); // 15個(gè)字節(jié)大小的緩沖區(qū)
System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
+ " position=" + b.position());
for (int i = 0; i < 10; i++) {
// 存入10個(gè)字節(jié)數(shù)據(jù)
b.put((byte) i);
}
System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
+ " position=" + b.position());
b.flip(); // 重置position
System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
+ " position=" + b.position());
for (int i = 0; i < 5; i++) {
System.out.print(b.get());
}
System.out.println();
System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
+ " position=" + b.position());
b.flip();
System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
+ " position=" + b.position());
}
整個(gè)過程如圖:

此時(shí)position從0到10,capactiy和limit不變。

該操作會(huì)重置position,通常,將buffer從寫模式轉(zhuǎn)換為讀 模式時(shí)需要執(zhí)行此方法 flip()操作不僅重置了當(dāng)前的position為0,還將limit設(shè)置到當(dāng)前position的位置 。
limit的意義在于,來確定哪些數(shù)據(jù)是有意義的,換句話說,從position到limit之間的數(shù)據(jù)才是有意義的數(shù)據(jù),因?yàn)槭巧洗尾僮鞯臄?shù)據(jù)。所以flip操作往往是讀寫轉(zhuǎn)換的意思。

意義同上。
而Buffer中大多數(shù)的方法都是去改變這3個(gè)參數(shù)來達(dá)到某些功能的:
public final Buffer rewind()
將position置零,并清除標(biāo)志位(mark)
public final Buffer clear()
將position置零,同時(shí)將limit設(shè)置為capacity的大小,并清除了標(biāo)志mark
public final Buffer flip()
先將limit設(shè)置到position所在位置,然后將position置零,并清除標(biāo)志位mark,通常在讀寫轉(zhuǎn)換時(shí)使用
文件映射到內(nèi)存
public static void main(String[] args) throws Exception {
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
FileChannel fc = raf.getChannel();
// 將文件映射到內(nèi)存中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,
raf.length());
while (mbb.hasRemaining()) {
System.out.print((char) mbb.get());
}
mbb.put(0, (byte) 98); // 修改文件
raf.close();
}
對(duì)MappedByteBuffer的修改就相當(dāng)于修改文件本身,這樣操作的速度是很快的。
3. Channel
多線程網(wǎng)絡(luò)服務(wù)器的一般結(jié)構(gòu):

簡(jiǎn)單的多線程服務(wù)器:
public static void main(String[] args) throws Exception {
ServerSocket echoServer = null;
Socket clientSocket = null;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System.out.println(e);
}
while (true) {
try {
clientSocket = echoServer.accept();
System.out.println(clientSocket.getRemoteSocketAddress()
+ " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System.out.println(e);
}
}
}
功能就是服務(wù)器端讀到什么數(shù)據(jù),就向客戶端回寫什么數(shù)據(jù)。
這里的tp是一個(gè)線程池,HandleMsg是處理消息的類。
static class HandleMsg implements Runnable{
省略部分信息
public void run(){
try {
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintWriter(clientSocket.getOutputStream(), true);
// 從InputStream當(dāng)中讀取客戶端所發(fā)送的數(shù)據(jù)
String inputLine = null;
long b=System. currentTimeMillis ();
while ((inputLine = is.readLine()) != null)
{
os.println(inputLine);
}
long e=System. currentTimeMillis ();
System. out.println ("spend:"+(e - b)+" ms ");
} catch (IOException e) {
e.printStackTrace();
}finally
{
關(guān)閉資源
}
}
}
客戶端:
public static void main(String[] args) throws Exception {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true);
writer.println("Hello!");
writer.flush();
reader = new BufferedReader(new InputStreamReader(
client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch (Exception e) {
} finally {
// 省略資源關(guān)閉
}
}
以上的網(wǎng)絡(luò)編程是很基本的,使用這種方式,會(huì)有一些問題:
為每一個(gè)客戶端使用一個(gè)線程,如果客戶端出現(xiàn)延時(shí)等異常,線程可能會(huì)被占用很長(zhǎng)時(shí)間。因?yàn)閿?shù)據(jù)的準(zhǔn)備和讀取都在這個(gè)線程中。此時(shí),如果客戶端數(shù)量眾多,可能會(huì)消耗大量的系統(tǒng)資源。
解決方案:
使用非阻塞的NIO (讀取數(shù)據(jù)不等待,數(shù)據(jù)準(zhǔn)備好了再工作)
為了體現(xiàn)NIO使用的高效。
這里先模擬一個(gè)低效的客戶端來模擬因網(wǎng)絡(luò)而延時(shí)的情況:
private static ExecutorService tp= Executors.newCachedThreadPool();
private static final int sleep_time=1000*1000*1000;
public static class EchoClient implements Runnable{
public void run(){
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true);
writer.print("H");
LockSupport.parkNanos(sleep_time);
writer.print("e");
LockSupport.parkNanos(sleep_time);
writer.print("l");
LockSupport.parkNanos(sleep_time);
writer.print("l");
LockSupport.parkNanos(sleep_time);
writer.print("o");
LockSupport.parkNanos(sleep_time);
writer.print("!");
LockSupport.parkNanos(sleep_time);
writer.println();
writer.flush();
}catch(Exception e)
{
}
}
}
服務(wù)器端輸出:
spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms
因?yàn)?/p>
while ((inputLine = is.readLine()) != null)
是阻塞的,所以時(shí)間都花在等待中。
如果用NIO來處理這個(gè)問題會(huì)怎么做呢?
NIO有一個(gè)很大的特點(diǎn)就是:把數(shù)據(jù)準(zhǔn)備好了再通知我
而Channel有點(diǎn)類似于流,一個(gè)Channel可以和文件或者網(wǎng)絡(luò)Socket對(duì)應(yīng) 。

selector是一個(gè)選擇器,它可以選擇某一個(gè)Channel,然后做些事情。
一個(gè)線程可以對(duì)應(yīng)一個(gè)selector,而一個(gè)selector可以輪詢多個(gè)Channel,而每個(gè)Channel對(duì)應(yīng)了一個(gè)Socket。
與上面一個(gè)線程對(duì)應(yīng)一個(gè)Socket相比,使用NIO后,一個(gè)線程可以輪詢多個(gè)Socket。
當(dāng)selector調(diào)用select()時(shí),會(huì)查看是否有客戶端準(zhǔn)備好了數(shù)據(jù)。當(dāng)沒有數(shù)據(jù)被準(zhǔn)備好時(shí),select()會(huì)阻塞。平時(shí)都說NIO是非阻塞的,但是如果沒有數(shù)據(jù)被準(zhǔn)備好還是會(huì)有阻塞現(xiàn)象。
當(dāng)有數(shù)據(jù)被準(zhǔn)備好時(shí),調(diào)用完select()后,會(huì)返回一個(gè)SelectionKey,SelectionKey表示在某個(gè)selector上的某個(gè)Channel的數(shù)據(jù)已經(jīng)被準(zhǔn)備好了。
只有在數(shù)據(jù)準(zhǔn)備好時(shí),這個(gè)Channel才會(huì)被選擇。
這樣NIO實(shí)現(xiàn)了一個(gè)線程來監(jiān)控多個(gè)客戶端。
而剛剛模擬的網(wǎng)絡(luò)延遲的客戶端將不會(huì)影響NIO下的線程,因?yàn)槟硞€(gè)Socket網(wǎng)絡(luò)延遲時(shí),數(shù)據(jù)還未被準(zhǔn)備好,selector是不會(huì)選擇它的,而會(huì)選擇其他準(zhǔn)備好的客戶端。
selectNow()與select()的區(qū)別在于,selectNow()是不阻塞的,當(dāng)沒有客戶端準(zhǔn)備好數(shù)據(jù)時(shí),selectNow()不會(huì)阻塞,將返回0,有客戶端準(zhǔn)備好數(shù)據(jù)時(shí),selectNow()返回準(zhǔn)備好的客戶端的個(gè)數(shù)。
主要代碼:
package test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadNIOEchoServer {
public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();
class EchoClient {
private LinkedList<ByteBuffer> outq;
EchoClient() {
outq = new LinkedList<ByteBuffer>();
}
public LinkedList<ByteBuffer> getOutputQueue() {
return outq;
}
public void enqueue(ByteBuffer bb) {
outq.addFirst(bb);
}
}
class HandleMsg implements Runnable {
SelectionKey sk;
ByteBuffer bb;
public HandleMsg(SelectionKey sk, ByteBuffer bb) {
super();
this.sk = sk;
this.bb = bb;
}
@Override
public void run() {
// TODO Auto-generated method stub
EchoClient echoClient = (EchoClient) sk.attachment();
echoClient.enqueue(bb);
sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup();
}
}
private Selector selector;
private ExecutorService tp = Executors.newCachedThreadPool();
private void startServer() throws Exception {
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(8000);
ssc.socket().bind(isa);
// 注冊(cè)感興趣的事件,此處對(duì)accpet事件感興趣
SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
for (;;) {
selector.select();
Set readyKeys = selector.selectedKeys();
Iterator i = readyKeys.iterator();
long e = 0;
while (i.hasNext()) {
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isAcceptable()) {
doAccept(sk);
} else if (sk.isValid() && sk.isReadable()) {
if (!geym_time_stat.containsKey(((SocketChannel) sk
.channel()).socket())) {
geym_time_stat.put(
((SocketChannel) sk.channel()).socket(),
System.currentTimeMillis());
}
doRead(sk);
} else if (sk.isValid() && sk.isWritable()) {
doWrite(sk);
e = System.currentTimeMillis();
long b = geym_time_stat.remove(((SocketChannel) sk
.channel()).socket());
System.out.println("spend:" + (e - b) + "ms");
}
}
}
}
private void doWrite(SelectionKey sk) {
// TODO Auto-generated method stub
SocketChannel channel = (SocketChannel) sk.channel();
EchoClient echoClient = (EchoClient) sk.attachment();
LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
ByteBuffer bb = outq.getLast();
try {
int len = channel.write(bb);
if (len == -1) {
disconnect(sk);
return;
}
if (bb.remaining() == 0) {
outq.removeLast();
}
} catch (Exception e) {
// TODO: handle exception
disconnect(sk);
}
if (outq.size() == 0) {
sk.interestOps(SelectionKey.OP_READ);
}
}
private void doRead(SelectionKey sk) {
// TODO Auto-generated method stub
SocketChannel channel = (SocketChannel) sk.channel();
ByteBuffer bb = ByteBuffer.allocate(8192);
int len;
try {
len = channel.read(bb);
if (len < 0) {
disconnect(sk);
return;
}
} catch (Exception e) {
// TODO: handle exception
disconnect(sk);
return;
}
bb.flip();
tp.execute(new HandleMsg(sk, bb));
}
private void disconnect(SelectionKey sk) {
// TODO Auto-generated method stub
//省略略干關(guān)閉操作
}
private void doAccept(SelectionKey sk) {
// TODO Auto-generated method stub
ServerSocketChannel server = (ServerSocketChannel) sk.channel();
SocketChannel clientChannel;
try {
clientChannel = server.accept();
clientChannel.configureBlocking(false);
SelectionKey clientKey = clientChannel.register(selector,
SelectionKey.OP_READ);
EchoClient echoClinet = new EchoClient();
clientKey.attach(echoClinet);
InetAddress clientAddress = clientChannel.socket().getInetAddress();
System.out.println("Accepted connection from "
+ clientAddress.getHostAddress());
} catch (Exception e) {
// TODO: handle exception
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
try {
echoServer.startServer();
} catch (Exception e) {
// TODO: handle exception
}
}
}
代碼僅作參考,主要的特點(diǎn)是,對(duì)不同事件的感興趣來做不同的事。
當(dāng)用之前模擬的那個(gè)延遲的客戶端時(shí),這次的時(shí)間消耗就在2ms到11ms之間了。性能提升是很明顯的。
總結(jié):
1. NIO會(huì)將數(shù)據(jù)準(zhǔn)備好后,再交由應(yīng)用進(jìn)行處理,數(shù)據(jù)的讀取/寫入過程依然在應(yīng)用線程中完成,只是將等待的時(shí)間剝離到單獨(dú)的線程中去。
2. 節(jié)省數(shù)據(jù)準(zhǔn)備時(shí)間(因?yàn)镾elector可以復(fù)用)
5. AIO
AIO的特點(diǎn):
1. 讀完了再通知我
2. 不會(huì)加快IO,只是在讀完后進(jìn)行通知
3. 使用回調(diào)函數(shù),進(jìn)行業(yè)務(wù)處理
AIO的相關(guān)代碼:
AsynchronousServerSocketChannel
server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));
使用server上的accept方法
public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
CompletionHandler為回調(diào)接口,當(dāng)有客戶端accept之后,就做handler中的事情。
示例代碼:
server.accept(null,
new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
public void completed(AsynchronousSocketChannel result,
Object attachment) {
System.out.println(Thread.currentThread().getName());
Future<Integer> writeResult = null;
try {
buffer.clear();
result.read(buffer).get(100, TimeUnit.SECONDS);
buffer.flip();
writeResult = result.write(buffer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
server.accept(null, this);
writeResult.get();
result.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed: " + exc);
}
});
這里使用了Future來實(shí)現(xiàn)即時(shí)返回,關(guān)于Future請(qǐng)參考上一篇
在理解了NIO的基礎(chǔ)上,看AIO,區(qū)別在于AIO是等讀寫過程完成后再去調(diào)用回調(diào)函數(shù)。
NIO是同步非阻塞的
AIO是異步非阻塞的
由于NIO的讀寫過程依然在應(yīng)用線程里完成,所以對(duì)于那些讀寫過程時(shí)間長(zhǎng)的,NIO就不太適合。
而AIO的讀寫過程完成后才被通知,所以AIO能夠勝任那些重量級(jí),讀寫過程長(zhǎng)的任務(wù)。
相關(guān)文章
詳解SpringBoot 快速整合Mybatis(去XML化+注解進(jìn)階)
本篇文章主要介紹了詳解SpringBoot 快速整合Mybatis(去XML化+注解進(jìn)階),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-11-11
Spring Security OAuth2集成短信驗(yàn)證碼登錄以及第三方登錄
這篇文章主要介紹了Spring Security OAuth2集成短信驗(yàn)證碼登錄以及第三方登錄,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-04-04
淺談Java中向上造型向下造型和接口回調(diào)中的問題
這篇文章主要介紹了淺談Java中向上造型向下造型和接口回調(diào)中的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08
Springboot使用Security實(shí)現(xiàn)OAuth2授權(quán)驗(yàn)證完整過程
安全管理是軟件系統(tǒng)必不可少的的功能。根據(jù)經(jīng)典的“墨菲定律”——凡是可能,總會(huì)發(fā)生。如果系統(tǒng)存在安全隱患,最終必然會(huì)出現(xiàn)問題,這篇文章主要介紹了SpringBoot使用Security實(shí)現(xiàn)OAuth2授權(quán)驗(yàn)證完整過程2022-12-12
Java zookeeper圖形化工具ZooInspector用法詳解
這篇文章主要介紹了Java zookeeper圖形化工具ZooInspector用法詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
java讀寫excel文件實(shí)現(xiàn)POI解析Excel的方法
在日常工作中,我們常常會(huì)進(jìn)行Excel文件讀寫操作,這篇文章主要介紹了java讀寫excel文件實(shí)現(xiàn)POI解析Excel的方法,實(shí)例分析了java讀寫excel的技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-10-10
Java 8 lambda表達(dá)式引入詳解及實(shí)例
這篇文章主要介紹了Java 8 lambda表達(dá)式引入詳解及實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-05-05

