欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

探索Java I/O 模型的演進(jìn)

 更新時(shí)間:2019年06月13日 10:57:25   作者:Way Lau  
什么是同步?什么是異步?阻塞和非阻塞又有什么區(qū)別?本文先從 Unix 的 I/O 模型講起,介紹了5種常見的 I/O 模型。而后再引出 Java 的 I/O 模型的演進(jìn)過程,并用實(shí)例說明如何選擇合適的 Java I/O 模型來提高系統(tǒng)的并發(fā)量和可用性。,需要的朋友可以參考下

相關(guān)概念

同步和異步

描述的是用戶線程與內(nèi)核的交互方式:

  • 同步是指用戶線程發(fā)起 I/O 請(qǐng)求后需要等待或者輪詢內(nèi)核 I/O 操作完成后才能繼續(xù)執(zhí)行;
  • 異步是指用戶線程發(fā)起 I/O 請(qǐng)求后仍繼續(xù)執(zhí)行,當(dāng)內(nèi)核 I/O 操作完成后會(huì)通知用戶線程,或者調(diào)用用戶線程注冊(cè)的回調(diào)函數(shù)。

阻塞和非阻塞

描述的是用戶線程調(diào)用內(nèi)核 I/O 操作的方式:

  • 阻塞是指 I/O 操作需要徹底完成后才返回到用戶空間;
  • 非阻塞是指 I/O 操作被調(diào)用后立即返回給用戶一個(gè)狀態(tài)值,無需等到 I/O 操作徹底完成。

一個(gè) I/O 操作其實(shí)分成了兩個(gè)步驟:發(fā)起 I/O 請(qǐng)求和實(shí)際的 I/O 操作。 阻塞 I/O 和非阻塞 I/O 的區(qū)別在于第一步,發(fā)起 I/O 請(qǐng)求是否會(huì)被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞 I/O ,如果不阻塞,那么就是非阻塞 I/O 。 同步 I/O 和異步 I/O 的區(qū)別就在于第二個(gè)步驟是否阻塞,如果實(shí)際的 I/O 讀寫阻塞請(qǐng)求進(jìn)程,那么就是同步 I/O 。

Unix I/O 模型

Unix 下共有五種 I/O 模型:

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 復(fù)用(select 和 poll)
  • 信號(hào)驅(qū)動(dòng) I/O(SIGIO)
  • 異步 I/O(POSIX 的 aio_系列函數(shù))

阻塞 I/O

請(qǐng)求無法立即完成則保持阻塞。

階段1:等待數(shù)據(jù)就緒。網(wǎng)絡(luò) I/O 的情況就是等待遠(yuǎn)端數(shù)據(jù)陸續(xù)抵達(dá);磁盤I/O的情況就是等待磁盤數(shù)據(jù)從磁盤上讀取到內(nèi)核態(tài)內(nèi)存中。
階段2:數(shù)據(jù)從內(nèi)核拷貝到進(jìn)程。出于系統(tǒng)安全,用戶態(tài)的程序沒有權(quán)限直接讀取內(nèi)核態(tài)內(nèi)存,因此內(nèi)核負(fù)責(zé)把內(nèi)核態(tài)內(nèi)存中的數(shù)據(jù)拷貝一份到用戶態(tài)內(nèi)存中。

非阻塞 I/O

  • socket 設(shè)置為 NONBLOCK(非阻塞)就是告訴內(nèi)核,當(dāng)所請(qǐng)求的 I/O 操作無法完成時(shí),不要將進(jìn)程睡眠,而是返回一個(gè)錯(cuò)誤碼(EWOULDBLOCK) ,這樣請(qǐng)求就不會(huì)阻塞
  • I/O 操作函數(shù)將不斷的測(cè)試數(shù)據(jù)是否已經(jīng)準(zhǔn)備好,如果沒有準(zhǔn)備好,繼續(xù)測(cè)試,直到數(shù)據(jù)準(zhǔn)備好為止。整個(gè) I/O 請(qǐng)求的過程中,雖然用戶線程每次發(fā)起 I/O 請(qǐng)求后可以立即返回,但是為了等到數(shù)據(jù),仍需要不斷地輪詢、重復(fù)請(qǐng)求,消耗了大量的 CPU 的資源
  • 數(shù)據(jù)準(zhǔn)備好了,從內(nèi)核拷貝到用戶空間。

一般很少直接使用這種模型,而是在其他 I/O 模型中使用非阻塞 I/O 這一特性。這種方式對(duì)單個(gè) I/O 請(qǐng)求意義不大,但給 I/O 多路復(fù)用鋪平了道路.

I/O 復(fù)用(異步阻塞 I/O)

I/O 多路復(fù)用會(huì)用到 select 或者 poll 函數(shù),這兩個(gè)函數(shù)也會(huì)使進(jìn)程阻塞,但是和阻塞 I/O 所不同的的,這兩個(gè)函數(shù)可以同時(shí)阻塞多個(gè) I/O 操作。而且可以同時(shí)對(duì)多個(gè)讀操作,多個(gè)寫操作的 I/O 函數(shù)進(jìn)行檢測(cè),直到有數(shù)據(jù)可讀或可寫時(shí),才真正調(diào)用 I/O 操作函數(shù)。

從流程上來看,使用 select 函數(shù)進(jìn)行 I/O 請(qǐng)求和同步阻塞模型沒有太大的區(qū)別,甚至還多了添加監(jiān)視 socket,以及調(diào)用 select 函數(shù)的額外操作,效率更差。但是,使用 select 以后最大的優(yōu)勢(shì)是用戶可以在一個(gè)線程內(nèi)同時(shí)處理多個(gè) socket 的 I/O 請(qǐng)求。用戶可以注冊(cè)多個(gè) socket,然后不斷地調(diào)用 select 讀取被激活的 socket,即可達(dá)到在同一個(gè)線程內(nèi)同時(shí)處理多個(gè) I/O 請(qǐng)求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達(dá)到這個(gè)目的。

I/O 多路復(fù)用模型使用了 Reactor 設(shè)計(jì)模式實(shí)現(xiàn)了這一機(jī)制。

調(diào)用 select / poll 該方法由一個(gè)用戶態(tài)線程負(fù)責(zé)輪詢多個(gè) socket,直到某個(gè)階段1的數(shù)據(jù)就緒,再通知實(shí)際的用戶線程執(zhí)行階段2的拷貝。 通過一個(gè)專職的用戶態(tài)線程執(zhí)行非阻塞I/O輪詢,模擬實(shí)現(xiàn)了階段一的異步化

信號(hào)驅(qū)動(dòng) I/O(SIGIO)

首先我們?cè)试S socket 進(jìn)行信號(hào)驅(qū)動(dòng) I/O,并安裝一個(gè)信號(hào)處理函數(shù),進(jìn)程繼續(xù)運(yùn)行并不阻塞。當(dāng)數(shù)據(jù)準(zhǔn)備好時(shí),進(jìn)程會(huì)收到一個(gè) SIGIO 信號(hào),可以在信號(hào)處理函數(shù)中調(diào)用 I/O 操作函數(shù)處理數(shù)據(jù)。

異步 I/O

調(diào)用 aio_read 函數(shù),告訴內(nèi)核描述字,緩沖區(qū)指針,緩沖區(qū)大小,文件偏移以及通知的方式,然后立即返回。當(dāng)內(nèi)核將數(shù)據(jù)拷貝到緩沖區(qū)后,再通知應(yīng)用程序。

異步 I/O 模型使用了 Proactor 設(shè)計(jì)模式實(shí)現(xiàn)了這一機(jī)制。

告知內(nèi)核,當(dāng)整個(gè)過程(包括階段1和階段2)全部完成時(shí),通知應(yīng)用程序來讀數(shù)據(jù).

幾種 I/O 模型的比較

前四種模型的區(qū)別是階段1不相同,階段2基本相同,都是將數(shù)據(jù)從內(nèi)核拷貝到調(diào)用者的緩沖區(qū)。而異步 I/O 的兩個(gè)階段都不同于前四個(gè)模型。

同步 I/O 操作引起請(qǐng)求進(jìn)程阻塞,直到 I/O 操作完成。異步 I/O 操作不引起請(qǐng)求進(jìn)程阻塞。

常見 Java I/O 模型

在了解了 UNIX 的 I/O 模型之后,其實(shí) Java 的 I/O 模型也是類似。

“阻塞I/O”模式

在上一節(jié) Socket 章節(jié)中的 EchoServer 就是一個(gè)簡(jiǎn)單的阻塞 I/O 例子,服務(wù)器啟動(dòng)后,等待客戶端連接。在客戶端連接服務(wù)器后,服務(wù)器就阻塞讀寫取數(shù)據(jù)流。

EchoServer 代碼:

public class EchoServer {
public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
try (
ServerSocket serverSocket =
new ServerSocket(port);
Socket clientSocket = serverSocket.accept(); 
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true); 
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port "
+ port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

改進(jìn)為“阻塞I/O+多線程”模式

使用多線程來支持多個(gè)客戶端來訪問服務(wù)器。

主線程 MultiThreadEchoServer.java

public class MultiThreadEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// MultiThread
new Thread(new EchoServerHandler(clientSocket)).start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

處理器類 EchoServerHandler.java

public class EchoServerHandler implements Runnable {
private Socket clientSocket;
public EchoServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}

存在問題:每次接收到新的連接都要新建一個(gè)線程,處理完成后銷毀線程,代價(jià)大。當(dāng)有大量地短連接出現(xiàn)時(shí),性能比較低。

改進(jìn)為“阻塞I/O+線程池”模式

針對(duì)上面多線程的模型中,出現(xiàn)的線程重復(fù)創(chuàng)建、銷毀帶來的開銷,可以采用線程池來優(yōu)化。每次接收到新連接后從池中取一個(gè)空閑線程進(jìn)行處理,處理完成后再放回池中,重用線程避免了頻率地創(chuàng)建和銷毀線程帶來的開銷。

主線程 ThreadPoolEchoServer.java

public class ThreadPoolEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// Thread Pool
threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

存在問題:在大量短連接的場(chǎng)景中性能會(huì)有提升,因?yàn)椴挥妹看味紕?chuàng)建和銷毀線程,而是重用連接池中的線程。但在大量長(zhǎng)連接的場(chǎng)景中,因?yàn)榫€程被連接長(zhǎng)期占用,不需要頻繁地創(chuàng)建和銷毀線程,因而沒有什么優(yōu)勢(shì)。

雖然這種方法可以適用于小到中度規(guī)模的客戶端的并發(fā)數(shù),如果連接數(shù)超過 100,000或更多,那么性能將很不理想。

改進(jìn)為“非阻塞I/O”模式

“阻塞I/O+線程池”網(wǎng)絡(luò)模型雖然比”阻塞I/O+多線程”網(wǎng)絡(luò)模型在性能方面有提升,但這兩種模型都存在一個(gè)共同的問題:讀和寫操作都是同步阻塞的,面對(duì)大并發(fā)(持續(xù)大量連接同時(shí)請(qǐng)求)的場(chǎng)景,需要消耗大量的線程來維持連接。CPU 在大量的線程之間頻繁切換,性能損耗很大。一旦單機(jī)的連接超過1萬,甚至達(dá)到幾萬的時(shí)候,服務(wù)器的性能會(huì)急劇下降。

而 NIO 的 Selector 卻很好地解決了這個(gè)問題,用主線程(一個(gè)線程或者是 CPU 個(gè)數(shù)的線程)保持住所有的連接,管理和讀取客戶端連接的數(shù)據(jù),將讀取的數(shù)據(jù)交給后面的線程池處理,線程池處理完業(yè)務(wù)邏輯后,將結(jié)果交給主線程發(fā)送響應(yīng)給客戶端,少量的線程就可以處理大量連接的請(qǐng)求。

Java NIO 由以下幾個(gè)核心部分組成:

  • Channel
  • Buffer
  • Selector

要使用 Selector,得向 Selector 注冊(cè) Channel,然后調(diào)用它的 select()方法。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。一旦這個(gè)方法返回,線程就可以處理這些事件,事件的例子有如新連接進(jìn)來,數(shù)據(jù)接收等。

主線程 NonBlokingEchoServer.java

public class NonBlokingEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException ex) {
ex.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}

改進(jìn)為“異步I/O”模式

Java SE 7 版本之后,引入了異步 I/O (NIO.2) 的支持,為構(gòu)建高性能的網(wǎng)絡(luò)應(yīng)用提供了一個(gè)利器。

主線程 AsyncEchoServer.java

public class AsyncEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
// create asynchronous server socket channel bound to the default group
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) {
if (asynchronousServerSocketChannel.isOpen()) {
// set some options
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// bind the server socket channel to local address
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
// display a waiting message while ... waiting clients
System.out.println("Waiting for connections ...");
while (true) {
Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
.accept();
try {
final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
.get();
Callable<String> worker = new Callable<String>() {
@Override
public String call() throws Exception {
String host = asynchronousSocketChannel.getRemoteAddress().toString();
System.out.println("Incoming connection from: " + host);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// transmitting data
while (asynchronousSocketChannel.read(buffer).get() != -1) {
buffer.flip();
asynchronousSocketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
asynchronousSocketChannel.close();
System.out.println(host + " was successfully served!");
return host;
}
};
taskExecutor.submit(worker);
} catch (InterruptedException | ExecutionException ex) {
System.err.println(ex);
System.err.println("\n Server is shutting down ...");
// this will make the executor accept no new threads
// and finish all existing threads in the queue
taskExecutor.shutdown();
// wait until all threads are finished
while (!taskExecutor.isTerminated()) {
}
break;
}
}
} else {
System.out.println("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException ex) {
System.err.println(ex);
}
}
}

源碼

本章例子的源碼,可以在 https://github.com/waylau/essential-java 中 com.waylau.essentialjava.net.echo 包下找到。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java Swing 只關(guān)閉當(dāng)前窗體的實(shí)現(xiàn)

    Java Swing 只關(guān)閉當(dāng)前窗體的實(shí)現(xiàn)

    這篇文章主要介紹了Java Swing 只關(guān)閉當(dāng)前窗體的實(shí)現(xiàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • Java創(chuàng)建與結(jié)束線程代碼示例

    Java創(chuàng)建與結(jié)束線程代碼示例

    這篇文章主要介紹了Java創(chuàng)建與結(jié)束線程代碼示例,小編覺得挺不錯(cuò)的,這里分享給大家,供需要的朋友參考。
    2017-10-10
  • Springboot系列之kafka操作使用詳解

    Springboot系列之kafka操作使用詳解

    這篇文章主要為大家介紹了Springboot系列之kafka操作使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • Struts中使用validate()輸入校驗(yàn)方法詳解

    Struts中使用validate()輸入校驗(yàn)方法詳解

    這篇文章主要介紹了Struts中使用validate()輸入校驗(yàn)方法,本文介紹的非常詳細(xì),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧
    2016-09-09
  • Spring設(shè)計(jì)模式中代理模式詳細(xì)講解

    Spring設(shè)計(jì)模式中代理模式詳細(xì)講解

    如何實(shí)現(xiàn)在不修改源碼的基礎(chǔ)上實(shí)現(xiàn)代碼功能的增強(qiáng)呢?spring為我們提供了代理模式。所謂的代理模式通俗來說就是一個(gè)中介,它給某一個(gè)對(duì)象提供一個(gè)代理對(duì)象,并由代理對(duì)象控制原對(duì)象的引用,從而實(shí)現(xiàn)在不修改源碼的基礎(chǔ)上實(shí)現(xiàn)代碼功能的增強(qiáng)
    2023-01-01
  • Java設(shè)計(jì)模式中組合模式應(yīng)用詳解

    Java設(shè)計(jì)模式中組合模式應(yīng)用詳解

    組合模式,又叫部分整體模式,它創(chuàng)建了對(duì)象組的數(shù)據(jù)結(jié)構(gòu)組合模式使得用戶對(duì)單個(gè)對(duì)象和組合對(duì)象的訪問具有一致性。本文將通過示例為大家詳細(xì)介紹一下組合模式,需要的可以參考一下
    2022-11-11
  • IDEA解決Java:程序包xxxx不存在的問題

    IDEA解決Java:程序包xxxx不存在的問題

    這篇文章主要介紹了IDEA解決Java:程序包xxxx不存在的問題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • cmd中javac和java使用及注意事項(xiàng)詳解

    cmd中javac和java使用及注意事項(xiàng)詳解

    這篇文章主要介紹了cmd中javac和java使用及注意事項(xiàng)詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-07-07
  • 設(shè)置Myeclipse中的代碼格式化、注釋模板及保存時(shí)自動(dòng)格式化

    設(shè)置Myeclipse中的代碼格式化、注釋模板及保存時(shí)自動(dòng)格式化

    這篇文章主要介紹了設(shè)置Myeclipse中的代碼格式化、注釋模板及保存時(shí)自動(dòng)格式化方法,需要的朋友可以參考下
    2014-10-10
  • 基于Java實(shí)現(xiàn)修改圖片分辨率示例代碼

    基于Java實(shí)現(xiàn)修改圖片分辨率示例代碼

    這篇文章主要介紹了一個(gè)可以修改圖片分辨率的java工具類,文中的示例代碼講解詳細(xì),對(duì)學(xué)習(xí)JAVA有一定的幫助,感興趣的小伙伴快來跟隨小編一起學(xué)習(xí)吧
    2021-12-12

最新評(píng)論