AsyncHttpClient's TimeoutTimerTask: Asynchronous Timeouts for Pooled Connections
Preface
This article takes a look at AsyncHttpClient's TimeoutTimerTask.
TimerTask
io/netty/util/TimerTask.java
/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {
    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}
Netty's TimerTask interface defines a single run method, which takes a Timeout as its parameter.
Timeout
io/netty/util/Timeout.java
public interface Timeout {
    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}
The Timeout interface defines the timer(), task(), isExpired(), isCancelled(), and cancel() methods.
TimeoutTimerTask
org/asynchttpclient/netty/timeout/TimeoutTimerTask.java
public abstract class TimeoutTimerTask implements TimerTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
    protected final AtomicBoolean done = new AtomicBoolean();
    protected final NettyRequestSender requestSender;
    final TimeoutsHolder timeoutsHolder;
    volatile NettyResponseFuture<?> nettyResponseFuture;

    TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.timeoutsHolder = timeoutsHolder;
    }

    void expire(String message, long time) {
        LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
        requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
    }

    /**
     * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
     * channel, and heavy objects such as SslEngines
     */
    public void clean() {
        if (done.compareAndSet(false, true)) {
            nettyResponseFuture = null;
        }
    }

    void appendRemoteAddress(StringBuilder sb) {
        InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
    }
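}
TimeoutTimerTask is declared as implementing TimerTask. It defines an expire method, which calls requestSender.abort with a TimeoutException, and a clean method, which sets done to true and clears the nettyResponseFuture reference.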
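The clean() idiom can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not AsyncHttpClient code: CleanableTask and heavyResource are hypothetical names, and clean() returns a boolean here (the real method returns void) only so the effect is observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for TimeoutTimerTask; all names are illustrative.
final class CleanableTask {
    private final AtomicBoolean done = new AtomicBoolean();
    // volatile so another thread (e.g. a timer thread) sees the cleared reference
    private volatile Object heavyResource;

    CleanableTask(Object heavyResource) {
        this.heavyResource = heavyResource;
    }

    // Same shape as clean(): only the first caller flips done and drops the reference.
    boolean clean() {
        if (done.compareAndSet(false, true)) {
            heavyResource = null;
            return true;
        }
        return false;
    }

    boolean holdsResource() {
        return heavyResource != null;
    }

    public static void main(String[] args) {
        CleanableTask task = new CleanableTask(new byte[1024]);
        System.out.println(task.clean());         // true
        System.out.println(task.clean());         // false
        System.out.println(task.holdsResource()); // false
    }
}
```

Because the timer may keep a cancelled task around for a while, dropping the reference lets the future (and whatever it holds) be collected even though the task object itself is still reachable.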
ReadTimeoutTimerTask
org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java
public class ReadTimeoutTimerTask extends TimeoutTimerTask {
    private final long readTimeout;

    ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                         NettyRequestSender requestSender,
                         TimeoutsHolder timeoutsHolder,
                         int readTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.readTimeout = readTimeout;
    }

    public void run(Timeout timeout) {
        if (done.getAndSet(true) || requestSender.isClosed())
            return;
        if (nettyResponseFuture.isDone()) {
            timeoutsHolder.cancel();
            return;
        }
        long now = unpreciseMillisTime();
        long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
        long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;
        if (durationBeforeCurrentReadTimeout <= 0L) {
            // idleConnectTimeout reached
            StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
            appendRemoteAddress(sb);
            String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
            long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
            expire(message, durationSinceLastTouch);
            // cancel request timeout sibling
            timeoutsHolder.cancel();
        } else {
            done.set(false);
            timeoutsHolder.startReadTimeout(this);
        }
    }
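}
ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method adds readTimeout to nettyResponseFuture.getLastTouch() to get currentReadTimeoutInstant, then checks whether that instant has already passed. If it has, it calls expire and timeoutsHolder.cancel(); otherwise it resets done to false and calls timeoutsHolder.startReadTimeout(this) to reschedule itself.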
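The expiry arithmetic in run can be pulled out into pure functions to see how it behaves. This is a sketch under assumptions: ReadTimeoutCheck and its method names are hypothetical, and the timestamps are plain longs instead of values from unpreciseMillisTime().

```java
// Hypothetical helper that mirrors the arithmetic in ReadTimeoutTimerTask.run.
final class ReadTimeoutCheck {
    // Milliseconds left before the read timeout elapses; <= 0 means it has expired.
    static long remainingMillis(long readTimeout, long lastTouch, long now) {
        long currentReadTimeoutInstant = readTimeout + lastTouch;
        return currentReadTimeoutInstant - now;
    }

    static boolean isExpired(long readTimeout, long lastTouch, long now) {
        return remainingMillis(readTimeout, lastTouch, now) <= 0L;
    }

    public static void main(String[] args) {
        // readTimeout = 5000 ms, last touch at t = 1000 ms
        System.out.println(remainingMillis(5_000, 1_000, 4_000)); // 2000, so the task would re-arm
        System.out.println(isExpired(5_000, 1_000, 6_000));       // true, so the task would expire
    }
}
```

Every touch of the future pushes lastTouch forward, so the task only expires after a full readTimeout with no activity, which is why it behaves as an idle timeout rather than a fixed deadline.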
startReadTimeout
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
        // only schedule a new readTimeout if the requestTimeout doesn't happen first
        if (task == null) {
            // first call triggered from outside (else is read timeout is re-scheduling itself)
            task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
        }
        this.readTimeout = newTimeout(task, readTimeoutValue);
    } else if (task != null) {
        // read timeout couldn't re-scheduling itself, clean up
        task.clean();
    }
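}
startReadTimeout schedules the task through newTimeout when there is no request timeout, or when the request timeout has not expired and the current time plus readTimeoutValue is still earlier than requestTimeoutMillisTime. Otherwise, if a task was passed in, it calls task.clean().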
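The guard condition is easier to reason about as a standalone predicate. The sketch below is illustrative only: ReadTimeoutScheduling and shouldSchedule are hypothetical names, and the two booleans stand in for requestTimeout != null and requestTimeout.isExpired().

```java
// Hypothetical predicate that mirrors the condition in startReadTimeout.
final class ReadTimeoutScheduling {
    static boolean shouldSchedule(boolean hasRequestTimeout,
                                  boolean requestTimeoutExpired,
                                  long readTimeoutValue,
                                  long requestTimeoutMillisTime,
                                  long now) {
        if (!hasRequestTimeout) {
            // no request timeout configured: the read timeout is always scheduled
            return true;
        }
        // only schedule if the read timeout would fire before the request deadline
        return !requestTimeoutExpired && readTimeoutValue < (requestTimeoutMillisTime - now);
    }

    public static void main(String[] args) {
        // request deadline at t = 60000 ms, readTimeout = 30000 ms
        System.out.println(shouldSchedule(true, false, 30_000, 60_000, 10_000)); // true
        System.out.println(shouldSchedule(true, false, 30_000, 60_000, 40_000)); // false
        System.out.println(shouldSchedule(false, false, 30_000, -1, 40_000));    // true
    }
}
```

Note that the comparison is strict: when the read timeout would land exactly on the request deadline, it is not scheduled and the request timeout is left to fire on its own.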
RequestTimeoutTimerTask
org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java
public class RequestTimeoutTimerTask extends TimeoutTimerTask {
    private final long requestTimeout;

    RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                            NettyRequestSender requestSender,
                            TimeoutsHolder timeoutsHolder,
                            int requestTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.requestTimeout = requestTimeout;
    }

    public void run(Timeout timeout) {
        if (done.getAndSet(true) || requestSender.isClosed())
            return;
        // in any case, cancel possible readTimeout sibling
        timeoutsHolder.cancel();
        if (nettyResponseFuture.isDone())
            return;
        StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
        appendRemoteAddress(sb);
        String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
        long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
        expire(message, age);
    }
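}
RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done is already true or requestSender is closed. Otherwise it calls timeoutsHolder.cancel() to cancel any sibling read timeout, returns if nettyResponseFuture.isDone(), and in all remaining cases calls expire.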
TimeoutsHolder
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
public class TimeoutsHolder {
    private final Timeout requestTimeout;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final Timer nettyTimer;
    private final NettyRequestSender requestSender;
    private final long requestTimeoutMillisTime;
    private final int readTimeoutValue;
    private volatile Timeout readTimeout;
    private volatile NettyResponseFuture<?> nettyResponseFuture;
    private volatile InetSocketAddress remoteAddress;

    public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
        this.nettyTimer = nettyTimer;
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.remoteAddress = originalRemoteAddress;
        final Request targetRequest = nettyResponseFuture.getTargetRequest();
        final int readTimeoutInMs = targetRequest.getReadTimeout();
        this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;
        int requestTimeoutInMs = targetRequest.getRequestTimeout();
        if (requestTimeoutInMs == 0) {
            requestTimeoutInMs = config.getRequestTimeout();
        }
        if (requestTimeoutInMs != -1) {
            requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
            requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
        } else {
            requestTimeoutMillisTime = -1L;
            requestTimeout = null;
        }
    }
    //......
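}
The TimeoutsHolder constructor falls back to the values from config when the per-request read or request timeout is 0. When the resulting requestTimeoutInMs is not -1, it creates a RequestTimeoutTimerTask and schedules it through newTimeout.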
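The fallback and the deadline computation in the constructor can be sketched as two small functions. TimeoutResolution, effective, and deadline are hypothetical names introduced for illustration; the values 0 (use the config default) and -1 (no request timeout) are taken from the constructor quoted above.

```java
// Hypothetical helpers that mirror the timeout resolution in the TimeoutsHolder constructor.
final class TimeoutResolution {
    // A per-request value of 0 means "not set": fall back to the client-wide default.
    static int effective(int perRequestMs, int configDefaultMs) {
        return perRequestMs == 0 ? configDefaultMs : perRequestMs;
    }

    // Absolute deadline for the request, or -1 when the request timeout is disabled.
    static long deadline(int requestTimeoutInMs, long now) {
        return requestTimeoutInMs != -1 ? now + requestTimeoutInMs : -1L;
    }

    public static void main(String[] args) {
        System.out.println(effective(0, 60_000));     // 60000
        System.out.println(effective(5_000, 60_000)); // 5000
        System.out.println(deadline(60_000, 1_000));  // 61000
        System.out.println(deadline(-1, 1_000));      // -1
    }
}
```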
scheduleRequestTimeout
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                           AsyncHandler<T> asyncHandler,
                                                           Channel channel) {
    try {
        asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
        LOGGER.error("onConnectionPooled crashed", e);
        abort(channel, future, e);
        return future;
    }
    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
        // otherwise, bad luck, the channel was closed, see bellow
        scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }
    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);
    if (LOGGER.isDebugEnabled()) {
        HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
        LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }
    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);
    if (Channels.isChannelActive(channel)) {
        writeRequest(future, channel);
    } else {
        // bad luck, the channel was closed in-between
        // there's a very good chance onClose was already notified but the
        // future wasn't already registered
        handleUnexpectedClosedChannel(channel, future);
    }
    return future;
}

private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                    InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
}

public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();
    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
        return;
    try {
        if (asyncHandler instanceof TransferCompletionHandler) {
            configureTransferAdapter(asyncHandler, httpRequest);
        }
        boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue()
                && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;
        if (!future.isHeadersAlreadyWrittenOnContinue()) {
            try {
                asyncHandler.onRequestSend(nettyRequest);
            } catch (Exception e) {
                LOGGER.error("onRequestSend crashed", e);
                abort(channel, future, e);
                return;
            }
            // if the request has a body, we want to track progress
            if (writeBody) {
                // FIXME does this really work??? the promise is for the request without body!!!
                ChannelProgressivePromise promise = channel.newProgressivePromise();
                ChannelFuture f = channel.write(httpRequest, promise);
                f.addListener(new WriteProgressListener(future, true, 0L));
            } else {
                // we can just track write completion
                ChannelPromise promise = channel.newPromise();
                ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
                f.addListener(new WriteCompleteListener(future));
            }
        }
        if (writeBody)
            nettyRequest.getBody().write(channel, future);
        // don't bother scheduling read timeout if channel became invalid
        if (Channels.isChannelActive(channel)) {
            scheduleReadTimeout(future);
        }
    } catch (Exception e) {
        LOGGER.error("Can't write request", e);
        abort(channel, future, e);
    }
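}
NettyRequestSender's sendRequestWithOpenChannel method calls scheduleRequestTimeout when channelRemoteAddress is not null; this creates a TimeoutsHolder, whose constructor schedules the RequestTimeoutTimerTask. Its writeRequest method runs after that: once nettyRequest.getBody().write(channel, future) has been issued (when there is a body to write), it calls scheduleReadTimeout(future) to schedule the ReadTimeoutTimerTask, provided the channel is still active.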
Summary
AsyncHttpClient's TimeoutTimerTask is an abstract class that implements Netty's TimerTask interface. It defines an expire method, which calls requestSender.abort, and a clean method, which sets done to true and clears nettyResponseFuture. RequestTimeoutTimerTask and ReadTimeoutTimerTask both extend it. AsyncHttpClient wraps these timeout timers in TimeoutsHolder. NettyRequestSender's sendRequestWithOpenChannel method triggers the scheduling of the RequestTimeoutTimerTask, while its writeRequest method schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future) after nettyRequest.getBody().write(channel, future).
requestTimeoutMillisTime is therefore the deadline for the request as a whole, and it covers the readTimeoutValue window that follows the write. A read timeout is never scheduled past that deadline.
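How the two timeouts interact can be illustrated with a simplified, single-threaded model. This is a sketch under loudly stated assumptions, not AsyncHttpClient's scheduling code: TimeoutTimeline is a hypothetical class, timers are assumed to fire exactly on time, the request is assumed to start and finish writing at t = 0, the response never completes, and a request timeout is assumed to be configured (not -1).

```java
// Hypothetical model of which timeout aborts a request that never completes.
final class TimeoutTimeline {
    // touches: times (ms since request start) at which the future was touched
    static String firstTimeout(long requestTimeout, long readTimeout, long[] touches) {
        long deadline = requestTimeout; // request starts at t = 0
        long now = 0L;
        // same guard as startReadTimeout: only arm the read timer if it fires before the deadline
        while (readTimeout < deadline - now) {
            now += readTimeout; // the read timer fires
            long lastTouch = 0L;
            for (long t : touches) {
                if (t <= now) {
                    lastTouch = Math.max(lastTouch, t);
                }
            }
            // same check as ReadTimeoutTimerTask.run
            if (readTimeout + lastTouch - now <= 0L) {
                return "read timeout at " + now + " ms";
            }
        }
        // the read timer could not be re-armed: the request timer fires at the deadline
        return "request timeout at " + deadline + " ms";
    }

    public static void main(String[] args) {
        // no activity at all: the read timeout fires first
        System.out.println(firstTimeout(10_000, 4_000, new long[] {}));
        // prints "read timeout at 4000 ms"

        // steady activity keeps the read timeout at bay, so the request deadline wins
        System.out.println(firstTimeout(10_000, 4_000, new long[] {3_000, 6_000, 9_000}));
        // prints "request timeout at 10000 ms"
    }
}
```

In this model a slow but steadily streaming response is eventually cut off by the request timeout, while a stalled connection is cut off earlier by the read timeout.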

