欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot配置kafka批量消費,并發(fā)消費方式

 更新時間:2024年12月30日 16:50:22   作者:梵法利亞  
文章介紹了如何在Spring Boot中配置Kafka進(jìn)行批量消費,并發(fā)消費,需要注意的是,并發(fā)量必須小于等于分區(qū)數(shù),否則會導(dǎo)致線程空閑,文章還總結(jié)了創(chuàng)建Kafka分區(qū)的命令,并鼓勵讀者分享經(jīng)驗

springboot配置kafka批量消費,并發(fā)消費

 @KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})},
            containerFactory = "batchFactory")
    public void listener0(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
          //業(yè)務(wù)處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})},
            containerFactory = "batchFactory")
    public void listener1(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務(wù)處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})},
            containerFactory = "batchFactory")
    public void listener2(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務(wù)處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})},
            containerFactory = "batchFactory")
    public void listener3(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //業(yè)務(wù)處理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;





    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig:自定義配置="+ bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并發(fā)數(shù)量
        factory.setConcurrency(3);
        //開啟批量監(jiān)聽,消費
        factory.setBatchListener(true);
        //factory.set
        return factory;
    }

}

按照以上配置內(nèi)容即可,可以達(dá)到kafka批量消費的能力。

但是,要特別需要注意的一個點是:

  • 并發(fā)量根據(jù)實際的分區(qū)數(shù)量決定
  • 必須小于等于分區(qū)數(shù)
  • 否則會有線程一直處于空閑狀態(tài)

下面是創(chuàng)建4個分區(qū)的命令寫法

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic   personnel_card_real_time_recordinfo    --partitions 4 --replication-factor 1

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Zookeeper原理及在Dubbo中的使用示例詳解

    Zookeeper原理及在Dubbo中的使用示例詳解

    這篇文章主要為大家介紹了Zookeeper原理及在Dubbo中的使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • Eclipse下Javassist正確使用方法代碼解析

    Eclipse下Javassist正確使用方法代碼解析

    這篇文章主要介紹了Eclipse下Javassist正確使用方法代碼解析,javassist-3.15.0-ga.jar包是一款在java開發(fā)中十分重要的jar文件包,需要的朋友可以參考下,文中附下載鏈接。
    2017-12-12
  • 使用GenericObjectPool避免泄漏設(shè)置方法

    使用GenericObjectPool避免泄漏設(shè)置方法

    這篇文章主要為大家介紹了使用GenericObjectPool避免泄漏的設(shè)置方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09
  • Java 仿天貓服裝商城系統(tǒng)的實現(xiàn)流程

    Java 仿天貓服裝商城系統(tǒng)的實現(xiàn)流程

    讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實現(xiàn)一個仿天貓服裝商城系統(tǒng),大家可以在過程中查缺補漏,提升水平
    2021-11-11
  • Java參數(shù)傳遞實現(xiàn)代碼及過程圖解

    Java參數(shù)傳遞實現(xiàn)代碼及過程圖解

    這篇文章主要介紹了Java參數(shù)傳遞實現(xiàn)代碼及過程圖解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-11-11
  • mybatis-plus開啟sql日志打印的三種方法

    mybatis-plus開啟sql日志打印的三種方法

    本文主要介紹了mybatis-plus開啟sql日志打印的三種方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • MyBatis實現(xiàn)三級樹查詢的示例代碼

    MyBatis實現(xiàn)三級樹查詢的示例代碼

    在實際項目開發(fā)中,樹形結(jié)構(gòu)的數(shù)據(jù)查詢是一個非常常見的需求,比如組織架構(gòu)、菜單管理、地區(qū)選擇等場景都需要處理樹形數(shù)據(jù),本文將詳細(xì)講解如何使用MyBatis實現(xiàn)三級樹形數(shù)據(jù)的查詢,需要的朋友可以參考下
    2024-12-12
  • SpringBoot中的Bean裝配詳解

    SpringBoot中的Bean裝配詳解

    Spring?IoC?容器是一個管理?Bean?的容器,在?Spring?的定義中,它要求所有的?IoC?容器都需要實現(xiàn)接口?BeanFactory,它是一個頂級容器接口,這篇文章主要介紹了SpringBoot中的Bean裝配詳解,需要的朋友可以參考下
    2024-04-04
  • Java比較兩個List的值是否相等的方法

    Java比較兩個List的值是否相等的方法

    這篇文章主要介紹了Java比較兩個List的值是否相等的方法,涉及java針對隊列比較的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • Java?FTP協(xié)議實現(xiàn)文件下載功能

    Java?FTP協(xié)議實現(xiàn)文件下載功能

    FTP(File?Transfer?Protocol)就是文件傳輸協(xié)議。通過FTP客戶端從遠(yuǎn)程FTP服務(wù)器上拷貝文件到本地計算機(jī)稱為下載,將本地計算機(jī)上的文件復(fù)制到遠(yuǎn)程FTP服務(wù)器上稱為上傳,上傳和下載是FTP最常用的兩個功能
    2022-11-11

最新評論