springboot配置kafka批量消費(fèi),并發(fā)消費(fèi)方式
更新時(shí)間:2024年12月30日 16:50:22 作者:梵法利亞
文章介紹了如何在Spring Boot中配置Kafka進(jìn)行批量消費(fèi),并發(fā)消費(fèi),需要注意的是,并發(fā)量必須小于等于分區(qū)數(shù),否則會(huì)導(dǎo)致線程空閑,文章還總結(jié)了創(chuàng)建Kafka分區(qū)的命令,并鼓勵(lì)讀者分享經(jīng)驗(yàn)
springboot配置kafka批量消費(fèi),并發(fā)消費(fèi)
@KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})}, containerFactory = "batchFactory") public void listener0(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})}, containerFactory = "batchFactory") public void listener1(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})}, containerFactory = "batchFactory") public void listener2(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})}, containerFactory = "batchFactory") public void listener3(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } }
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Slf4j @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServersConfig; public Map<String,Object> consumerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3"); log.info("bootstrapServersConfig:自定義配置="+ bootstrapServersConfig); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) { //Map<String, Object> consumerProperties = properties.buildConsumerProperties(); ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //并發(fā)數(shù)量 factory.setConcurrency(3); //開啟批量監(jiān)聽,消費(fèi) factory.setBatchListener(true); //factory.set return factory; } }
按照以上配置內(nèi)容即可,可以達(dá)到kafka批量消費(fèi)的能力。
但是,要特別需要注意的一個(gè)點(diǎn)是:
- 并發(fā)量根據(jù)實(shí)際的分區(qū)數(shù)量決定
- 必須小于等于分區(qū)數(shù)
- 否則會(huì)有線程一直處于空閑狀態(tài)
下面是創(chuàng)建4個(gè)分區(qū)的命令寫法
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic personnel_card_real_time_recordinfo --partitions 4 --replication-factor 1
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
使用GenericObjectPool避免泄漏設(shè)置方法
這篇文章主要為大家介紹了使用GenericObjectPool避免泄漏的設(shè)置方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09Java 仿天貓服裝商城系統(tǒng)的實(shí)現(xiàn)流程
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實(shí)現(xiàn)一個(gè)仿天貓服裝商城系統(tǒng),大家可以在過程中查缺補(bǔ)漏,提升水平2021-11-11Java參數(shù)傳遞實(shí)現(xiàn)代碼及過程圖解
這篇文章主要介紹了Java參數(shù)傳遞實(shí)現(xiàn)代碼及過程圖解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11MyBatis實(shí)現(xiàn)三級(jí)樹查詢的示例代碼
在實(shí)際項(xiàng)目開發(fā)中,樹形結(jié)構(gòu)的數(shù)據(jù)查詢是一個(gè)非常常見的需求,比如組織架構(gòu)、菜單管理、地區(qū)選擇等場景都需要處理樹形數(shù)據(jù),本文將詳細(xì)講解如何使用MyBatis實(shí)現(xiàn)三級(jí)樹形數(shù)據(jù)的查詢,需要的朋友可以參考下2024-12-12Java?FTP協(xié)議實(shí)現(xiàn)文件下載功能
FTP(File?Transfer?Protocol)就是文件傳輸協(xié)議。通過FTP客戶端從遠(yuǎn)程FTP服務(wù)器上拷貝文件到本地計(jì)算機(jī)稱為下載,將本地計(jì)算機(jī)上的文件復(fù)制到遠(yuǎn)程FTP服務(wù)器上稱為上傳,上傳和下載是FTP最常用的兩個(gè)功能2022-11-11