欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka源碼系列教程之刪除topic

 更新時(shí)間:2018年08月19日 15:15:07   作者:浪尖  
這篇文章主要給大家介紹了關(guān)于Kafka源碼系列教程之刪除topic的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前言

Apache Kafka發(fā)源于LinkedIn,于2011年成為Apache的孵化項(xiàng)目,隨后于2012年成為Apache的主要項(xiàng)目之一。Kafka使用Scala和Java進(jìn)行編寫。Apache Kafka是一個(gè)快速、可擴(kuò)展的、高吞吐、可容錯(cuò)的分布式發(fā)布訂閱消息系統(tǒng)。Kafka具有高吞吐量、內(nèi)置分區(qū)、支持?jǐn)?shù)據(jù)副本和容錯(cuò)的特性,適合在大規(guī)模消息處理場(chǎng)景中使用。

本文依然是以kafka0.8.2.2為例講解

一,如何刪除一個(gè)topic

刪除一個(gè)topic有兩個(gè)關(guān)鍵點(diǎn):

1,配置刪除參數(shù)

delete.topic.enable這個(gè)Broker參數(shù)配置為True。

2,執(zhí)行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置刪除參數(shù)為true的話,topic其實(shí)并沒(méi)有被清除,只是被標(biāo)記為刪除。此時(shí),估計(jì)一般人的做法是刪除topic在Zookeeper的信息和日志,其實(shí)這個(gè)操作并不會(huì)清除kafkaBroker內(nèi)存的topic數(shù)據(jù)。所以,此時(shí)最佳的策略是配置刪除參數(shù)為true然后,重啟kafka。

二,重要的類介紹

1,PartitionStateMachine

該類代表分區(qū)的狀態(tài)機(jī)。決定者分區(qū)的當(dāng)前狀態(tài),和狀態(tài)轉(zhuǎn)移。四種狀態(tài)

  • NonExistentPartition
  • NewPartition
  • OnlinePartition
  • OfflinePartition

2,ReplicaManager

負(fù)責(zé)管理當(dāng)前機(jī)器的所有副本,處理讀寫、刪除等具體動(dòng)作。

讀寫:寫獲取partition對(duì)象,再獲取Replica對(duì)象,再獲取Log對(duì)象,采用其管理的Segment對(duì)象將數(shù)據(jù)寫入、讀出。

3,ReplicaStateMachine

副本的狀態(tài)機(jī)。決定者副本的當(dāng)前狀態(tài)和狀態(tài)之間的轉(zhuǎn)移。一個(gè)副本總共可以處于一下幾種狀態(tài)的一種
NewReplica:Crontroller在分區(qū)重分配的時(shí)候可以創(chuàng)建一個(gè)新的副本。只能接受變?yōu)閒ollower的請(qǐng)求。前狀態(tài)可以是NonExistentReplica

OnlineReplica:新啟動(dòng)的分區(qū),能接受變?yōu)閘eader或者follower請(qǐng)求。前狀態(tài)可以是NewReplica, OnlineReplica or OfflineReplica

OfflineReplica:死亡的副本處于這種狀態(tài)。前狀態(tài)可以是NewReplica, OnlineReplica

ReplicaDeletionStarted:分本刪除開(kāi)始的時(shí)候處于這種狀態(tài),前狀態(tài)是OfflineReplica

ReplicaDeletionSuccessful:副本刪除成功。前狀態(tài)是ReplicaDeletionStarted

ReplicaDeletionIneligible:刪除失敗的時(shí)候處于這種狀態(tài)。前狀態(tài)是ReplicaDeletionStarted

NonExistentReplica:副本成功刪除之后處于這種狀態(tài),前狀態(tài)是ReplicaDeletionSuccessful

4,TopicDeletionManager

該類管理著topic刪除的狀態(tài)機(jī)

1),TopicCommand通過(guò)創(chuàng)建/admin/delete_topics/<topic>,來(lái)發(fā)布topic刪除命令。

2),Controller監(jiān)聽(tīng)/admin/delete_topic子節(jié)點(diǎn)變動(dòng),開(kāi)始分別刪除topic

3),Controller有個(gè)后臺(tái)線程負(fù)責(zé)刪除Topic

三,源碼徹底解析topic的刪除過(guò)程

此處會(huì)分四個(gè)部分:

A),客戶端執(zhí)行刪除命令作用

B),不配置delete.topic.enable整個(gè)流水的源碼

C),配置了delete.topic.enable整個(gè)流水的源碼

D),手動(dòng)刪除zk上topic信息和磁盤數(shù)據(jù)

1,客戶端執(zhí)行刪除命令

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

進(jìn)入kafka-topics.sh我們會(huì)看到

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

進(jìn)入TopicCommand里面,main方法里面

else if(opts.options.has(opts.deleteOpt))
 deleteTopic(zkClient, opts)

實(shí)際內(nèi)容是

val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
 println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
 try {
 ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))

在"/admin/delete_topics"目錄下創(chuàng)建了一個(gè)topicName的節(jié)點(diǎn)。

2,假如不配置delete.topic.enable整個(gè)流水是

總共有兩處listener會(huì)響應(yīng):

A),TopicChangeListener

B),DeleteTopicsListener

使用topic的刪除命令刪除一個(gè)topic的話,指揮觸發(fā)DeleteTopicListener。

var topicsToBeDeleted = {
 import JavaConversions._
 (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
 info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
 // mark topic ineligible for deletion if other state changes are in progress
 topicsToBeDeleted.foreach { topic =>
 val preferredReplicaElectionInProgress =
  controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
 val partitionReassignmentInProgress =
  controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
 if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
  controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
 }
 // add topic to deletion list 
 controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}

由于都會(huì)判斷delete.topic.enable是否為true,假如不為true就不會(huì)執(zhí)行,為true就進(jìn)入執(zhí)行

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

3,delete.topic.enable配置為true

此處與步驟2的區(qū)別,就是那兩個(gè)處理函數(shù)。

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

markTopicIneligibleForDeletion函數(shù)的處理為

if(isDeleteTopicEnabled) {
 val newTopicsToHaltDeletion = topicsToBeDeleted & topics
 topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
 if(newTopicsToHaltDeletion.size > 0)
 info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}

主要是停止刪除topic,假如存儲(chǔ)以下三種情況

* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion主要作用是更新刪除topic的集合,并激活TopicDeleteThread

def enqueueTopicsForDeletion(topics: Set[String]) {
 if(isDeleteTopicEnabled) {
 topicsToBeDeleted ++= topics
 partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
 resumeTopicDeletionThread()
 }
}

在刪除線程DeleteTopicsThread的doWork方法中

topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
 if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
 // clear up all state for this topic from controller cache and zookeeper
 completeDeleteTopic(topic)
 info("Deletion of topic %s successfully completed".format(topic))
 }

進(jìn)入completeDeleteTopic方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)

主要作用是解除掉監(jiān)控分區(qū)變動(dòng)的listener,刪除Zookeeper具體節(jié)點(diǎn)信息,刪除磁盤數(shù)據(jù),更新內(nèi)存數(shù)據(jù)結(jié)構(gòu),比如從副本狀態(tài)機(jī)里面移除分區(qū)的具體信息。

其實(shí),最終要的是我們的副本磁盤數(shù)據(jù)是如何刪除的。我們重點(diǎn)介紹這個(gè)部分。

首次清除的話,在刪除線程DeleteTopicsThread的doWork方法中

{
 // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
 // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
 // or there is at least one failed replica (which means topic deletion should be retried).
 if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
 // mark topic for deletion retry
 markTopicForDeletionRetry(topic)
 }

進(jìn)入markTopicForDeletionRetry

val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
 .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)

在ReplicaStateMachine的handleStateChanges方法中,調(diào)用了handleStateChange,處理OfflineReplica

// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)

接著在handleStateChanges中

brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)

給副本數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)發(fā)送StopReplicaKey副本指令,并開(kāi)始刪除數(shù)據(jù)

stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
 val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
 val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
 debug("The stop replica request (delete = true) sent to broker %d is %s"
 .format(broker, stopReplicaWithDelete.mkString(",")))
 debug("The stop replica request (delete = false) sent to broker %d is %s"
 .format(broker, stopReplicaWithoutDelete.mkString(",")))
 replicaInfoList.foreach { r =>
 val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
  Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
 controller.sendRequest(broker, stopReplicaRequest, r.callback)
 }
}
stopReplicaRequestMap.clear()

Broker的KafkaApis的Handle方法在接受到指令后

case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)

接著是在stopReplicas方法中

{
 controllerEpoch = stopReplicaRequest.controllerEpoch
 // First stop fetchers for all partitions, then stop the corresponding replicas
 replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
 for(topicAndPartition <- stopReplicaRequest.partitions){
 val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
 responseMap.put(topicAndPartition, errorCode)
 }
 (responseMap, ErrorMapping.NoError)
}

進(jìn)一步進(jìn)入stopReplica方法,正式進(jìn)入日志刪除

getPartition(topic, partitionId) match {
 case Some(partition) =>
 if(deletePartition) {
  val removedPartition = allPartitions.remove((topic, partitionId))
  if (removedPartition != null)
  removedPartition.delete() // this will delete the local log
 }

以上就是kafka的整個(gè)日志刪除流水。

4,手動(dòng)刪除zk上topic信息和磁盤數(shù)據(jù)

TopicChangeListener會(huì)監(jiān)聽(tīng)處理,但是處理很簡(jiǎn)單,只是更新了

val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>

四,總結(jié)

Kafka的topic的刪除過(guò)程,實(shí)際上就是基于Zookeeper做了一個(gè)訂閱發(fā)布系統(tǒng)。Zookeeper的客戶端創(chuàng)建一個(gè)節(jié)點(diǎn)/admin/delete_topics/<topic>,由kafka Controller監(jiān)聽(tīng)到事件之后正式觸發(fā)topic的刪除:解除Partition變更監(jiān)聽(tīng)的listener,清除內(nèi)存數(shù)據(jù)結(jié)構(gòu),刪除副本數(shù)據(jù),刪除topic的相關(guān)Zookeeper節(jié)點(diǎn)。

delete.topic.enable配置該參數(shù)為false的情況下執(zhí)行了topic的刪除命令,實(shí)際上未做任何動(dòng)作。我們此時(shí)要徹底刪除topic建議修改該參數(shù)為true,重啟kafka,這樣topic信息會(huì)被徹底刪除,已經(jīng)測(cè)試。

一般流行的做法是手動(dòng)刪除Zookeeper的topic相關(guān)信息及磁盤數(shù)據(jù)但是這樣的話會(huì)造成部分內(nèi)存數(shù)據(jù)未清除。至于是否會(huì)有隱患,未測(cè)試。

好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • Spring中字段格式化的使用小結(jié)

    Spring中字段格式化的使用小結(jié)

    Spring提供的一個(gè)core.convert包?是一個(gè)通用類型轉(zhuǎn)換系統(tǒng)。它提供了統(tǒng)一的?ConversionService??API和強(qiáng)類型的Converter SPI,用于實(shí)現(xiàn)從一種類型到另一種類型的轉(zhuǎn)換邏輯,這篇文章主要介紹了Spring中字段格式化的使用詳解,需要的朋友可以參考下
    2022-06-06
  • Java AWT中常用的三種布局管理器詳解

    Java AWT中常用的三種布局管理器詳解

    這篇文章主要介紹了Java AWT中常用的三種布局管理器詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • 簡(jiǎn)單聊聊工作中常用的Java?Lambda表達(dá)式

    簡(jiǎn)單聊聊工作中常用的Java?Lambda表達(dá)式

    日常開(kāi)發(fā)中,我們很多時(shí)候需要用到Java?8的Lambda表達(dá)式,它允許把函數(shù)作為一個(gè)方法的參數(shù),讓我們的代碼更優(yōu)雅、更簡(jiǎn)潔。所以整理了一波工作中常用的Lambda表達(dá)式??赐暌欢〞?huì)有幫助的
    2022-11-11
  • 解析Hibernate + MySQL中文亂碼問(wèn)題

    解析Hibernate + MySQL中文亂碼問(wèn)題

    如果持久化的類中有包括了漢字的String對(duì)象,那么對(duì)應(yīng)到數(shù)據(jù)庫(kù)中漢字的部分就會(huì)是亂碼。這主要是由于MySQL數(shù)據(jù)表的字符集與我們當(dāng)前使用的本地字符集不相同造成的
    2013-07-07
  • 詳解Spring中接口的bean是如何注入的

    詳解Spring中接口的bean是如何注入的

    這篇文章主要介紹了詳解Spring中接口的bean是如何注入的的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Java 通過(guò)位運(yùn)算求一個(gè)集合的所有子集方法

    Java 通過(guò)位運(yùn)算求一個(gè)集合的所有子集方法

    下面小編就為大家?guī)?lái)一篇Java 通過(guò)位運(yùn)算求一個(gè)集合的所有子集方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-03-03
  • java實(shí)現(xiàn)服務(wù)器巡查的代碼

    java實(shí)現(xiàn)服務(wù)器巡查的代碼

    接到上級(jí)領(lǐng)導(dǎo)任務(wù),需要實(shí)現(xiàn)一個(gè)這樣的需求,一大批服務(wù)器,需要檢查服務(wù)器能否ping通,ssh密碼是否正常,以及檢查服務(wù)器的cpu,內(nèi)存,硬盤占用情況,下面通過(guò)java代碼實(shí)現(xiàn)服務(wù)器巡查功能,需要的朋友一起看看吧
    2021-12-12
  • 創(chuàng)建Jersey REST 服務(wù),基于Maven的實(shí)現(xiàn)

    創(chuàng)建Jersey REST 服務(wù),基于Maven的實(shí)現(xiàn)

    下面小編就為大家?guī)?lái)一篇?jiǎng)?chuàng)建Jersey REST 服務(wù),基于Maven的實(shí)現(xiàn)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-06-06
  • 史上最難的一道Java面試題

    史上最難的一道Java面試題

    本文給大家分享一道史上最難的一道Java面試題,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下吧
    2018-03-03
  • Java實(shí)現(xiàn)文件及文件夾的刪除

    Java實(shí)現(xiàn)文件及文件夾的刪除

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)文件及文件夾的刪除,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-06-06

最新評(píng)論