A Deep Dive into Kafka's Consumer Group Coordinator (GroupCoordinator)
The coordinator lifecycle
- What the coordinator is
- How the coordinator works
- The coordinator's rebalance mechanism
Creating the GroupCoordinator
When a Kafka broker starts up, it automatically creates and starts a GroupCoordinator.
A few of the properties passed to the GroupCoordinator constructor deserve a closer look:

```scala
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig,
  groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
```
The offsetConfig settings

```scala
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
  maxMetadataSize = config.offsetMetadataMaxSize,
  loadBufferSize = config.offsetsLoadBufferSize,
  offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
  offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
  offsetsTopicNumPartitions = config.offsetsTopicPartitions,
  offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
  offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
  offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
  offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
  offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
```
Property | Description | Default |
---|---|---|
offset.metadata.max.bytes | Maximum metadata size, in bytes, accepted with an offset commit | 4096 |
offsets.load.buffer.size | Batch size, in bytes, for reading from the offsets topic when loading offsets into the cache | 5242880 |
offsets.retention.minutes | How long committed offsets are retained after a group becomes empty | 10080 |
offsets.retention.check.interval.ms | How often to scan for expired offsets | 600000 |
offsets.topic.num.partitions | Number of partitions of the __consumer_offsets topic | 50 |
offsets.topic.segment.bytes | Segment size of the __consumer_offsets topic | 104857600 |
offsets.topic.replication.factor | Replication factor of the __consumer_offsets topic | 3 |
offsets.topic.compression.codec | Compression codec for records written to the __consumer_offsets topic | 0 (no compression) |
offsets.commit.timeout.ms | Maximum time an offset commit is delayed waiting for all replicas of the offsets partition to receive it | 5000 |
offsets.commit.required.acks | Acks required before an offset commit is accepted | -1 (all replicas) |
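offsets.topic.num.partitions matters because each group id is hashed onto exactly one partition of __consumer_offsets, and the leader of that partition acts as the group's coordinator. A minimal sketch of that mapping, following the well-known rule used by GroupMetadataManager#partitionFor (abs of the group id's hash code, modulo the partition count):

```scala
// Sketch: map a consumer group id onto a partition of __consumer_offsets.
// Masking with 0x7fffffff makes the hash non-negative (the same trick
// Kafka's Utils.abs uses), then modulo the partition count (default 50).
def partitionForGroup(groupId: String, numOffsetsPartitions: Int = 50): Int =
  (groupId.hashCode & 0x7fffffff) % numOffsetsPartitions
```

Because the mapping is deterministic, every broker and every client independently agrees on which partition, and therefore which coordinator, owns a given group.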
The groupConfig settings
Property | Description | Default |
---|---|---|
group.min.session.timeout.ms | Minimum session.timeout.ms a member may request when joining | 6000 |
group.max.session.timeout.ms | Maximum session.timeout.ms a member may request when joining | 1800000 |
group.initial.rebalance.delay.ms | How long the coordinator delays the first rebalance of a new group, waiting for more members to join | 3000 |
group.max.size | Maximum number of members allowed in a single consumer group | 2147483647 |
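The two session-timeout bounds above gate every JoinGroup request: a member asking for a session.timeout.ms outside the window [group.min.session.timeout.ms, group.max.session.timeout.ms] is rejected with an invalid-session-timeout error. A hedged sketch of that check (the function name is illustrative, not the broker's exact code):

```scala
// Sketch: validate a joining member's requested session timeout against
// the broker-side bounds (defaults: min 6000 ms, max 1800000 ms).
def sessionTimeoutValid(requestedMs: Int,
                        groupMinSessionTimeoutMs: Int = 6000,
                        groupMaxSessionTimeoutMs: Int = 1800000): Boolean =
  requestedMs >= groupMinSessionTimeoutMs && requestedMs <= groupMaxSessionTimeoutMs
```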
groupMetadataManager
The class that manages consumer group metadata (and the cached contents of __consumer_offsets).
heartbeatPurgatory
Purgatory for delayed heartbeat operations; expired heartbeats are checked once per second.
joinPurgatory
Purgatory for delayed join-group operations, completed once all members have rejoined during a rebalance.
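The purgatories hold delayed operations: a DelayedHeartbeat expires when a member misses its heartbeat deadline, and a DelayedJoin completes once every member has rejoined. A toy sketch of the heartbeat-expiry idea (deliberately simplified; Kafka's DelayedOperationPurgatory uses a hierarchical timing wheel, not a map scan):

```scala
import scala.collection.mutable

// Toy model of heartbeat expiry: track each member's deadline and, on
// each periodic check (the coordinator checks roughly once per second),
// collect the members whose deadline has passed.
class HeartbeatTracker {
  private val deadlines = mutable.Map.empty[String, Long]

  // A successful heartbeat pushes the member's deadline forward.
  def onHeartbeat(memberId: String, nowMs: Long, sessionTimeoutMs: Long): Unit =
    deadlines(memberId) = nowMs + sessionTimeoutMs

  // Returns the members considered dead at `nowMs` and removes them;
  // in the real broker this is where a rebalance would be triggered.
  def expire(nowMs: Long): Seq[String] = {
    val dead = deadlines.collect { case (m, d) if d <= nowMs => m }.toSeq
    dead.foreach(deadlines.remove)
    dead
  }
}
```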
Starting the GroupCoordinator

```scala
def startup(enableMetadataExpiration: Boolean = true): Unit = {
  info("Starting up.")
  groupManager.startup(enableMetadataExpiration)
  isActive.set(true)
  info("Startup complete.")
}
```

For the GroupCoordinator itself, startup does little more than set the isActive flag to true, but it also calls GroupMetadataManager.startup.
Periodic cleanup of group metadata
GroupMetadataManager.startup schedules a recurring task named delete-expired-group-metadata that cleans up expired group metadata every 600000 ms; this interval is hard-coded. The cleanup logic itself lives in GroupMetadataManager#cleanupGroupMetadata.
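The recurring task can be sketched with a plain ScheduledExecutorService (the broker actually uses its own KafkaScheduler). The interval is a parameter here only so the sketch is easy to run; in the broker it is the hard-coded 600000 ms:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Sketch: run a cleanup callback at a fixed interval, the way the broker
// schedules its "delete-expired-group-metadata" task.
def startCleanupTask(intervalMs: Long)(cleanup: () => Unit): ScheduledExecutorService = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(() => cleanup(), intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  scheduler // the caller shuts this down when the coordinator shuts down
}
```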
GroupCoordinator onElection
When the leader of a partition of the internal topic __consumer_offsets changes (for example, a LeaderAndIsr request reveals that a partition's leader has moved to this broker), GroupCoordinator#onElection is invoked. It hands the work to a single-threaded scheduler dedicated to loading and unloading the cached offset and group metadata; the thread name is prefixed group-metadata-manager-, with one task per partition.
The task that ultimately runs is GroupMetadataManager#doLoadGroupsAndOffsets.
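The "one single-threaded scheduler, one task per partition" structure can be sketched as follows (illustrative names; the real per-partition work is GroupMetadataManager#doLoadGroupsAndOffsets, and the thread prefix group-metadata-manager- is from the source):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: hand one load task per __consumer_offsets partition to a
// single-threaded executor, so loads run one at a time and never touch
// the shared offset/group cache concurrently.
def scheduleLoads(partitions: Seq[Int])(loadPartition: Int => Unit): Unit = {
  val executor = Executors.newSingleThreadExecutor(r => {
    val t = new Thread(r, "group-metadata-manager-0") // Kafka-style thread name
    t.setDaemon(true)
    t
  })
  partitions.foreach(p => executor.submit(new Runnable { def run(): Unit = loadPartition(p) }))
  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.SECONDS) // wait for all per-partition loads
}
```

Single-threading here is a design choice: it serializes cache loads and unloads without any per-entry locking.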
Keys in __consumer_offsets come in two message types:

key version 0: consumer group committed-offset record -> value version 0: [offset, metadata, timestamp]
key version 1: consumer group committed-offset record -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
key version 2: consumer group metadata record -> value version 0: [protocol_type, generation, protocol, leader, members]
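Each record key starts with a 2-byte version that tells the loader how to interpret the record: versions 0 and 1 carry committed offsets, version 2 carries group metadata. A sketch of that dispatch (the real decoding lives in GroupMetadataManager#readMessageKey; the return values here are illustrative labels):

```scala
import java.nio.ByteBuffer

// Sketch: read the leading 2-byte version of a __consumer_offsets record
// key and classify the record accordingly.
def classifyKey(key: ByteBuffer): String = {
  val version = key.getShort
  version match {
    case 0 | 1 => "offset-commit"
    case 2     => "group-metadata"
    case v     => s"unknown-version-$v"
  }
}
```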
For example, schemaForGroupValue covers versions 0 through 3:
Version 0

```
{
  protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING,
  members: ARRAY({ member_id: STRING, client_id: STRING, client_host: STRING,
                   session_timeout: INT32, subscription: BYTES, assignment: BYTES })
}
```

Version 1

```
{
  protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING,
  members: ARRAY({ member_id: STRING, client_id: STRING, client_host: STRING,
                   rebalance_timeout: INT32, session_timeout: INT32,
                   subscription: BYTES, assignment: BYTES })
}
```

Version 2

```
{
  protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING,
  current_state_timestamp: INT64,
  members: ARRAY({ member_id: STRING, client_id: STRING, client_host: STRING,
                   rebalance_timeout: INT32, session_timeout: INT32,
                   subscription: BYTES, assignment: BYTES })
}
```

Version 3

```
{
  protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING,
  current_state_timestamp: INT64,
  members: ARRAY({ member_id: STRING, group_instance_id: NULLABLE_STRING,
                   client_id: STRING, client_host: STRING,
                   rebalance_timeout: INT32, session_timeout: INT32,
                   subscription: BYTES, assignment: BYTES })
}
```
The Scala Schema definition for each value version:

```scala
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))

private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))

private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))

private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
```
GroupCoordinator onResignation
The counterpart of onElection: when this broker stops being the leader for a partition of __consumer_offsets, GroupCoordinator#onResignation is invoked, and the cached groups and offsets for that partition are unloaded on the same single-threaded scheduler (GroupMetadataManager#removeGroupsForPartition).
That concludes this detailed look at Kafka's GroupCoordinator. For more material on Kafka's GroupCoordinator, see the other related articles on 腳本之家 (jb51).