AsyncHttpClient?ChannelPool線程池頻道池源碼流程解析
序
本文主要研究一下AsyncHttpClient的ChannelPool
ChannelPool
org/asynchttpclient/channel/ChannelPool.java
public interface ChannelPool { /** * Add a channel to the pool * * @param channel an I/O channel * @param partitionKey a key used to retrieve the cached channel * @return true if added. */ boolean offer(Channel channel, Object partitionKey); /** * Remove the channel associated with the uri. * * @param partitionKey the partition used when invoking offer * @return the channel associated with the uri */ Channel poll(Object partitionKey); /** * Remove all channels from the cache. A channel might have been associated * with several uri. * * @param channel a channel * @return the true if the channel has been removed */ boolean removeAll(Channel channel); /** * Return true if a channel can be cached. A implementation can decide based * on some rules to allow caching Calling this method is equivalent of * checking the returned value of {@link ChannelPool#offer(Channel, Object)} * * @return true if a channel can be cached. */ boolean isOpen(); /** * Destroy all channels that has been cached by this instance. */ void destroy(); /** * Flush partitions based on a predicate * * @param predicate the predicate */ void flushPartitions(Predicate<Object> predicate); /** * @return The number of idle channels per host. */ Map<String, Long> getIdleChannelCountPerHost(); }
ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個(gè)實(shí)現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool
NoopChannelPool
org/asynchttpclient/channel/NoopChannelPool.java
public enum NoopChannelPool implements ChannelPool { INSTANCE; @Override public boolean offer(Channel channel, Object partitionKey) { return false; } @Override public Channel poll(Object partitionKey) { return null; } @Override public boolean removeAll(Channel channel) { return false; } @Override public boolean isOpen() { return true; } @Override public void destroy() { } @Override public void flushPartitions(Predicate<Object> predicate) { } @Override public Map<String, Long> getIdleChannelCountPerHost() { return Collections.emptyMap(); } }
NoopChannelPool是個(gè)枚舉,用枚舉實(shí)現(xiàn)了單例,其方法默認(rèn)為空操作
DefaultChannelPool
/** * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap} */ public final class DefaultChannelPool implements ChannelPool { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class); private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>(); private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Timer nettyTimer; private final int connectionTtl; private final boolean connectionTtlEnabled; private final int maxIdleTime; private final boolean maxIdleTimeEnabled; private final long cleanerPeriod; private final PoolLeaseStrategy poolLeaseStrategy; public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) { this(config.getPooledConnectionIdleTimeout(), config.getConnectionTtl(), hashedWheelTimer, config.getConnectionPoolCleanerPeriod()); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, Timer nettyTimer, int cleanerPeriod) { this(maxIdleTime, connectionTtl, PoolLeaseStrategy.LIFO, nettyTimer, cleanerPeriod); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, PoolLeaseStrategy poolLeaseStrategy, Timer nettyTimer, int cleanerPeriod) { this.maxIdleTime = maxIdleTime; this.connectionTtl = connectionTtl; connectionTtlEnabled = connectionTtl > 0; channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null; this.nettyTimer = nettyTimer; maxIdleTimeEnabled = maxIdleTime > 0; this.poolLeaseStrategy = poolLeaseStrategy; this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE)); if (connectionTtlEnabled || maxIdleTimeEnabled) scheduleNewIdleChannelDetector(new IdleChannelDetector()); } //...... }
DefaultChannelPool基于ConcurrentHashMap實(shí)現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會(huì)取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會(huì)往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時(shí)間執(zhí)行
offer
public boolean offer(Channel channel, Object partitionKey) { if (isClosed.get()) return false; long now = unpreciseMillisTime(); if (isTtlExpired(channel, now)) return false; boolean offered = offer0(channel, partitionKey, now); if (connectionTtlEnabled && offered) { registerChannelCreation(channel, partitionKey, now); } return offered; } private boolean isTtlExpired(Channel channel, long now) { if (!connectionTtlEnabled) return false; ChannelCreation creation = channelId2Creation.get(channel.id()); return creation != null && now - creation.creationTime >= connectionTtl; } private boolean offer0(Channel channel, Object partitionKey, long now) { ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition == null) { partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>()); } return partition.offerFirst(new IdleChannel(channel, now)); } private void registerChannelCreation(Channel channel, Object partitionKey, long now) { ChannelId id = channel.id(); if (!channelId2Creation.containsKey(id)) { channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey)); } }
offer接口先判斷isTtlExpired,如果channel的存活時(shí)間超過connectionTtl則返回false,否則執(zhí)行offer0,往ConcurrentLinkedDeque<IdleChannel>添加,若添加成功且connectionTtlEnabled則執(zhí)行registerChannelCreation,維護(hù)創(chuàng)建時(shí)間
poll
/** * {@inheritDoc} */ public Channel poll(Object partitionKey) { IdleChannel idleChannel = null; ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition != null) { while (idleChannel == null) { idleChannel = poolLeaseStrategy.lease(partition); if (idleChannel == null) // pool is empty break; else if (!Channels.isChannelActive(idleChannel.channel)) { idleChannel = null; LOGGER.trace("Channel is inactive, probably remotely closed!"); } else if (!idleChannel.takeOwnership()) { idleChannel = null; LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!"); } } } return idleChannel != null ? idleChannel.channel : null; }
poll方法是根據(jù)partitionKey找到對(duì)應(yīng)的ConcurrentLinkedDeque<IdleChannel>,然后循環(huán)執(zhí)行poolLeaseStrategy.lease(partition),若idleChannel為null直接break,若isChannelActive為false則重置為null繼續(xù)循環(huán),若idleChannel.takeOwnership()為false也重置為null繼續(xù)循環(huán)
removeAll
/** * {@inheritDoc} */ public boolean removeAll(Channel channel) { ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null; return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE)); }
removeAll方法會(huì)將指定的channel從channelId2Creation及ConcurrentLinkedDeque<IdleChannel>中移除
isOpen
/** * {@inheritDoc} */ public boolean isOpen() { return !isClosed.get(); }
isOpen則取的isClosed變量
destroy
/** * {@inheritDoc} */ public void destroy() { if (isClosed.getAndSet(true)) return; partitions.clear(); if (connectionTtlEnabled) { channelId2Creation.clear(); } }
destroy會(huì)設(shè)置isClosed為true,然后清空partitions及channelId2Creation
flushPartitions
public void flushPartitions(Predicate<Object> predicate) { for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) { Object partitionKey = partitionsEntry.getKey(); if (predicate.test(partitionKey)) flushPartition(partitionKey, partitionsEntry.getValue()); } } private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) { if (partition != null) { partitions.remove(partitionKey); for (IdleChannel idleChannel : partition) close(idleChannel.channel); } } private void close(Channel channel) { // FIXME pity to have to do this here Channels.setDiscard(channel); if (connectionTtlEnabled) { channelId2Creation.remove(channel.id()); } Channels.silentlyCloseChannel(channel); }
flushPartitions會(huì)遍歷partitions,然后執(zhí)行predicate.test,為true則執(zhí)行flushPartition,它將從partitions移除指定的partitionKey,然后遍歷idleChannels挨個(gè)執(zhí)行close
getIdleChannelCountPerHost
public Map<String, Long> getIdleChannelCountPerHost() { return partitions .values() .stream() .flatMap(ConcurrentLinkedDeque::stream) .map(idle -> idle.getChannel().remoteAddress()) .filter(a -> a.getClass() == InetSocketAddress.class) .map(a -> (InetSocketAddress) a) .map(InetSocketAddress::getHostName) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); }
getIdleChannelCountPerHost則遍歷partitions,然后map出remoteAddress獲取hostName,然后進(jìn)行g(shù)roupBy
PoolLeaseStrategy
public enum PoolLeaseStrategy { LIFO { public <E> E lease(Deque<E> d) { return d.pollFirst(); } }, FIFO { public <E> E lease(Deque<E> d) { return d.pollLast(); } }; abstract <E> E lease(Deque<E> d); }
PoolLeaseStrategy是個(gè)枚舉,定義了LIFO及FIFO兩個(gè)枚舉,LIFO則是對(duì)Deque執(zhí)行pollFirst,F(xiàn)IFO則是對(duì)Deque執(zhí)行pollLast
IdleChannelDetector
private final class IdleChannelDetector implements TimerTask { private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) { return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime; } private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) { // lazy create List<IdleChannel> idleTimeoutChannels = null; for (IdleChannel idleChannel : partition) { boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now); boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel); boolean isTtlExpired = isTtlExpired(idleChannel.channel, now); if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) { LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); if (idleTimeoutChannels == null) idleTimeoutChannels = new ArrayList<>(1); idleTimeoutChannels.add(idleChannel); } } return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList(); } private List<IdleChannel> closeChannels(List<IdleChannel> candidates) { // lazy create, only if we hit a non-closeable channel List<IdleChannel> closedChannels = null; for (int i = 0; i < candidates.size(); i++) { // We call takeOwnership here to avoid closing a channel that has just been taken out // of the pool, otherwise we risk closing an active connection. IdleChannel idleChannel = candidates.get(i); if (idleChannel.takeOwnership()) { LOGGER.debug("Closing Idle Channel {}", idleChannel.channel); close(idleChannel.channel); if (closedChannels != null) { closedChannels.add(idleChannel); } } else if (closedChannels == null) { // first non closeable to be skipped, copy all // previously skipped closeable channels closedChannels = new ArrayList<>(candidates.size()); for (int j = 0; j < i; j++) closedChannels.add(candidates.get(j)); } } return closedChannels != null ? closedChannels : candidates; } public void run(Timeout timeout) { if (isClosed.get()) return; if (LOGGER.isDebugEnabled()) for (Object key : partitions.keySet()) { int size = partitions.get(key).size(); if (size > 0) { LOGGER.debug("Entry count for : {} : {}", key, size); } } long start = unpreciseMillisTime(); int closedCount = 0; int totalCount = 0; for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) { // store in intermediate unsynchronized lists to minimize // the impact on the ConcurrentLinkedDeque if (LOGGER.isDebugEnabled()) totalCount += partition.size(); List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start)); if (!closedChannels.isEmpty()) { if (connectionTtlEnabled) { for (IdleChannel closedChannel : closedChannels) channelId2Creation.remove(closedChannel.channel.id()); } partition.removeAll(closedChannels); closedCount += closedChannels.size(); } } if (LOGGER.isDebugEnabled()) { long duration = unpreciseMillisTime() - start; if (closedCount > 0) { LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration); } } scheduleNewIdleChannelDetector(timeout.task()); } }
IdleChannelDetector實(shí)現(xiàn)了netty的TimerTask接口,其run方法主要是遍歷partitions,通過expiredChannels取出過期的IdleChannel,這里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在內(nèi),然后挨個(gè)執(zhí)行takeOwnership及close,再從channelId2Creation及partition中移除,最后再次調(diào)度一下IdleChannelDetector
小結(jié)
AsyncHttpClient的ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個(gè)實(shí)現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap實(shí)現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會(huì)取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會(huì)往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時(shí)間執(zhí)行。
poll方法會(huì)判斷是active,不是的話繼續(xù)循環(huán)lease,而IdleChannelDetector則會(huì)定期檢查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都會(huì)被close,offer的時(shí)候還會(huì)判斷isTtlExpired,這樣子來保證連接的活性。
以上就是AsyncHttpClient ChannelPool的詳細(xì)內(nèi)容,更多關(guān)于AsyncHttpClient ChannelPool的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- AsyncHttpClient的TimeoutTimerTask連接池異步超時(shí)
- AsyncHttpClient?RequestFilter請(qǐng)求篩選源碼解讀
- AsyncHttpClient IOExceptionFilter異常過濾器
- AsyncHttpClient KeepAliveStrategy源碼流程解讀
- AsyncHttpClient exception異常源碼流程解析
- AsyncHttpClient的ConnectionSemaphore方法源碼流程解讀
- AsyncHttpClient的默認(rèn)配置源碼流程解讀
- AsyncHttpClient?ClientStats源碼流程解讀
相關(guān)文章
SpringBoot?@RestControllerAdvice注解對(duì)返回值統(tǒng)一封裝的處理方法
這篇文章主要介紹了SpringBoot?@RestControllerAdvice注解對(duì)返回值統(tǒng)一封裝,使用@RestControllerAdvice對(duì)響應(yīng)進(jìn)行增強(qiáng),本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09Spring調(diào)度框架EnableScheduling&Scheduled源碼解析
這篇文章主要介紹了Spring調(diào)度框架EnableScheduling&Scheduled源碼解析,@EnableScheduling&Scheduled定時(shí)調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對(duì)該調(diào)度框架進(jìn)行源碼解析,以便更好的理解其執(zhí)行過程,需要的朋友可以參考下2024-01-01java之CSV大批量數(shù)據(jù)入庫的實(shí)現(xiàn)
本文主要介紹了java之CSV大批量數(shù)據(jù)入庫的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02SpringBoot項(xiàng)目使用?axis?調(diào)用webservice接口的實(shí)踐記錄
這篇文章主要介紹了SpringBoot項(xiàng)目使用?axis?調(diào)用webservice接口,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06Java中使用Hutool的DsFactory操作多數(shù)據(jù)源的實(shí)現(xiàn)
在Java開發(fā)中,管理多個(gè)數(shù)據(jù)源是一項(xiàng)常見需求,Hutool作為一個(gè)全能的Java工具類庫,提供了DsFactory工具,幫助開發(fā)者便捷地操作多數(shù)據(jù)源,感興趣的可以了解一下2024-09-09SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign的方法
Feign是Netflix開發(fā)的聲明式、模板化的HTTP客戶端,其靈感來自Retrofit、JAXRS-2.0以及WebSocket,Feign可幫助我們更加便捷、優(yōu)雅地調(diào)用HTTP API,這篇文章主要介紹了SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign,需要的朋友可以參考下2024-07-07