欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

AsyncHttpClient?ChannelPool線程池頻道池源碼流程解析

 更新時(shí)間:2023年12月08日 09:55:08   作者:codecraft  
這篇文章主要為大家介紹了AsyncHttpClient ChannelPool線程池頻道池源碼流程解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下AsyncHttpClient的ChannelPool

ChannelPool

org/asynchttpclient/channel/ChannelPool.java

public interface ChannelPool {
  /**
   * Add a channel to the pool
   *
   * @param channel      an I/O channel
   * @param partitionKey a key used to retrieve the cached channel
   * @return true if added.
   */
  boolean offer(Channel channel, Object partitionKey);
  /**
   * Remove the channel associated with the uri.
   *
   * @param partitionKey the partition used when invoking offer
   * @return the channel associated with the uri
   */
  Channel poll(Object partitionKey);
  /**
   * Remove all channels from the cache. A channel might have been associated
   * with several uri.
   *
   * @param channel a channel
   * @return the true if the channel has been removed
   */
  boolean removeAll(Channel channel);
  /**
   * Return true if a channel can be cached. A implementation can decide based
   * on some rules to allow caching Calling this method is equivalent of
   * checking the returned value of {@link ChannelPool#offer(Channel, Object)}
   *
   * @return true if a channel can be cached.
   */
  boolean isOpen();
  /**
   * Destroy all channels that has been cached by this instance.
   */
  void destroy();
  /**
   * Flush partitions based on a predicate
   *
   * @param predicate the predicate
   */
  void flushPartitions(Predicate<Object> predicate);
  /**
   * @return The number of idle channels per host.
   */
  Map<String, Long> getIdleChannelCountPerHost();
}
ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個(gè)實(shí)現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool

NoopChannelPool

org/asynchttpclient/channel/NoopChannelPool.java

public enum NoopChannelPool implements ChannelPool {
  INSTANCE;
  @Override
  public boolean offer(Channel channel, Object partitionKey) {
    return false;
  }
  @Override
  public Channel poll(Object partitionKey) {
    return null;
  }
  @Override
  public boolean removeAll(Channel channel) {
    return false;
  }
  @Override
  public boolean isOpen() {
    return true;
  }
  @Override
  public void destroy() {
  }
  @Override
  public void flushPartitions(Predicate<Object> predicate) {
  }
  @Override
  public Map<String, Long> getIdleChannelCountPerHost() {
    return Collections.emptyMap();
  }
}
NoopChannelPool是個(gè)枚舉,用枚舉實(shí)現(xiàn)了單例,其方法默認(rèn)為空操作

DefaultChannelPool

/**
 * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}
 */
public final class DefaultChannelPool implements ChannelPool {
  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);
  private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  private final Timer nettyTimer;
  private final int connectionTtl;
  private final boolean connectionTtlEnabled;
  private final int maxIdleTime;
  private final boolean maxIdleTimeEnabled;
  private final long cleanerPeriod;
  private final PoolLeaseStrategy poolLeaseStrategy;
  public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {
    this(config.getPooledConnectionIdleTimeout(),
            config.getConnectionTtl(),
            hashedWheelTimer,
            config.getConnectionPoolCleanerPeriod());
  }
  public DefaultChannelPool(int maxIdleTime,
                            int connectionTtl,
                            Timer nettyTimer,
                            int cleanerPeriod) {
    this(maxIdleTime,
            connectionTtl,
            PoolLeaseStrategy.LIFO,
            nettyTimer,
            cleanerPeriod);
  }
  public DefaultChannelPool(int maxIdleTime,
                            int connectionTtl,
                            PoolLeaseStrategy poolLeaseStrategy,
                            Timer nettyTimer,
                            int cleanerPeriod) {
    this.maxIdleTime = maxIdleTime;
    this.connectionTtl = connectionTtl;
    connectionTtlEnabled = connectionTtl > 0;
    channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;
    this.nettyTimer = nettyTimer;
    maxIdleTimeEnabled = maxIdleTime > 0;
    this.poolLeaseStrategy = poolLeaseStrategy;
    this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));
    if (connectionTtlEnabled || maxIdleTimeEnabled)
      scheduleNewIdleChannelDetector(new IdleChannelDetector());
  }
  //......
}
DefaultChannelPool基于ConcurrentHashMap實(shí)現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會(huì)取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會(huì)往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時(shí)間執(zhí)行

offer

public boolean offer(Channel channel, Object partitionKey) {
    if (isClosed.get())
      return false;
    long now = unpreciseMillisTime();
    if (isTtlExpired(channel, now))
      return false;
    boolean offered = offer0(channel, partitionKey, now);
    if (connectionTtlEnabled && offered) {
      registerChannelCreation(channel, partitionKey, now);
    }
    return offered;
  }
  private boolean isTtlExpired(Channel channel, long now) {
    if (!connectionTtlEnabled)
      return false;
    ChannelCreation creation = channelId2Creation.get(channel.id());
    return creation != null && now - creation.creationTime >= connectionTtl;
  }  
  private boolean offer0(Channel channel, Object partitionKey, long now) {
    ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
    if (partition == null) {
      partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());
    }
    return partition.offerFirst(new IdleChannel(channel, now));
  }  
  private void registerChannelCreation(Channel channel, Object partitionKey, long now) {
    ChannelId id = channel.id();
    if (!channelId2Creation.containsKey(id)) {
      channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey));
    }
  }
offer接口先判斷isTtlExpired,如果channel的存活時(shí)間超過connectionTtl則返回false,否則執(zhí)行offer0,往ConcurrentLinkedDeque<IdleChannel>添加,若添加成功且connectionTtlEnabled則執(zhí)行registerChannelCreation,維護(hù)創(chuàng)建時(shí)間

poll

/**
   * {@inheritDoc}
   */
  public Channel poll(Object partitionKey) {
    IdleChannel idleChannel = null;
    ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
    if (partition != null) {
      while (idleChannel == null) {
        idleChannel = poolLeaseStrategy.lease(partition);
        if (idleChannel == null)
          // pool is empty
          break;
        else if (!Channels.isChannelActive(idleChannel.channel)) {
          idleChannel = null;
          LOGGER.trace("Channel is inactive, probably remotely closed!");
        } else if (!idleChannel.takeOwnership()) {
          idleChannel = null;
          LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");
        }
      }
    }
    return idleChannel != null ? idleChannel.channel : null;
  }
poll方法是根據(jù)partitionKey找到對(duì)應(yīng)的ConcurrentLinkedDeque<IdleChannel>,然后循環(huán)執(zhí)行poolLeaseStrategy.lease(partition),若idleChannel為null直接break,若isChannelActive為false則重置為null繼續(xù)循環(huán),若idleChannel.takeOwnership()為false也重置為null繼續(xù)循環(huán)

removeAll

/**
   * {@inheritDoc}
   */
  public boolean removeAll(Channel channel) {
    ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null;
    return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE));
  }
removeAll方法會(huì)將指定的channel從channelId2Creation及ConcurrentLinkedDeque<IdleChannel>中移除

isOpen

/**
   * {@inheritDoc}
   */
  public boolean isOpen() {
    return !isClosed.get();
  }
isOpen則取的isClosed變量

destroy

/**
   * {@inheritDoc}
   */
  public void destroy() {
    if (isClosed.getAndSet(true))
      return;
    partitions.clear();
    if (connectionTtlEnabled) {
      channelId2Creation.clear();
    }
  }
destroy會(huì)設(shè)置isClosed為true,然后清空partitions及channelId2Creation

flushPartitions

public void flushPartitions(Predicate<Object> predicate) {
    for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
      Object partitionKey = partitionsEntry.getKey();
      if (predicate.test(partitionKey))
        flushPartition(partitionKey, partitionsEntry.getValue());
    }
  }
  private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {
    if (partition != null) {
      partitions.remove(partitionKey);
      for (IdleChannel idleChannel : partition)
        close(idleChannel.channel);
    }
  }
  private void close(Channel channel) {
    // FIXME pity to have to do this here
    Channels.setDiscard(channel);
    if (connectionTtlEnabled) {
      channelId2Creation.remove(channel.id());
    }
    Channels.silentlyCloseChannel(channel);
  }
flushPartitions會(huì)遍歷partitions,然后執(zhí)行predicate.test,為true則執(zhí)行flushPartition,它將從partitions移除指定的partitionKey,然后遍歷idleChannels挨個(gè)執(zhí)行close

getIdleChannelCountPerHost

public Map<String, Long> getIdleChannelCountPerHost() {
    return partitions
            .values()
            .stream()
            .flatMap(ConcurrentLinkedDeque::stream)
            .map(idle -> idle.getChannel().remoteAddress())
            .filter(a -> a.getClass() == InetSocketAddress.class)
            .map(a -> (InetSocketAddress) a)
            .map(InetSocketAddress::getHostName)
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }
getIdleChannelCountPerHost則遍歷partitions,然后map出remoteAddress獲取hostName,然后進(jìn)行g(shù)roupBy

PoolLeaseStrategy

public enum PoolLeaseStrategy {
    LIFO {
      public <E> E lease(Deque<E> d) {
        return d.pollFirst();
      }
    },
    FIFO {
      public <E> E lease(Deque<E> d) {
        return d.pollLast();
      }
    };
    abstract <E> E lease(Deque<E> d);
  }
PoolLeaseStrategy是個(gè)枚舉,定義了LIFO及FIFO兩個(gè)枚舉,LIFO則是對(duì)Deque執(zhí)行pollFirst,F(xiàn)IFO則是對(duì)Deque執(zhí)行pollLast

IdleChannelDetector

private final class IdleChannelDetector implements TimerTask {
    private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {
      return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;
    }
    private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {
      // lazy create
      List<IdleChannel> idleTimeoutChannels = null;
      for (IdleChannel idleChannel : partition) {
        boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);
        boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel);
        boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);
        if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {
          LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);
          if (idleTimeoutChannels == null)
            idleTimeoutChannels = new ArrayList<>(1);
          idleTimeoutChannels.add(idleChannel);
        }
      }
      return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList();
    }
    private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {
      // lazy create, only if we hit a non-closeable channel
      List<IdleChannel> closedChannels = null;
      for (int i = 0; i < candidates.size(); i++) {
        // We call takeOwnership here to avoid closing a channel that has just been taken out
        // of the pool, otherwise we risk closing an active connection.
        IdleChannel idleChannel = candidates.get(i);
        if (idleChannel.takeOwnership()) {
          LOGGER.debug("Closing Idle Channel {}", idleChannel.channel);
          close(idleChannel.channel);
          if (closedChannels != null) {
            closedChannels.add(idleChannel);
          }
        } else if (closedChannels == null) {
          // first non closeable to be skipped, copy all
          // previously skipped closeable channels
          closedChannels = new ArrayList<>(candidates.size());
          for (int j = 0; j < i; j++)
            closedChannels.add(candidates.get(j));
        }
      }
      return closedChannels != null ? closedChannels : candidates;
    }
    public void run(Timeout timeout) {
      if (isClosed.get())
        return;
      if (LOGGER.isDebugEnabled())
        for (Object key : partitions.keySet()) {
          int size = partitions.get(key).size();
          if (size > 0) {
            LOGGER.debug("Entry count for : {} : {}", key, size);
          }
        }
      long start = unpreciseMillisTime();
      int closedCount = 0;
      int totalCount = 0;
      for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {
        // store in intermediate unsynchronized lists to minimize
        // the impact on the ConcurrentLinkedDeque
        if (LOGGER.isDebugEnabled())
          totalCount += partition.size();
        List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));
        if (!closedChannels.isEmpty()) {
          if (connectionTtlEnabled) {
            for (IdleChannel closedChannel : closedChannels)
              channelId2Creation.remove(closedChannel.channel.id());
          }
          partition.removeAll(closedChannels);
          closedCount += closedChannels.size();
        }
      }
      if (LOGGER.isDebugEnabled()) {
        long duration = unpreciseMillisTime() - start;
        if (closedCount > 0) {
          LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration);
        }
      }
      scheduleNewIdleChannelDetector(timeout.task());
    }
  }
IdleChannelDetector實(shí)現(xiàn)了netty的TimerTask接口,其run方法主要是遍歷partitions,通過expiredChannels取出過期的IdleChannel,這里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在內(nèi),然后挨個(gè)執(zhí)行takeOwnership及close,再從channelId2Creation及partition中移除,最后再次調(diào)度一下IdleChannelDetector

小結(jié)

AsyncHttpClient的ChannelPool定義了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有兩個(gè)實(shí)現(xiàn)類,分別是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap實(shí)現(xiàn)了ChannelPool接口,主要的參數(shù)為connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod會(huì)取connectionTtl、maxIdleTime、傳入的cleanerPeriod的最小值;開啟connectionTtl或者maxIdleTime的話,會(huì)往nettyTimer添加IdleChannelDetector,延后cleanerPeriod時(shí)間執(zhí)行。

poll方法會(huì)判斷是active,不是的話繼續(xù)循環(huán)lease,而IdleChannelDetector則會(huì)定期檢查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都會(huì)被close,offer的時(shí)候還會(huì)判斷isTtlExpired,這樣子來保證連接的活性。

以上就是AsyncHttpClient ChannelPool的詳細(xì)內(nèi)容,更多關(guān)于AsyncHttpClient ChannelPool的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot?@RestControllerAdvice注解對(duì)返回值統(tǒng)一封裝的處理方法

    SpringBoot?@RestControllerAdvice注解對(duì)返回值統(tǒng)一封裝的處理方法

    這篇文章主要介紹了SpringBoot?@RestControllerAdvice注解對(duì)返回值統(tǒng)一封裝,使用@RestControllerAdvice對(duì)響應(yīng)進(jìn)行增強(qiáng),本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-09-09
  • 基于Luhn算法的銀行卡校驗(yàn)規(guī)則

    基于Luhn算法的銀行卡校驗(yàn)規(guī)則

    這篇文章主要為大家介紹了基于Luhn算法的銀行卡校驗(yàn)規(guī)則,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Spring調(diào)度框架EnableScheduling&Scheduled源碼解析

    Spring調(diào)度框架EnableScheduling&Scheduled源碼解析

    這篇文章主要介紹了Spring調(diào)度框架EnableScheduling&Scheduled源碼解析,@EnableScheduling&Scheduled定時(shí)調(diào)度框架,本著不僅知其然還要知其所以然的指導(dǎo)思想,下面對(duì)該調(diào)度框架進(jìn)行源碼解析,以便更好的理解其執(zhí)行過程,需要的朋友可以參考下
    2024-01-01
  • Java類和成員上的一些方法實(shí)例代碼

    Java類和成員上的一些方法實(shí)例代碼

    這篇文章主要介紹了Java類和成員上的一些方法實(shí)例代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-01-01
  • java之CSV大批量數(shù)據(jù)入庫的實(shí)現(xiàn)

    java之CSV大批量數(shù)據(jù)入庫的實(shí)現(xiàn)

    本文主要介紹了java之CSV大批量數(shù)據(jù)入庫的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • SpringBoot項(xiàng)目使用?axis?調(diào)用webservice接口的實(shí)踐記錄

    SpringBoot項(xiàng)目使用?axis?調(diào)用webservice接口的實(shí)踐記錄

    這篇文章主要介紹了SpringBoot項(xiàng)目使用?axis?調(diào)用webservice接口,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-06-06
  • Java中使用Hutool的DsFactory操作多數(shù)據(jù)源的實(shí)現(xiàn)

    Java中使用Hutool的DsFactory操作多數(shù)據(jù)源的實(shí)現(xiàn)

    在Java開發(fā)中,管理多個(gè)數(shù)據(jù)源是一項(xiàng)常見需求,Hutool作為一個(gè)全能的Java工具類庫,提供了DsFactory工具,幫助開發(fā)者便捷地操作多數(shù)據(jù)源,感興趣的可以了解一下
    2024-09-09
  • Java GUI編程實(shí)現(xiàn)在線聊天室

    Java GUI編程實(shí)現(xiàn)在線聊天室

    這篇文章主要為大家詳細(xì)介紹了Java GUI編程實(shí)現(xiàn)在線聊天室,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign的方法

    SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign的方法

    Feign是Netflix開發(fā)的聲明式、模板化的HTTP客戶端,其靈感來自Retrofit、JAXRS-2.0以及WebSocket,Feign可幫助我們更加便捷、優(yōu)雅地調(diào)用HTTP API,這篇文章主要介紹了SpringCloudAlibaba微服務(wù)調(diào)用組件OpenFeign,需要的朋友可以參考下
    2024-07-07
  • spring boot讀取Excel操作示例

    spring boot讀取Excel操作示例

    這篇文章主要介紹了spring boot讀取Excel操作,結(jié)合實(shí)例形式詳細(xì)分析了spring boot解析、讀取Excel相關(guān)操作技巧,需要的朋友可以參考下
    2019-11-11

最新評(píng)論