Java實現(xiàn)非阻塞式服務器的示例代碼
1.創(chuàng)建阻塞的服務器
當 ServerSocketChannel 與 SockelChannel 采用默認的阻塞模式時,為了同時處理多個客戶的連接,必須使用多線程
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private ExecutorService executorService; //線程池 private static final int POOL_MULTIPLE = 4; //線程池中工作線程的數(shù)目 public EchoServer() throws IOException { //創(chuàng)建一個線程池 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE); //創(chuàng)建一個ServerSocketChannel對象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時,可以順利綁定相同的端口 serverSocketChannel.socket().setReuseAddress(true); //把服務器進程與一個本地端口綁定 serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服務器啟動"); } public void service() { while (true) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); //處理客戶連接 executorService.execute(new Handler(socketChannel)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws IOException { new EchoServer().service(); } //處理客戶連按 class Handler implements Runnable { private SocketChannel socketChannel; public Handler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public void run() { handle(socketChannel); } public void handle(SocketChannel socketChannel) { try { //獲得與socketChannel關聯(lián)的Socket對象 Socket socket = socketChannel.socket(); System.out.println("接收到客戶連接,來自:" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; while ((msg = br.readLine()) != null) { System.out.println(msg); pw.println(echo(msg)); if (msg.equals("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { if(socketChannel != null) { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut,true); } private BufferedReader getReader(Socket socket) throws IOException { InputStream socketIn = socket.getInputStream(); return new BufferedReader(new InputStreamReader(socketIn)); } public String echo(String msg) { return "echo:" + msg; } }
2.創(chuàng)建非阻塞的服務器
在非阻塞模式下,EchoServer 只需要啟動一個主線程,就能同時處理三件事:
- 接收客戶的連接
- 接收客戶發(fā)送的數(shù)據(jù)
- 向客戶發(fā)回響應數(shù)據(jù)
EchoServer 委托 Selector 來負責監(jiān)控接收連接就緒事件、讀就緒事件和寫就緒事件如果有特定事件發(fā)生,就處理該事件
// 創(chuàng)建一個Selector對象 selector = Selector.open(); //創(chuàng)建一個ServerSocketChannel對象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時 //可以順利綁定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服務器進程與一個本地端口綁定 serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer 類的 service() 方法負責處理本節(jié)開頭所說的三件事,體現(xiàn)其主要流程的代碼如下:
public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1層while循環(huán) while(selector.select() > 0) { //獲得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2層while循環(huán) while (it.hasNext()) { SelectionKey key = null; //處理SelectionKey try { //取出一個SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey從Selector 的selected-key 集合中刪除 it.remove(); 1f (key.isAcceptable()) { 處理接收連接就緒事件; } if (key.isReadable()) { 處理讀就緒水件; } if (key.isWritable()) { 處理寫就緒事件; } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使這個SelectionKey失效 key.cancel(); //關閉與這個SelectionKey關聯(lián)的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } }
- 首先由
ServerSocketChannel
向Selector
注冊接收連接就緒事件,如果Selector
監(jiān)控到該事件發(fā)生,就會把相應的SelectionKey
對象加入selected-keys
集合 - 第一層 while 循環(huán),不斷詢問
Selector
已經(jīng)發(fā)生的事件,select()
方法返回當前相關事件已經(jīng)發(fā)生的SelectionKey
的個數(shù),如果當前沒有任何事件發(fā)生,該方法會阻塞下去,直到至少有一個事件發(fā)生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相關事件已經(jīng)發(fā)生的SelectionKey
對象 - 第二層 while 循環(huán),從
selected-keys
集合中依次取出每個SelectionKey
對象并從集合中刪除,,然后調(diào)用isAcceptable()
、isReadable()
和isWritable()
方法判斷到底是哪種事件發(fā)生了,從而做出相應的處理
2.1處理接收連接就緒事件
if (key.isAcceptable()) { //獲得與SelectionKey關聯(lián)的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲得與客戶連接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel設置為非阻塞模式 socketChannel.configureBlocking(false); //創(chuàng)建一個用于存放用戶發(fā)送來的數(shù)據(jù)的級沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注冊讀就緒事件和寫就緒事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); }
2.2處理讀就緒事件
public void receive(SelectionKey key) throws IOException { //獲得與SelectionKey關聯(lián)的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯(lián)的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //創(chuàng)建一個ByteBuffer用于存放讀到的數(shù)據(jù) ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的極限設為容量 buffer.limit(buffer.capacity()); //把readBuff中的內(nèi)容拷貝到buffer buffer.put(readBuff); }
2.3處理寫就緒事件
public void send(SelectionKey key) throws IOException { //獲得與SelectionKey關聯(lián)的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯(lián)的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串 String data = decode(buffer); //如果還沒有讀到一行數(shù)據(jù)就返回 if(data.indexOf("\r\n") == -1) return; //截取一行數(shù)據(jù) String outputData = data.substring(0, data.indexOf("\n") + 1); //把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //輸出outputBuffer的所有字節(jié) while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置設為temp的極限 buffer.position(temp.limit()): //刪除buffer已經(jīng)處理的數(shù)據(jù) buffer.compact(); //如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關閉SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } }
完整代碼如下:
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { // 創(chuàng)建一個Selector對象 selector = Selector.open(); //創(chuàng)建一個ServerSocketChannel對象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時 //可以順利綁定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服務器進程與一個本地端口綁定 serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1層while循環(huán) while(selector.select() > 0) { //獲得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2層while循環(huán) while (it.hasNext()) { SelectionKey key = null; //處理SelectionKey try { //取出一個SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey從Selector 的selected-key 集合中刪除 it.remove(); 1f (key.isAcceptable()) { //獲得與SelectionKey關聯(lián)的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲得與客戶連接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel設置為非阻塞模式 socketChannel.configureBlocking(false); //創(chuàng)建一個用于存放用戶發(fā)送來的數(shù)據(jù)的級沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注冊讀就緒事件和寫就緒事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使這個SelectionKey失效 key.cancel(); //關閉與這個SelectionKey關聯(lián)的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { //獲得與SelectionKey關聯(lián)的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯(lián)的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //創(chuàng)建一個ByteBuffer用于存放讀到的數(shù)據(jù) ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的極限設為容量 buffer.limit(buffer.capacity()); //把readBuff中的內(nèi)容拷貝到buffer buffer.put(readBuff); } public void send(SelectionKey key) throws IOException { //獲得與SelectionKey關聯(lián)的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關聯(lián)的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串 String data = decode(buffer); //如果還沒有讀到一行數(shù)據(jù)就返回 if(data.indexOf("\r\n") == -1) return; //截取一行數(shù)據(jù) String outputData = data.substring(0, data.indexOf("\n") + 1); //把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //輸出outputBuffer的所有字節(jié) while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置設為temp的極限 buffer.position(temp.limit()): //刪除buffer已經(jīng)處理的數(shù)據(jù) buffer.compact(); //如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關閉SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } } //解碼 public String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toStrinq(); } //編碼 public ByteBuffer encode(String str) { return charset.encode(str); } public static void main(String args[])throws Exception { EchoServer server = new EchoServer(); server.service(); } }
3.阻塞模式與非阻塞模式混合使用
使用非阻塞模式時,ServerSocketChannel 以及 SocketChannel 都被設置為非阻塞模式,這使得接收連接、接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作都采用非阻塞模式,EchoServer 采用一個線程同時完成這些操作
假如有許多客戶請求連接,可以把接收客戶連接的操作單獨由一個線程完成,把接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作由另一個線程完成,這可以提高服務器的并發(fā)性能
負責接收客戶連接的線程按照阻塞模式工作,如果收到客戶連接,就向 Selector 注冊讀就緒和寫就緒事件,否則進入阻塞狀態(tài),直到接收到了客戶的連接。負責接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的線程按照非阻塞模式工作,只有在讀就緒或?qū)懢途w事件發(fā)生時,才執(zhí)行相應的接收數(shù)據(jù)和發(fā)送數(shù)據(jù)操作
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector = null; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void accept() { while(true) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); synchronized(gate) { selector.wakeup(); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } } catch(IOException e) { e.printStackTrace(); } } } private Object gate=new Object(); public void service() throws IOException { while(true) { synchronized(gate){} int n = selector.select(); if(n == 0) continue; Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { it.remove(); if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { key.cancel(); key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { ... } public void send(SelectionKey key) throws IOException { ... } public String decode(ByteBuffer buffer) { ... } public ByteBuffer encode(String str) { ... } public static void main(String args[])throws Exception { final EchoServer server = new EchoServer(); Thread accept = new Thread() { public void run() { server.accept(); } }; accept.start(); server.service(); } }
注意一點:主線程的 selector select() 方法和 Accept 線程的 register(...) 方法都會造成阻塞,因為他們都會操作 Selector 對象的共享資源 all-keys 集合,這有可能會導致死鎖
導致死鎖的具體情形是:Selector 中尚沒有任何注冊的事件,即 all-keys 集合為空,主線程執(zhí)行 selector.select() 方法時將進入阻塞狀態(tài),只有當 Accept 線程向 Selector 注冊了事件,并且該事件發(fā)生后,主線程才會從 selector.select() 方法返回。然而,由于主線程正在 selector.select() 方法中阻塞,這使得 Acccept 線程也在 register() 方法中阻塞。Accept 線程無法向 Selector 注冊事件,而主線程沒有任何事件可以監(jiān)控,所以這兩個線程將永遠阻塞下去
為了避免對共享資源的競爭,同步機制使得一個線程執(zhí)行 register() 時,不允許另一個線程同時執(zhí)行 select() 方法,反之亦然
到此這篇關于Java實現(xiàn)非阻塞式服務器的示例代碼的文章就介紹到這了,更多相關Java服務器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java之next()、nextLine()區(qū)別及問題解決
這篇文章主要介紹了Java之next()、nextLine()區(qū)別及問題解決,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08Java?18?新特性之Web服務器?jwebserver功能
JEP?408:?Simple?Web?Server,是這次Java?18推出的一個比較獨立的全新功能點。我們可以通過命令行工具來啟動一個提供靜態(tài)資源訪問的迷你Web服務器,本文通過一個構建HTML頁面的例子,來嘗試一下jwebserver的功能2022-04-04Mybatis-Plus中使用@DS注解動態(tài)選擇數(shù)據(jù)源的源碼解讀
這篇文章主要介紹了Mybatis-Plus中使用@DS注解動態(tài)選擇數(shù)據(jù)源的源碼解讀,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07