欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot配置kafka批量消費,并發(fā)消費方式

 更新時間:2024年12月30日 16:50:22   作者:梵法利亞  
文章介紹了如何在Spring Boot中配置Kafka進行批量消費,并發(fā)消費,需要注意的是,并發(fā)量必須小于等于分區(qū)數(shù),否則會導致線程空閑,文章還總結(jié)了創(chuàng)建Kafka分區(qū)的命令,并鼓勵讀者分享經(jīng)驗

springboot配置kafka批量消費,并發(fā)消費

 @KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})},
            containerFactory = "batchFactory")
    public void listener0(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
          //業(yè)務處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})},
            containerFactory = "batchFactory")
    public void listener1(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})},
            containerFactory = "batchFactory")
    public void listener2(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})},
            containerFactory = "batchFactory")
    public void listener3(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;





    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig:自定義配置="+ bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并發(fā)數(shù)量
        factory.setConcurrency(3);
        //開啟批量監(jiān)聽,消費
        factory.setBatchListener(true);
        //factory.set
        return factory;
    }

}

按照以上配置內(nèi)容即可,可以達到kafka批量消費的能力。

但是,要特別需要注意的一個點是:

  • 并發(fā)量根據(jù)實際的分區(qū)數(shù)量決定
  • 必須小于等于分區(qū)數(shù)
  • 否則會有線程一直處于空閑狀態(tài)

下面是創(chuàng)建4個分區(qū)的命令寫法

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic   personnel_card_real_time_recordinfo    --partitions 4 --replication-factor 1

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • JAVA如何獲取客戶端IP地址和MAC地址

    JAVA如何獲取客戶端IP地址和MAC地址

    本篇文章主要介紹了JAVA如何獲取客戶端IP地址和MAC地址非常具有實用價值,這里整理了詳細的代碼,需要的朋友可以參考下
    2017-08-08
  • SpringBoot實現(xiàn)防止XSS攻擊的示例詳解

    SpringBoot實現(xiàn)防止XSS攻擊的示例詳解

    這篇文章主要為大家詳細介紹了SpringBoot如何實現(xiàn)防止XSS攻擊,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2024-03-03
  • Java如何獲取文件夾下所有壓縮包下指定文件

    Java如何獲取文件夾下所有壓縮包下指定文件

    在Java中,通過遍歷文件夾并對壓縮包進行解析,可以實現(xiàn)提取指定文件的功能,如文檔、PDF等,該過程中可增加過濾條件來適應不同需求,例如文件類型或文件名過濾,該方法適用于處理大量數(shù)據(jù)時的文件管理和數(shù)據(jù)提取
    2024-09-09
  • java 指定某個jdk版本方法

    java 指定某個jdk版本方法

    這篇文章主要介紹了java 指定某個jdk版本方法的相關資料,需要的朋友可以參考下
    2017-05-05
  • Java分布式鎖的三種實現(xiàn)方案

    Java分布式鎖的三種實現(xiàn)方案

    本文主要介紹了Java分布式鎖的三種實現(xiàn)方案。具有一定的參考價值,下面跟著小編一起來看下吧
    2017-01-01
  • RabbitMQ使用案例詳解

    RabbitMQ使用案例詳解

    RabbitMQ是基于Erlang語言開發(fā)的開源的消息中間件,這篇文章給大家介紹RabbitMQ使用案例,感興趣的朋友跟隨小編一起看看吧
    2024-03-03
  • java實現(xiàn)單鏈表中的增刪改

    java實現(xiàn)單鏈表中的增刪改

    這篇文章主要為大家詳細介紹了java實現(xiàn)單鏈表中的增刪改,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • 一文教你Java如何快速構(gòu)建項目骨架

    一文教你Java如何快速構(gòu)建項目骨架

    在?Java?項目開發(fā)過程中,構(gòu)建項目骨架是一項繁瑣但又基礎重要的工作,Java?領域有許多代碼生成工具可以幫助我們快速完成這一任務,下面就跟隨小編一起來了解下吧
    2025-05-05
  • springboot web項目打jar或者war包并運行的實現(xiàn)

    springboot web項目打jar或者war包并運行的實現(xiàn)

    這篇文章主要介紹了springboot web項目打jar或者war包并運行的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-11-11
  • 基于指針pointers和引用references的區(qū)別分析

    基于指針pointers和引用references的區(qū)別分析

    本篇文章介紹了,基于指針pointers和引用references的區(qū)別分析。需要的朋友參考下
    2013-05-05

最新評論