elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析
zenDiscovery實(shí)現(xiàn)機(jī)制
ping是集群發(fā)現(xiàn)的基本手段,通過在網(wǎng)絡(luò)上廣播或者指定ping某些節(jié)點(diǎn)獲取集群信息,從而可以找到集群的master加入集群。zenDiscovery實(shí)現(xiàn)了兩種ping機(jī)制:廣播與單播。本篇將詳細(xì)分析一些這MulticastZenPing機(jī)制的實(shí)現(xiàn)為后面的集群發(fā)現(xiàn)和master選舉做好鋪墊。
廣播的過程
首先看一下廣播(MulticastZenPing),廣播的原理很簡單,節(jié)點(diǎn)啟動后向網(wǎng)絡(luò)發(fā)送廣播信息,任何收到的節(jié)點(diǎn)只要集群名字相同都應(yīng)該對此廣播信息作出回應(yīng)。這樣該節(jié)點(diǎn)就獲取了集群的相關(guān)信息。它定義了一個action:"internal:discovery/zen/multicast"和廣播的信息頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通信的基礎(chǔ),但是廣播卻沒有使它。它使用了java的MulticastSocket。這里簡單的介紹一下MulticastSocket的使用。它是一個UDP 機(jī)制的socket,用來進(jìn)行多個數(shù)據(jù)包的廣播。它可以幫到一個ip形成一個group,任何MulticastSocket都可以join進(jìn)來,組內(nèi)的socket發(fā)送的信息會被訂閱了改組的所有機(jī)器接收到。elasticsearch對其進(jìn)行了封裝形成了MulticastChannel,有興趣可以參考相關(guān)源碼。
首先看一下MulticastZenPing的幾個輔助內(nèi)部類:
它總共定義了4個內(nèi)部類,這些內(nèi)部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應(yīng)都用它來存儲。MulticastPingResponseRequestHandler它是response處理類,類似于之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個messageReceived的方法,當(dāng)收到請求時直接返回一個response。
MulticastPingResponse就不用細(xì)說了,它就是一個響應(yīng)類。最后要著重說一下Receiver類,因?yàn)閺V播并不是使用NettyTransport,因此對于消息處理邏輯都在Receiver中。在初始化MulticastZenPing時會將receiver注冊進(jìn)去。
protected void doStart() throws ElasticsearchException { try { .... multicastChannel = MulticastChannel.getChannel(nodeName(), shared, new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)), new Receiver());//將receiver注冊到channel中 } catch (Throwable t) { .... } }
Receiver類基礎(chǔ)了Listener,實(shí)現(xiàn)了3個方法,消息經(jīng)過onMessage方法區(qū)分,如果是內(nèi)部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區(qū)分方法很簡單,就是讀取信息都看它是否符合所定義的INTERNAL_HEADER 信息頭。
nodeping處理代碼
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) { .... final DiscoveryNodes discoveryNodes = contextProvider.nodes(); final DiscoveryNode requestingNode = requestingNodeX; if (requestingNode.id().equals(discoveryNodes.localNodeId())) { // 自身發(fā)出的ping,忽略 return; } //只接受本集群ping if (!requestClusterName.equals(clusterName)) { ...return; } // 兩個client間不需要ping if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return; } //新建一個response final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); multicastPingResponse.id = id; multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce()); //無法連接的情況 if (!transportService.nodeConnected(requestingNode)) { // do the connect and send on a thread pool threadPool.generic().execute(new Runnable() { @Override public void run() { // connect to the node if possible try { transportService.connectToNode(requestingNode); transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } }); } catch (Exception e) { if (lifecycle.started()) { logger.warn("failed to connect to requesting node {}", e, requestingNode); } } } }); } else { transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { if (lifecycle.started()) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } } }); } } }
另外的一個方法是處理外部ping信息,處理過程是返回cluster的信息(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應(yīng)MulticastZenPing的過程,收到其它節(jié)點(diǎn)的響應(yīng)信息后它會把本節(jié)點(diǎn)及集群的master節(jié)點(diǎn)相關(guān)信息返回給廣播節(jié)點(diǎn)。這樣廣播節(jié)點(diǎn)就獲知了集群的相關(guān)信息。在MulticastZenPing類中還有一個類 MulticastPingResponseRequestHandler,它的作用是廣播節(jié)點(diǎn)對其它節(jié)點(diǎn)對廣播信息響應(yīng)的回應(yīng),廣播節(jié)點(diǎn)的第二次發(fā)送信息的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動時注冊到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。
ping請求的發(fā)送策略
代碼如下:
public void ping(final PingListener listener, final TimeValue timeout) { .... //產(chǎn)生一個id final int id = pingIdGenerator.incrementAndGet(); try { receivedResponses.put(id, new PingCollection()); sendPingRequest(id);//第一次發(fā)送ping請求 // 等待時間的1/2后再次發(fā)送一個請求 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to send second ping request", t, id); finalizePingCycle(id, listener); } @Override public void doRun() { sendPingRequest(id); //再過1/2時間再次發(fā)送一個請求 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to send third ping request", t, id); finalizePingCycle(id, listener); } @Override public void doRun() { // make one last ping, but finalize as soon as all nodes have responded or a timeout has past PingCollection collection = receivedResponses.get(id); FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener); receivedResponses.put(id, finalizingPingCollection); logger.trace("[{}] sending last pings", id); sendPingRequest(id); //最后一次發(fā)送請求,超時的1/4后 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to finalize ping", t, id); } @Override protected void doRun() throws Exception { finalizePingCycle(id, listener); } }); } }); } }); } catch (Exception e) { logger.warn("failed to ping", e); finalizePingCycle(id, listener); } }
發(fā)送過程主要是調(diào)用sendPingRequest(id)方法,在該方法中會將id,信息頭,版本,本地節(jié)點(diǎn)信息一起寫入到BytesStreamOutput中然后將其進(jìn)行廣播,這個廣播信息會被其它機(jī)器上的Receiver接收并處理,并且響應(yīng)該ping請求。另外一個需要關(guān)注的是以上加說明的部分,它通過鏈時的定期發(fā)送請求,在等待時間內(nèi)可能會發(fā)出4次請求,這種發(fā)送方式會造成大量的ping請求重復(fù),幸好ping的資源消耗小,但是好處是可以盡可能保證在timeout這個時間段內(nèi)集群的新增節(jié)點(diǎn)都能收到這個ping信息。在單播中也采用了該策略。
總結(jié)
廣播的過程:廣播使用的是jdk的MulticastSocket,在timeout時間內(nèi)4次發(fā)生ping請求,ping請求包括一個id,信息頭,本地節(jié)點(diǎn)的一些信息;這些信息在其它節(jié)點(diǎn)中被接收到交給Receiver處理,Receiver會將集群的master和本機(jī)的相關(guān)信息通過transport返回給廣播節(jié)點(diǎn)。廣播節(jié)點(diǎn)收到這些信息后會理解使用transport返回一個空的response。至此一個廣播過程完成。
在節(jié)點(diǎn)分布在多個網(wǎng)段時,廣播就失效了,因?yàn)閺V播信息不可達(dá)。這個時間就需要使用單播去ping指定的節(jié)點(diǎn)獲取cluster的相關(guān)信息。這就是單播的用處。單播使用的是NettyTransport,它會使用跟廣播一樣的鏈?zhǔn)秸埱笙蛑付ǖ墓?jié)點(diǎn)發(fā)送請求。信息的處理方式是之前所介紹的NettyTransport標(biāo)準(zhǔn)的信息處理過程。
以上就是elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch集群發(fā)現(xiàn)zendiscovery Ping的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot使用Sa-Token實(shí)現(xiàn)賬號封禁、分類封禁、階梯封禁的示例代碼
本文主要介紹了SpringBoot使用Sa-Token實(shí)現(xiàn)賬號封禁、分類封禁、階梯封禁的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07springboot yml定義屬性,下文中${} 引用說明
這篇文章主要介紹了springboot yml定義屬性,下文中${} 引用說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04Spring監(jiān)聽器及定時任務(wù)實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Spring監(jiān)聽器及定時任務(wù)實(shí)現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07