基于BIO的Java Socket通信詳解
BIO,即阻塞IO,在基于Socket的消息通信過程中,Socket服務(wù)端向外部提供服務(wù),而Socket客戶端可以建立到Socket服務(wù)端的連接,進而發(fā)送請求數(shù)據(jù),然后等待Socket服務(wù)端處理,并返回處理結(jié)果(響應(yīng))。
基于BIO的通信,Socket服務(wù)端會發(fā)生阻塞,即在監(jiān)聽過程中每次accept到一個客戶端的Socket連接,就要處理這個請求,而此時其他連接過來的客戶端只能阻塞等待??梢?,這種模式下Socket服務(wù)端的處理能力是非常有限的,客戶端也只能等待,直到服務(wù)端空閑時進行請求的處理。
BIO通信實現(xiàn)
下面基于BIO模式,來實現(xiàn)一個簡單的Socket服務(wù)端與Socket客戶端進行通信的邏輯,對這種通信方式有一個感性的認識。具體邏輯描述如下:
1、Socket客戶端連接到Socket服務(wù)端,并發(fā)送數(shù)據(jù)“I am the client N.”;
2、Socket服務(wù)端,監(jiān)聽服務(wù)端口,并接收客戶端請求數(shù)據(jù),如果請求數(shù)據(jù)以“I am the client”開頭,則響應(yīng)客戶端“I am the server, and you are the Nth client.”;
Socket服務(wù)端實現(xiàn),代碼如下所示:
package org.shirdrn.java.communications.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; /** * 基于BIO的Socket服務(wù)器端 * * @author shirdrn */ public class SimpleBioTcpServer extends Thread { /** 服務(wù)端口號 */ private int port = 8888; /** 為客戶端分配編號 */ private static int sequence = 0; public SimpleBioTcpServer(int port) { this.port = port; } @Override public void run() { Socket socket = null; try { ServerSocket serverSocket = new ServerSocket(this.port); while(true) { socket = serverSocket.accept(); // 監(jiān)聽 this.handleMessage(socket); // 處理一個連接過來的客戶端請求 } } catch (IOException e) { e.printStackTrace(); } } /** * 處理一個客戶端socket連接 * @param socket 客戶端socket * @throws IOException */ private void handleMessage(Socket socket) throws IOException { InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀) OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫) int receiveBytes; byte[] receiveBuffer = new byte[128]; String clientMessage = ""; if((receiveBytes=in.read(receiveBuffer))!=-1) { clientMessage = new String(receiveBuffer, 0, receiveBytes); if(clientMessage.startsWith("I am the client")) { String serverResponseWords = "I am the server, and you are the " + (++sequence) + "th client."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server: receives clientMessage->" + clientMessage); } public static void main(String[] args) { SimpleBioTcpServer server = new SimpleBioTcpServer(1983); server.start(); } }
上述實現(xiàn),沒有進行復(fù)雜的異常處理。
Socket客戶端實現(xiàn),代碼如下所示:
package org.shirdrn.java.communications.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; /** * 基于BIO的Socket客戶端 * * @author shirdrn */ public class SimpleBioTcpClient { private String ipAddress; private int port; private static int pos = 0; public SimpleBioTcpClient() {} public SimpleBioTcpClient(String ipAddress, int port) { this.ipAddress = ipAddress; this.port = port; } /** * 連接Socket服務(wù)端,并模擬發(fā)送請求數(shù)據(jù) * @param data 請求數(shù)據(jù) */ public void send(byte[] data) { Socket socket = null; OutputStream out = null; InputStream in = null; try { socket = new Socket(this.ipAddress, this.port); // 連接 // 發(fā)送請求 out = socket.getOutputStream(); out.write(data); out.flush(); // 接收響應(yīng) in = socket.getInputStream(); int totalBytes = 0; int receiveBytes = 0; byte[] receiveBuffer = new byte[128]; if((receiveBytes=in.read(receiveBuffer))!=-1) { totalBytes += receiveBytes; } String serverMessage = new String(receiveBuffer, 0, receiveBytes); System.out.println("Client: receives serverMessage->" + serverMessage); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { try { // 發(fā)送請求并接收到響應(yīng),通信完成,關(guān)閉連接 out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { int n = 1; StringBuffer data = new StringBuffer(); Date start = new Date(); for(int i=0; i<n; i++) { data.delete(0, data.length()); data.append("I am the client ").append(++pos).append("."); SimpleBioTcpClient client = new SimpleBioTcpClient("localhost", 1983); client.send(data.toString().getBytes()); } Date end = new Date(); long cost = end.getTime() - start.getTime(); System.out.println(n + " requests cost " + cost + " ms."); } }
首先啟動Socket服務(wù)端進程SimpleBioTcpServer,然后再運行Socket客戶端SimpleBioTcpClient??梢钥吹?,服務(wù)端接收到請求數(shù)據(jù),然后響應(yīng)客戶端,客戶端接收到了服務(wù)端的響應(yīng)數(shù)據(jù)。
上述實現(xiàn)中,對于Socket客戶端和服務(wù)端都是一次寫入,并一次讀出,而在實際中如果每次通信過程中數(shù)據(jù)量特別大的話,服務(wù)器端是不可能接受的,可以在確定客戶端請求數(shù)據(jù)字節(jié)數(shù)的情況,循環(huán)來讀取并進行處理。
另外,對于上述實現(xiàn)中流沒有進行裝飾(Wrapped)處理,在實際中會有性能的損失,如不能緩沖等。
對于Socket服務(wù)端接收數(shù)據(jù),如果可以使多次循環(huán)讀取到的字節(jié)數(shù)據(jù)通過一個可變長的字節(jié)緩沖區(qū)來存儲,就能方便多了,可是使用ByteArrayOutputStream,例如:
ByteArrayOutputStream data = new ByteArrayOutputStream(); data.write(receiveBuffer, totalBytes , totalBytes + receiveBytes);
BIO通信測試
下面測試一下大量請求的場景下,Socket服務(wù)端處理的效率。
第一種方式:通過for循環(huán)來啟動5000個Socket客戶端,發(fā)送請求,代碼如下所示:
public static void main(String[] args) { int n = 5000; StringBuffer data = new StringBuffer(); Date start = new Date(); for(int i=0; i<n; i++) { data.delete(0, data.length()); data.append("I am the client ").append(++pos).append("."); SimpleBioTcpClient client = new SimpleBioTcpClient("localhost", 1983); client.send(data.toString().getBytes()); } Date end = new Date(); long cost = end.getTime() - start.getTime(); System.out.println(n + " requests cost " + cost + " ms."); }
經(jīng)過測試,大約需要9864ms,大概接近10s。
第二種方式:通過啟動5000個獨立的客戶端線程,同時請求,服務(wù)端進行計數(shù):
package org.shirdrn.java.communications.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; /** * 基于BIO的Socket通信測試 * * @author shirdrn */ public class SimpleBioTcpTest { static int threadCount = 5000; /** * 基于BIO的Socket服務(wù)端進程 * * @author shirdrn */ static class SocketServer extends Thread { /** 服務(wù)端口號 */ private int port = 8888; /** 為客戶端分配編號 */ private static int sequence = 0; public SocketServer(int port) { this.port = port; } @Override public void run() { Socket socket = null; int counter = 0; try { ServerSocket serverSocket = new ServerSocket(this.port); boolean flag = false; Date start = null; while(true) { socket = serverSocket.accept(); // 監(jiān)聽 // 有請求到來才開始計時 if(!flag) { start = new Date(); flag = true; } this.handleMessage(socket); // 處理一個連接過來的客戶端請求 if(++counter==threadCount) { Date end = new Date(); long last = end.getTime() - start.getTime(); System.out.println(threadCount + " requests cost " + last + " ms."); } } } catch (IOException e) { e.printStackTrace(); } } /** * 處理一個客戶端socket連接 * @param socket 客戶端socket * @throws IOException */ private void handleMessage(Socket socket) throws IOException { InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀) OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫) int receiveBytes; byte[] receiveBuffer = new byte[128]; String clientMessage = ""; if((receiveBytes=in.read(receiveBuffer))!=-1) { clientMessage = new String(receiveBuffer, 0, receiveBytes); if(clientMessage.startsWith("I am the client")) { String serverResponseWords = "I am the server, and you are the " + (++sequence) + "th client."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server: receives clientMessage->" + clientMessage); } } /** * 基于BIO的Socket客戶端線程 * * @author shirdrn */ static class SocketClient implements Runnable { private String ipAddress; private int port; /** 待發(fā)送的請求數(shù)據(jù) */ private String data; public SocketClient(String ipAddress, int port) { this.ipAddress = ipAddress; this.port = port; } @Override public void run() { this.send(); } /** * 連接Socket服務(wù)端,并模擬發(fā)送請求數(shù)據(jù) */ public void send() { Socket socket = null; OutputStream out = null; InputStream in = null; try { socket = new Socket(this.ipAddress, this.port); // 連接 // 發(fā)送請求 out = socket.getOutputStream(); out.write(data.getBytes()); out.flush(); // 接收響應(yīng) in = socket.getInputStream(); int totalBytes = 0; int receiveBytes = 0; byte[] receiveBuffer = new byte[128]; if((receiveBytes=in.read(receiveBuffer))!=-1) { totalBytes += receiveBytes; } String serverMessage = new String(receiveBuffer, 0, receiveBytes); System.out.println("Client: receives serverMessage->" + serverMessage); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { try { // 發(fā)送請求并接收到響應(yīng),通信完成,關(guān)閉連接 out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } public void setData(String data) { this.data = data; } } public static void main(String[] args) throws Exception { SocketServer server = new SocketServer(1983); server.start(); Thread.sleep(3000); for(int i=0; i<threadCount; i++) { SocketClient client = new SocketClient("localhost", 1983); client.setData("I am the client " + (i+1) + "."); new Thread(client).start(); Thread.sleep(0, 1); } } }
經(jīng)過測試,大約需要7110ms,大概接近7s,沒有太大提高。
BIO通信改進
通過上面的測試我們可以發(fā)現(xiàn),在Socket服務(wù)端對來自客戶端的請求進行處理時,會發(fā)生阻塞,嚴(yán)重地影響了能夠并發(fā)處理請求的效率。實際上,在Socket服務(wù)端接收來自客戶端連接能力的范圍內(nèi),可以將接收請求獨立出來,從而在將處理請求獨立粗話來,通過一個請求一個線程處理的方式來解決上述問題。這樣,服務(wù)端是多處理線程對應(yīng)客戶端多請求,處理效率有一定程度的提高。
下面,通過單線程接收請求,然后委派線程池進行多線程并發(fā)處理請求:
/** * 基于BIO的Socket服務(wù)端進程 * * @author shirdrn */ static class SocketServer extends Thread { /** 服務(wù)端口號 */ private int port = 8888; /** 為客戶端分配編號 */ private static int sequence = 0; /** 處理客戶端請求的線程池 */ private ExecutorService pool; public SocketServer(int port, int poolSize) { this.port = port; this.pool = Executors.newFixedThreadPool(poolSize); } @Override public void run() { Socket socket = null; int counter = 0; try { ServerSocket serverSocket = new ServerSocket(this.port); boolean flag = false; Date start = null; while(true) { socket = serverSocket.accept(); // 監(jiān)聽 // 有請求到來才開始計時 if(!flag) { start = new Date(); flag = true; } // 將客戶端請求放入線程池處理 pool.execute(new RequestHandler(socket)); if(++counter==threadCount) { Date end = new Date(); long last = end.getTime() - start.getTime(); System.out.println(threadCount + " requests cost " + last + " ms."); } } } catch (IOException e) { e.printStackTrace(); } } /** * 客戶端請求處理線程類 * * @author shirdrn */ class RequestHandler implements Runnable { private Socket socket; public RequestHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀) OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫) int receiveBytes; byte[] receiveBuffer = new byte[128]; String clientMessage = ""; if((receiveBytes=in.read(receiveBuffer))!=-1) { clientMessage = new String(receiveBuffer, 0, receiveBytes); if(clientMessage.startsWith("I am the client")) { String serverResponseWords = "I am the server, and you are the " + (++sequence) + "th client."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server: receives clientMessage->" + clientMessage); } catch (IOException e) { e.printStackTrace(); } } } }
可見,這種改進方式增強服務(wù)端處理請求的并發(fā)度,但是每一個請求都要由一個線程去處理,大量請求造成服務(wù)端啟動大量進程進行處理,也是比較占用服務(wù)端資源的。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Boot構(gòu)建系統(tǒng)安全層的步驟
這篇文章主要介紹了Spring Boot構(gòu)建系統(tǒng)安全層的步驟,幫助大家更好的理解和學(xué)習(xí)使用Spring Boot框架,感興趣的朋友可以了解下2021-04-04java多線程編程之慎重使用volatile關(guān)鍵字
volatile關(guān)鍵字相信了解Java多線程的讀者都很清楚它的作用。volatile關(guān)鍵字用于聲明簡單類型變量,下面看一下為什么要慎重使用volatile關(guān)鍵字2014-01-01