Druid Core Source Code Analysis: DruidDataSource
Reading the Configuration
All connection parameters supported by the Druid connection pool can be found in the class com.alibaba.druid.pool.DruidDataSourceFactory.
The configuration-reading code:
public void configFromPropety(Properties properties) {
    // This method is very long; see the source for the full list. It simply reads each property into the corresponding field.
}
Overall the code is simple: it just copies the configuration values into the dataSource.
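For context, here is a minimal sketch of wiring a DruidDataSource up programmatically; the URL, credentials, and sizes are placeholders, and the constructor additionally calls configFromPropety(System.getProperties()), so druid.*-prefixed system properties are picked up as well.
import com.alibaba.druid.pool.DruidDataSource;

public class DruidBootstrap {
    public static void main(String[] args) throws Exception {
        DruidDataSource dataSource = new DruidDataSource();   // also reads druid.* system properties via configFromPropety
        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // placeholder URL
        dataSource.setUsername("user");                        // placeholder credentials
        dataSource.setPassword("pass");
        dataSource.setInitialSize(2);
        dataSource.setMaxActive(10);
        dataSource.setFilters("stat");                         // filter names are resolved to Filter instances
        dataSource.init();                                     // explicit init; otherwise it runs lazily on the first getConnection()

        try (java.sql.Connection conn = dataSource.getConnection()) {
            // use the connection
        }
        dataSource.close();
    }
}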
Connection Pool Initialization
It starts with a simple check and then acquires the lock:
if (inited) {
    // already initialized, return directly
    return;
}

// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();

/** The lock that guards creating and removing connections; its two conditions
    coordinate the production and consumption of connections. **/
// public DruidAbstractDataSource(boolean lockFair){
//     lock = new ReentrantLock(lockFair);
//
//     notEmpty = lock.newCondition();
//     empty = lock.newCondition();
// }
final ReentrantLock lock = this.lock;
try {
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    throw new SQLException("interrupt", e);
}
Next, a few JMX monitoring counters are updated:
// a few JMX monitoring counters
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
All of Druid's monitoring metrics are exposed through JMX.
Parsing the connection URL:
if (this.jdbcUrl != null) {
    // parse the connection URL
    this.jdbcUrl = this.jdbcUrl.trim();
    initFromWrapDriverUrl();
}
Besides extracting the raw JDBC URL and driver information from the wrapped URL, initFromWrapDriverUrl also resolves the configured filter names into the corresponding Filter instances.
private void initFromWrapDriverUrl() throws SQLException {
    if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
        return;
    }

    DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
    this.driverClass = config.getRawDriverClassName();

    LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");

    this.jdbcUrl = config.getRawUrl();
    if (this.name == null) {
        this.name = config.getName();
    }

    for (Filter filter : config.getFilters()) {
        addFilter(filter);
    }
}
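For reference, the wrapped URL handled here starts with DruidDriver.DEFAULT_PREFIX ("jdbc:wrap-jdbc:"); the concrete URL and filter name below are a hypothetical illustration, not taken from the source.
// Hypothetical wrapped URL: prefix + optional key=value segments (e.g. filters) + the raw JDBC URL.
String wrapped = "jdbc:wrap-jdbc:filters=stat:jdbc:mysql://127.0.0.1:3306/test";

DruidDataSource ds = new DruidDataSource();
ds.setUrl(wrapped); // initFromWrapDriverUrl() strips the prefix, records the raw URL and driver,
                    // and registers the listed filters during init()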
Later in init, the filters themselves are initialized:
// initialize each filter
for (Filter filter : filters) {
    filter.init(this);
}
Next, the database type is resolved:
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
    this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
Note the enum com.alibaba.druid.DbType, which lists every data source type the Druid pool currently supports. In addition, Druid ships a few extra driver classes of its own, for example:
elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver
For ClickHouse, Druid additionally provides a load-balancing driver class:
com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver.
Back in the init method, the next part is a long stretch of parameter parsing, which we will skip. After that, custom filters are loaded via SPI:
private void initFromSPIServiceLoader() {
    if (loadSpifilterSkip) {
        return;
    }

    if (autoFilters == null) {
        List<Filter> filters = new ArrayList<Filter>();
        ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

        for (Filter filter : autoFilterLoader) {
            AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
            if (autoLoad != null && autoLoad.value()) {
                filters.add(filter);
            }
        }
        autoFilters = filters;
    }

    for (Filter filter : autoFilters) {
        if (LOG.isInfoEnabled()) {
            LOG.info("load filter from spi :" + filter.getClass().getName());
        }
        addFilter(filter);
    }
}
Note that a custom filter must be annotated with com.alibaba.druid.filter.AutoLoad for this mechanism to pick it up.
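A minimal sketch of such a filter (the package and class name are placeholders): extend FilterAdapter, annotate it with @AutoLoad, and list it in the standard SPI file so that ServiceLoader.load(Filter.class) can find it.
package com.example;

import com.alibaba.druid.filter.AutoLoad;
import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;

@AutoLoad // without this annotation (or with value = false), initFromSPIServiceLoader() skips the filter
public class MySpiFilter extends FilterAdapter {
    @Override
    public void init(DataSourceProxy dataSource) {
        // one-time setup, invoked by filter.init(this) during pool initialization
    }
}
The SPI registration file src/main/resources/META-INF/services/com.alibaba.druid.filter.Filter then contains one fully qualified class name per line:
com.example.MySpiFilter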
Resolving the driver:
protected void resolveDriver() throws SQLException {
    if (this.driver == null) {
        if (this.driverClass == null || this.driverClass.isEmpty()) {
            this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
        }

        if (MockDriver.class.getName().equals(driverClass)) {
            driver = MockDriver.instance;
        } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
            Properties info = new Properties();
            info.put("user", username);
            info.put("password", password);
            info.putAll(connectProperties);
            driver = new BalancedClickhouseDriver(jdbcUrl, info);
        } else {
            if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                throw new SQLException("url not set");
            }
            driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
        }
    } else {
        if (this.driverClass == null) {
            this.driverClass = driver.getClass().getName();
        }
    }
}
Druid's own mock driver and the ClickHouse load-balancing driver are special-cased; everything else goes through Class.forName.
Next come the exception sorter and the validity checker, which have little to do with the main storyline, so we skip them.
Then JdbcDataSourceStat is initialized; nothing noteworthy there.
Then comes the core part:
connections = new DruidConnectionHolder[maxActive];          // the array of pooled connections
evictConnections = new DruidConnectionHolder[maxActive];     // connections to be evicted
keepAliveConnections = new DruidConnectionHolder[maxActive]; // connections to keep alive and usable
Every connection held by the dataSource is wrapped in a DruidConnectionHolder. What follows decides whether the initial connections are created synchronously or asynchronously; either way, it is the connection-initialization step:
if (createScheduler != null && asyncInit) {
    for (int i = 0; i < initialSize; ++i) {
        submitCreateTask(true);
    }
} else if (!asyncInit) {
    // init connections
    while (poolingCount < initialSize) {
        try {
            PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
            DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
            connections[poolingCount++] = holder;
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            if (initExceptionThrow) {
                connectError = ex;
                break;
            } else {
                Thread.sleep(3000);
            }
        }
    }

    if (poolingCount > 0) {
        poolingPeak = poolingCount;
        poolingPeakTime = System.currentTimeMillis();
    }
}
The number of connections created up front is the configured initialSize.
The core creation method is com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection(). It gathers the username and password and then performs the real connection acquisition:
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn;
    if (getProxyFilters().size() == 0) {
        conn = getDriver().connect(url, info);
    } else {
        conn = new FilterChainImpl(this).connection_connect(info);
    }

    createCountUpdater.incrementAndGet(this);
    return conn;
}
Note that if filters are configured, every operation first runs through the filter chain before reaching the driver.
public ConnectionProxy connection_connect(Properties info) throws SQLException {
    if (this.pos < filterSize) {
        return nextFilter()
                .connection_connect(this, info);
    }

    Driver driver = dataSource.getRawDriver();
    String url = dataSource.getRawJdbcUrl();

    Connection nativeConnection = driver.connect(url, info);
    if (nativeConnection == null) {
        return null;
    }

    return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
}
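To make the chain mechanics concrete, here is a minimal sketch (the class name is made up) of a filter that intercepts connection_connect, does its own work, and then delegates to the rest of the chain:
import java.sql.SQLException;
import java.util.Properties;

import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.proxy.jdbc.ConnectionProxy;

// Hypothetical example: times physical connection creation, then hands control back to the chain.
public class ConnectTimingFilter extends FilterAdapter {
    @Override
    public ConnectionProxy connection_connect(FilterChain chain, Properties info) throws SQLException {
        long start = System.currentTimeMillis();
        try {
            // delegate to the next filter, or to the raw driver at the end of the chain
            return chain.connection_connect(info);
        } finally {
            System.out.println("physical connect took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}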
Back in the main init flow, once the connections array has been initialized, extra threads are started:
createAndLogThread();          // logs connection information
createAndStartCreatorThread(); // the connection-creation thread
createAndStartDestroyThread(); // the connection-destruction thread
The comments above are enough for now; the internals of these threads are covered separately later.
Then:
initedLatch.await();     // wait for the creator/destroyer threads to count down the init latch
inited = true;           // mark initialization as complete
initedTime = new Date(); // record the init time
registerMbean();         // register the dataSource's JMX monitoring MBean
At the very end, if keepAlive is configured:
if (keepAlive) {
    // async fill to minIdle
    if (createScheduler != null) {
        for (int i = 0; i < minIdle; ++i) {
            submitCreateTask(true);
        }
    } else {
        this.emptySignal();
    }
}
At this point the pool is topped up toward the configured minIdle, so that a minimum number of live connections is maintained for the dataSource; the details come later.
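As a rough configuration sketch of the settings involved (the URL, credentials, and sizes are arbitrary placeholders, not recommendations):
DruidDataSource ds = new DruidDataSource();
ds.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // placeholder URL
ds.setUsername("user");
ds.setPassword("pass");
ds.setInitialSize(5);  // connections created eagerly in init()
ds.setMinIdle(5);      // the floor that keepAlive / shrink() tries to maintain
ds.setMaxActive(20);   // hard cap: activeCount + poolingCount never exceeds this
ds.setKeepAlive(true); // validate idle connections and refill up to minIdle
ds.init();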
Core Logic of Pool Usage
First, an array is used as the container for connections, and adding or removing real connections is synchronized with the lock. On top of that, following the producer-consumer model, the two conditions on the lock signal whether a connection can be taken from, or should be added to, the pool.
public DruidAbstractDataSource(boolean lockFair){
    lock = new ReentrantLock(lockFair);

    notEmpty = lock.newCondition(); // not empty: connections are available to take
    empty = lock.newCondition();    // empty: connections need to be created
}
Also, the fair lock defaults to false:
public DruidDataSource(){
    this(false);
}

public DruidDataSource(boolean fairLock){
    super(fairLock);

    configFromPropety(System.getProperties());
}
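So if you need FIFO fairness for threads waiting on the pool (at some throughput cost), you opt in through the constructor shown above:
// Default: non-fair lock, generally higher throughput under contention.
DruidDataSource defaultPool = new DruidDataSource();

// Opt-in: fair lock, waiting threads obtain connections in roughly FIFO order.
DruidDataSource fairPool = new DruidDataSource(true);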
Creating Connections
In the thread com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread:
if (emptyWait) {
    // only create a connection when a thread is actually waiting for one
    if (poolingCount >= notEmptyWaitThreadCount //
            && (!(keepAlive && activeCount + poolingCount < minIdle))
            && !isFailContinuous()
    ) {
        empty.await();
    }

    // prevent more than maxActive connections from being created
    if (activeCount + poolingCount >= maxActive) {
        empty.await();
        continue;
    }
}
A connection is created only when some thread is waiting to get one, and the total number of connections must never exceed the configured maximum.
After a connection has been created, notEmpty.signalAll() is executed to notify the consumers.
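To isolate the concurrency pattern from Druid's details, below is a small self-contained sketch of the same idea: one ReentrantLock with an empty and a notEmpty condition, a creator side that sleeps until consumers ask for connections, and a consumer side that sleeps until something is produced. It is illustrative only, not Druid's actual code, and it leaves out the waiting-thread count and maxActive checks shown above.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: a toy pool mirroring Druid's signaling between creator and consumers.
class TinyPool {
    private final String[] slots;                            // stand-in for connections[]
    private int poolingCount;
    private final ReentrantLock lock = new ReentrantLock(false);
    private final Condition notEmpty = lock.newCondition();  // signaled when a connection is added
    private final Condition empty = lock.newCondition();     // signaled when a consumer needs one

    TinyPool(int maxActive) {
        this.slots = new String[maxActive];
    }

    // Roughly what CreateConnectionThread does per iteration; call this in a dedicated loop/thread.
    void createOne() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (poolingCount > 0) {
                empty.await();                    // nobody is starving yet, so do not over-produce
            }
            slots[poolingCount++] = "connection"; // stand-in for a physical connection
            notEmpty.signalAll();                 // wake the waiting consumers
        } finally {
            lock.unlock();
        }
    }

    // Roughly what takeLast() does.
    String takeLast() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (poolingCount == 0) {
                empty.signal();                   // ask the creator side for a connection
                notEmpty.await();                 // wait for a create or a recycle
            }
            String conn = slots[--poolingCount];
            slots[poolingCount] = null;
            return conn;
        } finally {
            lock.unlock();
        }
    }
}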
Getting a Connection
The outer code:
@Override
public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();

    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}
Ignoring the filter chain, what ultimately runs is com.alibaba.druid.pool.DruidDataSource#getConnectionDirect.
Inside that method:
poolableConnection = getConnectionInternal(maxWaitMillis);
- 1. If there are not enough connections, a new one is created directly, the same way as during initialization.
- 2. Otherwise a connection is taken from the connections array:
if (maxWait > 0) {
    holder = pollLast(nanos);
} else {
    holder = takeLast();
}
Here maxWait defaults to -1; it can also be set through the druid.maxWait property:
String property = properties.getProperty("druid.maxWait");
if (property != null && property.length() > 0) {
    try {
        int value = Integer.parseInt(property);
        this.setMaxWait(value);
    } catch (NumberFormatException e) {
        LOG.error("illegal property 'druid.maxWait'", e);
    }
}
maxWait controls whether getting a connection waits at most that long; by default it does not, i.e. the caller waits until a connection is obtained.
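For example (a sketch with an arbitrary timeout), bounding the wait makes getConnection fail after the timeout instead of blocking indefinitely when the pool is exhausted:
DruidDataSource ds = new DruidDataSource();
ds.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // placeholder URL
ds.setMaxActive(8);
ds.setMaxWait(3000); // wait at most 3 seconds; pollLast(nanos) is used instead of takeLast()

try (java.sql.Connection conn = ds.getConnection()) {
    // use the connection
} catch (java.sql.SQLException e) {
    // with maxWait > 0, pool exhaustion surfaces as an exception after roughly 3 seconds
}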
Let's look directly at the blocking take:
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
    try {
        // no connections left
        while (poolingCount == 0) {
            // hint the creator thread that the pool is empty
            emptySignal(); // send signal to CreateThread create connection

            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }

            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }
            try {
                // wait until a create or a recycle produces a connection
                notEmpty.await(); // signal by recycle or creator
            } finally {
                notEmptyWaitThreadCount--;
            }
            notEmptyWaitCount++;

            if (!enable) {
                connectErrorCountUpdater.incrementAndGet(this);
                if (disableException != null) {
                    throw disableException;
                }
                throw new DataSourceDisableException();
            }
        }
    } catch (InterruptedException ie) {
        notEmpty.signal(); // propagate to non-interrupted thread
        notEmptySignalCount++;
        throw ie;
    }

    // take the last connection in the array
    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    connections[poolingCount] = null;

    return last;
}
Connection Eviction
protected void createAndStartDestroyThread() {
    destroyTask = new DestroyTask();

    // a user-supplied destroy scheduler, useful when there are very many connections
    if (destroyScheduler != null) {
        long period = timeBetweenEvictionRunsMillis;
        if (period <= 0) {
            period = 1000;
        }
        destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                TimeUnit.MILLISECONDS);
        initedLatch.countDown();
        return;
    }

    String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
    // otherwise a single destroy thread
    destroyConnectionThread = new DestroyConnectionThread(threadName);
    destroyConnectionThread.start();
}
The actual destruction work:
public class DestroyTask implements Runnable {
    public DestroyTask() {
    }

    @Override
    public void run() {
        shrink(true, keepAlive);

        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
}
Ultimately it is the shrink method that does the work.
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }

    boolean needFill = false;
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;

    try {
        if (!inited) {
            return;
        }
        final int checkCount = poolingCount - minIdle; // how many connections need to be checked
        final long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < poolingCount; ++i) { // examine the connections currently in the array
            DruidConnectionHolder connection = connections[i];

            if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }

            if (checkTime) {
                // If a physical connection timeout (phyTimeoutMillis) is configured and this
                // connection has lived longer than that, move it into evictConnections.
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; // idle time of this connection
                // If this connection has been idle for less than minEvictableIdleTimeMillis,
                // the remaining (newer) connections do not need to be checked either.
                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }

                if (idleMillis >= minEvictableIdleTimeMillis) {
                    // check checkTime is silly code (checkTime is already true in this branch)
                    // i < checkCount: this connection is outside the minIdle floor
                    if (checkTime && i < checkCount) {
                        // idle too long: evict
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        // idle beyond the hard limit: evict even within minIdle
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    // keepAlive is enabled and the keep-alive interval has elapsed: queue for a liveness check
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // no time check required: the first checkCount connections are removed directly
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }
        int removeCount = evictCount + keepAliveCount; // how many were removed from the pool
        // Connections are always taken from the tail, so the tail holds the newest ones; only the
        // leading (oldest) entries can expire, which is why only the front of the array is shifted.
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;

        if (keepAlive && poolingCount + activeCount < minIdle) {
            // fewer live connections than minIdle: new ones need to be created
            needFill = true;
        }
    } finally {
        lock.unlock();
    }
    if (evictCount > 0) {
        for (int i = 0; i < evictCount; ++i) {
            // physically close the evicted connection
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }
    if (keepAliveCount > 0) {
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }

            boolean discard = !validate; // failed validation
            if (validate) {
                // passed the keep-alive check: refresh the timestamp
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                // and try to put it back into the connections array
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    // could not be put back: mark it to be discarded
                    discard = true;
                }
            }

            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // skip
                }

                lock.lock();
                try {
                    discardCount++;

                    if (activeCount + poolingCount <= minIdle) {
                        // signal the creator thread to create replacements
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }
    if (needFill) {
        // the pool needs to be refilled
        lock.lock();
        try {
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}
The scratch arrays evictConnections and keepAliveConnections are nulled out as soon as they have done their job; real utility players.
After this whole sequence, the connections array has been thoroughly cleaned up.
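For reference, a sketch of the knobs that drive shrink; the values are illustrative only, and the setter names are the ones matching the fields used above.
DruidDataSource ds = new DruidDataSource();
ds.setTimeBetweenEvictionRunsMillis(60_000); // how often the destroy thread runs shrink()
ds.setMinEvictableIdleTimeMillis(300_000);   // idle longer than this (beyond minIdle): evict
ds.setMaxEvictableIdleTimeMillis(900_000);   // idle longer than this: evict even within minIdle
ds.setKeepAliveBetweenTimeMillis(120_000);   // idle longer than this: run the keep-alive validation
ds.setPhyTimeoutMillis(-1);                  // when > 0, caps the total lifetime of a physical connection
ds.setKeepAlive(true);                       // enable the keep-alive branch of shrink()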
Summary
- Only the core logic is covered; a lot of the validate, checker, and filter code is omitted.
- The Druid connection pool source also contains many useful tools: database driver helpers, JDBC utilities, a SQL parser (syntax tree), iBATIS support, the wall (SQL firewall) filter, multi-data-source support, and more.
- The latest code also appears to include a ZooKeeper-based high-availability scheme; if I end up using it, I will keep extending this source analysis.