Configuring Kafka batch consumption and concurrent consumption in Spring Boot
Updated: December 30, 2024, 16:50:22  Author: 梵法利亚
This article describes how to configure Kafka batch consumption and concurrent consumption in Spring Boot. Note that the concurrency must be less than or equal to the number of partitions; otherwise some consumer threads will sit idle. The article also summarizes the command for creating Kafka partitions and encourages readers to share their own experience.
Configuring Kafka batch consumption and concurrent consumption in Spring Boot
@KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})}, containerFactory = "batchFactory") public void listener0(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})}, containerFactory = "batchFactory") public void listener1(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})}, containerFactory = "batchFactory") public void listener2(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } } @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3", topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})}, containerFactory = "batchFactory") public void listener3(List<String> record, Consumer<String,String> consumer){ consumer.commitSync(); try { //業(yè)務(wù)處理 } catch (Exception e) { log.error(e.toString()); } }
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Slf4j
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig: custom value = " + bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        // maximum number of records returned per poll(), i.e. the maximum batch size
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
        // offsets are committed manually in the listeners
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // number of concurrent consumer threads (containers)
        factory.setConcurrency(3);
        // enable batch listening / batch consumption
        factory.setBatchListener(true);
        return factory;
    }
}
With the above configuration in place, Kafka batch consumption works as expected.
However, there is one point that needs particular attention:
- The concurrency must be chosen according to the actual number of partitions
- It must be less than or equal to the partition count
- Otherwise some consumer threads will sit idle the whole time (an alternative single-listener setup is sketched below)
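As an alternative to declaring one listener method per partition, the more common pattern is a single batch listener on the whole topic, letting the factory's concurrency do the fan-out: with a 4-partition topic and the factory's concurrency raised from 3 to 4, Spring Kafka starts 4 consumer threads and the group coordinator assigns one partition to each. A minimal sketch, reusing the group id, topic constant and batchFactory from above (the listener id and method name are illustrative):

    // Single batch listener for the whole topic; the 4 partitions are spread across
    // the consumer threads created by factory.setConcurrency(...) in the container factory.
    @KafkaListener(id = "allPartitions", groupId = "forest_fire_ql_firecard_test_info3",
            topics = CommonConstants.KafkaTop.personnelCardRealTimeRecord,
            containerFactory = "batchFactory")
    public void listenAll(List<String> records, Consumer<String, String> consumer) {
        try {
            // business logic for the whole batch
            consumer.commitSync();
        } catch (Exception e) {
            log.error(e.toString());
        }
    }

The rule from the list above still applies: with 4 partitions, a concurrency of 5 would leave the fifth consumer with no partitions to read.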
Below is the command for creating a topic with 4 partitions:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic personnel_card_real_time_recordinfo --partitions 4 --replication-factor 1
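To check how many partitions an existing topic actually has (and therefore what concurrency makes sense), the same script can describe it:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic personnel_card_real_time_recordinfo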
Summary
The above is based on my personal experience. I hope it serves as a useful reference, and I hope you will continue to support 脚本之家.