druid的keepalive機(jī)制源碼解析
DruidDataSource
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration { private int keepAliveCheckCount = 0; private DruidConnectionHolder[] keepAliveConnections; private volatile boolean keepAlive = false; // from DruidAbstractDataSource protected volatile long keepAliveBetweenTimeMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS * 2; public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 60 * 1000L; public void init() throws SQLException { //...... if (keepAlive) { // async fill to minIdle if (createScheduler != null) { for (int i = 0; i < minIdle; ++i) { submitCreateTask(true); } } else { this.emptySignal(); } } //...... } }
DruidDataSource的init方法在keepAlive的時(shí)候觸發(fā)創(chuàng)建連接,當(dāng)createScheduler不為null時(shí)(默認(rèn)為null
)執(zhí)行submitCreateTask,否則執(zhí)行emptySignal
submitCreateTask
com/alibaba/druid/pool/DruidDataSource.java
private void submitCreateTask(boolean initTask) { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask(initTask); if (createTasks == null) { createTasks = new long[8]; } boolean putted = false; for (int i = 0; i < createTasks.length; ++i) { if (createTasks[i] == 0) { createTasks[i] = task.taskId; putted = true; break; } } if (!putted) { long[] array = new long[createTasks.length * 3 / 2]; System.arraycopy(createTasks, 0, array, 0, createTasks.length); array[createTasks.length] = task.taskId; createTasks = array; } this.createSchedulerFuture = createScheduler.submit(task); }
submitCreateTask創(chuàng)建CreateConnectionTask,然后提交到createScheduler
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable { private int errorCount; private boolean initTask; private final long taskId; public CreateConnectionTask() { taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this); } public CreateConnectionTask(boolean initTask) { taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this); this.initTask = initTask; } @Override public void run() { runInternal(); } private void runInternal() { for (; ; ) { // addLast lock.lock(); try { if (closed || closing) { clearCreateTask(taskId); return; } boolean emptyWait = true; if (createError != null && poolingCount == 0) { emptyWait = false; } if (emptyWait) { // 必須存在線程等待,才創(chuàng)建連接 if (poolingCount >= notEmptyWaitThreadCount // && (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive場(chǎng)景不能放棄創(chuàng)建 && (!initTask) // 線程池初始化時(shí)的任務(wù)不能放棄創(chuàng)建 && !isFailContinuous() // failContinuous時(shí)不能放棄創(chuàng)建,否則會(huì)無(wú)法創(chuàng)建線程 && !isOnFatalError() // onFatalError時(shí)不能放棄創(chuàng)建,否則會(huì)無(wú)法創(chuàng)建線程 ) { clearCreateTask(taskId); return; } // 防止創(chuàng)建超過(guò)maxActive數(shù)量的連接 if (activeCount + poolingCount >= maxActive) { clearCreateTask(taskId); return; } } } finally { lock.unlock(); } PhysicalConnectionInfo physicalConnection = null; try { physicalConnection = createPhysicalConnection(); } catch (OutOfMemoryError e) { LOG.error("create connection OutOfMemoryError, out memory. ", e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // fail over retry attempts setFailContinuous(true); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { lock.unlock(); } } if (breakAfterAcquireFailure) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } return; } this.errorCount = 0; // reset errorCount if (closing || closed) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } return; } createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); return; } } catch (SQLException e) { LOG.error("create connection SQLException, url: " + jdbcUrl, e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // fail over retry attempts setFailContinuous(true); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { lock.unlock(); } } if (breakAfterAcquireFailure) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } return; } this.errorCount = 0; // reset errorCount if (closing || closed) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } return; } createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); return; } } catch (RuntimeException e) { LOG.error("create connection RuntimeException", e); // unknow fatal exception setFailContinuous(true); continue; } catch (Error e) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } LOG.error("create connection Error", e); // unknow fatal exception setFailContinuous(true); break; } catch (Throwable e) { lock.lock(); try { clearCreateTask(taskId); } finally { lock.unlock(); } LOG.error("create connection unexecpted error.", e); break; } if (physicalConnection == null) { continue; } physicalConnection.createTaskId = taskId; boolean result = put(physicalConnection); if (!result) { JdbcUtils.close(physicalConnection.getPhysicalConnection()); LOG.info("put physical connection to pool failed."); } break; } } }
CreateConnectionTask主要是創(chuàng)建physicalConnection,然后放到connections中。在emptyWait為true的時(shí)候會(huì)根據(jù)條件執(zhí)行empty.await()
CreateConnectionThread
public class CreateConnectionThread extends Thread { public CreateConnectionThread(String name) { super(name); this.setDaemon(true); } public void run() { initedLatch.countDown(); long lastDiscardCount = 0; int errorCount = 0; for (; ; ) { // addLast try { lock.lockInterruptibly(); } catch (InterruptedException e2) { break; } long discardCount = DruidDataSource.this.discardCount; boolean discardChanged = discardCount - lastDiscardCount > 0; lastDiscardCount = discardCount; try { boolean emptyWait = true; if (createError != null && poolingCount == 0 && !discardChanged) { emptyWait = false; } if (emptyWait && asyncInit && createCount < initialSize) { emptyWait = false; } if (emptyWait) { // 必須存在線程等待,才創(chuàng)建連接 if (poolingCount >= notEmptyWaitThreadCount // && (!(keepAlive && activeCount + poolingCount < minIdle)) && !isFailContinuous() ) { empty.await(); } // 防止創(chuàng)建超過(guò)maxActive數(shù)量的連接 if (activeCount + poolingCount >= maxActive) { empty.await(); continue; } } } catch (InterruptedException e) { lastCreateError = e; lastErrorTimeMillis = System.currentTimeMillis(); if ((!closing) && (!closed)) { LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e); } break; } finally { lock.unlock(); } PhysicalConnectionInfo connection = null; try { connection = createPhysicalConnection(); } catch (SQLException e) { LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() + ", state " + e.getSQLState(), e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // fail over retry attempts setFailContinuous(true); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { lock.unlock(); } } if (breakAfterAcquireFailure) { break; } try { Thread.sleep(timeBetweenConnectErrorMillis); } catch (InterruptedException interruptEx) { break; } } } catch (RuntimeException e) { LOG.error("create connection RuntimeException", e); setFailContinuous(true); continue; } catch (Error e) { LOG.error("create connection Error", e); setFailContinuous(true); break; } if (connection == null) { continue; } boolean result = put(connection); if (!result) { JdbcUtils.close(connection.getPhysicalConnection()); LOG.info("put physical connection to pool failed."); } errorCount = 0; // reset errorCount if (closing || closed) { break; } } } }
CreateConnectionThread的邏輯與CreateConnectionTask有點(diǎn)類似,有不少重復(fù)的代碼,不像是同一個(gè)人寫的;CreateConnectionThread是在DruidDataSource的init方法中觸發(fā)createAndStartCreatorThread執(zhí)行的,看只執(zhí)行一次
shrink
public void shrink(boolean checkTime, boolean keepAlive) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return; } boolean needFill = false; int evictCount = 0; int keepAliveCount = 0; int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink; fatalErrorCountLastShrink = fatalErrorCount; try { if (!inited) { return; } final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); for (int i = 0; i < poolingCount; ++i) { DruidConnectionHolder connection = connections[i]; if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) { keepAliveConnections[keepAliveCount++] = connection; continue; } if (checkTime) { if (phyTimeoutMillis > 0) { long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue; } } long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break; } if (idleMillis >= minEvictableIdleTimeMillis) { if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { evictConnections[evictCount++] = connection; continue; } } if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } } else { if (i < checkCount) { evictConnections[evictCount++] = connection; } else { break; } } } int removeCount = evictCount + keepAliveCount; if (removeCount > 0) { System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); poolingCount -= removeCount; } keepAliveCheckCount += keepAliveCount; if (keepAlive && poolingCount + activeCount < minIdle) { needFill = true; } } finally { lock.unlock(); } if (evictCount > 0) { for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } Arrays.fill(evictConnections, null); } if (keepAliveCount > 0) { // keep order for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } // skip } boolean discard = !validate; if (validate) { holer.lastKeepTimeMillis = System.currentTimeMillis(); boolean putOk = put(holer, 0L, true); if (!putOk) { discard = true; } } if (discard) { try { connection.close(); } catch (Exception e) { // skip } lock.lock(); try { discardCount++; if (activeCount + poolingCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } } } this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); Arrays.fill(keepAliveConnections, null); } if (needFill) { lock.lock(); try { int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); for (int i = 0; i < fillCount; ++i) { emptySignal(); } } finally { lock.unlock(); } } else if (onFatalError || fatalErrorIncrement > 0) { lock.lock(); try { emptySignal(); } finally { lock.unlock(); } } }
DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis執(zhí)行一下destroyTask,而DestroyTask的run方法主要是執(zhí)行shrink(true, keepAlive)
shrink方法會(huì)根據(jù)poolingCount遍歷connections,在checkTime為true時(shí)會(huì)根據(jù)idleMillis判斷是否需要evict,否則判斷是否需要keepalive(keepAlive && idleMillis >= keepAliveBetweenTimeMillis
),需要的話放入keepAliveConnections中,然后遍歷進(jìn)行validateConnection,如果成功則更新lastKeepTimeMillis,否則執(zhí)行connection.close(),最后清空keepAliveConnections數(shù)組
小結(jié)
DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis執(zhí)行一下destroyTask,而DestroyTask的run方法主要是執(zhí)行shrink(true, keepAlive);該方法處理了evict及keepalive的邏輯,根據(jù)poolingCount遍歷connections,在checkTime為true時(shí)會(huì)根據(jù)idleMillis判斷是否需要evict,否則判斷是否需要keepalive(keepAlive && idleMillis >= keepAliveBetweenTimeMillis
),需要的話放入keepAliveConnections中,然后遍歷進(jìn)行validateConnection,如果成功則更新lastKeepTimeMillis,否則執(zhí)行connection.close(),最后清空keepAliveConnections數(shù)組。
jedis的keepalive是直接設(shè)置socket.setKeepAlive(true),而common-pools則沒(méi)有所謂的keepalive,本質(zhì)上druid的keepalive與common-pools的testWhileIdle類似;只不過(guò)druid直接在getConnection的時(shí)候執(zhí)行testWhileIdle,這個(gè)邏輯有點(diǎn)奇怪,如果移除掉,而在shrink方法里頭的keepAlive邏輯刪除keepAliveBetweenTimeMillis判斷,那么就跟common-pools的testWhileIdle的邏輯一致了。druid的keepalive相當(dāng)于帶了keepAliveBetweenTimeMillis的testWhileIdle。
以上就是druid的keepalive機(jī)制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于druid keepalive機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
idea新建mapper.xml文件詳細(xì)步驟如:mybatis-config
這篇文章主要介紹了idea新建xml模板設(shè)置,例如:mybatis-config,本文分步驟通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07Java加載資源文件時(shí)的路徑問(wèn)題的解決辦法
今天偶然看到一篇關(guān)于tomcat加載servlet的文章,不由得想起了java加載資源文件的路徑問(wèn)題,資源文件可以使xml,properties,圖片等,可以是任何文件2013-04-04淺析Java常用API(Scanner,Random)匿名對(duì)象
這篇文章主要介紹了Java常用API(Scanner,Random)匿名對(duì)象,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03MybatisPlus字段類型轉(zhuǎn)換的實(shí)現(xiàn)示例
本文主要介紹了MybatisPlus如何完成字段類型轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03詳談Java編程之委托代理回調(diào)、內(nèi)部類以及匿名內(nèi)部類回調(diào)(閉包回調(diào))
下面小編就為大家?guī)?lái)一篇詳談Java編程之委托代理回調(diào)、內(nèi)部類以及匿名內(nèi)部類回調(diào)(閉包回調(diào))。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05Java中實(shí)現(xiàn)將jar包內(nèi)文件資源釋放出來(lái)
這篇文章主要介紹了Java中實(shí)現(xiàn)將jar包內(nèi)文件資源釋放出來(lái)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08Mybatis 級(jí)聯(lián)刪除的實(shí)現(xiàn)
這篇文章主要介紹了Mybatis 級(jí)聯(lián)刪除的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11