Source Code Analysis of ZooKeeper's Leader Election Mechanism

 Updated: 2023-03-31 14:53:43   Author: Liang Jichao, JD Logistics  
This article analyzes the source code of ZooKeeper's Leader election and may serve as a reference for readers who need it.

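ZooKeeper

ZooKeeper is a distributed service framework that addresses data problems common in distributed applications, such as cluster management and state synchronization. To solve them, ZooKeeper relies on Leader election to guarantee strong data consistency and stability. Starting from the cluster configuration, this article analyzes the Leader election source code and shows how BIO communication combined with multiple threads and multi-layer queues yields a high-performance architecture.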

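01 Leader Election Mechanism

Leader election uses a majority (more than half) voting algorithm.

Each ZooKeeper server is called a node. Every node has the right to vote and may cast its vote for any node that is eligible for election. As soon as one node collects more than half of the votes, it becomes the Leader and the other nodes become Followers.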

02 Cluster Configuration for Leader Election

  • Copy zoo_sample.cfg to zoo1.cfg, zoo2.cfg, zoo3.cfg and zoo4.cfg.
  • Edit each zooN.cfg file and set the following values:
【plain】
Contents of zoo1.cfg:
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
Contents of zoo2.cfg:
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
Contents of zoo3.cfg:
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
Contents of zoo4.cfg:
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
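  • Format: server.<server number (matches the content of the myid file)>=<ip>:<data synchronization port>:<election port>:<election role>
  • participant is the default role and takes part in the election, so it can be omitted; an observer does not take part in the election.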

  • In the /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3 and /export/data/zookeeper-4 directories, create a myid file containing 1, 2, 3 and 4 respectively. This value becomes the node's sid (short for Server ID).

  • Start three ZooKeeper instances:
  • bin/zkServer.sh start conf/zoo1.cfg
  • bin/zkServer.sh start conf/zoo2.cfg
  • bin/zkServer.sh start conf/zoo3.cfg
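  • On startup, each instance reads the configuration file passed as its startup argument (zooN.cfg). From it the instance learns its own server identity (sid) and how many instances in the cluster take part in the election.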
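The server.N=ip:syncPort:electionPort:role format described above can be illustrated with a small parser. This is only a sketch under stated assumptions: ServerLine and its fields are hypothetical names introduced here, and the parsing is far simpler than what ZooKeeper itself does (for example, it ignores IPv6 addresses and any other suffixes).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of one "server.N=..." entry; not ZooKeeper's own parser. */
final class ServerLine {
    final long sid;
    final String host;
    final int syncPort;
    final int electionPort;
    final boolean participant;

    ServerLine(long sid, String host, int syncPort, int electionPort, boolean participant) {
        this.sid = sid;
        this.host = host;
        this.syncPort = syncPort;
        this.electionPort = electionPort;
        this.participant = participant;
    }

    /** Parses a line such as "server.4=127.0.0.1:2004:3004:observer". */
    static ServerLine parse(String line) {
        String[] kv = line.trim().split("=", 2);
        if (kv.length != 2 || !kv[0].startsWith("server.")) {
            throw new IllegalArgumentException("not a server entry: " + line);
        }
        long sid = Long.parseLong(kv[0].substring("server.".length()));
        String[] parts = kv[1].split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected ip:syncPort:electionPort[:role]: " + line);
        }
        // The role is optional; a missing role means participant.
        boolean participant = parts.length == 3 || parts[3].equalsIgnoreCase("participant");
        if (!participant && !parts[3].equalsIgnoreCase("observer")) {
            throw new IllegalArgumentException("unknown role: " + parts[3]);
        }
        return new ServerLine(sid, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), participant);
    }

    public static void main(String[] args) {
        String[] cfg = {
            "server.1=127.0.0.1:2001:3001",
            "server.2=127.0.0.1:2002:3002:participant",
            "server.3=127.0.0.1:2003:3003:participant",
            "server.4=127.0.0.1:2004:3004:observer"
        };
        List<Long> voters = new ArrayList<>();
        for (String line : cfg) {
            ServerLine s = ServerLine.parse(line);
            if (s.participant) {
                voters.add(s.sid);
            }
        }
        // Only participants count towards the majority.
        System.out.println("voters=" + voters + ", half=" + voters.size() / 2);
    }
}
```

With the four entries from the configuration above, three servers are voters and server 4 is left out of the vote count.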

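03 Leader Election Flow

Figure 1: Voting flow from the first round to the second round

Preconditions:

Assume the vote format vote(sid, zxid, epoch):

  • sid is the Server ID, the unique identifier of each server, taken from the myid file;
  • zxid is the id of the latest data transaction;
  • epoch is the election epoch. To keep the explanation simple it is fixed at 1 (the first election) and left out of the votes below.

Start the sid=1 and sid=2 nodes in that order.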

Round 1:

  • sid=1 node: its initial vote is for itself, so it sends vote(1,0) to the sid=2 node;
  • sid=2 node: its initial vote is for itself, so it sends vote(2,0) to the sid=1 node;
  • sid=1 node: it receives vote(2,0) from sid=2 and holds its own vote(1,0). It first compares zxid: a larger zxid means newer data, so the vote with the largest zxid is preferred; if the zxids are equal, the largest sid wins. The result is vote(2,0), so sid=1 changes its vote to vote(2,0);
  • sid=2 node: it receives vote(1,0) from sid=1 and holds its own vote(2,0). By the same rule the result is vote(2,0), so the vote of sid=2 stays unchanged;
  • Round 1 ends.

Round 2:

  • sid=1 node: its current vote is vote(2,0), which it sends to the sid=2 node;
  • sid=2 node: its current vote is vote(2,0), which it sends to the sid=1 node;
  • sid=1 node: it receives vote(2,0) from sid=2 and holds its own vote(2,0). Under the majority algorithm, 3 nodes take part in the election and 2 of them already hold the same vote, so sid=2 is elected Leader and sid=1 becomes a Follower;
  • sid=2 node: it receives vote(2,0) from sid=1 and holds its own vote(2,0). Under the majority algorithm sid=2 is elected Leader, so the node's own role becomes Leader.

If the sid=3 node is started now, the cluster has already elected a leader. The sid=1 and sid=2 nodes send their leader votes back to sid=3, and by majority the result is still sid=2 as leader.
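The two rounds above can be replayed with a toy simulation. It is a sketch, not ZooKeeper code: TwoRoundElectionDemo, SimpleVote, better and elect are hypothetical names, the epoch is omitted as in the walk-through, and message passing is replaced by direct reads of the other node's vote.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy replay of the two voting rounds; all names are illustrative. */
final class TwoRoundElectionDemo {

    /** A vote names the proposed leader and that leader's zxid. */
    static final class SimpleVote {
        final long sid;
        final long zxid;
        SimpleVote(long sid, long zxid) {
            this.sid = sid;
            this.zxid = zxid;
        }
        @Override
        public String toString() {
            return "vote(" + sid + "," + zxid + ")";
        }
    }

    /** The larger zxid wins; on a tie the larger sid wins. */
    static SimpleVote better(SimpleVote a, SimpleVote b) {
        if (a.zxid != b.zxid) {
            return a.zxid > b.zxid ? a : b;
        }
        return a.sid >= b.sid ? a : b;
    }

    /** Returns the elected sid, or -1 if no vote is held by more than half of the voters. */
    static long elect(long[] startedSids, int votingMembers) {
        Map<Long, SimpleVote> current = new HashMap<>();
        for (long sid : startedSids) {
            current.put(sid, new SimpleVote(sid, 0)); // initial vote: the node itself
        }
        // Round 1: each node compares its vote with every vote it receives and keeps the best.
        Map<Long, SimpleVote> afterRoundOne = new HashMap<>();
        for (long me : startedSids) {
            SimpleVote best = current.get(me);
            for (long other : startedSids) {
                best = better(best, current.get(other));
            }
            afterRoundOne.put(me, best);
        }
        System.out.println("votes after round 1: " + afterRoundOne);
        // Round 2: count how many nodes now hold the same vote.
        Map<Long, Integer> tally = new HashMap<>();
        for (SimpleVote v : afterRoundOne.values()) {
            tally.merge(v.sid, 1, Integer::sum);
        }
        int half = votingMembers / 2;
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > half) {
                return e.getKey();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Two of the three voting members are up, as in the walk-through.
        System.out.println("leader sid: " + elect(new long[] {1, 2}, 3));
    }
}
```

With only sid=1 started, the same call returns -1, because a single vote is not more than half of three voters.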

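3.1 Leader election uses a multi-layer queue architecture

ZooKeeper's election internals are split into an election application layer and a message transport queue layer. The first layer, the application-layer queues, receives and sends all votes in one place. The second layer, the transport-layer queues, is split into one queue per server sid so that sending to one server does not interfere with sending to another. For example, a failed send to one machine does not affect sends to the healthy servers.

Figure 2: Interaction between the upper and lower queue layers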
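The isolation that the per-sid transport queues provide can be sketched as follows. The class and method names are hypothetical, and both the capacity of 1 and the drop-oldest policy are assumptions made for this sketch rather than a statement of how ZooKeeper behaves. The point is only that a peer which never drains its queue cannot hold up messages addressed to other peers.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative two-layer queue; all names here are hypothetical, not ZooKeeper classes. */
final class TwoLayerQueueDemo {

    /** Layer-1 element: a payload plus the sid it is addressed to. */
    static final class Outgoing {
        final long targetSid;
        final ByteBuffer payload;
        Outgoing(long targetSid, ByteBuffer payload) {
            this.targetSid = targetSid;
            this.payload = payload;
        }
    }

    private static final int SEND_CAPACITY = 1; // assumed per-peer capacity for this sketch

    /** Layer 1: a single application-level queue shared by all targets. */
    private final BlockingQueue<Outgoing> sendqueue = new LinkedBlockingQueue<>();
    /** Layer 2: one bounded transport queue per target sid. */
    private final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    void submit(long targetSid, ByteBuffer payload) {
        sendqueue.offer(new Outgoing(targetSid, payload));
    }

    /** Moves everything from layer 1 into the per-sid queues without ever blocking. */
    void drainToTransport() {
        Outgoing m;
        while ((m = sendqueue.poll()) != null) {
            ArrayBlockingQueue<ByteBuffer> q = queueSendMap.computeIfAbsent(
                    m.targetSid, sid -> new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            // A peer that is not draining its queue only loses its own oldest message.
            while (!q.offer(m.payload)) {
                q.poll();
            }
        }
    }

    int pending(long sid) {
        ArrayBlockingQueue<ByteBuffer> q = queueSendMap.get(sid);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        TwoLayerQueueDemo demo = new TwoLayerQueueDemo();
        demo.submit(2L, ByteBuffer.allocate(4)); // sid=2 is assumed to be unreachable
        demo.submit(2L, ByteBuffer.allocate(4));
        demo.submit(3L, ByteBuffer.allocate(4));
        demo.drainToTransport();
        System.out.println("pending for sid=2: " + demo.pending(2L));
        System.out.println("pending for sid=3: " + demo.pending(3L));
    }
}
```

A single shared transport queue would instead let the backlog for the unreachable peer delay or evict the message meant for sid=3.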

04 Locating the Entry Class

The service startup class can be found by reading zkServer.sh:

org.apache.zookeeper.server.quorum.QuorumPeerMain

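05 Walking Through the Election Code

  • Load the configuration file: QuorumPeerConfig.parse(path);

The configuration that matters for Leader election is as follows:

  • Read the myid file in the dataDir directory and set the current instance's sid, which identifies it as a voter. The myid variable in the code below is the current node's own sid.
    • Set peerType, which determines whether the current instance takes part in the election.
  • new QuorumMaj() parses the entries with the server. prefix and loads the cluster membership: allMembers (all members), votingMembers (members that vote) and observingMembers (observers). It also sets half to votingMembers.size() / 2.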
【Java】
public QuorumMaj(Properties props) throws ConfigException {
        for (Entry<Object, Object> entry : props.entrySet()) {
            String key = entry.getKey().toString();
            String value = entry.getValue().toString();
            // Read the instance entries that start with "server." in the cluster configuration file
            if (key.startsWith("server.")) {
                int dot = key.indexOf('.');
                long sid = Long.parseLong(key.substring(dot + 1));
                QuorumServer qs = new QuorumServer(sid, value);
                allMembers.put(Long.valueOf(sid), qs);
                if (qs.type == LearnerType.PARTICIPANT)
                    // A PARTICIPANT role means this instance takes part in the election
                    votingMembers.put(Long.valueOf(sid), qs);
                else {
                    // Observer member
                    observingMembers.put(Long.valueOf(sid), qs);
                }
            } else if (key.equals("version")) {
                version = Long.parseLong(value, 16);
            }
        }
        // Majority base: a quorum needs more than this many votes
        half = votingMembers.size() / 2;
    }

QuorumPeerMain.runFromConfig(config) starts the service;

QuorumPeer.startLeaderElection() starts the election service;

  • Set the current vote: new Vote(sid, zxid, epoch)
【Java】
synchronized public void startLeaderElection(){
try {
           if (getPeerState() == ServerState.LOOKING) {
               // First round: by default the node votes for itself
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }
//........
}
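  • Create the election connection manager: QuorumCnxManager;
  • Initialize recvQueue<Message(sid,ByteBuffer)>, the queue for received votes (layer-2 transport queue);
  • Initialize queueSendMap<sid,queue>, the per-sid queues for outgoing votes (layer-2 transport queues);
  • Initialize senderWorkerMap<sid,SendWorker>, the container of vote-sending worker threads; an entry means a connection to the voting node sid exists;
  • Initialize the election listener thread class QuorumCnxManager.Listener.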
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long,QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // Queue for received votes (layer-2 transport queue)
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    // Per-sid queues for outgoing votes (layer-2 transport queues)
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    // Container of vote-sending worker threads; an entry means a connection to that sid exists
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;
    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);
    // Starts listener thread that waits for connection requests 
    // Create the election listener thread that accepts incoming election connections
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start(); // start the election listener thread
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
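  • Start the election listener thread QuorumCnxManager.Listener;
  • Create a ServerSocket and wait for connections from nodes whose sid is larger than this node's; each connection is stored in senderWorkerMap<sid,SendWorker>;
  • Only a node with sid > self.sid may connect to this node.
【Java】
// After listener.start() above is called, this method runs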
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request "
                        + client.getRemoteSocketAddress());
                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
                    // Accept and handle the connection
                    receiveConnection(client);
                }
                numRetries = 0;
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                    "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.getElectionAddress());
    } else if (ss != null) {
        // Clean up for shutdown.
        try {
            ss.close();
        } catch (IOException ie) {
            // Don't log an error for shutdown.
            LOG.debug("Error closing server socket", ie);
        }
    }
}
// Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
//... (omitted)
     if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: {}", sid);
            closeSocket(sock);
            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }
        } else { // Otherwise start worker threads to receive data.
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            // Store the connection as <sid,SendWorker>
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            sw.start();
            rw.start();
     }
}
  • Create the FastLeaderElection service;
  • Initialize the vote sending queue sendqueue (layer-1 queue);
  • Initialize the vote receiving queue recvqueue (layer-1 queue);
  • Create the WorkerSender thread;
  • Create the WorkerReceiver thread.
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // Sending queue sendqueue (layer-1 queue)
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Receiving queue recvqueue (layer-1 queue)
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    // Create the WorkerSender thread
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    // Create the WorkerReceiver thread
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
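  • Start the WorkerSender and WorkerReceiver threads.

The WorkerSender thread loops, polling elements from the layer-1 sendqueue:

  • Each sendqueue element carries the vote information; see the ToSend class for details;
  • It first checks whether the target sid of the vote equals its own sid; if so, the vote goes straight into recvQueue;
  • Otherwise the element is moved from sendqueue into the layer-2 transport queue queueSendMap<sid,queue>.
【Java】
//FastLeaderElection.Messenger.WorkerSender
class WorkerSender extends ZooKeeperThread{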
//...
  public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
            // Send the vote out
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
         /*
          * Start a new connection if doesn't have one already.
          */
         ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
            SEND_CAPACITY);
         ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
         // Move the message into the layer-2 transport queue queueSendMap<sid,queue>
         if (oldq != null) {
             addToSendQueue(oldq, b);
         } else {
             addToSendQueue(bq, b);
         }
         connectOne(sid);     
    }
}

The WorkerReceiver thread loops, polling elements from the layer-2 transport queue recvQueue and moving them into the layer-1 recvqueue.

【Java】
//WorkerReceiver
public void run() {
    Message response;
    while (!stop) {
      // Sleeps on receive
      try {
          // Poll the layer-2 transport queue recvQueue
          response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
          if(response == null) continue;
          // The current protocol and two previous generations all send at least 28 bytes
          if (response.buffer.capacity() < 28) {
              LOG.error("Got a short response: " + response.buffer.capacity());
              continue;
          }
          //...
          if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
              // Move the element from the layer-2 transport queue into the layer-1 recvqueue
              recvqueue.offer(n);
              //...
          }
          //...
      } catch (InterruptedException e) {
          //...
      }
    }
    //...
}

06 Core Election Logic

  • Start the QuorumPeer thread

Leader election voting starts with makeLEStrategy().lookForLeader();

sendNotifications() sends the vote to the other nodes by putting it into sendqueue, which is processed by the WorkerSender thread.

【Java】
//QuorumPeer.run
//...
try {
   reconfigFlagClear();
    if (shuttingDownLE) {
       shuttingDownLE = false;
       startLeaderElection();
       }
    // makeLEStrategy().lookForLeader() sends the votes and runs the election
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
    setPeerState(ServerState.LOOKING);
}  
//...
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
//...
    // Send the vote to the other nodes
    sendNotifications();
//...
}
private void sendNotifications() {
    // Iterate over the voting nodes
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // Queue the vote for sending
        sendqueue.offer(notmsg);
    }
}
class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
    while (!stop) {
        try {
            // Take a queued vote
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
//...
}

lookForLeader loops on recvqueue to take the votes sent by other nodes:

【Java】
public Vote lookForLeader() throws InterruptedException {
//...
/*
 * Loop in which we exchange notifications until we find a leader
 */
while ((self.getPeerState() == ServerState.LOOKING) &&
        (!stop)){
    /*
     * Remove next notification from queue, times out after 2 times
     * the termination time
     */
    // Take a vote delivered by another node
    Notification n = recvqueue.poll(notTimeout,
            TimeUnit.MILLISECONDS);
/*
 * Sends more notifications if haven't received enough.
 * Otherwise processes new notification.
 */
if(n == null){
    if(manager.haveDelivered()){
        // Connections are up and the previous batch of votes has been delivered, so send the votes again
        sendNotifications();
    } else {
        // No vote received yet: manager.connectAll() connects to the other nodes' sockets
        manager.connectAll();
    }
    /*
     * Exponential backoff
     */
    int tmpTimeOut = notTimeout*2;
    notTimeout = (tmpTimeOut < maxNotificationInterval?
            tmpTimeOut : maxNotificationInterval);
    LOG.info("Notification time out: " + notTimeout);
         }
     //....
    }
  //...
}
【Java】
//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)
private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);
        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }
    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }
    // If lost the challenge, then drop the new connection
    // Ensures there is only one connection between any two nodes in the cluster
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if(vsw != null)
            vsw.finish();
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));
        sw.start();
        rw.start();
        return true;
    }
    return false;
}

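As the code above shows, the initiating node keeps its outgoing Socket and creates the SendWorker and RecvWorker threads, stored in senderWorkerMap<sid,SendWorker>, only when the target sid is smaller than its own; the sid > self.getId() branch drops the connection. This mirrors the sid < self.getId() branch of handleConnection in the listener step, where a connection coming from a smaller sid is closed and the node dials back itself. Together the two checks ensure there is only one connection between any two nodes in the cluster.

Connection pattern between nodes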
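The effect of the two sid checks can be verified with a small sketch. ConnectionRuleDemo and its methods are hypothetical names that only mirror the comparisons in startConnection and handleConnection; no sockets are involved.

```java
/** Sketch of the "one channel per pair" rule; not ZooKeeper code. */
final class ConnectionRuleDemo {

    /** Initiator side (cf. startConnection): keep an outgoing connection only to a smaller sid. */
    static boolean initiatorKeeps(long selfSid, long remoteSid) {
        return remoteSid < selfSid;
    }

    /** Listener side (cf. handleConnection): keep an incoming connection only from a larger sid. */
    static boolean listenerKeeps(long selfSid, long remoteSid) {
        return remoteSid > selfSid;
    }

    /** Counts the connections that survive when every node dials every other node. */
    static int countChannels(int nodes) {
        int channels = 0;
        for (long from = 1; from <= nodes; from++) {
            for (long to = 1; to <= nodes; to++) {
                if (from != to && initiatorKeeps(from, to) && listenerKeeps(to, from)) {
                    channels++;
                }
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        // With sids 1..3 the surviving connections are 2->1, 3->1 and 3->2.
        System.out.println("channels among 3 nodes: " + countChannels(3));
    }
}
```

Each unordered pair of nodes ends up with exactly one surviving connection, always dialed by the node with the larger sid.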

【Java】
public Vote lookForLeader() throws InterruptedException {
//...
    if (n.electionEpoch > logicalclock.get()) {
        // The notification's election epoch is newer than ours: adopt it and reset the recvset vote pool,
        // then update the current proposal and send the votes again
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if(LOG.isDebugEnabled()){
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) { // same election epoch
        // The received vote beat the current one, so it replaces the current vote
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }
//...
}

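In the code above, lookForLeader takes a vote from recvqueue and starts the election:

  • Check whether the election epoch of the received vote matches the current one;
  • If it is larger than the current epoch, update the current vote and send the votes again;
  • If it is smaller than the current epoch, ignore the received vote;
  • If the epochs are equal, compare the current vote against the received vote.
【Java】
// Compare the received vote against the current vote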
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }
        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
  }

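The totalOrderPredicate method above works as follows:

  • It returns true if the candidate's epoch is greater than the current epoch;
  • It returns true if the epochs are equal and the candidate's zxid is greater than the current zxid;
  • It returns true if the epochs and zxids are equal and the candidate's sid is greater than the current sid;
  • When the result is true, the current vote is replaced with the winning vote and the new vote is sent out again.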
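The three rules can be tried out in isolation. The sketch below copies only the comparison expression into a hypothetical VoteOrderDemo.wins method; it leaves out the weight check that the original method performs first, so it is not a drop-in replacement.

```java
/** Standalone copy of the ordering rule, for experimentation only. */
final class VoteOrderDemo {

    /** True when the new (id, zxid, epoch) triple beats the current one. */
    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // Rule 1: a higher epoch wins even with a lower zxid and sid.
        System.out.println(wins(1, 0, 2, 3, 9, 1));
        // Rule 2: same epoch, higher zxid wins even with a lower sid.
        System.out.println(wins(1, 5, 1, 3, 4, 1));
        // Rule 3: same epoch and zxid, higher sid wins.
        System.out.println(wins(2, 0, 1, 1, 0, 1));
        // The reverse of rule 3 loses, so sid=2 keeps its own vote in the walk-through.
        System.out.println(wins(1, 0, 1, 2, 0, 1));
    }
}
```

The first three calls return true and the last returns false, which matches the first-round result where sid=1 switches to vote(2,0) and sid=2 keeps its vote.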
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
    // Store the vote received from each node
    // key: sid of the voter   value: the vote, including the sid it proposes as Leader
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
    // Start the majority check
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // Verify if there is any change in the proposed leader
        while((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)){
                recvqueue.put(n);
                break;
            }
        }
        /*
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            // A leader has been elected; set this node's state depending on whether it is the leader
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING: learningState());
            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
//...
}
/**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     *
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last (the vote that won the comparison)
     */
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    // votes comes from recvset, which holds the vote cast by each node
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // A node whose vote equals the proposed vote is added to voteSet
        if (vote.equals(entry.getValue())) {
            // Record the sid of the node that cast this vote
            voteSet.addAck(entry.getKey());
        }
    }
    // Check whether the matching votes form a majority
    return voteSet.hasAllQuorums();
}
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
   }

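In the code above, recvset stores the vote cast by each sid.

Round 1: sid1: vote(1,0,1), sid2: vote(2,0,1);

Round 2: sid1: vote(2,0,1), sid2: vote(2,0,1).

The election ends with vote(2,0,1) as the proposed leader. Comparing it against the recvset vote pool finds 2 matching votes. Three nodes take part in the election, so half = 3 / 2 = 1, and with both sid1 and sid2 voting for sid2 the count is 2 > 1, which is a majority. sid2 is therefore confirmed as leader.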
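The count described above can be reproduced with a short tally. This is a simplified sketch: QuorumTallyDemo and hasMajority are hypothetical names, and recvset here maps a voter's sid directly to the sid it voted for, whereas the real code stores and compares whole Vote objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified majority tally over a recvset-like map; not ZooKeeper code. */
final class QuorumTallyDemo {

    /** Counts the voters backing proposedLeader and checks for a strict majority. */
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        int half = votingMembers / 2; // same integer division as half = votingMembers.size() / 2
        int acks = 0;
        for (long votedFor : recvset.values()) {
            if (votedFor == proposedLeader) {
                acks++;
            }
        }
        return acks > half;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 2L); // sid1 votes for sid2
        recvset.put(2L, 2L); // sid2 votes for sid2
        System.out.println("3 voters: " + hasMajority(recvset, 2L, 3));
        System.out.println("4 voters: " + hasMajority(recvset, 2L, 4));
    }
}
```

With three voting members two matching votes are enough, while with four they are not, because the comparison is strictly greater than half.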

  • setPeerState updates the current node's role;
  • If proposedLeader, the elected sid, equals the node's own sid, the node becomes Leader;
  • Otherwise it becomes Follower or Observer;
  • currentVote is updated to the Leader's vote, vote(2,0,1).

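07 Summary

Analyzing the Leader election source code shows that:

  • The nodes vote for each other over BIO network communication while keeping only one channel between each pair of nodes, which reduces network resource usage. This shows how relevant BIO remains when developing distributed middleware.
  • On top of BIO, multi-threading and in-memory message queues are combined into a multi-layer queue architecture in which each layer is served by its own threads, which speeds up the election.
  • It offers valuable practical experience in combining BIO with multi-threading.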
