Java實(shí)現(xiàn)非阻塞式服務(wù)器的示例代碼
1.創(chuàng)建阻塞的服務(wù)器
當(dāng) ServerSocketChannel 與 SockelChannel 采用默認(rèn)的阻塞模式時(shí),為了同時(shí)處理多個(gè)客戶的連接,必須使用多線程
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private ExecutorService executorService; //線程池 private static final int POOL_MULTIPLE = 4; //線程池中工作線程的數(shù)目 public EchoServer() throws IOException { //創(chuàng)建一個(gè)線程池 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE); //創(chuàng)建一個(gè)ServerSocketChannel對(duì)象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí),可以順利綁定相同的端口 serverSocketChannel.socket().setReuseAddress(true); //把服務(wù)器進(jìn)程與一個(gè)本地端口綁定 serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服務(wù)器啟動(dòng)"); } public void service() { while (true) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); //處理客戶連接 executorService.execute(new Handler(socketChannel)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws IOException { new EchoServer().service(); } //處理客戶連按 class Handler implements Runnable { private SocketChannel socketChannel; public Handler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public void run() { handle(socketChannel); } public void handle(SocketChannel socketChannel) { try { //獲得與socketChannel關(guān)聯(lián)的Socket對(duì)象 Socket socket = socketChannel.socket(); System.out.println("接收到客戶連接,來自:" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; while ((msg = br.readLine()) != null) { System.out.println(msg); pw.println(echo(msg)); if (msg.equals("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { if(socketChannel != null) { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut,true); } private BufferedReader getReader(Socket socket) throws IOException { InputStream socketIn = socket.getInputStream(); return new BufferedReader(new InputStreamReader(socketIn)); } public String echo(String msg) { return "echo:" + msg; } }
2.創(chuàng)建非阻塞的服務(wù)器
在非阻塞模式下,EchoServer 只需要啟動(dòng)一個(gè)主線程,就能同時(shí)處理三件事:
- 接收客戶的連接
- 接收客戶發(fā)送的數(shù)據(jù)
- 向客戶發(fā)回響應(yīng)數(shù)據(jù)
EchoServer 委托 Selector 來負(fù)責(zé)監(jiān)控接收連接就緒事件、讀就緒事件和寫就緒事件如果有特定事件發(fā)生,就處理該事件
// 創(chuàng)建一個(gè)Selector對(duì)象 selector = Selector.open(); //創(chuàng)建一個(gè)ServerSocketChannel對(duì)象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí) //可以順利綁定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服務(wù)器進(jìn)程與一個(gè)本地端口綁定 serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer 類的 service() 方法負(fù)責(zé)處理本節(jié)開頭所說的三件事,體現(xiàn)其主要流程的代碼如下:
public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1層while循環(huán) while(selector.select() > 0) { //獲得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2層while循環(huán) while (it.hasNext()) { SelectionKey key = null; //處理SelectionKey try { //取出一個(gè)SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey從Selector 的selected-key 集合中刪除 it.remove(); 1f (key.isAcceptable()) { 處理接收連接就緒事件; } if (key.isReadable()) { 處理讀就緒水件; } if (key.isWritable()) { 處理寫就緒事件; } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使這個(gè)SelectionKey失效 key.cancel(); //關(guān)閉與這個(gè)SelectionKey關(guān)聯(lián)的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } }
- 首先由
ServerSocketChannel
向Selector
注冊(cè)接收連接就緒事件,如果Selector
監(jiān)控到該事件發(fā)生,就會(huì)把相應(yīng)的SelectionKey
對(duì)象加入selected-keys
集合 - 第一層 while 循環(huán),不斷詢問
Selector
已經(jīng)發(fā)生的事件,select()
方法返回當(dāng)前相關(guān)事件已經(jīng)發(fā)生的SelectionKey
的個(gè)數(shù),如果當(dāng)前沒有任何事件發(fā)生,該方法會(huì)阻塞下去,直到至少有一個(gè)事件發(fā)生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相關(guān)事件已經(jīng)發(fā)生的SelectionKey
對(duì)象 - 第二層 while 循環(huán),從
selected-keys
集合中依次取出每個(gè)SelectionKey
對(duì)象并從集合中刪除,,然后調(diào)用isAcceptable()
、isReadable()
和isWritable()
方法判斷到底是哪種事件發(fā)生了,從而做出相應(yīng)的處理
2.1處理接收連接就緒事件
if (key.isAcceptable()) { //獲得與SelectionKey關(guān)聯(lián)的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲得與客戶連接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel設(shè)置為非阻塞模式 socketChannel.configureBlocking(false); //創(chuàng)建一個(gè)用于存放用戶發(fā)送來的數(shù)據(jù)的級(jí)沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注冊(cè)讀就緒事件和寫就緒事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); }
2.2處理讀就緒事件
public void receive(SelectionKey key) throws IOException { //獲得與SelectionKey關(guān)聯(lián)的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關(guān)聯(lián)的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //創(chuàng)建一個(gè)ByteBuffer用于存放讀到的數(shù)據(jù) ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的極限設(shè)為容量 buffer.limit(buffer.capacity()); //把readBuff中的內(nèi)容拷貝到buffer buffer.put(readBuff); }
2.3處理寫就緒事件
public void send(SelectionKey key) throws IOException { //獲得與SelectionKey關(guān)聯(lián)的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關(guān)聯(lián)的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串 String data = decode(buffer); //如果還沒有讀到一行數(shù)據(jù)就返回 if(data.indexOf("\r\n") == -1) return; //截取一行數(shù)據(jù) String outputData = data.substring(0, data.indexOf("\n") + 1); //把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //輸出outputBuffer的所有字節(jié) while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置設(shè)為temp的極限 buffer.position(temp.limit()): //刪除buffer已經(jīng)處理的數(shù)據(jù) buffer.compact(); //如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關(guān)閉SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } }
完整代碼如下:
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { // 創(chuàng)建一個(gè)Selector對(duì)象 selector = Selector.open(); //創(chuàng)建一個(gè)ServerSocketChannel對(duì)象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí) //可以順利綁定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服務(wù)器進(jìn)程與一個(gè)本地端口綁定 serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1層while循環(huán) while(selector.select() > 0) { //獲得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2層while循環(huán) while (it.hasNext()) { SelectionKey key = null; //處理SelectionKey try { //取出一個(gè)SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey從Selector 的selected-key 集合中刪除 it.remove(); 1f (key.isAcceptable()) { //獲得與SelectionKey關(guān)聯(lián)的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲得與客戶連接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel設(shè)置為非阻塞模式 socketChannel.configureBlocking(false); //創(chuàng)建一個(gè)用于存放用戶發(fā)送來的數(shù)據(jù)的級(jí)沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注冊(cè)讀就緒事件和寫就緒事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使這個(gè)SelectionKey失效 key.cancel(); //關(guān)閉與這個(gè)SelectionKey關(guān)聯(lián)的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { //獲得與SelectionKey關(guān)聯(lián)的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關(guān)聯(lián)的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //創(chuàng)建一個(gè)ByteBuffer用于存放讀到的數(shù)據(jù) ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的極限設(shè)為容量 buffer.limit(buffer.capacity()); //把readBuff中的內(nèi)容拷貝到buffer buffer.put(readBuff); } public void send(SelectionKey key) throws IOException { //獲得與SelectionKey關(guān)聯(lián)的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //獲得與SelectionKey關(guān)聯(lián)的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串 String data = decode(buffer); //如果還沒有讀到一行數(shù)據(jù)就返回 if(data.indexOf("\r\n") == -1) return; //截取一行數(shù)據(jù) String outputData = data.substring(0, data.indexOf("\n") + 1); //把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //輸出outputBuffer的所有字節(jié) while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置設(shè)為temp的極限 buffer.position(temp.limit()): //刪除buffer已經(jīng)處理的數(shù)據(jù) buffer.compact(); //如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關(guān)閉SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } } //解碼 public String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toStrinq(); } //編碼 public ByteBuffer encode(String str) { return charset.encode(str); } public static void main(String args[])throws Exception { EchoServer server = new EchoServer(); server.service(); } }
3.阻塞模式與非阻塞模式混合使用
使用非阻塞模式時(shí),ServerSocketChannel 以及 SocketChannel 都被設(shè)置為非阻塞模式,這使得接收連接、接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作都采用非阻塞模式,EchoServer 采用一個(gè)線程同時(shí)完成這些操作
假如有許多客戶請(qǐng)求連接,可以把接收客戶連接的操作單獨(dú)由一個(gè)線程完成,把接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作由另一個(gè)線程完成,這可以提高服務(wù)器的并發(fā)性能
負(fù)責(zé)接收客戶連接的線程按照阻塞模式工作,如果收到客戶連接,就向 Selector 注冊(cè)讀就緒和寫就緒事件,否則進(jìn)入阻塞狀態(tài),直到接收到了客戶的連接。負(fù)責(zé)接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的線程按照非阻塞模式工作,只有在讀就緒或?qū)懢途w事件發(fā)生時(shí),才執(zhí)行相應(yīng)的接收數(shù)據(jù)和發(fā)送數(shù)據(jù)操作
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector = null; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void accept() { while(true) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); synchronized(gate) { selector.wakeup(); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } } catch(IOException e) { e.printStackTrace(); } } } private Object gate=new Object(); public void service() throws IOException { while(true) { synchronized(gate){} int n = selector.select(); if(n == 0) continue; Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { it.remove(); if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { key.cancel(); key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { ... } public void send(SelectionKey key) throws IOException { ... } public String decode(ByteBuffer buffer) { ... } public ByteBuffer encode(String str) { ... } public static void main(String args[])throws Exception { final EchoServer server = new EchoServer(); Thread accept = new Thread() { public void run() { server.accept(); } }; accept.start(); server.service(); } }
注意一點(diǎn):主線程的 selector select() 方法和 Accept 線程的 register(...) 方法都會(huì)造成阻塞,因?yàn)樗麄兌紩?huì)操作 Selector 對(duì)象的共享資源 all-keys 集合,這有可能會(huì)導(dǎo)致死鎖
導(dǎo)致死鎖的具體情形是:Selector 中尚沒有任何注冊(cè)的事件,即 all-keys 集合為空,主線程執(zhí)行 selector.select() 方法時(shí)將進(jìn)入阻塞狀態(tài),只有當(dāng) Accept 線程向 Selector 注冊(cè)了事件,并且該事件發(fā)生后,主線程才會(huì)從 selector.select() 方法返回。然而,由于主線程正在 selector.select() 方法中阻塞,這使得 Acccept 線程也在 register() 方法中阻塞。Accept 線程無法向 Selector 注冊(cè)事件,而主線程沒有任何事件可以監(jiān)控,所以這兩個(gè)線程將永遠(yuǎn)阻塞下去
為了避免對(duì)共享資源的競(jìng)爭(zhēng),同步機(jī)制使得一個(gè)線程執(zhí)行 register() 時(shí),不允許另一個(gè)線程同時(shí)執(zhí)行 select() 方法,反之亦然
到此這篇關(guān)于Java實(shí)現(xiàn)非阻塞式服務(wù)器的示例代碼的文章就介紹到這了,更多相關(guān)Java服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java模擬實(shí)現(xiàn)QQ三方登錄(單點(diǎn)登錄2.0)
這篇文章主要為大家詳細(xì)介紹了Java模擬實(shí)現(xiàn)QQ三方登錄,單點(diǎn)登錄2.0,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-06-06Java之next()、nextLine()區(qū)別及問題解決
這篇文章主要介紹了Java之next()、nextLine()區(qū)別及問題解決,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08Java?18?新特性之Web服務(wù)器?jwebserver功能
JEP?408:?Simple?Web?Server,是這次Java?18推出的一個(gè)比較獨(dú)立的全新功能點(diǎn)。我們可以通過命令行工具來啟動(dòng)一個(gè)提供靜態(tài)資源訪問的迷你Web服務(wù)器,本文通過一個(gè)構(gòu)建HTML頁(yè)面的例子,來嘗試一下jwebserver的功能2022-04-04IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法
這篇文章主要介紹了IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03Mybatis-Plus中使用@DS注解動(dòng)態(tài)選擇數(shù)據(jù)源的源碼解讀
這篇文章主要介紹了Mybatis-Plus中使用@DS注解動(dòng)態(tài)選擇數(shù)據(jù)源的源碼解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07