An example-based analysis of the pluggable elasticsearch cluster discovery module

 Updated: 2022-04-21 16:07:15   Author: zziawan
This article walks through the pluggable elasticsearch cluster discovery module with code examples. Readers who need it can use it as a reference; hopefully it helps.

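Preface

The elasticsearch cluster implements its own discovery mechanism, zen. Discovery mainly covers the following: master election, master fault detection, detection of the other nodes in the cluster, and unicast/multicast ping. This article first gives an overview of Discovery and then describes node detection. The other parts will be covered in later articles.

Overview of the Discovery module

discovery is a pluggable module. Besides the default, three discovery mechanisms are officially supported: Azure discovery, Google Compute Engine, and EC2 Discovery (the last one for Amazon's cloud). Following the plugin rules, you can also implement your own discovery mechanism. The module exposes registration and startup by implementing a guice module, DiscoveryModule, and uses zen discovery by default. The module's external interface is DiscoveryService; the original article lists its methods in a figure.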

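DiscoveryService is essentially a proxy for discovery: all of its functionality is ultimately implemented by the Discovery bound to it. At node startup, the DiscoveryService is obtained through DiscoveryModule and started; DiscoveryService in turn starts the bound Discovery, and with that the module is loaded and running. All elasticsearch modules are built this way: the module class handles binding and lookup, and the service interface exposes the module's functionality. This pattern will come up repeatedly in later analysis.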
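The proxy relationship can be sketched as follows. This is a minimal illustration, not the real elasticsearch code: the names SketchDiscovery, SketchZenDiscovery and SketchDiscoveryService are hypothetical, and guice binding is replaced by plain constructor injection.

```java
// Hypothetical, simplified types; these are not the real elasticsearch classes.
interface SketchDiscovery {
    void start();

    String nodeDescription();
}

// Stand-in for a concrete discovery implementation such as zen.
class SketchZenDiscovery implements SketchDiscovery {
    boolean started = false;

    @Override
    public void start() {
        started = true;
    }

    @Override
    public String nodeDescription() {
        return "zen";
    }
}

// The service holds the bound implementation and forwards every call to it.
class SketchDiscoveryService {
    private final SketchDiscovery discovery;

    // Assumption: the binding is done by hand here instead of by guice.
    SketchDiscoveryService(SketchDiscovery discovery) {
        this.discovery = discovery;
    }

    void start() {
        // Starting the service starts the bound discovery.
        discovery.start();
    }

    String nodeDescription() {
        return discovery.nodeDescription();
    }
}
```

Swapping in another mechanism then only means binding a different SketchDiscovery implementation; callers of the service do not change.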

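Cluster node detection

An important cluster feature is node detection. A cluster cannot be without a master node, so every node in the cluster periodically checks the master, and a master election starts as soon as the master can no longer be detected. The master, in turn, has to keep track of membership changes, so it periodically checks every node in the cluster and promptly removes nodes that have gone down. This mutual heartbeat checking is the cluster's fault detection. In the FaultDetection class hierarchy (shown as a figure in the original article) there are two implementations: one in which the master checks the other nodes in the cluster, and one in which the other nodes check the master.

FaultDetection has only one abstract method, handleTransportDisconnect, which is called from the inner class FDConnectionListener. elasticsearch relies heavily on listener-based asynchronous callbacks, which avoid blocking callers while they wait for results and can greatly improve performance. The code is as follows: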

private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

When fault detection starts, it registers the corresponding FDConnectionListener. When a lost node is detected, onNodeDisconnected calls back into the corresponding handleTransportDisconnect to handle it.

MasterFaultDetection startup code
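The registration and callback path can be sketched in isolation. The following is a simplified model with hypothetical names (SketchTransport, SketchFaultDetection, SketchRecordingDetection) and node ids as plain strings; the real classes work with DiscoveryNode and TransportService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for TransportConnectionListener.
interface SketchConnectionListener {
    void onNodeConnected(String nodeId);

    void onNodeDisconnected(String nodeId);
}

// Hypothetical transport: it only stores listeners and lets a caller
// simulate a lost connection.
class SketchTransport {
    private final List<SketchConnectionListener> listeners = new CopyOnWriteArrayList<>();

    void addConnectionListener(SketchConnectionListener listener) {
        listeners.add(listener);
    }

    void simulateDisconnect(String nodeId) {
        for (SketchConnectionListener listener : listeners) {
            listener.onNodeDisconnected(nodeId);
        }
    }
}

abstract class SketchFaultDetection {
    SketchFaultDetection(SketchTransport transport) {
        // Register the listener once, when fault detection is created.
        transport.addConnectionListener(new FDListener());
    }

    // The single abstract hook that subclasses implement.
    abstract void handleTransportDisconnect(String nodeId);

    private class FDListener implements SketchConnectionListener {
        @Override
        public void onNodeConnected(String nodeId) {
            // Nothing to do on connect.
        }

        @Override
        public void onNodeDisconnected(String nodeId) {
            handleTransportDisconnect(nodeId);
        }
    }
}

// Example subclass that just records which nodes were reported as lost.
class SketchRecordingDetection extends SketchFaultDetection {
    final List<String> lostNodes = new ArrayList<>();

    SketchRecordingDetection(SketchTransport transport) {
        super(transport);
    }

    @Override
    void handleTransportDisconnect(String nodeId) {
        lostNodes.add(nodeId);
    }
}
```

The base class owns the listener plumbing, so each concrete detector only decides what a disconnect means for it.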

private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        // try to connect to the master node
        try {
            transportService.connectToNode(masterNode);
        } catch (final Exception e) {
            // the connection failed: report the master node as failed
            notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
            return;
        }
        // stop the previous master pinger and create a new one
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();
        // run the master pinger after one ping interval; this is a one-shot delay, not a periodic schedule
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
    }

The code is commented in detail, so no further explanation is needed.

Handling a failed master connection

The code is as follows:

private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    // notify all listeners that the master has been lost
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, reason);
                    }
                }
            });
            stop("master failure, " + reason);
        }
    }

ZenDiscovery implements the listener's onMasterFailure method and performs the actual handling of a lost master there; that part is analyzed later.

Key code of MasterPinger
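The compareAndSet(false, true) guard in notifyMasterFailure ensures that listeners are told about a failure only once, even if several code paths report it. A minimal sketch of that guard on its own, using a hypothetical class SketchOnceNotifier and a counter in place of real listeners:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that isolates the "notify at most once" guard.
class SketchOnceNotifier {
    private final AtomicBoolean notified = new AtomicBoolean(false);

    // Counts how many notifications were actually delivered.
    final AtomicInteger deliveries = new AtomicInteger();

    // Returns true only for the first caller since the last reset.
    boolean notifyFailure(String reason) {
        if (notified.compareAndSet(false, true)) {
            deliveries.incrementAndGet();
            return true;
        }
        return false;
    }

    // Mirrors notifiedMasterFailure.set(false) in innerStart.
    void reset() {
        notified.set(false);
    }
}
```

Resetting the flag when detection restarts, as innerStart does, re-arms the guard for the next master.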

private class MasterPinger implements Runnable {
        private volatile boolean running = true;
        public void stop() {
            this.running = false;
        }
        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
            final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
            final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
                        @Override
                        public MasterPingResponseResponse newInstance() {
                            return new MasterPingResponseResponse();
                        }
                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {
                            if (!running) {
                                return;
                            }
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // schedule the next ping cycle
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                            }
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            if (!running) {
                                return;
                            }
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                        handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NoLongerMasterException) {
                                        logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {
                                        logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, "not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                        logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                        notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                        return;
                                    }
                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                    if (retryCount >= pingRetryCount) {
                                        logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                         // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }
                    } // closes the anonymous response handler
            );
        }
    }

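MasterPinger is a Runnable task. innerStart does not schedule it periodically, yet the master ping has to run periodically. The trick is in the run method: when a ping succeeds, the task schedules itself again after pingInterval. This guarantees that only one ping task is active at a time and also preserves the order of and interval between pings.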
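The self-rescheduling idea can be reproduced with the JDK's ScheduledExecutorService. The sketch below is not the elasticsearch ThreadPool API: the class name, the maxPings limit and the assumption that every ping succeeds are all hypothetical and only keep the demo finite.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pinger that reschedules itself after each successful ping.
class SketchSelfReschedulingPinger implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;
    private final int maxPings;
    private volatile boolean running = true;

    final AtomicInteger pings = new AtomicInteger();
    final CountDownLatch done;

    SketchSelfReschedulingPinger(ScheduledExecutorService scheduler, long intervalMillis, int maxPings) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
        this.maxPings = maxPings;
        this.done = new CountDownLatch(maxPings);
    }

    void stop() {
        running = false;
    }

    @Override
    public void run() {
        if (!running) {
            return; // stopped: do not ping and do not reschedule
        }
        // Assumption: the ping always succeeds in this sketch.
        int count = pings.incrementAndGet();
        done.countDown();
        if (count < maxPings) {
            // One-shot schedule of the next cycle; never a fixed-rate schedule.
            scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Runs the pinger until maxPings cycles completed and returns the ping count.
    static int runDemo(int maxPings) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SketchSelfReschedulingPinger pinger = new SketchSelfReschedulingPinger(scheduler, 10, maxPings);
        scheduler.schedule(pinger, 10, TimeUnit.MILLISECONDS);
        try {
            pinger.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return pinger.pings.get();
    }
}
```

Because the next run is only scheduled from inside the current one, two cycles of the same pinger cannot overlap, which a fixed-rate schedule would not guarantee when a ping takes longer than the interval.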

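As before, a ping is a MasterPingRequest sent through the transport. When the target node receives the request, it throws an exception such as NotMasterException if it is no longer the master, and other exceptions if something is wrong with its state; these exceptions are passed back to the pinging node, where handleException deals with them. Otherwise the node responds normally. notifyMasterFailure then proceeds exactly as in the startup logic.

When there is no response because of a network problem, handleTransportDisconnect(masterToPing) is called. MasterFaultDetection implements this method as follows: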
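For failures that are not one of the master-related exceptions, the handleException code in MasterPinger counts retries and reports a master failure only after pingRetryCount consecutive failures, while a successful response resets the counter. A minimal sketch of just that counting logic, with a hypothetical class name and no transport involved:

```java
// Hypothetical helper that isolates the retry counting of the master pinger.
class SketchPingRetryTracker {
    private final int pingRetryCount;
    private int retryCount = 0;

    SketchPingRetryTracker(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    // A good response resets the counter, as handleResponse does.
    void onPingSuccess() {
        retryCount = 0;
    }

    // Returns true when the master should be reported as failed,
    // false when the ping should simply be sent again.
    boolean onPingFailure() {
        retryCount++;
        return retryCount >= pingRetryCount;
    }
}
```

Only consecutive failures count, so a single slow response does not trigger a master election on its own.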

protected void handleTransportDisconnect(DiscoveryNode node) {
        // access to masterNode must be synchronized here
        synchronized (masterNodeMutex) {
            // the master has already changed to another node, so there is no need to reconnect
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
                    // try to connect again
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
                    // the connection succeeded: start a new master pinger
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                } catch (Exception e) {
                    // the reconnect failed: report the master node as lost
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                }
            } else {
                // reconnecting is disabled: report the master as lost
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, "transport disconnected");
            }
        }
    }

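That is the whole flow of MasterFaultDetection. At startup, if the master cannot be reached, a master failure is reported; otherwise the master pinger is started after a delay (pingInterval, 1s by default). The pinger tries to reach the master node, and if the master's network connection is lost, it tries to reconnect. When the master receives a MasterPingRequest, it first checks whether it is still the master; if not, it throws an exception, otherwise it responds normally. If the pinging node receives an exception, it starts the master failure notification; otherwise this ping is complete, and a new master ping is scheduled after the delay.

NodesFaultDetection is similar to MasterFaultDetection in both logic and implementation; the main difference is in how ping exceptions are handled. When a node fails or does not respond, the node-lost mechanism is triggered, and only the handling after the notification differs. It is not analyzed in detail here; interested readers can consult the source code.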
