欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka消費客戶端協(xié)調器GroupCoordinator詳解

 更新時間:2022年10月18日 10:21:27   作者:石臻臻的雜貨鋪  
這篇文章主要為大家介紹了Kafka消費客戶端協(xié)調器GroupCoordinator使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

協(xié)調器的生命周期

  • 什么是協(xié)調器
  • 協(xié)調器工作原理
  • 協(xié)調器的Rebalance機制

GroupCoordinator的創(chuàng)建

在Kafka啟動的時候, 會自動創(chuàng)建并啟動GroupCoordinator

這個GroupCoordinator對象創(chuàng)建的時候傳入的幾個屬性需要介紹一下

    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)

offsetConfig相關配置

  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
  )
屬性介紹默認值
offset.metadata.max.bytes  
offsets.load.buffer.size  
offsets.retention.minutes  
offsets.retention.check.interval.ms  
offsets.topic.num.partitions  
offsets.commit.timeout.ms  
offsets.topic.segment.bytes  
offsets.topic.replication.factor  
offsets.topic.compression.codec  
offsets.commit.timeout.ms  
offsets.commit.required.acks  

groupConfig相關配置

屬性介紹默認值
group.min.session.timeout.ms  
group.max.session.timeout.ms  
group.initial.rebalance.delay.ms  
group.max.size  
group.initial.rebalance.delay.ms  

groupMetadataManager

組元信息管理類

heartbeatPurgatory

心跳監(jiān)測操作,每一秒執(zhí)行一次

joinPurgatory

GroupCoordinator的啟動

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

這個啟動對于GroupCoordinator來說只是給屬性isActive標記為了true, 但是同時呢也調用了GroupMetadataManager.startup

定時清理Group元信息

這個Group元信息管理類呢啟動了一個定時任務, 名字為:delete-expired-group-metadata

每隔600000ms的時候就執(zhí)行一下 清理過期組元信息的操作, 這個600000ms時間是代碼寫死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

當內部topic __consumer_offsets 有分區(qū)的Leader變更的時候,比如觸發(fā)了 LeaderAndIsr的請求, 發(fā)現分區(qū)Leader進行了切換。

那么就會執(zhí)行 GroupCoordinator#OnElection 的接口, 這個接口會把任務丟個一個單線程的調度程序, 專門處理offset元數據緩存加載和卸載的。線程名稱前綴為group-metadata-manager- ,一個分區(qū)一個任務

最終執(zhí)行的任務內容是:GroupMetadataManager#doLoadGroupsAndOffsets

__consumer_offsets 的key有兩種消息類型

key version 0: 消費組消費偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消費組消費偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消費組的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue

Version-0

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-1

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-2

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-3

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		group_instance_id: NULLABLE_STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Value每個版本的 Scheme如下

  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))

GroupCoordinator onResignation

以上就是Kafka消費客戶端協(xié)調器GroupCoordinator詳解的詳細內容,更多關于Kafka GroupCoordinator的資料請關注腳本之家其它相關文章!

相關文章

  • 通過Java視角簡單談談局部性原理

    通過Java視角簡單談談局部性原理

    程序的局部性原理是指程序在執(zhí)行時呈現出局部性規(guī)律,即在一段時間內,整個程序的執(zhí)行僅限于程序中的某一部分,這篇文章主要給大家介紹了關于通過Java視角簡單談談局部性原理的相關資料,需要的朋友可以參考下
    2021-07-07
  • 使用SpringBoot進行身份驗證和授權的示例詳解

    使用SpringBoot進行身份驗證和授權的示例詳解

    在廣闊的 Web 開發(fā)世界中,身份驗證是每個數字領域的守護者,在本教程中,我們將了解如何以本機方式保護、驗證和授權 Spring-Boot 應用程序的用戶,并遵循框架的良好實踐,希望對大家有所幫助
    2023-11-11
  • IntelliJ IDEA 安裝及初次使用圖文教程(2020.3.2社區(qū)版)

    IntelliJ IDEA 安裝及初次使用圖文教程(2020.3.2社區(qū)版)

    這篇文章主要介紹了IntelliJ IDEA 安裝及初次使用(2020.3.2社區(qū)版),本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • JPA中EntityListeners注解的使用詳解

    JPA中EntityListeners注解的使用詳解

    這篇文章主要介紹了JPA中EntityListeners注解的使用詳解,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • idea引入外部jar包的方法實現

    idea引入外部jar包的方法實現

    本文主要介紹了idea引入外部jar包的方法實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • Struts2學習教程之輸入校驗示例詳解

    Struts2學習教程之輸入校驗示例詳解

    這篇文章主要給大家介紹了關于Struts2學習教程之輸入校驗的相關資料,文中通過示例介紹的非常詳細,對大家學習或者使用struts2具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2018-05-05
  • java程序員必須知道的4個書寫代碼技巧

    java程序員必須知道的4個書寫代碼技巧

    本篇文章主要給大家講述了作為JAVA程序員如何能寫出高效的代碼以及運行效率更高的代碼,一起學習分享下吧。
    2017-12-12
  • Java實現并發(fā)執(zhí)行定時任務并手動控制開始結束

    Java實現并發(fā)執(zhí)行定時任務并手動控制開始結束

    這篇文章主要介紹了Java實現并發(fā)執(zhí)行定時任務并手動控制開始結束,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Ubuntu快速安裝eclipse

    Ubuntu快速安裝eclipse

    這篇文章主要為大家詳細介紹了Ubuntu快速安裝eclipse的簡單教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • 使用Mybatis-plus實現對數據庫表的內部字段進行比較

    使用Mybatis-plus實現對數據庫表的內部字段進行比較

    這篇文章主要介紹了使用Mybatis-plus實現對數據庫表的內部字段進行比較方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07

最新評論