怎樣給Kafka新增分區(qū)
給Kafka新增分區(qū)
數(shù)據(jù)量猛增的時(shí)候,需要給 kafka 的 topic 新增分區(qū),增大處理的數(shù)據(jù)量,可以通過(guò)以下步驟
1、修改 topic 的分區(qū)
kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3
2、遷移數(shù)據(jù)
生成遷移計(jì)劃,手動(dòng)新建一個(gè) json 文件
{ "topics": [ {"topic": "flink-test-03"} ], "version": 1 }
生成遷移計(jì)劃
kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate Current partition replica assignment: {"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}
新建一個(gè)文件reassignment.json,保存上邊這些信息
3、遷移
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute
4、驗(yàn)證
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify
Kafka分區(qū)原理機(jī)制
分區(qū)結(jié)構(gòu)
kafka的消息總共是三層結(jié)構(gòu)
Topic(第一層結(jié)構(gòu),表示一個(gè)主題)-> Partition(分區(qū),每個(gè)消息可以有多個(gè)分區(qū)) -> 消息實(shí)例(具體的消息文本等等,一個(gè)消息實(shí)例只可能在一個(gè)分區(qū)里面,不會(huì)出現(xiàn)在多個(gè)分區(qū)中)
分區(qū)優(yōu)點(diǎn)
分區(qū)其實(shí)是一個(gè)負(fù)載均衡的思想。如此設(shè)計(jì)能使每一個(gè)分區(qū)獨(dú)自處理單獨(dú)的讀寫請(qǐng)求,提高吞吐量。
分區(qū)策略
- 輪詢策略Round-robin(未指定key新版本默認(rèn)策略)
- 隨機(jī)策略Randomness(老版本默認(rèn)策略)
- 消息鍵排序策略Key-ordering(指定了key,則使用該策略)
- 根據(jù)地理位置進(jìn)行分區(qū)
- 自定義分區(qū) 需要在生產(chǎn)者端實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并配置一下實(shí)現(xiàn)類的全限定名
根據(jù)分區(qū)策略實(shí)現(xiàn)消息的順序消費(fèi)
可以只設(shè)置一個(gè)分區(qū),這樣子消息都是放在一個(gè)partition,肯定是先進(jìn)先出進(jìn)行消費(fèi),然而這種場(chǎng)景無(wú)法利用kafka多分區(qū)的高吞吐量以及負(fù)載均衡的優(yōu)勢(shì)。
將需要順序消費(fèi)的消息設(shè)置key,這個(gè)時(shí)候根據(jù)默認(rèn)的分區(qū)策略,kafka會(huì)將所有的相同的key放在一個(gè)partition上面,這樣既可以使用kafka的partition又可以實(shí)現(xiàn)順序消費(fèi)。
默認(rèn)分區(qū)策略源碼
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose a partition in a round-robin fashion */ public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<String, ?> configs) {} /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
從類注釋當(dāng)中已經(jīng)很明顯的看出來(lái)分區(qū)邏輯
3. 如果指定了分區(qū),則使用指定分區(qū)
4. 如果沒(méi)有指定分區(qū),但是有key,則使用hash過(guò)的key放置消息
5. 如果沒(méi)有指定分區(qū),也沒(méi)有key,則使用輪詢
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot 全局日期格式化處理的實(shí)現(xiàn)
這篇文章主要介紹了Springboot 全局日期格式化處理的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Java面試之動(dòng)態(tài)規(guī)劃與組合數(shù)
這篇文章主要介紹了Java面試之動(dòng)態(tài)規(guī)劃與組合數(shù)的相關(guān)知識(shí),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-09-09Spring Boot使用Value注解給靜態(tài)變量賦值的方法
這篇文章主要介紹了Spring Boot使用Value注解給靜態(tài)變量賦值的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07java中@JsonValue和@JsonCreator使用
本文主要介紹了java中@JsonValue和@JsonCreator使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
這篇文章主要介紹了淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-05-05