A Source-Code Walkthrough of AsyncHttpClient's ConnectionSemaphore
Preface
This article looks at how AsyncHttpClient's ConnectionSemaphore works.
ConnectionSemaphore
org/asynchttpclient/netty/channel/ConnectionSemaphore.java
/**
 * Max connections and max-per-host connections limiter.
 *
 * @author Stepan Koltsov
 */
public class ConnectionSemaphore {
    private final NonBlockingSemaphoreLike freeChannels;
    private final int maxConnectionsPerHost;
    private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
    private final IOException tooManyConnections;
    private final IOException tooManyConnectionsPerHost;

    private ConnectionSemaphore(AsyncHttpClientConfig config) {
        tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ConnectionSemaphore.class, "acquireChannelLock");
        tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ConnectionSemaphore.class, "acquireChannelLock");
        int maxTotalConnections = config.getMaxConnections();
        maxConnectionsPerHost = config.getMaxConnectionsPerHost();
        freeChannels = maxTotalConnections > 0 ?
                new NonBlockingSemaphore(config.getMaxConnections()) :
                NonBlockingSemaphoreInfinite.INSTANCE;
    }

    public static ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
        return config.getMaxConnections() > 0 || config.getMaxConnectionsPerHost() > 0 ? new ConnectionSemaphore(config) : null;
    }

    private boolean tryAcquireGlobal() {
        return freeChannels.tryAcquire();
    }

    private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
        return maxConnectionsPerHost > 0 ?
                freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) :
                NonBlockingSemaphoreInfinite.INSTANCE;
    }

    private boolean tryAcquirePerHost(Object partitionKey) {
        return getFreeConnectionsForHost(partitionKey).tryAcquire();
    }

    public void acquireChannelLock(Object partitionKey) throws IOException {
        if (!tryAcquireGlobal())
            throw tooManyConnections;
        if (!tryAcquirePerHost(partitionKey)) {
            freeChannels.release();
            throw tooManyConnectionsPerHost;
        }
    }

    public void releaseChannelLock(Object partitionKey) {
        freeChannels.release();
        getFreeConnectionsForHost(partitionKey).release();
    }
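}
ConnectionSemaphore enforces the maxConnections and maxConnectionsPerHost limits. It defines freeChannels, a NonBlockingSemaphoreLike that counts the connections still available globally, and freeChannelsPerHost, a ConcurrentHashMap that holds one NonBlockingSemaphore per partition key (host). The factory method newConnectionSemaphore returns null when neither limit is configured. tryAcquireGlobal takes a permit from the global semaphore, and tryAcquirePerHost takes one from the semaphore of the given host. acquireChannelLock first tries the global permit and throws TooManyConnectionsException if none is left; it then tries the per-host permit, and if that fails it releases the global permit it just took and throws TooManyConnectionsPerHostException. releaseChannelLock releases the global permit first and then the per-host permit.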
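The acquire-then-roll-back order described above can be sketched in isolation. This is a minimal illustration, not the library's code: `TwoLevelLimiter`, `availableGlobal`, and the plain `IOException` messages are hypothetical names chosen for the example, and the sketch assumes both limits are positive.

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ConnectionSemaphore; assumes both limits are > 0.
class TwoLevelLimiter {
    private final AtomicInteger globalPermits;
    private final int maxPerHost;
    private final ConcurrentHashMap<Object, AtomicInteger> perHostPermits = new ConcurrentHashMap<>();

    TwoLevelLimiter(int maxTotal, int maxPerHost) {
        this.globalPermits = new AtomicInteger(maxTotal);
        this.maxPerHost = maxPerHost;
    }

    // Take one permit if any is left; never blocks.
    private static boolean tryAcquire(AtomicInteger permits) {
        for (;;) {
            int count = permits.get();
            if (count <= 0) {
                return false;
            }
            if (permits.compareAndSet(count, count - 1)) {
                return true;
            }
        }
    }

    void acquire(Object host) throws IOException {
        if (!tryAcquire(globalPermits)) {
            throw new IOException("too many connections");
        }
        AtomicInteger hostPermits = perHostPermits.computeIfAbsent(host, k -> new AtomicInteger(maxPerHost));
        if (!tryAcquire(hostPermits)) {
            // Roll back the global permit so a per-host rejection does not leak it.
            globalPermits.incrementAndGet();
            throw new IOException("too many connections per host");
        }
    }

    void release(Object host) {
        globalPermits.incrementAndGet();
        perHostPermits.computeIfAbsent(host, k -> new AtomicInteger(maxPerHost)).incrementAndGet();
    }

    int availableGlobal() {
        return globalPermits.get();
    }
}

class TwoLevelLimiterDemo {
    public static void main(String[] args) throws IOException {
        TwoLevelLimiter limiter = new TwoLevelLimiter(3, 1);
        limiter.acquire("a.example");
        try {
            limiter.acquire("a.example"); // second connection to the same host is rejected
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(limiter.availableGlobal()); // the rejected attempt gave its global permit back
    }
}
```

Without the rollback step, every per-host rejection would permanently consume one global permit, and the global limit would shrink over time.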
NonBlockingSemaphoreLike
org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java
/**
 * Non-blocking semaphore API.
 *
 * @author Stepan Koltsov
 */
interface NonBlockingSemaphoreLike {
    void release();

    boolean tryAcquire();
}
The NonBlockingSemaphoreLike interface defines the release and tryAcquire methods. It has two implementations: NonBlockingSemaphore and NonBlockingSemaphoreInfinite.
NonBlockingSemaphore
org/asynchttpclient/netty/channel/NonBlockingSemaphore.java
class NonBlockingSemaphore implements NonBlockingSemaphoreLike {
    private final AtomicInteger permits;

    NonBlockingSemaphore(int permits) {
        this.permits = new AtomicInteger(permits);
    }

    @Override
    public void release() {
        permits.incrementAndGet();
    }

    @Override
    public boolean tryAcquire() {
        for (; ; ) {
            int count = permits.get();
            if (count <= 0) {
                return false;
            }
            if (permits.compareAndSet(count, count - 1)) {
                return true;
            }
        }
    }

    @Override
    public String toString() {
        // mimic toString of Semaphore class
        return super.toString() + "[Permits = " + permits + "]";
    }
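}
NonBlockingSemaphore is built on an AtomicInteger, where permits is the number of permits still available. release increments permits. tryAcquire runs a loop: it reads the current count and returns false if the count is not greater than 0; otherwise it attempts permits.compareAndSet(count, count - 1) and returns true on success. If the compare-and-set loses a race with another thread, the loop retries until it returns either false or true.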
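To see why the compare-and-set loop never hands out more permits than exist, the following sketch hammers a copy of the same loop from several threads. `CasSemaphore` and `countSuccessfulAcquires` are hypothetical names for this example; only the loop body mirrors the listing above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative copy of the CAS loop; not the library class itself.
class CasSemaphore {
    private final AtomicInteger permits;

    CasSemaphore(int permits) {
        this.permits = new AtomicInteger(permits);
    }

    boolean tryAcquire() {
        for (;;) {
            int count = permits.get();
            if (count <= 0) {
                return false; // nothing left, fail immediately instead of blocking
            }
            if (permits.compareAndSet(count, count - 1)) {
                return true; // this thread won the race for the permit
            }
            // another thread changed the count in between; re-read and retry
        }
    }

    void release() {
        permits.incrementAndGet();
    }
}

class CasSemaphoreDemo {
    // Starts several threads that all call tryAcquire and counts how many calls succeed.
    static int countSuccessfulAcquires(int permits, int threads, int attemptsPerThread)
            throws InterruptedException {
        CasSemaphore semaphore = new CasSemaphore(permits);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < attemptsPerThread; i++) {
                    if (semaphore.tryAcquire()) {
                        successes.incrementAndGet();
                    }
                }
                done.countDown();
            }).start();
        }
        done.await();
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 8000 attempts compete for 100 permits and nothing is released, so this prints 100.
        System.out.println(countSuccessfulAcquires(100, 8, 1000));
    }
}
```

Because a failed acquire returns immediately, callers such as acquireChannelLock can turn it into an exception rather than parking an event-loop thread.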
NonBlockingSemaphoreInfinite
org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java
enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike {
    INSTANCE;

    @Override
    public void release() {
    }

    @Override
    public boolean tryAcquire() {
        return true;
    }

    @Override
    public String toString() {
        return NonBlockingSemaphore.class.getName();
    }
}
NonBlockingSemaphoreInfinite represents an unlimited semaphore: release is a no-op and tryAcquire always returns true.
NettyResponseFuture
org/asynchttpclient/netty/NettyResponseFuture.java
/**
 * A {@link Future} that can be used to track when an asynchronous HTTP request
 * has been fully processed.
 *
 * @param <V> the result type
 */
public final class NettyResponseFuture<V> implements ListenableFuture<V> {
    //......

    public void acquirePartitionLockLazily() throws IOException {
        if (connectionSemaphore == null || partitionKeyLock != null) {
            return;
        }
        Object partitionKey = getPartitionKey();
        connectionSemaphore.acquireChannelLock(partitionKey);
        Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey);
        if (prevKey != null) {
            // self-check
            connectionSemaphore.releaseChannelLock(prevKey);
            releasePartitionKeyLock();
            throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report.");
        }
        if (isDone()) {
            // may be cancelled while we acquired a lock
            releasePartitionKeyLock();
        }
    }

    public boolean cancel(boolean force) {
        releasePartitionKeyLock();
        cancelTimeouts();
        if (IS_CANCELLED_FIELD.getAndSet(this, 1) != 0)
            return false;
        // cancel could happen before channel was attached
        if (channel != null) {
            Channels.setDiscard(channel);
            Channels.silentlyCloseChannel(channel);
        }
        if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
            try {
                asyncHandler.onThrowable(new CancellationException());
            } catch (Throwable t) {
                LOGGER.warn("cancel", t);
            }
        }
        future.cancel(false);
        return true;
    }

    private boolean terminateAndExit() {
        releasePartitionKeyLock();
        cancelTimeouts();
        this.channel = null;
        this.reuseChannel = false;
        return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
    }

    private void releasePartitionKeyLock() {
        if (connectionSemaphore == null) {
            return;
        }
        Object partitionKey = takePartitionKeyLock();
        if (partitionKey != null) {
            connectionSemaphore.releaseChannelLock(partitionKey);
        }
    }
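}
NettyResponseFuture provides acquirePartitionLockLazily, which acquires a connection permit through connectionSemaphore.acquireChannelLock(partitionKey). It returns early when no semaphore is configured or when the future already holds a partition key lock. Both cancel and terminateAndExit call releasePartitionKeyLock, which in turn calls connectionSemaphore.releaseChannelLock(partitionKey) if the future still holds a key.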
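Because cancel and terminateAndExit can both run for the same future, the permit must be given back at most once. The excerpt does not show takePartitionKeyLock, so the sketch below assumes it atomically swaps the stored key for null; `PermitHolder` and its methods are hypothetical names, and an AtomicReference stands in for the field updater used in the listing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder that mimics "acquire lazily, release at most once".
class PermitHolder {
    private final AtomicInteger permits; // stand-in for the shared ConnectionSemaphore
    private final AtomicReference<Object> heldKey = new AtomicReference<>();

    PermitHolder(AtomicInteger permits) {
        this.permits = permits;
    }

    // Returns true if a permit is held after the call.
    boolean acquireLazily(Object key) {
        if (heldKey.get() != null) {
            return true; // already holding a permit, do not take a second one
        }
        for (;;) {
            int count = permits.get();
            if (count <= 0) {
                return false;
            }
            if (permits.compareAndSet(count, count - 1)) {
                break;
            }
        }
        heldKey.set(key);
        return true;
    }

    // Assumed behaviour of takePartitionKeyLock: swap the key out atomically,
    // so only the first caller sees a non-null key and returns the permit.
    void release() {
        Object key = heldKey.getAndSet(null);
        if (key != null) {
            permits.incrementAndGet();
        }
    }
}

class PermitHolderDemo {
    public static void main(String[] args) {
        AtomicInteger permits = new AtomicInteger(1);
        PermitHolder holder = new PermitHolder(permits);
        holder.acquireLazily("host-a");
        holder.release(); // e.g. called from cancel
        holder.release(); // e.g. called again from terminateAndExit, no effect
        System.out.println(permits.get()); // the permit came back exactly once
    }
}
```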
NettyRequestSender
org/asynchttpclient/netty/request/NettyRequestSender.java
public final class NettyRequestSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class);

    private final AsyncHttpClientConfig config;
    private final ChannelManager channelManager;
    private final ConnectionSemaphore connectionSemaphore;
    private final Timer nettyTimer;
    private final AsyncHttpClientState clientState;
    private final NettyRequestFactory requestFactory;

    public NettyRequestSender(AsyncHttpClientConfig config,
                              ChannelManager channelManager,
                              Timer nettyTimer,
                              AsyncHttpClientState clientState) {
        this.config = config;
        this.channelManager = channelManager;
        this.connectionSemaphore = ConnectionSemaphore.newConnectionSemaphore(config);
        this.nettyTimer = nettyTimer;
        this.clientState = clientState;
        requestFactory = new NettyRequestFactory(config);
    }

    public <T> ListenableFuture<T> sendRequest(final Request request,
                                               final AsyncHandler<T> asyncHandler,
                                               NettyResponseFuture<T> future) {
        if (isClosed()) {
            throw new IllegalStateException("Closed");
        }
        validateWebSocketRequest(request, asyncHandler);
        ProxyServer proxyServer = getProxyServer(config, request);
        // WebSockets use connect tunneling to work with proxies
        if (proxyServer != null //
                && (request.getUri().isSecured() || request.getUri().isWebSocket()) //
                && !isConnectDone(request, future) //
                && proxyServer.getProxyType().isHttp()) {
            // Proxy with HTTPS or WebSocket: CONNECT for sure
            if (future != null && future.isConnectAllowed()) {
                // Perform CONNECT
                return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, true);
            } else {
                // CONNECT will depend if we can pool or connection or if we have to open a new
                // one
                return sendRequestThroughSslProxy(request, asyncHandler, future, proxyServer);
            }
        } else {
            // no CONNECT for sure
            return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, false);
        }
    }
    //......
}
The NettyRequestSender constructor creates the ConnectionSemaphore from the configuration. Its sendRequest method delegates to either sendRequestWithCertainForceConnect or sendRequestThroughSslProxy.
sendRequestWithCertainForceConnect
/**
 * We know for sure if we have to force to connect or not, so we can build the
 * HttpRequest right away This reduces the probability of having a pooled
 * channel closed by the server by the time we build the request
 */
private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                   AsyncHandler<T> asyncHandler,
                                                                   NettyResponseFuture<T> future,
                                                                   ProxyServer proxyServer,
                                                                   boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
}
sendRequestWithCertainForceConnect first obtains a channel through getOpenChannel. If that channel is active it calls sendRequestWithOpenChannel; otherwise it calls sendRequestWithNewChannel.
sendRequestThroughSslProxy
private <T> ListenableFuture<T> sendRequestThroughSslProxy(Request request,
                                                           AsyncHandler<T> asyncHandler,
                                                           NettyResponseFuture<T> future,
                                                           ProxyServer proxyServer) {
    NettyResponseFuture<T> newFuture = null;
    for (int i = 0; i < 3; i++) {
        Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
        if (channel == null) {
            // pool is empty
            break;
        }
        if (newFuture == null) {
            newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, false);
        }
        if (Channels.isChannelActive(channel)) {
            // if the channel is still active, we can use it,
            // otherwise, channel was closed by the time we computed the request, try again
            return sendRequestWithOpenChannel(newFuture, asyncHandler, channel);
        }
    }
    // couldn't poll an active channel
    newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, true);
    return sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
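}
sendRequestThroughSslProxy also starts with getOpenChannel, trying up to three times to poll an active channel from the pool. If it finds one it calls sendRequestWithOpenChannel; if the pool is empty or no active channel turns up, it falls back to sendRequestWithNewChannel.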
sendRequestWithNewChannel
private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request,
                                                          ProxyServer proxy,
                                                          NettyResponseFuture<T> future,
                                                          AsyncHandler<T> asyncHandler) {
    // some headers are only set when performing the first request
    HttpHeaders headers = future.getNettyRequest().getHttpRequest().headers();
    Realm realm = future.getRealm();
    Realm proxyRealm = future.getProxyRealm();
    requestFactory.addAuthorizationHeader(headers, perConnectionAuthorizationHeader(request, proxy, realm));
    requestFactory.setProxyAuthorizationHeader(headers, perConnectionProxyAuthorizationHeader(request, proxyRealm));
    future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM);
    future.setInProxyAuth(
            proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM);
    try {
        if (!channelManager.isOpen()) {
            throw PoolAlreadyClosedException.INSTANCE;
        }
        // Do not throw an exception when we need an extra connection for a
        // redirect.
        future.acquirePartitionLockLazily();
    } catch (Throwable t) {
        abort(null, future, getCause(t));
        // exit and don't try to resolve address
        return future;
    }
    resolveAddresses(request, proxy, future, asyncHandler)
            .addListener(new SimpleFutureListener<List<InetSocketAddress>>() {
                @Override
                protected void onSuccess(List<InetSocketAddress> addresses) {
                    NettyConnectListener<T> connectListener = new NettyConnectListener<>(future,
                            NettyRequestSender.this, channelManager, connectionSemaphore);
                    NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(),
                            addresses, asyncHandler, clientState);
                    if (!future.isDone()) {
                        // Do not throw an exception when we need an extra connection for a redirect
                        // FIXME why? This violate the max connection per host handling, right?
                        channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy)
                                .addListener((Future<Bootstrap> whenBootstrap) -> {
                                    if (whenBootstrap.isSuccess()) {
                                        connector.connect(whenBootstrap.get(), connectListener);
                                    } else {
                                        abort(null, future, whenBootstrap.cause());
                                    }
                                });
                    }
                }

                @Override
                protected void onFailure(Throwable cause) {
                    abort(null, future, getCause(cause));
                }
            });
    return future;
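}
sendRequestWithNewChannel calls future.acquirePartitionLockLazily() to check whether the connection limits would be exceeded; if the call throws, the request is aborted before any address resolution takes place. sendRequestWithOpenChannel performs no such check.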
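The difference between the two paths can be reduced to a small single-threaded sketch: reusing a pooled channel skips the permit check, while opening a new channel must take a permit first. Everything here (`SenderSketch`, `offerToPool`, string "channels") is hypothetical and leaves out Netty, futures, and permit release entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded model of the two send paths; not thread-safe.
class SenderSketch {
    private final Deque<String> idleChannels = new ArrayDeque<>();
    private int permits;
    private int created = 0;

    SenderSketch(int maxConnections) {
        this.permits = maxConnections;
    }

    String send() {
        String pooled = idleChannels.poll();
        if (pooled != null) {
            // "open channel" path: the connection already exists, so no permit is needed
            return pooled;
        }
        // "new channel" path: take a permit before connecting
        if (permits <= 0) {
            throw new IllegalStateException("too many connections");
        }
        permits--;
        return "channel-" + (++created);
    }

    // Return a channel to the idle pool so a later send can reuse it.
    void offerToPool(String channel) {
        idleChannels.offer(channel);
    }
}

class SenderSketchDemo {
    public static void main(String[] args) {
        SenderSketch sender = new SenderSketch(1);
        String first = sender.send();   // opens a new channel and uses the only permit
        sender.offerToPool(first);
        String second = sender.send();  // reuses the pooled channel, no permit needed
        System.out.println(first.equals(second));
        try {
            sender.send();              // pool is empty and no permit is left
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```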
Summary
- AsyncHttpClient uses ConnectionSemaphore to enforce the maxConnections and maxConnectionsPerHost limits.
- NonBlockingSemaphore is built on an AtomicInteger, where permits is the number of permits available. release increments permits; tryAcquire loops, returning false when permits is not greater than 0 and true when permits.compareAndSet(count, count - 1) succeeds, and retrying otherwise.
- NettyResponseFuture provides acquirePartitionLockLazily, which acquires a permit through connectionSemaphore.acquireChannelLock(partitionKey). Both cancel and terminateAndExit call releasePartitionKeyLock, which calls connectionSemaphore.releaseChannelLock(partitionKey).
- The NettyRequestSender constructor creates the ConnectionSemaphore from the configuration. Its sendRequest method calls sendRequestWithCertainForceConnect or sendRequestThroughSslProxy; both first obtain a channel through getOpenChannel and then, depending on whether the channel is active, call sendRequestWithOpenChannel or sendRequestWithNewChannel. sendRequestWithNewChannel calls future.acquirePartitionLockLazily() to check the connection limits, whereas sendRequestWithOpenChannel has no such check.
To sum up, AsyncHttpClient does define a ChannelPool, but the connection count is not controlled inside the ChannelPool. Instead, ConnectionSemaphore enforces maxConnections and maxConnectionsPerHost, mainly on each call to sendRequestWithNewChannel: the sender first calls future.acquirePartitionLockLazily() to obtain a permit and only then connects.