AsyncHttpClient's TimeoutTimerTask: asynchronous timeouts for pooled connections

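Updated: 2023-12-14 10:20:15   Author: codecraft
This article walks through the source code of AsyncHttpClient's TimeoutTimerTask and shows how request and read timeouts are scheduled asynchronously for pooled connections. Readers who need it can use it as a reference.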

This article takes a look at AsyncHttpClient's TimeoutTimerTask.

TimerTask

io/netty/util/TimerTask.java

/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {
    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}
Netty's TimerTask interface defines a single run method, which takes a Timeout as its parameter.

Timeout

io/netty/util/Timeout.java

public interface Timeout {
    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();
    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();
    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();
    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();
    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}
The Timeout interface defines the timer(), task(), isExpired(), isCancelled() and cancel() methods.

TimeoutTimerTask

org/asynchttpclient/netty/timeout/TimeoutTimerTask.java

public abstract class TimeoutTimerTask implements TimerTask {
  private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
  protected final AtomicBoolean done = new AtomicBoolean();
  protected final NettyRequestSender requestSender;
  final TimeoutsHolder timeoutsHolder;
  volatile NettyResponseFuture<?> nettyResponseFuture;
  TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.timeoutsHolder = timeoutsHolder;
  }
  void expire(String message, long time) {
    LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
    requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
  }
  /**
   * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
   * channel, and heavy objects such as SslEngines
   */
  public void clean() {
    if (done.compareAndSet(false, true)) {
      nettyResponseFuture = null;
    }
  }
  void appendRemoteAddress(StringBuilder sb) {
    InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
    sb.append(remoteAddress.getHostName());
    if (!remoteAddress.isUnresolved()) {
      sb.append('/').append(remoteAddress.getAddress().getHostAddress());
    }
    sb.append(':').append(remoteAddress.getPort());
  }
}
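TimeoutTimerTask is an abstract class declared to implement the TimerTask interface. Its expire method calls requestSender.abort with a TimeoutException. Its clean method sets done to true and nulls out nettyResponseFuture, so that a cancelled task still referenced by the Timer does not keep the future (and through it the channel) reachable.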
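The effect of clean() can be illustrated with a minimal JDK-only sketch. The class name ReleasableTask and its payload field are hypothetical stand-ins for TimeoutTimerTask and NettyResponseFuture. This is not AsyncHttpClient code, and unlike the real clean() it returns a boolean so the one-shot behaviour is easy to observe.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for TimeoutTimerTask: holds a heavy reference until cleaned.
final class ReleasableTask {
    private final AtomicBoolean done = new AtomicBoolean();
    private volatile Object payload; // stands in for NettyResponseFuture

    ReleasableTask(Object payload) {
        this.payload = payload;
    }

    // Same shape as clean(): flip done exactly once, then drop the reference
    // so it can be collected even while a timer still references this task.
    // Returns true only for the call that actually performed the cleanup.
    boolean clean() {
        if (done.compareAndSet(false, true)) {
            payload = null;
            return true;
        }
        return false;
    }

    boolean isDone() {
        return done.get();
    }

    boolean holdsPayload() {
        return payload != null;
    }
}
```

Calling clean() a second time is a no-op, because compareAndSet(false, true) only succeeds once.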

ReadTimeoutTimerTask

org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

public class ReadTimeoutTimerTask extends TimeoutTimerTask {
  private final long readTimeout;
  ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                       NettyRequestSender requestSender,
                       TimeoutsHolder timeoutsHolder,
                       int readTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.readTimeout = readTimeout;
  }
  public void run(Timeout timeout) {
    if (done.getAndSet(true) || requestSender.isClosed())
      return;
    if (nettyResponseFuture.isDone()) {
      timeoutsHolder.cancel();
      return;
    }
    long now = unpreciseMillisTime();
    long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
    long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;
    if (durationBeforeCurrentReadTimeout <= 0L) {
      // idleConnectTimeout reached
      StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
      appendRemoteAddress(sb);
      String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
      long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
      expire(message, durationSinceLastTouch);
      // cancel request timeout sibling
      timeoutsHolder.cancel();
    } else {
      done.set(false);
      timeoutsHolder.startReadTimeout(this);
    }
  }
}
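ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method computes currentReadTimeoutInstant from readTimeout and nettyResponseFuture.getLastTouch(), then checks whether that instant has already passed. If it has, run calls expire and timeoutsHolder.cancel(); otherwise it resets done to false and calls timeoutsHolder.startReadTimeout(this) to reschedule itself.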
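The arithmetic in run can be isolated as a small sketch. The class ReadTimeoutCheck and its method names are hypothetical; only the formula (readTimeout + lastTouch - now, expired when the result is zero or negative) is taken from the listing above.

```java
// Hypothetical helper that mirrors the arithmetic in ReadTimeoutTimerTask.run.
final class ReadTimeoutCheck {
    private ReadTimeoutCheck() {
    }

    // Milliseconds left before the read timeout is reached.
    // Zero or a negative value means the timeout has already been reached.
    static long remainingMillis(long nowMillis, long lastTouchMillis, long readTimeoutMillis) {
        long currentReadTimeoutInstant = readTimeoutMillis + lastTouchMillis;
        return currentReadTimeoutInstant - nowMillis;
    }

    // True when run() would take the expire branch rather than reschedule.
    static boolean isExpired(long nowMillis, long lastTouchMillis, long readTimeoutMillis) {
        return remainingMillis(nowMillis, lastTouchMillis, readTimeoutMillis) <= 0L;
    }
}
```

For example, with a 5000 ms read timeout, a task that fires at t=10000 while the future was last touched at t=8000 still has 3000 ms left, so it takes the reschedule branch instead of expiring.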

startReadTimeout

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
      // only schedule a new readTimeout if the requestTimeout doesn't happen first
      if (task == null) {
        // first call triggered from outside (else is read timeout is re-scheduling itself)
        task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
      }
      this.readTimeout = newTimeout(task, readTimeoutValue);
    } else if (task != null) {
      // read timeout couldn't re-scheduling itself, clean up
      task.clean();
    }
  }
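startReadTimeout schedules a read timeout via newTimeout when there is no request timeout at all, or when the request timeout has not expired and the current time plus readTimeoutValue is still earlier than requestTimeoutMillisTime. Otherwise, if a task was passed in (that is, a read timeout trying to reschedule itself), it calls task.clean().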
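The scheduling condition can be written as a standalone predicate. This is only a sketch: the class ReadTimeoutScheduling and its boolean parameters are hypothetical replacements for the requestTimeout == null and requestTimeout.isExpired() checks in the listing above.

```java
// Hypothetical predicate that mirrors the condition in TimeoutsHolder.startReadTimeout.
final class ReadTimeoutScheduling {
    private ReadTimeoutScheduling() {
    }

    static boolean shouldSchedule(boolean hasRequestTimeout,
                                  boolean requestTimeoutExpired,
                                  long readTimeoutValue,
                                  long requestTimeoutMillisTime,
                                  long nowMillis) {
        if (!hasRequestTimeout) {
            // No request timeout configured: the read timeout is always scheduled.
            return true;
        }
        // Only schedule when the read timeout would fire before the request deadline.
        long untilRequestDeadline = requestTimeoutMillisTime - nowMillis;
        return !requestTimeoutExpired && readTimeoutValue < untilRequestDeadline;
    }
}
```

With a request deadline at t=60000 and a 5000 ms read timeout, a call at t=10000 schedules the read timeout, while a call at t=57000 does not, since the request timeout would fire first.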

RequestTimeoutTimerTask

org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java

public class RequestTimeoutTimerTask extends TimeoutTimerTask {
  private final long requestTimeout;
  RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                                 NettyRequestSender requestSender,
                                 TimeoutsHolder timeoutsHolder,
                                 int requestTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.requestTimeout = requestTimeout;
  }
  public void run(Timeout timeout) {
    if (done.getAndSet(true) || requestSender.isClosed())
      return;
    // in any case, cancel possible readTimeout sibling
    timeoutsHolder.cancel();
    if (nettyResponseFuture.isDone())
      return;
    StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
    appendRemoteAddress(sb);
    String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
    long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
    expire(message, age);
  }
}
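RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done is already true or requestSender is closed. Otherwise it cancels any sibling read timeout via timeoutsHolder.cancel(), returns if nettyResponseFuture.isDone(), and in all remaining cases calls expire.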
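The message passed to expire is assembled with appendRemoteAddress. The sketch below reproduces that formatting with plain JDK classes so the resulting text is easy to see. The class TimeoutMessages and the resolved helper are hypothetical, and a plain StringBuilder is used in place of StringBuilderPool.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper that reproduces the timeout message layout.
final class TimeoutMessages {
    private TimeoutMessages() {
    }

    static String requestTimeoutMessage(InetSocketAddress remoteAddress, long timeoutMillis) {
        StringBuilder sb = new StringBuilder("Request timeout to ");
        // Same steps as appendRemoteAddress: host name, optional "/ip", then ":port".
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
        return sb.append(" after ").append(timeoutMillis).append(" ms").toString();
    }

    // Builds a resolved address from literal bytes, so no DNS lookup is needed.
    static InetSocketAddress resolved(String host, byte[] ip, int port) {
        try {
            return new InetSocketAddress(InetAddress.getByAddress(host, ip), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("invalid IP address length", e);
        }
    }
}
```

For a resolved address the message contains both the host name and the IP, for example "Request timeout to example.com/10.0.0.1:8080 after 60000 ms"; for an unresolved one the "/ip" part is omitted.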

TimeoutsHolder

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

public class TimeoutsHolder {
  private final Timeout requestTimeout;
  private final AtomicBoolean cancelled = new AtomicBoolean();
  private final Timer nettyTimer;
  private final NettyRequestSender requestSender;
  private final long requestTimeoutMillisTime;
  private final int readTimeoutValue;
  private volatile Timeout readTimeout;
  private volatile NettyResponseFuture<?> nettyResponseFuture;
  private volatile InetSocketAddress remoteAddress;
  public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
    this.nettyTimer = nettyTimer;
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.remoteAddress = originalRemoteAddress;
    final Request targetRequest = nettyResponseFuture.getTargetRequest();
    final int readTimeoutInMs = targetRequest.getReadTimeout();
    this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;
    int requestTimeoutInMs = targetRequest.getRequestTimeout();
    if (requestTimeoutInMs == 0) {
      requestTimeoutInMs = config.getRequestTimeout();
    }
    if (requestTimeoutInMs != -1) {
      requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
      requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
    } else {
      requestTimeoutMillisTime = -1L;
      requestTimeout = null;
    }
  }
  //......
}
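The TimeoutsHolder constructor resolves the effective timeouts first: a per-request value of 0 falls back to the value from AsyncHttpClientConfig. If requestTimeoutInMs is not -1, it then creates a RequestTimeoutTimerTask and schedules it through newTimeout; otherwise no request timeout is scheduled.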
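The fallback rules in the constructor can be summarised in a short sketch. The class EffectiveTimeouts and its method names are hypothetical; the 0 and -1 conventions are the ones visible in the listing above.

```java
// Hypothetical helper that mirrors how TimeoutsHolder resolves its timeout values.
final class EffectiveTimeouts {
    private EffectiveTimeouts() {
    }

    // A per-request value of 0 means "not set": fall back to the client config value.
    static int resolve(int perRequestMillis, int configMillis) {
        return perRequestMillis == 0 ? configMillis : perRequestMillis;
    }

    // -1 disables the request timeout; otherwise the deadline is now + timeout.
    static long requestDeadline(long nowMillis, int requestTimeoutMillis) {
        return requestTimeoutMillis != -1 ? nowMillis + requestTimeoutMillis : -1L;
    }
}
```

A request that leaves its timeout at 0 inherits the client-wide setting, while an explicit -1 survives resolution and leaves the request without a request timeout.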

scheduleRequestTimeout

org/asynchttpclient/netty/request/NettyRequestSender.java

private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                             AsyncHandler<T> asyncHandler,
                                                             Channel channel) {
    try {
      asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
      LOGGER.error("onConnectionPooled crashed", e);
      abort(channel, future, e);
      return future;
    }
    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
      // otherwise, bad luck, the channel was closed, see bellow
      scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }
    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);
    if (LOGGER.isDebugEnabled()) {
      HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
      LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }
    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);
    if (Channels.isChannelActive(channel)) {
      writeRequest(future, channel);
    } else {
      // bad luck, the channel was closed in-between
      // there's a very good chance onClose was already notified but the
      // future wasn't already registered
      handleUnexpectedClosedChannel(channel, future);
    }
    return future;
  }
  private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                      InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
  }
  public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();
    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
      return;
    try {
      if (asyncHandler instanceof TransferCompletionHandler) {
        configureTransferAdapter(asyncHandler, httpRequest);
      }
      boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue()
              && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;
      if (!future.isHeadersAlreadyWrittenOnContinue()) {
        try {
          asyncHandler.onRequestSend(nettyRequest);
        } catch (Exception e) {
          LOGGER.error("onRequestSend crashed", e);
          abort(channel, future, e);
          return;
        }
        // if the request has a body, we want to track progress
        if (writeBody) {
          // FIXME does this really work??? the promise is for the request without body!!!
          ChannelProgressivePromise promise = channel.newProgressivePromise();
          ChannelFuture f = channel.write(httpRequest, promise);
          f.addListener(new WriteProgressListener(future, true, 0L));
        } else {
          // we can just track write completion
          ChannelPromise promise = channel.newPromise();
          ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
          f.addListener(new WriteCompleteListener(future));
        }
      }
      if (writeBody)
        nettyRequest.getBody().write(channel, future);
      // don't bother scheduling read timeout if channel became invalid
      if (Channels.isChannelActive(channel)) {
        scheduleReadTimeout(future);
      }
    } catch (Exception e) {
      LOGGER.error("Can't write request", e);
      abort(channel, future, e);
    }
  }
NettyRequestSender's sendRequestWithOpenChannel method calls scheduleRequestTimeout when channelRemoteAddress is not null; that creates a TimeoutsHolder, which in turn schedules the RequestTimeoutTimerTask. Its writeRequest method, after nettyRequest.getBody().write(channel, future), schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future) if the channel is still active.

Summary

AsyncHttpClient's TimeoutTimerTask is an abstract class declared to implement Netty's TimerTask interface. It defines an expire method, which calls requestSender.abort, and a clean method, which marks the task done and releases nettyResponseFuture. It has two concrete subclasses, RequestTimeoutTimerTask and ReadTimeoutTimerTask. AsyncHttpClient uses TimeoutsHolder to wrap these timeout timers. NettyRequestSender's sendRequestWithOpenChannel method triggers scheduling of the RequestTimeoutTimerTask, while its writeRequest method schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future) after nettyRequest.getBody().write(channel, future).

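As the code shows, requestTimeoutMillisTime is the deadline for the request as a whole, and that budget includes the readTimeoutValue wait that follows writing the request. This is why startReadTimeout declines to schedule a read timeout that could not fire before the request timeout.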
