欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文秒懂?kafka?HA(高可用)

 更新時間:2021年11月26日 14:25:01   作者:程序員耕耘  
這篇文章主要介紹了秒懂?kafka?HA(高可用)的相關知識,本文我們來說一說和?kafka?高可用相關的一些策略,對kafka?HA相關知識感興趣的朋友一起看看吧
我們知道,kafka中每個topic被劃分為多個partition,每個partition又有多個副本,那么這些分區(qū)副本是怎么均勻的分布在整個kafka集群的broker節(jié)點上的?partition副本的leader是通過什么算法選舉出來的?partition副本的follower是怎么復制備份leader的數(shù)據(jù)的?本文我們就來說一說和 kafka 高可用相關的一些策略。

01名詞解釋

要想說明白kafka的HA機制,我們必須先搞明白幾個縮寫名詞,
1、AR、ISR、OSR
AR:Assigned Replicas,某分區(qū)的所有副本(這里所說的副本包括leader和follower)統(tǒng)稱為 AR。
ISR:In Sync Replicas,所有與leader副本保持"一定程度同步"的副本(包括leader副本在內(nèi))組成 ISR 。生產(chǎn)者發(fā)送消息時,只有l(wèi)eader與客戶端發(fā)生交互,follower只是同步備份leader的數(shù)據(jù),以保障高可用,所以生產(chǎn)者的消息會先發(fā)送到leader,然后follower才能從leader中拉取消息進行同步,同步期間,follower的數(shù)據(jù)相對leader而言會有一定程度的滯后,前面所說的"一定程度同步"就是指可忍受的滯后范圍,這個范圍可以通過server.properties中的參數(shù)進行配置。
OSR :Out-of-Sync Replied,在上面的描述中,相對leader滯后過多的follower將組成OSR 。
由此可見,AR = ISR + OSR,理想情況下,所有的follower副本都應該與leader 保持一定程度的同步,即AR=ISR,OSR集合為空
2、ISR 的伸縮性
leader負責跟蹤維護 ISR 集合中所有follower副本的滯后狀態(tài),當follower副本"落后太多" 或 "follower超過一定時間沒有向leader發(fā)送同步請求"時,leader副本會把它從 ISR 集合中剔除。如果 OSR 集合中有follower副本"追上"了leader副本,那么leader副本會把它從 OSR 集合轉(zhuǎn)移至 ISR 集合。
上面描述的"落后太多"是指follower復制的消息落后于leader的條數(shù)超過預定值,這個預定值可在server.properties中通過replica.lag.max.messages配置,其默認值是4000。"超過一定時間沒有向leader發(fā)送同步請求",這個"一定時間"可以在server.properties中通過replica.lag.time.max.ms來配置,其默認值是10000,默認情況下,當leader發(fā)生故障時,只有 ISR 集合中的follower副本才有資格被選舉為新的leader,而在 OSR 集合中的副本則沒有任何機會(不過這個可以通過配置來改變)。
3、HW
HW (High Watermark)俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能消費HW之前的消息。
下圖表示一個日志文件,這個日志文件中有9條消息,第一條消息的offset為0,最后一條消息的offset為8,虛線表示的offset為9的消息,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。
4、LEO
LEO (Log End Offset),標識當前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當前日志文件的 LEO,分區(qū) ISR 集合中的每個副本都會維護自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW(你品,你細品...),對消費者而言只能消費 HW 之前的消息。
5、 ISR 集合和 HW、LEO的關系
producer在發(fā)布消息到partition時,只會與該partition的leader發(fā)生交互將消息發(fā)送給leader,leader會將該消息寫入其本地log,每個follower都從leader上pull數(shù)據(jù)做同步備份,follower在pull到該消息并寫入其log后,會向leader發(fā)送ack,一旦leader收到了ISR中的所有follower的ack(只關注ISR中的所有follower,不考慮OSR,一定程度上提升了吞吐),該消息就被認為已經(jīng)commit了,leader將增加HW,然后向producer發(fā)送ack。
也就是說,在ISR中所有的follower還沒有完成數(shù)據(jù)備份之前,leader不會增加HW,也就是這條消息暫時還不能被消費者消費,只有當ISR中所有的follower都備份完成后,leader才會將HW后移。
ISR集合中LEO最小的副本,即同步數(shù)據(jù)同步的最慢的一個,這個最慢副本的LEO即leader的HW,消費者只能消費HW之前的消息。

02kafka HA

Tips:我們說的副本包括leader和follower,都叫副本,不要認為叫副本說的就是follower。
kafka在0.8以前的版本中是沒有分區(qū)副本的概念的,一旦某一個broker宕機,這個broker上的所有分區(qū)都將不可用。在0.8版本以后,引入了分區(qū)副本的概念,同一個partition可以有多個副本,在多個副本中會選出一個做leader,其余的作為follower,只有l(wèi)eader對外提供讀寫服務,follower只負責從leader上同步拉取數(shù)據(jù),已保障高可用。
1、partition副本的分配策略
每個topic有多個partition,每個partition有多個副本,這些partition副本分布在不同的broker上,以保障高可用,那么這些partition副本是怎么均勻的分布到集群中的每個broker上的呢?
※ kafka分配partition副本的算法如下,
① 將所有的broker(假設總共n個broker)和 待分配的partition排序;
② 將第i個partition分配到第(i mod n)個broker上;
③ 第i個partition的第j個副本分配到第((i+j) mod n)個broker上;
2、kafka的消息傳遞備份策略
生產(chǎn)者將消息發(fā)送給分區(qū)的leader,leader會將該消息寫入其本地log,然后每個follower都會從leader pull數(shù)據(jù),follower pull到該消息并將其寫入log后,會向leader發(fā)送ack,當leader收到了ISR集合中所有follower的ack后,就認為這條消息已經(jīng)commit了,leader將增加HW并且向生產(chǎn)者返回ack。在整個流程中,follower也可以批量的從leader復制數(shù)據(jù),以提升復制性能。
producer在發(fā)送消息的時候,可指定參數(shù)acks,表示"在生產(chǎn)者認為發(fā)送請求完成之前,有多少分區(qū)副本必須接收到數(shù)據(jù)",有三個可選值,0、1、all(或-1),默認為1,
  • acks=0,表示producer只管發(fā),只要發(fā)出去就認為發(fā)發(fā)送請求完成了,不管leader有沒有收到,更不管follower有沒有備份完成。
  • acks=1,表示只要leader收到消息,并將其寫入自己log后,就會返回給producer ack,不考慮follower有沒有備份完成。
  • acks=all(或-1),表示不僅要leader收到消息寫入本地log,還要等所有ISR集合中的follower都備份完成后,producer才認為發(fā)送成功。
實際上,為了提高性能,follower在pull到消息將其保存到內(nèi)存中而尚未寫入磁盤時,就會向leader發(fā)送ack,所以也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費。
3、kafka中的Leader選舉
面試官在考查你kafka知識的時候如果問你:kafka中的選舉是怎么回事?而不說具體哪種選舉,那這個面試官可能對kafka也是一知半解,這個時候就是"弄死"他的時候了,當然如果你沒有一定的知識儲備,那么就是你被"弄死"的時候。
因為kafka中涉及到選舉的地方有多處,最常提及的也有:①cotroller選舉 、 ②分區(qū)leader選舉 和 ③consumer group leader的選舉。我們在前面說過同一個partition有多個副本,其中一個副本作為leader,其余的作為follower。這里我們再說一個角色:controller!kafka集群中多個broker,有一個會被選舉為controller,注意區(qū)分兩者,一個是broker的leader,我們稱為controller,一個是分區(qū)副本的leader,我們稱為leader。
① controller的選舉【broker的leader】
controller的選舉是通過broker在zookeeper的"/controller"節(jié)點下創(chuàng)建臨時節(jié)點來實現(xiàn)的,并在該節(jié)點中寫入當前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} ,利用zookeeper的強一致性特性,一個節(jié)點只能被一個客戶端創(chuàng)建成功,創(chuàng)建成功的broker即為controller,即"先到先得"。?
當controller宕機或者和zookeeper失去連接時,zookeeper檢測不到心跳,zookeeper上的臨時節(jié)點會被刪除,而其它broker會監(jiān)聽臨時節(jié)點的變化,當節(jié)點被刪除時,其它broker會收到通知,重新發(fā)起controller選舉。
② leader的選舉【分區(qū)副本的leader】
分區(qū)leader的選舉由 controller 負責管理和實施,當leader發(fā)生故障時,controller會將leader的改變直接通過RPC的方式通知需要為此作出響應的broker,需要為此作出響應的broker即該分區(qū)的ISR集合中follower所在的broker,kafka在zookeeper中動態(tài)維護了一個ISR,只有ISR里的follower才有被選為Leader的可能。
具體過程是這樣的:按照AR集合中副本的順序 查找到 第一個 存活的、并且屬于ISR集合的 副本作為新的leader。一個分區(qū)的AR集合在創(chuàng)建分區(qū)副本的時候就被指定,只要不發(fā)生重分配的情況,AR集合內(nèi)部副本的順序是保持不變的,而分區(qū)的ISR集合上面說過因為同步滯后等原因可能會改變,所以注意這里是根據(jù)AR的順序而不是ISR的順序找。
※ 對于上面描述的過程我們假設一種極端的情況,如果partition的所有副本都不可用時,怎么辦?這種情況下kafka提供了兩種可行的方案:
1、選擇 ISR中 第一個活過來的副本作為Leader;
2、選擇第一個活過來的副本(不一定是ISR中的)作為Leader;
這就需要在可用性和數(shù)據(jù)一致性當中做出選擇,如果一定要等待ISR中的副本活過來,那不可用的時間可能會相對較長。選擇第一個活過來的副本作為Leader,如果這個副本不在ISR中,那數(shù)據(jù)的一致性則難以保證。kafka支持用戶通過配置選擇,以根據(jù)業(yè)務場景在可用性和數(shù)據(jù)一致性之間做出權衡。
③消費組leader的選舉
組協(xié)調(diào)器會為消費組(consumer group)內(nèi)的所有消費者選舉出一個leader,這個選舉的算法也很簡單,第一個加入consumer group的consumer即為leader,如果某一時刻leader消費者退出了消費組,那么會重新 隨機 選舉一個新的leader。

03kafka架構(gòu)中zookeeper的結(jié)構(gòu)

1、查看方式
我們知道,kafka是基于zookeeper協(xié)調(diào)管理的,那么zookeeper中究竟存儲了哪些信息?另外在后面分析 broker宕機 和 controller宕機 時,我們也需要先了解zookeeper的目錄結(jié)構(gòu),所以我們先學習一下怎么查看zookeeper的目錄結(jié)構(gòu)?
① 首先啟動zookeeper客戶端連接zk服務
# cd /usr/local/zookeeper-cluster/zk1/bin
# ./zkCli.sh
② 查看zk根節(jié)點的子目錄
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
③ 可以看到zk根節(jié)點下有很多子目錄,以brokers為例,查看brokers的層級結(jié)構(gòu)
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.80.219:9092"],"jmx_port":-1,"host":"172.17.80.219","timestamp":"1584267365984","port":9092,"version":4}
cZxid = 0x300000535
ctime = Sun Mar 15 18:16:06 CST 2020
mZxid = 0x300000535
mtime = Sun Mar 15 18:16:06 CST 2020
pZxid = 0x300000535
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x20191d7053f0009
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] 
[zk: localhost:2181(CONNECTED) 4]
[zk: localhost:2181(CONNECTED) 4]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, first]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/first
[partitions]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/first/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/first/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/first/partitions/0/state
{"controller_epoch":21,"leader":0,"version":1,"leader_epoch":8,"isr":[0]}
cZxid = 0x3000003e9
ctime = Sun Mar 08 16:24:37 CST 2020
mZxid = 0x3000005cb
mtime = Sun Mar 15 18:54:09 CST 2020
pZxid = 0x3000003e9
cversion = 0
dataVersion = 10
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
[zk: localhost:2181(CONNECTED) 9]
可以看到,brokers下包括[ids, topics, seqid],ids里面存儲了存活的broker的信息,topics里面存儲了kafka集群中topic的信息。同樣的方法,可以查看其余節(jié)點的結(jié)構(gòu),這里不再演示。
2、節(jié)點信息(這里只列出和HA相關的部分節(jié)點)
① controller
controller節(jié)點下存放的是kafka集群中controller的信息(controller即kafka集群中所有broker的leader)。
② controller_epoch
controller_epoch用于記錄controller發(fā)生變更的次數(shù)(controller宕機后會重新選舉controller,這時候controller_epoch的值會+1),即記錄當前的控制器是第幾代控制器,用于防止broker腦裂。
③ brokes
brokers下的ids存儲了存活的broker信息,topics存儲了kafka集群中topic的信息,其中有一個特殊的topic:_consumer_offsets,新版本的kafka將消費者的offset就存儲在__consumer_offsets下。

04broker failover

我們了解了kafka集群中zookpeeper的結(jié)構(gòu),本文的主題是kafka的高可用分析,所以我們還是結(jié)合zookpper的結(jié)構(gòu),來分析一下,當kafka集群中的一個broker節(jié)點宕機時(非controller節(jié)點),會發(fā)生什么?
在講之前,我們再來回顧一下brokers的結(jié)構(gòu),
※ 當非controller的broker宕機時,會執(zhí)行如下操作,
1、controller會在zookeeper的 " /brokers/ids/" 節(jié)點注冊一個watcher(監(jiān)視器),當有broker宕機時,zookeeper會觸發(fā)監(jiān)視器(fire watch)通知controller。
2、controller 從 "/brokers/ids" 節(jié)點讀取到所有可用的broker。
3、controller會聲明一個set_p集合,該集合包含了宕機broker上所有的partition。
4、針對set_p中的每一個partition,
① 從 "/state"節(jié)點 讀取該partition當前的ISR;
② 決定該partition的新leader:如果該分區(qū)的 ISR中有存活的副本,則選擇其中一個作為新leader;如果該partition的ISR副本全部掛了,則選擇該partition的 AR集合 中任一幸存的副本作為leader;如果該partition的所有副本都掛,則將分區(qū)的leader設為-1;
③ 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點;
5、通過RPC向set_p相關的broker發(fā)送LeaderAndISR Request命令。

05 controller failover

當 controller 宕機時會觸發(fā) controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節(jié)點注冊 watcher(監(jiān)聽器),當 controller 宕機時 zookeeper 中的臨時節(jié)點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創(chuàng)建新的臨時節(jié)點,只有一個會創(chuàng)建成功并當選為 controller。
當新的 controller 當選時,會回調(diào)KafkaController的onControllerFailover()方法,在這個方法中完成controller的初始化,controller 在初始化時,首先會利用 ZK 的 watch 機制注冊很多不同類型的監(jiān)聽器,主要有以下幾種:
  • 監(jiān)聽 /admin/reassign_partitions 節(jié)點,用于分區(qū)副本遷移的監(jiān)聽;
  • 監(jiān)聽 /isr_change_notification 節(jié)點,用于 Partition Isr 變動的監(jiān)聽;
  • 監(jiān)聽 /admin/preferred_replica_election 節(jié)點,用于 Partition 最優(yōu) leader 選舉的監(jiān)聽;
  • 監(jiān)聽 /brokers/topics 節(jié)點,用于 topic 新建的監(jiān)聽;
  • 監(jiān)聽 /brokers/topics/TOPIC_NAME 節(jié)點,用于 Topic Partition 擴容的監(jiān)聽;
  • 監(jiān)聽 /admin/delete_topics 節(jié)點,用于 topic 刪除的監(jiān)聽;
  • 監(jiān)聽 /brokers/ids 節(jié)點,用于 Broker 上下線的監(jiān)聽;
除了注冊多種監(jiān)聽器外,controller初始化時還做以下操作,
  • initializeControllerContext()
初始化controller上下文,設置當前所有broker、topic、partition的leader、ISR等;
  • replicaStateMachine.startup()
  • partitionStateMachine.startup()
啟動狀態(tài)機;
  • brokerState.newState(RunningAsController)
將 brokerState 狀態(tài)設置為 RunningAsController;
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
把partition leadership信息發(fā)到所有brokers;
  • autoRebalanceScheduler.startup()
如果打開了autoLeaderRebalance,則啟動"partition-rebalance-thread"線程;
  • deleteTopicManager.start()
如果delete.topic.enable=true,且 /admin/delete_topics 節(jié)點下有值,則刪除相應的topic;
最后,把onControllerFailover()方法的源碼貼一下,上面說的這些操作就是在這個方法中完成的,感興趣的可以再去看下kafka源碼,
def onControllerFailover() {
    if (isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        //read controller epoch from zk
        readControllerEpochFromZookeeper()
        // increment the controller epoch
        incrementControllerEpoch(zkUtils.zkClient)
        // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        partitionStateMachine.registerListeners()
        replicaStateMachine.registerListeners()
        initializeControllerContext()
        replicaStateMachine.startup()
        partitionStateMachine.startup()
        // register the partition change listeners for all existing topics on failover
        controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
        info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
        brokerState.newState(RunningAsController)
        maybeTriggerPartitionReassignment()
        maybeTriggerPreferredReplicaElection()
        /* send partition leadership info to all live brokers */
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
                5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
        }
        deleteTopicManager.start()
    }
    else
        info("Controller has been shut down, aborting startup/failover")
}

到此這篇關于秒懂 kafka HA(高可用)的文章就介紹到這了,更多相關kafka HA內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 詳解Java中while和do-while循環(huán)、break的使用

    詳解Java中while和do-while循環(huán)、break的使用

    本文介紹了循環(huán)結(jié)構(gòu)語句while和do-while循環(huán)、break的使用,while循環(huán)語句通過流程圖和語法語句結(jié)合一個求1~10的整數(shù)和的例子來幫助大家理解while循環(huán)的用法,感興趣的朋友跟隨小編來看看吧
    2020-11-11
  • MyBatis Example And與Or混合使用的實例

    MyBatis Example And與Or混合使用的實例

    這篇文章主要介紹了MyBatis Example And與Or混合使用的實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot項目讀取外置logback配置文件的問題及解決

    SpringBoot項目讀取外置logback配置文件的問題及解決

    SpringBoot項目讀取外置logback配置文件的問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • SpringBoot bean依賴屬性配置詳細介紹

    SpringBoot bean依賴屬性配置詳細介紹

    Spring容器是Spring的核心,一切SpringBean都存儲在Spring容器內(nèi)??梢哉fbean是spring核心中的核心。Bean配置信息定義了Bean的實現(xiàn)及依賴關系,這篇文章主要介紹了SpringBoot bean依賴屬性配置
    2022-09-09
  • java中常見的死鎖以及解決方法代碼

    java中常見的死鎖以及解決方法代碼

    這篇文章主要介紹了java中常見的死鎖以及解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03
  • 淺談SpringBoot如何正確攔截thymeleaf異常

    淺談SpringBoot如何正確攔截thymeleaf異常

    Thymeleaf是一個模板引擎工具,主要用于頁面渲染操作,本文主要介紹了淺談SpringBoot如何正確攔截thymeleaf異常,具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • SpringCloud集成Eureka并實現(xiàn)負載均衡的過程詳解

    SpringCloud集成Eureka并實現(xiàn)負載均衡的過程詳解

    這篇文章主要給大家詳細介紹了SpringCloud集成Eureka并實現(xiàn)負載均衡的過程,文章通過代碼示例和圖文講解的非常詳細,對大家的學習或工作有一定的參考價值,需要的朋友可以參考下
    2023-11-11
  • Java中匿名類的兩種實現(xiàn)方式

    Java中匿名類的兩種實現(xiàn)方式

    本文主要介紹了Java中匿名類的兩種實現(xiàn)方式。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-02-02
  • Java進行反編譯生成.java文件方式(javap、jad下載安裝使用)

    Java進行反編譯生成.java文件方式(javap、jad下載安裝使用)

    這篇文章主要介紹了Java進行反編譯生成.java文件方式(javap、jad下載安裝使用),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • 編程入門:掌握Java運算符技巧

    編程入門:掌握Java運算符技巧

    掌握Java運算符技巧,能讓你的編程之旅輕松許多,本指南將帶你深入了解如何巧妙地使用這些強大的工具,讓代碼不僅高效,還充滿樂趣,跟著我們一起,讓你的Java代碼在運算符的魔法下煥發(fā)新生!
    2023-12-12

最新評論