一文秒懂?kafka?HA(高可用)
更新時間:2021年11月26日 14:25:01 作者:程序員耕耘
這篇文章主要介紹了秒懂?kafka?HA(高可用)的相關(guān)知識,本文我們來說一說和?kafka?高可用相關(guān)的一些策略,對kafka?HA相關(guān)知識感興趣的朋友一起看看吧
我們知道,kafka中每個topic被劃分為多個partition,每個partition又有多個副本,那么這些分區(qū)副本是怎么均勻的分布在整個kafka集群的broker節(jié)點上的?partition副本的leader是通過什么算法選舉出來的?partition副本的follower是怎么復(fù)制備份leader的數(shù)據(jù)的?本文我們就來說一說和 kafka 高可用相關(guān)的一些策略。
01名詞解釋
要想說明白kafka的HA機(jī)制,我們必須先搞明白幾個縮寫名詞,
1、AR、ISR、OSR
AR:Assigned Replicas,某分區(qū)的所有副本(這里所說的副本包括leader和follower)統(tǒng)稱為 AR。
ISR:In Sync Replicas,所有與leader副本保持"一定程度同步"的副本(包括leader副本在內(nèi))組成 ISR 。生產(chǎn)者發(fā)送消息時,只有l(wèi)eader與客戶端發(fā)生交互,follower只是同步備份leader的數(shù)據(jù),以保障高可用,所以生產(chǎn)者的消息會先發(fā)送到leader,然后follower才能從leader中拉取消息進(jìn)行同步,同步期間,follower的數(shù)據(jù)相對leader而言會有一定程度的滯后,前面所說的"一定程度同步"就是指可忍受的滯后范圍,這個范圍可以通過server.properties中的參數(shù)進(jìn)行配置。
OSR :Out-of-Sync Replied,在上面的描述中,相對leader滯后過多的follower將組成OSR 。
由此可見,AR = ISR + OSR,理想情況下,所有的follower副本都應(yīng)該與leader 保持一定程度的同步,即AR=ISR,OSR集合為空
2、ISR 的伸縮性
leader負(fù)責(zé)跟蹤維護(hù) ISR 集合中所有follower副本的滯后狀態(tài),當(dāng)follower副本"落后太多" 或 "follower超過一定時間沒有向leader發(fā)送同步請求"時,leader副本會把它從 ISR 集合中剔除。如果 OSR 集合中有follower副本"追上"了leader副本,那么leader副本會把它從 OSR 集合轉(zhuǎn)移至 ISR 集合。
上面描述的"落后太多"是指follower復(fù)制的消息落后于leader的條數(shù)超過預(yù)定值,這個預(yù)定值可在server.properties中通過replica.lag.max.messages配置,其默認(rèn)值是4000。"超過一定時間沒有向leader發(fā)送同步請求",這個"一定時間"可以在server.properties中通過replica.lag.time.max.ms來配置,其默認(rèn)值是10000,默認(rèn)情況下,當(dāng)leader發(fā)生故障時,只有 ISR 集合中的follower副本才有資格被選舉為新的leader,而在 OSR 集合中的副本則沒有任何機(jī)會(不過這個可以通過配置來改變)。
3、HW
HW (High Watermark)俗稱高水位,它標(biāo)識了一個特定的消息偏移量(offset),消費者只能消費HW之前的消息。
下圖表示一個日志文件,這個日志文件中有9條消息,第一條消息的offset為0,最后一條消息的offset為8,虛線表示的offset為9的消息,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。

4、LEO
LEO (Log End Offset),標(biāo)識當(dāng)前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,分區(qū) ISR 集合中的每個副本都會維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW(你品,你細(xì)品...),對消費者而言只能消費 HW 之前的消息。
5、 ISR 集合和 HW、LEO的關(guān)系
producer在發(fā)布消息到partition時,只會與該partition的leader發(fā)生交互將消息發(fā)送給leader,leader會將該消息寫入其本地log,每個follower都從leader上pull數(shù)據(jù)做同步備份,follower在pull到該消息并寫入其log后,會向leader發(fā)送ack,一旦leader收到了ISR中的所有follower的ack(只關(guān)注ISR中的所有follower,不考慮OSR,一定程度上提升了吞吐),該消息就被認(rèn)為已經(jīng)commit了,leader將增加HW,然后向producer發(fā)送ack。
也就是說,在ISR中所有的follower還沒有完成數(shù)據(jù)備份之前,leader不會增加HW,也就是這條消息暫時還不能被消費者消費,只有當(dāng)ISR中所有的follower都備份完成后,leader才會將HW后移。
ISR集合中LEO最小的副本,即同步數(shù)據(jù)同步的最慢的一個,這個最慢副本的LEO即leader的HW,消費者只能消費HW之前的消息。
02kafka HA
Tips:我們說的副本包括leader和follower,都叫副本,不要認(rèn)為叫副本說的就是follower。
kafka在0.8以前的版本中是沒有分區(qū)副本的概念的,一旦某一個broker宕機(jī),這個broker上的所有分區(qū)都將不可用。在0.8版本以后,引入了分區(qū)副本的概念,同一個partition可以有多個副本,在多個副本中會選出一個做leader,其余的作為follower,只有l(wèi)eader對外提供讀寫服務(wù),follower只負(fù)責(zé)從leader上同步拉取數(shù)據(jù),已保障高可用。
1、partition副本的分配策略
每個topic有多個partition,每個partition有多個副本,這些partition副本分布在不同的broker上,以保障高可用,那么這些partition副本是怎么均勻的分布到集群中的每個broker上的呢?
※ kafka分配partition副本的算法如下,
① 將所有的broker(假設(shè)總共n個broker)和 待分配的partition排序;
② 將第i個partition分配到第(i mod n)個broker上;
③ 第i個partition的第j個副本分配到第((i+j) mod n)個broker上;
2、kafka的消息傳遞備份策略
生產(chǎn)者將消息發(fā)送給分區(qū)的leader,leader會將該消息寫入其本地log,然后每個follower都會從leader pull數(shù)據(jù),follower pull到該消息并將其寫入log后,會向leader發(fā)送ack,當(dāng)leader收到了ISR集合中所有follower的ack后,就認(rèn)為這條消息已經(jīng)commit了,leader將增加HW并且向生產(chǎn)者返回ack。在整個流程中,follower也可以批量的從leader復(fù)制數(shù)據(jù),以提升復(fù)制性能。
producer在發(fā)送消息的時候,可指定參數(shù)acks,表示"在生產(chǎn)者認(rèn)為發(fā)送請求完成之前,有多少分區(qū)副本必須接收到數(shù)據(jù)",有三個可選值,0、1、all(或-1),默認(rèn)為1,
- acks=0,表示producer只管發(fā),只要發(fā)出去就認(rèn)為發(fā)發(fā)送請求完成了,不管leader有沒有收到,更不管follower有沒有備份完成。
- acks=1,表示只要leader收到消息,并將其寫入自己log后,就會返回給producer ack,不考慮follower有沒有備份完成。
- acks=all(或-1),表示不僅要leader收到消息寫入本地log,還要等所有ISR集合中的follower都備份完成后,producer才認(rèn)為發(fā)送成功。

實際上,為了提高性能,follower在pull到消息將其保存到內(nèi)存中而尚未寫入磁盤時,就會向leader發(fā)送ack,所以也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費。
3、kafka中的Leader選舉
面試官在考查你kafka知識的時候如果問你:kafka中的選舉是怎么回事?而不說具體哪種選舉,那這個面試官可能對kafka也是一知半解,這個時候就是"弄死"他的時候了,當(dāng)然如果你沒有一定的知識儲備,那么就是你被"弄死"的時候。
因為kafka中涉及到選舉的地方有多處,最常提及的也有:①cotroller選舉 、 ②分區(qū)leader選舉 和 ③consumer group leader的選舉。我們在前面說過同一個partition有多個副本,其中一個副本作為leader,其余的作為follower。這里我們再說一個角色:controller!kafka集群中多個broker,有一個會被選舉為controller,注意區(qū)分兩者,一個是broker的leader,我們稱為controller,一個是分區(qū)副本的leader,我們稱為leader。
① controller的選舉【broker的leader】
controller的選舉是通過broker在zookeeper的"/controller"節(jié)點下創(chuàng)建臨時節(jié)點來實現(xiàn)的,并在該節(jié)點中寫入當(dāng)前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} ,利用zookeeper的強(qiáng)一致性特性,一個節(jié)點只能被一個客戶端創(chuàng)建成功,創(chuàng)建成功的broker即為controller,即"先到先得"。?
當(dāng)controller宕機(jī)或者和zookeeper失去連接時,zookeeper檢測不到心跳,zookeeper上的臨時節(jié)點會被刪除,而其它broker會監(jiān)聽臨時節(jié)點的變化,當(dāng)節(jié)點被刪除時,其它broker會收到通知,重新發(fā)起controller選舉。
② leader的選舉【分區(qū)副本的leader】
分區(qū)leader的選舉由 controller 負(fù)責(zé)管理和實施,當(dāng)leader發(fā)生故障時,controller會將leader的改變直接通過RPC的方式通知需要為此作出響應(yīng)的broker,需要為此作出響應(yīng)的broker即該分區(qū)的ISR集合中follower所在的broker,kafka在zookeeper中動態(tài)維護(hù)了一個ISR,只有ISR里的follower才有被選為Leader的可能。
具體過程是這樣的:按照AR集合中副本的順序 查找到 第一個 存活的、并且屬于ISR集合的 副本作為新的leader。一個分區(qū)的AR集合在創(chuàng)建分區(qū)副本的時候就被指定,只要不發(fā)生重分配的情況,AR集合內(nèi)部副本的順序是保持不變的,而分區(qū)的ISR集合上面說過因為同步滯后等原因可能會改變,所以注意這里是根據(jù)AR的順序而不是ISR的順序找。
※ 對于上面描述的過程我們假設(shè)一種極端的情況,如果partition的所有副本都不可用時,怎么辦?這種情況下kafka提供了兩種可行的方案:
1、選擇 ISR中 第一個活過來的副本作為Leader;
2、選擇第一個活過來的副本(不一定是ISR中的)作為Leader;
這就需要在可用性和數(shù)據(jù)一致性當(dāng)中做出選擇,如果一定要等待ISR中的副本活過來,那不可用的時間可能會相對較長。選擇第一個活過來的副本作為Leader,如果這個副本不在ISR中,那數(shù)據(jù)的一致性則難以保證。kafka支持用戶通過配置選擇,以根據(jù)業(yè)務(wù)場景在可用性和數(shù)據(jù)一致性之間做出權(quán)衡。
③消費組leader的選舉
組協(xié)調(diào)器會為消費組(consumer group)內(nèi)的所有消費者選舉出一個leader,這個選舉的算法也很簡單,第一個加入consumer group的consumer即為leader,如果某一時刻leader消費者退出了消費組,那么會重新 隨機(jī) 選舉一個新的leader。
03kafka架構(gòu)中zookeeper的結(jié)構(gòu)
1、查看方式
我們知道,kafka是基于zookeeper協(xié)調(diào)管理的,那么zookeeper中究竟存儲了哪些信息?另外在后面分析 broker宕機(jī) 和 controller宕機(jī) 時,我們也需要先了解zookeeper的目錄結(jié)構(gòu),所以我們先學(xué)習(xí)一下怎么查看zookeeper的目錄結(jié)構(gòu)?
① 首先啟動zookeeper客戶端連接zk服務(wù)
# cd /usr/local/zookeeper-cluster/zk1/bin # ./zkCli.sh
② 查看zk根節(jié)點的子目錄
[zk: localhost:2181(CONNECTED) 0] ls / [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
③ 可以看到zk根節(jié)點下有很多子目錄,以brokers為例,查看brokers的層級結(jié)構(gòu)
[zk: localhost:2181(CONNECTED) 1] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids [0] [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.80.219:9092"],"jmx_port":-1,"host":"172.17.80.219","timestamp":"1584267365984","port":9092,"version":4} cZxid = 0x300000535 ctime = Sun Mar 15 18:16:06 CST 2020 mZxid = 0x300000535 mtime = Sun Mar 15 18:16:06 CST 2020 pZxid = 0x300000535 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x20191d7053f0009 dataLength = 196 numChildren = 0 [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics [__consumer_offsets, first] [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/first [partitions] [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/first/partitions [0, 1] [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/first/partitions/0 [state] [zk: localhost:2181(CONNECTED) 8] get /brokers/topics/first/partitions/0/state {"controller_epoch":21,"leader":0,"version":1,"leader_epoch":8,"isr":[0]} cZxid = 0x3000003e9 ctime = Sun Mar 08 16:24:37 CST 2020 mZxid = 0x3000005cb mtime = Sun Mar 15 18:54:09 CST 2020 pZxid = 0x3000003e9 cversion = 0 dataVersion = 10 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 73 numChildren = 0 [zk: localhost:2181(CONNECTED) 9]
可以看到,brokers下包括[ids, topics, seqid],ids里面存儲了存活的broker的信息,topics里面存儲了kafka集群中topic的信息。同樣的方法,可以查看其余節(jié)點的結(jié)構(gòu),這里不再演示。
2、節(jié)點信息(這里只列出和HA相關(guān)的部分節(jié)點)
① controller
controller節(jié)點下存放的是kafka集群中controller的信息(controller即kafka集群中所有broker的leader)。
② controller_epoch
controller_epoch用于記錄controller發(fā)生變更的次數(shù)(controller宕機(jī)后會重新選舉controller,這時候controller_epoch的值會+1),即記錄當(dāng)前的控制器是第幾代控制器,用于防止broker腦裂。
③ brokes
brokers下的ids存儲了存活的broker信息,topics存儲了kafka集群中topic的信息,其中有一個特殊的topic:_consumer_offsets,新版本的kafka將消費者的offset就存儲在__consumer_offsets下。
04broker failover
我們了解了kafka集群中zookpeeper的結(jié)構(gòu),本文的主題是kafka的高可用分析,所以我們還是結(jié)合zookpper的結(jié)構(gòu),來分析一下,當(dāng)kafka集群中的一個broker節(jié)點宕機(jī)時(非controller節(jié)點),會發(fā)生什么?
在講之前,我們再來回顧一下brokers的結(jié)構(gòu),

※ 當(dāng)非controller的broker宕機(jī)時,會執(zhí)行如下操作,
1、controller會在zookeeper的 " /brokers/ids/" 節(jié)點注冊一個watcher(監(jiān)視器),當(dāng)有broker宕機(jī)時,zookeeper會觸發(fā)監(jiān)視器(fire watch)通知controller。
2、controller 從 "/brokers/ids" 節(jié)點讀取到所有可用的broker。
3、controller會聲明一個set_p集合,該集合包含了宕機(jī)broker上所有的partition。
4、針對set_p中的每一個partition,
① 從 "/state"節(jié)點 讀取該partition當(dāng)前的ISR;
② 決定該partition的新leader:如果該分區(qū)的 ISR中有存活的副本,則選擇其中一個作為新leader;如果該partition的ISR副本全部掛了,則選擇該partition的 AR集合 中任一幸存的副本作為leader;如果該partition的所有副本都掛,則將分區(qū)的leader設(shè)為-1;
③ 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點;
5、通過RPC向set_p相關(guān)的broker發(fā)送LeaderAndISR Request命令。
05 controller failover
當(dāng) controller 宕機(jī)時會觸發(fā) controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節(jié)點注冊 watcher(監(jiān)聽器),當(dāng) controller 宕機(jī)時 zookeeper 中的臨時節(jié)點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創(chuàng)建新的臨時節(jié)點,只有一個會創(chuàng)建成功并當(dāng)選為 controller。
當(dāng)新的 controller 當(dāng)選時,會回調(diào)KafkaController的onControllerFailover()方法,在這個方法中完成controller的初始化,controller 在初始化時,首先會利用 ZK 的 watch 機(jī)制注冊很多不同類型的監(jiān)聽器,主要有以下幾種:
- 監(jiān)聽 /admin/reassign_partitions 節(jié)點,用于分區(qū)副本遷移的監(jiān)聽;
- 監(jiān)聽 /isr_change_notification 節(jié)點,用于 Partition Isr 變動的監(jiān)聽;
- 監(jiān)聽 /admin/preferred_replica_election 節(jié)點,用于 Partition 最優(yōu) leader 選舉的監(jiān)聽;
- 監(jiān)聽 /brokers/topics 節(jié)點,用于 topic 新建的監(jiān)聽;
- 監(jiān)聽 /brokers/topics/TOPIC_NAME 節(jié)點,用于 Topic Partition 擴(kuò)容的監(jiān)聽;
- 監(jiān)聽 /admin/delete_topics 節(jié)點,用于 topic 刪除的監(jiān)聽;
- 監(jiān)聽 /brokers/ids 節(jié)點,用于 Broker 上下線的監(jiān)聽;
除了注冊多種監(jiān)聽器外,controller初始化時還做以下操作,
- initializeControllerContext()
初始化controller上下文,設(shè)置當(dāng)前所有broker、topic、partition的leader、ISR等;
- replicaStateMachine.startup()
- partitionStateMachine.startup()
啟動狀態(tài)機(jī);
- brokerState.newState(RunningAsController)
將 brokerState 狀態(tài)設(shè)置為 RunningAsController;
- sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
把partition leadership信息發(fā)到所有brokers;
- autoRebalanceScheduler.startup()
如果打開了autoLeaderRebalance,則啟動"partition-rebalance-thread"線程;
- deleteTopicManager.start()
如果delete.topic.enable=true,且 /admin/delete_topics 節(jié)點下有值,則刪除相應(yīng)的topic;
最后,把onControllerFailover()方法的源碼貼一下,上面說的這些操作就是在這個方法中完成的,感興趣的可以再去看下kafka源碼,
def onControllerFailover() { if (isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) //read controller epoch from zk readControllerEpochFromZookeeper() // increment the controller epoch incrementControllerEpoch(zkUtils.zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() // register the partition change listeners for all existing topics on failover controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
到此這篇關(guān)于秒懂 kafka HA(高可用)的文章就介紹到這了,更多相關(guān)kafka HA內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Java中while和do-while循環(huán)、break的使用
本文介紹了循環(huán)結(jié)構(gòu)語句while和do-while循環(huán)、break的使用,while循環(huán)語句通過流程圖和語法語句結(jié)合一個求1~10的整數(shù)和的例子來幫助大家理解while循環(huán)的用法,感興趣的朋友跟隨小編來看看吧2020-11-11SpringBoot項目讀取外置logback配置文件的問題及解決
SpringBoot項目讀取外置logback配置文件的問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08SpringCloud集成Eureka并實現(xiàn)負(fù)載均衡的過程詳解
這篇文章主要給大家詳細(xì)介紹了SpringCloud集成Eureka并實現(xiàn)負(fù)載均衡的過程,文章通過代碼示例和圖文講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的參考價值,需要的朋友可以參考下2023-11-11Java進(jìn)行反編譯生成.java文件方式(javap、jad下載安裝使用)
這篇文章主要介紹了Java進(jìn)行反編譯生成.java文件方式(javap、jad下載安裝使用),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12