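Kafka Source Code Tutorial Series: Deleting a Topic
Preface
Apache Kafka originated at LinkedIn, became an Apache Incubator project in 2011, and graduated to a top-level Apache project in 2012. Kafka is written in Scala and Java. It is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With built-in partitioning, data replication, and fault tolerance, it is well suited to large-scale message processing.
As in the earlier articles of this series, the examples use Kafka 0.8.2.2.
I. How to delete a topic
Deleting a topic involves two key steps:
1. Configure the delete parameter
Set the broker parameter delete.topic.enable to true.
2. Run the delete command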
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
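If the delete parameter is not set to true, the topic is not actually removed; it is only marked for deletion. At that point most people would probably delete the topic's information in Zookeeper and its log files by hand, but that does not clear the topic data held in the Kafka broker's memory. The best approach in this situation is therefore to set the delete parameter to true and then restart Kafka.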
II. Important classes
1. PartitionStateMachine
This class is the partition state machine. It determines the current state of each partition and the transitions between states. There are four states:
- NonExistentPartition
- NewPartition
- OnlinePartition
- OfflinePartition
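2. ReplicaManager
Manages all replicas on the current machine and carries out the concrete actions such as reads, writes, and deletions.
Reads and writes: it first obtains the Partition object, then the Replica object, then the Log object, and uses the Segment objects managed by the Log to write and read the data.
3. ReplicaStateMachine
The replica state machine. It determines the current state of each replica and the transitions between states. A replica is always in exactly one of the following states:
- NewReplica: the controller can create a new replica during partition reassignment. In this state the replica can only receive become-follower requests. Valid previous state: NonExistentReplica
- OnlineReplica: a replica that has been started. It can receive become-leader or become-follower requests. Valid previous states: NewReplica, OnlineReplica, or OfflineReplica
- OfflineReplica: a replica that has died is in this state. Valid previous states: NewReplica, OnlineReplica
- ReplicaDeletionStarted: a replica enters this state when its deletion starts. Valid previous state: OfflineReplica
- ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted
- ReplicaDeletionIneligible: the replica is in this state when deletion fails. Valid previous state: ReplicaDeletionStarted
- NonExistentReplica: the replica is in this state after it has been deleted successfully. Valid previous state: ReplicaDeletionSuccessful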
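The transition rules listed above can be captured in a small lookup table. The following is a minimal sketch, not Kafka's actual ReplicaStateMachine: the object name ReplicaTransitions and its helper methods are hypothetical, and the extra edge from ReplicaDeletionIneligible to OfflineReplica is an assumption taken from the retry path (markTopicForDeletionRetry) shown later in this article rather than from the state list itself.

```scala
// Simplified model of the replica states described above (not Kafka's classes).
sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState
case object NonExistentReplica extends ReplicaState

// Hypothetical helper object, introduced only for illustration.
object ReplicaTransitions {
  private def from(states: ReplicaState*): Set[ReplicaState] = states.toSet

  // Target state -> states it may be entered from, following the list in the text.
  val validPrevious: Map[ReplicaState, Set[ReplicaState]] = Map[ReplicaState, Set[ReplicaState]](
    NewReplica -> from(NonExistentReplica),
    OnlineReplica -> from(NewReplica, OnlineReplica, OfflineReplica),
    // ReplicaDeletionIneligible is an assumption based on the deletion retry path.
    OfflineReplica -> from(NewReplica, OnlineReplica, ReplicaDeletionIneligible),
    ReplicaDeletionStarted -> from(OfflineReplica),
    ReplicaDeletionSuccessful -> from(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> from(ReplicaDeletionStarted),
    NonExistentReplica -> from(ReplicaDeletionSuccessful)
  )

  // True if a replica in state `from` may move directly to state `to`.
  def canMove(from: ReplicaState, to: ReplicaState): Boolean =
    validPrevious.getOrElse(to, Set.empty[ReplicaState]).contains(from)

  // True if every consecutive pair of states in the path is a legal transition.
  def isValidPath(path: List[ReplicaState]): Boolean =
    path.zip(path.drop(1)).forall { case (a, b) => canMove(a, b) }
}
```

Under this model, a successful deletion walks OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica, and a replica cannot jump from OnlineReplica straight to ReplicaDeletionStarted.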
4. TopicDeletionManager
This class manages the state machine for topic deletion.
1) TopicCommand issues a topic deletion command by creating /admin/delete_topics/<topic>.
2) The controller listens for child changes under /admin/delete_topics and starts deletion for each topic.
3) A background thread in the controller performs the topic deletion.
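III. Source-level walkthrough of topic deletion
This section has four parts:
A) What the client-side delete command does
B) The source-code flow when delete.topic.enable is not set
C) The source-code flow when delete.topic.enable is set
D) Manually deleting the topic information in zk and the data on disk
1. The client runs the delete command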
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
Inside kafka-topics.sh we see:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@
In TopicCommand, the main method contains:
else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts)
The body of deleteTopic is:
val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
  try {
    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
    // ...
This creates a node named after the topic under the "/admin/delete_topics" path.
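To make the effect of the command concrete, here is a minimal in-memory sketch of the "mark for deletion" step. It assumes nothing about the real ZkClient API: FakeZk, DeleteTopicSketch, and markForDeletion are hypothetical stand-ins, and only the path /admin/delete_topics/<topic> comes from the text.

```scala
import scala.collection.mutable

// In-memory stand-in for the znode tree (hypothetical, not the ZkClient API).
class FakeZk {
  private val nodes = mutable.Set[String]()

  // Returns true if the node was created, false if it already existed.
  def createPersistentPath(path: String): Boolean = nodes.add(path)

  // Names of all nodes stored below the given parent path.
  def children(parent: String): Set[String] =
    nodes.filter(_.startsWith(parent + "/")).map(_.substring(parent.length + 1)).toSet
}

object DeleteTopicSketch {
  val DeleteTopicsPath = "/admin/delete_topics"

  // Builds the marker path for one topic, as described in the text.
  def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic

  // The client only writes marker nodes; it never touches broker data itself.
  def markForDeletion(zk: FakeZk, topics: Seq[String]): Unit =
    topics.foreach(t => zk.createPersistentPath(getDeleteTopicPath(t)))
}
```

The point of the sketch is that the command line tool does nothing beyond writing a marker node; everything else happens in the controller once it notices the new child.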
2. The flow when delete.topic.enable is not set
Two listeners can respond in total:
A) TopicChangeListener
B) DeleteTopicsListener
Deleting a topic with the topic delete command only triggers DeleteTopicsListener.
var topicsToBeDeleted = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
  info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  // mark topic ineligible for deletion if other state changes are in progress
  topicsToBeDeleted.foreach { topic =>
    val preferredReplicaElectionInProgress =
      controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
    val partitionReassignmentInProgress =
      controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
    if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  }
  // add topic to deletion list
  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
Both of the following calls check whether delete.topic.enable is true. If it is not, they do nothing; if it is, their bodies run:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
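The decision logic of the listener can be restated as a pure function, which makes it easier to see what is filtered and what is flagged. This is a minimal sketch under stated assumptions: DeleteTopicsListenerSketch, Decision, and onChildChange are hypothetical names, and the controller context is reduced to three plain sets of topic names.

```scala
object DeleteTopicsListenerSketch {
  // Outcome of one listener callback: topics to queue and topics to flag as ineligible.
  final case class Decision(toDelete: Set[String], ineligible: Set[String])

  def onChildChange(children: Set[String],      // children of /admin/delete_topics
                    allTopics: Set[String],     // topics the controller knows about
                    electing: Set[String],      // topics with a preferred replica election in progress
                    reassigning: Set[String]    // topics with a partition reassignment in progress
                   ): Decision = {
    // Drop requests for topics that do not exist.
    val toDelete = children.filter(allTopics.contains)
    // Flag topics that have another state change in progress.
    val ineligible = toDelete.filter(t => electing.contains(t) || reassigning.contains(t))
    Decision(toDelete, ineligible)
  }
}
```

For example, with children a, b, ghost, known topics a, b, c, and an election running on b, the sketch queues a and b and flags b as ineligible; ghost is ignored because the controller does not know it.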
3. The flow when delete.topic.enable is set to true
The difference from step 2 lies in these two handler functions:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion is implemented as:
if(isDeleteTopicEnabled) {
  val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if(newTopicsToHaltDeletion.size > 0)
    info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
Its main job is to halt the deletion of a topic when any of the following three conditions holds:
* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion mainly updates the set of topics to be deleted and wakes up the DeleteTopicsThread:
def enqueueTopicsForDeletion(topics: Set[String]) {
  if(isDeleteTopicEnabled) {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    resumeTopicDeletionThread()
  }
}
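The way both functions are gated by the configuration flag can be shown with a small self-contained model. This is only a sketch: TopicDeletionSketch, resumeCount, and eligibleNow are hypothetical, the deletion thread is replaced by a counter, and partitions are left out entirely.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the two gated operations shown above.
class TopicDeletionSketch(isDeleteTopicEnabled: Boolean) {
  val topicsToBeDeleted = mutable.Set[String]()
  val topicsIneligibleForDeletion = mutable.Set[String]()
  var resumeCount = 0 // stands in for waking up the deletion thread

  def enqueueTopicsForDeletion(topics: Set[String]): Unit =
    if (isDeleteTopicEnabled) {
      topicsToBeDeleted ++= topics
      resumeCount += 1
    }

  def markTopicIneligibleForDeletion(topics: Set[String]): Unit =
    if (isDeleteTopicEnabled) {
      // Only topics already queued for deletion can be halted.
      val newTopicsToHaltDeletion = topicsToBeDeleted.toSet & topics
      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
    }

  // Topics the deletion thread could work on right now.
  def eligibleNow: Set[String] =
    topicsToBeDeleted.toSet.diff(topicsIneligibleForDeletion.toSet)
}
```

With the flag set to false, both calls return without touching any state, which matches the behaviour described in step 2: the marker node exists in Zookeeper, but nothing is queued.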
In the doWork method of the deletion thread DeleteTopicsThread:
topicsQueuedForDeletion.foreach { topic =>
  // if all replicas are marked as deleted successfully, then topic deletion is done
  if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
    // clear up all state for this topic from controller cache and zookeeper
    completeDeleteTopic(topic)
    info("Deletion of topic %s successfully completed".format(topic))
  }
  // ...
Inside completeDeleteTopic:
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
Its main job is to deregister the listener that watches partition changes, delete the topic's Zookeeper nodes, and update the in-memory data structures, for example removing the topic's replicas from the replica state machine. It runs only after all replicas have been marked as successfully deleted, so the data on disk is already gone at this point.
What matters most, though, is how the replica data on disk gets deleted, so we focus on that part.
On the first pass, in the doWork method of the deletion thread DeleteTopicsThread:
{
  // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
  if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    // mark topic for deletion retry
    markTopicForDeletionRetry(topic)
  }
Inside markTopicForDeletionRetry:
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
The handleStateChanges method of ReplicaStateMachine calls handleStateChange, which handles the OfflineReplica case:
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
Then, back in handleStateChanges:
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
This sends StopReplica requests (RequestKeys.StopReplicaKey) to the brokers that store the replica data, which is where data deletion begins. Note that the OfflineReplica case shown above passes deletePartition = false, which only stops the replica from fetching; data on disk is removed only when a request with deletePartition = true reaches the broker.
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
  val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
  val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
  debug("The stop replica request (delete = true) sent to broker %d is %s"
    .format(broker, stopReplicaWithDelete.mkString(",")))
  debug("The stop replica request (delete = false) sent to broker %d is %s"
    .format(broker, stopReplicaWithoutDelete.mkString(",")))
  replicaInfoList.foreach { r =>
    val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
      Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
    controller.sendRequest(broker, stopReplicaRequest, r.callback)
  }
}
stopReplicaRequestMap.clear()
When the broker receives the request, the handle method of KafkaApis dispatches it:
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
handleStopReplicaRequest in turn calls:
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
Next, in the stopReplicas method:
{
  controllerEpoch = stopReplicaRequest.controllerEpoch
  // First stop fetchers for all partitions, then stop the corresponding replicas
  replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  for(topicAndPartition <- stopReplicaRequest.partitions){
    val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition, errorCode)
  }
  (responseMap, ErrorMapping.NoError)
}
This leads into the stopReplica method, where the log deletion actually happens:
getPartition(topic, partitionId) match {
  case Some(partition) =>
    if(deletePartition) {
      val removedPartition = allPartitions.remove((topic, partitionId))
      if (removedPartition != null)
        removedPartition.delete() // this will delete the local log
    }
That is the whole log deletion flow in Kafka.
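The broker-side step can be illustrated with a small model in which each partition is just a directory on disk. This is a sketch under stated assumptions, not ReplicaManager itself: ReplicaStoreSketch, addPartition, and hasPartition are hypothetical, and the local log is represented by a plain directory that is removed recursively.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical model: each (topic, partition) maps to a log directory on disk.
class ReplicaStoreSketch {
  private val allPartitions = mutable.Map[(String, Int), Path]()

  def addPartition(topic: String, partitionId: Int, logDir: Path): Unit =
    allPartitions.put((topic, partitionId), logDir)

  def hasPartition(topic: String, partitionId: Int): Boolean =
    allPartitions.contains((topic, partitionId))

  // With deletePartition = false nothing is removed; with true, the partition
  // is dropped from the in-memory map and its directory is deleted.
  def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Unit =
    if (deletePartition)
      allPartitions.remove((topic, partitionId)).foreach(deleteRecursively)

  // Deletes a file, or a directory together with everything below it.
  private def deleteRecursively(path: Path): Unit = {
    if (Files.isDirectory(path)) {
      val stream = Files.newDirectoryStream(path)
      try {
        val it = stream.iterator()
        while (it.hasNext) deleteRecursively(it.next())
      } finally stream.close()
    }
    Files.deleteIfExists(path)
  }
}
```

The sketch mirrors the two properties visible in the excerpt above: the in-memory entry and the on-disk data are removed together, and neither is touched unless the request carries deletePartition = true.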
4. Manually deleting the topic information in zk and the data on disk
TopicChangeListener picks up the change, but its handling is simple; it only updates the following:
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
IV. Summary
Topic deletion in Kafka is essentially a publish-subscribe system built on Zookeeper. A Zookeeper client creates the node /admin/delete_topics/<topic>; the Kafka controller sees the event and triggers the actual deletion: it deregisters the partition-change listener, clears the in-memory data structures, deletes the replica data, and deletes the topic's Zookeeper nodes.
When delete.topic.enable is false, running the topic delete command effectively does nothing. To delete the topic completely in that case, change the parameter to true and restart Kafka; the topic information is then removed entirely. This has been tested.
A popular approach is to manually delete the topic's information in Zookeeper and its data on disk, but this leaves some in-memory data uncleared. Whether that poses any hidden risk has not been tested.