怎樣給Kafka新增分區(qū)
給Kafka新增分區(qū)
數(shù)據(jù)量猛增的時候,需要給 kafka 的 topic 新增分區(qū),增大處理的數(shù)據(jù)量,可以通過以下步驟
1、修改 topic 的分區(qū)
kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3
2、遷移數(shù)據(jù)
生成遷移計劃,手動新建一個 json 文件
{ "topics": [ {"topic": "flink-test-03"} ], "version": 1 }
生成遷移計劃
kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate Current partition replica assignment: {"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}
新建一個文件reassignment.json,保存上邊這些信息
3、遷移
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute
4、驗證
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify
Kafka分區(qū)原理機(jī)制
分區(qū)結(jié)構(gòu)
kafka的消息總共是三層結(jié)構(gòu)
Topic(第一層結(jié)構(gòu),表示一個主題)-> Partition(分區(qū),每個消息可以有多個分區(qū)) -> 消息實例(具體的消息文本等等,一個消息實例只可能在一個分區(qū)里面,不會出現(xiàn)在多個分區(qū)中)
分區(qū)優(yōu)點
分區(qū)其實是一個負(fù)載均衡的思想。如此設(shè)計能使每一個分區(qū)獨自處理單獨的讀寫請求,提高吞吐量。
分區(qū)策略
- 輪詢策略Round-robin(未指定key新版本默認(rèn)策略)
- 隨機(jī)策略Randomness(老版本默認(rèn)策略)
- 消息鍵排序策略Key-ordering(指定了key,則使用該策略)
- 根據(jù)地理位置進(jìn)行分區(qū)
- 自定義分區(qū) 需要在生產(chǎn)者端實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并配置一下實現(xiàn)類的全限定名
根據(jù)分區(qū)策略實現(xiàn)消息的順序消費
可以只設(shè)置一個分區(qū),這樣子消息都是放在一個partition,肯定是先進(jìn)先出進(jìn)行消費,然而這種場景無法利用kafka多分區(qū)的高吞吐量以及負(fù)載均衡的優(yōu)勢。
將需要順序消費的消息設(shè)置key,這個時候根據(jù)默認(rèn)的分區(qū)策略,kafka會將所有的相同的key放在一個partition上面,這樣既可以使用kafka的partition又可以實現(xiàn)順序消費。
默認(rèn)分區(qū)策略源碼
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose a partition in a round-robin fashion */ public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<String, ?> configs) {} /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
從類注釋當(dāng)中已經(jīng)很明顯的看出來分區(qū)邏輯
3. 如果指定了分區(qū),則使用指定分區(qū)
4. 如果沒有指定分區(qū),但是有key,則使用hash過的key放置消息
5. 如果沒有指定分區(qū),也沒有key,則使用輪詢
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java面試之動態(tài)規(guī)劃與組合數(shù)
這篇文章主要介紹了Java面試之動態(tài)規(guī)劃與組合數(shù)的相關(guān)知識,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2019-09-09Spring Boot使用Value注解給靜態(tài)變量賦值的方法
這篇文章主要介紹了Spring Boot使用Value注解給靜態(tài)變量賦值的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-07-07java中@JsonValue和@JsonCreator使用
本文主要介紹了java中@JsonValue和@JsonCreator使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06淺談Java消息隊列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
這篇文章主要介紹了淺談Java消息隊列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-05-05