聊聊Spring?Boot如何配置多個(gè)Kafka數(shù)據(jù)源
一、配置文件
application.properties配置文件如下
#kafka多數(shù)據(jù)源配置 #kafka數(shù)據(jù)源一,日志審計(jì)推送 spring.kafka.one.bootstrap-servers=172.19.12.109:32182 spring.kafka.one.producer.retries=0 spring.kafka.one.producer.properties.max.block.ms=5000 #kafka數(shù)據(jù)源二,動(dòng)環(huán)數(shù)據(jù)消費(fèi) spring.kafka.two.bootstrap-servers=172.19.12.109:32182 spring.kafka.two.producer.retries=0 spring.kafka.two.producer.properties.max.block.ms=5000 spring.kafka.two.consumer.group-id=bw-convert-data spring.kafka.two.consumer.enable-auto-commit=true
二、pom依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
三、生產(chǎn)者、消費(fèi)者配置
1.第一個(gè)kakfa
package com.gstanzer.convert.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class KafkaOneConfig { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.producer.retries}") private String retries; @Value("${spring.kafka.one.producer.properties.max.block.ms}") private String maxBlockMs; @Bean public KafkaTemplate<String, String> kafkaOneTemplate() { return new KafkaTemplate<>(producerFactory()); } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
2.第二個(gè)kakfa
package com.gstanzer.convert.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaTwoConfig { @Value("${spring.kafka.two.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.two.producer.retries}") private String retries; @Value("${spring.kafka.two.producer.properties.max.block.ms}") private String maxBlockMs; @Value("${spring.kafka.two.consumer.group-id}") private String groupId; @Value("${spring.kafka.two.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplate<String, String> kafkaTwoTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
四.生產(chǎn)者
@Controller public class TestController { @Autowired private KafkaTemplate kafkaOneTemplate; @Autowired private KafkaTemplate kafkaTwoTemplate; @RequestMapping("/send") @ResponseBody public String send() { final String TOPIC = "TOPIC_1"; kafkaOneTemplate.send(TOPIC, "kafka one"); kafkaTwoTemplate.send(TOPIC, "kafka two"); return "success"; } }
五.消費(fèi)者
@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); final String TOPIC = "TOPIC_1"; // containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同 @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory") public void listenerOne(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka one 接收到消息:{}", record.value()); } @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory") public void listenerTwo(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka two 接收到消息:{}", record.value()); } }
備注:
生產(chǎn)者消費(fèi)者代碼參考鏈接,開發(fā)同學(xué)需要以實(shí)際情況按要求自己變更下代碼即可:
Spring Boot 集成多個(gè) Kafka_springboot集成多個(gè)kafka
到此這篇關(guān)于Spring Boot配置多個(gè)Kafka數(shù)據(jù)源的文章就介紹到這了,更多相關(guān)Spring Boot配置Kafka數(shù)據(jù)源內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java使用ThreadLocal實(shí)現(xiàn)當(dāng)前登錄信息的存取功能
ThreadLocal和其他并發(fā)工具一樣,也是用于解決多線程并發(fā)訪問,下這篇文章主要給大家介紹了關(guān)于Java使用ThreadLocal實(shí)現(xiàn)當(dāng)前登錄信息的存取功能,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02IDEA 插件 mapper和xml互相跳轉(zhuǎn)操作
這篇文章主要介紹了IDEA 插件 mapper和xml互相跳轉(zhuǎn)操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02使用java實(shí)現(xiàn)各種數(shù)據(jù)統(tǒng)計(jì)圖(柱形圖,餅圖,折線圖)
用Jfree實(shí)現(xiàn)條形柱狀圖表,java代碼實(shí)現(xiàn)??山?jīng)常用于報(bào)表的制作,代碼自動(dòng)生成后可以自由查看??梢宰杂膳渲脠D表的各個(gè)屬性,用來達(dá)到自己的要求和目的。本文給大家介紹使用java實(shí)現(xiàn)各種數(shù)據(jù)統(tǒng)計(jì)圖(柱形圖,餅圖,折線圖),需要的朋友可以參考下2015-10-10Java那點(diǎn)兒事之Map集合不為人知的秘密有哪些
Map用于保存具有映射關(guān)系的數(shù)據(jù),Map集合里保存著兩組值,一組用于保存Map的key,另一組保存著Map的value,和查字典類似,通過key找到對應(yīng)的value,通過頁數(shù)找到對應(yīng)的信息。用學(xué)生類來說,key相當(dāng)于學(xué)號,value對應(yīng)name,age,sex等信息。用這種對應(yīng)關(guān)系方便查找2021-10-10