springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn)
背景
使用springboot整合kafka時, springboot默認(rèn)讀取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener時指定topic即可, 當(dāng)服務(wù)中需要監(jiān)聽多個kafka時, 需要配置多個kafka, 這種方式不適用
方案
可以手動讀取不同kafka配置信息, 創(chuàng)建不同的Kafka 監(jiān)聽容器工廠, 使用@KafkaListener時指定相應(yīng)的容器工廠, 代碼如下:
1. 導(dǎo)入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2. yml配置
kafka: # 默認(rèn)消費者配置 default-consumer: # 自動提交已消費offset enable-auto-commit: true # 自動提交間隔時間 auto-commit-interval: 1000 # 消費的超時時間 poll-timeout: 1500 # 如果Kafka中沒有初始偏移量,或者服務(wù)器上不再存在當(dāng)前偏移量(例如,因為該數(shù)據(jù)已被刪除)自動將該偏移量重置成最新偏移量 auto.offset.reset: latest # 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作) session.timeout.ms: 120000 # 消費請求超時時間 request.timeout.ms: 180000 # 1號kafka配置 test1: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx # 2號kafka配置 test2: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx
3. 容器工廠配置
package com.zhdx.modules.backstage.config; import com.google.common.collect.Maps; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.Map; /** * kafka監(jiān)聽容器工廠配置 * <p> * 拓展其他消費者配置只需配置指定的屬性和bean即可 */ @EnableKafka @Configuration @RefreshScope public class KafkaListenerContainerFactoryConfig { /** * test1 kafka配置 */ @Value("${kafka.test1.bootstrap-servers}") private String test1KafkaServerUrls; @Value("${kafka.test1.consumer.group-id}") private String test1GroupId; @Value("${kafka.test1.consumer.sasl.mechanism}") private String test1SaslMechanism; @Value("${kafka.test1.consumer.security.protocol}") private String test1SecurityProtocol; @Value("${kafka.test1.consumer.sasl.jaas.config}") private String test1SaslJaasConfig; /** * test2 kafka配置 */ @Value("${kafka.test2.bootstrap-servers}") private String test2KafkaServerUrls; @Value("${kafka.test2.consumer.group-id}") private String test2GroupId; @Value("${kafka.test2.consumer.sasl.mechanism}") private String test2SaslMechanism; @Value("${kafka.test2.consumer.security.protocol}") private String test2SecurityProtocol; @Value("${kafka.test2.consumer.sasl.jaas.config}") private String test2SaslJaasConfig; /** * 默認(rèn)消費者配置 */ @Value("${kafka.default-consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.default-consumer.poll-timeout}") private int pollTimeout; @Value("${kafka.default-consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.default-consumer.session.timeout.ms}") private int sessionTimeoutMs; @Value("${kafka.default-consumer.request.timeout.ms}") private int requestTimeoutMs; /** * test1消費者配置 */ public Map<String, Object> test1ConsumerConfigs() { Map<String, Object> props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls); // 消費者組 props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism); props.put("security.protocol", test1SecurityProtocol); // 賬號密碼 props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig); return props; } /** * test2消費者配置 */ public Map<String, Object> test2ConsumerConfigs() { Map<String, Object> props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls); // 消費者組 props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism); props.put("security.protocol", test2SecurityProtocol); // 賬號密碼 props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig); return props; } /** * 默認(rèn)消費者配置 */ private Map<String, Object> getDefaultConsumerConfigs() { Map<String, Object> props = Maps.newHashMap(); // 自動提交(按周期)已消費offset 批量消費下設(shè)置false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); // 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作) props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // 消費請求超時時間 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); // 序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 如果Kafka中沒有初始偏移量,或者服務(wù)器上不再存在當(dāng)前偏移量(例如,因為該數(shù)據(jù)已被刪除)自動將該偏移量重置成最新偏移量 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } /** * 消費者工廠類 */ public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) { return new DefaultKafkaConsumerFactory<>(consumerConfigs); } public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory( Map<String, Object> consumerConfigs) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(initConsumerFactory(consumerConfigs)); // 是否開啟批量消費 factory.setBatchListener(false); // 消費的超時時間 factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } /** * 創(chuàng)建test1 Kafka 監(jiān)聽容器工廠。 * * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 對象 */ @Bean(name = "test1KafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() { Map<String, Object> consumerConfigs = this.test1ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } /** * 創(chuàng)建test2 Kafka 監(jiān)聽容器工廠。 * * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 對象 */ @Bean(name = "test2KafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() { Map<String, Object> consumerConfigs = this.test2ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } }
4. @KafkaListener使用
package com.zhdx.modules.backstage.kafka; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka監(jiān)聽器 */ @Slf4j @Component public class test1KafkaListener { @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx") public void handleHyPm(ConsumerRecord<String, String> record) { log.info("消費到topic xxx消息:{}", JSON.toJSONString(record.value())); } }
到此這篇關(guān)于springboot使用@KafkaListener監(jiān)聽多個kafka配置實現(xiàn)的文章就介紹到這了,更多相關(guān)springboot 監(jiān)聽多個kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Aop 如何獲取參數(shù)名參數(shù)值
這篇文章主要介紹了Spring Aop 如何獲取參數(shù)名參數(shù)值的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Java String字符串內(nèi)容實現(xiàn)添加雙引號
這篇文章主要介紹了Java String字符串內(nèi)容實現(xiàn)添加雙引號,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09關(guān)于JpaRepository的關(guān)聯(lián)查詢和@Query查詢
這篇文章主要介紹了JpaRepository的關(guān)聯(lián)查詢和@Query查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11