Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解
協(xié)調(diào)器的生命周期
- 什么是協(xié)調(diào)器
- 協(xié)調(diào)器工作原理
- 協(xié)調(diào)器的Rebalance機(jī)制
GroupCoordinator的創(chuàng)建
在Kafka啟動(dòng)的時(shí)候, 會(huì)自動(dòng)創(chuàng)建并啟動(dòng)GroupCoordinator

這個(gè)GroupCoordinator對(duì)象創(chuàng)建的時(shí)候傳入的幾個(gè)屬性需要介紹一下
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
offsetConfig相關(guān)配置
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
| 屬性 | 介紹 | 默認(rèn)值 |
|---|---|---|
| offset.metadata.max.bytes | ||
| offsets.load.buffer.size | ||
| offsets.retention.minutes | ||
| offsets.retention.check.interval.ms | ||
| offsets.topic.num.partitions | ||
| offsets.commit.timeout.ms | ||
| offsets.topic.segment.bytes | ||
| offsets.topic.replication.factor | ||
| offsets.topic.compression.codec | ||
| offsets.commit.timeout.ms | ||
| offsets.commit.required.acks |
groupConfig相關(guān)配置
| 屬性 | 介紹 | 默認(rèn)值 |
|---|---|---|
| group.min.session.timeout.ms | ||
| group.max.session.timeout.ms | ||
| group.initial.rebalance.delay.ms | ||
| group.max.size | ||
| group.initial.rebalance.delay.ms |
groupMetadataManager
組元信息管理類
heartbeatPurgatory
心跳監(jiān)測(cè)操作,每一秒執(zhí)行一次
joinPurgatory
GroupCoordinator的啟動(dòng)
def startup(enableMetadataExpiration: Boolean = true): Unit = {
info("Starting up.")
groupManager.startup(enableMetadataExpiration)
isActive.set(true)
info("Startup complete.")
}
這個(gè)啟動(dòng)對(duì)于GroupCoordinator來說只是給屬性isActive標(biāo)記為了true, 但是同時(shí)呢也調(diào)用了GroupMetadataManager.startup
定時(shí)清理Group元信息
這個(gè)Group元信息管理類呢啟動(dòng)了一個(gè)定時(shí)任務(wù), 名字為:delete-expired-group-metadata
每隔600000ms的時(shí)候就執(zhí)行一下 清理過期組元信息的操作, 這個(gè)600000ms時(shí)間是代碼寫死的。
TODO:GroupMetadataManager#cleanupGroupMetadata
GroupCoordinator OnElection
當(dāng)內(nèi)部topic __consumer_offsets 有分區(qū)的Leader變更的時(shí)候,比如觸發(fā)了 LeaderAndIsr的請(qǐng)求, 發(fā)現(xiàn)分區(qū)Leader進(jìn)行了切換。
那么就會(huì)執(zhí)行 GroupCoordinator#OnElection 的接口, 這個(gè)接口會(huì)把任務(wù)丟個(gè)一個(gè)單線程的調(diào)度程序, 專門處理offset元數(shù)據(jù)緩存加載和卸載的。線程名稱前綴為group-metadata-manager- ,一個(gè)分區(qū)一個(gè)任務(wù)
最終執(zhí)行的任務(wù)內(nèi)容是:GroupMetadataManager#doLoadGroupsAndOffsets
__consumer_offsets 的key有兩種消息類型
key version 0: 消費(fèi)組消費(fèi)偏移量信息 -> value version 0: [offset, metadata, timestamp]
key version 1: 消費(fèi)組消費(fèi)偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
key version 2: 消費(fèi)組的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue
Version-0
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-1
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-2
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
current_state_timestamp: INT64,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-3
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
current_state_timestamp: INT64,
members: ARRAY({
member_id: STRING,
group_instance_id: NULLABLE_STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Value每個(gè)版本的 Scheme如下
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
GroupCoordinator onResignation

以上就是Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解的詳細(xì)內(nèi)容,更多關(guān)于Kafka GroupCoordinator的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用SpringBoot進(jìn)行身份驗(yàn)證和授權(quán)的示例詳解
IntelliJ IDEA 安裝及初次使用圖文教程(2020.3.2社區(qū)版)
Struts2學(xué)習(xí)教程之輸入校驗(yàn)示例詳解
Java實(shí)現(xiàn)并發(fā)執(zhí)行定時(shí)任務(wù)并手動(dòng)控制開始結(jié)束
使用Mybatis-plus實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)表的內(nèi)部字段進(jìn)行比較

