欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

elasticsearch節(jié)點(diǎn)間通信的基礎(chǔ)transport啟動(dòng)過(guò)程

 更新時(shí)間:2022年04月21日 15:39:50   作者:zziawan  
這篇文章主要為大家介紹了elasticsearch節(jié)點(diǎn)間通信的基礎(chǔ)transport啟動(dòng)過(guò)程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

在前一篇中我們分析了cluster的一些元素。接下來(lái)的章節(jié)會(huì)對(duì)cluster的運(yùn)作機(jī)制做詳細(xì)分析。本節(jié)先分析一些transport,它是cluster間通信的基礎(chǔ)。它有兩種實(shí)現(xiàn),一種是基于netty實(shí)現(xiàn)nettytransport,主要用于節(jié)點(diǎn)間的通信。另一種是localtransport,主要是用于同一個(gè)jvm上的節(jié)點(diǎn)通信。因?yàn)槭峭粋€(gè)jvm上的網(wǎng)絡(luò)模擬,localtransport實(shí)現(xiàn)上非常簡(jiǎn)單,實(shí)際用處也非常有限,這里就不過(guò)多說(shuō)明。這一篇的重點(diǎn)是nettytransport。

transport

transport顧名思義是集群通信的基本通道,無(wú)論是集群狀態(tài)信息,還是搜索索引請(qǐng)求信息,都是通過(guò)transport傳送。elasticsearch定義了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基礎(chǔ)接口。這里將以transport為主,分析過(guò)程中會(huì)附帶介紹其它接口。首先看一下transport節(jié)點(diǎn)的定義,如下圖所示:

NettyTransport實(shí)現(xiàn)了該接口。分析NettyTransport前簡(jiǎn)單說(shuō)一下Netty的用法,Netty的使用需要三個(gè)模塊ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap啟動(dòng)服務(wù)器,ClientBootStrap啟動(dòng)客戶端并連接服務(wù)器,MessageHandler是message處理邏輯所在,也就是業(yè)務(wù)邏輯。其它詳細(xì)使用請(qǐng)參考Netty官方文檔。

啟動(dòng)serverBootStrap

NettyTransport每個(gè)在doStart()方法中啟動(dòng)serverBootStrap,和ClientBootStrap,并綁定ip,代碼如下所示:

protected void doStart() throws ElasticsearchException {
       clientBootstrap = createClientBootstrap();//根據(jù)配置啟動(dòng)客戶端
       ……//省略了無(wú)關(guān)分代碼
    createServerBootstrap(name, mergedSettings);//啟動(dòng)server端
       bindServerBootstrap(name, mergedSettings);//綁定ip
        }

每一個(gè)節(jié)點(diǎn)都需要發(fā)送和接收,因此兩者都需要啟動(dòng),client和server的啟動(dòng)分別在相應(yīng)的方法中,啟動(dòng)過(guò)程就是netty的啟動(dòng)過(guò)程,有興趣可以去看相應(yīng)方法。bindServerBootstrap(name, mergedSettings)將本地ip和斷開(kāi)綁定到netty同時(shí)設(shè)定好export host(export host的具體作業(yè)我也看明白也沒(méi)有看到相關(guān)的綁定,需要進(jìn)一步研究)。

啟動(dòng)client及server的過(guò)程中將messagehandler注入到channelpipeline中。至此啟動(dòng)過(guò)程完成,但是client并未連接任何server,連接過(guò)程是在節(jié)點(diǎn)啟動(dòng)后,才連接到其它節(jié)點(diǎn)的。

如何連接到node

方法代碼如下所示:

public void connectToNode(DiscoveryNode node, boolean light) {
     //transport的模塊必須要啟動(dòng)
        if (!lifecycle.started()) {
            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
        }
     //獲取讀鎖,每個(gè)節(jié)點(diǎn)可以和多個(gè)節(jié)點(diǎn)建立連接,因此這里用讀鎖
        globalLock.readLock().lock();
        try {
        //以node.id為基礎(chǔ)獲取一個(gè)鎖,這保證對(duì)于每個(gè)node只能建立一次連接
            connectionLock.acquire(node.id());
            try {
                if (!lifecycle.started()) {
                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
                }
                NodeChannels nodeChannels = connectedNodes.get(node);
                if (nodeChannels != null) {
                    return;
                }
                try {
                    if (light) {//這里的light,就是對(duì)該節(jié)點(diǎn)只獲取一個(gè)channel,所有類型(5種連接類型下面會(huì)說(shuō)到)都使用者一個(gè)channel
                        nodeChannels = connectToChannelsLight(node);
                    } else {
                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                        try {
                            connectToChannels(nodeChannels, node);
                        } catch (Throwable e) {
                            logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                            nodeChannels.close();
                            throw e;
                        }
                    }
                    // we acquire a connection lock, so no way there is an existing connection
                    connectedNodes.put(node, nodeChannels);
                    if (logger.isDebugEnabled()) {
                        logger.debug("connected to node [{}]", node);
                    }
                    transportServiceAdapter.raiseNodeConnected(node);
                } catch (ConnectTransportException e) {
                    throw e;
                } catch (Exception e) {
                    throw new ConnectTransportException(node, "general node connection failure", e);
                }
            } finally {
                connectionLock.release(node.id());
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

如果不是輕連接,每個(gè)server和clien之間都有5中連接,著5中連接承擔(dān)著不同的任務(wù)

連接方法的代碼

protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    //五種連接方式,不同的連接方式對(duì)應(yīng)不同的集群操作
        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    //嘗試建立連接
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i] = clientBootstrap.connect(address);
        }
    //獲取每個(gè)連接的channel存入到相應(yīng)的channels中便于后面使用。
        try {
            for (int i = 0; i < connectRecovery.length; i++) {
                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectRecovery[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
                }
                nodeChannels.recovery[i] = connectRecovery[i].getChannel();
                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectBulk.length; i++) {
                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectBulk[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
                }
                nodeChannels.bulk[i] = connectBulk[i].getChannel();
                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectReg.length; i++) {
                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectReg[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
                }
                nodeChannels.reg[i] = connectReg[i].getChannel();
                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectState.length; i++) {
                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectState[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
                }
                nodeChannels.state[i] = connectState[i].getChannel();
                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectPing.length; i++) {
                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectPing[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
                }
                nodeChannels.ping[i] = connectPing[i].getChannel();
                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            if (nodeChannels.recovery.length == 0) {
                if (nodeChannels.bulk.length > 0) {
                    nodeChannels.recovery = nodeChannels.bulk;
                } else {
                    nodeChannels.recovery = nodeChannels.reg;
                }
            }
            if (nodeChannels.bulk.length == 0) {
                nodeChannels.bulk = nodeChannels.reg;
            }
        } catch (RuntimeException e) {
            // clean the futures
            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
                future.cancel();
                if (future.getChannel() != null && future.getChannel().isOpen()) {
                    try {
                        future.getChannel().close();
                    } catch (Exception e1) {
                        // ignore
                    }
                }
            }
            throw e;
        }
    }

以上就是節(jié)點(diǎn)建立連接的過(guò)程,每一對(duì)client和server間都會(huì)建立一定數(shù)量的不同連接。之所以要區(qū)分連接,是因?yàn)椴煌牟僮飨牡馁Y源不同,請(qǐng)求的頻率也不同。對(duì)于資源消耗少請(qǐng)求頻率高的如ping,可以建立多一些連接,來(lái)確保并發(fā)。對(duì)于消耗資源多如bulk操作,則要少建立一些連接,保證機(jī)器不被拖垮。節(jié)點(diǎn)的斷開(kāi),這是講相應(yīng)的channel釋放的過(guò)程。這里就不再做詳細(xì)說(shuō)明,可以參考相關(guān)源碼。

總結(jié)

nettytransport的連接過(guò)程,啟動(dòng)過(guò)程分別啟動(dòng)client和server,同時(shí)將對(duì)于的messagehandler注入,啟動(dòng)多次就是netty的啟動(dòng)過(guò)程。然后綁定server ip和斷開(kāi)。但是這里并沒(méi)有連接,連接發(fā)送在節(jié)點(diǎn)啟動(dòng)時(shí),節(jié)點(diǎn)啟動(dòng)會(huì)獲取cluster信息,分別對(duì)集群中的節(jié)點(diǎn)建立上述的5種連接。

這就是NettyTransport的啟動(dòng)和連接過(guò)程。transport還有一個(gè)很重要的功能就是發(fā)送request,及如何處理request,這些功能會(huì)在下一篇中分析,希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java利用反射自動(dòng)封裝成實(shí)體對(duì)象的方法

    Java利用反射自動(dòng)封裝成實(shí)體對(duì)象的方法

    這篇文章主要介紹了Java利用反射自動(dòng)封裝成實(shí)體對(duì)象的方法,可實(shí)現(xiàn)自動(dòng)封裝成bean對(duì)象功能,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-01-01
  • Java 實(shí)現(xiàn)隨機(jī)驗(yàn)證碼功能簡(jiǎn)單實(shí)例

    Java 實(shí)現(xiàn)隨機(jī)驗(yàn)證碼功能簡(jiǎn)單實(shí)例

    這篇文章主要介紹了Java 實(shí)現(xiàn)隨機(jī)驗(yàn)證碼功能簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-04-04
  • FastJSON字段智能匹配踩坑的解決

    FastJSON字段智能匹配踩坑的解決

    這篇文章主要介紹了FastJSON字段智能匹配踩坑的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 指定jdk啟動(dòng)jar包的方法總結(jié)

    指定jdk啟動(dòng)jar包的方法總結(jié)

    這篇文章主要給大家總結(jié)介紹了關(guān)于指定jdk啟動(dòng)jar包的方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2023-07-07
  • Java中WeakHashMap的回收問(wèn)題詳解

    Java中WeakHashMap的回收問(wèn)題詳解

    這篇文章主要介紹了Java中WeakHashMap的回收問(wèn)題詳解,WeakHashMap弱鍵大致上是通過(guò)WeakReference和ReferenceQueue實(shí)現(xiàn),WeakHashMap的key是"弱鍵",即是WeakReference類型的,ReferenceQueue是一個(gè)隊(duì)列,它會(huì)保存被GC回收的"弱鍵",需要的朋友可以參考下
    2023-09-09
  • Java中文件創(chuàng)建于寫(xiě)入內(nèi)容的常見(jiàn)方法

    Java中文件創(chuàng)建于寫(xiě)入內(nèi)容的常見(jiàn)方法

    在日常開(kāi)發(fā)中,肯定離不開(kāi)要和文件打交道,今天就簡(jiǎn)單羅列一下平時(shí)比較常用的創(chuàng)建文件并向文件中寫(xiě)入數(shù)據(jù)的幾種方式,希望對(duì)大家有一定的幫助
    2023-10-10
  • java基礎(chǔ)之?dāng)?shù)組常用操作總結(jié)(必看篇)

    java基礎(chǔ)之?dāng)?shù)組常用操作總結(jié)(必看篇)

    下面小編就為大家?guī)?lái)一篇java基礎(chǔ)之?dāng)?shù)組常用操作總結(jié)(必看篇)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-06-06
  • Java實(shí)現(xiàn)三子棋小游戲

    Java實(shí)現(xiàn)三子棋小游戲

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)三子棋小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Java調(diào)用elasticsearch本地代碼的操作方法

    Java調(diào)用elasticsearch本地代碼的操作方法

    這篇文章主要介紹了Java調(diào)用elasticsearch本地代碼的操作方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • idea推送項(xiàng)目到gitee中的創(chuàng)建方法

    idea推送項(xiàng)目到gitee中的創(chuàng)建方法

    這篇文章主要介紹了idea推送項(xiàng)目到gitee中的創(chuàng)建方法,本文通過(guò)圖文實(shí)例相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08

最新評(píng)論