聊聊Spring Boot 如何集成多個(gè) Kafka
一、配置文件
application.yml
spring:
kafka:
one:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
enable-auto-commit: true
two:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
enable-auto-commit: true二、生產(chǎn)者、消費(fèi)者配置
2.1 第一個(gè) Kafka
@EnableKafka
@Configuration
public class KafkaOneConfig {
@Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.one.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能寫成 1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}2.2 第二個(gè) Kafka
@Configuration
public class KafkaTwoConfig {
@Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.two.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> kafkaTwoTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}三、生產(chǎn)者
@Controller
public class TestController {
@Autowired
private KafkaTemplate kafkaOneTemplate;
@Autowired
private KafkaTemplate kafkaTwoTemplate;
@RequestMapping("/send")
@ResponseBody
public String send() {
final String TOPIC = "TOPIC_1";
kafkaOneTemplate.send(TOPIC, "kafka one");
kafkaTwoTemplate.send(TOPIC, "kafka two");
return "success";
}
}四、消費(fèi)者
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
final String TOPIC = "TOPIC_1";
// containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同
@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
public void listenerOne(ConsumerRecord<?, ?> record) {
LOGGER.info(" kafka one 接收到消息:{}", record.value());
}
@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
public void listenerTwo(ConsumerRecord<?, ?> record) {
LOGGER.info(" kafka two 接收到消息:{}", record.value());
}
}運(yùn)行結(jié)果
c.k.s.consumer.KafkaConsumer : kafka one 接收到消息:kafka one
c.k.s.consumer.KafkaConsumer : kafka two 接收到消息:kafka two
到此這篇關(guān)于聊聊Spring Boot 如何集成多個(gè) Kafka的文章就介紹到這了,更多相關(guān)Spring Boot集成多個(gè) Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合MongoDB完整實(shí)例代碼
本文主要介紹了SpringBoot整合MongoDB完整實(shí)例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02
springboot如何實(shí)現(xiàn)前后端分離跨域訪問
這篇文章主要介紹了springboot如何實(shí)現(xiàn)前后端分離跨域訪問問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
Mybatis-plus操作json字段實(shí)戰(zhàn)教程
這篇文章主要介紹了Mybatis-plus操作json字段實(shí)戰(zhàn)教程,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-02-02
Spring Boot Actuator未授權(quán)訪問漏洞的問題解決
Spring Boot Actuator 端點(diǎn)的未授權(quán)訪問漏洞是一個(gè)安全性問題,可能會(huì)導(dǎo)致未經(jīng)授權(quán)的用戶訪問敏感的應(yīng)用程序信息,本文就來介紹一下解決方法,感興趣的可以了解一下2023-09-09
springboot配置Jackson返回統(tǒng)一默認(rèn)值的實(shí)現(xiàn)示例
在項(xiàng)目開發(fā)中,我們返回的數(shù)據(jù)或者對(duì)象沒有的時(shí)候一般直接返回的null,那么如何返回統(tǒng)一默認(rèn)值,感興趣的可以了解一下2021-07-07
用SpringBoot+Vue+uniapp小程序?qū)崿F(xiàn)在線房屋裝修管理系統(tǒng)
這篇文章主要介紹了用SpringBoot+Vue+uniapp實(shí)現(xiàn)在線房屋裝修管理系統(tǒng),針對(duì)裝修樣板信息管理混亂,出錯(cuò)率高,信息安全性差,勞動(dòng)強(qiáng)度大,費(fèi)時(shí)費(fèi)力等問題開發(fā)了這套系統(tǒng),需要的朋友可以參考下2023-03-03

