聊聊Spring Boot 如何集成多個 Kafka
一、配置文件
# application.yml
# Two independent Kafka clusters, configured side by side under custom keys
# ("one" and "two"). These keys are read explicitly via @Value in the
# configuration classes.
spring:
  kafka:
    # First Kafka cluster
    one:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true
    # Second Kafka cluster
    two:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true
二、生產者、消費者配置
2.1 第一個 Kafka
@EnableKafka @Configuration public class KafkaOneConfig { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.consumer.group-id}") private String groupId; @Value("${spring.kafka.one.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplate<String, String> kafkaOneTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能寫成 1 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
2.2 第二個 Kafka
/**
 * Producer and consumer wiring for the second Kafka cluster.
 *
 * <p>Connection settings are read from the {@code spring.kafka.two.*} properties.
 * The class exposes a {@link KafkaTemplate} bean named {@code kafkaTwoTemplate} and a
 * listener container factory bean named {@code kafkaTwoContainerFactory}; the latter
 * name is what {@code @KafkaListener(containerFactory = ...)} must reference.
 *
 * <p>Keys and values are both {@code String} on the producer and the consumer side,
 * matching the configured String (de)serializers.
 */
@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * Template used to publish String messages to the second cluster.
     *
     * @return a template backed by this cluster's producer factory
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Listener container factory for the second cluster.
     *
     * @return a concurrent container factory with 3 consumers and a poll timeout of 3000
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /** Builds the producer factory from {@link #producerConfigs()}. */
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Builds the consumer factory from {@link #consumerConfigs()}.
     *
     * <p>The key type is {@code String} because the key deserializer configured below is
     * {@code StringDeserializer}; declaring any other key type would only fail later with
     * a {@code ClassCastException} when a listener reads the key.
     *
     * @return a consumer factory producing String/String consumers
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Producer settings: no retries, leader-only acknowledgement, String serializers. */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // The acks value is passed as the String "1".
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Consumer settings: group id, auto-commit flag, String deserializers. */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
三、生產者
/**
 * Demo controller that publishes one test message to each of the two Kafka clusters.
 */
@Controller
public class TestController {

    // Two KafkaTemplate beans of the same type exist, so injection by type alone is
    // ambiguous. The field names deliberately equal the bean names
    // ("kafkaOneTemplate" / "kafkaTwoTemplate") so each field resolves to the
    // intended template; do not rename them without adding an explicit qualifier.
    @Autowired
    private KafkaTemplate<String, String> kafkaOneTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTwoTemplate;

    /**
     * Sends a fixed message to topic {@code TOPIC_1} on both clusters.
     *
     * <p>The results of the two {@code send} calls are not inspected, so the returned
     * value only indicates that both sends were issued, not that they were delivered.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/send")
    @ResponseBody
    public String send() {
        final String topic = "TOPIC_1";
        kafkaOneTemplate.send(topic, "kafka one");
        kafkaTwoTemplate.send(topic, "kafka two");
        return "success";
    }
}
四、消費者
@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); final String TOPIC = "TOPIC_1"; // containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同 @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory") public void listenerOne(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka one 接收到消息:{}", record.value()); } @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory") public void listenerTwo(ConsumerRecord<?, ?> record) { LOGGER.info(" kafka two 接收到消息:{}", record.value()); } }
運行結果
c.k.s.consumer.KafkaConsumer : kafka one 接收到消息:kafka one
c.k.s.consumer.KafkaConsumer : kafka two 接收到消息:kafka two
到此這篇關於聊聊Spring Boot 如何集成多個 Kafka的文章就介紹到這了,更多相關Spring Boot集成多個 Kafka內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持腳本之家!
相關文章
Mybatis-plus操作json字段實戰教程
這篇文章主要介紹了Mybatis-plus操作json字段實戰教程,本文結合實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下 2023-02-02 Spring Boot Actuator未授權訪問漏洞的問題解決
Spring Boot Actuator 端點的未授權訪問漏洞是一個安全性問題,可能會導致未經授權的用戶訪問敏感的應用程序信息,本文就來介紹一下解決方法,感興趣的可以了解一下 2023-09-09 springboot配置Jackson返回統一默認值的實現示例
在項目開發中,我們返回的數據或者對象沒有的時候一般直接返回的null,那么如何返回統一默認值,感興趣的可以了解一下 2021-07-07 用SpringBoot+Vue+uniapp小程序實現在線房屋裝修管理系統
這篇文章主要介紹了用SpringBoot+Vue+uniapp實現在線房屋裝修管理系統,針對裝修樣板信息管理混亂,出錯率高,信息安全性差,勞動強度大,費時費力等問題開發了這套系統,需要的朋友可以參考下 2023-03-03