java分布式流式處理組件Producer分區(qū)理論
前言
前面我們已經(jīng)對(duì)Producer發(fā)送原理做了一個(gè)比較詳細(xì)的說明,其中我們提到了分區(qū)器。其實(shí)從整體結(jié)構(gòu)上來(lái)講,分區(qū)器也是屬于一個(gè)非常重要的知識(shí)點(diǎn),所以我們來(lái)專門對(duì)分區(qū)以及分區(qū)策略等內(nèi)容做一個(gè)介紹。
為什么需要分區(qū)
分區(qū)的作用
- 合理的使用存儲(chǔ)資源:把海量的數(shù)據(jù)按照分區(qū)切割成一小塊的數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。此時(shí)能夠保證每臺(tái)服務(wù)器存儲(chǔ)資源能夠被充分利用到。而且小塊數(shù)據(jù)在尋址時(shí)間上更有優(yōu)勢(shì)~
如果將全部的數(shù)據(jù)存儲(chǔ)在一臺(tái)機(jī)器上,那么要對(duì)當(dāng)前數(shù)據(jù)做副本的時(shí)候,由于服務(wù)器資源配置不同,就有可能會(huì)出現(xiàn)副本數(shù)據(jù)存放失敗,從而增加數(shù)據(jù)丟失的可能性。
同時(shí),如果單個(gè)文件過大,副本放置時(shí)間、內(nèi)容檢索時(shí)間都會(huì)極大的延長(zhǎng),從而導(dǎo)致Kafka性能降低。
- 負(fù)載均衡: 數(shù)據(jù)生產(chǎn)或消費(fèi)期間,生產(chǎn)者已分區(qū)的單位發(fā)送數(shù)據(jù),消費(fèi)者分區(qū)的單位進(jìn)行消費(fèi)。 期間,各分區(qū)生產(chǎn)和消費(fèi)數(shù)據(jù)互不影響,這樣能夠達(dá)到合理控制分區(qū)任務(wù)的程度,提高任務(wù)的并行度。從而達(dá)到負(fù)載均衡的效果。
剛才我們提到:生產(chǎn)者已分區(qū)為單位向Broker發(fā)送數(shù)據(jù)。那么問題來(lái)了:
- 生產(chǎn)者是怎么知道該向哪個(gè)分區(qū)發(fā)送數(shù)據(jù)呢?
這就是我們接下來(lái)要研究的分區(qū)策略。
分區(qū)策略
其實(shí)我們?cè)谏弦黄恼轮幸呀?jīng)見到了,看這里:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// 如果在消息中指定了分區(qū)
if (record.partition() != null)
return record.partition();
if (partitioner != null) {
// 分區(qū)器通過計(jì)算得到分區(qū)
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}
// 通過序列化key計(jì)算分區(qū)
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
// 返回-1
return RecordMetadata.UNKNOWN_PARTITION;
}
}
下面的代碼可以說是整個(gè)分區(qū)器的核心部分,可以通過以下的步驟進(jìn)行說明:
- 如果在生產(chǎn)消息的時(shí)候,已經(jīng)指定了需要發(fā)送的分區(qū)位置,那么就會(huì)直接使用已經(jīng)指定的份具體的位置,這樣子還節(jié)省了也不計(jì)算的時(shí)間
- 如果在生產(chǎn)者配置
Properties中指定了分區(qū)策略類,那么消息生產(chǎn)就會(huì)通過已經(jīng)指定的分區(qū)策略類進(jìn)行分區(qū)計(jì)算 - 否則就會(huì)以
serializedKey作為參數(shù),通過hash取模的方式計(jì)算。如果serializedKey == null,那么就會(huì)采用粘性分區(qū)的邏輯。 這在Kafka中屬于默認(rèn)分區(qū)器。 - 如果以上情況都沒有包含,那么他就會(huì)直接返回-1。相當(dāng)于
ack=0的情況。
在Kafka中分區(qū)策略我們是可以自定義的。當(dāng)然Kafka也為我們內(nèi)置了三種分區(qū)策略類。 接下來(lái)我們挑個(gè)重點(diǎn)來(lái)介紹,來(lái)給我們自定義分區(qū)器做一個(gè)鋪墊~

我們已經(jīng)看到,DefaultPartitioner和UniformStickyPartitioner已經(jīng)被標(biāo)注為過期類,當(dāng)然也并不妨礙我們來(lái)了解一下。
DefaultPartitioner
在當(dāng)前版本中,如果沒有對(duì)partitioner.class進(jìn)行配置,此時(shí)的分區(qū)策略就會(huì)采用當(dāng)前類作為默認(rèn)分區(qū)策略類。
而以下是DefaultPartitioner策略類的核心實(shí)現(xiàn)方式,并且標(biāo)記部分的代碼實(shí)現(xiàn)其實(shí)就是UniformStickyPartitioner的計(jì)算邏輯
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
if (keyBytes == null) {
// 就是這段屬于UniformStickyPartitioner的實(shí)現(xiàn)邏輯
return stickyPartitionCache.partition(topic, cluster);
}
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
還有一段代碼讓我們來(lái)一起看看
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
這段代碼不管有多復(fù)雜,調(diào)用方法有多少,但最終我們是能夠發(fā)現(xiàn):
- 它的本質(zhì)其實(shí)是在對(duì)
序列化Key做哈希計(jì)算,然后通過hash值和分區(qū)數(shù)做取模運(yùn)算,然后得到結(jié)果分區(qū)位置
這是一種比較重要的計(jì)算方式,但卻不是唯一的方式

---這是分割線---
接下來(lái)繼續(xù),我們看看如果無(wú)法對(duì)序列化Key計(jì)算,會(huì)是怎么樣的計(jì)算邏輯?
我們先開始來(lái)看一下,是在哪個(gè)地方得到的serializedKey,并且什么情況下serializedKey會(huì)是NULL
看看下面的這個(gè)代碼眼熟不?
// 生產(chǎn)者生產(chǎn)消息對(duì)象
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"data from " + KafkaQuickProducer.class.getName()
);

// KafkaProducer#doSend() // line994 serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String> {
// 省略。。。
@Override
public byte[] serialize(String topic, String data) {
if (data == null) {
return null;
} else {
return data.getBytes(encoding);
}
}
}
從上面的代碼來(lái)看,基本上能夠?qū)嶅N了:
- 當(dāng)在生成
ProducerRecord對(duì)象的時(shí)候,如果沒有對(duì)消息設(shè)置key參數(shù),此時(shí)序列化之后的key就是個(gè)null - 那么當(dāng)序列化之后的Key為NULL之后,此時(shí)分區(qū)計(jì)算邏輯就會(huì)改變。
此時(shí)相當(dāng)于我們已經(jīng)進(jìn)入到UniformStickyPartitioner的計(jì)算邏輯, 當(dāng)然了在我們使用的3.3版本中當(dāng)前類也已經(jīng)被標(biāo)注為過期
根據(jù)前面的說法,粘性分區(qū)主要解決了消息無(wú)Key的分區(qū)計(jì)算邏輯,那么粘性分區(qū)并不是說每次都使用同一個(gè)分區(qū)
它是通過一個(gè)大的Batch為單位,盡量將batch內(nèi)的消息固定在同一個(gè)分區(qū)內(nèi),這樣在很大程度上能夠保證:
- 防止消息無(wú)規(guī)律的分散在不同的分區(qū)內(nèi),降低分區(qū)傾斜
- 同時(shí)不需要每次進(jìn)行分區(qū)計(jì)算,也降低了Producer的延遲
而實(shí)現(xiàn)方式是采用ConcurrentMap來(lái)進(jìn)行緩存,感興趣的大家可以看看StickyPartitionCache的源碼
而當(dāng)Batch內(nèi)消息滿足發(fā)送條件被發(fā)送出去之后,才會(huì)開始再次計(jì)算下一個(gè)分區(qū),為此在KafkaProducer中還專門調(diào)用了新的方法
partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}

RoundRobinPartitioner
這是在當(dāng)前版本中唯一沒有被標(biāo)注的類,未來(lái)說不定會(huì)成為默認(rèn)分區(qū)策略類,我們不看??,就瞄一眼
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
return counter.getAndIncrement();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
這個(gè)類的解釋,嗯。。你們看那個(gè)合適吧~

其實(shí)這個(gè)邏輯非常簡(jiǎn)單:
- 通過
AtomicInteger.getAndIncrement()的方式將每次寫入平均分配到不同的分區(qū)中 - 不同與其他分區(qū)策略類,它不關(guān)心Key是否為NULL
我們先來(lái)做個(gè)小實(shí)驗(yàn)吧: 將分區(qū)策略類修改為RoundRobinPartitioner,也方便后續(xù)自定義分區(qū)器的配置操作
config.setProperty(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
"org.apache.kafka.clients.producer.RoundRobinPartitioner"
);
就這樣就能實(shí)現(xiàn),看結(jié)果驗(yàn)證~

中間穿插了一點(diǎn)小知識(shí),那么接下來(lái)就會(huì)進(jìn)入到我們最后一個(gè)環(huán)節(jié):嘗試自定義分區(qū)器
自定義分區(qū)器
前面我們也提到過,相信大家沒有忘記partitioner.class這個(gè)配置
那么接下來(lái)就進(jìn)入到重頭戲:自定義分區(qū)器實(shí)戰(zhàn)編碼環(huán)節(jié)。
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// nothing
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 如果keyBytes == null
// 直接去0號(hào)位置
if (null == keyBytes) {
return 0;
}
// 已默認(rèn)分區(qū)策略實(shí)現(xiàn)
int numPartitions = cluster.partitionsForTopic(topic).size();
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
@Override
public void close() {
// nothing
}
}
我們就先做的簡(jiǎn)單一點(diǎn),主要是想讓大家明白自定義分區(qū)器的實(shí)現(xiàn):
- 如果沒有給定指定key,那么就默認(rèn)全部去0號(hào)分區(qū)
- 否則就通過key做取模計(jì)算
當(dāng)自定義分區(qū)器實(shí)現(xiàn)完成之后,接下來(lái)我們就需要通過發(fā)送者進(jìn)行驗(yàn)證。當(dāng)然了,主要還是通過partitioner.class進(jìn)行修改
// 給出關(guān)鍵代碼,其他的都是一樣的。就不贅述了~~~ config.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.zopx.kafka.partitioner.CustomPartitioner");
通過執(zhí)行之后,我們來(lái)看看它的運(yùn)行效果是否滿足我們的預(yù)期

另一種運(yùn)行結(jié)果與默認(rèn)分區(qū)器有Key的情況類似,這里就不再重復(fù)貼圖
代碼說明
本文全部代碼可進(jìn)入Gitee中進(jìn)行查看,更多精彩內(nèi)容敬請(qǐng)關(guān)注~
本次關(guān)于生產(chǎn)者分區(qū)器就介紹到這里,下期我們將推出針對(duì)Producer的生產(chǎn)優(yōu)化核心關(guān)注點(diǎn),更多關(guān)于java分布式Producer分區(qū)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
踩坑Debug啟動(dòng)失敗,無(wú)報(bào)錯(cuò)信息問題
在進(jìn)行項(xiàng)目debug時(shí)遇到了無(wú)法啟動(dòng)的問題,項(xiàng)目一直處于正在啟動(dòng)狀態(tài),但未出現(xiàn)任何報(bào)錯(cuò)信息,分析原因可能是存在不合法的斷點(diǎn)位置,即斷點(diǎn)未打在方法內(nèi)部,解決方法是檢查所有斷點(diǎn)信息,并移除非法斷點(diǎn),之后項(xiàng)目能夠正常啟動(dòng)2023-02-02
Java請(qǐng)求調(diào)用參數(shù)格式為form-data類型的接口代碼示例
這篇文章主要給大家介紹了關(guān)于Java請(qǐng)求調(diào)用參數(shù)格式為form-data類型的接口的相關(guān)資料,文中給出了詳細(xì)的代碼示例,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
LRU算法及Apache?LRUMap源碼實(shí)例解析
這篇文章主要給大家介紹了關(guān)于LRU算法及Apache?LRUMap源碼解析的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-11-11
使用@ConfigurationProperties實(shí)現(xiàn)類型安全的配置過程
這篇文章主要介紹了使用@ConfigurationProperties實(shí)現(xiàn)類型安全的配置過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
將本地SpringBoot項(xiàng)目發(fā)布到云服務(wù)器的方法
這篇文章主要介紹了如何將本地SpringBoot項(xiàng)目發(fā)布到云服務(wù)器,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12
java poi設(shè)置生成的word的圖片為上下型環(huán)繞以及其位置的實(shí)現(xiàn)
這篇文章主要介紹了java poi設(shè)置生成的word的圖片為上下型環(huán)繞以及其位置的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
詳解Java對(duì)象轉(zhuǎn)換神器MapStruct庫(kù)的使用
在我們?nèi)粘i_發(fā)的程序中,為了各層之間解耦,一般會(huì)定義不同的對(duì)象用來(lái)在不同層之間傳遞數(shù)據(jù)。當(dāng)在不同層之間傳輸數(shù)據(jù)時(shí),不可避免地經(jīng)常需要將這些對(duì)象進(jìn)行相互轉(zhuǎn)換。今天給大家介紹一個(gè)對(duì)象轉(zhuǎn)換工具M(jìn)apStruct,代碼簡(jiǎn)潔安全、性能高,強(qiáng)烈推薦2022-09-09
springboot 自定義配置Boolean屬性不生效的解決
這篇文章主要介紹了springboot 自定義配置Boolean屬性不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

