springboot內(nèi)置tomcat之NIO處理流程一覽
前言
springboot內(nèi)置的tomcat目前默認(rèn)是基于NIO來實現(xiàn)的,本文介紹下tomcat接受請求的一些組件及組件之間的關(guān)聯(lián)
tomcat組件
本文只介紹NIO中tomcat的組件
我們直接看NIO的核心類NioEndpoint的startInternal方法
Acceptor組件
public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); // Start poller threads // 核心代碼1 pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } // 核心代碼2 startAcceptorThreads(); } }
看核心代碼1的位置構(gòu)造了一個Poller數(shù)組,Poller是一個實現(xiàn)了Runnable的類,并且啟動了該線程類,
getPollerThreadCount()方法返回了2和當(dāng)前物理機CPU內(nèi)核數(shù)的最小值,即創(chuàng)建的數(shù)組最大值為2
接下來看核心代碼2startAcceptorThreads()
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
創(chuàng)建了多個Acceptor類,Acceptor也是實現(xiàn)了Runnable的線程類,創(chuàng)建個數(shù)默認(rèn)是1
然后我們看下Acceptor啟動后做了什么,我們直接看run方法
public void run() { ...... U socket = null; try { // Accept the next incoming connection from the server // socket // 核心代碼1 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { ...... } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // 核心代碼2 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } ...... }
核心代碼1很明顯是一個阻塞模型,即接受客戶端連接的,當(dāng)沒有客戶端連接時會處于阻塞,這里可以看到默認(rèn)情況下tomcat在nio模式下只有一個Acceptor線程類來接受連接
然后看核心代碼2
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // 核心代碼 getPoller0().register(channel); } catch (Throwable t) { ...... } return true; }
我們看核心代碼getPoller0().register(channel)
public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 核心代碼 addEvent(r); }
看addEvent
private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }
events的定義
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
這里可以看到封裝了一個PollerEvent 并且扔到了一個隊列里面,然后當(dāng)前類就結(jié)束了
由此可得Acceptor的作用就是接受客戶端連接,并且把連接封裝起來扔到了一個隊列中
Poller
我們前面已經(jīng)創(chuàng)建并且啟動了多個Poller線程類,默認(rèn)的數(shù)量是小于等于2的。
然后我們看下Poller類做了什么,同樣我們看run方法
@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { // 核心代碼1 hasEvents = events(); ....... Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 核心代碼2 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
先看核心代碼1 hasEvents = events()
public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { // 核心代碼 pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }
核心代碼run
@Override public void run() { if (interestOps == OP_REGISTER) { try { // 核心代碼,注冊到selector輪訓(xùn)器 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { ...... } }
可以看出大概的意思就是從剛才我們放進去的隊列events里面取數(shù)據(jù)放到了eventCache里面,eventCache的定義SynchronizedStack eventCache,當(dāng)取到數(shù)據(jù)后返回true,這個時候就會進入核心代碼2處的processKey(sk, attachment),也就是開始處理請求了
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { // 核心代碼 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } ...... }
這里就可以看到我們的NIO的模型了,也就是多路復(fù)用的模型,輪詢來判斷key的狀態(tài),當(dāng)key是可讀或者可寫時執(zhí)行processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 核心代碼 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
這里就是核心代碼了,可以看到getExecutor()方法,獲取線程池,這個線程池是在初始化tomcat時提前初始化好的,默認(rèn)情況下核心線程是10,最大線程是200。線程池的配置可以根據(jù)我們自己配置來設(shè)置大小。
這里拿到線程池然后包裝了一個SocketProcessorBase線程類扔到線程池里面取執(zhí)行
從這里可以看到Poller的功能就是從前面的隊列里面獲取連接然后包裝成SocketProcessorBase之后扔到線程池里面去執(zhí)行,SocketProcessorBase才是最終真正處理請求的
總結(jié)
根據(jù)上面的分析我們已經(jīng)可以看到tomcat的執(zhí)行流程了,這里盜用網(wǎng)上的一張比較好的圖
大致流程為
1、創(chuàng)建一個Acceptor線程來接收用戶連接,接收到之后扔到events queue隊列里面,默認(rèn)情況下只有一個線程來接收
2、創(chuàng)建Poller線程,數(shù)量小于等于2,Poller對象是NIO的核心,在Poller中,維護了一個Selector對象;當(dāng)Poller從隊列中取出socket后,注冊到該Selector中;然后通過遍歷Selector,找出其中可讀的socket,然后扔到線程池中處理相應(yīng)請求,這就是典型的NIO多路復(fù)用模型。
3、扔到線程池中的SocketProcessorBase處理請求
相較于BIO模型的tomcat,NIO的優(yōu)勢分析
1、BIO中的流程應(yīng)該是接收到請求之后直接把請求扔給線程池去做處理,在這個情況下一個連接即需要一個線程來處理,線程既需要讀取數(shù)據(jù)還需要處理請求,線程占用時間長,很容易達到最大線程
2、NIO的流程的不同點在于Poller類采用了多路復(fù)用模型,即Poller類只有檢查到可讀或者可寫的連接時才把當(dāng)前連接扔給線程池來處理,這樣的好處是大大節(jié)省了連接還不能讀寫時的處理時間(如讀取請求數(shù)據(jù)),也就是說NIO“讀取socket并交給Worker中的線程”這個過程是非阻塞的,當(dāng)socket在等待下一個請求或等待釋放時,并不會占用工作線程,因此Tomcat可以同時處理的socket數(shù)目遠(yuǎn)大于最大線程數(shù),并發(fā)性能大大提高。
以上就是我對于tomcat中nio處理模型的一些理解。希望能給大家一個參考,也希望大家多多支持腳本之家
相關(guān)文章
Java Fluent Mybatis實戰(zhàn)之構(gòu)建項目與代碼生成篇上
Java中常用的ORM框架主要是mybatis, hibernate, JPA等框架。國內(nèi)又以Mybatis用的多,基于mybatis上的增強框架,又有mybatis plus和TK mybatis等。今天我們介紹一個新的mybatis增強框架 fluent mybatis2021-10-10Java從控制臺讀入數(shù)據(jù)的幾種方法總結(jié)
下面小編就為大家?guī)硪黄狫ava從控制臺讀入數(shù)據(jù)的幾種方法總結(jié)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-10-10java 輸入一個數(shù)字,反轉(zhuǎn)輸出這個數(shù)字的值(實現(xiàn)方法)
下面小編就為大家?guī)硪黄猨ava 輸入一個數(shù)字,反轉(zhuǎn)輸出這個數(shù)字的值(實現(xiàn)方法)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-10-10