A source-code walkthrough of Druid's borrow behavior
Preface
This article mainly examines Druid's borrow behavior, i.e. how DruidDataSource hands out pooled connections.
getConnection
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnection() throws SQLException {
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis);
}
}
DruidDataSource's getConnection method internally delegates to getConnectionDirect(maxWaitMillis); when filters are configured the call first goes through the filter chain and ends up in the same method.
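A minimal usage sketch of this borrow path; the URL, credentials and pool sizes are placeholder assumptions, not values taken from the article:
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;

public class BorrowExample {
    public static void main(String[] args) throws Exception {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // hypothetical database
        dataSource.setUsername("root");
        dataSource.setPassword("secret");
        dataSource.setInitialSize(5);
        dataSource.setMaxActive(20);
        dataSource.setMaxWait(3000); // becomes the maxWaitMillis passed to getConnectionDirect

        // getConnection() -> getConnection(maxWait) -> filter chain (if any) -> getConnectionDirect(maxWaitMillis)
        try (DruidPooledConnection conn = dataSource.getConnection()) {
            // use conn as a normal java.sql.Connection; close() recycles it back into the pool
        }
        dataSource.close();
    }
}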
getConnectionDirect
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder); // 傳入null,避免重復(fù)關(guān)閉
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
public boolean isFull() {
lock.lock();
try {
return this.poolingCount + this.activeCount >= this.maxActive;
} finally {
lock.unlock();
}
}
getConnectionDirect acquires a connection inside a for loop. It first calls getConnectionInternal(maxWaitMillis); if that throws GetConnectionTimeoutException and the pool is not full while notFullTimeoutRetryCnt is still less than or equal to this.notFullTimeoutRetryCount, it increments notFullTimeoutRetryCnt and continues the loop, otherwise it rethrows the GetConnectionTimeoutException and exits the loop.
Once a connection is obtained, if testOnBorrow is enabled it runs testConnectionInternal; on validation failure it calls discardConnection and continues the loop. If testOnBorrow is disabled, it first checks whether the conn is closed, discarding it and continuing the loop if so; otherwise it enters the testWhileIdle logic (running testWhileIdle directly inside getConnection is a somewhat puzzling design choice on Druid's part).
Finally, if removeAbandoned is enabled, it records the connect stack trace and connectedTimeNano and puts the current connection into activeConnections. The switches driving these branches are sketched below.
getConnectionInternal
com/alibaba/druid/pool/DruidDataSource.java
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false; ; ) {
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("conn-direct_create ");
}
boolean discard;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {
errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
}
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
if (holder == null) {
long waitNanos = waitNanosLocal.get();
final long activeCount;
final long maxActive;
final long creatingCount;
final long createStartNanos;
final long createErrorCount;
final Throwable createError;
try {
lock.lock();
activeCount = this.activeCount;
maxActive = this.maxActive;
creatingCount = this.creatingCount;
createStartNanos = this.createStartNanos;
createErrorCount = this.createErrorCount;
createError = this.createError;
} finally {
lock.unlock();
}
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")
.append(waitNanos / (1000 * 1000))
.append(", active ").append(activeCount)
.append(", maxActive ").append(maxActive)
.append(", creating ").append(creatingCount);
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}
if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('\n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal first checks whether the data source is closed and, if so, throws DataSourceClosedException; it then checks whether it is enabled and, if not, throws DataSourceDisableException. After that it enters a for loop whose behavior depends on createDirect, which defaults to false on the first iteration.
When createDirect is false: if notEmptyWaitThreadCount is greater than or equal to maxWaitThreadCount it throws an SQLException; if poolingCount is 0, activeCount is less than maxActive, and the createScheduler's queue is non-empty, it sets createDirect to true and loops again; otherwise, if maxWait is greater than 0 it calls pollLast(nanos), else it calls takeLast().
When createDirect is true, it creates a physical connection via DruidDataSource.this.createPhysicalConnection(); if activeCount is less than maxActive it updates activeCount and breaks out of the loop, otherwise it marks discard as true and closes the connection via JdbcUtils.close(pyConnInfo.getPhysicalConnection()).
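A hedged configuration fragment for the wait-path parameters above (illustrative values, assuming ds is the DruidDataSource being configured):
// maxWait > 0 makes getConnectionInternal take the timed pollLast(nanos) path and eventually
// surface a GetConnectionTimeoutException; maxWait <= 0 blocks indefinitely in takeLast().
ds.setMaxWait(3000);
// Cap on threads allowed to queue waiting for a connection; beyond it getConnectionInternal
// fails immediately with an SQLException mentioning maxWaitThreadCount.
ds.setMaxWaitThreadCount(64);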
pollLast
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (; ; ) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
When poolingCount is 0, pollLast calls emptySignal and then does a timed wait on the notEmpty condition; once a connection becomes available it decrements poolingCount and takes connections[poolingCount].
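The timed wait in pollLast follows the standard ReentrantLock/Condition awaitNanos pattern. A minimal, self-contained sketch of that pattern (a hypothetical TimedPool class, not Druid's actual fields):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration of the awaitNanos loop that pollLast uses; names are hypothetical.
class TimedPool<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();

    T pollLast(long timeoutMillis) throws InterruptedException {
        long estimate = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (items.isEmpty()) {
                if (estimate <= 0) {
                    return null;                          // wait budget exhausted, like Druid returning null
                }
                estimate = notEmpty.awaitNanos(estimate); // returns the remaining wait time
            }
            return items.pollLast();                      // LIFO: the most recently recycled item first
        } finally {
            lock.unlock();
        }
    }

    void recycle(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();                            // wake one waiting borrower
        } finally {
            lock.unlock();
        }
    }
}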
takeLast
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
takeLast likewise calls emptySignal when poolingCount is 0, then blocks indefinitely on notEmpty.await(), and finally returns connections[poolingCount] after decrementing poolingCount.
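For comparison, the untimed counterpart that takeLast corresponds to, written as an extra method on the hypothetical TimedPool sketch above:
// Blocks until recycle() signals notEmpty, mirroring takeLast(); add to the TimedPool sketch above.
T takeLast() throws InterruptedException {
    lock.lock();
    try {
        while (items.isEmpty()) {
            notEmpty.await();       // woken by a recycle or by the connection creator
        }
        return items.pollLast();
    } finally {
        lock.unlock();
    }
}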
emptySignal
private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
submitCreateTask(false);
}
emptySignal calls empty.signal() when createScheduler is null (waking the built-in create thread); otherwise it checks the create-task count and the maxActive limit, and if neither cap is hit it calls submitCreateTask(false).
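Whether emptySignal wakes the built-in create thread or submits a CreateConnectionTask therefore depends on whether a create scheduler is configured. A hedged fragment (the thread count is an arbitrary assumption):
// Without a scheduler, emptySignal() simply signals the internal CreateConnectionThread via empty.signal().
// With one, connection creation is pushed to the scheduler as CreateConnectionTask instances.
java.util.concurrent.ScheduledExecutorService createScheduler =
        java.util.concurrent.Executors.newScheduledThreadPool(2);
DruidDataSource ds = new DruidDataSource();
ds.setCreateScheduler(createScheduler);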
submitCreateTask
private void submitCreateTask(boolean initTask) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(initTask);
if (createTasks == null) {
createTasks = new long[8];
}
boolean putted = false;
for (int i = 0; i < createTasks.length; ++i) {
if (createTasks[i] == 0) {
createTasks[i] = task.taskId;
putted = true;
break;
}
}
if (!putted) {
long[] array = new long[createTasks.length * 3 / 2];
System.arraycopy(createTasks, 0, array, 0, createTasks.length);
array[createTasks.length] = task.taskId;
createTasks = array;
}
this.createSchedulerFuture = createScheduler.submit(task);
}
submitCreateTask increments createTaskCount, records the task id in the createTasks array (growing it by 1.5x when full), creates a CreateConnectionTask, and submits it to createScheduler for execution.
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable {
private int errorCount;
private boolean initTask;
private final long taskId;
public CreateConnectionTask() {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
}
public CreateConnectionTask(boolean initTask) {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (; ; ) {
// addLast
lock.lock();
try {
if (closed || closing) {
clearCreateTask(taskId);
return;
}
boolean emptyWait = true;
if (createError != null && poolingCount == 0) {
emptyWait = false;
}
if (emptyWait) {
// 必須存在線程等待,才創(chuàng)建連接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive場景不能放棄創(chuàng)建
&& (!initTask) // 線程池初始化時的任務(wù)不能放棄創(chuàng)建
&& !isFailContinuous() // failContinuous時不能放棄創(chuàng)建,否則會無法創(chuàng)建線程
&& !isOnFatalError() // onFatalError時不能放棄創(chuàng)建,否則會無法創(chuàng)建線程
) {
clearCreateTask(taskId);
return;
}
// 防止創(chuàng)建超過maxActive數(shù)量的連接
if (activeCount + poolingCount >= maxActive) {
clearCreateTask(taskId);
return;
}
}
} finally {
lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {
physicalConnection = createPhysicalConnection();
} catch (OutOfMemoryError e) {
LOG.error("create connection OutOfMemoryError, out memory. ", e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl, e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
// unknow fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection Error", e);
// unknow fatal exception
setFailContinuous(true);
break;
} catch (Throwable e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection unexecpted error.", e);
break;
}
if (physicalConnection == null) {
continue;
}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {
JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
}
CreateConnectionTask runs a for loop: under the lock it decides whether a new connection is really needed (waiting threads, keepAlive/minIdle, the maxActive cap), then creates the physical connection via createPhysicalConnection, handles the various error and retry branches, and finally puts the connection into the pool.
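A hedged configuration fragment for the error-handling knobs used in runInternal above (illustrative values, assuming ds is the DruidDataSource being configured):
ds.setConnectionErrorRetryAttempts(3);    // errorCount threshold before setFailContinuous(true) kicks in
ds.setTimeBetweenConnectErrorMillis(500); // delay before the task reschedules itself after repeated errors
ds.setBreakAfterAcquireFailure(false);    // true -> clearCreateTask and give up once the threshold is exceeded
ds.setFailFast(true);                     // on continuous failure, notEmpty.signalAll() lets waiting borrowers fail quickly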
createPhysicalConnection
com/alibaba/druid/pool/DruidAbstractDataSource.java
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
String url = this.getUrl();
Properties connectProperties = getConnectProperties();
String user;
if (getUserCallback() != null) {
user = getUserCallback().getName();
} else {
user = getUsername();
}
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null) {
if (passwordCallback instanceof DruidPasswordCallback) {
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if (chars != null) {
password = new String(chars);
}
}
Properties physicalConnectProperties = new Properties();
if (connectProperties != null) {
physicalConnectProperties.putAll(connectProperties);
}
if (user != null && user.length() != 0) {
physicalConnectProperties.put("user", user);
}
if (password != null && password.length() != 0) {
physicalConnectProperties.put("password", password);
}
Connection conn = null;
long connectStartNanos = System.nanoTime();
long connectedNanos, initedNanos, validatedNanos;
Map<String, Object> variables = initVariants
? new HashMap<String, Object>()
: null;
Map<String, Object> globalVariables = initGlobalVariants
? new HashMap<String, Object>()
: null;
createStartNanosUpdater.set(this, connectStartNanos);
creatingCountUpdater.incrementAndGet(this);
try {
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {
throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
}
initPhysicalConnection(conn, variables, globalVariables);
initedNanos = System.nanoTime();
validateConnection(conn);
validatedNanos = System.nanoTime();
setFailContinuous(false);
setCreateError(null);
} catch (SQLException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (RuntimeException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (Error ex) {
createErrorCountUpdater.incrementAndGet(this);
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} finally {
long nano = System.nanoTime() - connectStartNanos;
createTimespan += nano;
creatingCountUpdater.decrementAndGet(this);
}
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}
createPhysicalConnection builds the connect properties (user, password, possibly via a PasswordCallback), creates the physical connection inside a try/catch, and if any exception occurs it records the create error and closes the connection via JdbcUtils.close(conn).
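The extra properties merged above come from the data source configuration; a hedged fragment (the driver properties shown are illustrative MySQL Connector/J settings, not Druid requirements; ds is the DruidDataSource being configured):
java.util.Properties props = new java.util.Properties();
props.setProperty("connectTimeout", "3000");
props.setProperty("socketTimeout", "60000");
ds.setConnectProperties(props);           // merged into physicalConnectProperties before connecting
// equivalent string form, parsed into the same Properties:
ds.setConnectionProperties("connectTimeout=3000;socketTimeout=60000");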
testConnectionInternal
protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {
String sqlFile = JdbcSqlStat.getContextSqlFile();
String sqlName = JdbcSqlStat.getContextSqlName();
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(null);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(null);
}
try {
if (validConnectionChecker != null) {
boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
long currentTimeMillis = System.currentTimeMillis();
if (holder != null) {
holder.lastValidTimeMillis = currentTimeMillis;
holder.lastExecTimeMillis = currentTimeMillis;
}
if (valid && isMySql) { // unexcepted branch
long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
if (lastPacketReceivedTimeMs > 0) {
long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
if (lastPacketReceivedTimeMs > 0 //
&& mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
discardConnection(holder);
String errorMsg = "discard long time none received connection. "
+ ", jdbcUrl : " + jdbcUrl
+ ", version : " + VERSION.getVersionNumber()
+ ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
LOG.warn(errorMsg);
return false;
}
}
}
if (valid && onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false;
}
} finally {
lock.unlock();
}
}
return valid;
}
if (conn.isClosed()) {
return false;
}
if (null == validationQuery) {
return true;
}
Statement stmt = null;
ResultSet rset = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {
return false;
}
} finally {
JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}
if (onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false;
}
} finally {
lock.unlock();
}
}
return true;
} catch (Throwable ex) {
// skip
return false;
} finally {
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(sqlFile);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(sqlName);
}
}
}
testConnectionInternal validates the connection mainly through validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout); if validConnectionChecker is null it falls back to executing validationQuery over plain JDBC.
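A hedged fragment for the validation settings this method relies on (the query is a common choice, not something Druid mandates; ds is the DruidDataSource being configured):
ds.setValidationQuery("SELECT 1"); // fallback query executed over plain JDBC when no checker applies
ds.setValidationQueryTimeout(3);   // seconds, applied via Statement.setQueryTimeout in the fallback path
ds.setTestOnBorrow(true);          // makes getConnectionDirect call testConnectionInternal on every borrow
Note that on init Druid usually installs a driver-specific ValidConnectionChecker (for MySQL, for instance), so the plain-JDBC fallback path is the exception rather than the rule.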
discardConnection
public void discardConnection(DruidConnectionHolder holder) {
if (holder == null) {
return;
}
Connection conn = holder.getConnection();
if (conn != null) {
JdbcUtils.close(conn);
}
lock.lock();
try {
if (holder.discard) {
return;
}
if (holder.active) {
activeCount--;
holder.active = false;
}
discardCount++;
holder.discard = true;
if (activeCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
discardConnection mainly closes the underlying connection, then takes the lock to update the discard/active flags and counters, and calls emptySignal when activeCount has dropped to minIdle or below.
Summary
DruidDataSource's getConnection method internally delegates to getConnectionDirect(maxWaitMillis).
getConnectionDirect acquires a connection inside a for loop: it first calls getConnectionInternal(maxWaitMillis); if that throws GetConnectionTimeoutException and the pool is not full while notFullTimeoutRetryCnt is still less than or equal to this.notFullTimeoutRetryCount, it increments notFullTimeoutRetryCnt and continues the loop, otherwise it rethrows the GetConnectionTimeoutException and exits the loop.
Once a connection is obtained, if testOnBorrow is enabled it runs testConnectionInternal and, on validation failure, calls discardConnection and continues the loop; if testOnBorrow is disabled, it checks whether the conn is closed, discarding it and continuing the loop if so, and otherwise enters the testWhileIdle logic.
Finally, if removeAbandoned is enabled, it records connectedTimeNano and puts the current connection into activeConnections.
Overall, compared with commons-pool, Druid's implementation feels somewhat rough: the level of abstraction is low and the code is littered with statistics counters and state flags that have to be maintained very carefully. Running testWhileIdle directly inside getConnection is also a rather puzzling design choice.