欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

怎樣給Kafka新增分區(qū)

 更新時(shí)間:2022年12月27日 15:19:16   作者:KK架構(gòu)  
這篇文章主要介紹了怎樣給Kafka新增分區(qū)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

給Kafka新增分區(qū)

數(shù)據(jù)量猛增的時(shí)候,需要給 kafka 的 topic 新增分區(qū),增大處理的數(shù)據(jù)量,可以通過(guò)以下步驟

1、修改 topic 的分區(qū)

kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3

2、遷移數(shù)據(jù)

生成遷移計(jì)劃,手動(dòng)新建一個(gè) json 文件

{
"topics": [
{"topic": "flink-test-03"}
],
"version": 1
}

生成遷移計(jì)劃

kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate

Current partition replica assignment:

{"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}

新建一個(gè)文件reassignment.json,保存上邊這些信息

3、遷移

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute

4、驗(yàn)證

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify

Kafka分區(qū)原理機(jī)制

分區(qū)結(jié)構(gòu)

kafka的消息總共是三層結(jié)構(gòu)

Topic(第一層結(jié)構(gòu),表示一個(gè)主題)-> Partition(分區(qū),每個(gè)消息可以有多個(gè)分區(qū)) -> 消息實(shí)例(具體的消息文本等等,一個(gè)消息實(shí)例只可能在一個(gè)分區(qū)里面,不會(huì)出現(xiàn)在多個(gè)分區(qū)中)


在這里插入圖片描述

分區(qū)優(yōu)點(diǎn)

分區(qū)其實(shí)是一個(gè)負(fù)載均衡的思想。如此設(shè)計(jì)能使每一個(gè)分區(qū)獨(dú)自處理單獨(dú)的讀寫請(qǐng)求,提高吞吐量。

分區(qū)策略

  • 輪詢策略Round-robin(未指定key新版本默認(rèn)策略)
  • 隨機(jī)策略Randomness(老版本默認(rèn)策略)
  • 消息鍵排序策略Key-ordering(指定了key,則使用該策略)
  • 根據(jù)地理位置進(jìn)行分區(qū)
  • 自定義分區(qū) 需要在生產(chǎn)者端實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并配置一下實(shí)現(xiàn)類的全限定名

根據(jù)分區(qū)策略實(shí)現(xiàn)消息的順序消費(fèi)

可以只設(shè)置一個(gè)分區(qū),這樣子消息都是放在一個(gè)partition,肯定是先進(jìn)先出進(jìn)行消費(fèi),然而這種場(chǎng)景無(wú)法利用kafka多分區(qū)的高吞吐量以及負(fù)載均衡的優(yōu)勢(shì)。

將需要順序消費(fèi)的消息設(shè)置key,這個(gè)時(shí)候根據(jù)默認(rèn)的分區(qū)策略,kafka會(huì)將所有的相同的key放在一個(gè)partition上面,這樣既可以使用kafka的partition又可以實(shí)現(xiàn)順序消費(fèi)。

默認(rèn)分區(qū)策略源碼

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public void configure(Map<String, ?> configs) {}
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close() {}
}

從類注釋當(dāng)中已經(jīng)很明顯的看出來(lái)分區(qū)邏輯

3. 如果指定了分區(qū),則使用指定分區(qū)

4. 如果沒(méi)有指定分區(qū),但是有key,則使用hash過(guò)的key放置消息

5. 如果沒(méi)有指定分區(qū),也沒(méi)有key,則使用輪詢

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java實(shí)現(xiàn)斗地主小游戲

    Java實(shí)現(xiàn)斗地主小游戲

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)斗地主小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-06-06
  • Springboot 全局日期格式化處理的實(shí)現(xiàn)

    Springboot 全局日期格式化處理的實(shí)現(xiàn)

    這篇文章主要介紹了Springboot 全局日期格式化處理的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • JUnit5常用注解的使用

    JUnit5常用注解的使用

    注解是JUnit的標(biāo)志性技術(shù),本文就來(lái)對(duì)它的20個(gè)注解,以及元注解和組合注解進(jìn)行學(xué)習(xí),感興趣的可以了解一下
    2021-07-07
  • Java面試之動(dòng)態(tài)規(guī)劃與組合數(shù)

    Java面試之動(dòng)態(tài)規(guī)劃與組合數(shù)

    這篇文章主要介紹了Java面試之動(dòng)態(tài)規(guī)劃與組合數(shù)的相關(guān)知識(shí),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-09-09
  • Java中Integer類型值相等判斷方法

    Java中Integer類型值相等判斷方法

    這篇文章主要給大家介紹了關(guān)于Java中Integer類型值相等判斷的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • Spring Boot使用Value注解給靜態(tài)變量賦值的方法

    Spring Boot使用Value注解給靜態(tài)變量賦值的方法

    這篇文章主要介紹了Spring Boot使用Value注解給靜態(tài)變量賦值的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07
  • Java中常用數(shù)據(jù)類型的輸入輸出詳解

    Java中常用數(shù)據(jù)類型的輸入輸出詳解

    本文主要介紹了Java中幾個(gè)常用的數(shù)據(jù)類型是如何輸入和輸出的,例如:Char型、int型、double型、數(shù)組、字符串等,對(duì)我們學(xué)習(xí)java有一定的幫助,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)學(xué)習(xí)
    2021-12-12
  • java中@JsonValue和@JsonCreator使用

    java中@JsonValue和@JsonCreator使用

    本文主要介紹了java中@JsonValue和@JsonCreator使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-06-06
  • 淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    這篇文章主要介紹了淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-05-05
  • Spring?Boot如何利用攔截器加緩存完成接口防刷操作

    Spring?Boot如何利用攔截器加緩存完成接口防刷操作

    流的需求出現(xiàn)在許多常見(jiàn)的場(chǎng)景中,下面這篇文章主要給大家介紹了關(guān)于Spring?Boot如何利用攔截器加緩存完成接口防刷操作的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-02-02

最新評(píng)論