基于BIO的Java Socket通信詳解
BIO,即阻塞IO,在基于Socket的消息通信過程中,Socket服務(wù)端向外部提供服務(wù),而Socket客戶端可以建立到Socket服務(wù)端的連接,進(jìn)而發(fā)送請(qǐng)求數(shù)據(jù),然后等待Socket服務(wù)端處理,并返回處理結(jié)果(響應(yīng))。
基于BIO的通信,Socket服務(wù)端會(huì)發(fā)生阻塞,即在監(jiān)聽過程中每次accept到一個(gè)客戶端的Socket連接,就要處理這個(gè)請(qǐng)求,而此時(shí)其他連接過來的客戶端只能阻塞等待??梢?,這種模式下Socket服務(wù)端的處理能力是非常有限的,客戶端也只能等待,直到服務(wù)端空閑時(shí)進(jìn)行請(qǐng)求的處理。
BIO通信實(shí)現(xiàn)
下面基于BIO模式,來實(shí)現(xiàn)一個(gè)簡單的Socket服務(wù)端與Socket客戶端進(jìn)行通信的邏輯,對(duì)這種通信方式有一個(gè)感性的認(rèn)識(shí)。具體邏輯描述如下:
1、Socket客戶端連接到Socket服務(wù)端,并發(fā)送數(shù)據(jù)“I am the client N.”;
2、Socket服務(wù)端,監(jiān)聽服務(wù)端口,并接收客戶端請(qǐng)求數(shù)據(jù),如果請(qǐng)求數(shù)據(jù)以“I am the client”開頭,則響應(yīng)客戶端“I am the server, and you are the Nth client.”;
Socket服務(wù)端實(shí)現(xiàn),代碼如下所示:
package org.shirdrn.java.communications.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 基于BIO的Socket服務(wù)器端
*
* @author shirdrn
*/
public class SimpleBioTcpServer extends Thread {
/** 服務(wù)端口號(hào) */
private int port = 8888;
/** 為客戶端分配編號(hào) */
private static int sequence = 0;
public SimpleBioTcpServer(int port) {
this.port = port;
}
@Override
public void run() {
Socket socket = null;
try {
ServerSocket serverSocket = new ServerSocket(this.port);
while(true) {
socket = serverSocket.accept(); // 監(jiān)聽
this.handleMessage(socket); // 處理一個(gè)連接過來的客戶端請(qǐng)求
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 處理一個(gè)客戶端socket連接
* @param socket 客戶端socket
* @throws IOException
*/
private void handleMessage(Socket socket) throws IOException {
InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀)
OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫)
int receiveBytes;
byte[] receiveBuffer = new byte[128];
String clientMessage = "";
if((receiveBytes=in.read(receiveBuffer))!=-1) {
clientMessage = new String(receiveBuffer, 0, receiveBytes);
if(clientMessage.startsWith("I am the client")) {
String serverResponseWords =
"I am the server, and you are the " + (++sequence) + "th client.";
out.write(serverResponseWords.getBytes());
}
}
out.flush();
System.out.println("Server: receives clientMessage->" + clientMessage);
}
public static void main(String[] args) {
SimpleBioTcpServer server = new SimpleBioTcpServer(1983);
server.start();
}
}
上述實(shí)現(xiàn),沒有進(jìn)行復(fù)雜的異常處理。
Socket客戶端實(shí)現(xiàn),代碼如下所示:
package org.shirdrn.java.communications.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;
/**
* 基于BIO的Socket客戶端
*
* @author shirdrn
*/
public class SimpleBioTcpClient {
private String ipAddress;
private int port;
private static int pos = 0;
public SimpleBioTcpClient() {}
public SimpleBioTcpClient(String ipAddress, int port) {
this.ipAddress = ipAddress;
this.port = port;
}
/**
* 連接Socket服務(wù)端,并模擬發(fā)送請(qǐng)求數(shù)據(jù)
* @param data 請(qǐng)求數(shù)據(jù)
*/
public void send(byte[] data) {
Socket socket = null;
OutputStream out = null;
InputStream in = null;
try {
socket = new Socket(this.ipAddress, this.port); // 連接
// 發(fā)送請(qǐng)求
out = socket.getOutputStream();
out.write(data);
out.flush();
// 接收響應(yīng)
in = socket.getInputStream();
int totalBytes = 0;
int receiveBytes = 0;
byte[] receiveBuffer = new byte[128];
if((receiveBytes=in.read(receiveBuffer))!=-1) {
totalBytes += receiveBytes;
}
String serverMessage = new String(receiveBuffer, 0, receiveBytes);
System.out.println("Client: receives serverMessage->" + serverMessage);
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 發(fā)送請(qǐng)求并接收到響應(yīng),通信完成,關(guān)閉連接
out.close();
in.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int n = 1;
StringBuffer data = new StringBuffer();
Date start = new Date();
for(int i=0; i<n; i++) {
data.delete(0, data.length());
data.append("I am the client ").append(++pos).append(".");
SimpleBioTcpClient client = new SimpleBioTcpClient("localhost", 1983);
client.send(data.toString().getBytes());
}
Date end = new Date();
long cost = end.getTime() - start.getTime();
System.out.println(n + " requests cost " + cost + " ms.");
}
}
首先啟動(dòng)Socket服務(wù)端進(jìn)程SimpleBioTcpServer,然后再運(yùn)行Socket客戶端SimpleBioTcpClient??梢钥吹?,服務(wù)端接收到請(qǐng)求數(shù)據(jù),然后響應(yīng)客戶端,客戶端接收到了服務(wù)端的響應(yīng)數(shù)據(jù)。
上述實(shí)現(xiàn)中,對(duì)于Socket客戶端和服務(wù)端都是一次寫入,并一次讀出,而在實(shí)際中如果每次通信過程中數(shù)據(jù)量特別大的話,服務(wù)器端是不可能接受的,可以在確定客戶端請(qǐng)求數(shù)據(jù)字節(jié)數(shù)的情況,循環(huán)來讀取并進(jìn)行處理。
另外,對(duì)于上述實(shí)現(xiàn)中流沒有進(jìn)行裝飾(Wrapped)處理,在實(shí)際中會(huì)有性能的損失,如不能緩沖等。
對(duì)于Socket服務(wù)端接收數(shù)據(jù),如果可以使多次循環(huán)讀取到的字節(jié)數(shù)據(jù)通過一個(gè)可變長的字節(jié)緩沖區(qū)來存儲(chǔ),就能方便多了,可是使用ByteArrayOutputStream,例如:
ByteArrayOutputStream data = new ByteArrayOutputStream(); data.write(receiveBuffer, totalBytes , totalBytes + receiveBytes);
BIO通信測試
下面測試一下大量請(qǐng)求的場景下,Socket服務(wù)端處理的效率。
第一種方式:通過for循環(huán)來啟動(dòng)5000個(gè)Socket客戶端,發(fā)送請(qǐng)求,代碼如下所示:
public static void main(String[] args) {
int n = 5000;
StringBuffer data = new StringBuffer();
Date start = new Date();
for(int i=0; i<n; i++) {
data.delete(0, data.length());
data.append("I am the client ").append(++pos).append(".");
SimpleBioTcpClient client = new SimpleBioTcpClient("localhost", 1983);
client.send(data.toString().getBytes());
}
Date end = new Date();
long cost = end.getTime() - start.getTime();
System.out.println(n + " requests cost " + cost + " ms.");
}
經(jīng)過測試,大約需要9864ms,大概接近10s。
第二種方式:通過啟動(dòng)5000個(gè)獨(dú)立的客戶端線程,同時(shí)請(qǐng)求,服務(wù)端進(jìn)行計(jì)數(shù):
package org.shirdrn.java.communications.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;
/**
* 基于BIO的Socket通信測試
*
* @author shirdrn
*/
public class SimpleBioTcpTest {
static int threadCount = 5000;
/**
* 基于BIO的Socket服務(wù)端進(jìn)程
*
* @author shirdrn
*/
static class SocketServer extends Thread {
/** 服務(wù)端口號(hào) */
private int port = 8888;
/** 為客戶端分配編號(hào) */
private static int sequence = 0;
public SocketServer(int port) {
this.port = port;
}
@Override
public void run() {
Socket socket = null;
int counter = 0;
try {
ServerSocket serverSocket = new ServerSocket(this.port);
boolean flag = false;
Date start = null;
while(true) {
socket = serverSocket.accept(); // 監(jiān)聽
// 有請(qǐng)求到來才開始計(jì)時(shí)
if(!flag) {
start = new Date();
flag = true;
}
this.handleMessage(socket); // 處理一個(gè)連接過來的客戶端請(qǐng)求
if(++counter==threadCount) {
Date end = new Date();
long last = end.getTime() - start.getTime();
System.out.println(threadCount + " requests cost " + last + " ms.");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 處理一個(gè)客戶端socket連接
* @param socket 客戶端socket
* @throws IOException
*/
private void handleMessage(Socket socket) throws IOException {
InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀)
OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫)
int receiveBytes;
byte[] receiveBuffer = new byte[128];
String clientMessage = "";
if((receiveBytes=in.read(receiveBuffer))!=-1) {
clientMessage = new String(receiveBuffer, 0, receiveBytes);
if(clientMessage.startsWith("I am the client")) {
String serverResponseWords =
"I am the server, and you are the " + (++sequence) + "th client.";
out.write(serverResponseWords.getBytes());
}
}
out.flush();
System.out.println("Server: receives clientMessage->" + clientMessage);
}
}
/**
* 基于BIO的Socket客戶端線程
*
* @author shirdrn
*/
static class SocketClient implements Runnable {
private String ipAddress;
private int port;
/** 待發(fā)送的請(qǐng)求數(shù)據(jù) */
private String data;
public SocketClient(String ipAddress, int port) {
this.ipAddress = ipAddress;
this.port = port;
}
@Override
public void run() {
this.send();
}
/**
* 連接Socket服務(wù)端,并模擬發(fā)送請(qǐng)求數(shù)據(jù)
*/
public void send() {
Socket socket = null;
OutputStream out = null;
InputStream in = null;
try {
socket = new Socket(this.ipAddress, this.port); // 連接
// 發(fā)送請(qǐng)求
out = socket.getOutputStream();
out.write(data.getBytes());
out.flush();
// 接收響應(yīng)
in = socket.getInputStream();
int totalBytes = 0;
int receiveBytes = 0;
byte[] receiveBuffer = new byte[128];
if((receiveBytes=in.read(receiveBuffer))!=-1) {
totalBytes += receiveBytes;
}
String serverMessage = new String(receiveBuffer, 0, receiveBytes);
System.out.println("Client: receives serverMessage->" + serverMessage);
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 發(fā)送請(qǐng)求并接收到響應(yīng),通信完成,關(guān)閉連接
out.close();
in.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void setData(String data) {
this.data = data;
}
}
public static void main(String[] args) throws Exception {
SocketServer server = new SocketServer(1983);
server.start();
Thread.sleep(3000);
for(int i=0; i<threadCount; i++) {
SocketClient client = new SocketClient("localhost", 1983);
client.setData("I am the client " + (i+1) + ".");
new Thread(client).start();
Thread.sleep(0, 1);
}
}
}
經(jīng)過測試,大約需要7110ms,大概接近7s,沒有太大提高。
BIO通信改進(jìn)
通過上面的測試我們可以發(fā)現(xiàn),在Socket服務(wù)端對(duì)來自客戶端的請(qǐng)求進(jìn)行處理時(shí),會(huì)發(fā)生阻塞,嚴(yán)重地影響了能夠并發(fā)處理請(qǐng)求的效率。實(shí)際上,在Socket服務(wù)端接收來自客戶端連接能力的范圍內(nèi),可以將接收請(qǐng)求獨(dú)立出來,從而在將處理請(qǐng)求獨(dú)立粗話來,通過一個(gè)請(qǐng)求一個(gè)線程處理的方式來解決上述問題。這樣,服務(wù)端是多處理線程對(duì)應(yīng)客戶端多請(qǐng)求,處理效率有一定程度的提高。
下面,通過單線程接收請(qǐng)求,然后委派線程池進(jìn)行多線程并發(fā)處理請(qǐng)求:
/**
* 基于BIO的Socket服務(wù)端進(jìn)程
*
* @author shirdrn
*/
static class SocketServer extends Thread {
/** 服務(wù)端口號(hào) */
private int port = 8888;
/** 為客戶端分配編號(hào) */
private static int sequence = 0;
/** 處理客戶端請(qǐng)求的線程池 */
private ExecutorService pool;
public SocketServer(int port, int poolSize) {
this.port = port;
this.pool = Executors.newFixedThreadPool(poolSize);
}
@Override
public void run() {
Socket socket = null;
int counter = 0;
try {
ServerSocket serverSocket = new ServerSocket(this.port);
boolean flag = false;
Date start = null;
while(true) {
socket = serverSocket.accept(); // 監(jiān)聽
// 有請(qǐng)求到來才開始計(jì)時(shí)
if(!flag) {
start = new Date();
flag = true;
}
// 將客戶端請(qǐng)求放入線程池處理
pool.execute(new RequestHandler(socket));
if(++counter==threadCount) {
Date end = new Date();
long last = end.getTime() - start.getTime();
System.out.println(threadCount + " requests cost " + last + " ms.");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 客戶端請(qǐng)求處理線程類
*
* @author shirdrn
*/
class RequestHandler implements Runnable {
private Socket socket;
public RequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream in = socket.getInputStream(); // 流:客戶端->服務(wù)端(讀)
OutputStream out = socket.getOutputStream(); // 流:服務(wù)端->客戶端(寫)
int receiveBytes;
byte[] receiveBuffer = new byte[128];
String clientMessage = "";
if((receiveBytes=in.read(receiveBuffer))!=-1) {
clientMessage = new String(receiveBuffer, 0, receiveBytes);
if(clientMessage.startsWith("I am the client")) {
String serverResponseWords =
"I am the server, and you are the " + (++sequence) + "th client.";
out.write(serverResponseWords.getBytes());
}
}
out.flush();
System.out.println("Server: receives clientMessage->" + clientMessage);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
可見,這種改進(jìn)方式增強(qiáng)服務(wù)端處理請(qǐng)求的并發(fā)度,但是每一個(gè)請(qǐng)求都要由一個(gè)線程去處理,大量請(qǐng)求造成服務(wù)端啟動(dòng)大量進(jìn)程進(jìn)行處理,也是比較占用服務(wù)端資源的。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java創(chuàng)建和啟動(dòng)線程的兩種方式實(shí)例分析
這篇文章主要介紹了Java創(chuàng)建和啟動(dòng)線程的兩種方式,結(jié)合實(shí)例形式分析了java多線程創(chuàng)建、使用相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下2019-09-09
Spring Boot構(gòu)建系統(tǒng)安全層的步驟
這篇文章主要介紹了Spring Boot構(gòu)建系統(tǒng)安全層的步驟,幫助大家更好的理解和學(xué)習(xí)使用Spring Boot框架,感興趣的朋友可以了解下2021-04-04
java多線程編程之慎重使用volatile關(guān)鍵字
volatile關(guān)鍵字相信了解Java多線程的讀者都很清楚它的作用。volatile關(guān)鍵字用于聲明簡單類型變量,下面看一下為什么要慎重使用volatile關(guān)鍵字2014-01-01

