Java創(chuàng)建多線程服務(wù)器流程
一個典型的單線程服務(wù)器示例如下:
while (true) {
Socket socket = null;
try {
// 接收客戶連接
socket = serverSocket.accept();
// 從socket中獲得輸入流與輸出流,與客戶通信
...
} catch(IOException e) {
e.printStackTrace()
} finally {
try {
if(socket != null) {
// 斷開連接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}服務(wù)端接收到一個客戶連接,就與客戶進行通信,通信完畢后斷開連接,然后接收下一個客戶連接,假如同時有多個客戶連接請求這些客戶就必須排隊等候。如果長時間讓客戶等待,就會使網(wǎng)站失去信譽,從而降低訪問量。
一般用并發(fā)性能來衡量一個服務(wù)器同時響應(yīng)多個客戶的能力,一個具有好的并發(fā)性能的服務(wù)器,必須符合兩個條件:
- 能同時接收并處理多個客戶連接
- 對于每個客戶,都會迅速給予響應(yīng)
用多個線程來同時為多個客戶提供服務(wù),這是提高服務(wù)器并發(fā)性能的最常用的手段,一般有三種方式:
- 為每個客戶分配一個工作線程
- 創(chuàng)建一個線程池,由其中的工作線程來為客戶服務(wù)
- 利用 Java 類庫中現(xiàn)成的線程池,由它的工作線程來為客戶服務(wù)
為每個客戶分配一個線程
服務(wù)器的主線程負責(zé)接收客戶的連接,每次接收到一個客戶連接,都會創(chuàng)建一個工作線程,由它負責(zé)與客戶的通信
public class EchoServer {
private int port = 8000;
private ServerSocket serverSocket;
public EchoServer() throws IOException {
serverSocket = new ServerSocket(port);
System.out.println("服務(wù)器啟動");
}
public void service() {
while(true) {
Socket socket = null;
try {
// 接教客戶連接
socket = serverSocket.accept();
// 創(chuàng)建一個工作線程
Thread workThread = new Thread(new Handler(socket));
// 啟動工作線程
workThread.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws TOException {
new EchoServer().service();
}
// 負責(zé)與單個客戶的通信
class Handler implements Runnable {
private Socket socket;
pub1ic Handler(Socket socket) {
this.socket = socket;
}
private PrintWriter getWriter(Socket socket) throws IOException {...}
private BufferedReader getReader(Socket socket) throws IOException {...}
public String echo(String msg) {...}
public void run() {
try {
System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
// 接收和發(fā)送數(shù)據(jù),直到通信結(jié)束
while ((msg = br.readLine()) != null) {
System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg);
pw.println(echo(msg));
if (msg.equals("bye")) break;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
// 斷開連接
if(socket != nulll) socket.close();
} catch (IOException e) {
e,printStackTrace();
}
}
}
}
}創(chuàng)建線程池
上一種實現(xiàn)方式有以下不足之處:
- 服務(wù)器創(chuàng)建和銷毀工作線程的開銷很大,如果服務(wù)器需要與許多客戶通信,并且與每個客戶的通信時間都很短,那么有可能服務(wù)器為客戶創(chuàng)建新線程的開銷比實際與客戶通信的開銷還要大
- 除了創(chuàng)建和銷毀線程的開銷,活動的線程也消耗系統(tǒng)資源。每個線程都會占用一定的內(nèi)存,如果同時有大量客戶連接服務(wù)器,就必須創(chuàng)建大量工作線程,它們消耗了大量內(nèi)存,可能會導(dǎo)致系統(tǒng)的內(nèi)存空間不足
線程池中預(yù)先創(chuàng)建了一些工作線程,它們不斷地從工作隊列中取出任務(wù),然后執(zhí)行該任務(wù)。當(dāng)工作線程執(zhí)行完一個任務(wù),就會繼續(xù)執(zhí)行工作隊列中的下一個任務(wù)
線程池具有以下優(yōu)點:
- 減少了創(chuàng)建和銷毀線程的次數(shù),每個工作線程都可以一直被重用,能執(zhí)行多個任務(wù)
- 可以根據(jù)系統(tǒng)的承載能力,方便調(diào)整線程池中線程的數(shù)目,防止因為消耗過量系統(tǒng)資源而導(dǎo)致系統(tǒng)崩潰
public class ThreadPool extends ThreadGroup {
// 線程池是否關(guān)閉
private boolean isClosed = false;
// 表示工作隊列
private LinkedList<Runnable> workQueue;
// 表示線程池ID
private static int threadPoolID;
// 表示工作線程ID
// poolSize 指定線程池中的工作線程數(shù)目
public ThreadPool(int poolSize) {
super("ThreadPool-"+ (threadPoolID++));
setDaemon(true);
// 創(chuàng)建工作隊列
workQueue = new LinkedList<Runnable>();
for (int i = 0; i < poolSize; i++) {
// 創(chuàng)建并啟動工作線程
new WorkThread().start();
}
}
/**
* 向工作隊列中加入一個新任務(wù),由工作線程去執(zhí)行任務(wù)
*/
public synchronized void execute(Runnable tank) {
// 線程池被關(guān)則拋出IllegalStateException異常
if(isClosed) {
throw new IllegalStateException();
}
if(task != null) {
workQueue.add(task);
// 喚醒正在getTask()方法中等待任務(wù)的工作線限
notify();
}
}
/**
* 從工作隊列中取出一個任務(wù),工作線程會調(diào)用此方法
*/
protected synchronized Runnable getTask() throws InterruptedException {
while(workQueue,size() == 0) {
if (isClosed) return null;
wait(); // 如果工作隊列中沒有任務(wù),就等待任務(wù)
}
return workQueue.removeFirst();
}
/**
* 關(guān)閉線程池
*/
public synchronized void close() {
if(!isClosed) {
isClosed = true;
// 清空工作隊列
workQueue.clear();
// 中斷所有的工作線程,該方法繼承自ThreadGroup類
interrupt();
}
}
/**
* 等待工作線程把所有任務(wù)執(zhí)行完
*/
public void join() {
synchronized (this) {
isClosed = true;
// 喚醒還在getTask()方法中等待任務(wù)的工作線程
notifyAll();
}
Thread[] threads = new Thread[activeCount()];
// enumerate()方法繼承自ThreadGroup類獲得線程組中當(dāng)前所有活著的工作線程
int count = enumerate(threads);
// 等待所有工作線程運行結(jié)束
for(int i = 0; i < count; i++) {
try {
// 等待工作線程運行結(jié)束
threads[i].join();
} catch((InterruptedException ex) {}
}
}
/**
* 內(nèi)部類:工作線程
*/
private class WorkThread extends Thread {
public WorkThread() {
// 加入當(dāng)前 ThreadPool 線程組
super(ThreadPool.this, "WorkThread-" + (threadID++));
}
public void run() {
// isInterrupted()方法承自Thread類,判斷線程是否被中斷
while (!isInterrupted()) {
Runnable task = null;
try {
// 取出任務(wù)
task = getTask();
} catch(InterruptedException ex) {}
// 如果 getTask() 返回 nu11 或者線程執(zhí)行 getTask() 時被中斷,則結(jié)束此線程
if(task != null) return;
// 運行任務(wù),異常在catch代碼塊中被捕獲
try {
task.run();
} catch(Throwable t) {
t.printStackTrace();
}
}
}
}
}使用線程池實現(xiàn)的服務(wù)器如下:
publlc class EchoServer {
private int port = 8000;
private ServerSocket serverSocket;
private ThreadPool threadPool; // 線程港
private final int POOL_SIZE = 4; // 單個CPU時線程池中工作線程的數(shù)目
public EchoServer() throws IOException {
serverSocket = new ServerSocket(port);
// 創(chuàng)建線程池
// Runtime 的 availableProcessors() 方法返回當(dāng)前系統(tǒng)的CPU的數(shù)目
// 系統(tǒng)的CPU越多,線程池中工作線程的數(shù)目也越多
threadPool= new ThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("服務(wù)器啟動");
}
public void service() {
while (true) {
Socket socket = null;
try {
socket = serverSocket.accept();
// 把與客戶通信的任務(wù)交給線程池
threadPool.execute(new Handler(socket));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws TOException {
new EchoServer().service();
}
// 負責(zé)與單個客戶的通信,與上例類似
class Handler implements Runnable {...}
}使用 Java 提供的線程池
java.util.concurrent 包提供了現(xiàn)成的線程池的實現(xiàn),更加健壯,功能也更強大,更多關(guān)于線程池的介紹可以這篇文章
public class Echoserver {
private int port = 8000;
private ServerSocket serverSocket;
// 線程池
private ExecutorService executorService;
// 單個CPU時線程池中工作線程的數(shù)目
private final int POOL_SIZE = 4;
public EchoServer() throws IOException {
serverSocket = new ServerSocket(port);
// 創(chuàng)建線程池
// Runtime 的 availableProcessors() 方法返回當(dāng)前系統(tǒng)的CPU的數(shù)目
// 系統(tǒng)的CPU越多,線程池中工作線程的數(shù)目也越多
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("服務(wù)器啟動");
}
public void service() {
while(true) {
Socket socket = null;
try {
socket = serverSocket.accept();
executorService.execute(new Handler(socket));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws TOException {
new EchoServer().service();
}
// 負責(zé)與單個客戶的通信,與上例類似
class Handler implements Runnable {...}
}使用線程池的注意事項
雖然線程池能大大提高服務(wù)器的并發(fā)性能,但使用它也存在一定風(fēng)險,容易引發(fā)下面的問題:
- 死鎖
任何多線程應(yīng)用程序都有死鎖風(fēng)險。造成死鎖的最簡單的情形是:線程 A 持有對象 X 的鎖,并且在等待對象 Y 的鎖,而線程 B 持有對象 Y 的鎖,并且在等待對象 X 的鎖,線程 A 與線程 B 都不釋放自己持有的鎖,并且等待對方的鎖,這就導(dǎo)致兩個線程永遠等待下去,死鎖就這樣產(chǎn)生了
任何多線程程序都有死鎖的風(fēng)險,但線程池還會導(dǎo)致另外一種死鎖:假定線程池中的所有工作線程都在執(zhí)行各自任務(wù)時被阻塞,它們都在等待某個任務(wù) A 的執(zhí)行結(jié)果。而任務(wù) A 依然在工作隊列中,由于沒有空閑線程,使得任務(wù) A 一直不能被執(zhí)行。這使得線程池中的所有工作線程都永遠阻塞下去,死鎖就這樣產(chǎn)生了
- 系統(tǒng)資源不足
如果線程池中的線程數(shù)目非常多,這些線程就會消耗包括內(nèi)存和其他系統(tǒng)資源在內(nèi)的大量資源,從而嚴(yán)重影響系統(tǒng)性能
- 并發(fā)錯誤
線程池的工作隊列依靠 wait() 和 notify() 方法來使工作線程及時取得任務(wù),但這兩個方法都難以使用。如果編碼不正確,就可能會丟失通知,導(dǎo)致工作線程一直保持空閑狀態(tài),無視工作隊列中需要處理的任務(wù)
- 線程泄漏
對于工作線程數(shù)目固定的線程池,如果工作線程在執(zhí)行任務(wù)時拋出 RuntimeException 或 Error,并且這些異?;蝈e誤沒有被捕獲,那么這個工作線程就會異常終止,使得線程池永久地失去了一個工作線程。如果所有的工作線程都異常終止,線程池變?yōu)榭?,沒有任何可用的工作線程來處理任務(wù)
導(dǎo)致線程泄漏的另一種情形是,工作線程在執(zhí)行一個任務(wù)時被阻塞,比如等待用戶的輸入數(shù)據(jù),但是由于用戶一直不輸入數(shù)據(jù)(可能是因為用戶走開了),導(dǎo)致這個工作線程一直被阻塞。這樣的工作線程名存實亡,它實際上不執(zhí)行任何任務(wù)了。假如線程池中所有的工作線程都處于這樣的阻塞狀態(tài),那么線程池就無法處理新加入的任務(wù)了
- 任務(wù)過載
當(dāng)工作隊列中有大量排隊等候執(zhí)行的任務(wù),這些任務(wù)本身可能會消耗太多的系統(tǒng)資源而引起系統(tǒng)資源缺乏
綜上所述,線程池可能會帶來種種風(fēng)險,為了盡可能避免它們,使用線程池時需要遵循以下原則:
如果任務(wù) A 在執(zhí)行過程中需要同步等待任務(wù) B 的執(zhí)行結(jié)果,那么任務(wù) A 不適合加入線程池的工作隊列中。如集把像任務(wù) A 一樣的需要等待其他任務(wù)執(zhí)行結(jié)果的任務(wù)加入工作隊列中,就可能會導(dǎo)致線程池的死鎖
如果執(zhí)行某個任務(wù)時可能會阻塞,并且是長時間的阻塞,則應(yīng)該設(shè)定超時時間避免工作線程永久地阻塞下去而導(dǎo)致線程泄漏
了解任務(wù)的特點,分析任務(wù)是執(zhí)行經(jīng)常會阻塞的 IO 操作,還是執(zhí)行一直不會阻塞的運算操作。前者時斷時續(xù)地占用 CPU,而后者對 CPU 具有更高的利用率。根據(jù)任務(wù)的特點,對任務(wù)進行分類,然后把不同類型的任務(wù)分別加入不同線程池的工作隊列中,這樣可以根據(jù)任務(wù)的特點分別調(diào)整每個線程池
調(diào)整線程池的大小,線程池的最佳大小主要取決于系統(tǒng)的可用 CPU 的數(shù)目以及工作隊列中任務(wù)的特點。假如在一個具有 N 個 CPU 的系統(tǒng)上只有一個工作隊列并且其中全部是運算性質(zhì)的任務(wù),那么當(dāng)線程池具有 N 或 N+1 個工作線程時,一般會獲得最大的 CPU 利用率
如果工作隊列中包含會執(zhí)行 IO 操作并經(jīng)常阻塞的任務(wù),則要讓線程池的大小超過可用 CPU 的數(shù)目,因為并不是所有工作線程都一直在工作。選擇一個典型的任務(wù),然后估計在執(zhí)行這個任務(wù)的過程中,等待時間(WT)與實際占用 CPU 進行運算的時間(ST)之間的比:WT/ST。對于一個具有 N 個 CPU 的系統(tǒng),需要設(shè)置大約 N(1+WT/ST) 個線程來保證 CPU 得到充分利用
避免任務(wù)過載,服務(wù)器應(yīng)根據(jù)系統(tǒng)的承受能力,限制客戶的并發(fā)連接的數(shù)目。當(dāng)客戶的并發(fā)連接的數(shù)目超過了限制值,服務(wù)器可以拒絕連接請求,并給予客戶友好提示
到此這篇關(guān)于Java創(chuàng)建多線程服務(wù)器流程的文章就介紹到這了,更多相關(guān)Java多線程服務(wù)器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java.text.DecimalFormat類十進制格式化
這篇文章主要為大家詳細介紹了java.text.DecimalFormat類十進制格式化的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-03-03
基于java查找并打印輸出字符串中字符出現(xiàn)次數(shù)
這篇文章主要介紹了基于java查找并打印輸出字符串中字符出現(xiàn)次數(shù),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11
spring boot請求異常處理并返回對應(yīng)的html頁面
這篇文章主要介紹了spring boot處理請求異常并返回對應(yīng)的html頁面,包括404異常處理和500異常處理,需要的朋友可以參考下2017-07-07
關(guān)于springboot集成swagger及knife4j的增強問題
這篇文章主要介紹了springboot集成swagger以及knife4j的增強,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03

