欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼

 更新時間:2023年02月20日 09:32:59   作者:russle  
kakfa是我們在項(xiàng)目開發(fā)中經(jīng)常使用的消息中間件。由于它的寫性能非常高,因此,經(jīng)常會碰到讀取Kafka消息隊列時擁堵的情況,這篇文章主要介紹了Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息,需要的朋友可以參考下

kakfa是我們在項(xiàng)目開發(fā)中經(jīng)常使用的消息中間件。由于它的寫性能非常高,因此,經(jīng)常會碰到讀取Kafka消息隊列時擁堵的情況。遇到這種情況時,有時我們不能直接清理整個topic,因?yàn)檫€有別的服務(wù)正在使用該topic。因此只能額外啟動一個相同名稱的consumer-group來加快消息消費(fèi)(如果該topic只有一個分區(qū),再啟動一個新的消費(fèi)者,沒有作用)。

完整的代碼在這里,歡迎加星號、fork。

官方文檔在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并發(fā)消費(fèi)###
先看代碼,重點(diǎn)是這我們使用的是ConcurrentKafkaListenerContainerFactory并且設(shè)置了factory.setConcurrency(4); (我的topic有4個分區(qū),為了加快消費(fèi)將并發(fā)設(shè)置為4,也就是有4個KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并發(fā)消費(fèi)。

###第二步,批量消費(fèi)###
然后是批量消費(fèi)。重點(diǎn)是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一個設(shè)啟用批量消費(fèi),一個設(shè)置批量消費(fèi)每次最多消費(fèi)多少條消息記錄。

重點(diǎn)說明一下,我們設(shè)置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是說如果沒有達(dá)到50條消息,我們就一直等待。官方的解釋是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的記錄數(shù)。

從啟動日志中可以看到還有個 max.poll.interval.ms = 300000, 也就說每間隔max.poll.interval.ms我們就調(diào)用一次poll。每次poll最多返回50條記錄。

max.poll.interval.ms官方解釋是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

啟動日志截圖

這里寫圖片描述

關(guān)于max.poll.records和max.poll.interval.ms官方解釋截圖:

這里寫圖片描述

###第三步,分區(qū)消費(fèi)###
對于只有一個分區(qū)的topic,不需要分區(qū)消費(fèi),因?yàn)闆]有意義。下面的例子是針對有2個分區(qū)的情況(我的完整代碼中有4個listenPartitionX方法,我的topic設(shè)置了4個分區(qū)),讀者可以根據(jù)自己的情況進(jìn)行調(diào)整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

關(guān)于分區(qū)和消費(fèi)者關(guān)系,后面會補(bǔ)充,先摘錄如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,總結(jié),如果我們的topic有多個分區(qū),經(jīng)過以上步驟可以很好的加快消息消費(fèi)。如果只有一個分區(qū),因?yàn)橐呀?jīng)有一個同名group id在消費(fèi)了,新啟動的一個基本上沒有作用(本人測試結(jié)果)。

具體代碼在這里,歡迎加星號,fork。

到此這篇關(guān)于Spring Boot 中使用@KafkaListener并發(fā)批量接收消息的文章就介紹到這了,更多相關(guān)Spring Boot 使用@KafkaListener并發(fā)批量接收消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn)

    SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn)

    本文主要介紹了SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn),主要包含JdbcTemplate和mybatis框架的整合應(yīng)用,具有一定的參考價值,感興趣的可以了解一下
    2022-03-03
  • Java存儲過程調(diào)用CallableStatement的方法

    Java存儲過程調(diào)用CallableStatement的方法

    這篇文章主要介紹了Java存儲過程調(diào)用CallableStatement的方法,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下
    2020-11-11
  • Java SSH 秘鑰連接mysql數(shù)據(jù)庫的方法

    Java SSH 秘鑰連接mysql數(shù)據(jù)庫的方法

    這篇文章主要介紹了Java SSH 秘鑰連接mysql數(shù)據(jù)庫的方法,包括引入依賴的代碼和出現(xiàn)異常報錯問題,需要的朋友可以參考下
    2021-06-06
  • MyBatis如何通過攔截修改SQL

    MyBatis如何通過攔截修改SQL

    這篇文章主要介紹了MyBatis如何通過攔截修改SQL問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程

    spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程

    Spring Boot支持容器的自動配置,默認(rèn)是Tomcat,當(dāng)然我們也是可以進(jìn)行修改的。這篇文章給大家介紹了spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程,需要的朋友參考下吧
    2018-01-01
  • mybatis plus 自動轉(zhuǎn)駝峰配置小結(jié)

    mybatis plus 自動轉(zhuǎn)駝峰配置小結(jié)

    SpringBoot提供兩種配置Mybatis的方式,第一種是通過yml或application.properties文件開啟配置,第二種是使用自定義配置類,通過給容器添加一個ConfigurationCustomizer來實(shí)現(xiàn)更靈活的配置,這兩種方法可以根據(jù)項(xiàng)目需求和個人喜好選擇使用
    2024-10-10
  • java線程同步操作實(shí)例詳解

    java線程同步操作實(shí)例詳解

    這篇文章主要介紹了java線程同步操作,結(jié)合實(shí)例形式分析了Java線程同步與鎖機(jī)制相關(guān)原理、操作技巧與注意事項(xiàng),需要的朋友可以參考下
    2018-09-09
  • Java使用J4L識別驗(yàn)證碼的操作方法

    Java使用J4L識別驗(yàn)證碼的操作方法

    這篇文章主要介紹了Java使用J4L識別驗(yàn)證碼的操作方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-02-02
  • 詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法

    詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法

    這篇文章主要介紹了詳解Springboot 優(yōu)雅停止服務(wù)的幾種方法 ,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Java實(shí)現(xiàn)淘寶秒殺聚劃算搶購自動提醒源碼

    Java實(shí)現(xiàn)淘寶秒殺聚劃算搶購自動提醒源碼

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)淘寶秒殺聚劃算搶購自動提醒源碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-02-02

最新評論