springboot項(xiàng)目配置多個(gè)kafka的示例代碼
1.spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.配置文件相關(guān)信息
kafka.bootstrap-servers=localhost:9092 kafka.consumer.group.id=20230321 #可以并發(fā)消費(fèi)的線程數(shù) (通常與partition數(shù)量一致) kafka.consumer.concurrency=10 kafka.consumer.enable.auto.commit=false kafka.bootstrap-servers.pic=localhost:29092 kafka.consumer.group.id.pic=20230322_pic kafka.consumer.concurrency.pic=10 kafka.consumer.enable.auto.commit.pic=false
3.kafka配置類
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.enable.auto.commit}") private String autoCommit; @Value("${kafka.bootstrap-servers}") private String bootstrapServer; @Value("${kafka.consumer.group.id.pic}") private String groupIdPic; @Value("${kafka.consumer.concurrency.pic}") private int concurrencyPic; @Value("${kafka.consumer.enable.auto.commit.pic}") private String autoCommitPic; @Value("${kafka.bootstrap-servers.pic}") private String bootstrapServerPic; @Bean public ConsumerFactory<String, String> consumerFactory() { String bootstrapServers = bootstrapServer; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } @Bean public ConsumerFactory<String, String> consumerFactoryPic() { String bootstrapServers = bootstrapServerPic; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryPic()); factory.setConcurrency(concurrencyPic); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } }
4.消費(fèi)主題消息
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { String jsonString = message.value(); if (StringUtils.isNoneBlank(jsonString)) { log.info("消費(fèi):{}",jsonString); //TODO .... } } catch (Exception e) { log.error(" receive topic error ", e); } finally { ack.acknowledge(); } } @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { if (StringUtils.isNoneBlank(message.value())) { //TODO .... } } catch (Exception e) { logger.error(" receive topic error ", e); } finally { ack.acknowledge(); } }
到此這篇關(guān)于springboot項(xiàng)目配置多個(gè)kafka的文章就介紹到這了,更多相關(guān)springboot配置多個(gè)kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot使用kafka的兩種方式
- springboot連接kafka集群的使用示例
- SpringBoot3集成Kafka的方法詳解
- Springboot系列之kafka操作使用詳解
- SpringBoot如何正確配置并運(yùn)行Kafka
- springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問題
- springboot使用@KafkaListener監(jiān)聽多個(gè)kafka配置實(shí)現(xiàn)
相關(guān)文章
Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例
這篇文章主要為大家介紹了Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12基于OpenCv與JVM實(shí)現(xiàn)加載保存圖像功能(JAVA?圖像處理)
openCv有一個(gè)名imread的簡單函數(shù),用于從文件中讀取圖像,本文給大家介紹JAVA?圖像處理基于OpenCv與JVM實(shí)現(xiàn)加載保存圖像功能,感興趣的朋友一起看看吧2022-01-01Java面試synchronized偏向鎖后hashcode存址
這篇文章主要為大家介紹了Java面試中synchronized偏向鎖后hashcode存址詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05MyBatis-Plus雪花算法實(shí)現(xiàn)源碼解讀
雪花算法是一種用于生成唯一標(biāo)識(shí)符(ID)的分布式算法,雪花算法的設(shè)計(jì)目標(biāo)是在分布式系統(tǒng)中生成全局唯一的ID,同時(shí)保證ID的有序性和趨勢遞增,這篇文章主要介紹了MyBatis-Plus雪花算法實(shí)現(xiàn)源碼解析,需要的朋友可以參考下2023-12-12JAVA中數(shù)組從小到大排序的2種方法實(shí)例
JAVA中在運(yùn)用數(shù)組進(jìn)行排序功能時(shí)一般有多種解決方案,下面這篇文章主要給大家介紹了關(guān)于JAVA中數(shù)組從小到大排序的2種方法,文中都給出了詳細(xì)的實(shí)例代碼,需要的朋友可以參考下2023-03-03深入了解Spring中最常用的11個(gè)擴(kuò)展點(diǎn)
我們一說到spring,可能第一個(gè)想到的是?IOC(控制反轉(zhuǎn))?和?AOP(面向切面編程)。除此之外,我們?cè)谑褂胹pring的過程中,有沒有發(fā)現(xiàn)它的擴(kuò)展能力非常強(qiáng)。今天就來跟大家一起聊聊,在Spring中最常用的11個(gè)擴(kuò)展點(diǎn)2022-09-09