Java之NIO基本簡介
一、NIO基本簡介
NIO (New lO)也有人稱之為java non-blocking lO是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java lO API。NIO與原來的IO有同樣的作用和目的,但是使用的方式完全不同,NIO支持面向緩沖區(qū)的、基于通道的IO操作。NIO將以更加高效的方式進行文件的讀寫操作。NIO可以理解為非阻塞IO,傳統(tǒng)的IO的read和write只能阻塞執(zhí)行,線程在讀寫IO期間不能干其他事情,比如調(diào)用socket.read()時,如果服務(wù)器一直沒有數(shù)據(jù)傳輸過來,線程就一直阻塞,而NIO中可以配置socket為非阻塞模式。
NIO相關(guān)類都被放在java.nio包及子包下,并且對原java.io包中的很多類進行改寫。NIO有三大核心部分:Channel(通道),Buffer(緩沖區(qū)), Selector(選擇器)Java NlO的非阻塞模式,使一個線程從某通道發(fā)送請求或者讀取數(shù)據(jù),但是它僅能得到目前可用的數(shù)據(jù),如果目前沒有數(shù)據(jù)可用時,就什么都不會獲取,而不是保持線程阻塞,所以直至數(shù)據(jù)變的可以讀取之前,該線程可以繼續(xù)做其他的事情。非阻塞寫也是如此,一個線程請求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。通俗理解:NIO是可以做到用一個線程來處理多個操作的。假設(shè)有1000個請求過來,根據(jù)實際情況,可以分配20或者80個線程來處理。不像之前的阻塞IO那樣,非得分配1000個。
二、NIO 與 BIO的比較
NIO可以先將數(shù)據(jù)寫入到緩沖區(qū),然后再有緩沖區(qū)寫入通道,因此可以做到同步非阻塞。
BIO則是面向的流,讀寫數(shù)據(jù)都是單向的。因此是同步阻塞。
三、NIO 三大核心原理示意圖
NIO有三大核心部分: Channel(通道),Buffer(緩沖區(qū)),Selector(選擇器)
Buffer(緩沖區(qū))
緩沖區(qū)本質(zhì)上是一塊可以寫入數(shù)據(jù),然后可以從中讀取數(shù)據(jù)的內(nèi)存。這塊內(nèi)存被包裝成NIO Buffer對象,并提供了一組方法,用來方便的訪問該塊內(nèi)存。相比較直接對數(shù)組的操作,Buffer APl更加容易操作和管理。
Channel(通道)
Java NIO的通道類似流,但又有些不同:既可以從通道中讀取數(shù)據(jù),又可以寫數(shù)據(jù)到通道。但流的(input或output)讀寫通常是單向的。通道可以非阻塞讀取和寫入通道,通道可以支持讀取或?qū)懭刖彌_區(qū),也支持異步地讀寫。
Selector(選擇器)
Selector是一個ava NIO組件,可以能夠檢查一個或多個NIO通道,并確定哪些通道已經(jīng)準備好進行讀取或?qū)懭搿_@樣,一個單獨的線程可以管理多個channel,從而管理多個網(wǎng)絡(luò)連接,提高效率
- 每個channel都會對應(yīng)一個 Buffer
- 一個線程對應(yīng)Selector ,一個Selector對應(yīng)多個channel(連接)程序
- 切換到哪個channel是由事件決定的
- Selector 會根據(jù)不同的事件,在各個通道上切換
- Buffer 就是一個內(nèi)存塊,底層是一個數(shù)組
- 數(shù)據(jù)的讀取寫入是通過 Buffer完成的,BlO中要么是輸入流,或者是輸出流,不能雙向,但是NIO的Buffer是可以讀也可以寫。
- Java NIO系統(tǒng)的核心在于:通道(Channel)和緩沖區(qū)(Buffer)。通道表示打開到lO設(shè)備(例如:文件、套接字)的連接。若需要使用NIO系統(tǒng),需要獲取用于連接IO設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū)。然后操作緩沖區(qū),對數(shù)據(jù)進行處理。簡而言之,Channel負責(zé)傳輸,Buffer負責(zé)存取數(shù)據(jù)
四、NIO核心一:緩存區(qū) (Buffer)
緩沖區(qū)(Buffer)一個用于特定基本數(shù)據(jù)類型的容器。由 java.nio 包定義的,所有緩沖區(qū) 都是 Buffer 抽象類的子類.。Java NIO 中的 Buffer 主要用于與 NIO 通道進行 交互,數(shù)據(jù)是從通道讀入緩沖區(qū),從緩沖區(qū)寫入通道中的
Buffer 類及其子類:
Buffer就像一個數(shù)組,可以保存多個相同類型的數(shù)據(jù)。根據(jù) 數(shù)據(jù)類型不同 ,有以下 Buffer 常用子類:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
上述 Buffer 類他們都采用相似的方法進行管理數(shù)據(jù),只是各自 管理的數(shù)據(jù)類型不同而已。都是通過如下方法獲取一個 Buffer 對象:
static XxxBuffer allocate(int capacity) : 創(chuàng)建一個容量為capacity 的 XxxBuffer 對象
緩沖區(qū)的基本屬性 Buffer 中的重要概念:
容量 (capacity) :作為一個內(nèi)存塊,Buffer具有一定的固定大小, 也稱為"容量",緩沖區(qū)容量不能為負,并且創(chuàng)建后不能更改。
限制 (limit):表示緩沖區(qū)中可以操作數(shù)據(jù)的大小 (limit 后數(shù)據(jù)不能進行讀寫)。緩沖區(qū)的限制不能 為負,并且不能大于其容量。 寫入模式,限制等于 buffer的容量。讀取模式下,limit等于寫入的數(shù)據(jù)量。
位置 (position):下一個要讀取或?qū)懭氲臄?shù)據(jù)的索引。 緩沖區(qū)的位置不能為 負,并且不能大于其限制
標記 (mark)與重置 (reset):標記是一個索引, 通過 Buffer 中的 mark() 方法 指定 Buffer 中一個 特定的 position,之后可以通過調(diào)用 reset() 方法恢 復(fù)到這 個 position.
標記、位置、限制、容量遵守以下不變式: 0 <= mark <= position <= limit <= capacity
Buffer常見方法:
- Buffer clear() :清空緩沖區(qū)并返回對緩沖區(qū)的引用
- Buffer flip() :為 將緩沖區(qū)的界限設(shè)置為當前位置, 并將當前位置重置為 0
- int capacity() :返回 Buffer 的 capacity 大小
- boolean hasRemaining(): 判斷緩沖區(qū)中是否還有元素
- int limit() :返回 Buffer 的界限(limit) 的位置
- Buffer limit(int n) 將設(shè)置緩沖區(qū)界限為 n, 并返回一個具有新 limit 的緩沖區(qū)對象
- Buffer mark(): 對緩沖區(qū)設(shè)置標記
- int position() :返回緩沖區(qū)的當前位置 position
- Buffer position(int n) :將設(shè)置緩沖區(qū)的當前位置為 n, 并返回修改后的 Buffer 對象
- int remaining() :返回 position 和 limit 之間的元素個數(shù)
- Buffer reset() :將位置 position 轉(zhuǎn)到以前設(shè)置的mark 所在的位置
- Buffer rewind() :將位置設(shè)為為 0, 取消設(shè)置的 mark
緩沖區(qū)的數(shù)據(jù)操作 Buffer 所有子類提供了兩個用于數(shù)據(jù)操作的方法:
- get() :讀取單個字節(jié)
- get(byte[] dst):批量讀取多個字節(jié)到 dst 中
- get(int index):讀取指定索引位置的字節(jié)(不會移動 position)放到入數(shù)據(jù)到Buffer中
- put(byte b):將給定單個字節(jié)寫入緩沖區(qū)的當前位置
- put(byte[] src):將 src 中的字節(jié)寫入緩沖區(qū)的當前位置
- put(int index, byte b):將指定字節(jié)寫入緩沖區(qū)的索引 位置(不會移動 position)
使用Buffer讀寫數(shù)據(jù)一般遵循以下四個步驟:
- 寫入數(shù)據(jù)到Buffer
- 調(diào)用flip()方法,轉(zhuǎn)換為讀取模式
- 從Buffer中讀取數(shù)據(jù)
- 調(diào)用buffer.clear()方法或者buffer.compact()方 法清除緩沖區(qū)
package com.kgf.kgfjavalearning2021.io.nio; import org.junit.Test; import java.nio.ByteBuffer; /*** * Buffer測試類 */ public class TestBuffer { @Test public void test1(){ //1. 分配一個指定大小的緩沖區(qū) ByteBuffer buf = ByteBuffer.allocate(1024); System.out.println("-----------------allocate()----------------"); System.out.println(buf.position());// 0: 表示當前的位置為0 System.out.println(buf.limit());// 1024: 表示界限為1024,前1024個位置是允許我們讀寫的 System.out.println(buf.capacity());//1024:表示容量大小為1024 //2. 利用 put() 存入數(shù)據(jù)到緩沖區(qū)中 System.out.println("-----------------put()----------------"); String str = "itheima"; buf.put(str.getBytes()); System.out.println(buf.position());// 7表示下一個可以寫入的位置是7,因為我們寫入的字節(jié)是7個,從0開始已經(jīng)寫了7個,位置為8的position為7 System.out.println(buf.limit());// 1024:表示界限為1024,前1024個位置是允許我們讀寫的 System.out.println(buf.capacity());//1024:表示容量大小為1024 //3. 切換讀取數(shù)據(jù)模式 System.out.println("-----------------flip()----------------"); buf.flip(); System.out.println(buf.position());// 0: 讀取的起始位置為0 System.out.println(buf.limit());// 7: 表示界限為7,前7個位置有數(shù)據(jù)可以讀取 System.out.println(buf.capacity());// 1024:表示容量大小為1024 //4. 利用 get() 讀取緩沖區(qū)中的數(shù)據(jù) System.out.println("-----------------get()----------------"); byte[] dst = new byte[buf.limit()];//創(chuàng)建一個界限為limit大小的字節(jié)數(shù)組 buf.get(dst);//批量將limit大小的字節(jié)寫入到dst字節(jié)數(shù)組中 System.out.println(new String(dst, 0, dst.length));//結(jié)果為itheima System.out.println(buf.position());//7: 讀取的位置變?yōu)?,因為前面的7個字節(jié)數(shù)據(jù)已經(jīng)全部讀取出去,下一個可讀取的位置為7,從0開始的 System.out.println(buf.limit());//7: 可讀取的界限大小為7 System.out.println(buf.capacity());// 1024: 表示容量大小為1024 //5. rewind() : 可重復(fù)讀 System.out.println("-----------------rewind()----------------"); buf.rewind();// 將位置設(shè)為為 0,從頭開始讀取 System.out.println(buf.position());// 0 System.out.println(buf.limit());// 7 System.out.println(buf.capacity());// 1024 //6. clear() : 清空緩沖區(qū). 但是緩沖區(qū)中的數(shù)據(jù)依然存在,但是處于“被遺忘”狀態(tài) System.out.println("-----------------clear()----------------"); buf.clear(); System.out.println(buf.position());// 0 System.out.println(buf.limit());// 1024 System.out.println(buf.capacity());// 1024 System.out.println((char)buf.get());//i } @Test public void test2(){ String str = "itheima"; ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(str.getBytes());// 將str寫入到buf緩沖區(qū)中 buf.flip();//轉(zhuǎn)換為讀模式 byte[] dst = new byte[buf.limit()];//定義一個字節(jié)數(shù)組 buf.get(dst, 0, 2);//將前2個字節(jié)批量寫入到dst字節(jié)數(shù)組中 System.out.println(new String(dst, 0, 2));//打印結(jié)果為it System.out.println(buf.position());//當前下一個讀取的位置為2 //mark() : 標記 buf.mark(); buf.get(dst, 2, 2);//從第3個位置開始將2個字節(jié)批量寫入到dst字節(jié)數(shù)組中 System.out.println(new String(dst, 2, 2));//打印結(jié)果為he System.out.println(buf.position());// 當前下一個讀取的位置為4 //reset() : 恢復(fù)到 mark 的位置 buf.reset(); System.out.println(buf.position());// 2 //判斷緩沖區(qū)中是否還有剩余數(shù)據(jù) if(buf.hasRemaining()){ //獲取緩沖區(qū)中可以操作的數(shù)量 System.out.println(buf.remaining());// 5: 返回 position 和 limit 之間的元素個數(shù) } } @Test public void test3(){ //分配直接緩沖區(qū) ByteBuffer buf = ByteBuffer.allocateDirect(1024); System.out.println(buf.isDirect()); } }
直接與非直接緩沖區(qū):
byte byffer可以是兩種類型,一種是基于直接內(nèi)存(也就是非堆內(nèi)存);另一種是非直接內(nèi)存(也就是堆內(nèi)存)。對于直接內(nèi)存來說,JVM將會在IO操作上具有更高的性能,因為它
直接作用于本地系統(tǒng)的IO操作。而非直接內(nèi)存,也就是堆內(nèi)存中的數(shù)據(jù),如果要作IO操作,會先從本進程內(nèi)存復(fù)制到直接內(nèi)存,再利用本地IO處理。從數(shù)據(jù)流的角度,非直接內(nèi)存是下面這樣的作用鏈:
本地IO-->直接內(nèi)存-->非直接內(nèi)存-->直接內(nèi)存-->本地IO
而直接內(nèi)存是:
本地IO-->直接內(nèi)存-->本地IO
很明顯,在做IO處理時,比如網(wǎng)絡(luò)發(fā)送大量數(shù)據(jù)時,直接內(nèi)存會具有更高的效率。直接內(nèi)存使用allocateDirect創(chuàng)建,但是它比申請普通的堆內(nèi)存需要耗費更高的性能。不過,這部分的數(shù)據(jù)是在JVM之外的,因此它不會占用應(yīng)用的內(nèi)存。所以呢,當你有很大的數(shù)據(jù)要緩存,并且它的生命周期又很長,那么就比較適合使用直接內(nèi)存。只是一般來說,如果不是能帶來很明顯的性能提升,還是推薦直接使用堆內(nèi)存。字節(jié)緩沖區(qū)是直接緩沖區(qū)還是非直接緩沖區(qū)可通過調(diào)用其 isDirect() 方法來確定。
使用場景
- 有很大的數(shù)據(jù)需要存儲,它的生命周期又很長
- 適合頻繁的IO操作,比如網(wǎng)絡(luò)并發(fā)場景
五、NIO核心二:通道(Channel)
1、通道Channe概述2、NIO 的通道類似于流,但有些區(qū)別如下:
通道可以同時進行讀寫,而流只能讀或者只能寫
通道可以實現(xiàn)異步讀寫數(shù)據(jù)
通道可以從緩沖讀數(shù)據(jù),也可以寫數(shù)據(jù)到緩沖:
通道可以同時進行讀寫,而流只能讀或者只能寫
通道可以實現(xiàn)異步讀寫數(shù)據(jù)
通道可以從緩沖讀數(shù)據(jù),也可以寫數(shù)據(jù)到緩沖:
3、BIO 中的 stream 是單向的,例如 FileInputStream 對象只能進行讀取數(shù)據(jù)的操作,而 NIO 中的通道(Channel)是雙向的,可以讀操作,也可以寫操作。
public interface Channel extends Closeable{}
5、常用的Channel實現(xiàn)類
- FileChannel:用于讀取、寫入、映射和操作文件的通道。
- DatagramChannel:通過 UDP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)通道。
- SocketChannel:通過 TCP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)。
- ServerSocketChannel:可以監(jiān)聽新進來的 TCP 連接,對每一個新進來的連接都會創(chuàng)建一個 SocketChannel。 【ServerSocketChanne 類似 ServerSocket , SocketChannel 類似 Socket】
6、FileChannel 類獲取通道的一種方式是對支持通道的對象調(diào)用getChannel() 方法。支持通道的類如下
- FileInputStream
- FileOutputStream
- RandomAccessFile
- DatagramSocket
- Socket
- ServerSocket
- 獲取通道的其他方式是使用 Files 類的靜態(tài)方法 newByteChannel() 獲取字節(jié)通道?;蛘咄ㄟ^通道的靜態(tài)方法 open() 打開并返回指定通道
7、FileChannel常用方法
- int read(ByteBuffer dst) :從Channel 到 中讀取數(shù)據(jù)到 ByteBuffer
- long read(ByteBuffer[] dsts) : 將Channel中的數(shù)據(jù)“分散”到 ByteBuffer[]
- int write(ByteBuffer src) :將 ByteBuffer中的數(shù)據(jù)寫入到 Channel
- long write(ByteBuffer[] srcs) :將 ByteBuffer[] 到 中的數(shù)據(jù)“聚集”到 Channel
- long position() :返回此通道的文件位置
- FileChannel position(long p) :設(shè)置此通道的文件位置
- long size() :返回此通道的文件的當前大小
- FileChannel truncate(long s) :將此通道的文件截取為給定大小
- void force(boolean metaData) :強制將所有對此通道的文件更新寫入到存儲設(shè)備中
8、案例1-本地文件寫數(shù)據(jù)
package com.kgf.kgfjavalearning2021.io.nio; import org.junit.Test; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /*** * 需求:使用前面學(xué)習(xí)后的 ByteBuffer(緩沖)和 FileChannel(通道), 將數(shù)據(jù)寫入到 data.txt 中. */ public class ChannelTest { @Test public void write(){ try { // 1、字節(jié)輸出流通向目標文件 FileOutputStream fos = new FileOutputStream("E:\\test\\data01.txt"); // 2、得到字節(jié)輸出流對應(yīng)的通道Channel FileChannel channel = fos.getChannel(); // 3、分配緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); for (int i = 0; i < 10; i++) { buffer.clear();//清空緩沖區(qū) buffer.put(("hello,使用Buffer和channel實現(xiàn)寫數(shù)據(jù)到文件中"+i+"\r\n").getBytes()); // 4、把緩沖區(qū)切換成寫出模式 buffer.flip(); channel.write(buffer);//將緩沖區(qū)的數(shù)據(jù)寫入到文件通道 } channel.close(); System.out.println("寫數(shù)據(jù)到文件中!"); } catch (Exception e) { e.printStackTrace(); } } }
9、案例2-本地文件讀數(shù)據(jù)
/*** * 設(shè)置兩個緩沖區(qū),一大一小,大的緩沖區(qū)為每次讀取的量,小的緩沖區(qū)存放每行的數(shù)據(jù)(確保大小可存放文本中最長的那行)。讀取的時候判斷是不是換行符13,是的話則返回一行數(shù)據(jù),不是的話繼續(xù)讀取,直到讀完文件。 * @throws Exception */ @Test public void read() throws Exception { // 1、定義一個文件字節(jié)輸入流與源文件接通 FileInputStream is = new FileInputStream("E:\\test\\data01.txt"); // 2、需要得到文件字節(jié)輸入流的文件通道 FileChannel channel = is.getChannel(); // 3、定義一個緩沖區(qū) int bufferSize = 1024 * 1024; // 每一塊的大小 ByteBuffer buffer = ByteBuffer.allocate(bufferSize); ByteBuffer bb = ByteBuffer.allocate(1024); // 4、讀取數(shù)據(jù)到緩沖區(qū) int bytesRead = channel.read(buffer); while (bytesRead != -1) { buffer.flip();// 切換模式,寫->讀 while (buffer.hasRemaining()) {//返回 position 和 limit 之間的元素個數(shù) byte b = buffer.get(); if (b == 10 || b == 13) { // 換行或回車 bb.flip(); // 這里就是一個行 final String line = Charset.forName("utf-8").decode(bb).toString(); System.out.println(line);// 解碼已經(jīng)讀到的一行所對應(yīng)的字節(jié) bb.clear(); } else { if (bb.hasRemaining()) bb.put(b); else { // 空間不夠擴容 bb = reAllocate(bb); bb.put(b); } } } buffer.clear();// 清空,position位置為0,limit=capacity // 繼續(xù)往buffer中寫 bytesRead = channel.read(buffer); } channel.close(); }
10、案例3-使用Buffer完成文件復(fù)制
/** * 使用 FileChannel(通道) ,完成文件的拷貝。 * @throws Exception */ @Test public void copy() throws Exception { // 源文件 File srcFile = new File("E:\\test\\Aurora-4k.jpg"); File destFile = new File("E:\\test\\Aurora-4k-new.jpg"); // 得到一個字節(jié)字節(jié)輸入流 FileInputStream fis = new FileInputStream(srcFile); // 得到一個字節(jié)輸出流 FileOutputStream fos = new FileOutputStream(destFile); // 得到的是文件通道 FileChannel isChannel = fis.getChannel(); FileChannel osChannel = fos.getChannel(); // 分配緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); while(isChannel.read(buffer)>0){ // 已經(jīng)讀取了數(shù)據(jù) ,把緩沖區(qū)的模式切換成可讀模式 buffer.flip(); // 把數(shù)據(jù)寫出到 osChannel.write(buffer);//將buffer緩沖區(qū)中的數(shù)據(jù)寫入到osChannel中 // 必須先清空緩沖然后再寫入數(shù)據(jù)到緩沖區(qū) buffer.clear(); } isChannel.close(); osChannel.close(); System.out.println("復(fù)制完成!"); }
11、案例4-transferFrom()從目標通道中去復(fù)制原通道數(shù)據(jù)
@Test public void test02() throws Exception { // 1、字節(jié)輸入管道 FileInputStream is = new FileInputStream("E:\\test\\Aurora-4k.jpg"); FileChannel isChannel = is.getChannel(); // 2、字節(jié)輸出流管道 FileOutputStream fos = new FileOutputStream("E:\\test\\Aurora-4knew3.jpg"); FileChannel osChannel = fos.getChannel(); // 3、復(fù)制 osChannel.transferFrom(isChannel,isChannel.position(),isChannel.size()); isChannel.close(); osChannel.close(); }
12、案例5-transferTo()把原通道數(shù)據(jù)復(fù)制到目標通道
@Test public void test03() throws Exception { // 1、字節(jié)輸入管道 FileInputStream is = new FileInputStream("E:\\test\\Aurora-4k.jpg"); FileChannel isChannel = is.getChannel(); // 2、字節(jié)輸出流管道 FileOutputStream fos = new FileOutputStream("E:\\test\\Aurora-4knew4.jpg"); FileChannel osChannel = fos.getChannel(); // 3、復(fù)制 isChannel.transferTo(isChannel.position() , isChannel.size() , osChannel); isChannel.close(); osChannel.close(); }
13、案例6-分散 (Scatter) 和聚集 (Gather) 分散讀取
- 分散讀取(Scatter ):是指把Channel通道的數(shù)據(jù)讀入到 多個緩沖區(qū)中去
- 聚集寫入(Gathering )是指將多個 Buffer 中的數(shù) 據(jù)“聚集”到 Channel。
//分散和聚集 @Test public void test() throws IOException{ RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw"); //1. 獲取通道 FileChannel channel1 = raf1.getChannel(); //2. 分配指定大小的緩沖區(qū) ByteBuffer buf1 = ByteBuffer.allocate(100); ByteBuffer buf2 = ByteBuffer.allocate(1024); //3. 分散讀取 ByteBuffer[] bufs = {buf1, buf2}; channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(), 0, bufs[0].limit())); System.out.println("-----------------"); System.out.println(new String(bufs[1].array(), 0, bufs[1].limit())); //4. 聚集寫入 RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw"); FileChannel channel2 = raf2.getChannel(); channel2.write(bufs); }
六、NIO核心三:選擇器(Selector)
1、選擇器(Selector)概述選擇器(Selector)是SelectableChannle對象的多路復(fù)用器,Selector可以同時監(jiān)控多個SelectableChannel的IO狀況,也就是說,利用Selector可使一個單獨的線程管理多個Channel。Selector是非阻塞IO的核心。
- Java 的 NIO,用非阻塞的 IO 方式??梢杂靡粋€線程,處理多個的客戶端連接,就會使用到 Selector(選擇器)
- Selector 能夠檢測多個注冊的通道上是否有事件發(fā)生(注意:多個 Channel 以事件的方式可以注冊到同一個(Selector),如果有事件發(fā)生,便獲取事件然后針對每個事件進行相應(yīng)的處理。這樣就可以只用一個單線程去管
- 理多個通道,也就是管理多個連接和請求。
- 只有在連接/通道真正有讀寫事件發(fā)生時,才會進行讀寫,就大大地減少了系統(tǒng)開銷,并且不必為每個連接都創(chuàng)建一個線程,不用去維護多個線程
- 避免了多線程之間的上下文切換導(dǎo)致的開銷
2、選擇器的應(yīng)用創(chuàng)建 Selector :通過調(diào)用 Selector.open() 方法創(chuàng)建一個 Selector。
Selector selector = Selector.open();
向選擇器注冊通道:SelectableChannel.register(Selector sel, int ops)
//1. 獲取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2. 切換非阻塞模式 ssChannel.configureBlocking(false); //3. 綁定連接 ssChannel.bind(new InetSocketAddress(9898)); //4. 獲取選擇器 Selector selector = Selector.open(); //5. 將通道注冊到選擇器上, 并且指定“監(jiān)聽接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT);
當調(diào)用 register(Selector sel, int ops) 將通道注冊選擇器時,選擇器對通道的監(jiān)聽事件,需要通過第二個參數(shù) ops 指定??梢员O(jiān)聽的事件類型(用 可使用 SelectionKey 的四個常量 表示):
- 讀 : SelectionKey.OP_READ (1)
- 寫 : SelectionKey.OP_WRITE (4)
- 連接 : SelectionKey.OP_CONNECT (8)
- 接收 : SelectionKey.OP_ACCEPT (16)
若注冊時不止監(jiān)聽一個事件,則可以使用“位或”操作符連接。
3、NIO非阻塞式網(wǎng)絡(luò)通信原理分析 3.1、Selector 示意圖和特點說明
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITESelector可以實現(xiàn): 一個 I/O 線程可以并發(fā)處理 N 個客戶端連接和讀寫操作,這從根本上解決了傳統(tǒng)同步阻塞 I/O 一連接一線程模型,架構(gòu)的性能、彈性伸縮能力和可靠性都得到了極大的提升。
3.2、服務(wù)端流程
1)、獲取通道。當客戶端連接服務(wù)端時,服務(wù)端會通過 ServerSocketChannel 得到 SocketChannel:
ServerSocketChannel ssChannel = ServerSocketChannel.open();
2)、切換非阻塞模式
ssChannel.configureBlocking(false);
3)、綁定連接
ssChannel.bind(new InetSocketAddress(8888));
4)、獲取選擇器
Selector selector = Selector.open();
5)、將通道注冊到選擇器上, 并且指定“監(jiān)聽接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
6)、輪詢式的獲取選擇器上已經(jīng)“準備就緒”的事件
while (selector.select() > 0){ System.out.println("開啟事件處理"); //7.獲取選擇器中所有注冊的通道中已準備好的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //8.開始遍歷事件 while (it.hasNext()){ SelectionKey selectionKey = it.next(); System.out.println("--->"+selectionKey); //9.判斷這個事件具體是啥 if (selectionKey.isAcceptable()){ //10.獲取當前接入事件的客戶端通道 SocketChannel socketChannel = serverSocketChannel.accept(); //11.切換成非阻塞模式 socketChannel.configureBlocking(false); //12.將本客戶端注冊到選擇器 socketChannel.register(selector,SelectionKey.OP_READ); }else if (selectionKey.isReadable()){ //13.獲取當前選擇器上的讀 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //14.讀取 ByteBuffer buffer = ByteBuffer.allocate(1024); int len; while ((len = socketChannel.read(buffer)) > 0){ buffer.flip(); System.out.println(new String(buffer.array(),0,len)); //清除之前的數(shù)據(jù)(覆蓋寫入) buffer.clear(); } } //15.處理完畢后,移除當前事件 it.remove(); } }
3.3、客戶端流程1)、獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
2)、切換非阻塞模式
sChannel.configureBlocking(false);
3)、分配指定大小的緩沖區(qū)?????
ByteBuffer buffer = ByteBuffer.allocate(1024);
4)、發(fā)送數(shù)據(jù)給綁定的服務(wù)端
Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.nextLine(); buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis()) + "\n" + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } //關(guān)閉通道 sChannel.close();
4、NIO非阻塞式網(wǎng)絡(luò)通信入門案例需求:服務(wù)端接收客戶端的連接請求,并接收多個客戶端發(fā)送過來的事件。
Server端代碼實現(xiàn):
package nio.ss; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class Server { public static void main(String[] args) { try { //1.獲取管道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2.設(shè)置非阻塞模式 serverSocketChannel.configureBlocking(false); //3.綁定端口 serverSocketChannel.bind(new InetSocketAddress(8888)); //4.獲取選擇器 Selector selector = Selector.open(); //5.將通道注冊到選擇器上,并且開始指定監(jiān)聽的接收事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //6.輪詢已經(jīng)就緒的事件 while (selector.select() > 0){ System.out.println("開啟事件處理"); //7.獲取選擇器中所有注冊的通道中已準備好的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //8.開始遍歷事件 while (it.hasNext()){ SelectionKey selectionKey = it.next(); System.out.println("--->"+selectionKey); //9.判斷這個事件具體是啥 if (selectionKey.isAcceptable()){ //10.獲取當前接入事件的客戶端通道 SocketChannel socketChannel = serverSocketChannel.accept(); //11.切換成非阻塞模式 socketChannel.configureBlocking(false); //12.將本客戶端注冊到選擇器 socketChannel.register(selector,SelectionKey.OP_READ); }else if (selectionKey.isReadable()){ //13.獲取當前選擇器上的讀 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //14.讀取 ByteBuffer buffer = ByteBuffer.allocate(1024); int len; while ((len = socketChannel.read(buffer)) > 0){ buffer.flip(); System.out.println(new String(buffer.array(),0,len)); //清除之前的數(shù)據(jù)(覆蓋寫入) buffer.clear(); } } //15.處理完畢后,移除當前事件 it.remove(); } } } catch (IOException e) { e.printStackTrace(); } } }
Client端代碼實現(xiàn):
package nio.ss; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner; public class Client { public static void main(String[] args) { try { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8888)); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while (true){ System.out.print("請輸入:"); String msg = scanner.nextLine(); buffer.put(msg.getBytes()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } } }
5、NIO 網(wǎng)絡(luò)編程應(yīng)用實例-群聊系統(tǒng)需求:進一步理解 NIO 非阻塞網(wǎng)絡(luò)編程機制,實現(xiàn)多人群聊
- 編寫一個 NIO 群聊系統(tǒng),實現(xiàn)客戶端與客戶端的通信需求(非阻塞)
- 服務(wù)器端:可以監(jiān)測用戶上線,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)功能
- 客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有客戶端用戶,同時可以接受其它客戶端用戶通過服務(wù)端轉(zhuǎn)發(fā)來的消息
服務(wù)端代碼:
package nio.chat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; /** * */ public class Server { //定義屬性 private Selector selector; private ServerSocketChannel ssChannel; private static final int PORT = 9999; //構(gòu)造器 //初始化工作 public Server() { try { // 1、獲取通道 ssChannel = ServerSocketChannel.open(); // 2、切換為非阻塞模式 ssChannel.configureBlocking(false); // 3、綁定連接的端口 ssChannel.bind(new InetSocketAddress(PORT)); // 4、獲取選擇器Selector selector = Selector.open(); // 5、將通道都注冊到選擇器上去,并且開始指定監(jiān)聽接收事件 ssChannel.register(selector , SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } //監(jiān)聽 public void listen() { System.out.println("監(jiān)聽線程:" + Thread.currentThread().getName()); try { while (selector.select() > 0){ // 7、獲取選擇器中的所有注冊的通道中已經(jīng)就緒好的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); // 8、開始遍歷這些準備好的事件 while (it.hasNext()){ // 提取當前這個事件 SelectionKey sk = it.next(); // 9、判斷這個事件具體是什么 if(sk.isAcceptable()){ // 10、直接獲取當前接入的客戶端通道 SocketChannel schannel = ssChannel.accept(); // 11 、切換成非阻塞模式 schannel.configureBlocking(false); // 12、將本客戶端通道注冊到選擇器 System.out.println(schannel.getRemoteAddress() + " 上線 "); schannel.register(selector , SelectionKey.OP_READ); //提示 }else if(sk.isReadable()){ //處理讀 (專門寫方法..) readData(sk); } it.remove(); // 處理完畢之后需要移除當前事件 } } }catch (Exception e) { e.printStackTrace(); }finally { //發(fā)生異常處理.... } } //讀取客戶端消息 private void readData(SelectionKey key) { //獲取關(guān)聯(lián)的channel SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //創(chuàng)建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根據(jù)count的值做處理 if(count > 0) { //把緩存區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串 String msg = new String(buffer.array()); //輸出該消息 System.out.println("來自客戶端---> " + msg); //向其它的客戶端轉(zhuǎn)發(fā)消息(去掉自己), 專門寫一個方法來處理 sendInfoToOtherClients(msg, channel); } }catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + " 離線了.."); e.printStackTrace(); //取消注冊 key.cancel(); //關(guān)閉通道 channel.close(); }catch (IOException e2) { e2.printStackTrace();; } } } //轉(zhuǎn)發(fā)消息給其它客戶(通道) private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{ System.out.println("服務(wù)器轉(zhuǎn)發(fā)消息中..."); System.out.println("服務(wù)器轉(zhuǎn)發(fā)數(shù)據(jù)給客戶端線程: " + Thread.currentThread().getName()); //遍歷 所有注冊到selector 上的 SocketChannel,并排除 self for(SelectionKey key: selector.keys()) { //通過 key 取出對應(yīng)的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self) { //轉(zhuǎn)型 SocketChannel dest = (SocketChannel)targetChannel; //將msg 存儲到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //將buffer 的數(shù)據(jù)寫入 通道 dest.write(buffer); } } } public static void main(String[] args) { //創(chuàng)建服務(wù)器對象 Server groupChatServer = new Server(); groupChatServer.listen(); } }
客戶端代碼:
package nio.chat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class Client { //定義相關(guān)的屬性 private final String HOST = "127.0.0.1"; // 服務(wù)器的ip private final int PORT = 9999; //服務(wù)器端口 private Selector selector; private SocketChannel socketChannel; private String username; //構(gòu)造器, 完成初始化工作 public Client() throws IOException { selector = Selector.open(); //連接服務(wù)器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //設(shè)置非阻塞 socketChannel.configureBlocking(false); //將channel 注冊到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); } //向服務(wù)器發(fā)送消息 public void sendInfo(String info) { info = username + " 說:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); }catch (IOException e) { e.printStackTrace(); } } //讀取從服務(wù)器端回復(fù)的消息 public void readInfo() { try { int readChannels = selector.select(); if(readChannels > 0) {//有可以用的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isReadable()) { //得到相關(guān)的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一個Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取 sc.read(buffer); //把讀到的緩沖區(qū)的數(shù)據(jù)轉(zhuǎn)成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); //刪除當前的selectionKey, 防止重復(fù)操作 } else { //System.out.println("沒有可以用的通道..."); } }catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //啟動我們客戶端 Client chatClient = new Client(); //啟動一個線程, 每個3秒,讀取從服務(wù)器發(fā)送數(shù)據(jù) new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //發(fā)送數(shù)據(jù)給服務(wù)器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }
七、AIO 深入剖析
Java AIO(NIO.2) : 異步非阻塞,服務(wù)器實現(xiàn)模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動線程進行處理。
AIO:異步非阻塞,基于NIO的,可以稱之為NIO2.0 BIO NIO AIO Socket SocketChannel AsynchronousSocketChannel ServerSocket ServerSocketChannel AsynchronousServerSocketChannel
與NIO不同,當進行讀寫操作時,只須直接調(diào)用API的read或write方法即可, 這兩種方法均為異步的,對于讀操作而言,當有流可讀取時,操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū),對于寫操作而言,當操作系統(tǒng)將write方法傳遞的流寫入完畢時,操作系統(tǒng)主動通知應(yīng)用程序
即可以理解為,read/write方法都是異步的,完成后會主動調(diào)用回調(diào)函數(shù)。在JDK1.7中,這部分內(nèi)容被稱作NIO.2,主要在Java.nio.channels包下增加了下面四個異步通道:
- AsynchronousSocketChannel
- ? AsynchronousServerSocketChannel
- ? AsynchronousFileChannel
- ? AsynchronousDatagramChannel
八、總結(jié)
BIO、NIO、AIO:
- Java BIO : 同步并阻塞,服務(wù)器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務(wù)器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善。
- Java NIO : 同步非阻塞,服務(wù)器實現(xiàn)模式為一個請求一個線程,即客戶端發(fā)送的連接請求都會注冊到多路復(fù)用器上,多路復(fù)用器輪詢到連接有I/O請求時才啟動一個線程進行處理。
- Java AIO(NIO.2) : 異步非阻塞,服務(wù)器實現(xiàn)模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動線程進行處理。
BIO、NIO、AIO適用場景分析:
- BIO方式適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對服務(wù)器資源要求比較高,并發(fā)局限于應(yīng)用中,JDK1.4以前的唯一選擇,但程序直觀簡單易理解。
- NIO方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器,并發(fā)局限于應(yīng)用中,編程比較復(fù)雜,JDK1.4開始支持。
- AIO方式使用于連接數(shù)目多且連接比較長(重操作)的架構(gòu),比如相冊服務(wù)器,充分調(diào)用OS參與并發(fā)操作,編程比較復(fù)雜,JDK7開始支持。Netty!
到此這篇關(guān)于Java之NIO簡介的文章就介紹到這了,更多相關(guān)java NIO內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring-boot整合Micrometer+Prometheus的詳細過程
這篇文章主要介紹了springboot整合Micrometer+Prometheus的詳細過程,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-05-05Java使用jni清屏功能的實現(xiàn)(只針對cmd)
JNI是Java Native Interface的縮寫,它提供了若干的API實現(xiàn)了Java和其他語言的通信(主要是C&C++)。這篇文章主要介紹了Java使用jni清屏功能的實現(xiàn)(只針對cmd) ,感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-05-05Java實現(xiàn)調(diào)用接口API并返回數(shù)據(jù)
這篇文章主要介紹了Java實現(xiàn)調(diào)用接口API并返回數(shù)據(jù)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05