elasticsearch節(jié)點間通信的基礎(chǔ)transport啟動過程
前言
在前一篇中我們分析了cluster的一些元素。接下來的章節(jié)會對cluster的運作機(jī)制做詳細(xì)分析。本節(jié)先分析一些transport,它是cluster間通信的基礎(chǔ)。它有兩種實現(xiàn),一種是基于netty實現(xiàn)nettytransport,主要用于節(jié)點間的通信。另一種是localtransport,主要是用于同一個jvm上的節(jié)點通信。因為是同一個jvm上的網(wǎng)絡(luò)模擬,localtransport實現(xiàn)上非常簡單,實際用處也非常有限,這里就不過多說明。這一篇的重點是nettytransport。
transport
transport顧名思義是集群通信的基本通道,無論是集群狀態(tài)信息,還是搜索索引請求信息,都是通過transport傳送。elasticsearch定義了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基礎(chǔ)接口。這里將以transport為主,分析過程中會附帶介紹其它接口。首先看一下transport節(jié)點的定義,如下圖所示:

NettyTransport實現(xiàn)了該接口。分析NettyTransport前簡單說一下Netty的用法,Netty的使用需要三個模塊ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap啟動服務(wù)器,ClientBootStrap啟動客戶端并連接服務(wù)器,MessageHandler是message處理邏輯所在,也就是業(yè)務(wù)邏輯。其它詳細(xì)使用請參考Netty官方文檔。
啟動serverBootStrap
NettyTransport每個在doStart()方法中啟動serverBootStrap,和ClientBootStrap,并綁定ip,代碼如下所示:
protected void doStart() throws ElasticsearchException {
clientBootstrap = createClientBootstrap();//根據(jù)配置啟動客戶端
……//省略了無關(guān)分代碼
createServerBootstrap(name, mergedSettings);//啟動server端
bindServerBootstrap(name, mergedSettings);//綁定ip
}每一個節(jié)點都需要發(fā)送和接收,因此兩者都需要啟動,client和server的啟動分別在相應(yīng)的方法中,啟動過程就是netty的啟動過程,有興趣可以去看相應(yīng)方法。bindServerBootstrap(name, mergedSettings)將本地ip和斷開綁定到netty同時設(shè)定好export host(export host的具體作業(yè)我也看明白也沒有看到相關(guān)的綁定,需要進(jìn)一步研究)。
啟動client及server的過程中將messagehandler注入到channelpipeline中。至此啟動過程完成,但是client并未連接任何server,連接過程是在節(jié)點啟動后,才連接到其它節(jié)點的。
如何連接到node
方法代碼如下所示:
public void connectToNode(DiscoveryNode node, boolean light) {
//transport的模塊必須要啟動
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
//獲取讀鎖,每個節(jié)點可以和多個節(jié)點建立連接,因此這里用讀鎖
globalLock.readLock().lock();
try {
//以node.id為基礎(chǔ)獲取一個鎖,這保證對于每個node只能建立一次連接
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
try {
if (light) {//這里的light,就是對該節(jié)點只獲取一個channel,所有類型(5種連接類型下面會說到)都使用者一個channel
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Throwable e) {
logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
nodeChannels.close();
throw e;
}
}
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
}
}如果不是輕連接,每個server和clien之間都有5中連接,著5中連接承擔(dān)著不同的任務(wù)
連接方法的代碼
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
//五種連接方式,不同的連接方式對應(yīng)不同的集群操作
ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
//嘗試建立連接
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectState.length; i++) {
connectState[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}
//獲取每個連接的channel存入到相應(yīng)的channels中便于后面使用。
try {
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectRecovery[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
}
nodeChannels.recovery[i] = connectRecovery[i].getChannel();
nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectBulk[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
}
nodeChannels.bulk[i] = connectBulk[i].getChannel();
nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectReg[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
}
nodeChannels.reg[i] = connectReg[i].getChannel();
nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectState.length; i++) {
connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectState[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
}
nodeChannels.state[i] = connectState[i].getChannel();
nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectPing[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
}
nodeChannels.ping[i] = connectPing[i].getChannel();
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
if (nodeChannels.recovery.length == 0) {
if (nodeChannels.bulk.length > 0) {
nodeChannels.recovery = nodeChannels.bulk;
} else {
nodeChannels.recovery = nodeChannels.reg;
}
}
if (nodeChannels.bulk.length == 0) {
nodeChannels.bulk = nodeChannels.reg;
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) {
try {
future.getChannel().close();
} catch (Exception e1) {
// ignore
}
}
}
throw e;
}
}以上就是節(jié)點建立連接的過程,每一對client和server間都會建立一定數(shù)量的不同連接。之所以要區(qū)分連接,是因為不同的操作消耗的資源不同,請求的頻率也不同。對于資源消耗少請求頻率高的如ping,可以建立多一些連接,來確保并發(fā)。對于消耗資源多如bulk操作,則要少建立一些連接,保證機(jī)器不被拖垮。節(jié)點的斷開,這是講相應(yīng)的channel釋放的過程。這里就不再做詳細(xì)說明,可以參考相關(guān)源碼。
總結(jié)
nettytransport的連接過程,啟動過程分別啟動client和server,同時將對于的messagehandler注入,啟動多次就是netty的啟動過程。然后綁定server ip和斷開。但是這里并沒有連接,連接發(fā)送在節(jié)點啟動時,節(jié)點啟動會獲取cluster信息,分別對集群中的節(jié)點建立上述的5種連接。
這就是NettyTransport的啟動和連接過程。transport還有一個很重要的功能就是發(fā)送request,及如何處理request,這些功能會在下一篇中分析,希望大家以后多多支持腳本之家!
相關(guān)文章
Java中文件創(chuàng)建于寫入內(nèi)容的常見方法
在日常開發(fā)中,肯定離不開要和文件打交道,今天就簡單羅列一下平時比較常用的創(chuàng)建文件并向文件中寫入數(shù)據(jù)的幾種方式,希望對大家有一定的幫助2023-10-10
java基礎(chǔ)之?dāng)?shù)組常用操作總結(jié)(必看篇)
下面小編就為大家?guī)硪黄猨ava基礎(chǔ)之?dāng)?shù)組常用操作總結(jié)(必看篇)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06
Java調(diào)用elasticsearch本地代碼的操作方法
這篇文章主要介紹了Java調(diào)用elasticsearch本地代碼的操作方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04

