欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析

 更新時間:2022年04月21日 16:26:43   作者:zziawan  
這篇文章主要為大家介紹了elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

 zenDiscovery實(shí)現(xiàn)機(jī)制

ping是集群發(fā)現(xiàn)的基本手段,通過在網(wǎng)絡(luò)上廣播或者指定ping某些節(jié)點(diǎn)獲取集群信息,從而可以找到集群的master加入集群。zenDiscovery實(shí)現(xiàn)了兩種ping機(jī)制:廣播與單播。本篇將詳細(xì)分析一些這MulticastZenPing機(jī)制的實(shí)現(xiàn)為后面的集群發(fā)現(xiàn)和master選舉做好鋪墊。

廣播的過程

首先看一下廣播(MulticastZenPing),廣播的原理很簡單,節(jié)點(diǎn)啟動后向網(wǎng)絡(luò)發(fā)送廣播信息,任何收到的節(jié)點(diǎn)只要集群名字相同都應(yīng)該對此廣播信息作出回應(yīng)。這樣該節(jié)點(diǎn)就獲取了集群的相關(guān)信息。它定義了一個action:"internal:discovery/zen/multicast"和廣播的信息頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通信的基礎(chǔ),但是廣播卻沒有使它。它使用了java的MulticastSocket。這里簡單的介紹一下MulticastSocket的使用。它是一個UDP 機(jī)制的socket,用來進(jìn)行多個數(shù)據(jù)包的廣播。它可以幫到一個ip形成一個group,任何MulticastSocket都可以join進(jìn)來,組內(nèi)的socket發(fā)送的信息會被訂閱了改組的所有機(jī)器接收到。elasticsearch對其進(jìn)行了封裝形成了MulticastChannel,有興趣可以參考相關(guān)源碼。 

首先看一下MulticastZenPing的幾個輔助內(nèi)部類:

它總共定義了4個內(nèi)部類,這些內(nèi)部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應(yīng)都用它來存儲。MulticastPingResponseRequestHandler它是response處理類,類似于之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個messageReceived的方法,當(dāng)收到請求時直接返回一個response。

MulticastPingResponse就不用細(xì)說了,它就是一個響應(yīng)類。最后要著重說一下Receiver類,因?yàn)閺V播并不是使用NettyTransport,因此對于消息處理邏輯都在Receiver中。在初始化MulticastZenPing時會將receiver注冊進(jìn)去。

protected void doStart() throws ElasticsearchException {
        try {
            ....
            multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                    new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                    new Receiver());//將receiver注冊到channel中
        } catch (Throwable t) {
          ....
        }
    }

Receiver類基礎(chǔ)了Listener,實(shí)現(xiàn)了3個方法,消息經(jīng)過onMessage方法區(qū)分,如果是內(nèi)部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區(qū)分方法很簡單,就是讀取信息都看它是否符合所定義的INTERNAL_HEADER 信息頭。

nodeping處理代碼

private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
           ....
            final DiscoveryNodes discoveryNodes = contextProvider.nodes();
            final DiscoveryNode requestingNode = requestingNodeX;
            if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                // 自身發(fā)出的ping,忽略
                return;
            }
        //只接受本集群ping
            if (!requestClusterName.equals(clusterName)) {
            ...return;
            }
            // 兩個client間不需要ping
            if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
            }
        //新建一個response
            final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
            multicastPingResponse.id = id;
            multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
        //無法連接的情況
            if (!transportService.nodeConnected(requestingNode)) {
                // do the connect and send on a thread pool
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        // connect to the node if possible
                        try {
                            transportService.connectToNode(requestingNode);
                            transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                @Override
                                public void handleException(TransportException exp) {
                                    logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                }
                            });
                        } catch (Exception e) {
                            if (lifecycle.started()) {
                                logger.warn("failed to connect to requesting node {}", e, requestingNode);
                            }
                        }
                    }
                });
            } else {
                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleException(TransportException exp) {
                        if (lifecycle.started()) {
                            logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                        }
                    }
                });
            }
        }
    }

另外的一個方法是處理外部ping信息,處理過程是返回cluster的信息(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應(yīng)MulticastZenPing的過程,收到其它節(jié)點(diǎn)的響應(yīng)信息后它會把本節(jié)點(diǎn)及集群的master節(jié)點(diǎn)相關(guān)信息返回給廣播節(jié)點(diǎn)。這樣廣播節(jié)點(diǎn)就獲知了集群的相關(guān)信息。在MulticastZenPing類中還有一個類 MulticastPingResponseRequestHandler,它的作用是廣播節(jié)點(diǎn)對其它節(jié)點(diǎn)對廣播信息響應(yīng)的回應(yīng),廣播節(jié)點(diǎn)的第二次發(fā)送信息的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動時注冊到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。

ping請求的發(fā)送策略

代碼如下:

public void ping(final PingListener listener, final TimeValue timeout) {
       ....
    //產(chǎn)生一個id
        final int id = pingIdGenerator.incrementAndGet();
        try {
            receivedResponses.put(id, new PingCollection());
            sendPingRequest(id);//第一次發(fā)送ping請求
            // 等待時間的1/2后再次發(fā)送一個請求
            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                public void onFailure(Throwable t) {
                    logger.warn("[{}] failed to send second ping request", t, id);
                    finalizePingCycle(id, listener);
                }
                @Override
                public void doRun() {
                    sendPingRequest(id);
            //再過1/2時間再次發(fā)送一個請求
                    threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                        @Override
                        public void onFailure(Throwable t) {
                            logger.warn("[{}] failed to send third ping request", t, id);
                            finalizePingCycle(id, listener);
                        }
                        @Override
                        public void doRun() {
                            // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                            PingCollection collection = receivedResponses.get(id);
                            FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                            receivedResponses.put(id, finalizingPingCollection);
                            logger.trace("[{}] sending last pings", id);
                            sendPingRequest(id);
                //最后一次發(fā)送請求,超時的1/4后
                            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                @Override
                                public void onFailure(Throwable t) {
                                    logger.warn("[{}] failed to finalize ping", t, id);
                                }
                                @Override
                                protected void doRun() throws Exception {
                                    finalizePingCycle(id, listener);
                                }
                            });
                        }
                    });
                }
            });
        } catch (Exception e) {
            logger.warn("failed to ping", e);
            finalizePingCycle(id, listener);
        }
    }

發(fā)送過程主要是調(diào)用sendPingRequest(id)方法,在該方法中會將id,信息頭,版本,本地節(jié)點(diǎn)信息一起寫入到BytesStreamOutput中然后將其進(jìn)行廣播,這個廣播信息會被其它機(jī)器上的Receiver接收并處理,并且響應(yīng)該ping請求。另外一個需要關(guān)注的是以上加說明的部分,它通過鏈時的定期發(fā)送請求,在等待時間內(nèi)可能會發(fā)出4次請求,這種發(fā)送方式會造成大量的ping請求重復(fù),幸好ping的資源消耗小,但是好處是可以盡可能保證在timeout這個時間段內(nèi)集群的新增節(jié)點(diǎn)都能收到這個ping信息。在單播中也采用了該策略。

總結(jié)

廣播的過程:廣播使用的是jdk的MulticastSocket,在timeout時間內(nèi)4次發(fā)生ping請求,ping請求包括一個id,信息頭,本地節(jié)點(diǎn)的一些信息;這些信息在其它節(jié)點(diǎn)中被接收到交給Receiver處理,Receiver會將集群的master和本機(jī)的相關(guān)信息通過transport返回給廣播節(jié)點(diǎn)。廣播節(jié)點(diǎn)收到這些信息后會理解使用transport返回一個空的response。至此一個廣播過程完成。

在節(jié)點(diǎn)分布在多個網(wǎng)段時,廣播就失效了,因?yàn)閺V播信息不可達(dá)。這個時間就需要使用單播去ping指定的節(jié)點(diǎn)獲取cluster的相關(guān)信息。這就是單播的用處。單播使用的是NettyTransport,它會使用跟廣播一樣的鏈?zhǔn)秸埱笙蛑付ǖ墓?jié)點(diǎn)發(fā)送請求。信息的處理方式是之前所介紹的NettyTransport標(biāo)準(zhǔn)的信息處理過程。

以上就是elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch集群發(fā)現(xiàn)zendiscovery Ping的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java序列化(Serialization) 機(jī)制

    Java序列化(Serialization) 機(jī)制

    本篇文章是對Java中對象的序列化(Serialization) 機(jī)制進(jìn)行了詳細(xì)的分析介紹,并附實(shí)例,需要的朋友可以參考下
    2016-07-07
  • SpringBoot使用thymeleaf模板過程解析

    SpringBoot使用thymeleaf模板過程解析

    這篇文章主要介紹了SpringBoot使用thymeleaf模板過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12
  • Java實(shí)現(xiàn)冒泡排序

    Java實(shí)現(xiàn)冒泡排序

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)冒泡排序,把一列數(shù)組按從小到大或從大到小排序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • SpringBoot使用Sa-Token實(shí)現(xiàn)賬號封禁、分類封禁、階梯封禁的示例代碼

    SpringBoot使用Sa-Token實(shí)現(xiàn)賬號封禁、分類封禁、階梯封禁的示例代碼

    本文主要介紹了SpringBoot使用Sa-Token實(shí)現(xiàn)賬號封禁、分類封禁、階梯封禁的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • springboot yml定義屬性,下文中${} 引用說明

    springboot yml定義屬性,下文中${} 引用說明

    這篇文章主要介紹了springboot yml定義屬性,下文中${} 引用說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-04-04
  • 解析Java異步之call future

    解析Java異步之call future

    當(dāng)調(diào)用一個函數(shù)的時候,如果這個函數(shù)的執(zhí)行過程是很耗時的,就必須要等待,但是有時候并不急著要這個函數(shù)返回的結(jié)果。因此,可以讓被調(diào)者立即返回,讓他在后臺慢慢處理這個請求。對于調(diào)用者來說,可以先處理一些其他事情,在真正需要數(shù)據(jù)的時候再去嘗試獲得需要的數(shù)據(jù)
    2021-06-06
  • Java 讀取PDF中的文本和圖片的方法

    Java 讀取PDF中的文本和圖片的方法

    本文將介紹通過Java程序來讀取PDF文檔中的文本和圖片的方法。分別調(diào)用方法extractText()和extractImages()來讀取,需要的朋友可以參考下
    2019-07-07
  • idea項(xiàng)目debug模式無法啟動的解決

    idea項(xiàng)目debug模式無法啟動的解決

    這篇文章主要介紹了idea項(xiàng)目debug模式無法啟動的解決,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • 散列算法與散列碼(實(shí)例講解)

    散列算法與散列碼(實(shí)例講解)

    下面小編就為大家?guī)硪黄⒘兴惴ㄅc散列碼(實(shí)例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • Spring監(jiān)聽器及定時任務(wù)實(shí)現(xiàn)方法詳解

    Spring監(jiān)聽器及定時任務(wù)實(shí)現(xiàn)方法詳解

    這篇文章主要介紹了Spring監(jiān)聽器及定時任務(wù)實(shí)現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-07-07

最新評論