Apache?Kafka?分區(qū)重分配的實現(xiàn)原理解析
本文作者為中國移動云能力中心大數(shù)據(jù)團(tuán)隊軟件開發(fā)工程師孫大鵬,本文結(jié)合 2.0.0 版本的 Kafka 源碼,詳細(xì)介紹了 Kafka 分區(qū)副本重分配的流程和邏輯,供大家參考。
一、前言
Kafka 是由 Apache 軟件基金會開發(fā)的一個開源流處理平臺,旨在提供一個統(tǒng)一的、高吞吐、低延遲的實時數(shù)據(jù)處理平臺。其持久化層本質(zhì)上是一個“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊列”,這使它作為企業(yè)級基礎(chǔ)設(shè)施來處理流式數(shù)據(jù)非常有價值。
在 Kafka 中,用 topic 來對消息進(jìn)行分類,每個進(jìn)入到 Kafka 的信息都會被放到一個 topic 下,同時每個 topic 中的消息又可以分為若干 partition 以此來提高消息的處理效率。存儲消息數(shù)據(jù)的主機(jī)服務(wù)器被命名為 broker。通常為了保證數(shù)據(jù)的可靠性,數(shù)據(jù)是以多副本的形式保存在不同 broker 的不同磁盤上的。對于每一個 topic 的每一個 partition,如果多個副本之間完成了數(shù)據(jù)同步,保證了數(shù)據(jù)的一致性,則此時的多個副本所在的 broker 的集合稱為 Isr。同一時間,某個 topic 的某個 partition 的多個副本中僅有一個對外提供服務(wù),此時對外提供服務(wù)的 broker 被認(rèn)定為該 partition 的 leader,客戶端的請求都集中到 leader 上。
對于 2 副本 3 分區(qū)的 topic 其描述信息及存儲狀態(tài)如下所示:
test的描述信息: Topic:test PartitionCount:3 ReplicationFactor:2 Configs:min.insync.replic as=1 Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
test的副本分布
健康狀態(tài)的 Kafka 集群,對于每個 topic 的每個 partition,其 Isr 都應(yīng)該等于預(yù)期的副本集合(后面均已 Replicas 表示),但在實際場景中,不可避免的存在磁盤/主機(jī)故障,或者 由于某些原因需要將部分 broker 節(jié)點(diǎn)下線的情況,此時就需要將故障/要下線的 broker 從 Replicas 中移除。對此 Kafka 提供了 kafka-reassign-partitions 工具來進(jìn)行手動的分區(qū)副本遷移。
二、工具的使用
在 Kafka 的根路徑下,通過執(zhí)行如下命令,來完成分區(qū)副本的重分配:
./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute
其中:reassign‐topic.json 文件指定了分區(qū)副本的分布情況,示例如下:
{ "version": 1, "partitions": [ { "topic": "test", "partition": 2, "replicas": [ 2, 1 ], "log_dirs": [ "any", "any" ] } }
文件中指明了將 topic=test,partition=2 的分區(qū)的兩副本分別移動到 brokerId=2 和 brokerId=1 的節(jié)點(diǎn)的任意磁盤路徑上。
下面將結(jié)合 2.0.0 版本的 Kafka 源碼簡單的介紹下 Kafka 分區(qū)副本重分配的流程和邏輯。
三、元數(shù)據(jù)管理及協(xié)調(diào)器
在開始之前先簡單介紹下在 Kafka 分區(qū)副本重分配中涉及到的兩個概念:ZooKeeper 和 Kafka Controller。
3.1 ZooKeeper
Kafka 的元數(shù)據(jù),是存儲在 ZooKeeper 中的。Apache ZooKeeper 是一個提供高可靠性的分布式協(xié)調(diào)服務(wù)框架。它使用的數(shù)據(jù)模型類似于文件系統(tǒng)的樹形結(jié)構(gòu),根目錄也是以“/”開始。該結(jié)構(gòu)上的每個節(jié)點(diǎn)被稱為 znode,用來保存一些元數(shù)據(jù)協(xié)調(diào)信息。同時 ZooKeeper 賦予客戶端監(jiān)控 znode 變更的能力,即所謂的 Watch 通知功能。一旦 znode 節(jié)點(diǎn)被創(chuàng)建、刪除,子節(jié)點(diǎn)數(shù)量發(fā)生變化,或是 znode 所存的數(shù)據(jù)本身變更, ZooKeeper 會通過節(jié)點(diǎn)變更監(jiān)聽器 (ChangeHandler) 的方式顯式通知客戶端以便客戶端 觸發(fā)對應(yīng)的處理操作。
3.2 Kafka Controller
Kafka Controller 是 Apache Kafka 的核心組件,它的主要作用是在 Apache ZooKeeper 的幫助下管理和協(xié)調(diào)整個 Kafka 集群。集群中任意一臺 Broker 都能充當(dāng)控制器的角色,但是,在運(yùn)行過程中,只能有一個 Broker 成為控制器,行使其管理和協(xié)調(diào)的職責(zé)。
四、分區(qū)重分配流程分析
Kafka 的分區(qū)重分配就是在 client、broker 和 controller 的協(xié)同運(yùn)行下完成的。即:
1. 客戶端發(fā)起分區(qū)重分配任務(wù),在 ZooKeeper 中創(chuàng)建/admin/reassign_partitions 節(jié)點(diǎn),然 后向涉及的 broker 發(fā)送 alterReplicaLogDirs 請求
2. controller 監(jiān)測到 ZooKeeper 中/admin/reassign_partitions 的變化,觸發(fā) Kafka 分區(qū)元 數(shù)據(jù)的變更維護(hù)操作
3. broker 接收到客戶端發(fā)送的 alterReplicaLogDirs 請求,根據(jù)具體任務(wù)內(nèi)容在服務(wù)端實際完成分區(qū)副本移動
流程總結(jié)如下圖所示:
下面將針對這三部分分別展開介紹:
4.1 kafka-reassign-partitions 客戶端
分區(qū)重分配任務(wù)是由客戶端發(fā)起的,其入口主類為 ReassignPartitionsCommand.scala 中,調(diào)用 executeAssignment 方法??蛻舳说?executeAssignment 方法主要完成了如下操作:
1.解析 json 文件并進(jìn)行相關(guān)校驗
•讀取 json 文件內(nèi)容,校驗“partitions”的“version”,僅為 1 時,繼續(xù)執(zhí)行副本重分 配
•校驗分區(qū)副本數(shù)和副本數(shù)據(jù)路徑數(shù)是否一致
•校驗 partition/replica 是否為空/重復(fù)
2.檢查待重分配的分區(qū)在集群中是否存在(根據(jù) zk 中的/brokers/topics/${topic})
3.檢查確認(rèn)所有目標(biāo) broker 均在線(zk 中/brokers/ids 的子 znode 列表)
4.檢查是否已存在分區(qū)副本重分配任務(wù),如果已存在相關(guān)任務(wù),則退出
5.將分區(qū)重分配任務(wù)記錄到 zk 中,即在 zk 中創(chuàng)建/admin/reassign_partitions,以便 controller 可以發(fā)現(xiàn)并協(xié)調(diào) broker 進(jìn)行相關(guān)操作
6.根據(jù)解析的 json 內(nèi)容,逐個 topic 向相關(guān)的 broker 發(fā)送 alterReplicaLogDirs 請求
客戶端的處理邏輯可總結(jié)為如下流程圖:
4.2 controller 維護(hù)分區(qū)的元數(shù)據(jù)信息
在 controller 啟動時會創(chuàng)建 partitionReassignmentHandler,kafkaController 主線程回調(diào) onControllerFailover 時,檢測到/admin/reassign_partitions 發(fā)生變化時,觸發(fā)分區(qū)副本重分配操作,在 maybeTriggerPartitionReassignment 中通過調(diào)用 onPartitionReassignment 真正執(zhí)行分區(qū)副本重分配。在 onPartitionReassignment 中定 義了三個概念:
•RAR:指定的分區(qū)副本放置策略
•OAR:原始的分區(qū)副本放置策略
•AR:當(dāng)前的分區(qū)副本放置策略
onPartitionReassignment 的執(zhí)行過程可以總結(jié)為如下步驟:
檢查指定的分區(qū)副本是否處在 isr 中,如果不在則執(zhí)行以下前 3 步,否則直接執(zhí)行第 4 步
1.在 zk 中將 AR 更新為 RAR+OAR (/broker/topics/${topicName})
2.向所有副本(RAR+OAR)中發(fā)送 LeaderAndIsr 請求
3.將 RAR-OAR 的副本狀態(tài)置為 NewReplica,等待 NewReplica 中的數(shù)據(jù)與 leader 中的數(shù)據(jù) 完成同步
4.等待直到所有 RAR 中的副本完成與 leader 的同步
5.將所有 RAR 的副本置為 OnlineReplica 狀態(tài)
6.將 RAR 作為 AR
7.如果當(dāng)前的 leader 不在 RAR 中,發(fā)送 LeaderAndIsr Request 從 RAR 中選出一個新的 leader;如果當(dāng)前 leader 在 RAR 中,檢查 leader 狀態(tài),如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
8.將 OAR-RAR 的副本置為 Offline 狀態(tài)
9.將 OAR-RAR 的副本置為 NonExistentReplica 狀態(tài)(真實刪除對應(yīng)的分區(qū)副本)
10.將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}數(shù)據(jù)格式:{"version":1,"partitions":{"0":[${brokerId}]}})
11.更新 zk 中/admin/reassign_partitions 的值,將完成遷移的分區(qū)刪除
12.同步所有 broker,更新元數(shù)據(jù)信息
邏輯流程圖如下:
4.3 broker 端數(shù)據(jù)跨路徑遷移
底層數(shù)據(jù)跨路徑遷移,是由 broker 端完成的,broker 接收到客戶端發(fā)來的 ALTER_REPLICA_LOG_DIRS 請求后,調(diào)用 alterReplicaLogDirs 方法,相關(guān)流程如下:
1.確保目的路徑/待移動分區(qū)在線
2.如果當(dāng)前分區(qū)副本的 log 路徑不存在給定的目的路徑并且 futureLogs(用于跨路徑數(shù)據(jù)遷移的中間過程)也不包含目的路徑,則在內(nèi)存中記錄當(dāng)前分區(qū)副本和目的 logDir,即標(biāo)記那些需要進(jìn)行遷移的分區(qū)副本路徑
3.對于需要移動的分區(qū)副本,目的 broker 的路徑中創(chuàng)建 future Log
4.停止當(dāng)前 Log 的清理工作,等待 future Log 同步完再清理
5.創(chuàng)建 ReplicaAlterLogDirsThread,逐個 topic 逐個 partition 獲取 fetchOffset、 logStartOffset 、fetchSize 等數(shù)據(jù)構(gòu)造 Fetch 請求
6.通過 ReplicaManager.fetchMessages 從分區(qū)副本 leader 獲取數(shù)據(jù),完成數(shù)據(jù)同步
更詳細(xì)的處理流程如下圖所示:
五、總結(jié)
Kafka 分區(qū)重分配,通過 kafka-reassign-partitions 啟動任務(wù),將任務(wù)記錄在元數(shù)據(jù)管理器 ZooKeeper 中,Kafka controller 通過對 ZooKeeper 的監(jiān)測,發(fā)現(xiàn)相關(guān)任務(wù)通過和 broker 的交互按序處理相關(guān)的遷移任務(wù),同時 controller 實時維護(hù) ZooKeeper 中的元數(shù)據(jù)信息并進(jìn)行相關(guān)變化的記錄,保證在重分配過程中,不影響 topic 分區(qū)的正常使用,在任務(wù)完成后,再由 controller 負(fù)責(zé) ZooKeeper 中重分配任務(wù)標(biāo)記的清理,以便客戶端驗證重分配任務(wù)的結(jié)果。
到此這篇關(guān)于Apache Kafka 分區(qū)重分配的實現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)Apache Kafka 分區(qū)重分配內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Apache中利用mod_rewrite實現(xiàn)防盜鏈
自從上次在博客中推薦《you are my everything》以后,服務(wù)器的流量突然多了起來,有幾次甚至導(dǎo)致了VPS的當(dāng)機(jī)。后來經(jīng)過分析:盜鏈這個MP3的網(wǎng)頁包括諸如QQ空間、校內(nèi)網(wǎng)空間、更有甚者還放到了Taobao小店、個人博客也有不少,全部作為了背景音樂,并且導(dǎo)致各種爬蟲瘋狂抓取這個文件。找到了問題原因就只有一個辦法了,就是利用Apache的mod_rewrite模塊把盜鏈行為拒之門外。2008-04-04Linux命令學(xué)習(xí)總結(jié)之rmdir命令
這篇文章主要介紹了Linux命令學(xué)習(xí)總結(jié)之rmdir命令的相關(guān)資料,需要的朋友可以參考下2016-01-01增強(qiáng)Linux和Unix服務(wù)器安全性的方法詳解
今天小編就為大家分享一篇關(guān)于增強(qiáng)Linux和Unix服務(wù)器安全性的方法詳解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03如何通過其他主機(jī)查看Apahce服務(wù)器的運(yùn)行狀態(tài)
這篇文章主要介紹了如何通過其他主機(jī)查看Apahce服務(wù)器的運(yùn)行狀態(tài),需要的朋友可以參考下2016-04-04Ubuntu18.04.2下安裝 RTX2080 Nvidia顯卡驅(qū)動的方法
這篇文章主要介紹了Ubuntu18.04.2下安裝 RTX2080 Nvidia顯卡驅(qū)動的方法,本文圖文并茂給大家介紹的非常詳細(xì),具有一定的參考借鑒價值 ,需要的朋友可以參考下2019-07-07Ubuntu 16.04 LTS下安裝MATLAB 2014B的方法教程
這篇文章主要介紹了Ubuntu 16.04 LTS下安裝MATLAB 2014B的方法教程,文中介紹的很詳細(xì),相信對大家具有一定的參考價值,有需要的朋友們下面來一起看看吧。2017-02-02