欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

聊聊Spring Boot 如何集成多個 Kafka

 更新時間:2023年10月20日 09:33:47   作者://承續(xù)緣_紀錄片  
這篇文章主要介紹了Spring Boot 集成多個 Kafka的相關(guān)資料,包括配置文件,生成者和消費者配置過程,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧

一、配置文件

application.yml
spring:
  kafka:
    one:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true
    two:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
        enable-auto-commit: true

二、生產(chǎn)者、消費者配置

2.1 第一個 Kafka

@EnableKafka
@Configuration
public class KafkaOneConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能寫成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

2.2 第二個 Kafka

@Configuration
public class KafkaTwoConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

三、生產(chǎn)者

@Controller
public class TestController {
    @Autowired
    private KafkaTemplate kafkaOneTemplate;
    @Autowired
    private KafkaTemplate kafkaTwoTemplate;
    @RequestMapping("/send")
    @ResponseBody
    public String send() {
        final String TOPIC = "TOPIC_1";
        kafkaOneTemplate.send(TOPIC, "kafka one");
        kafkaTwoTemplate.send(TOPIC, "kafka two");
        return "success";
    }
}

四、消費者

@Component
public class KafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
    final String TOPIC = "TOPIC_1";
    // containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同
    @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
    public void listenerOne(ConsumerRecord<?, ?> record) {
        LOGGER.info(" kafka one 接收到消息:{}", record.value());
    }
    @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
    public void listenerTwo(ConsumerRecord<?, ?> record) {
        LOGGER.info(" kafka two 接收到消息:{}", record.value());
    }
}

運行結(jié)果

c.k.s.consumer.KafkaConsumer             :  kafka one 接收到消息:kafka one
c.k.s.consumer.KafkaConsumer             :  kafka two 接收到消息:kafka two

到此這篇關(guān)于聊聊Spring Boot 如何集成多個 Kafka的文章就介紹到這了,更多相關(guān)Spring Boot集成多個 Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot整合MongoDB完整實例代碼

    SpringBoot整合MongoDB完整實例代碼

    本文主要介紹了SpringBoot整合MongoDB完整實例,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • springboot如何實現(xiàn)前后端分離跨域訪問

    springboot如何實現(xiàn)前后端分離跨域訪問

    這篇文章主要介紹了springboot如何實現(xiàn)前后端分離跨域訪問問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java反射技術(shù)原理與用法實例分析

    Java反射技術(shù)原理與用法實例分析

    這篇文章主要介紹了Java反射技術(shù)原理與用法,結(jié)合實例形式分析了Java反射技術(shù)的基本概念、功能、原理、用法及操作注意事項,需要的朋友可以參考下
    2020-04-04
  • Mybatis-plus操作json字段實戰(zhàn)教程

    Mybatis-plus操作json字段實戰(zhàn)教程

    這篇文章主要介紹了Mybatis-plus操作json字段實戰(zhàn)教程,本文結(jié)合實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-02-02
  • Springboot通過圖片路徑形式獲取圖片

    Springboot通過圖片路徑形式獲取圖片

    這篇文章主要介紹了Springboot通過圖片路徑形式獲取圖片,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-05-05
  • Spring Boot Actuator未授權(quán)訪問漏洞的問題解決

    Spring Boot Actuator未授權(quán)訪問漏洞的問題解決

    Spring Boot Actuator 端點的未授權(quán)訪問漏洞是一個安全性問題,可能會導(dǎo)致未經(jīng)授權(quán)的用戶訪問敏感的應(yīng)用程序信息,本文就來介紹一下解決方法,感興趣的可以了解一下
    2023-09-09
  • springboot配置Jackson返回統(tǒng)一默認值的實現(xiàn)示例

    springboot配置Jackson返回統(tǒng)一默認值的實現(xiàn)示例

    在項目開發(fā)中,我們返回的數(shù)據(jù)或者對象沒有的時候一般直接返回的null,那么如何返回統(tǒng)一默認值,感興趣的可以了解一下
    2021-07-07
  • 常見JavaWeb安全問題和解決方案

    常見JavaWeb安全問題和解決方案

    這篇文章主要介紹了常見JavaWeb安全問題和解決方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-03-03
  • 使用Spring Security控制會話的方法

    使用Spring Security控制會話的方法

    在本文中,我們將說明Spring Security如何允許我們控制HTTP會話。這篇文章主要介紹了使用Spring Security控制會話 ,需要的朋友可以參考下
    2019-05-05
  • 用SpringBoot+Vue+uniapp小程序?qū)崿F(xiàn)在線房屋裝修管理系統(tǒng)

    用SpringBoot+Vue+uniapp小程序?qū)崿F(xiàn)在線房屋裝修管理系統(tǒng)

    這篇文章主要介紹了用SpringBoot+Vue+uniapp實現(xiàn)在線房屋裝修管理系統(tǒng),針對裝修樣板信息管理混亂,出錯率高,信息安全性差,勞動強度大,費時費力等問題開發(fā)了這套系統(tǒng),需要的朋友可以參考下
    2023-03-03

最新評論