httpclient的CPool定義方法詳解
序
本文主要研究一下httpclient的CPool
ConnPool
org/apache/http/pool/ConnPool.java
/**
 * Minimal pool contract: a caller leases an entry for a route and later
 * releases that entry back to the pool.
 *
 * @param <T> type of the route a connection is leased for.
 * @param <E> type of the pool entry handed out by the pool.
 */
public interface ConnPool<T, E> {
/**
* Attempts to lease a connection for the given route and with the given
* state from the pool.
*
* @param route route of the connection.
* @param state arbitrary object that represents a particular state
* (usually a security principal or a unique token identifying
* the user whose credentials have been used while establishing the connection).
* May be {@code null}.
* @param callback operation completion callback.
*
* @return future for a leased pool entry.
*/
Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);
/**
* Releases the pool entry back to the pool.
*
* @param entry pool entry leased from the pool
* @param reusable flag indicating whether or not the released connection
* is in a consistent state and is safe for further use.
*/
void release(E entry, boolean reusable);
}
ConnPool定義了lease及release方法,其中定義了兩個泛型,T表示route,E表示poolEntry
ConnPoolControl
/**
 * Control interface of a pool: setters and getters for the maxTotal and
 * defaultMaxPerRoute values, a per-route max value, and accessors for
 * {@code PoolStats}.
 *
 * @param <T> type of the route used by the per-route methods.
 */
public interface ConnPoolControl<T> {
// Pool-wide maxTotal value.
void setMaxTotal(int max);
int getMaxTotal();
// Default per-route value (defaultMaxPerRoute).
void setDefaultMaxPerRoute(int max);
int getDefaultMaxPerRoute();
// Max value for one specific route.
void setMaxPerRoute(final T route, int max);
int getMaxPerRoute(final T route);
// Statistics for the whole pool and for a single route.
PoolStats getTotalStats();
PoolStats getStats(final T route);
}
ConnPoolControl接口定義了設置和訪問maxTotal、defaultMaxPerRoute及PoolStats的方法
AbstractConnPool
org/apache/http/pool/AbstractConnPool.java
/**
 * Abstract base pool that implements both {@code ConnPool} and
 * {@code ConnPoolControl}. This excerpt shows only the fields, the
 * constructor and the abstract entry factory method; the rest of the
 * class is elided.
 *
 * @param <T> route type.
 * @param <C> connection type produced by the {@code ConnFactory}.
 * @param <E> pool entry type; must extend {@code PoolEntry<T, C>}.
 */
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
implements ConnPool<T, E>, ConnPoolControl<T> {
// Lock taken by shutdown, getPoolEntryBlocking and release before they touch the collections below.
private final Lock lock;
// Created from the lock; awaited in getPoolEntryBlocking and signalled from release and from Future.cancel.
private final Condition condition;
// Used to create a new connection for a route (connFactory.create(route)).
private final ConnFactory<T, C> connFactory;
// Per-route sub-pools, keyed by route.
private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
// Entries currently handed out: added in getPoolEntryBlocking, removed in release.
private final Set<E> leased;
// Idle entries: release adds at the head, getPoolEntryBlocking evicts from the tail.
private final LinkedList<E> available;
// Lease futures that are currently waiting for a connection.
private final LinkedList<Future<E>> pending;
// Route -> Integer values; presumably explicit per-route limits that override defaultMaxPerRoute - TODO confirm against getMax.
private final Map<T, Integer> maxPerRoute;
// Set by shutdown; checked by lease, getPoolEntryBlocking and release.
private volatile boolean isShutDown;
private volatile int defaultMaxPerRoute;
private volatile int maxTotal;
// Added to an entry's getUpdated() value and compared with System.currentTimeMillis() in the lease future; not assigned by this constructor.
private volatile int validateAfterInactivity;
/**
 * Creates an empty pool.
 *
 * @param connFactory factory for new connections; passed through {@code Args.notNull}.
 * @param defaultMaxPerRoute initial default per-route value; passed through {@code Args.positive}.
 * @param maxTotal initial pool-wide value; passed through {@code Args.positive}.
 */
public AbstractConnPool(
final ConnFactory<T, C> connFactory,
final int defaultMaxPerRoute,
final int maxTotal) {
super();
this.connFactory = Args.notNull(connFactory, "Connection factory");
this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
this.maxTotal = Args.positive(maxTotal, "Max total value");
this.lock = new ReentrantLock();
this.condition = this.lock.newCondition();
this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
this.pending = new LinkedList<Future<E>>();
this.maxPerRoute = new HashMap<T, Integer>();
}
/**
* Creates a new entry for the given connection with the given route.
*/
protected abstract E createEntry(T route, C conn);
//......
}
AbstractConnPool聲明實現ConnPool、ConnPoolControl接口,它定義E必須繼承PoolEntry,同時定義了泛型C,表示connectionType
shutdown
/**
 * Shuts the pool down: closes every available and every leased entry,
 * shuts down each per-route pool, then clears the route map and the
 * leased/available collections.
 * <p>
 * A call made after {@code isShutDown} has been set returns immediately.
 *
 * @throws IOException declared by the signature; the visible body does not
 * show which call raises it.
 */
public void shutdown() throws IOException {
// NOTE(review): the flag is checked and set before the lock is taken.
if (this.isShutDown) {
return ;
}
this.isShutDown = true;
this.lock.lock();
try {
for (final E entry: this.available) {
entry.close();
}
// Entries still held by callers are closed as well.
for (final E entry: this.leased) {
entry.close();
}
for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
pool.shutdown();
}
// NOTE(review): the pending list is not cleared and the condition is not signalled here.
this.routeToPool.clear();
this.leased.clear();
this.available.clear();
} finally {
this.lock.unlock();
}
}
shutdown方法會遍歷available、leased挨個執行close,然后遍歷routeToPool挨個執行shutdown
lease方法
/**
 * Returns a {@code Future} for a pool entry on the given route.
 * <p>
 * This method itself only validates the route, checks that the pool is not
 * shut down and builds the future; the entry is actually obtained when
 * {@code get} is called on the returned future.
 *
 * @param route route to lease for; passed through {@code Args.notNull}.
 * @param state state object forwarded to {@code getPoolEntryBlocking}.
 * @param callback optional callback; its {@code completed}, {@code failed}
 * or {@code cancelled} method is invoked from the future when not {@code null}.
 * @return the future described above.
 */
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
return new Future<E>() {
// done flips to true exactly once (via compareAndSet) on completion, failure or cancellation.
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
// Holds the leased entry once get has succeeded, so later get calls return the same entry.
private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
/**
 * Cancels the lease if it is not done yet, wakes every thread waiting on
 * the pool condition and notifies the callback.
 *
 * @return {@code true} if this call performed the cancellation,
 * {@code false} if the future was already done.
 */
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (done.compareAndSet(false, true)) {
cancelled.set(true);
lock.lock();
try {
// Waiters re-check future.isCancelled() after waking up in getPoolEntryBlocking.
condition.signalAll();
} finally {
lock.unlock();
}
if (callback != null) {
callback.cancelled();
}
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
@Override
public boolean isDone() {
return done.get();
}
/**
 * Delegates to the timed variant with a timeout of 0, for which
 * getPoolEntryBlocking computes no deadline. A TimeoutException is
 * rethrown wrapped in an ExecutionException.
 */
@Override
public E get() throws InterruptedException, ExecutionException {
try {
return get(0L, TimeUnit.MILLISECONDS);
} catch (final TimeoutException ex) {
throw new ExecutionException(ex);
}
}
/**
 * Obtains the entry, blocking in {@code getPoolEntryBlocking} if needed.
 * Loops until an entry passes validation, or an exception is thrown.
 */
@Override
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
for (;;) {
// Monitor is this future, so concurrent get calls on the same future are serialized.
synchronized (this) {
try {
// Already completed successfully: hand back the same entry.
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
// Done without an entry means the future was cancelled or has failed.
if (done.get()) {
throw new ExecutionException(operationAborted());
}
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
if (validateAfterInactivity > 0) {
// Validate only when the entry's last update is at least validateAfterInactivity old.
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(leasedEntry)) {
// Failed validation: close it, release it as non-reusable and try again.
leasedEntry.close();
release(leasedEntry, false);
continue;
}
}
}
if (done.compareAndSet(false, true)) {
entryRef.set(leasedEntry);
// NOTE(review): done is already true after the successful compareAndSet above.
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
} else {
// Lost the race against cancel or a failure: give the entry back as reusable.
release(leasedEntry, true);
throw new ExecutionException(operationAborted());
}
} catch (final IOException ex) {
// Notify the callback only if this call is the one that marks the future done.
if (done.compareAndSet(false, true)) {
if (callback != null) {
callback.failed(ex);
}
}
throw new ExecutionException(ex);
}
}
}
}
};
}
lease方法主要是get及cancel,其中get方法主要是執行getPoolEntryBlocking,對于validateAfterInactivity大于0的則根據entry的更新時間判斷是否需要validate,若需要且validate失敗則執行leasedEntry.close()及release方法
getPoolEntryBlocking
org/apache/http/pool/AbstractConnPool.java
/**
 * Obtains an entry for the route while holding the pool lock: reuses a free
 * entry when one is usable, otherwise creates a new connection if the
 * per-route and total limits allow it, otherwise waits on the pool
 * condition and retries.
 *
 * @param route route to obtain an entry for.
 * @param state state object passed to {@code pool.getFree}.
 * @param timeout wait limit; a deadline is computed only when this is greater than 0.
 * @param timeUnit unit of {@code timeout}.
 * @param future the lease future; checked for cancellation and queued while waiting.
 * @return the leased entry, already added to {@code leased}.
 * @throws ExecutionException if the future is found cancelled.
 * @throws TimeoutException if the deadline passes before an entry is obtained.
 */
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit timeUnit,
final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
// A null deadline means the wait below has no time limit.
Date deadline = null;
if (timeout > 0) {
deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
}
this.lock.lock();
try {
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
// Take free entries from the route pool until a non-closed one is found or none are left.
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
}
if (entry.isClosed()) {
// Drop the closed entry from the bookkeeping and look for another one.
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
// Reuse path: move the entry from available to leased.
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// New connection is needed
// Note: this local shadows the maxPerRoute field.
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
// Remaining capacity is measured against leased entries only.
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
// Idle entries would leave no room for the new connection: evict the tail of available (possibly from another route).
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
// No entry could be reused or created: queue the future and wait on the condition.
boolean success = false;
try {
pool.queue(future);
this.pending.add(future);
if (deadline != null) {
// awaitUntil returns false when the deadline has elapsed.
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
// Reached only through the break above.
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
getPoolEntryBlocking先根據route從routeToPool取出對應的RouteSpecificPool,然后pool.getFree(state),之后判斷是否過期,是否關閉,沒問題則從available移除,添加到leased中,然后執行onReuse回調,如果entry為null則通過connFactory.create(route)來創建
release
/**
 * Returns a leased entry to the pool. Does nothing if the entry is not in
 * {@code leased}.
 *
 * @param entry the entry to give back.
 * @param reusable when {@code true} and the pool is not shut down the entry
 * is put at the head of {@code available}; otherwise it is closed.
 */
@Override
public void release(final E entry, final boolean reusable) {
this.lock.lock();
try {
if (this.leased.remove(entry)) {
final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
pool.free(entry, reusable);
if (reusable && !this.isShutDown) {
this.available.addFirst(entry);
} else {
entry.close();
}
onRelease(entry);
// Prefer a waiter queued on the same route; otherwise take the head of the global pending list.
Future<E> future = pool.nextPending();
if (future != null) {
this.pending.remove(future);
} else {
future = this.pending.poll();
}
// The future is used here only to decide whether to signal; all threads waiting on the condition are woken.
if (future != null) {
this.condition.signalAll();
}
}
} finally {
this.lock.unlock();
}
}
release方法先獲取RouteSpecificPool,然后執行pool.free(entry, reusable)
CPool
org/apache/http/impl/conn/CPool.java
/**
 * {@code AbstractConnPool} specialisation with {@code HttpRoute} as the route type,
 * {@code ManagedHttpClientConnection} as the connection type and
 * {@code CPoolEntry} as the entry type.
 */
@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {
// Source of entry ids; shared by all CPool instances.
private static final AtomicLong COUNTER = new AtomicLong();
private final Log log = LogFactory.getLog(CPool.class);
// Passed unchanged to every CPoolEntry created by createEntry.
private final long timeToLive;
private final TimeUnit timeUnit;
/**
 * @param connFactory forwarded to the superclass constructor.
 * @param defaultMaxPerRoute forwarded to the superclass constructor.
 * @param maxTotal forwarded to the superclass constructor.
 * @param timeToLive time-to-live value handed to each new entry.
 * @param timeUnit unit of {@code timeToLive}.
 */
public CPool(
final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final int defaultMaxPerRoute, final int maxTotal,
final long timeToLive, final TimeUnit timeUnit) {
super(connFactory, defaultMaxPerRoute, maxTotal);
this.timeToLive = timeToLive;
this.timeUnit = timeUnit;
}
/**
 * Creates a {@code CPoolEntry} whose id is the next COUNTER value rendered as a string.
 */
@Override
protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
final String id = Long.toString(COUNTER.getAndIncrement());
return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
}
/**
 * @return {@code true} when the entry's connection does not report itself as stale.
 */
@Override
protected boolean validate(final CPoolEntry entry) {
return !entry.getConnection().isStale();
}
// The two overrides below only delegate to the superclass.
// NOTE(review): presumably re-declared so that classes in this package can call them - confirm.
@Override
protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
super.enumAvailable(callback);
}
@Override
protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
super.enumLeased(callback);
}
}
CPool繼承了AbstractConnPool,其T為HttpRoute,C為ManagedHttpClientConnection,E為CPoolEntry;其createEntry方法創建CPoolEntry,validate則判斷connection是不是stale
小結
ConnPool定義了lease及release方法,其中定義了兩個泛型,T表示route,E表示poolEntry;
AbstractConnPool聲明實現ConnPool、ConnPoolControl接口,它定義E必須繼承PoolEntry,同時定義了泛型C,表示connectionType;CPool繼承了AbstractConnPool,其T為HttpRoute,C為ManagedHttpClientConnection,E為CPoolEntry。
AbstractConnPool的lease方法主要是get及cancel,其中get方法主要是執行getPoolEntryBlocking,對于validateAfterInactivity大于0的則根據entry的更新時間判斷是否需要validate,若需要且validate失敗則執行leasedEntry.close()及release方法;release方法先獲取RouteSpecificPool,然后執行pool.free(entry, reusable)
以上就是httpclient的CPool定義方法詳解的詳細內容,更多關於httpclient CPool方法定義的資料請關注腳本之家其它相關文章!
相關文章
Logback 使用TurboFilter實現日志級別等內容的動態修改操作
這篇文章主要介紹了Logback 使用TurboFilter實現日志級別等內容的動態修改操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教 2021-08-08
Spring定時任務使用及如何使用郵件監控服務器
這篇文章主要介紹了Spring定時任務使用及如何使用郵件監控服務器,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下 2019-07-07
給JavaBean賦默認值并且轉Json字符串的實例
這篇文章主要介紹了給JavaBean賦默認值并且轉Json字符串的實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教 2022-03-03
如何將java -jar啟動的服務設置為systemd服務管理方式
本文詳細介紹了如何將Java應用程序配置為由systemd管理的服務,包括創建和配置.service文件的步驟,以及如何啟動、停止和查看服務狀態 2025-01-01
解決idea2020.2遇到pom.xml文件報錯maven插件tomcat7的問題
這篇文章主要介紹了idea2020.2遇到pom.xml文件報錯maven插件tomcat7的問題,本文給大家分享解決方法,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下 2020-09-09

