java Socket編程實(shí)現(xiàn)I/O多路復(fù)用的示例
I/O多路復(fù)用
最基本的Socket模型:可以讓進(jìn)程跨主機(jī)通信.
具體流程是這樣:服務(wù)端開(kāi)辟一個(gè)Socket,為這個(gè)Socket綁定IP地址(找到機(jī)器的網(wǎng)卡)和端口號(hào)(找到進(jìn)程),通過(guò)listen()函數(shù)進(jìn)行監(jiān)聽(tīng)。在客戶端通過(guò)connect()函數(shù)發(fā)起連接后,服務(wù)端的Socket會(huì)維護(hù)兩個(gè)隊(duì)列,一個(gè)是還沒(méi)有完全建立連接的隊(duì)列,叫TCP半連接隊(duì)列;另一個(gè)是已經(jīng)建立連接(完成了三次握手)的隊(duì)列,叫全連接隊(duì)列。服務(wù)端會(huì)通過(guò)accept()函數(shù)從全連接隊(duì)列拿出一個(gè)socket,后續(xù)傳輸都用這個(gè)Socket。
多進(jìn)程模型
為了讓一個(gè)服務(wù)端服務(wù)多個(gè)客戶端,由此產(chǎn)生了多進(jìn)程模型。
本質(zhì)就是讓一個(gè)父進(jìn)程去處理和多個(gè)客戶端的連接,然后每連一個(gè)客戶端,都fork一個(gè)父進(jìn)程的子進(jìn)程,讓這個(gè)子進(jìn)程用于和新增的客戶端進(jìn)行數(shù)據(jù)的讀寫(xiě)。
但這種方式弊端還是很大的,因?yàn)閯?chuàng)建進(jìn)程的開(kāi)銷(xiāo)是很大的,需要為這個(gè)進(jìn)程分配虛擬內(nèi)存,棧,全局變量等信息,在這個(gè)進(jìn)程進(jìn)行上下文切換的時(shí)候是很費(fèi)勁的。
因此這個(gè)模型也無(wú)法支持太多的并發(fā)量。
多線程模型
既然進(jìn)程這個(gè)單位是很重且開(kāi)銷(xiāo)很大的,因此我們可以用線程去替代線程。
同一個(gè)進(jìn)程的多個(gè)線程可以去共享進(jìn)程中的部分資源,如文件描述符列表,全局?jǐn)?shù)據(jù),堆等,這些共享資源是無(wú)需切換的。需要切換的只有線程的私有數(shù)據(jù),如幫助線程上下文切換的寄存器。所以線程的創(chuàng)建的開(kāi)銷(xiāo)是很小的。
多線程模型的過(guò)程是這樣的,父進(jìn)程同樣負(fù)責(zé)跟多個(gè)客戶端創(chuàng)建連接,然后將已經(jīng)創(chuàng)建好連接的socket放入到隊(duì)列中。在這個(gè)父進(jìn)程中的多個(gè)線程負(fù)責(zé)拿鎖去從這個(gè)隊(duì)列里面取出socket,然后往對(duì)應(yīng)的取出的socket中進(jìn)行讀寫(xiě)。
但就算是這樣,創(chuàng)建線程的開(kāi)銷(xiāo)即使不大,也沒(méi)辦法支撐太多的并發(fā),只是比多進(jìn)程模型更優(yōu)秀。
既然為每個(gè)請(qǐng)求分配一個(gè)線程或者進(jìn)程的方式開(kāi)銷(xiāo)都有些大,那么有可能只用一個(gè)進(jìn)程取維護(hù)多個(gè)Socket呢?
下面我們就要說(shuō)說(shuō)I/O復(fù)用技術(shù)了,其實(shí)I/O多路復(fù)用技術(shù)優(yōu)點(diǎn)類(lèi)似于CPU時(shí)間片的利用。雖然一個(gè)進(jìn)程同一時(shí)間內(nèi)只能處理一個(gè)socket,但是如果說(shuō)這個(gè)socket的處理時(shí)間只有1ms,那么放到1s內(nèi)來(lái)看,它其實(shí)也是有100的并發(fā)量的。
select/poll/epoll是三個(gè)多路復(fù)用的接口。
select/poll/epoll
select實(shí)現(xiàn)多路復(fù)用的方式是,把已經(jīng)連接的socket的文件描述符放入內(nèi)核中,內(nèi)核負(fù)責(zé)遍歷這些socket,當(dāng)檢查有讀或者寫(xiě)事件的時(shí)候,就把這個(gè)socket標(biāo)記為可讀或者可寫(xiě),接著把這些文件描述符集合拷貝回用戶態(tài)。用戶態(tài)再遍歷取出可讀或者可寫(xiě)的socket并對(duì)其進(jìn)行處理。
文件描述符集合就是每一項(xiàng)都指向一個(gè)打開(kāi)的文件。
poll與select的區(qū)別就是poll是用鏈表存儲(chǔ)。
epoll的內(nèi)核對(duì)socket的存儲(chǔ)結(jié)構(gòu)和select/poll就不同了,它們是線性的存儲(chǔ)結(jié)構(gòu),每次執(zhí)行這兩個(gè)方法的時(shí)候都是把整個(gè)socket傳給內(nèi)核。而epoll是用紅黑樹(shù)存儲(chǔ),當(dāng)有一個(gè)新的需要檢測(cè)的socket來(lái)臨時(shí),只需要傳一個(gè)即可,大幅減少了內(nèi)核和用戶空間的拷貝過(guò)程。
此外,epoll的內(nèi)核還維護(hù)了一個(gè)鏈表來(lái)記錄就緒事件,當(dāng)檢測(cè)的socket有就緒事件發(fā)生時(shí),就會(huì)通過(guò)回調(diào)函數(shù)把這個(gè)事件加入這個(gè)就緒事件鏈表當(dāng)中,當(dāng)用戶態(tài)需要這個(gè)就緒鏈表的時(shí)候,只會(huì)返回有事件發(fā)生的文件描述符列表,拷貝回用戶態(tài),無(wú)需像select/poll一樣,無(wú)論這個(gè)socket是否有事件發(fā)生,都全部拷貝回去。
Socket編程實(shí)現(xiàn)I/O多路復(fù)用模型

Server端:主線程負(fù)責(zé)處理連接事件,讀線程交給異步支線線程處理,達(dá)到I/O多路復(fù)用的效果,同時(shí)這也是Netty框架的部分底層思想實(shí)現(xiàn)噢~
public class SimpleServer {
public static void main(String[] args) throws IOException {
?
System.out.println("這里是服務(wù)端");
?
//創(chuàng)建服務(wù)端Channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
?
//設(shè)置非阻塞
serverChannel.configureBlocking(false);
?
//創(chuàng)建Selector
Selector selector = Selector.open();
?
//0代表不對(duì)任何事件感興趣
SelectionKey selectionKey = serverChannel.register(selector, 0, serverChannel);
?
//對(duì)連接接收事件感興趣
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
?
//綁定端口
serverChannel.bind(new InetSocketAddress(8080));
?
NioEventLoop executor = new NioEventLoop();
//主線程負(fù)責(zé)連接
while (true){
//當(dāng)沒(méi)有事件到來(lái)的時(shí)候,這里是阻塞的,有事件的時(shí)候會(huì)自動(dòng)運(yùn)行
selector.select();
//如果有事件到來(lái),這里可以得到注冊(cè)到該selector上的所有的key,每一個(gè)key上都有一個(gè)channel
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//得到集合的迭代器
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
//得到每一個(gè)key
SelectionKey key = keyIterator.next();
//首先要從集合中把key刪除,否則會(huì)一直報(bào)告該key
keyIterator.remove();
//接下來(lái)就要處理事件,判斷selector輪詢到的是什么事件,并根據(jù)事件作出回應(yīng)
//如果是連接事件
if(key.isAcceptable()){
//之前把服務(wù)端channel注冊(cè)到selector上時(shí)候,把serverChannel放進(jìn)來(lái)了
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//接收客戶端channel
SocketChannel clientChannel = channel.accept();
clientChannel.configureBlocking(false);
SelectionKey clientSocketKey = clientChannel.register(selector, 0, clientChannel);
//將客戶端channel設(shè)置為可讀事件
clientSocketKey.interestOps(SelectionKey.OP_READ);
System.out.println("客戶端連接成功"+System.currentTimeMillis());
//worker線程開(kāi)始從客戶端讀數(shù)據(jù),把客戶端的channel交給worker
executor.register(clientChannel,executor);
//用channel寫(xiě)回一條信息
clientChannel.write(ByteBuffer.wrap("服務(wù)端寫(xiě)回客戶端成功".getBytes()));
System.out.println("向客戶端發(fā)送數(shù)據(jù)成功"+System.currentTimeMillis());
}
}
}
?
}
}public class NioEventLoop extends SingleThreadEventLoop{
?
protected Selector selector ;
?
public NioEventLoop() throws IOException {
this.selector = Selector.open();
}
?
public Selector getSelector(){
return this.selector;
}
?
//循環(huán)阻塞,如果有事件發(fā)生,或者隊(duì)列有東西,就放行
private void select() throws IOException {
while (true){
//阻塞等待事件,如果3s都沒(méi)有事件過(guò)來(lái),可能是沒(méi)有初始化。
int select = selector.select(3000);
if(select != 0 || hasTasks()){
break;
}
}
}
?
private void runAllTasks() {
?
for (;;){
Runnable task = tasksQueue.poll();
if(task == null){
break;
}
System.out.println("開(kāi)始處理注冊(cè)事件");
task.run();
}
}
?
private void processSelectedKeys() throws IOException {
System.out.println("開(kāi)始處理I/O事件");
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
//由于連接事件被處理完了,只剩下只讀事件了
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
len = socketChannel.read(buffer);
if (len == -1) {
socketChannel.close();
break;
} else {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString() + System.currentTimeMillis());
}
}
}
}
?
@Override
public void run(){
while (true) {
try {
//等待任務(wù)
select();
//I/O任務(wù)來(lái)了先處理I/O任務(wù)
processSelectedKeys();
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
//最后處理隊(duì)列任務(wù)。第一遍走過(guò)來(lái)的時(shí)候會(huì)先處理隊(duì)列任務(wù)
runAllTasks();
}
}
}
}public abstract class SingleThreadEventExecutor implements Executor {
?
? ?private volatile boolean isSingle = false;
?
? ?private RejectedExecutionHandler rejectedExecutionHandler;
?
? ?protected Queue<Runnable> tasksQueue;
?
? ?protected Thread thread;
?
? ?//初始化的時(shí)候,1.構(gòu)建線程池的屬性,如隊(duì)列,拒絕策略。2.構(gòu)建當(dāng)前線程的Selector
? ?public SingleThreadEventExecutor() {
? ? ? ?this.tasksQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
? ? ? ?this.rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
? }
?
? ?protected final void reject(Runnable task) {
// ? ? ? rejectedExecutionHandler.rejectedExecution(task, this);
? }
?
? ?protected boolean inEventLoop(Thread thread){
? ? ? ?return this.thread == thread;
? }
?
? ?protected boolean hasTasks(){
? ? ? ?return this.tasksQueue.isEmpty();
? }
?
?
? ?@Override
? ?public void execute(Runnable task) {
?
? ? ? ?//此時(shí)仍然是主線程。先把任務(wù)放入隊(duì)列,后續(xù)處理
? ? ? ?addTask(task);
? ? ? ?System.out.println("任務(wù)添加完成");
? ? ? ?startThread(task);
? }
?
? ?protected final void addTask(Runnable task){
? ? ? ?this.tasksQueue.add(task);
? }
?
? ?private void startThread(Runnable task) {
? ? ? ?if (isSingle) {
? ? ? ? ? ?return;
? ? ? }
? ? ? ?isSingle = true;
? ? ? ?System.out.println("新線程任務(wù)成功創(chuàng)建");
? ? ? ?//這是個(gè)異步線程,單線程執(zhí)行器的核心
? ? ? ?new Thread(()->{
? ? ? ? ? ?thread = Thread.currentThread();
? ? ? ? ? ?SingleThreadEventExecutor.this.run();
? ? ? ? ? ?System.out.println("新線程任務(wù)跑完了");
? ? ? }).start();
? }
?
?
? ?protected abstract void run();
?
?
}public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor{
?
private void register0(SocketChannel socketChannel,NioEventLoop nioEventLoop){
try {
socketChannel.configureBlocking(false);
socketChannel.register(nioEventLoop.getSelector(), SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
?
//注冊(cè)socketChannel到當(dāng)前的selector上
public void register(SocketChannel socketChannel,NioEventLoop nioEventLoop){
//先判斷當(dāng)前線程是否是執(zhí)行器線程,如果不是,說(shuō)明是主線程,那么就還沒(méi)被構(gòu)造。
if(inEventLoop(Thread.currentThread())){
register0(socketChannel,nioEventLoop);
}else {
//否則是第一次注冊(cè),就先構(gòu)造線程即可。
execute(()->{
register0(socketChannel,nioEventLoop);
System.out.println("executor執(zhí)行完成");
});
}
}
}到此這篇關(guān)于java Socket編程實(shí)現(xiàn)I/O多路復(fù)用的示例的文章就介紹到這了,更多相關(guān)jav I/O多路復(fù)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法實(shí)例
這篇文章主要給大家介紹了關(guān)于Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
詳解Java如何優(yōu)雅的實(shí)現(xiàn)異常捕獲
在一個(gè)優(yōu)秀的項(xiàng)目中一定少不了對(duì)程序流程良好的異常捕獲與日志打印,所以本文主要為大家介紹了如何優(yōu)雅的實(shí)現(xiàn)異常捕獲與日志打印輸出,有需要的可以參考下2023-09-09
idea +junit單元測(cè)試獲取不到bean注入的解決方式
這篇文章主要介紹了idea +junit單元測(cè)試獲取不到bean注入的解決方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08
Java動(dòng)態(tài)規(guī)劃之編輯距離問(wèn)題示例代碼
這篇文章主要介紹了Java動(dòng)態(tài)規(guī)劃之編輯距離問(wèn)題示例代碼,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
Springcloud hystrix服務(wù)熔斷和dashboard如何實(shí)現(xiàn)
這篇文章主要介紹了Springcloud hystrix服務(wù)熔斷和dashboard如何實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12
springboot openfeign從JSON文件讀取數(shù)據(jù)問(wèn)題
今天主要說(shuō)一下在openfeign里讀取JSON文件的問(wèn)題,我們將測(cè)試所需要的數(shù)據(jù)存儲(chǔ)到文件里,在修改時(shí)關(guān)注點(diǎn)比較單純2018-06-06
Java使用黑盒方式模擬實(shí)現(xiàn)內(nèi)網(wǎng)穿透
這篇文章主要介紹了Java使用黑盒方式模擬實(shí)現(xiàn)內(nèi)網(wǎng)穿透,內(nèi)網(wǎng)穿透,也即 NAT 穿透,進(jìn)行 NAT 穿透是為了使具有某一個(gè)特定源 IP 地址和源端口號(hào)的數(shù)據(jù)包不被 NAT 設(shè)備屏蔽而正確路由到內(nèi)網(wǎng)主機(jī),需要的朋友可以參考下2023-05-05

