AsyncHttpClient's TimeoutTimerTask: asynchronous timeouts for pooled connections
Preface
This article takes a look at AsyncHttpClient's TimeoutTimerTask.
TimerTask
io/netty/util/TimerTask.java
/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {

    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}

Netty's TimerTask interface defines a single run method, which takes a Timeout as its parameter.
Timeout
io/netty/util/Timeout.java
public interface Timeout {

    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}

The Timeout interface defines the timer(), task(), isExpired(), isCancelled() and cancel() methods.
TimeoutTimerTask
org/asynchttpclient/netty/timeout/TimeoutTimerTask.java
public abstract class TimeoutTimerTask implements TimerTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);

    protected final AtomicBoolean done = new AtomicBoolean();
    protected final NettyRequestSender requestSender;
    final TimeoutsHolder timeoutsHolder;
    volatile NettyResponseFuture<?> nettyResponseFuture;

    TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.timeoutsHolder = timeoutsHolder;
    }

    void expire(String message, long time) {
        LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
        requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
    }

    /**
     * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
     * channel, and heavy objects such as SslEngines
     */
    public void clean() {
        if (done.compareAndSet(false, true)) {
            nettyResponseFuture = null;
        }
    }

    void appendRemoteAddress(StringBuilder sb) {
        InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
    }
}

TimeoutTimerTask is an abstract class that declares it implements TimerTask but leaves run to its subclasses. Its expire method logs the timeout and calls requestSender.abort with a TimeoutException. Its clean method sets done to true and nulls out nettyResponseFuture, so that a cancelled task the Timer still references does not keep the future (and through it the channel) reachable.
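The compareAndSet-then-null pattern in clean() can be tried out in isolation with a small JDK-only sketch. The class CleanableTask and its field heavyResource are hypothetical stand-ins and are not AsyncHttpClient types; the boolean return value is also an addition made here so the effect is observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a timer task that references a heavy object.
class CleanableTask {
    private final AtomicBoolean done = new AtomicBoolean();
    // volatile so that another thread (e.g. a timer thread) sees the null written by clean()
    private volatile Object heavyResource;

    CleanableTask(Object heavyResource) {
        this.heavyResource = heavyResource;
    }

    // Marks the task as done and drops the reference; only the first call has an effect.
    boolean clean() {
        if (done.compareAndSet(false, true)) {
            heavyResource = null;
            return true;
        }
        return false;
    }

    boolean isDone() {
        return done.get();
    }

    boolean holdsResource() {
        return heavyResource != null;
    }
}
```

Once clean() has run, the task object itself may linger in the timer, but it no longer pins the heavy object in memory.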
ReadTimeoutTimerTask
org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java
public class ReadTimeoutTimerTask extends TimeoutTimerTask {

    private final long readTimeout;

    ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder, int readTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.readTimeout = readTimeout;
    }

    public void run(Timeout timeout) {
        if (done.getAndSet(true) || requestSender.isClosed())
            return;

        if (nettyResponseFuture.isDone()) {
            timeoutsHolder.cancel();
            return;
        }

        long now = unpreciseMillisTime();
        long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
        long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;

        if (durationBeforeCurrentReadTimeout <= 0L) {
            // idleConnectTimeout reached
            StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
            appendRemoteAddress(sb);
            String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
            long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
            expire(message, durationSinceLastTouch);
            // cancel request timeout sibling
            timeoutsHolder.cancel();
        } else {
            done.set(false);
            timeoutsHolder.startReadTimeout(this);
        }
    }
}

ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method adds readTimeout to nettyResponseFuture.getLastTouch() to get currentReadTimeoutInstant and compares that instant with the current time. If the instant has passed, it calls expire and then timeoutsHolder.cancel(); otherwise it resets done to false and reschedules itself through timeoutsHolder.startReadTimeout(this).
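The arithmetic behind that check can be sketched as two pure functions. This is a simplified model rather than the library class: ReadTimeoutDecision and its method names are made up for illustration, and the clock value is passed in instead of being read from unpreciseMillisTime().

```java
// Simplified model of the decision in ReadTimeoutTimerTask.run(); not the library class.
final class ReadTimeoutDecision {
    private ReadTimeoutDecision() {
    }

    // Time left before the read timeout elapses; zero or negative means it has expired.
    static long remainingMillis(long readTimeoutMillis, long lastTouchMillis, long nowMillis) {
        long currentReadTimeoutInstant = readTimeoutMillis + lastTouchMillis;
        return currentReadTimeoutInstant - nowMillis;
    }

    // True when the task would call expire(...), false when it would reschedule itself.
    static boolean isExpired(long readTimeoutMillis, long lastTouchMillis, long nowMillis) {
        return remainingMillis(readTimeoutMillis, lastTouchMillis, nowMillis) <= 0L;
    }
}
```

For example, with a 25000 ms read timeout and a last touch at 20000 ms, a task that fires at 25000 ms still has 20000 ms left and reschedules, while one that fires at 50000 ms finds the timeout exceeded by 5000 ms.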
startReadTimeout
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
        // only schedule a new readTimeout if the requestTimeout doesn't happen first
        if (task == null) {
            // first call triggered from outside (else is read timeout is re-scheduling itself)
            task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
        }
        this.readTimeout = newTimeout(task, readTimeoutValue);
    } else if (task != null) {
        // read timeout couldn't re-scheduling itself, clean up
        task.clean();
    }
}

startReadTimeout schedules the read timeout through newTimeout in two cases: when there is no request timeout at all, or when the request timeout has not expired and the current time plus readTimeoutValue is still earlier than requestTimeoutMillisTime. Otherwise the request timeout would fire first, so a task that tried to reschedule itself is released with task.clean().
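The guard condition can be modelled as a single boolean function. The sketch below is an assumption-laden simplification: ReadTimeoutScheduling and shouldSchedule are hypothetical names, the absence of a request timeout is represented by a deadline of -1 (the source checks requestTimeout == null instead), and the clock is a parameter.

```java
// Simplified model of the guard in TimeoutsHolder.startReadTimeout(); not the library class.
final class ReadTimeoutScheduling {
    // Assumption of this sketch: -1 stands for "no request timeout configured".
    static final long NO_REQUEST_DEADLINE = -1L;

    private ReadTimeoutScheduling() {
    }

    static boolean shouldSchedule(long requestDeadlineMillis, boolean requestTimeoutExpired,
                                  long readTimeoutMillis, long nowMillis) {
        if (requestDeadlineMillis == NO_REQUEST_DEADLINE) {
            // no request timeout: the read timeout is always scheduled
            return true;
        }
        long untilRequestDeadline = requestDeadlineMillis - nowMillis;
        // only schedule if the read timeout would fire before the request timeout
        return !requestTimeoutExpired && readTimeoutMillis < untilRequestDeadline;
    }
}
```

With a request deadline at 60000 ms and a 25000 ms read timeout, a reschedule attempted at 25000 ms goes ahead (35000 ms remain), whereas one attempted at 50000 ms does not (only 10000 ms remain) and the request timeout is left to end the request.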
RequestTimeoutTimerTask
org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java
public class RequestTimeoutTimerTask extends TimeoutTimerTask {

    private final long requestTimeout;

    RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder, int requestTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.requestTimeout = requestTimeout;
    }

    public void run(Timeout timeout) {
        if (done.getAndSet(true) || requestSender.isClosed())
            return;

        // in any case, cancel possible readTimeout sibling
        timeoutsHolder.cancel();

        if (nettyResponseFuture.isDone())
            return;

        StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
        appendRemoteAddress(sb);
        String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
        long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
        expire(message, age);
    }
}

RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done is already true or requestSender is closed. Otherwise it calls timeoutsHolder.cancel() to cancel a possible sibling read timeout, returns if nettyResponseFuture.isDone(), and in the remaining case builds the message and calls expire.
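To see what the resulting exception message looks like, the string-building steps can be reproduced with JDK classes only. This is a sketch under assumptions: TimeoutMessages and its helper resolved(...) are hypothetical, a plain StringBuilder replaces StringBuilderPool, and the address is passed in rather than read from a TimeoutsHolder.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Simplified model of how the timeout message is assembled; not the library class.
final class TimeoutMessages {
    private TimeoutMessages() {
    }

    // Same steps as appendRemoteAddress: host name, then "/ip" when resolved, then ":port".
    static void appendRemoteAddress(StringBuilder sb, InetSocketAddress remoteAddress) {
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
    }

    static String requestTimeoutMessage(InetSocketAddress remoteAddress, long requestTimeoutMillis) {
        StringBuilder sb = new StringBuilder("Request timeout to ");
        appendRemoteAddress(sb, remoteAddress);
        return sb.append(" after ").append(requestTimeoutMillis).append(" ms").toString();
    }

    // Hypothetical helper: builds a resolved address from a literal IP, so no DNS lookup happens.
    static InetSocketAddress resolved(String host, byte[] ip, int port) {
        try {
            return new InetSocketAddress(InetAddress.getByAddress(host, ip), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("invalid IP address length", e);
        }
    }
}
```

An unresolved address for example.com on port 80 with a 5000 ms timeout gives "Request timeout to example.com:80 after 5000 ms"; a resolved address additionally carries the IP, as in "Request timeout to example.com/127.0.0.1:8080 after 5000 ms".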
TimeoutsHolder
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
public class TimeoutsHolder {

    private final Timeout requestTimeout;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final Timer nettyTimer;
    private final NettyRequestSender requestSender;
    private final long requestTimeoutMillisTime;
    private final int readTimeoutValue;
    private volatile Timeout readTimeout;
    private volatile NettyResponseFuture<?> nettyResponseFuture;
    private volatile InetSocketAddress remoteAddress;

    public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
        this.nettyTimer = nettyTimer;
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.remoteAddress = originalRemoteAddress;

        final Request targetRequest = nettyResponseFuture.getTargetRequest();

        final int readTimeoutInMs = targetRequest.getReadTimeout();
        this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;

        int requestTimeoutInMs = targetRequest.getRequestTimeout();
        if (requestTimeoutInMs == 0) {
            requestTimeoutInMs = config.getRequestTimeout();
        }

        if (requestTimeoutInMs != -1) {
            requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
            requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
        } else {
            requestTimeoutMillisTime = -1L;
            requestTimeout = null;
        }
    }

    //......
}

The TimeoutsHolder constructor resolves both timeouts, falling back to the client config when the per-request value is 0. If the resulting requestTimeoutInMs is not -1, it creates a RequestTimeoutTimerTask and schedules it through newTimeout; otherwise no request timeout is set.
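The fallback rules in the constructor come down to two small computations, sketched here as pure functions. TimeoutResolution and its method names are hypothetical and exist only for illustration; the clock value is passed in as a parameter.

```java
// Simplified model of the fallback rules in the TimeoutsHolder constructor; not the library class.
final class TimeoutResolution {
    private TimeoutResolution() {
    }

    // A per-request value of 0 means "not set", so the client-level value applies.
    static int effectiveTimeout(int perRequestMillis, int clientDefaultMillis) {
        return perRequestMillis == 0 ? clientDefaultMillis : perRequestMillis;
    }

    // -1 disables the request timeout; otherwise the deadline is an absolute instant.
    static long requestDeadline(int effectiveRequestTimeoutMillis, long nowMillis) {
        return effectiveRequestTimeoutMillis == -1 ? -1L : nowMillis + effectiveRequestTimeoutMillis;
    }
}
```

So a request that leaves its timeout at 0 inherits, say, a 60000 ms client default, and a request started at instant 1000 with a 5000 ms timeout gets a deadline of 6000.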
scheduleRequestTimeout
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler, Channel channel) {
    try {
        asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
        LOGGER.error("onConnectionPooled crashed", e);
        abort(channel, future, e);
        return future;
    }

    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
        // otherwise, bad luck, the channel was closed, see bellow
        scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }

    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);

    if (LOGGER.isDebugEnabled()) {
        HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
        LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }

    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);

    if (Channels.isChannelActive(channel)) {
        writeRequest(future, channel);
    } else {
        // bad luck, the channel was closed in-between
        // there's a very good chance onClose was already notified but the
        // future wasn't already registered
        handleUnexpectedClosedChannel(channel, future);
    }

    return future;
}

private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture, InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config, originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
}

public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();

    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
        return;

    try {
        if (asyncHandler instanceof TransferCompletionHandler) {
            configureTransferAdapter(asyncHandler, httpRequest);
        }

        boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;

        if (!future.isHeadersAlreadyWrittenOnContinue()) {
            try {
                asyncHandler.onRequestSend(nettyRequest);
            } catch (Exception e) {
                LOGGER.error("onRequestSend crashed", e);
                abort(channel, future, e);
                return;
            }

            // if the request has a body, we want to track progress
            if (writeBody) {
                // FIXME does this really work??? the promise is for the request without body!!!
                ChannelProgressivePromise promise = channel.newProgressivePromise();
                ChannelFuture f = channel.write(httpRequest, promise);
                f.addListener(new WriteProgressListener(future, true, 0L));
            } else {
                // we can just track write completion
                ChannelPromise promise = channel.newPromise();
                ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
                f.addListener(new WriteCompleteListener(future));
            }
        }

        if (writeBody)
            nettyRequest.getBody().write(channel, future);

        // don't bother scheduling read timeout if channel became invalid
        if (Channels.isChannelActive(channel)) {
            scheduleReadTimeout(future);
        }
    } catch (Exception e) {
        LOGGER.error("Can't write request", e);
        abort(channel, future, e);
    }
}

NettyRequestSender's sendRequestWithOpenChannel method calls scheduleRequestTimeout when channelRemoteAddress is not null; that method creates a TimeoutsHolder, whose constructor schedules the RequestTimeoutTimerTask. Its writeRequest method writes the request (and the body, when writeBody is true, via nettyRequest.getBody().write(channel, future)) and then, if the channel is still active, schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future).
Summary
AsyncHttpClient's TimeoutTimerTask is an abstract class that declares it implements Netty's TimerTask interface. It defines an expire method, which calls requestSender.abort, and a clean method, which sets done to true and clears nettyResponseFuture. It has two concrete subclasses, RequestTimeoutTimerTask and ReadTimeoutTimerTask. AsyncHttpClient uses TimeoutsHolder to wrap these timeout timers: NettyRequestSender's sendRequestWithOpenChannel method triggers the scheduling of the RequestTimeoutTimerTask, while its writeRequest method, after nettyRequest.getBody().write(channel, future), schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future).
As the code shows, requestTimeoutMillisTime is the deadline for the request as a whole, so it also bounds the read timeouts (readTimeoutValue) that start once the data has been written.
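How the two timeouts interact can be traced with a small deterministic simulation. This is a rough model under several stated assumptions rather than library behaviour: the request starts at t = 0 and never completes, a request timeout is configured, the first read timer is armed at t = 0, timers fire exactly on time, and every reschedule uses the full read timeout as startReadTimeout does. TimeoutTrace and firstTimeout are hypothetical names.

```java
// Deterministic model of which timeout ends a request that never completes; not library code.
final class TimeoutTrace {
    private TimeoutTrace() {
    }

    // touches: instants (ms since request start) at which the future was touched.
    // Returns "read@<t>" or "request@<t>" for the timeout that fires first.
    static String firstTimeout(long requestTimeout, long readTimeout, long[] touches) {
        long deadline = requestTimeout; // the request starts at t = 0
        long now = 0L;
        // guard from startReadTimeout: only arm the read timer if it fires before the deadline
        while (readTimeout < deadline - now) {
            now += readTimeout; // the read task runs one full read timeout later
            long lastTouch = 0L;
            for (long touch : touches) {
                if (touch <= now && touch > lastTouch) {
                    lastTouch = touch;
                }
            }
            if (readTimeout + lastTouch - now <= 0L) {
                return "read@" + now; // idle for at least readTimeout: read timeout wins
            }
        }
        // the read timer could not be (re)armed, so the request timeout ends the request
        return "request@" + deadline;
    }
}
```

With a 60000 ms request timeout and a 25000 ms read timeout, a single touch at 20000 ms leads to a read timeout detected at 50000 ms, while touches at 20000 ms and 40000 ms leave no room to rearm the read timer at 50000 ms, so the request timeout at 60000 ms ends the request.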