欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

AsyncHttpClient ListenableFuture源碼流程解讀

 更新時間:2023年12月18日 08:35:35   作者:codecraft  
這篇文章主要為大家介紹了AsyncHttpClient ListenableFuture源碼流程解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

public interface ListenableFuture<V> extends Future<V> {
  /**
   * Terminate and if there is no exception, mark this Future as done and release the internal lock.
   */
  void done();
  /**
   * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
   *
   * @param t the exception
   */
  void abort(Throwable t);
  /**
   * Touch the current instance to prevent external service to times out.
   */
  void touch();
  /**
   * Adds a listener and executor to the ListenableFuture.
   * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
   * to the executor} for execution when the {@code Future}'s computation is
   * {@linkplain Future#isDone() complete}.
   * <br>
   * Executor can be <code>null</code>, in that case executor will be executed
   * in the thread where completion happens.
   * <br>
   * There is no guaranteed ordering of execution of listeners, they may get
   * called in the order they were added and they may get called out of order,
   * but any listener added through this method is guaranteed to be called once
   * the computation is complete.
   *
   * @param listener the listener to run when the computation is complete.
   * @param exec     the executor to run the listener in.
   * @return this Future
   */
  ListenableFuture<V> addListener(Runnable listener, Executor exec);
  CompletableFuture<V> toCompletableFuture();
  //......
}
ListenableFuture繼承了java.util.concurrent.Future,它定義了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

class CompletedFailure<T> implements ListenableFuture<T> {
    private final ExecutionException e;
    public CompletedFailure(Throwable t) {
      e = new ExecutionException(t);
    }
    public CompletedFailure(String message, Throwable t) {
      e = new ExecutionException(message, t);
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return true;
    }
    @Override
    public boolean isCancelled() {
      return false;
    }
    @Override
    public boolean isDone() {
      return true;
    }
    @Override
    public T get() throws ExecutionException {
      throw e;
    }
    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
      throw e;
    }
    @Override
    public void done() {
    }
    @Override
    public void abort(Throwable t) {
    }
    @Override
    public void touch() {
    }
    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
      if (exec != null) {
        exec.execute(listener);
      } else {
        listener.run();
      }
      return this;
    }
    @Override
    public CompletableFuture<T> toCompletableFuture() {
      CompletableFuture<T> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }
  }
CompletedFailure實現(xiàn)了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

public final class NettyResponseFuture<V> implements ListenableFuture<V> {
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "redirectCount");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "currentRetry");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isDone");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isCancelled");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inProxyAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "contentProcessed");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
  private final long start = unpreciseMillisTime();
  private final ChannelPoolPartitioning connectionPoolPartitioning;
  private final ConnectionSemaphore connectionSemaphore;
  private final ProxyServer proxyServer;
  private final int maxRetry;
  private final CompletableFuture<V> future = new CompletableFuture<>();          
          //......
  @Override
  public V get() throws InterruptedException, ExecutionException {
    return future.get();
  }
  @Override
  public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
    return future.get(l, tu);
  }          
}
NettyResponseFuture實現(xiàn)了ListenableFuture接口

done

public final void done() {
    if (terminateAndExit())
      return;
    try {
      loadContent();
    } catch (ExecutionException ignored) {
    } catch (RuntimeException t) {
      future.completeExceptionally(t);
    } catch (Throwable t) {
      future.completeExceptionally(t);
      throw t;
    }
  }
  private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
  }  
private void loadContent() throws ExecutionException {
    if (future.isDone()) {
      try {
        future.get();
      } catch (InterruptedException e) {
        throw new RuntimeException("unreachable", e);
      }
    }
    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
      try {
        future.complete(asyncHandler.onCompleted());
      } catch (Throwable ex) {
        if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
          try {
            try {
              asyncHandler.onThrowable(ex);
            } catch (Throwable t) {
              LOGGER.debug("asyncHandler.onThrowable", t);
            }
          } finally {
            cancelTimeouts();
          }
        }
        future.completeExceptionally(ex);
      }
    }
    future.getNow(null);
  }
done方法對于terminateAndExit返回true的直接返回,否則執(zhí)行l(wèi)oadContent,它對于future.isDone()的執(zhí)行future.get(),然后執(zhí)行future.complete(asyncHandler.onCompleted())回調(diào)

abort

public final void abort(final Throwable t) {
    if (terminateAndExit())
      return;
    future.completeExceptionally(t);
    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
      try {
        asyncHandler.onThrowable(t);
      } catch (Throwable te) {
        LOGGER.debug("asyncHandler.onThrowable", te);
      }
    }
  }
abort方法也是對于terminateAndExit返回true的直接返回,否則執(zhí)行future.completeExceptionally(t),然后觸發(fā)asyncHandler.onThrowable(t)回調(diào)

touch

public void touch() {
    touch = unpreciseMillisTime();
  }
touch方法用當前時間戳更新touch屬性

addListener

public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
      exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
  }
addListener方法會執(zhí)行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

public CompletableFuture<V> toCompletableFuture() {
    return future;
  }
toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                            AsyncHandler<T> asyncHandler,
                                                            NettyRequest nettyRequest,
                                                            ProxyServer proxyServer) {
    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);
    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
      future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
  }
  private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                     AsyncHandler<T> asyncHandler,
                                                                     NettyResponseFuture<T> future,
                                                                     ProxyServer proxyServer,
                                                                     boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }
NettyRequestSender的newNettyResponseFuture創(chuàng)建的是NettyResponseFuture;sendRequestWithCertainForceConnect則將NettyResponseFuture傳遞給sendRequestWithOpenChannel或者sendRequestWithNewChannel來發(fā)送請求

小結(jié)

AsyncHttpClient的ListenableFuture繼承了java.util.concurrent.Future,它定義了done、abort、touch、addListener、toCompletableFuture方法;它有兩個實現(xiàn)類,分別是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法將NettyResponseFuture傳遞給sendRequestWithOpenChannel或者sendRequestWithNewChannel來發(fā)送請求。

以上就是聊聊AsyncHttpClient的ListenableFuture的詳細內(nèi)容,更多關(guān)于AsyncHttpClient的ListenableFuture的資料請關(guān)注腳本之家其它相關(guān)文章!

以上就是AsyncHttpClient ListenableFuture源碼流程解讀的詳細內(nèi)容,更多關(guān)于AsyncHttpClient ListenableFuture的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談JAVA 內(nèi)存流的實現(xiàn)

    淺談JAVA 內(nèi)存流的實現(xiàn)

    這篇文章主要介紹了淺談JAVA 內(nèi)存流的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-02-02
  • mac下idea啟動web項目報錯java.net.SocketException:socket closed問題

    mac下idea啟動web項目報錯java.net.SocketException:socket closed

    本文主要介紹了作者在項目啟動時遇到的一個問題——無法打開調(diào)試端口,經(jīng)過一系列排查和嘗試,最終發(fā)現(xiàn)是由于權(quán)限問題導致的,作者還分享了如何修改文件權(quán)限的方法,并提醒大家不要隨意kill掉占用端口的進程
    2024-12-12
  • mybatis @Intercepts的用法解讀

    mybatis @Intercepts的用法解讀

    這篇文章主要介紹了mybatis @Intercepts的用法解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringCloud Feign的使用代碼實例

    SpringCloud Feign的使用代碼實例

    這篇文章主要介紹了SpringCloud Feign的使用代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-03-03
  • java連接FTP服務器圖文教程

    java連接FTP服務器圖文教程

    這篇文章主要給大家介紹了關(guān)于java連接FTP服務器的相關(guān)資料,Ftp是一種常見的文件存儲服務器,在很多的項目中都有使用,方便存儲各種格式的文件,使用java連接ftp文件服務器也是常用的工具類,需要的朋友可以參考下
    2023-08-08
  • 將List集合中的map對象轉(zhuǎn)為List<對象>形式實例代碼

    將List集合中的map對象轉(zhuǎn)為List<對象>形式實例代碼

    這篇文章主要介紹了將List集合中的map對象轉(zhuǎn)為List<對象>形式實例代碼,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • Java實現(xiàn)藍橋杯G將軍的示例代碼

    Java實現(xiàn)藍橋杯G將軍的示例代碼

    這篇文章主要介紹了Java實現(xiàn)藍橋杯G將軍的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-02-02
  • springboot結(jié)合mybatis操作事務配置的處理

    springboot結(jié)合mybatis操作事務配置的處理

    在操作數(shù)據(jù)庫的時候,經(jīng)常會使用事務的處理,本文主要介紹了springboot結(jié)合mybatis操作事務配置的處理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-07-07
  • Spring事件監(jiān)聽器之@EventListener原理分析

    Spring事件監(jiān)聽器之@EventListener原理分析

    這篇文章主要介紹了Spring事件監(jiān)聽器之@EventListener原理分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Intellij Mybatis連接Mysql數(shù)據(jù)庫

    Intellij Mybatis連接Mysql數(shù)據(jù)庫

    最近在搞android的項目,在開發(fā)過程中遇到了好多問題,今天小編給大家說下mybatis連接MySQL數(shù)據(jù)庫的方法,感興趣的朋友跟著小編一起學習吧
    2016-10-10

最新評論