欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

AsyncHttpClient的ConnectionSemaphore方法源碼流程解讀

 更新時間:2023年12月07日 14:01:49   作者:codecraft  
這篇文章主要為大家介紹了AsyncHttpClient的ConnectionSemaphore方法源碼流程解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

本文主要研究一下AsyncHttpClient的ConnectionSemaphore

ConnectionSemaphore

org/asynchttpclient/netty/channel/ConnectionSemaphore.java

/**
 * Max connections and max-per-host connections limiter.
 *
 * @author Stepan Koltsov
 */
public class ConnectionSemaphore {
  private final NonBlockingSemaphoreLike freeChannels;
  private final int maxConnectionsPerHost;
  private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
  private final IOException tooManyConnections;
  private final IOException tooManyConnectionsPerHost;
  private ConnectionSemaphore(AsyncHttpClientConfig config) {
    tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ConnectionSemaphore.class, "acquireChannelLock");
    tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ConnectionSemaphore.class, "acquireChannelLock");
    int maxTotalConnections = config.getMaxConnections();
    maxConnectionsPerHost = config.getMaxConnectionsPerHost();
    freeChannels = maxTotalConnections > 0 ?
            new NonBlockingSemaphore(config.getMaxConnections()) :
            NonBlockingSemaphoreInfinite.INSTANCE;
  }
  public static ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
    return config.getMaxConnections() > 0 || config.getMaxConnectionsPerHost() > 0 ? new ConnectionSemaphore(config) : null;
  }
  private boolean tryAcquireGlobal() {
    return freeChannels.tryAcquire();
  }
  private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
    return maxConnectionsPerHost > 0 ?
            freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) :
            NonBlockingSemaphoreInfinite.INSTANCE;
  }
  private boolean tryAcquirePerHost(Object partitionKey) {
    return getFreeConnectionsForHost(partitionKey).tryAcquire();
  }
  public void acquireChannelLock(Object partitionKey) throws IOException {
    if (!tryAcquireGlobal())
      throw tooManyConnections;
    if (!tryAcquirePerHost(partitionKey)) {
      freeChannels.release();
      throw tooManyConnectionsPerHost;
    }
  }
  public void releaseChannelLock(Object partitionKey) {
    freeChannels.release();
    getFreeConnectionsForHost(partitionKey).release();
  }
}
ConnectionSemaphore主要用于控制連接的maxConnections及maxConnectionsPerHost;它定義了freeChannels表示可用連接的信號量,定義了freeChannelsPerHost維護每個host的可用連接新用量,類型是NonBlockingSemaphoreLike;它提供了tryAcquireGlobal用于獲取全局的空閑連接,tryAcquirePerHost用于獲取指定host的空閑連接;acquireChannelLock先獲取全局空閑連接,獲取不到拋出TooManyConnectionsException,再獲取指定host的空閑連接,獲取不到則釋放全局空閑連接,拋出TooManyConnectionsPerHostException;releaseChannelLock則先釋放全局空閑連接,再釋放指定host的空閑連接

NonBlockingSemaphoreLike

org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java

/**
 * Non-blocking semaphore API.
 *
 * @author Stepan Koltsov
 */
interface NonBlockingSemaphoreLike {
  void release();
  boolean tryAcquire();
}
NonBlockingSemaphoreLike接口定義了release、tryAcquire方法,它有兩個實現(xiàn)類,分別是NonBlockingSemaphore、NonBlockingSemaphoreInfinite

NonBlockingSemaphore

org/asynchttpclient/netty/channel/NonBlockingSemaphore.java

class NonBlockingSemaphore implements NonBlockingSemaphoreLike {
  private final AtomicInteger permits;
  NonBlockingSemaphore(int permits) {
    this.permits = new AtomicInteger(permits);
  }
  @Override
  public void release() {
    permits.incrementAndGet();
  }
  @Override
  public boolean tryAcquire() {
    for (; ; ) {
      int count = permits.get();
      if (count <= 0) {
        return false;
      }
      if (permits.compareAndSet(count, count - 1)) {
        return true;
      }
    }
  }
  @Override
  public String toString() {
    // mimic toString of Semaphore class
    return super.toString() + "[Permits = " + permits + "]";
  }
}
NonBlockingSemaphore內(nèi)部使用AtomicInteger來進行控制,permits表示可用的數(shù)量,release方法則遞增permits,tryAcquire則循環(huán)執(zhí)行先判斷permits是否大于0,否則返回false,若permits.compareAndSet(count, count - 1)成功則返回true,否則繼續(xù)循環(huán)執(zhí)行直到返回false或者true

NonBlockingSemaphoreInfinite

org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java

enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike {
  INSTANCE;
  @Override
  public void release() {
  }
  @Override
  public boolean tryAcquire() {
    return true;
  }
  @Override
  public String toString() {
    return NonBlockingSemaphore.class.getName();
  }
}
NonBlockingSemaphoreInfinite表示無限的信號量,release為空操作,tryAcquire始終返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

/**
 * A {@link Future} that can be used to track when an asynchronous HTTP request
 * has been fully processed.
 *
 * @param <V> the result type
 */
public final class NettyResponseFuture<V> implements ListenableFuture<V> {
  //......
  public void acquirePartitionLockLazily() throws IOException {
    if (connectionSemaphore == null || partitionKeyLock != null) {
      return;
    }
    Object partitionKey = getPartitionKey();
    connectionSemaphore.acquireChannelLock(partitionKey);
    Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey);
    if (prevKey != null) {
      // self-check
      connectionSemaphore.releaseChannelLock(prevKey);
      releasePartitionKeyLock();
      throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report.");
    }
    if (isDone()) {
      // may be cancelled while we acquired a lock
      releasePartitionKeyLock();
    }
  }    
  public boolean cancel(boolean force) {
    releasePartitionKeyLock();
    cancelTimeouts();
    if (IS_CANCELLED_FIELD.getAndSet(this, 1) != 0)
      return false;
    // cancel could happen before channel was attached
    if (channel != null) {
      Channels.setDiscard(channel);
      Channels.silentlyCloseChannel(channel);
    }
    if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
      try {
        asyncHandler.onThrowable(new CancellationException());
      } catch (Throwable t) {
        LOGGER.warn("cancel", t);
      }
    }
    future.cancel(false);
    return true;
  }
  private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
  }    
  private void releasePartitionKeyLock() {
    if (connectionSemaphore == null) {
      return;
    }
    Object partitionKey = takePartitionKeyLock();
    if (partitionKey != null) {
      connectionSemaphore.releaseChannelLock(partitionKey);
    }
  }  
}
NettyResponseFuture提供了acquirePartitionLockLazily方法,它會通過connectionSemaphore.acquireChannelLock(partitionKey)來獲取連接信號量;cancel和terminateAndExit都會執(zhí)行releasePartitionKeyLock,它會調(diào)用connectionSemaphore.releaseChannelLock(partitionKey)

NettyRequestSender

org/asynchttpclient/netty/request/NettyRequestSender.java

public final class NettyRequestSender {
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class);
  private final AsyncHttpClientConfig config;
  private final ChannelManager channelManager;
  private final ConnectionSemaphore connectionSemaphore;
  private final Timer nettyTimer;
  private final AsyncHttpClientState clientState;
  private final NettyRequestFactory requestFactory;
  public NettyRequestSender(AsyncHttpClientConfig config,
                            ChannelManager channelManager,
                            Timer nettyTimer,
                            AsyncHttpClientState clientState) {
    this.config = config;
    this.channelManager = channelManager;
    this.connectionSemaphore = ConnectionSemaphore.newConnectionSemaphore(config);
    this.nettyTimer = nettyTimer;
    this.clientState = clientState;
    requestFactory = new NettyRequestFactory(config);
  }
  public <T> ListenableFuture<T> sendRequest(final Request request,
                                             final AsyncHandler<T> asyncHandler,
                                             NettyResponseFuture<T> future) {
    if (isClosed()) {
      throw new IllegalStateException("Closed");
    }
    validateWebSocketRequest(request, asyncHandler);
    ProxyServer proxyServer = getProxyServer(config, request);
    // WebSockets use connect tunneling to work with proxies
    if (proxyServer != null //
            && (request.getUri().isSecured() || request.getUri().isWebSocket()) //
            && !isConnectDone(request, future) //
            && proxyServer.getProxyType().isHttp()) {
      // Proxy with HTTPS or WebSocket: CONNECT for sure
      if (future != null && future.isConnectAllowed()) {
        // Perform CONNECT
        return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, true);
      } else {
        // CONNECT will depend if we can pool or connection or if we have to open a new
        // one
        return sendRequestThroughSslProxy(request, asyncHandler, future, proxyServer);
      }
    } else {
      // no CONNECT for sure
      return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, false);
    }
  }
  //......
}
NettyRequestSender的構(gòu)造器會根據(jù)配置創(chuàng)建ConnectionSemaphore,其sendRequest方法內(nèi)部調(diào)用的是sendRequestWithCertainForceConnect、sendRequestThroughSslProxy

sendRequestWithCertainForceConnect

/**
   * We know for sure if we have to force to connect or not, so we can build the
   * HttpRequest right away This reduces the probability of having a pooled
   * channel closed by the server by the time we build the request
   */
  private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                     AsyncHandler<T> asyncHandler,
                                                                     NettyResponseFuture<T> future,
                                                                     ProxyServer proxyServer,
                                                                     boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }
sendRequestWithCertainForceConnect方法先通過getOpenChannel獲取channel,然后執(zhí)行sendRequestWithOpenChannel或者sendRequestWithNewChannel

sendRequestThroughSslProxy

private <T> ListenableFuture<T> sendRequestThroughSslProxy(Request request,
                                                             AsyncHandler<T> asyncHandler,
                                                             NettyResponseFuture<T> future,
                                                             ProxyServer proxyServer) {
    NettyResponseFuture<T> newFuture = null;
    for (int i = 0; i < 3; i++) {
      Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
      if (channel == null) {
        // pool is empty
        break;
      }
      if (newFuture == null) {
        newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, false);
      }
      if (Channels.isChannelActive(channel)) {
        // if the channel is still active, we can use it,
        // otherwise, channel was closed by the time we computed the request, try again
        return sendRequestWithOpenChannel(newFuture, asyncHandler, channel);
      }
    }
    // couldn't poll an active channel
    newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, true);
    return sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }
sendRequestThroughSslProxy也是先通過getOpenChannel獲取channel,然后執(zhí)行sendRequestWithOpenChannel或者sendRequestWithNewChannel

sendRequestWithNewChannel

private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request,
                                                            ProxyServer proxy,
                                                            NettyResponseFuture<T> future,
                                                            AsyncHandler<T> asyncHandler) {
    // some headers are only set when performing the first request
    HttpHeaders headers = future.getNettyRequest().getHttpRequest().headers();
    Realm realm = future.getRealm();
    Realm proxyRealm = future.getProxyRealm();
    requestFactory.addAuthorizationHeader(headers, perConnectionAuthorizationHeader(request, proxy, realm));
    requestFactory.setProxyAuthorizationHeader(headers, perConnectionProxyAuthorizationHeader(request, proxyRealm));
    future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM);
    future.setInProxyAuth(
            proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM);
    try {
      if (!channelManager.isOpen()) {
        throw PoolAlreadyClosedException.INSTANCE;
      }
      // Do not throw an exception when we need an extra connection for a
      // redirect.
      future.acquirePartitionLockLazily();
    } catch (Throwable t) {
      abort(null, future, getCause(t));
      // exit and don't try to resolve address
      return future;
    }
    resolveAddresses(request, proxy, future, asyncHandler)
            .addListener(new SimpleFutureListener<List<InetSocketAddress>>() {
              @Override
              protected void onSuccess(List<InetSocketAddress> addresses) {
                NettyConnectListener<T> connectListener = new NettyConnectListener<>(future,
                        NettyRequestSender.this, channelManager, connectionSemaphore);
                NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(),
                        addresses, asyncHandler, clientState);
                if (!future.isDone()) {
                  // Do not throw an exception when we need an extra connection for a redirect
                  // FIXME why? This violate the max connection per host handling, right?
                  channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy)
                          .addListener((Future<Bootstrap> whenBootstrap) -> {
                            if (whenBootstrap.isSuccess()) {
                              connector.connect(whenBootstrap.get(), connectListener);
                            } else {
                              abort(null, future, whenBootstrap.cause());
                            }
                          });
                }
              }
              @Override
              protected void onFailure(Throwable cause) {
                abort(null, future, getCause(cause));
              }
            });
    return future;
  }
sendRequestWithNewChannel方法會執(zhí)行future.acquirePartitionLockLazily()來判斷連接是否超出限制,而sendRequestWithOpenChannel方法則沒有這一層判斷

小結(jié)

  • AsyncHttpClient通過ConnectionSemaphore來控制連接的maxConnections及maxConnectionsPerHost
  • NonBlockingSemaphore內(nèi)部使用AtomicInteger來進行控制,permits表示可用的數(shù)量,release方法則遞增permits,tryAcquire則循環(huán)執(zhí)行先判斷permits是否大于0,否則返回false,若permits.compareAndSet(count, count - 1)成功則返回true,否則繼續(xù)循環(huán)執(zhí)行直到返回false或者true
  • NettyResponseFuture提供了acquirePartitionLockLazily方法,它會通過connectionSemaphore.acquireChannelLock(partitionKey)來獲取連接信號量;cancel和terminateAndExit都會執(zhí)行releasePartitionKeyLock,它會調(diào)用connectionSemaphore.releaseChannelLock(partitionKey)
  • NettyRequestSender的構(gòu)造器會根據(jù)配置創(chuàng)建ConnectionSemaphore,其sendRequest方法內(nèi)部調(diào)用的是sendRequestWithCertainForceConnect、sendRequestThroughSslProxy,它們都是先通過getOpenChannel獲取channel,然后根據(jù)channel是否active來執(zhí)行sendRequestWithOpenChannel或者sendRequestWithNewChannel;sendRequestWithNewChannel方法會執(zhí)行future.acquirePartitionLockLazily()來判斷連接是否超出限制,而sendRequestWithOpenChannel方法則沒有這一層判斷
綜上,AsyncHttpClient有定義了ChannelPool,不過其連接數(shù)的控制不是在ChannelPool里頭,而是通過ConnectionSemaphore來控制連接的maxConnections及maxConnectionsPerHost來執(zhí)行,它主要是在每次sendRequestWithNewChannel的時候進行控制,先執(zhí)行future.acquirePartitionLockLazily()獲取允許,再進行connect建立連接。

以上就是聊聊AsyncHttpClient的ConnectionSemaphore的詳細內(nèi)容,更多關(guān)于AsyncHttpClient的ConnectionSemaphore的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java這個名字的來歷與優(yōu)勢

    Java這個名字的來歷與優(yōu)勢

    Java是Sun公司開發(fā)的一種編程語言,Sun公司最初的方向是讓Java來開發(fā)一些電器裝置程序,Java名字的由來,實際上是一個有趣的故事。
    2014-10-10
  • Java8使用stream查找重復(fù)元素的方法示例

    Java8使用stream查找重復(fù)元素的方法示例

    Java?8?是一個非常成功的版本,這個版本新增的Stream,配合同版本出現(xiàn)的Lambda?,給我們操作集合(Collection)提供了極大的便利,Stream流是JDK8新增的成員,本文給大家介紹了Java8使用stream查找重復(fù)元素的方法示例,需要的朋友可以參考下
    2024-04-04
  • Java實現(xiàn)短信驗證碼的示例代碼

    Java實現(xiàn)短信驗證碼的示例代碼

    本文主要介紹了Java實現(xiàn)短信驗證碼的示例代碼,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • Java實現(xiàn)從jar包中讀取指定文件的方法

    Java實現(xiàn)從jar包中讀取指定文件的方法

    這篇文章主要介紹了Java實現(xiàn)從jar包中讀取指定文件的方法,涉及java針對jar文件的讀取及查找相關(guān)操作技巧,需要的朋友可以參考下
    2017-08-08
  • Java Stream流之求和的實現(xiàn)

    Java Stream流之求和的實現(xiàn)

    這篇文章主要介紹了Java Stream流之求和的實現(xiàn),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Java接口和抽象類原理詳解

    Java接口和抽象類原理詳解

    這篇文章主要介紹了Java接口和抽象類原理詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-02-02
  • spring-cloud入門之spring-cloud-config(配置中心)

    spring-cloud入門之spring-cloud-config(配置中心)

    這篇文章主要介紹了spring-cloud入門之spring-cloud-config(配置中心),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-01-01
  • springBoot解決static和@Component遇到的bug

    springBoot解決static和@Component遇到的bug

    這篇文章主要介紹了springBoot解決static和@Component遇到的bug,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 《阿里巴巴 Java開發(fā)手冊》讀后感小結(jié)

    《阿里巴巴 Java開發(fā)手冊》讀后感小結(jié)

    這篇文章主要介紹了《阿里巴巴 Java開發(fā)手冊》讀后感小結(jié),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-12-12
  • java實現(xiàn)excel導(dǎo)出合并單元格的步驟詳解

    java實現(xiàn)excel導(dǎo)出合并單元格的步驟詳解

    這篇文章主要介紹了java實現(xiàn)excel導(dǎo)出合并單元格,通過使用Apache POI庫,我們可以方便地創(chuàng)建Excel文件、填充數(shù)據(jù)、合并單元格和導(dǎo)出Excel文件,需要的朋友可以參考下
    2023-04-04

最新評論