java Socket編程實現(xiàn)I/O多路復(fù)用的示例
I/O多路復(fù)用
最基本的Socket模型:可以讓進程跨主機通信.
具體流程是這樣:服務(wù)端開辟一個Socket,為這個Socket綁定IP地址(找到機器的網(wǎng)卡)和端口號(找到進程),通過listen()函數(shù)進行監(jiān)聽。在客戶端通過connect()函數(shù)發(fā)起連接后,服務(wù)端的Socket會維護兩個隊列,一個是還沒有完全建立連接的隊列,叫TCP半連接隊列;另一個是已經(jīng)建立連接(完成了三次握手)的隊列,叫全連接隊列。服務(wù)端會通過accept()函數(shù)從全連接隊列拿出一個socket,后續(xù)傳輸都用這個Socket。
多進程模型
為了讓一個服務(wù)端服務(wù)多個客戶端,由此產(chǎn)生了多進程模型。
本質(zhì)就是讓一個父進程去處理和多個客戶端的連接,然后每連一個客戶端,都fork一個父進程的子進程,讓這個子進程用于和新增的客戶端進行數(shù)據(jù)的讀寫。
但這種方式弊端還是很大的,因為創(chuàng)建進程的開銷是很大的,需要為這個進程分配虛擬內(nèi)存,棧,全局變量等信息,在這個進程進行上下文切換的時候是很費勁的。
因此這個模型也無法支持太多的并發(fā)量。
多線程模型
既然進程這個單位是很重且開銷很大的,因此我們可以用線程去替代線程。
同一個進程的多個線程可以去共享進程中的部分資源,如文件描述符列表,全局?jǐn)?shù)據(jù),堆等,這些共享資源是無需切換的。需要切換的只有線程的私有數(shù)據(jù),如幫助線程上下文切換的寄存器。所以線程的創(chuàng)建的開銷是很小的。
多線程模型的過程是這樣的,父進程同樣負責(zé)跟多個客戶端創(chuàng)建連接,然后將已經(jīng)創(chuàng)建好連接的socket放入到隊列中。在這個父進程中的多個線程負責(zé)拿鎖去從這個隊列里面取出socket,然后往對應(yīng)的取出的socket中進行讀寫。
但就算是這樣,創(chuàng)建線程的開銷即使不大,也沒辦法支撐太多的并發(fā),只是比多進程模型更優(yōu)秀。
既然為每個請求分配一個線程或者進程的方式開銷都有些大,那么有可能只用一個進程取維護多個Socket呢?
下面我們就要說說I/O復(fù)用技術(shù)了,其實I/O多路復(fù)用技術(shù)優(yōu)點類似于CPU時間片的利用。雖然一個進程同一時間內(nèi)只能處理一個socket,但是如果說這個socket的處理時間只有1ms,那么放到1s內(nèi)來看,它其實也是有100的并發(fā)量的。
select/poll/epoll是三個多路復(fù)用的接口。
select/poll/epoll
select實現(xiàn)多路復(fù)用的方式是,把已經(jīng)連接的socket的文件描述符放入內(nèi)核中,內(nèi)核負責(zé)遍歷這些socket,當(dāng)檢查有讀或者寫事件的時候,就把這個socket標(biāo)記為可讀或者可寫,接著把這些文件描述符集合拷貝回用戶態(tài)。用戶態(tài)再遍歷取出可讀或者可寫的socket并對其進行處理。
文件描述符集合就是每一項都指向一個打開的文件。
poll與select的區(qū)別就是poll是用鏈表存儲。
epoll的內(nèi)核對socket的存儲結(jié)構(gòu)和select/poll就不同了,它們是線性的存儲結(jié)構(gòu),每次執(zhí)行這兩個方法的時候都是把整個socket傳給內(nèi)核。而epoll是用紅黑樹存儲,當(dāng)有一個新的需要檢測的socket來臨時,只需要傳一個即可,大幅減少了內(nèi)核和用戶空間的拷貝過程。
此外,epoll的內(nèi)核還維護了一個鏈表來記錄就緒事件,當(dāng)檢測的socket有就緒事件發(fā)生時,就會通過回調(diào)函數(shù)把這個事件加入這個就緒事件鏈表當(dāng)中,當(dāng)用戶態(tài)需要這個就緒鏈表的時候,只會返回有事件發(fā)生的文件描述符列表,拷貝回用戶態(tài),無需像select/poll一樣,無論這個socket是否有事件發(fā)生,都全部拷貝回去。
Socket編程實現(xiàn)I/O多路復(fù)用模型
Server端:主線程負責(zé)處理連接事件,讀線程交給異步支線線程處理,達到I/O多路復(fù)用的效果,同時這也是Netty框架的部分底層思想實現(xiàn)噢~
public class SimpleServer { public static void main(String[] args) throws IOException { ? System.out.println("這里是服務(wù)端"); ? //創(chuàng)建服務(wù)端Channel ServerSocketChannel serverChannel = ServerSocketChannel.open(); ? //設(shè)置非阻塞 serverChannel.configureBlocking(false); ? //創(chuàng)建Selector Selector selector = Selector.open(); ? //0代表不對任何事件感興趣 SelectionKey selectionKey = serverChannel.register(selector, 0, serverChannel); ? //對連接接收事件感興趣 selectionKey.interestOps(SelectionKey.OP_ACCEPT); ? //綁定端口 serverChannel.bind(new InetSocketAddress(8080)); ? NioEventLoop executor = new NioEventLoop(); //主線程負責(zé)連接 while (true){ //當(dāng)沒有事件到來的時候,這里是阻塞的,有事件的時候會自動運行 selector.select(); //如果有事件到來,這里可以得到注冊到該selector上的所有的key,每一個key上都有一個channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); //得到集合的迭代器 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()){ //得到每一個key SelectionKey key = keyIterator.next(); //首先要從集合中把key刪除,否則會一直報告該key keyIterator.remove(); //接下來就要處理事件,判斷selector輪詢到的是什么事件,并根據(jù)事件作出回應(yīng) //如果是連接事件 if(key.isAcceptable()){ //之前把服務(wù)端channel注冊到selector上時候,把serverChannel放進來了 ServerSocketChannel channel = (ServerSocketChannel) key.channel(); //接收客戶端channel SocketChannel clientChannel = channel.accept(); clientChannel.configureBlocking(false); SelectionKey clientSocketKey = clientChannel.register(selector, 0, clientChannel); //將客戶端channel設(shè)置為可讀事件 clientSocketKey.interestOps(SelectionKey.OP_READ); System.out.println("客戶端連接成功"+System.currentTimeMillis()); //worker線程開始從客戶端讀數(shù)據(jù),把客戶端的channel交給worker executor.register(clientChannel,executor); //用channel寫回一條信息 clientChannel.write(ByteBuffer.wrap("服務(wù)端寫回客戶端成功".getBytes())); System.out.println("向客戶端發(fā)送數(shù)據(jù)成功"+System.currentTimeMillis()); } } } ? } }
public class NioEventLoop extends SingleThreadEventLoop{ ? protected Selector selector ; ? public NioEventLoop() throws IOException { this.selector = Selector.open(); } ? public Selector getSelector(){ return this.selector; } ? //循環(huán)阻塞,如果有事件發(fā)生,或者隊列有東西,就放行 private void select() throws IOException { while (true){ //阻塞等待事件,如果3s都沒有事件過來,可能是沒有初始化。 int select = selector.select(3000); if(select != 0 || hasTasks()){ break; } } } ? private void runAllTasks() { ? for (;;){ Runnable task = tasksQueue.poll(); if(task == null){ break; } System.out.println("開始處理注冊事件"); task.run(); } } ? private void processSelectedKeys() throws IOException { System.out.println("開始處理I/O事件"); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); //由于連接事件被處理完了,只剩下只讀事件了 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int len = 0; len = socketChannel.read(buffer); if (len == -1) { socketChannel.close(); break; } else { buffer.flip(); System.out.println(Charset.defaultCharset().decode(buffer).toString() + System.currentTimeMillis()); } } } } ? @Override public void run(){ while (true) { try { //等待任務(wù) select(); //I/O任務(wù)來了先處理I/O任務(wù) processSelectedKeys(); } catch (IOException e) { throw new RuntimeException(e); }finally { //最后處理隊列任務(wù)。第一遍走過來的時候會先處理隊列任務(wù) runAllTasks(); } } } }
public abstract class SingleThreadEventExecutor implements Executor { ? ? ?private volatile boolean isSingle = false; ? ? ?private RejectedExecutionHandler rejectedExecutionHandler; ? ? ?protected Queue<Runnable> tasksQueue; ? ? ?protected Thread thread; ? ? ?//初始化的時候,1.構(gòu)建線程池的屬性,如隊列,拒絕策略。2.構(gòu)建當(dāng)前線程的Selector ? ?public SingleThreadEventExecutor() { ? ? ? ?this.tasksQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE); ? ? ? ?this.rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ? } ? ? ?protected final void reject(Runnable task) { // ? ? ? rejectedExecutionHandler.rejectedExecution(task, this); ? } ? ? ?protected boolean inEventLoop(Thread thread){ ? ? ? ?return this.thread == thread; ? } ? ? ?protected boolean hasTasks(){ ? ? ? ?return this.tasksQueue.isEmpty(); ? } ? ? ? ?@Override ? ?public void execute(Runnable task) { ? ? ? ? ?//此時仍然是主線程。先把任務(wù)放入隊列,后續(xù)處理 ? ? ? ?addTask(task); ? ? ? ?System.out.println("任務(wù)添加完成"); ? ? ? ?startThread(task); ? } ? ? ?protected final void addTask(Runnable task){ ? ? ? ?this.tasksQueue.add(task); ? } ? ? ?private void startThread(Runnable task) { ? ? ? ?if (isSingle) { ? ? ? ? ? ?return; ? ? ? } ? ? ? ?isSingle = true; ? ? ? ?System.out.println("新線程任務(wù)成功創(chuàng)建"); ? ? ? ?//這是個異步線程,單線程執(zhí)行器的核心 ? ? ? ?new Thread(()->{ ? ? ? ? ? ?thread = Thread.currentThread(); ? ? ? ? ? ?SingleThreadEventExecutor.this.run(); ? ? ? ? ? ?System.out.println("新線程任務(wù)跑完了"); ? ? ? }).start(); ? } ? ? ? ?protected abstract void run(); ? ? }
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor{ ? private void register0(SocketChannel socketChannel,NioEventLoop nioEventLoop){ try { socketChannel.configureBlocking(false); socketChannel.register(nioEventLoop.getSelector(), SelectionKey.OP_READ); } catch (IOException e) { throw new RuntimeException(e); } } ? //注冊socketChannel到當(dāng)前的selector上 public void register(SocketChannel socketChannel,NioEventLoop nioEventLoop){ //先判斷當(dāng)前線程是否是執(zhí)行器線程,如果不是,說明是主線程,那么就還沒被構(gòu)造。 if(inEventLoop(Thread.currentThread())){ register0(socketChannel,nioEventLoop); }else { //否則是第一次注冊,就先構(gòu)造線程即可。 execute(()->{ register0(socketChannel,nioEventLoop); System.out.println("executor執(zhí)行完成"); }); } } }
到此這篇關(guān)于java Socket編程實現(xiàn)I/O多路復(fù)用的示例的文章就介紹到這了,更多相關(guān)jav I/O多路復(fù)用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea +junit單元測試獲取不到bean注入的解決方式
這篇文章主要介紹了idea +junit單元測試獲取不到bean注入的解決方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08Springcloud hystrix服務(wù)熔斷和dashboard如何實現(xiàn)
這篇文章主要介紹了Springcloud hystrix服務(wù)熔斷和dashboard如何實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-12-12springboot openfeign從JSON文件讀取數(shù)據(jù)問題
今天主要說一下在openfeign里讀取JSON文件的問題,我們將測試所需要的數(shù)據(jù)存儲到文件里,在修改時關(guān)注點比較單純2018-06-06Java使用黑盒方式模擬實現(xiàn)內(nèi)網(wǎng)穿透
這篇文章主要介紹了Java使用黑盒方式模擬實現(xiàn)內(nèi)網(wǎng)穿透,內(nèi)網(wǎng)穿透,也即 NAT 穿透,進行 NAT 穿透是為了使具有某一個特定源 IP 地址和源端口號的數(shù)據(jù)包不被 NAT 設(shè)備屏蔽而正確路由到內(nèi)網(wǎng)主機,需要的朋友可以參考下2023-05-05