AsyncHttpClient?ChannelPool線程池頻道池源碼流程解析
序
本文主要研究一下AsyncHttpClient的ChannelPool
ChannelPool
org/asynchttpclient/channel/ChannelPool.java
public interface ChannelPool {
/**
* Add a channel to the pool
*
* @param channel an I/O channel
* @param partitionKey a key used to retrieve the cached channel
* @return true if added.
*/
boolean offer(Channel channel, Object partitionKey);
/**
* Remove the channel associated with the uri.
*
* @param partitionKey the partition used when invoking offer
* @return the channel associated with the uri
*/
Channel poll(Object partitionKey);
/**
* Remove all channels from the cache. A channel might have been associated
* with several uri.
*
* @param channel a channel
* @return the true if the channel has been removed
*/
boolean removeAll(Channel channel);
/**
* Return true if a channel can be cached. A implementation can decide based
* on some rules to allow caching Calling this method is equivalent of
* checking the returned value of {@link ChannelPool#offer(Channel, Object)}
*
* @return true if a channel can be cached.
*/
boolean isOpen();
/**
* Destroy all channels that has been cached by this instance.
*/
void destroy();
/**
* Flush partitions based on a predicate
*
* @param predicate the predicate
*/
void flushPartitions(Predicate<Object> predicate);
/**
* @return The number of idle channels per host.
*/
Map<String, Long> getIdleChannelCountPerHost();
}ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個實現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool
NoopChannelPool
org/asynchttpclient/channel/NoopChannelPool.java
public enum NoopChannelPool implements ChannelPool {
INSTANCE;
@Override
public boolean offer(Channel channel, Object partitionKey) {
return false;
}
@Override
public Channel poll(Object partitionKey) {
return null;
}
@Override
public boolean removeAll(Channel channel) {
return false;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public void destroy() {
}
@Override
public void flushPartitions(Predicate<Object> predicate) {
}
@Override
public Map<String, Long> getIdleChannelCountPerHost() {
return Collections.emptyMap();
}
}NoopChannelPool是個枚舉,用枚舉實現(xiàn)了單例,其方法默認為空操作
DefaultChannelPool
/**
* A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}
*/
public final class DefaultChannelPool implements ChannelPool {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);
private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Timer nettyTimer;
private final int connectionTtl;
private final boolean connectionTtlEnabled;
private final int maxIdleTime;
private final boolean maxIdleTimeEnabled;
private final long cleanerPeriod;
private final PoolLeaseStrategy poolLeaseStrategy;
public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {
this(config.getPooledConnectionIdleTimeout(),
config.getConnectionTtl(),
hashedWheelTimer,
config.getConnectionPoolCleanerPeriod());
}
public DefaultChannelPool(int maxIdleTime,
int connectionTtl,
Timer nettyTimer,
int cleanerPeriod) {
this(maxIdleTime,
connectionTtl,
PoolLeaseStrategy.LIFO,
nettyTimer,
cleanerPeriod);
}
public DefaultChannelPool(int maxIdleTime,
int connectionTtl,
PoolLeaseStrategy poolLeaseStrategy,
Timer nettyTimer,
int cleanerPeriod) {
this.maxIdleTime = maxIdleTime;
this.connectionTtl = connectionTtl;
connectionTtlEnabled = connectionTtl > 0;
channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;
this.nettyTimer = nettyTimer;
maxIdleTimeEnabled = maxIdleTime > 0;
this.poolLeaseStrategy = poolLeaseStrategy;
this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));
if (connectionTtlEnabled || maxIdleTimeEnabled)
scheduleNewIdleChannelDetector(new IdleChannelDetector());
}
//......
}DefaultChannelPool基于ConcurrentHashMap實現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時間執(zhí)行
offer
public boolean offer(Channel channel, Object partitionKey) {
if (isClosed.get())
return false;
long now = unpreciseMillisTime();
if (isTtlExpired(channel, now))
return false;
boolean offered = offer0(channel, partitionKey, now);
if (connectionTtlEnabled && offered) {
registerChannelCreation(channel, partitionKey, now);
}
return offered;
}
private boolean isTtlExpired(Channel channel, long now) {
if (!connectionTtlEnabled)
return false;
ChannelCreation creation = channelId2Creation.get(channel.id());
return creation != null && now - creation.creationTime >= connectionTtl;
}
private boolean offer0(Channel channel, Object partitionKey, long now) {
ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
if (partition == null) {
partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());
}
return partition.offerFirst(new IdleChannel(channel, now));
}
private void registerChannelCreation(Channel channel, Object partitionKey, long now) {
ChannelId id = channel.id();
if (!channelId2Creation.containsKey(id)) {
channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey));
}
}offer接口先判斷isTtlExpired,如果channel的存活時間超過connectionTtl則返回false,否則執(zhí)行offer0,往ConcurrentLinkedDeque<IdleChannel>添加,若添加成功且connectionTtlEnabled則執(zhí)行registerChannelCreation,維護創(chuàng)建時間
poll
/**
* {@inheritDoc}
*/
public Channel poll(Object partitionKey) {
IdleChannel idleChannel = null;
ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
if (partition != null) {
while (idleChannel == null) {
idleChannel = poolLeaseStrategy.lease(partition);
if (idleChannel == null)
// pool is empty
break;
else if (!Channels.isChannelActive(idleChannel.channel)) {
idleChannel = null;
LOGGER.trace("Channel is inactive, probably remotely closed!");
} else if (!idleChannel.takeOwnership()) {
idleChannel = null;
LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");
}
}
}
return idleChannel != null ? idleChannel.channel : null;
}poll方法是根據(jù)partitionKey找到對應(yīng)的ConcurrentLinkedDeque<IdleChannel>,然后循環(huán)執(zhí)行poolLeaseStrategy.lease(partition),若idleChannel為null直接break,若isChannelActive為false則重置為null繼續(xù)循環(huán),若idleChannel.takeOwnership()為false也重置為null繼續(xù)循環(huán)
removeAll
/**
* {@inheritDoc}
*/
public boolean removeAll(Channel channel) {
ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null;
return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE));
}removeAll方法會將指定的channel從channelId2Creation及ConcurrentLinkedDeque<IdleChannel>中移除
isOpen
/**
* {@inheritDoc}
*/
public boolean isOpen() {
return !isClosed.get();
}isOpen則取的isClosed變量
destroy
/**
* {@inheritDoc}
*/
public void destroy() {
if (isClosed.getAndSet(true))
return;
partitions.clear();
if (connectionTtlEnabled) {
channelId2Creation.clear();
}
}destroy會設(shè)置isClosed為true,然后清空partitions及channelId2Creation
flushPartitions
public void flushPartitions(Predicate<Object> predicate) {
for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
Object partitionKey = partitionsEntry.getKey();
if (predicate.test(partitionKey))
flushPartition(partitionKey, partitionsEntry.getValue());
}
}
private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {
if (partition != null) {
partitions.remove(partitionKey);
for (IdleChannel idleChannel : partition)
close(idleChannel.channel);
}
}
private void close(Channel channel) {
// FIXME pity to have to do this here
Channels.setDiscard(channel);
if (connectionTtlEnabled) {
channelId2Creation.remove(channel.id());
}
Channels.silentlyCloseChannel(channel);
}flushPartitions會遍歷partitions,然后執(zhí)行predicate.test,為true則執(zhí)行flushPartition,它將從partitions移除指定的partitionKey,然后遍歷idleChannels挨個執(zhí)行close
getIdleChannelCountPerHost
public Map<String, Long> getIdleChannelCountPerHost() {
return partitions
.values()
.stream()
.flatMap(ConcurrentLinkedDeque::stream)
.map(idle -> idle.getChannel().remoteAddress())
.filter(a -> a.getClass() == InetSocketAddress.class)
.map(a -> (InetSocketAddress) a)
.map(InetSocketAddress::getHostName)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}getIdleChannelCountPerHost則遍歷partitions,然后map出remoteAddress獲取hostName,然后進行g(shù)roupBy
PoolLeaseStrategy
public enum PoolLeaseStrategy {
LIFO {
public <E> E lease(Deque<E> d) {
return d.pollFirst();
}
},
FIFO {
public <E> E lease(Deque<E> d) {
return d.pollLast();
}
};
abstract <E> E lease(Deque<E> d);
}PoolLeaseStrategy是個枚舉,定義了LIFO及FIFO兩個枚舉,LIFO則是對Deque執(zhí)行pollFirst,F(xiàn)IFO則是對Deque執(zhí)行pollLast
IdleChannelDetector
private final class IdleChannelDetector implements TimerTask {
private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {
return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;
}
private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {
// lazy create
List<IdleChannel> idleTimeoutChannels = null;
for (IdleChannel idleChannel : partition) {
boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);
boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel);
boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);
if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {
LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);
if (idleTimeoutChannels == null)
idleTimeoutChannels = new ArrayList<>(1);
idleTimeoutChannels.add(idleChannel);
}
}
return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList();
}
private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {
// lazy create, only if we hit a non-closeable channel
List<IdleChannel> closedChannels = null;
for (int i = 0; i < candidates.size(); i++) {
// We call takeOwnership here to avoid closing a channel that has just been taken out
// of the pool, otherwise we risk closing an active connection.
IdleChannel idleChannel = candidates.get(i);
if (idleChannel.takeOwnership()) {
LOGGER.debug("Closing Idle Channel {}", idleChannel.channel);
close(idleChannel.channel);
if (closedChannels != null) {
closedChannels.add(idleChannel);
}
} else if (closedChannels == null) {
// first non closeable to be skipped, copy all
// previously skipped closeable channels
closedChannels = new ArrayList<>(candidates.size());
for (int j = 0; j < i; j++)
closedChannels.add(candidates.get(j));
}
}
return closedChannels != null ? closedChannels : candidates;
}
public void run(Timeout timeout) {
if (isClosed.get())
return;
if (LOGGER.isDebugEnabled())
for (Object key : partitions.keySet()) {
int size = partitions.get(key).size();
if (size > 0) {
LOGGER.debug("Entry count for : {} : {}", key, size);
}
}
long start = unpreciseMillisTime();
int closedCount = 0;
int totalCount = 0;
for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {
// store in intermediate unsynchronized lists to minimize
// the impact on the ConcurrentLinkedDeque
if (LOGGER.isDebugEnabled())
totalCount += partition.size();
List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));
if (!closedChannels.isEmpty()) {
if (connectionTtlEnabled) {
for (IdleChannel closedChannel : closedChannels)
channelId2Creation.remove(closedChannel.channel.id());
}
partition.removeAll(closedChannels);
closedCount += closedChannels.size();
}
}
if (LOGGER.isDebugEnabled()) {
long duration = unpreciseMillisTime() - start;
if (closedCount > 0) {
LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration);
}
}
scheduleNewIdleChannelDetector(timeout.task());
}
}IdleChannelDetector實現(xiàn)了netty的TimerTask接口,其run方法主要是遍歷partitions,通過expiredChannels取出過期的IdleChannel,這里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在內(nèi),然后挨個執(zhí)行takeOwnership及close,再從channelId2Creation及partition中移除,最后再次調(diào)度一下IdleChannelDetector
小結(jié)
AsyncHttpClient的ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個實現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap實現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時間執(zhí)行。
poll方法會判斷是active,不是的話繼續(xù)循環(huán)lease,而IdleChannelDetector則會定期檢查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都會被close,offer的時候還會判斷isTtlExpired,這樣子來保證連接的活性。
以上就是AsyncHttpClient ChannelPool的詳細內(nèi)容,更多關(guān)于AsyncHttpClient ChannelPool的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot?@RestControllerAdvice注解對返回值統(tǒng)一封裝的處理方法
這篇文章主要介紹了SpringBoot?@RestControllerAdvice注解對返回值統(tǒng)一封裝,使用@RestControllerAdvice對響應(yīng)進行增強,本文結(jié)合實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09
Spring調(diào)度框架EnableScheduling&Scheduled源碼解析
這篇文章主要介紹了Spring調(diào)度框架EnableScheduling&Scheduled源碼解析,@EnableScheduling&Scheduled定時調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對該調(diào)度框架進行源碼解析,以便更好的理解其執(zhí)行過程,需要的朋友可以參考下2024-01-01
java之CSV大批量數(shù)據(jù)入庫的實現(xiàn)
本文主要介紹了java之CSV大批量數(shù)據(jù)入庫的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02
SpringBoot項目使用?axis?調(diào)用webservice接口的實踐記錄
這篇文章主要介紹了SpringBoot項目使用?axis?調(diào)用webservice接口,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-06-06
Java中使用Hutool的DsFactory操作多數(shù)據(jù)源的實現(xiàn)
在Java開發(fā)中,管理多個數(shù)據(jù)源是一項常見需求,Hutool作為一個全能的Java工具類庫,提供了DsFactory工具,幫助開發(fā)者便捷地操作多數(shù)據(jù)源,感興趣的可以了解一下2024-09-09
SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign的方法
Feign是Netflix開發(fā)的聲明式、模板化的HTTP客戶端,其靈感來自Retrofit、JAXRS-2.0以及WebSocket,Feign可幫助我們更加便捷、優(yōu)雅地調(diào)用HTTP API,這篇文章主要介紹了SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign,需要的朋友可以參考下2024-07-07

