java Socket編程實現(xiàn)I/O多路復(fù)用的示例
I/O多路復(fù)用
最基本的Socket模型:可以讓進程跨主機通信.
具體流程是這樣:服務(wù)端開辟一個Socket,為這個Socket綁定IP地址(找到機器的網(wǎng)卡)和端口號(找到進程),通過listen()函數(shù)進行監(jiān)聽。在客戶端通過connect()函數(shù)發(fā)起連接后,服務(wù)端的Socket會維護兩個隊列,一個是還沒有完全建立連接的隊列,叫TCP半連接隊列;另一個是已經(jīng)建立連接(完成了三次握手)的隊列,叫全連接隊列。服務(wù)端會通過accept()函數(shù)從全連接隊列拿出一個socket,后續(xù)傳輸都用這個Socket。
多進程模型
為了讓一個服務(wù)端服務(wù)多個客戶端,由此產(chǎn)生了多進程模型。
本質(zhì)就是讓一個父進程去處理和多個客戶端的連接,然后每連一個客戶端,都fork一個父進程的子進程,讓這個子進程用于和新增的客戶端進行數(shù)據(jù)的讀寫。
但這種方式弊端還是很大的,因為創(chuàng)建進程的開銷是很大的,需要為這個進程分配虛擬內(nèi)存,棧,全局變量等信息,在這個進程進行上下文切換的時候是很費勁的。
因此這個模型也無法支持太多的并發(fā)量。
多線程模型
既然進程這個單位是很重且開銷很大的,因此我們可以用線程去替代線程。
同一個進程的多個線程可以去共享進程中的部分資源,如文件描述符列表,全局數(shù)據(jù),堆等,這些共享資源是無需切換的。需要切換的只有線程的私有數(shù)據(jù),如幫助線程上下文切換的寄存器。所以線程的創(chuàng)建的開銷是很小的。
多線程模型的過程是這樣的,父進程同樣負責(zé)跟多個客戶端創(chuàng)建連接,然后將已經(jīng)創(chuàng)建好連接的socket放入到隊列中。在這個父進程中的多個線程負責(zé)拿鎖去從這個隊列里面取出socket,然后往對應(yīng)的取出的socket中進行讀寫。
但就算是這樣,創(chuàng)建線程的開銷即使不大,也沒辦法支撐太多的并發(fā),只是比多進程模型更優(yōu)秀。
既然為每個請求分配一個線程或者進程的方式開銷都有些大,那么有可能只用一個進程取維護多個Socket呢?
下面我們就要說說I/O復(fù)用技術(shù)了,其實I/O多路復(fù)用技術(shù)優(yōu)點類似于CPU時間片的利用。雖然一個進程同一時間內(nèi)只能處理一個socket,但是如果說這個socket的處理時間只有1ms,那么放到1s內(nèi)來看,它其實也是有100的并發(fā)量的。
select/poll/epoll是三個多路復(fù)用的接口。
select/poll/epoll
select實現(xiàn)多路復(fù)用的方式是,把已經(jīng)連接的socket的文件描述符放入內(nèi)核中,內(nèi)核負責(zé)遍歷這些socket,當(dāng)檢查有讀或者寫事件的時候,就把這個socket標(biāo)記為可讀或者可寫,接著把這些文件描述符集合拷貝回用戶態(tài)。用戶態(tài)再遍歷取出可讀或者可寫的socket并對其進行處理。
文件描述符集合就是每一項都指向一個打開的文件。
poll與select的區(qū)別就是poll是用鏈表存儲。
epoll的內(nèi)核對socket的存儲結(jié)構(gòu)和select/poll就不同了,它們是線性的存儲結(jié)構(gòu),每次執(zhí)行這兩個方法的時候都是把整個socket傳給內(nèi)核。而epoll是用紅黑樹存儲,當(dāng)有一個新的需要檢測的socket來臨時,只需要傳一個即可,大幅減少了內(nèi)核和用戶空間的拷貝過程。
此外,epoll的內(nèi)核還維護了一個鏈表來記錄就緒事件,當(dāng)檢測的socket有就緒事件發(fā)生時,就會通過回調(diào)函數(shù)把這個事件加入這個就緒事件鏈表當(dāng)中,當(dāng)用戶態(tài)需要這個就緒鏈表的時候,只會返回有事件發(fā)生的文件描述符列表,拷貝回用戶態(tài),無需像select/poll一樣,無論這個socket是否有事件發(fā)生,都全部拷貝回去。
Socket編程實現(xiàn)I/O多路復(fù)用模型

Server端:主線程負責(zé)處理連接事件,讀線程交給異步支線線程處理,達到I/O多路復(fù)用的效果,同時這也是Netty框架的部分底層思想實現(xiàn)噢~
public class SimpleServer {
public static void main(String[] args) throws IOException {
?
System.out.println("這里是服務(wù)端");
?
//創(chuàng)建服務(wù)端Channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
?
//設(shè)置非阻塞
serverChannel.configureBlocking(false);
?
//創(chuàng)建Selector
Selector selector = Selector.open();
?
//0代表不對任何事件感興趣
SelectionKey selectionKey = serverChannel.register(selector, 0, serverChannel);
?
//對連接接收事件感興趣
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
?
//綁定端口
serverChannel.bind(new InetSocketAddress(8080));
?
NioEventLoop executor = new NioEventLoop();
//主線程負責(zé)連接
while (true){
//當(dāng)沒有事件到來的時候,這里是阻塞的,有事件的時候會自動運行
selector.select();
//如果有事件到來,這里可以得到注冊到該selector上的所有的key,每一個key上都有一個channel
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//得到集合的迭代器
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
//得到每一個key
SelectionKey key = keyIterator.next();
//首先要從集合中把key刪除,否則會一直報告該key
keyIterator.remove();
//接下來就要處理事件,判斷selector輪詢到的是什么事件,并根據(jù)事件作出回應(yīng)
//如果是連接事件
if(key.isAcceptable()){
//之前把服務(wù)端channel注冊到selector上時候,把serverChannel放進來了
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//接收客戶端channel
SocketChannel clientChannel = channel.accept();
clientChannel.configureBlocking(false);
SelectionKey clientSocketKey = clientChannel.register(selector, 0, clientChannel);
//將客戶端channel設(shè)置為可讀事件
clientSocketKey.interestOps(SelectionKey.OP_READ);
System.out.println("客戶端連接成功"+System.currentTimeMillis());
//worker線程開始從客戶端讀數(shù)據(jù),把客戶端的channel交給worker
executor.register(clientChannel,executor);
//用channel寫回一條信息
clientChannel.write(ByteBuffer.wrap("服務(wù)端寫回客戶端成功".getBytes()));
System.out.println("向客戶端發(fā)送數(shù)據(jù)成功"+System.currentTimeMillis());
}
}
}
?
}
}public class NioEventLoop extends SingleThreadEventLoop{
?
protected Selector selector ;
?
public NioEventLoop() throws IOException {
this.selector = Selector.open();
}
?
public Selector getSelector(){
return this.selector;
}
?
//循環(huán)阻塞,如果有事件發(fā)生,或者隊列有東西,就放行
private void select() throws IOException {
while (true){
//阻塞等待事件,如果3s都沒有事件過來,可能是沒有初始化。
int select = selector.select(3000);
if(select != 0 || hasTasks()){
break;
}
}
}
?
private void runAllTasks() {
?
for (;;){
Runnable task = tasksQueue.poll();
if(task == null){
break;
}
System.out.println("開始處理注冊事件");
task.run();
}
}
?
private void processSelectedKeys() throws IOException {
System.out.println("開始處理I/O事件");
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
//由于連接事件被處理完了,只剩下只讀事件了
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
len = socketChannel.read(buffer);
if (len == -1) {
socketChannel.close();
break;
} else {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString() + System.currentTimeMillis());
}
}
}
}
?
@Override
public void run(){
while (true) {
try {
//等待任務(wù)
select();
//I/O任務(wù)來了先處理I/O任務(wù)
processSelectedKeys();
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
//最后處理隊列任務(wù)。第一遍走過來的時候會先處理隊列任務(wù)
runAllTasks();
}
}
}
}public abstract class SingleThreadEventExecutor implements Executor {
?
? ?private volatile boolean isSingle = false;
?
? ?private RejectedExecutionHandler rejectedExecutionHandler;
?
? ?protected Queue<Runnable> tasksQueue;
?
? ?protected Thread thread;
?
? ?//初始化的時候,1.構(gòu)建線程池的屬性,如隊列,拒絕策略。2.構(gòu)建當(dāng)前線程的Selector
? ?public SingleThreadEventExecutor() {
? ? ? ?this.tasksQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
? ? ? ?this.rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
? }
?
? ?protected final void reject(Runnable task) {
// ? ? ? rejectedExecutionHandler.rejectedExecution(task, this);
? }
?
? ?protected boolean inEventLoop(Thread thread){
? ? ? ?return this.thread == thread;
? }
?
? ?protected boolean hasTasks(){
? ? ? ?return this.tasksQueue.isEmpty();
? }
?
?
? ?@Override
? ?public void execute(Runnable task) {
?
? ? ? ?//此時仍然是主線程。先把任務(wù)放入隊列,后續(xù)處理
? ? ? ?addTask(task);
? ? ? ?System.out.println("任務(wù)添加完成");
? ? ? ?startThread(task);
? }
?
? ?protected final void addTask(Runnable task){
? ? ? ?this.tasksQueue.add(task);
? }
?
? ?private void startThread(Runnable task) {
? ? ? ?if (isSingle) {
? ? ? ? ? ?return;
? ? ? }
? ? ? ?isSingle = true;
? ? ? ?System.out.println("新線程任務(wù)成功創(chuàng)建");
? ? ? ?//這是個異步線程,單線程執(zhí)行器的核心
? ? ? ?new Thread(()->{
? ? ? ? ? ?thread = Thread.currentThread();
? ? ? ? ? ?SingleThreadEventExecutor.this.run();
? ? ? ? ? ?System.out.println("新線程任務(wù)跑完了");
? ? ? }).start();
? }
?
?
? ?protected abstract void run();
?
?
}public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor{
?
private void register0(SocketChannel socketChannel,NioEventLoop nioEventLoop){
try {
socketChannel.configureBlocking(false);
socketChannel.register(nioEventLoop.getSelector(), SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
?
//注冊socketChannel到當(dāng)前的selector上
public void register(SocketChannel socketChannel,NioEventLoop nioEventLoop){
//先判斷當(dāng)前線程是否是執(zhí)行器線程,如果不是,說明是主線程,那么就還沒被構(gòu)造。
if(inEventLoop(Thread.currentThread())){
register0(socketChannel,nioEventLoop);
}else {
//否則是第一次注冊,就先構(gòu)造線程即可。
execute(()->{
register0(socketChannel,nioEventLoop);
System.out.println("executor執(zhí)行完成");
});
}
}
}到此這篇關(guān)于java Socket編程實現(xiàn)I/O多路復(fù)用的示例的文章就介紹到這了,更多相關(guān)jav I/O多路復(fù)用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea +junit單元測試獲取不到bean注入的解決方式
這篇文章主要介紹了idea +junit單元測試獲取不到bean注入的解決方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
Springcloud hystrix服務(wù)熔斷和dashboard如何實現(xiàn)
這篇文章主要介紹了Springcloud hystrix服務(wù)熔斷和dashboard如何實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-12-12
springboot openfeign從JSON文件讀取數(shù)據(jù)問題
今天主要說一下在openfeign里讀取JSON文件的問題,我們將測試所需要的數(shù)據(jù)存儲到文件里,在修改時關(guān)注點比較單純2018-06-06
Java使用黑盒方式模擬實現(xiàn)內(nèi)網(wǎng)穿透
這篇文章主要介紹了Java使用黑盒方式模擬實現(xiàn)內(nèi)網(wǎng)穿透,內(nèi)網(wǎng)穿透,也即 NAT 穿透,進行 NAT 穿透是為了使具有某一個特定源 IP 地址和源端口號的數(shù)據(jù)包不被 NAT 設(shè)備屏蔽而正確路由到內(nèi)網(wǎng)主機,需要的朋友可以參考下2023-05-05

